-
-
Notifications
You must be signed in to change notification settings - Fork 687
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add JWK support for HMAC and RSA keys #202
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,24 @@ | ||
import hashlib | ||
import hmac | ||
import json | ||
|
||
from .compat import binary_type, constant_time_compare, is_string_type | ||
|
||
from .compat import constant_time_compare, string_types | ||
from .exceptions import InvalidKeyError | ||
from .utils import der_to_raw_signature, raw_to_der_signature | ||
from .utils import ( | ||
base64url_decode, base64url_encode, der_to_raw_signature, | ||
force_bytes, force_unicode, from_base64url_uint, raw_to_der_signature, | ||
to_base64url_uint | ||
) | ||
|
||
try: | ||
from cryptography.hazmat.primitives import hashes | ||
from cryptography.hazmat.primitives.serialization import ( | ||
load_pem_private_key, load_pem_public_key, load_ssh_public_key | ||
) | ||
from cryptography.hazmat.primitives.asymmetric.rsa import ( | ||
RSAPrivateKey, RSAPublicKey | ||
RSAPrivateKey, RSAPublicKey, RSAPrivateNumbers, RSAPublicNumbers, | ||
rsa_recover_prime_factors, rsa_crt_dmp1, rsa_crt_dmq1, rsa_crt_iqmp | ||
) | ||
from cryptography.hazmat.primitives.asymmetric.ec import ( | ||
EllipticCurvePrivateKey, EllipticCurvePublicKey | ||
|
@@ -77,6 +84,20 @@ def verify(self, msg, key, sig): | |
""" | ||
raise NotImplementedError | ||
|
||
@staticmethod | ||
def to_jwk(key_obj): | ||
""" | ||
Serializes a given RSA key into a JWK | ||
""" | ||
raise NotImplementedError | ||
|
||
@staticmethod | ||
def from_jwk(jwk): | ||
""" | ||
Deserializes a given RSA key from JWK back into a PublicKey or PrivateKey object | ||
""" | ||
raise NotImplementedError | ||
|
||
|
||
class NoneAlgorithm(Algorithm): | ||
""" | ||
|
@@ -112,11 +133,7 @@ def __init__(self, hash_alg): | |
self.hash_alg = hash_alg | ||
|
||
def prepare_key(self, key): | ||
if not is_string_type(key): | ||
raise TypeError('Expecting a string- or bytes-formatted key.') | ||
|
||
if not isinstance(key, binary_type): | ||
key = key.encode('utf-8') | ||
key = force_bytes(key) | ||
|
||
invalid_strings = [ | ||
b'-----BEGIN PUBLIC KEY-----', | ||
|
@@ -131,6 +148,22 @@ def prepare_key(self, key): | |
|
||
return key | ||
|
||
@staticmethod | ||
def to_jwk(key_obj): | ||
return json.dumps({ | ||
'k': force_unicode(base64url_encode(force_bytes(key_obj))), | ||
'kty': 'oct' | ||
}) | ||
|
||
@staticmethod | ||
def from_jwk(jwk): | ||
obj = json.loads(jwk) | ||
|
||
if obj.get('kty') != 'oct': | ||
raise InvalidKeyError('Not an HMAC key') | ||
|
||
return base64url_decode(obj['k']) | ||
|
||
def sign(self, msg, key): | ||
return hmac.new(key, msg, self.hash_alg).digest() | ||
|
||
|
@@ -156,9 +189,8 @@ def prepare_key(self, key): | |
isinstance(key, RSAPublicKey): | ||
return key | ||
|
||
if is_string_type(key): | ||
if not isinstance(key, binary_type): | ||
key = key.encode('utf-8') | ||
if isinstance(key, string_types): | ||
key = force_bytes(key) | ||
|
||
try: | ||
if key.startswith(b'ssh-rsa'): | ||
|
@@ -172,6 +204,105 @@ def prepare_key(self, key): | |
|
||
return key | ||
|
||
@staticmethod | ||
def to_jwk(key_obj): | ||
obj = None | ||
|
||
if getattr(key_obj, 'private_numbers', None): | ||
# Private key | ||
numbers = key_obj.private_numbers() | ||
|
||
obj = { | ||
'kty': 'RSA', | ||
'key_ops': ['sign'], | ||
'n': force_unicode(to_base64url_uint(numbers.public_numbers.n)), | ||
'e': force_unicode(to_base64url_uint(numbers.public_numbers.e)), | ||
'd': force_unicode(to_base64url_uint(numbers.d)), | ||
'p': force_unicode(to_base64url_uint(numbers.p)), | ||
'q': force_unicode(to_base64url_uint(numbers.q)), | ||
'dp': force_unicode(to_base64url_uint(numbers.dmp1)), | ||
'dq': force_unicode(to_base64url_uint(numbers.dmq1)), | ||
'qi': force_unicode(to_base64url_uint(numbers.iqmp)) | ||
} | ||
|
||
elif getattr(key_obj, 'verifier', None): | ||
# Public key | ||
numbers = key_obj.public_numbers() | ||
|
||
obj = { | ||
'kty': 'RSA', | ||
'key_ops': ['verify'], | ||
'n': force_unicode(to_base64url_uint(numbers.n)), | ||
'e': force_unicode(to_base64url_uint(numbers.e)) | ||
} | ||
else: | ||
raise InvalidKeyError('Not a public or private key') | ||
|
||
return json.dumps(obj) | ||
|
||
@staticmethod | ||
def from_jwk(jwk): | ||
try: | ||
obj = json.loads(jwk) | ||
except ValueError: | ||
raise InvalidKeyError('Key is not valid JSON') | ||
|
||
if obj.get('kty') != 'RSA': | ||
raise InvalidKeyError('Not an RSA key') | ||
|
||
if 'd' in obj and 'e' in obj and 'n' in obj: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Neither There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great point. I definitely missed that. I'll fix it. |
||
# Private key | ||
if 'oth' in obj: | ||
raise InvalidKeyError('Unsupported RSA private key: > 2 primes not supported') | ||
|
||
other_props = ['p', 'q', 'dp', 'dq', 'qi'] | ||
props_found = [prop in obj for prop in other_props] | ||
any_props_found = any(props_found) | ||
|
||
if any_props_found and not all(props_found): | ||
raise InvalidKeyError('RSA key must include all parameters if any are present besides d') | ||
|
||
public_numbers = RSAPublicNumbers( | ||
from_base64url_uint(obj['e']), from_base64url_uint(obj['n']) | ||
) | ||
|
||
if any_props_found: | ||
numbers = RSAPrivateNumbers( | ||
d=from_base64url_uint(obj['d']), | ||
p=from_base64url_uint(obj['p']), | ||
q=from_base64url_uint(obj['q']), | ||
dmp1=from_base64url_uint(obj['dp']), | ||
dmq1=from_base64url_uint(obj['dq']), | ||
iqmp=from_base64url_uint(obj['qi']), | ||
public_numbers=public_numbers | ||
) | ||
else: | ||
d = from_base64url_uint(obj['d']) | ||
p, q = rsa_recover_prime_factors( | ||
public_numbers.n, d, public_numbers.e | ||
) | ||
|
||
numbers = RSAPrivateNumbers( | ||
d=d, | ||
p=p, | ||
q=q, | ||
dmp1=rsa_crt_dmp1(d, p), | ||
dmq1=rsa_crt_dmq1(d, q), | ||
iqmp=rsa_crt_iqmp(p, q), | ||
public_numbers=public_numbers | ||
) | ||
|
||
return numbers.private_key(default_backend()) | ||
elif 'n' in obj and 'e' in obj: | ||
# Public key | ||
numbers = RSAPublicNumbers( | ||
from_base64url_uint(obj['e']), from_base64url_uint(obj['n']) | ||
) | ||
|
||
return numbers.public_key(default_backend()) | ||
else: | ||
raise InvalidKeyError('Not a public or private key') | ||
|
||
def sign(self, msg, key): | ||
signer = key.signer( | ||
padding.PKCS1v15(), | ||
|
@@ -213,9 +344,8 @@ def prepare_key(self, key): | |
isinstance(key, EllipticCurvePublicKey): | ||
return key | ||
|
||
if is_string_type(key): | ||
if not isinstance(key, binary_type): | ||
key = key.encode('utf-8') | ||
if isinstance(key, string_types): | ||
key = force_bytes(key) | ||
|
||
# Attempt to load key. We don't know if it's | ||
# a Signing Key or a Verifying Key, so we try | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,8 +3,9 @@ | |
versions of python, and compatibility wrappers around optional packages. | ||
""" | ||
# flake8: noqa | ||
import sys | ||
import hmac | ||
import struct | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: alphabetize these imports. |
||
import sys | ||
|
||
|
||
PY3 = sys.version_info[0] == 3 | ||
|
@@ -20,10 +21,6 @@ | |
string_types = (text_type, binary_type) | ||
|
||
|
||
def is_string_type(val): | ||
return any([isinstance(val, typ) for typ in string_types]) | ||
|
||
|
||
def timedelta_total_seconds(delta): | ||
try: | ||
delta.total_seconds | ||
|
@@ -56,3 +53,24 @@ def constant_time_compare(val1, val2): | |
result |= ord(x) ^ ord(y) | ||
|
||
return result == 0 | ||
|
||
# Use int.to_bytes if it exists (Python 3) | ||
if getattr(int, 'to_bytes', None): | ||
def bytes_from_int(val): | ||
remaining = val | ||
byte_length = 0 | ||
|
||
while remaining != 0: | ||
remaining = remaining >> 8 | ||
byte_length += 1 | ||
|
||
return val.to_bytes(byte_length, 'big', signed=False) | ||
else: | ||
def bytes_from_int(val): | ||
buf = [] | ||
while val: | ||
val, remainder = divmod(val, 256) | ||
buf.append(remainder) | ||
|
||
buf.reverse() | ||
return struct.pack('%sB' % len(buf), *buf) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I cannot find specific documentation on encoding RSA keys, but the examples I have seen include modulus (n) and public exponent (e) values in both the private and public keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct. That was an oversight on my part. I missed the text that says: