-
Notifications
You must be signed in to change notification settings - Fork 2
/
streaming_aead_cli.py
163 lines (135 loc) · 5.4 KB
/
streaming_aead_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START streaming-aead-example]
"""A command-line utility for using streaming AEAD for a file.
It loads cleartext keys from disk - this is not recommended!
It requires 4 arguments (and one optional one):
mode: either 'encrypt' or 'decrypt'
keyset_path: name of the file with the keyset to be used for encryption or
decryption
input_path: name of the file with the input data to be encrypted or decrypted
output_path: name of the file to write the resulting ciphertext (when
encrypting) or plaintext (when decrypting) to
[optional] associated_data: the associated data used for encryption/decryption
provided as a string.
"""
import sys
from typing import BinaryIO

from absl import app
from absl import flags
from absl import logging
import tink
from tink import secret_key_access
from tink import streaming_aead
# Flag values container; main() reads the flags defined below through it.
FLAGS = flags.FLAGS
BLOCK_SIZE = 1024 * 1024  # The CLI tool will read/write at most 1 MB at once.
# 'mode', 'keyset_path', 'input_path' and 'output_path' default to None here
# and are marked as required in the __main__ guard at the bottom of the file.
flags.DEFINE_enum('mode', None, ['encrypt', 'decrypt'],
                  'Selects if the file should be encrypted or decrypted.')
flags.DEFINE_string('keyset_path', None,
                    'Path to the keyset used for encryption or decryption.')
flags.DEFINE_string('input_path', None, 'Path to the input file.')
flags.DEFINE_string('output_path', None, 'Path to the output file.')
# Not marked as required: main() uses empty associated data (b'') when this
# flag is unset or empty.
flags.DEFINE_string('associated_data', None,
                    'Associated data used for the encryption or decryption.')
def read_as_blocks(file: BinaryIO):
  """Generator that hands out a file's contents in chunks.

  Args:
    file: The binary file object to read from.

  Yields:
    Consecutive chunks of up to BLOCK_SIZE bytes, in file order. Iteration
    stops at the first read that returns b''.
  """
  # If the file was opened in rawIO, EOF is only reached when b'' is returned,
  # so the sentinel is compared by equality with b'' instead of relying on
  # the truthiness of the chunk.
  for chunk in iter(lambda: file.read(BLOCK_SIZE), b''):
    yield chunk
def encrypt_file(input_file: BinaryIO, output_file: BinaryIO,
                 associated_data: bytes,
                 primitive: streaming_aead.StreamingAead):
  """Encrypts the contents of input_file into output_file.

  The input is consumed block by block via read_as_blocks() and each block is
  written to an encrypting stream that the primitive creates on top of
  output_file.

  Args:
    input_file: Binary file object the data to encrypt is read from.
    output_file: Binary file object handed to the encrypting stream as its
      destination.
    associated_data: Associated data provided for the AEAD.
    primitive: The streaming AEAD primitive used for encryption.
  """
  encrypting_stream = primitive.new_encrypting_stream(output_file,
                                                      associated_data)
  # The stream is used as a context manager so that it is properly exited
  # once every block has been written (or if writing fails).
  with encrypting_stream as ciphertext_sink:
    for plaintext_block in read_as_blocks(input_file):
      ciphertext_sink.write(plaintext_block)
def decrypt_file(input_file: BinaryIO, output_file: BinaryIO,
                 associated_data: bytes,
                 primitive: streaming_aead.StreamingAead):
  """Decrypts a file with the given streaming AEAD primitive.

  The ciphertext is read through a decrypting stream that the primitive
  creates on top of input_file; the data read from that stream is written to
  output_file block by block.

  This function will cause the program to exit with status 1 if the
  decryption fails. Blocks that were already written to output_file before
  the failure are not removed by this function.

  Args:
    input_file: File to read the ciphertext from.
    output_file: File to write the decrypted data to.
    associated_data: Associated data provided for the AEAD.
    primitive: The streaming AEAD primitive used for decryption.

  Raises:
    SystemExit: With exit code 1 if a tink.TinkError is raised while
      decrypting.
  """
  try:
    with primitive.new_decrypting_stream(input_file,
                                         associated_data) as dec_stream:
      for data_block in read_as_blocks(dec_stream):
        output_file.write(data_block)
  except tink.TinkError as e:
    logging.exception('Error decrypting ciphertext: %s', e)
    # sys.exit() instead of the exit() builtin: exit() is added by the site
    # module for interactive sessions and is not guaranteed to be defined
    # (for example when the interpreter is started with -S).
    sys.exit(1)
def main(argv):
  """Entry point: loads the keyset, then encrypts or decrypts the input file.

  All settings are taken from FLAGS (mode, keyset_path, input_path,
  output_path and the optional associated_data).

  Args:
    argv: Command-line arguments passed in by app.run(); unused.

  Returns:
    1 if Tink could not be initialised, the keyset could not be parsed, or the
    streaming AEAD primitive could not be created; None otherwise. A failed
    decryption does not return here: decrypt_file() exits the program itself.
  """
  del argv

  # An unset or empty flag means "no associated data".
  if FLAGS.associated_data:
    associated_data = bytes(FLAGS.associated_data, 'utf-8')
  else:
    associated_data = b''

  # Initialise Tink.
  try:
    streaming_aead.register()
  except tink.TinkError as e:
    logging.exception('Error initialising Tink: %s', e)
    return 1

  # Read the keyset into a keyset_handle.
  with open(FLAGS.keyset_path, 'rt') as keyset_file:
    try:
      serialized_keyset = keyset_file.read()
      keyset_handle = tink.json_proto_keyset_format.parse(
          serialized_keyset, secret_key_access.TOKEN)
    except tink.TinkError as e:
      logging.exception('Error reading key: %s', e)
      return 1

  # Get the primitive.
  try:
    streaming_aead_primitive = keyset_handle.primitive(
        streaming_aead.StreamingAead)
  except tink.TinkError as e:
    logging.exception('Error creating streaming AEAD primitive from keyset: %s',
                      e)
    return 1

  # Encrypt or decrypt the file.
  with open(FLAGS.input_path, 'rb') as input_file:
    with open(FLAGS.output_path, 'wb') as output_file:
      # Pick the worker matching the requested mode; any other mode value
      # leaves the (freshly created) output file empty.
      workers = {'encrypt': encrypt_file, 'decrypt': decrypt_file}
      process_file = workers.get(FLAGS.mode)
      if process_file is not None:
        process_file(input_file, output_file, associated_data,
                     streaming_aead_primitive)
if __name__ == '__main__':
  # Every flag except 'associated_data' must be supplied on the command line.
  for required_flag in ('mode', 'keyset_path', 'input_path', 'output_path'):
    flags.mark_flag_as_required(required_flag)
  app.run(main)
# [END streaming-aead-example]