-
Notifications
You must be signed in to change notification settings - Fork 237
/
jwe.py
607 lines (499 loc) · 21.6 KB
/
jwe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
import binascii
import json
import zlib
from struct import pack
import six
try:
from collections.abc import Mapping # Python 3
except ImportError:
from collections import Mapping # Python 2, will be deprecated in Python 3.8
from .backends import get_random_bytes
from .constants import ALGORITHMS, ZIPS
from .exceptions import JWEParseError, JWEError
from .utils import base64url_decode, base64url_encode
from . import jwk
def encrypt(plaintext, key, encryption=ALGORITHMS.A256GCM,
algorithm=ALGORITHMS.DIR, zip=None, cty=None, kid=None):
"""Encrypts plaintext and returns a JWE cmpact serialization string.
Args:
plaintext (bytes): A bytes object to encrypt
key (str or dict): The key(s) to use for encrypting the content. Can be
individual JWK or JWK set.
encryption (str, optional): The content encryption algorithm used to
perform authenticated encryption on the plaintext to produce the
ciphertext and the Authentication Tag. Defaults to A256GCM.
algorithm (str, optional): The cryptographic algorithm used
to encrypt or determine the value of the CEK. Defaults to dir.
zip (str, optional): The compression algorithm) applied to the
plaintext before encryption. Defaults to None.
cty (str, optional): The media type for the secured content.
See http://www.iana.org/assignments/media-types/media-types.xhtml
kid (str, optional): Key ID for the provided key
Returns:
bytes: The string representation of the header, encrypted key,
initialization vector, ciphertext, and authentication tag.
Raises:
JWEError: If there is an error signing the token.
Examples:
>>> from jose import jwe
>>> jwe.encrypt('Hello, World!', 'asecret128bitkey', algorithm='dir', encryption='A128GCM')
'eyJhbGciOiJkaXIiLCJlbmMiOiJBMTI4R0NNIn0..McILMB3dYsNJSuhcDzQshA.OfX9H_mcUpHDeRM4IA.CcnTWqaqxNsjT4eCaUABSg'
"""
plaintext = six.ensure_binary(plaintext) # Make sure it's bytes
if algorithm not in ALGORITHMS.SUPPORTED:
raise JWEError('Algorithm %s not supported.' % algorithm)
if encryption not in ALGORITHMS.SUPPORTED:
raise JWEError('Algorithm %s not supported.' % encryption)
key = jwk.construct(key, algorithm)
encoded_header = _encoded_header(algorithm, encryption, zip, cty, kid)
plaintext = _compress(zip, plaintext)
enc_cek, iv, cipher_text, auth_tag = _encrypt_and_auth(
key, algorithm, encryption, zip, plaintext, encoded_header)
jwe_string = _jwe_compact_serialize(
encoded_header, enc_cek, iv, cipher_text, auth_tag)
return jwe_string
def decrypt(jwe_str, key):
"""Decrypts a JWE compact serialized string and returns the plaintext.
Args:
jwe_str (str): A JWE to be decrypt.
key (str or dict): A key to attempt to decrypt the payload with. Can be
individual JWK or JWK set.
Returns:
bytes: The plaintext bytes, assuming the authentication tag is valid.
Raises:
JWEError: If there is an exception verifying the token.
Examples:
>>> from jose import jwe
>>> jwe.decrypt(jwe_string, 'asecret128bitkey')
'Hello, World!'
"""
header, encoded_header, encrypted_key, iv, cipher_text, auth_tag = _jwe_compact_deserialize(jwe_str)
# Verify that the implementation understands and can process all
# fields that it is required to support, whether required by this
# specification, by the algorithms being used, or by the "crit"
# Header Parameter value, and that the values of those parameters
# are also understood and supported.
try:
# Determine the Key Management Mode employed by the algorithm
# specified by the "alg" (algorithm) Header Parameter.
alg = header["alg"]
enc = header["enc"]
if alg not in ALGORITHMS.SUPPORTED:
raise JWEError('Algorithm %s not supported.' % alg)
if enc not in ALGORITHMS.SUPPORTED:
raise JWEError('Algorithm %s not supported.' % enc)
except KeyError:
raise JWEParseError("alg and enc headers are required!")
# Verify that the JWE uses a key known to the recipient.
key = jwk.construct(key, alg)
# When Direct Key Agreement or Key Agreement with Key Wrapping are
# employed, use the key agreement algorithm to compute the value
# of the agreed upon key. When Direct Key Agreement is employed,
# let the CEK be the agreed upon key. When Key Agreement with Key
# Wrapping is employed, the agreed upon key will be used to
# decrypt the JWE Encrypted Key.
#
# When Key Wrapping, Key Encryption, or Key Agreement with Key
# Wrapping are employed, decrypt the JWE Encrypted Key to produce
# the CEK. The CEK MUST have a length equal to that required for
# the content encryption algorithm. Note that when there are
# multiple recipients, each recipient will only be able to decrypt
# JWE Encrypted Key values that were encrypted to a key in that
# recipient's possession. It is therefore normal to only be able
# to decrypt one of the per-recipient JWE Encrypted Key values to
# obtain the CEK value. Also, see Section 11.5 for security
# considerations on mitigating timing attacks.
if alg == ALGORITHMS.DIR:
# When Direct Key Agreement or Direct Encryption are employed,
# verify that the JWE Encrypted Key value is an empty octet
# sequence.
# Record whether the CEK could be successfully determined for this
# recipient or not.
cek_valid = encrypted_key == b""
# When Direct Encryption is employed, let the CEK be the shared
# symmetric key.
cek_bytes = _get_key_bytes_from_key(key)
else:
try:
cek_bytes = key.unwrap_key(encrypted_key)
# Record whether the CEK could be successfully determined for this
# recipient or not.
cek_valid = True
except NotImplementedError:
raise JWEError("alg {} is not implemented".format(alg))
except Exception:
# Record whether the CEK could be successfully determined for this
# recipient or not.
cek_valid = False
# To mitigate the attacks described in RFC 3218 [RFC3218], the
# recipient MUST NOT distinguish between format, padding, and length
# errors of encrypted keys. It is strongly recommended, in the event
# of receiving an improperly formatted key, that the recipient
# substitute a randomly generated CEK and proceed to the next step, to
# mitigate timing attacks.
cek_bytes = _get_random_cek_bytes_for_enc(enc)
# Compute the Encoded Protected Header value BASE64URL(UTF8(JWE
# Protected Header)). If the JWE Protected Header is not present
# (which can only happen when using the JWE JSON Serialization and
# no "protected" member is present), let this value be the empty
# string.
protected_header = encoded_header
# Let the Additional Authenticated Data encryption parameter be
# ASCII(Encoded Protected Header). However, if a JWE AAD value is
# present (which can only be the case when using the JWE JSON
# Serialization), instead let the Additional Authenticated Data
# encryption parameter be ASCII(Encoded Protected Header || '.' ||
# BASE64URL(JWE AAD)).
aad = protected_header
# Decrypt the JWE Ciphertext using the CEK, the JWE Initialization
# Vector, the Additional Authenticated Data value, and the JWE
# Authentication Tag (which is the Authentication Tag input to the
# calculation) using the specified content encryption algorithm,
# returning the decrypted plaintext and validating the JWE
# Authentication Tag in the manner specified for the algorithm,
# rejecting the input without emitting any decrypted output if the
# JWE Authentication Tag is incorrect.
try:
plain_text = _decrypt_and_auth(cek_bytes, enc, cipher_text, iv, aad, auth_tag)
except NotImplementedError:
raise JWEError("enc {} is not implemented".format(enc))
except Exception as e:
raise JWEError(e)
# If a "zip" parameter was included, uncompress the decrypted
# plaintext using the specified compression algorithm.
if plain_text is not None:
plain_text = _decompress(header.get("zip"), plain_text)
return plain_text if cek_valid else None
def get_unverified_header(jwe_str):
"""Returns the decoded headers without verification of any kind.
Args:
jwe_str (str): A compact serialized JWE to decode the headers from.
Returns:
dict: The dict representation of the JWE headers.
Raises:
JWEError: If there is an exception decoding the JWE.
"""
header = _jwe_compact_deserialize(jwe_str)[0]
return header
def _decrypt_and_auth(cek_bytes, enc, cipher_text, iv, aad, auth_tag):
"""
Decrypt and verify the data
Args:
cek_bytes (bytes): cek to derive encryption and possible auth key to
verify the auth tag
cipher_text (bytes): Encrypted data
iv (bytes): Initialization vector (iv) used to encrypt data
aad (bytes): Additional Authenticated Data used to verify the data
auth_tag (bytes): Authentication ntag to verify the data
Returns:
(bytes): Decrypted data
"""
# Decrypt the JWE Ciphertext using the CEK, the JWE Initialization
# Vector, the Additional Authenticated Data value, and the JWE
# Authentication Tag (which is the Authentication Tag input to the
# calculation) using the specified content encryption algorithm,
# returning the decrypted plaintext
# and validating the JWE
# Authentication Tag in the manner specified for the algorithm,
if enc in ALGORITHMS.HMAC_AUTH_TAG:
encryption_key, mac_key, key_len = _get_encryption_key_mac_key_and_key_length_from_cek(cek_bytes, enc)
auth_tag_check = _auth_tag(cipher_text, iv, aad, mac_key, key_len)
elif enc in ALGORITHMS.GCM:
encryption_key = jwk.construct(cek_bytes, enc)
auth_tag_check = auth_tag # GCM check auth on decrypt
else:
raise NotImplementedError("enc {} is not implemented!".format(enc))
plaintext = encryption_key.decrypt(cipher_text, iv, aad, auth_tag)
if auth_tag != auth_tag_check:
raise JWEError("Invalid JWE Auth Tag")
return plaintext
def _get_encryption_key_mac_key_and_key_length_from_cek(cek_bytes, enc):
derived_key_len = len(cek_bytes) // 2
mac_key_bytes = cek_bytes[0:derived_key_len]
mac_key = _get_hmac_key(enc, mac_key_bytes)
encryption_key_bytes = cek_bytes[-derived_key_len:]
encryption_alg, _ = enc.split("-")
encryption_key = jwk.construct(encryption_key_bytes, encryption_alg)
return encryption_key, mac_key, derived_key_len
def _jwe_compact_deserialize(jwe_bytes):
"""
Deserialize and verify the header and segments are appropriate.
Args:
jwe_bytes (bytes): The compact serialized JWE
Returns:
(dict, bytes, bytes, bytes, bytes, bytes)
"""
# Base64url decode the encoded representations of the JWE
# Protected Header, the JWE Encrypted Key, the JWE Initialization
# Vector, the JWE Ciphertext, the JWE Authentication Tag, and the
# JWE AAD, following the restriction that no line breaks,
# whitespace, or other additional characters have been used.
jwe_bytes = six.ensure_binary(jwe_bytes)
try:
header_segment, encrypted_key_segment, iv_segment, \
cipher_text_segment, auth_tag_segment = jwe_bytes.split(b'.', 4)
header_data = base64url_decode(header_segment)
except ValueError:
raise JWEParseError('Not enough segments')
except (TypeError, binascii.Error):
raise JWEParseError('Invalid header')
# Verify that the octet sequence resulting from decoding the
# encoded JWE Protected Header is a UTF-8-encoded representation
# of a completely valid JSON object conforming to RFC 7159
# [RFC7159]; let the JWE Protected Header be this JSON object.
#
# If using the JWE Compact Serialization, let the JOSE Header be
# the JWE Protected Header. Otherwise, when using the JWE JSON
# Serialization, let the JOSE Header be the union of the members
# of the JWE Protected Header, the JWE Shared Unprotected Header
# and the corresponding JWE Per-Recipient Unprotected Header, all
# of which must be completely valid JSON objects. During this
# step, verify that the resulting JOSE Header does not contain
# duplicate Header Parameter names. When using the JWE JSON
# Serialization, this restriction includes that the same Header
# Parameter name also MUST NOT occur in distinct JSON object
# values that together comprise the JOSE Header.
try:
header = json.loads(six.ensure_str(header_data))
except ValueError as e:
raise JWEParseError('Invalid header string: %s' % e)
if not isinstance(header, Mapping):
raise JWEParseError('Invalid header string: must be a json object')
try:
encrypted_key = base64url_decode(encrypted_key_segment)
except (TypeError, binascii.Error):
raise JWEParseError('Invalid encrypted key')
try:
iv = base64url_decode(iv_segment)
except (TypeError, binascii.Error):
raise JWEParseError('Invalid IV')
try:
ciphertext = base64url_decode(cipher_text_segment)
except (TypeError, binascii.Error):
raise JWEParseError('Invalid cyphertext')
try:
auth_tag = base64url_decode(auth_tag_segment)
except (TypeError, binascii.Error):
raise JWEParseError('Invalid auth tag')
return header, header_segment, encrypted_key, iv, ciphertext, auth_tag
def _encoded_header(alg, enc, zip, cty, kid):
"""
Generate an appropriate JOSE header based on the values provided
Args:
alg (str): Key wrap/negotiation algorithm
enc (str): Encryption algorithm
zip (str): Compression method
cty (str): Content type of the encrypted data
kid (str): ID for the key used for the operation
Returns:
bytes: JSON object of header based on input
"""
header = {"alg": alg, "enc": enc}
if zip:
header["zip"] = zip
if cty:
header["cty"] = cty
if kid:
header["kid"] = kid
json_header = json.dumps(
header,
separators=(',', ':'),
sort_keys=True,
).encode('utf-8')
return base64url_encode(json_header)
def _big_endian(int_val):
return pack("!Q", int_val)
def _encrypt_and_auth(key, alg, enc, zip, plaintext, aad):
"""
Generate a content encryption key (cek) and initialization
vector (iv) based on enc and alg, compress the plaintext based on zip,
encrypt the compressed plaintext using the cek and iv based on enc
Args:
key (Key): The key provided for encryption
alg (str): The algorithm use for key wrap/negotiation
enc (str): The encryption algorithm with which to encrypt the plaintext
zip (str): The compression algorithm with which to compress the plaintext
plaintext (bytes): The data to encrypt
aad (str): Additional authentication data utilized for generating an
auth tag
Returns:
(bytes, bytes, bytes, bytes): A tuple of the following data
(key wrapped cek, iv, cipher text, auth tag)
"""
try:
cek_bytes, kw_cek = _get_cek(enc, alg, key)
except NotImplementedError:
raise JWEError("alg {} is not implemented".format(alg))
if enc in ALGORITHMS.HMAC_AUTH_TAG:
encryption_key, mac_key, key_len = _get_encryption_key_mac_key_and_key_length_from_cek(cek_bytes, enc)
iv, ciphertext, tag = encryption_key.encrypt(plaintext, aad)
auth_tag = _auth_tag(ciphertext, iv, aad, mac_key, key_len)
elif enc in ALGORITHMS.GCM:
encryption_key = jwk.construct(cek_bytes, enc)
iv, ciphertext, auth_tag = encryption_key.encrypt(plaintext, aad)
else:
raise NotImplementedError("enc {} is not implemented!".format(enc))
return kw_cek, iv, ciphertext, auth_tag
def _get_hmac_key(enc, mac_key_bytes):
"""
Get an HMACKey for the provided encryption algorithm and key bytes
Args:
enc (str): Encryption algorithm
mac_key_bytes (bytes): vytes for the HMAC key
Returns:
(HMACKey): The key to perform HMAC actions
"""
_, hash_alg = enc.split("-")
mac_key = jwk.construct(mac_key_bytes, hash_alg)
return mac_key
def _compress(zip, plaintext):
"""
Compress the plaintext based on the algorithm supplied
Args:
zip (str): Compression Algorithm
plaintext (bytes): plaintext to compress
Returns:
(bytes): Compressed plaintext
"""
if zip not in ZIPS.SUPPORTED:
raise NotImplementedError("ZIP {} is not supported!")
if zip is None:
compressed = plaintext
elif zip == ZIPS.DEF:
compressed = zlib.compress(plaintext)
else:
raise NotImplementedError("ZIP {} is not implemented!")
return compressed
def _decompress(zip, compressed):
"""
Decompress the plaintext based on the algorithm supplied
Args:
zip (str): Compression Algorithm
plaintext (bytes): plaintext to decompress
Returns:
(bytes): Compressed plaintext
"""
if zip not in ZIPS.SUPPORTED:
raise NotImplementedError("ZIP {} is not supported!")
if zip is None:
decompressed = compressed
elif zip == ZIPS.DEF:
decompressed = zlib.decompress(compressed)
else:
raise NotImplementedError("ZIP {} is not implemented!")
return decompressed
def _get_cek(enc, alg, key):
"""
Get the content encryption key
Args:
enc (str): Encryption algorithm
alg (str): kwy wrap/negotiation algorithm
key (Key): Key provided to encryption method
Return:
(bytes, bytes): Tuple of (cek bytes and wrapped cek)
"""
if alg == ALGORITHMS.DIR:
cek, wrapped_cek = _get_direct_key_wrap_cek(key)
else:
cek, wrapped_cek = _get_key_wrap_cek(enc, key)
return cek, wrapped_cek
def _get_direct_key_wrap_cek(key):
"""
Get the cek and wrapped cek from the encryption key direct
Args:
key (Key): Key provided to encryption method
Return:
(Key, bytes): Tuple of (cek Key object and wrapped cek)
"""
# Get the JWK data to determine how to derive the cek
jwk_data = key.to_dict()
if jwk_data["kty"] == "oct":
# Get the last half of an octal key as the cek
cek_bytes = _get_key_bytes_from_key(key)
wrapped_cek = b""
else:
raise NotImplementedError(
"JWK type {} not supported!".format(jwk_data['kty']))
return cek_bytes, wrapped_cek
def _get_key_bytes_from_key(key):
"""
Get the raw key bytes from a Key object
Args:
key (Key): Key from which to extract the raw key bytes
Returns:
(bytes) key data
"""
jwk_data = key.to_dict()
encoded_key = jwk_data["k"]
cek_bytes = base64url_decode(encoded_key)
return cek_bytes
def _get_key_wrap_cek(enc, key):
"""_get_rsa_key_wrap_cek
Get the content encryption key for RSA key wrap
Args:
enc (str): Encryption algorithm
key (Key): Key provided to encryption method
Returns:
(Key, bytes): Tuple of (cek Key object and wrapped cek)
"""
cek_bytes = _get_random_cek_bytes_for_enc(enc)
wrapped_cek = key.wrap_key(cek_bytes)
return cek_bytes, wrapped_cek
def _get_random_cek_bytes_for_enc(enc):
"""
Get the random cek bytes based on the encryptionn algorithm
Args:
enc (str): Encryption algorithm
Returns:
(bytes) random bytes for cek key
"""
if enc == ALGORITHMS.A128GCM:
num_bits = 128
elif enc == ALGORITHMS.A192GCM:
num_bits = 192
elif enc in (ALGORITHMS.A128CBC_HS256, ALGORITHMS.A256GCM):
num_bits = 256
elif enc == ALGORITHMS.A192CBC_HS384:
num_bits = 384
elif enc == ALGORITHMS.A256CBC_HS512:
num_bits = 512
else:
raise NotImplementedError("{} not supported".format(enc))
cek_bytes = get_random_bytes(num_bits // 8)
return cek_bytes
def _auth_tag(ciphertext, iv, aad, mac_key, tag_length):
"""
Get ann auth tag from the provided data
Args:
ciphertext (bytes): Encrypted value
iv (bytes): Initialization vector
aad (bytes): Additional Authenticated Data
mac_key (bytes): Key to use in generating the MAC
tag_length (int): How log the tag should be
Returns:
(bytes) Auth tag
"""
al = _big_endian(len(aad) * 8)
auth_tag_input = aad + iv + ciphertext + al
signature = mac_key.sign(auth_tag_input)
auth_tag = signature[0:tag_length]
return auth_tag
def _jwe_compact_serialize(encoded_header, encrypted_cek, iv, cipher_text, auth_tag):
"""
Generate a compact serialized JWE
Args:
encoded_header (bytes): Base64 URL Encoded JWE header JSON
encrypted_cek (bytes): Encrypted content encryption key (cek)
iv (bytes): Initialization vector (IV)
cipher_text (bytes): Cipher text
auth_tag (bytes): JWE Auth Tag
Returns:
(str): JWE compact serialized string
"""
cipher_text = six.ensure_binary(cipher_text) # Maker sure it's bytes
encoded_encrypted_cek = base64url_encode(encrypted_cek)
encoded_iv = base64url_encode(iv)
encoded_cipher_text = base64url_encode(cipher_text)
encoded_auth_tag = base64url_encode(auth_tag)
return encoded_header + b"." + encoded_encrypted_cek + b"." + \
encoded_iv + b"." + encoded_cipher_text + b"." + encoded_auth_tag