Skip to content

Commit

Permalink
Fix failing linting
Browse files Browse the repository at this point in the history
  • Loading branch information
jpadilla committed Jun 19, 2020
1 parent c29f42a commit 6b7b7f0
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 68 deletions.
61 changes: 23 additions & 38 deletions jwt/algorithms.py
Expand Up @@ -198,10 +198,7 @@ def prepare_key(self, key):
@staticmethod
def to_jwk(key_obj):
return json.dumps(
{
"k": force_unicode(base64url_encode(force_bytes(key_obj))),
"kty": "oct",
}
{"k": force_unicode(base64url_encode(force_bytes(key_obj))), "kty": "oct",}
)

@staticmethod
Expand Down Expand Up @@ -244,9 +241,7 @@ def prepare_key(self, key):

try:
if key.startswith(b"ssh-rsa"):
key = load_ssh_public_key(
key, backend=default_backend()
)
key = load_ssh_public_key(key, backend=default_backend())
else:
key = load_pem_private_key(
key, password=None, backend=default_backend()
Expand All @@ -269,12 +264,8 @@ def to_jwk(key_obj):
obj = {
"kty": "RSA",
"key_ops": ["sign"],
"n": force_unicode(
to_base64url_uint(numbers.public_numbers.n)
),
"e": force_unicode(
to_base64url_uint(numbers.public_numbers.e)
),
"n": force_unicode(to_base64url_uint(numbers.public_numbers.n)),
"e": force_unicode(to_base64url_uint(numbers.public_numbers.e)),
"d": force_unicode(to_base64url_uint(numbers.d)),
"p": force_unicode(to_base64url_uint(numbers.p)),
"q": force_unicode(to_base64url_uint(numbers.q)),
Expand Down Expand Up @@ -325,8 +316,7 @@ def from_jwk(jwk):
)

public_numbers = RSAPublicNumbers(
from_base64url_uint(obj["e"]),
from_base64url_uint(obj["n"]),
from_base64url_uint(obj["e"]), from_base64url_uint(obj["n"]),
)

if any_props_found:
Expand Down Expand Up @@ -359,8 +349,7 @@ def from_jwk(jwk):
elif "n" in obj and "e" in obj:
# Public key
numbers = RSAPublicNumbers(
from_base64url_uint(obj["e"]),
from_base64url_uint(obj["n"]),
from_base64url_uint(obj["e"]), from_base64url_uint(obj["n"]),
)

return numbers.public_key(default_backend())
Expand Down Expand Up @@ -404,13 +393,9 @@ def prepare_key(self, key):
# the Verifying Key first.
try:
if key.startswith(b"ecdsa-sha2-"):
key = load_ssh_public_key(
key, backend=default_backend()
)
key = load_ssh_public_key(key, backend=default_backend())
else:
key = load_pem_public_key(
key, backend=default_backend()
)
key = load_pem_public_key(key, backend=default_backend())
except ValueError:
key = load_pem_private_key(
key, password=None, backend=default_backend()
Expand Down Expand Up @@ -444,29 +429,29 @@ def from_jwk(jwk):
try:
obj = json.loads(jwk)
except ValueError:
raise InvalidKeyError('Key is not valid JSON')
raise InvalidKeyError("Key is not valid JSON")

if obj.get('kty') != 'EC':
raise InvalidKeyError('Not an Elliptic curve key')
if obj.get("kty") != "EC":
raise InvalidKeyError("Not an Elliptic curve key")

if 'x' not in obj or 'y' not in obj:
raise InvalidKeyError('Not an Elliptic curve key')
if "x" not in obj or "y" not in obj:
raise InvalidKeyError("Not an Elliptic curve key")

x = base64url_decode(force_bytes(obj.get('x')))
y = base64url_decode(force_bytes(obj.get('y')))
x = base64url_decode(force_bytes(obj.get("x")))
y = base64url_decode(force_bytes(obj.get("y")))

curve = obj.get('crv')
if curve == 'P-256':
curve = obj.get("crv")
if curve == "P-256":
if len(x) == len(y) == 32:
curve_obj = ec.SECP256R1()
else:
raise InvalidKeyError("Coords should be 32 bytes for curve P-256")
elif curve == 'P-384':
elif curve == "P-384":
if len(x) == len(y) == 48:
curve_obj = ec.SECP384R1()
else:
raise InvalidKeyError("Coords should be 48 bytes for curve P-384")
elif curve == 'P-521':
elif curve == "P-521":
if len(x) == len(y) == 66:
curve_obj = ec.SECP521R1()
else:
Expand All @@ -475,20 +460,20 @@ def from_jwk(jwk):
raise InvalidKeyError("Invalid curve: {}".format(curve))

public_numbers = ec.EllipticCurvePublicNumbers(
x=int_from_bytes(x, 'big'), y=int_from_bytes(y, 'big'), curve=curve_obj
x=int_from_bytes(x, "big"), y=int_from_bytes(y, "big"), curve=curve_obj
)

if 'd' not in obj:
if "d" not in obj:
return public_numbers.public_key(default_backend())

d = base64url_decode(force_bytes(obj.get('d')))
d = base64url_decode(force_bytes(obj.get("d")))
if len(d) != len(x):
raise InvalidKeyError(
"D should be {} bytes for curve {}", len(x), curve
)

return ec.EllipticCurvePrivateNumbers(
int_from_bytes(d, 'big'), public_numbers
int_from_bytes(d, "big"), public_numbers
).private_key(default_backend())

class RSAPSSAlgorithm(RSAAlgorithm):
Expand Down
6 changes: 3 additions & 3 deletions tests/keys/__init__.py
Expand Up @@ -43,12 +43,12 @@ def load_ec_key():
keyobj = json.load(infile)

return ec.EllipticCurvePrivateNumbers(
private_value=decode_value(keyobj['d']),
public_numbers=load_ec_pub_key_p_521().public_numbers()
private_value=decode_value(keyobj["d"]),
public_numbers=load_ec_pub_key_p_521().public_numbers(),
)

def load_ec_pub_key_p_521():
with open(os.path.join(BASE_PATH, 'jwk_ec_pub_P-521.json'), 'r') as infile:
with open(os.path.join(BASE_PATH, "jwk_ec_pub_P-521.json"), "r") as infile:
keyobj = json.load(infile)

return ec.EllipticCurvePublicNumbers(
Expand Down
2 changes: 1 addition & 1 deletion tests/keys/jwk_ec_key_P-256.json
Expand Up @@ -5,4 +5,4 @@
"x": "PTTjIY84aLtaZCxLTrG_d8I0G6YKCV7lg8M4xkKfwQ4=",
"y": "ank6KA34vv24HZLXlChVs85NEGlpg2sbqNmR_BcgyJU=",
"d": "9GJquUJf57a9sev-u8-PoYlIezIPqI_vGpIaiu4zyZk="
}
}
2 changes: 1 addition & 1 deletion tests/keys/jwk_ec_key_P-384.json
Expand Up @@ -5,4 +5,4 @@
"x": "IDC-5s6FERlbC4Nc_4JhKW8sd51AhixtMdNUtPxhRFP323QY6cwWeIA3leyZhz-J",
"y": "eovmN9ocANS8IJxDAGSuC1FehTq5ZFLJU7XSPg36zHpv4H2byKGEcCBiwT4sFJsy",
"d": "xKPj5IXjiHpQpLOgyMGo6lg_DUp738SuXkiugCFMxbGNKTyTprYPfJz42wTOXbtd"
}
}
2 changes: 1 addition & 1 deletion tests/keys/jwk_ec_pub_P-256.json
Expand Up @@ -4,4 +4,4 @@
"crv": "P-256",
"x": "PTTjIY84aLtaZCxLTrG_d8I0G6YKCV7lg8M4xkKfwQ4=",
"y": "ank6KA34vv24HZLXlChVs85NEGlpg2sbqNmR_BcgyJU="
}
}
2 changes: 1 addition & 1 deletion tests/keys/jwk_ec_pub_P-384.json
Expand Up @@ -4,4 +4,4 @@
"crv": "P-384",
"x": "IDC-5s6FERlbC4Nc_4JhKW8sd51AhixtMdNUtPxhRFP323QY6cwWeIA3leyZhz-J",
"y": "eovmN9ocANS8IJxDAGSuC1FehTq5ZFLJU7XSPg36zHpv4H2byKGEcCBiwT4sFJsy"
}
}
53 changes: 30 additions & 23 deletions tests/test_algorithms.py
Expand Up @@ -13,6 +13,7 @@
try:
from jwt.algorithms import RSAAlgorithm, ECAlgorithm, RSAPSSAlgorithm
from .keys import load_rsa_pub_key, load_ec_pub_key_p_521

has_crypto = True
except ImportError:
has_crypto = False
Expand Down Expand Up @@ -189,47 +190,51 @@ def test_rsa_verify_should_return_false_if_signature_invalid(self):
result = algo.verify(message, pub_key, sig)
assert not result

@pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
@pytest.mark.skipif(
not has_crypto, reason="Not supported without cryptography library"
)
def test_ec_jwk_public_and_private_keys_should_parse_and_verify(self):
tests = {
'P-256': ECAlgorithm.SHA256,
'P-384': ECAlgorithm.SHA384,
'P-521': ECAlgorithm.SHA512
"P-256": ECAlgorithm.SHA256,
"P-384": ECAlgorithm.SHA384,
"P-521": ECAlgorithm.SHA512,
}
for (curve, hash) in tests.items():
algo = ECAlgorithm(hash)

with open(key_path('jwk_ec_pub_{}.json'.format(curve)), 'r') as keyfile:
with open(key_path("jwk_ec_pub_{}.json".format(curve)), "r") as keyfile:
pub_key = algo.from_jwk(keyfile.read())

with open(key_path('jwk_ec_key_{}.json'.format(curve)), 'r') as keyfile:
with open(key_path("jwk_ec_key_{}.json".format(curve)), "r") as keyfile:
priv_key = algo.from_jwk(keyfile.read())

signature = algo.sign(force_bytes('Hello World!'), priv_key)
assert algo.verify(force_bytes('Hello World!'), pub_key, signature)
signature = algo.sign(force_bytes("Hello World!"), priv_key)
assert algo.verify(force_bytes("Hello World!"), pub_key, signature)

@pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
@pytest.mark.skipif(
not has_crypto, reason="Not supported without cryptography library"
)
def test_ec_jwk_fails_on_invalid_json(self):
algo = ECAlgorithm(ECAlgorithm.SHA512)

valid_points = {
'P-256': {
'x': 'PTTjIY84aLtaZCxLTrG_d8I0G6YKCV7lg8M4xkKfwQ4=',
'y': 'ank6KA34vv24HZLXlChVs85NEGlpg2sbqNmR_BcgyJU='
"P-256": {
"x": "PTTjIY84aLtaZCxLTrG_d8I0G6YKCV7lg8M4xkKfwQ4=",
"y": "ank6KA34vv24HZLXlChVs85NEGlpg2sbqNmR_BcgyJU=",
},
"P-384": {
"x": "IDC-5s6FERlbC4Nc_4JhKW8sd51AhixtMdNUtPxhRFP323QY6cwWeIA3leyZhz-J",
"y": "eovmN9ocANS8IJxDAGSuC1FehTq5ZFLJU7XSPg36zHpv4H2byKGEcCBiwT4sFJsy",
},
'P-384': {
'x': 'IDC-5s6FERlbC4Nc_4JhKW8sd51AhixtMdNUtPxhRFP323QY6cwWeIA3leyZhz-J',
'y': 'eovmN9ocANS8IJxDAGSuC1FehTq5ZFLJU7XSPg36zHpv4H2byKGEcCBiwT4sFJsy'
"P-521": {
"x": "AHKZLLOsCOzz5cY97ewNUajB957y-C-U88c3v13nmGZx6sYl_oJXu9A5RkTKqjqvjyekWF-7ytDyRXYgCF5cj0Kt",
"y": "AdymlHvOiLxXkEhayXQnNCvDX4h9htZaCJN34kfmC6pV5OhQHiraVySsUdaQkAgDPrwQrJmbnX9cwlGfP-HqHZR1",
},
'P-521': {
'x': 'AHKZLLOsCOzz5cY97ewNUajB957y-C-U88c3v13nmGZx6sYl_oJXu9A5RkTKqjqvjyekWF-7ytDyRXYgCF5cj0Kt',
'y': 'AdymlHvOiLxXkEhayXQnNCvDX4h9htZaCJN34kfmC6pV5OhQHiraVySsUdaQkAgDPrwQrJmbnX9cwlGfP-HqHZR1'
}
}

# Invalid JSON
with pytest.raises(InvalidKeyError):
algo.from_jwk('<this isn\'t json>')
algo.from_jwk("<this isn't json>")

# Bad key type
with pytest.raises(InvalidKeyError):
Expand All @@ -252,7 +257,7 @@ def test_ec_jwk_fails_on_invalid_json(self):
algo.from_jwk('{"kty": "EC", "x": "dGVzdHRlc3Q=", "y": "dGVzdA=="}')

# EC coordinates length invalid
for curve in ('P-256', 'P-384', 'P-521'):
for curve in ("P-256", "P-384", "P-521"):
with pytest.raises(InvalidKeyError):
algo.from_jwk(
'{{"kty": "EC", "crv": "{}", "x": "dGVzdA==", '
Expand All @@ -264,10 +269,12 @@ def test_ec_jwk_fails_on_invalid_json(self):
with pytest.raises(InvalidKeyError):
algo.from_jwk(
'{{"kty": "EC", "crv": "{}", "x": "{}", "y": "{}", '
'"d": "dGVzdA=="}}'.format(curve, point['x'], point['y'])
'"d": "dGVzdA=="}}'.format(curve, point["x"], point["y"])
)

@pytest.mark.skipif(not has_crypto, reason='Not supported without cryptography library')
@pytest.mark.skipif(
not has_crypto, reason="Not supported without cryptography library"
)
def test_rsa_jwk_public_and_private_keys_should_parse_and_verify(self):
algo = RSAAlgorithm(RSAAlgorithm.SHA256)

Expand Down

0 comments on commit 6b7b7f0

Please sign in to comment.