From 17b7e0c665be12a456367720d32bc09db747d32d Mon Sep 17 00:00:00 2001
From: Jonah Bron <hi@jonah.id>
Date: Mon, 18 Mar 2024 01:09:32 +0000
Subject: [PATCH 1/3] pyjwt: ES256 algorithm support for PyJWT.

Add optional support for ES256 JWT signing/verifying to PyJWT using
@dmazzella's cryptography port.

Signed-off-by: Jonah Bron <hi@jonah.id>
---
 python-ecosys/pyjwt/jwt.py      | 87 ++++++++++++++++++++++++++++-----
 python-ecosys/pyjwt/test_jwt.py | 53 ++++++++++++++++++--
 2 files changed, 123 insertions(+), 17 deletions(-)

diff --git a/python-ecosys/pyjwt/jwt.py b/python-ecosys/pyjwt/jwt.py
index 11c28f479..93a2ff1d8 100644
--- a/python-ecosys/pyjwt/jwt.py
+++ b/python-ecosys/pyjwt/jwt.py
@@ -4,6 +4,17 @@
 import json
 from time import time
 
+# Optionally depend on https://github.com/dmazzella/ucryptography
+try:
+    # Try importing from ucryptography port.
+    import cryptography
+    from cryptography import hashes, ec, serialization, utils
+
+    _ec_supported = True
+except ImportError:
+    # No cryptography library available, no EC256 support.
+    _ec_supported = False
+
 
 def _to_b64url(data):
     return (
@@ -19,6 +30,28 @@ def _from_b64url(data):
     return binascii.a2b_base64(data.replace(b"-", b"+").replace(b"_", b"/") + b"===")
 
 
+def _sig_der_to_jws(signed):
+    """Accept a DER signature and convert to JSON Web Signature bytes.
+
+    `cryptography` produces signatures encoded in DER ASN.1 binary format.
+    JSON Web Algorithm instead encodes the signature as the point coordinates
+    as bigendian byte strings concatenated.
+
+    See https://datatracker.ietf.org/doc/html/rfc7518#section-3.4
+    """
+    r, s = utils.decode_dss_signature(signed)
+    return r.to_bytes(32, "big") + s.to_bytes(32, "big")
+
+
+def _sig_jws_to_der(signed):
+    """Accept a JSON Web Signature and convert to a DER signature.
+
+    See `_sig_der_to_jws()`
+    """
+    r, s = int.from_bytes(signed[0:32], "big"), int.from_bytes(signed[32:], "big")
+    return utils.encode_dss_signature(r, s)
+
+
 class exceptions:
     class PyJWTError(Exception):
         pass
@@ -37,19 +70,32 @@ class ExpiredSignatureError(PyJWTError):
 
 
 def encode(payload, key, algorithm="HS256"):
-    if algorithm != "HS256":
+    if algorithm != "HS256" and algorithm != "ES256":
         raise exceptions.InvalidAlgorithmError
 
-    if isinstance(key, str):
-        key = key.encode()
     header = _to_b64url(json.dumps({"typ": "JWT", "alg": algorithm}).encode())
     payload = _to_b64url(json.dumps(payload).encode())
-    signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest())
+
+    if algorithm == "HS256":
+        if isinstance(key, str):
+            key = key.encode()
+        signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest())
+    elif algorithm == "ES256":
+        if not _ec_supported:
+            raise exceptions.InvalidAlgorithmError(
+                "Required dependencies for ES256 are not available"
+            )
+        if isinstance(key, int):
+            key = ec.derive_private_key(key, ec.SECP256R1())
+        signature = _to_b64url(
+            _sig_der_to_jws(key.sign(header + b"." + payload, ec.ECDSA(hashes.SHA256())))
+        )
+
     return (header + b"." + payload + b"." + signature).decode()
 
 
