From 17b7e0c665be12a456367720d32bc09db747d32d Mon Sep 17 00:00:00 2001 From: Jonah Bron <hi@jonah.id> Date: Mon, 18 Mar 2024 01:09:32 +0000 Subject: [PATCH 1/3] pyjwt: ES256 algorithm support for PyJWT. Add optional support for ES256 JWT signing/verifying to PyJWT using @dmazzella's cryptography port. Signed-off-by: Jonah Bron <hi@jonah.id> --- python-ecosys/pyjwt/jwt.py | 87 ++++++++++++++++++++++++++++----- python-ecosys/pyjwt/test_jwt.py | 53 ++++++++++++++++++-- 2 files changed, 123 insertions(+), 17 deletions(-) diff --git a/python-ecosys/pyjwt/jwt.py b/python-ecosys/pyjwt/jwt.py index 11c28f479..93a2ff1d8 100644 --- a/python-ecosys/pyjwt/jwt.py +++ b/python-ecosys/pyjwt/jwt.py @@ -4,6 +4,17 @@ import json from time import time +# Optionally depend on https://github.com/dmazzella/ucryptography +try: + # Try importing from ucryptography port. + import cryptography + from cryptography import hashes, ec, serialization, utils + + _ec_supported = True +except ImportError: + # No cryptography library available, no EC256 support. + _ec_supported = False + def _to_b64url(data): return ( @@ -19,6 +30,28 @@ def _from_b64url(data): return binascii.a2b_base64(data.replace(b"-", b"+").replace(b"_", b"/") + b"===") +def _sig_der_to_jws(signed): + """Accept a DER signature and convert to JSON Web Signature bytes. + + `cryptography` produces signatures encoded in DER ASN.1 binary format. + JSON Web Algorithm instead encodes the signature as the point coordinates + as bigendian byte strings concatenated. + + See https://datatracker.ietf.org/doc/html/rfc7518#section-3.4 + """ + r, s = utils.decode_dss_signature(signed) + return r.to_bytes(32, "big") + s.to_bytes(32, "big") + + +def _sig_jws_to_der(signed): + """Accept a JSON Web Signature and convert to a DER signature. + + See `_sig_der_to_jws()` + """ + r, s = int.from_bytes(signed[0:32], "big"), int.from_bytes(signed[32:], "big") + return utils.encode_dss_signature(r, s) + + class exceptions: class PyJWTError(Exception): pass @@ -37,19 +70,32 @@ class ExpiredSignatureError(PyJWTError): def encode(payload, key, algorithm="HS256"): - if algorithm != "HS256": + if algorithm != "HS256" and algorithm != "ES256": raise exceptions.InvalidAlgorithmError - if isinstance(key, str): - key = key.encode() header = _to_b64url(json.dumps({"typ": "JWT", "alg": algorithm}).encode()) payload = _to_b64url(json.dumps(payload).encode()) - signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest()) + + if algorithm == "HS256": + if isinstance(key, str): + key = key.encode() + signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest()) + elif algorithm == "ES256": + if not _ec_supported: + raise exceptions.InvalidAlgorithmError( + "Required dependencies for ES256 are not available" + ) + if isinstance(key, int): + key = ec.derive_private_key(key, ec.SECP256R1()) + signature = _to_b64url( + _sig_der_to_jws(key.sign(header + b"." + payload, ec.ECDSA(hashes.SHA256()))) + ) + return (header + b"." + payload + b"." + signature).decode() -def decode(token, key, algorithms=["HS256"]): - if "HS256" not in algorithms: +def decode(token, key, algorithms=["HS256", "ES256"]): + if "HS256" not in algorithms and "ES256" not in algorithms: raise exceptions.InvalidAlgorithmError parts = token.encode().split(b".") @@ -63,14 +109,31 @@ def decode(token, key, algorithms=["HS256"]): except Exception: raise exceptions.InvalidTokenError - if header["alg"] not in algorithms or header["alg"] != "HS256": + if header["alg"] not in algorithms or (header["alg"] != "HS256" and header["alg"] != "ES256"): raise exceptions.InvalidAlgorithmError - if isinstance(key, str): - key = key.encode() - calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest() - if signature != calculated_signature: - raise exceptions.InvalidSignatureError + if header["alg"] == "HS256": + if isinstance(key, str): + key = key.encode() + calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest() + if signature != calculated_signature: + raise exceptions.InvalidSignatureError + elif header["alg"] == "ES256": + if not _ec_supported: + raise exceptions.InvalidAlgorithmError( + "Required dependencies for ES256 are not available" + ) + + if isinstance(key, bytes): + key = ec.EllipticCurvePublicKey.from_encoded_point(key, ec.SECP256R1()) + try: + key.verify( + _sig_jws_to_der(signature), + parts[0] + b"." + parts[1], + ec.ECDSA(hashes.SHA256()), + ) + except cryptography.exceptions.InvalidSignature: + raise exceptions.InvalidSignatureError if "exp" in payload: if time() > payload["exp"]: diff --git a/python-ecosys/pyjwt/test_jwt.py b/python-ecosys/pyjwt/test_jwt.py index fb30b8bbd..fd510e028 100644 --- a/python-ecosys/pyjwt/test_jwt.py +++ b/python-ecosys/pyjwt/test_jwt.py @@ -1,28 +1,71 @@ import jwt from time import time +""" +Run tests by executing: + +``` +mpremote fs cp jwt.py :lib/jwt.py + run test_jwt.py +``` + +Only the full test suite can be run if +[ucryptography](https://github.com/dmazzella/ucryptography) is present in the +firmware. +""" + +# Indentation +I = " " + +print("Testing HS256") secret_key = "top-secret!" token = jwt.encode({"user": "joe"}, secret_key, algorithm="HS256") -print(token) decoded = jwt.decode(token, secret_key, algorithms=["HS256"]) if decoded != {"user": "joe"}: raise Exception("Invalid decoded JWT") else: - print("Encode/decode test: OK") + print(I, "Encode/decode test: OK") try: decoded = jwt.decode(token, "wrong-secret", algorithms=["HS256"]) except jwt.exceptions.InvalidSignatureError: - print("Invalid signature test: OK") + print(I, "Invalid signature test: OK") else: raise Exception("Invalid JWT should have failed decoding") token = jwt.encode({"user": "joe", "exp": time() - 1}, secret_key) -print(token) try: decoded = jwt.decode(token, secret_key, algorithms=["HS256"]) except jwt.exceptions.ExpiredSignatureError: - print("Expired token test: OK") + print(I, "Expired token test: OK") else: raise Exception("Expired JWT should have failed decoding") + + +print("Testing ES256") +try: + from cryptography import ec +except ImportError: + raise Exception("No cryptography lib present, can't test ES256") + +private_key = ec.derive_private_key( + 0xEB6DFB26C7A3C23D33C60F7C7BA61B6893451F2643E0737B20759E457825EE75, ec.SECP256R1() +) +wrong_private_key = ec.derive_private_key( + 0x25D91A0DA38F69283A0CE32B87D82817CA4E134A1693BE6083C2292BF562A451, ec.SECP256R1() +) + +token = jwt.encode({"user": "joe"}, private_key, algorithm="ES256") +decoded = jwt.decode(token, private_key.public_key(), algorithms=["ES256"]) +if decoded != {"user": "joe"}: + raise Exception("Invalid decoded JWT") +else: + print(I, "Encode/decode test: OK") + +token = jwt.encode({"user": "joe"}, private_key, algorithm="ES256") +try: + decoded = jwt.decode(token + "a", wrong_private_key.public_key(), algorithms=["ES256"]) +except jwt.exceptions.InvalidSignatureError: + print(I, "Invalid signature test: OK") +else: + raise Exception("Invalid JWT should have fialed decoding") From 4e43bc0c78ba368e81490b75bb84ed395085970f Mon Sep 17 00:00:00 2001 From: Jonah Bron <hi@jonah.id> Date: Mon, 18 Mar 2024 05:32:31 +0000 Subject: [PATCH 2/3] pyjwt: Incremented package version and added a brief description. Signed-off-by: Jonah Bron <hi@jonah.id> --- python-ecosys/pyjwt/manifest.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/python-ecosys/pyjwt/manifest.py b/python-ecosys/pyjwt/manifest.py index b3de5efc9..ee03673d7 100644 --- a/python-ecosys/pyjwt/manifest.py +++ b/python-ecosys/pyjwt/manifest.py @@ -1,4 +1,13 @@ -metadata(version="0.1.0", pypi="pyjwt") +metadata( + version="0.2.0", + pypi="pyjwt", + description=""" +JWT library for MicroPython. Supports HMAC (HS256) encoding essentially. +Optionally supports ECDSA (ES256) asymmetric-key signing/verification when the +[dmazella/ucryptography](https://github.com/dmazzella/ucryptography/) library +is available in the MicroPython firmware. +""", +) require("hmac") From 1d642ad8eb2fe0a31d02a8f2161698c0ad6adfd6 Mon Sep 17 00:00:00 2001 From: Jonah Bron <hi@jonah.id> Date: Mon, 18 Mar 2024 08:13:24 +0000 Subject: [PATCH 3/3] py_vapid: Basic VAPID header generation based on PyPi py_vapid. Signed-off-by: Jonah Bron <hi@jonah.id> --- python-ecosys/py_vapid/manifest.py | 12 +++ python-ecosys/py_vapid/py_vapid/__init__.py | 50 +++++++++ python-ecosys/py_vapid/test_vapid.py | 111 ++++++++++++++++++++ 3 files changed, 173 insertions(+) create mode 100644 python-ecosys/py_vapid/manifest.py create mode 100644 python-ecosys/py_vapid/py_vapid/__init__.py create mode 100644 python-ecosys/py_vapid/test_vapid.py diff --git a/python-ecosys/py_vapid/manifest.py b/python-ecosys/py_vapid/manifest.py new file mode 100644 index 000000000..9e947ddc2 --- /dev/null +++ b/python-ecosys/py_vapid/manifest.py @@ -0,0 +1,12 @@ +metadata( + version="0.1.0", + pypi="py-vapid", + author="Jonah Bron <hi@jonah.id>", + description=""" +VAPID +""", +) + +require("pyjwt") + +package("py_vapid") diff --git a/python-ecosys/py_vapid/py_vapid/__init__.py b/python-ecosys/py_vapid/py_vapid/__init__.py new file mode 100644 index 000000000..75a3c75f1 --- /dev/null +++ b/python-ecosys/py_vapid/py_vapid/__init__.py @@ -0,0 +1,50 @@ +""" +Based on https://github.com/web-push-libs/vapid +""" + +import binascii +import time +import jwt + +from cryptography import serialization + + +def _to_b64url(data): + return ( + binascii.b2a_base64(data) + .rstrip(b"\n") + .rstrip(b"=") + .replace(b"+", b"-") + .replace(b"/", b"_") + ) + + +class Vapid: + def __init__(self, private_key): + self._private_key = private_key + + def sign(self, claims): + claim = claims + if "exp" not in claim: + # Default to expiring 24 hours into the future (the max). + # https://datatracker.ietf.org/doc/html/rfc8292#section-2 + exp = int(time.time()) + 86400 + # Correct the epoch offset if not the Unix standard. + if time.gmtime(0)[0] == 2000: + exp += 946684800 # Unix timestamp of 2000-01-01 + + claim["exp"] = exp + + token = jwt.encode(claim, self._private_key, "ES256") + public_key = _to_b64url( + self._private_key.public_key().public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.UncompressedPoint, + ) + ).decode() + + return {"Authorization": f"vapid t={token},k={public_key}"} + + +# Re-export for interface compatibility with PyPi py-vapid +Vapid02 = Vapid diff --git a/python-ecosys/py_vapid/test_vapid.py b/python-ecosys/py_vapid/test_vapid.py new file mode 100644 index 000000000..45843f4fc --- /dev/null +++ b/python-ecosys/py_vapid/test_vapid.py @@ -0,0 +1,111 @@ +import jwt +import py_vapid +from time import time +from cryptography import ec +from machine import RTC + + +""" +Run tests by executing: + +``` +mpremote fs cp py_vapid/__init__.py :lib/py_vapid.py + run test_vapid.py +``` + +The [ucryptography](https://github.com/dmazzella/ucryptography) library must +be present in the firmware for this library and tests to work. +""" + +rtc = RTC() + +GOLDEN_0 = ( + 0xEB6DFB26C7A3C23D33C60F7C7BA61B6893451F2643E0737B20759E457825EE75, + (2010, 1, 1, 0, 0, 0, 0, 0), + { + "aud": "https://updates.push.services.mozilla.com", + "sub": "mailto:admin@example.com", + "exp": 9876543, + }, + "vapid t=eyJ0eXAiOiAiSldUIiwgImFsZyI6ICJFUzI1NiJ9.eyJhdWQiOiAiaHR0cHM6Ly91cGRhdGVzLnB1c2guc2VydmljZXMubW96aWxsYS5jb20iLCAic3ViIjogIm1haWx0bzphZG1pbkBleGFtcGxlLmNvbSIsICJleHAiOiA5ODc2NTQzfQ.DLB6PF2RApzk0n0oH-Kv_Onuwg9C7VXakM-GlEMCwj50rQ7G0hF_vLIYzCPeXT8Hu8Uup900YBapZ9y45vc8QA,k=BKoKs6nJ3466nCEQ5TvFkBIGBKSGplPTUBzJlLXM13I8S0SF-o_NSB-Q4At3BeLSrZVptEd5xBuGRXCKMe_YRg8", +) + +GOLDEN_1 = ( + 0x4370082632776C74FDC5517AC12881413A60B25D10E863296AD67E4260A3BF56, + (2015, 1, 1, 0, 0, 0, 0, 0), + { + "aud": "https://updates.push.services.mozilla.com", + "sub": "mailto:admin@example.com", + }, + "vapid t=eyJ0eXAiOiAiSldUIiwgImFsZyI6ICJFUzI1NiJ9.eyJleHAiOiAxNDIwMTU2ODAwLCAic3ViIjogIm1haWx0bzphZG1pbkBleGFtcGxlLmNvbSIsICJhdWQiOiAiaHR0cHM6Ly91cGRhdGVzLnB1c2guc2VydmljZXMubW96aWxsYS5jb20ifQ.NlVtqjGWy-hvNtoScrwAv-4cpNYrgUJ4EVgtxTnIn-haPtBSpak7aQN518tVYelQB1TZqc0bxAjWfK9QvZUbOA,k=BGEwf7m9F3vCvOuPeN4pEZ91t-dpSmg_y8ZXMfOyl-f22zw10ho_4EeBqZj2-NtW_Kb98b6tGjOKO_-TJiWvyfo", +) + +# Set of opaquely known-good scenarios to check against +golden_test_cases = [GOLDEN_0, GOLDEN_1] + + +# Test basic validation of claim +private_key_0 = ec.derive_private_key( + 0x5C76C15BBC541E7BF6987557124A6E6EB745723B1CF20E2ED2A3ED5B7C16DD46, ec.SECP256R1() +) +vapid = py_vapid.Vapid(private_key=private_key_0) +rtc.datetime((2018, 1, 1, 0, 0, 0, 0, 0)) +headers = vapid.sign( + { + "aud": "https://fcm.googleapis.com", + "sub": "mailto:foo@bar.com", + "exp": 1493315200, + } +) + +actual_token = headers["Authorization"].split(" ")[1].split(",")[0].split("=")[1] +actual_decoded_claim = jwt.decode(actual_token, private_key_0.public_key(), "ES256") +assert ( + actual_decoded_claim["aud"] == "https://fcm.googleapis.com" +), f"Claim audience '{actual_decoded_claim['aud']}' does not match input" +assert ( + actual_decoded_claim["sub"] == "mailto:foo@bar.com" +), f"Claim subscriber '{actual_decoded_claim['sub']}' does not match input" +assert ( + actual_decoded_claim["exp"] == 1493315200 +), f"Claim exp '{actual_decoded_claim['exp']}' does not match input" +print(f"Test claim validation: Passed") + + +# Test auto expiration date population +private_key_1 = ec.derive_private_key( + 0x5C76C15BBC541E7BF6987557124A6E6EB745723B1CF20E2ED2A3ED5B7C16DD46, ec.SECP256R1() +) +vapid = py_vapid.Vapid(private_key=private_key_1) +rtc.datetime((2017, 1, 1, 0, 0, 0, 0, 0)) +headers = vapid.sign( + { + "aud": "https://updates.push.services.mozilla.com", + "sub": "mailto:admin@example.com", + } +) + +actual_token = headers["Authorization"].split(" ")[1].split(",")[0].split("=")[1] +actual_decoded_claim = jwt.decode(actual_token, private_key_1.public_key(), "ES256") +assert ( + actual_decoded_claim["exp"] == 1483315200 +), f"Claim exp '{actual_decoded_claim['exp']}' does not match expected 2017-01-02 value" +print(f"Test auto expiry: Passed") + + +# Because they provide the least information about what could have gone wrong, +# Run golden test cases after all more specific tests pass first. +for case_no, case in enumerate(golden_test_cases): + private_key_number, curr_time, claim, expected_id = case + try: + private_key = ec.derive_private_key(private_key_number, ec.SECP256R1()) + vapid = py_vapid.Vapid(private_key=private_key) + rtc.datetime(curr_time) + headers = vapid.sign(claim) + + assert ( + headers["Authorization"] == expected_id + ), f"Authorization header '{headers['Authorization']}' does not match golden test case {case_no}" + print(f"Golden test case {case_no}: Passed") + except Exception as e: + print(f"Golden test case {case_no}: Failed") + raise e