-def decode(token, key, algorithms=["HS256"]):
-    if "HS256" not in algorithms:
+def decode(token, key, algorithms=["HS256", "ES256"]):
+    if "HS256" not in algorithms and "ES256" not in algorithms:
         raise exceptions.InvalidAlgorithmError
 
     parts = token.encode().split(b".")
@@ -63,14 +109,31 @@ def decode(token, key, algorithms=["HS256"]):
     except Exception:
         raise exceptions.InvalidTokenError
 
-    if header["alg"] not in algorithms or header["alg"] != "HS256":
+    if header["alg"] not in algorithms or (header["alg"] != "HS256" and header["alg"] != "ES256"):
         raise exceptions.InvalidAlgorithmError
 
-    if isinstance(key, str):
-        key = key.encode()
-    calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest()
-    if signature != calculated_signature:
-        raise exceptions.InvalidSignatureError
+    if header["alg"] == "HS256":
+        if isinstance(key, str):
+            key = key.encode()
+        calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest()
+        if signature != calculated_signature:
+            raise exceptions.InvalidSignatureError
+    elif header["alg"] == "ES256":
+        if not _ec_supported:
+            raise exceptions.InvalidAlgorithmError(
+                "Required dependencies for ES256 are not available"
+            )
+
+        if isinstance(key, bytes):
+            key = ec.EllipticCurvePublicKey.from_encoded_point(key, ec.SECP256R1())
+        try:
+            key.verify(
+                _sig_jws_to_der(signature),
+                parts[0] + b"." + parts[1],
+                ec.ECDSA(hashes.SHA256()),
+            )
+        except cryptography.exceptions.InvalidSignature:
+            raise exceptions.InvalidSignatureError
 
     if "exp" in payload:
         if time() > payload["exp"]:
diff --git a/python-ecosys/pyjwt/test_jwt.py b/python-ecosys/pyjwt/test_jwt.py
index fb30b8bbd..fd510e028 100644
--- a/python-ecosys/pyjwt/test_jwt.py
+++ b/python-ecosys/pyjwt/test_jwt.py
@@ -1,28 +1,71 @@
 import jwt
 from time import time
 
+"""
+Run tests by executing:
+
+```
+mpremote fs cp jwt.py :lib/jwt.py + run test_jwt.py
+```
+
+Only the full test suite can be run if
+[ucryptography](https://github.com/dmazzella/ucryptography) is present in the
+firmware.
+"""
+
+# Indentation
+I = "    "
+
+print("Testing HS256")
 secret_key = "top-secret!"
 
 token = jwt.encode({"user": "joe"}, secret_key, algorithm="HS256")
-print(token)
 decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
 if decoded != {"user": "joe"}:
     raise Exception("Invalid decoded JWT")
 else:
-    print("Encode/decode test: OK")
+    print(I, "Encode/decode test: OK")
 
 try:
     decoded = jwt.decode(token, "wrong-secret", algorithms=["HS256"])
 except jwt.exceptions.InvalidSignatureError:
-    print("Invalid signature test: OK")
+    print(I, "Invalid signature test: OK")
 else:
     raise Exception("Invalid JWT should have failed decoding")
 
 token = jwt.encode({"user": "joe", "exp": time() - 1}, secret_key)
-print(token)
 try:
     decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
 except jwt.exceptions.ExpiredSignatureError:
-    print("Expired token test: OK")
+    print(I, "Expired token test: OK")
 else:
     raise Exception("Expired JWT should have failed decoding")
+
+
+print("Testing ES256")
+try:
+    from cryptography import ec
+except ImportError:
+    raise Exception("No cryptography lib present, can't test ES256")
+
+private_key = ec.derive_private_key(
+    0xEB6DFB26C7A3C23D33C60F7C7BA61B6893451F2643E0737B20759E457825EE75, ec.SECP256R1()
+)
+wrong_private_key = ec.derive_private_key(
+    0x25D91A0DA38F69283A0CE32B87D82817CA4E134A1693BE6083C2292BF562A451, ec.SECP256R1()
+)
+
+token = jwt.encode({"user": "joe"}, private_key, algorithm="ES256")
+decoded = jwt.decode(token, private_key.public_key(), algorithms=["ES256"])
+if decoded != {"user": "joe"}:
+    raise Exception("Invalid decoded JWT")
+else:
+    print(I, "Encode/decode test: OK")
+
+token = jwt.encode({"user": "joe"}, private_key, algorithm="ES256")
+try:
+    decoded = jwt.decode(token + "a", wrong_private_key.public_key(), algorithms=["ES256"])
+except jwt.exceptions.InvalidSignatureError:
+    print(I, "Invalid signature test: OK")
+else:
+    raise Exception("Invalid JWT should have fialed decoding")

From 4e43bc0c78ba368e81490b75bb84ed395085970f Mon Sep 17 00:00:00 2001
From: Jonah Bron <hi@jonah.id>
Date: Mon, 18 Mar 2024 05:32:31 +0000
Subject: [PATCH 2/3] pyjwt: Incremented package version and added a brief
 description.

Signed-off-by: Jonah Bron <hi@jonah.id>
---
 python-ecosys/pyjwt/manifest.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/python-ecosys/pyjwt/manifest.py b/python-ecosys/pyjwt/manifest.py
index b3de5efc9..ee03673d7 100644
--- a/python-ecosys/pyjwt/manifest.py
+++ b/python-ecosys/pyjwt/manifest.py
@@ -1,4 +1,13 @@
-metadata(version="0.1.0", pypi="pyjwt")
+metadata(
+    version="0.2.0",
+    pypi="pyjwt",
+    description="""
+JWT library for MicroPython.  Supports HMAC (HS256) encoding essentially.
+Optionally supports ECDSA (ES256) asymmetric-key signing/verification when the
+[dmazella/ucryptography](https://github.com/dmazzella/ucryptography/) library
+is available in the MicroPython firmware.
+""",
+)
 
 require("hmac")
 

From 1d642ad8eb2fe0a31d02a8f2161698c0ad6adfd6 Mon Sep 17 00:00:00 2001
From: Jonah Bron <hi@jonah.id>
Date: Mon, 18 Mar 2024 08:13:24 +0000
Subject: [PATCH 3/3] py_vapid: Basic VAPID header generation based on PyPi
 py_vapid.

Signed-off-by: Jonah Bron <hi@jonah.id>
---
 python-ecosys/py_vapid/manifest.py          |  12 +++
 python-ecosys/py_vapid/py_vapid/__init__.py |  50 +++++++++
 python-ecosys/py_vapid/test_vapid.py        | 111 ++++++++++++++++++++
 3 files changed, 173 insertions(+)
 create mode 100644 python-ecosys/py_vapid/manifest.py
 create mode 100644 python-ecosys/py_vapid/py_vapid/__init__.py
 create mode 100644 python-ecosys/py_vapid/test_vapid.py

diff --git a/python-ecosys/py_vapid/manifest.py b/python-ecosys/py_vapid/manifest.py
new file mode 100644
index 000000000..9e947ddc2
--- /dev/null
+++ b/python-ecosys/py_vapid/manifest.py
@@ -0,0 +1,12 @@
+metadata(
+    version="0.1.0",
+    pypi="py-vapid",
+    author="Jonah Bron <hi@jonah.id>",
+    description="""
+VAPID
+""",
+)
+
+require("pyjwt")
+
+package("py_vapid")
diff --git a/python-ecosys/py_vapid/py_vapid/__init__.py b/python-ecosys/py_vapid/py_vapid/__init__.py
new file mode 100644
index 000000000..75a3c75f1
--- /dev/null
+++ b/python-ecosys/py_vapid/py_vapid/__init__.py
@@ -0,0 +1,50 @@
+"""
+Based on https://github.com/web-push-libs/vapid
+"""
+
+import binascii
+import time
+import jwt
+
+from cryptography import serialization
+
+
+def _to_b64url(data):
+    return (
+        binascii.b2a_base64(data)
+        .rstrip(b"\n")
+        .rstrip(b"=")
+        .replace(b"+", b"-")
+        .replace(b"/", b"_")
+    )
+
+
+class Vapid:
+    def __init__(self, private_key):
+        self._private_key = private_key
+
+    def sign(self, claims):
+        claim = claims
+        if "exp" not in claim:
+            # Default to expiring 24 hours into the future (the max).
+            # https://datatracker.ietf.org/doc/html/rfc8292#section-2
+            exp = int(time.time()) + 86400
+            # Correct the epoch offset if not the Unix standard.
+            if time.gmtime(0)[0] == 2000:
+                exp += 946684800  # Unix timestamp of 2000-01-01
+
+            claim["exp"] = exp
+
+        token = jwt.encode(claim, self._private_key, "ES256")
+        public_key = _to_b64url(
+            self._private_key.public_key().public_bytes(
+                encoding=serialization.Encoding.X962,
+                format=serialization.PublicFormat.UncompressedPoint,
+            )
+        ).decode()
+
+        return {"Authorization": f"vapid t={token},k={public_key}"}
+
+
+# Re-export for interface compatibility with PyPi py-vapid
+Vapid02 = Vapid
diff --git a/python-ecosys/py_vapid/test_vapid.py b/python-ecosys/py_vapid/test_vapid.py
new file mode 100644
index 000000000..45843f4fc
--- /dev/null
+++ b/python-ecosys/py_vapid/test_vapid.py
@@ -0,0 +1,111 @@
+import jwt
+import py_vapid
+from time import time
+from cryptography import ec
+from machine import RTC
+
+
+"""
+Run tests by executing:
+
+```
+mpremote fs cp py_vapid/__init__.py :lib/py_vapid.py + run test_vapid.py
+```
+
+The [ucryptography](https://github.com/dmazzella/ucryptography) library must
+be present in the firmware for this library and tests to work.
+"""
+
+rtc = RTC()
+
+GOLDEN_0 = (
+    0xEB6DFB26C7A3C23D33C60F7C7BA61B6893451F2643E0737B20759E457825EE75,
+    (2010, 1, 1, 0, 0, 0, 0, 0),
+    {
+        "aud": "https://updates.push.services.mozilla.com",
+        "sub": "mailto:admin@example.com",
+        "exp": 9876543,
+    },
+    "vapid t=eyJ0eXAiOiAiSldUIiwgImFsZyI6ICJFUzI1NiJ9.eyJhdWQiOiAiaHR0cHM6Ly91cGRhdGVzLnB1c2guc2VydmljZXMubW96aWxsYS5jb20iLCAic3ViIjogIm1haWx0bzphZG1pbkBleGFtcGxlLmNvbSIsICJleHAiOiA5ODc2NTQzfQ.DLB6PF2RApzk0n0oH-Kv_Onuwg9C7VXakM-GlEMCwj50rQ7G0hF_vLIYzCPeXT8Hu8Uup900YBapZ9y45vc8QA,k=BKoKs6nJ3466nCEQ5TvFkBIGBKSGplPTUBzJlLXM13I8S0SF-o_NSB-Q4At3BeLSrZVptEd5xBuGRXCKMe_YRg8",
+)
+
+GOLDEN_1 = (
+    0x4370082632776C74FDC5517AC12881413A60B25D10E863296AD67E4260A3BF56,
+    (2015, 1, 1, 0, 0, 0, 0, 0),
+    {
+        "aud": "https://updates.push.services.mozilla.com",
+        "sub": "mailto:admin@example.com",
+    },
+    "vapid t=eyJ0eXAiOiAiSldUIiwgImFsZyI6ICJFUzI1NiJ9.eyJleHAiOiAxNDIwMTU2ODAwLCAic3ViIjogIm1haWx0bzphZG1pbkBleGFtcGxlLmNvbSIsICJhdWQiOiAiaHR0cHM6Ly91cGRhdGVzLnB1c2guc2VydmljZXMubW96aWxsYS5jb20ifQ.NlVtqjGWy-hvNtoScrwAv-4cpNYrgUJ4EVgtxTnIn-haPtBSpak7aQN518tVYelQB1TZqc0bxAjWfK9QvZUbOA,k=BGEwf7m9F3vCvOuPeN4pEZ91t-dpSmg_y8ZXMfOyl-f22zw10ho_4EeBqZj2-NtW_Kb98b6tGjOKO_-TJiWvyfo",
+)
+
+# Set of opaquely known-good scenarios to check against
+golden_test_cases = [GOLDEN_0, GOLDEN_1]
+
+
+# Test basic validation of claim
+private_key_0 = ec.derive_private_key(
+    0x5C76C15BBC541E7BF6987557124A6E6EB745723B1CF20E2ED2A3ED5B7C16DD46, ec.SECP256R1()
+)
+vapid = py_vapid.Vapid(private_key=private_key_0)
+rtc.datetime((2018, 1, 1, 0, 0, 0, 0, 0))
+headers = vapid.sign(
+    {
+        "aud": "https://fcm.googleapis.com",
+        "sub": "mailto:foo@bar.com",
+        "exp": 1493315200,
+    }
+)
+
+actual_token = headers["Authorization"].split(" ")[1].split(",")[0].split("=")[1]
+actual_decoded_claim = jwt.decode(actual_token, private_key_0.public_key(), "ES256")
+assert (
+    actual_decoded_claim["aud"] == "https://fcm.googleapis.com"
+), f"Claim audience '{actual_decoded_claim['aud']}' does not match input"
+assert (
+    actual_decoded_claim["sub"] == "mailto:foo@bar.com"
+), f"Claim subscriber '{actual_decoded_claim['sub']}' does not match input"
+assert (
+    actual_decoded_claim["exp"] == 1493315200
+), f"Claim exp '{actual_decoded_claim['exp']}' does not match input"
+print(f"Test claim validation: Passed")
+
+
+# Test auto expiration date population
+private_key_1 = ec.derive_private_key(
+    0x5C76C15BBC541E7BF6987557124A6E6EB745723B1CF20E2ED2A3ED5B7C16DD46, ec.SECP256R1()
+)
+vapid = py_vapid.Vapid(private_key=private_key_1)
+rtc.datetime((2017, 1, 1, 0, 0, 0, 0, 0))
+headers = vapid.sign(
+    {
+        "aud": "https://updates.push.services.mozilla.com",
+        "sub": "mailto:admin@example.com",
+    }
+)
+
+actual_token = headers["Authorization"].split(" ")[1].split(",")[0].split("=")[1]
+actual_decoded_claim = jwt.decode(actual_token, private_key_1.public_key(), "ES256")
+assert (
+    actual_decoded_claim["exp"] == 1483315200
+), f"Claim exp '{actual_decoded_claim['exp']}' does not match expected 2017-01-02 value"
+print(f"Test auto expiry: Passed")
+
+
+# Because they provide the least information about what could have gone wrong,
+# Run golden test cases after all more specific tests pass first.
+for case_no, case in enumerate(golden_test_cases):
+    private_key_number, curr_time, claim, expected_id = case
+    try:
+        private_key = ec.derive_private_key(private_key_number, ec.SECP256R1())
+        vapid = py_vapid.Vapid(private_key=private_key)
+        rtc.datetime(curr_time)
+        headers = vapid.sign(claim)
+
+        assert (
+            headers["Authorization"] == expected_id
+        ), f"Authorization header '{headers['Authorization']}' does not match golden test case {case_no}"
+        print(f"Golden test case {case_no}: Passed")
+    except Exception as e:
+        print(f"Golden test case {case_no}: Failed")
+        raise e