# License Server Remote Testing

Exercise the deployed Cloudflare Worker end-to-end: create or update a license, issue a JWT, verify claims, and refresh the session.


## Requirements

Install the Python dependencies once:

```bash
pip install requests python-jose cryptography
```

Environment variables you can set before running the notebook:

- `LICENSE_ADMIN_API_KEY` (required for admin endpoints)
- `LICENSE_BASE_URL` (defaults to `https://license.serp.co`)
- `LICENSE_ISSUER` (defaults to `https://license.serp.co`)
- `LICENSE_AUDIENCE` (expected JWT `aud` claim; defaults to `local-clients`)
- `LICENSE_PUBLIC_KEY_PATH` (defaults to `../../license-public.pem`)
- `LICENSE_DEVICE_ID` & `LICENSE_TEST_USER_AGENT` (optional overrides)


In [None]:
import os
import json
import time
from uuid import uuid4
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
from jose import jwt

In [None]:
# Configuration
# Every setting below can be overridden through its LICENSE_* environment variable.
BASE_URL = os.environ.get("LICENSE_BASE_URL", "https://license.serp.co").rstrip("/")
ADMIN_API_KEY = os.environ.get("LICENSE_ADMIN_API_KEY", "")
ISSUER = os.environ.get("LICENSE_ISSUER", "https://license.serp.co")
# An unset *or empty* LICENSE_AUDIENCE falls back to "local-clients".
TOKEN_AUDIENCE = os.environ.get("LICENSE_AUDIENCE") or "local-clients"
DEVICE_ID = os.environ.get("LICENSE_DEVICE_ID", "notebook-device-1")

# Load the PEM public key up front; fail immediately, reporting the resolved
# path, when the file is absent.
PUBLIC_KEY_PATH = Path(
    os.environ.get("LICENSE_PUBLIC_KEY_PATH", "../../license-public.pem")
)
if PUBLIC_KEY_PATH.exists():
    PUBLIC_KEY_PEM = PUBLIC_KEY_PATH.read_text(encoding="utf-8")
else:
    raise FileNotFoundError(f"Public key not found at {PUBLIC_KEY_PATH.resolve()}")

DEFAULT_USER_AGENT = os.environ.get(
    "LICENSE_TEST_USER_AGENT",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) LicenseNotebook/1.0",
)

# A single shared HTTP session so every request carries the same JSON headers.
session = requests.Session()
session.headers["User-Agent"] = DEFAULT_USER_AGENT
session.headers["Accept"] = "application/json"
session.headers["Content-Type"] = "application/json"

# Bearer credentials, passed explicitly on the admin requests.
ADMIN_HEADERS = {"Authorization": "Bearer " + ADMIN_API_KEY}

# Warn (without aborting) when the key is empty or still looks like a placeholder.
if not (ADMIN_API_KEY and "replace" not in ADMIN_API_KEY.lower()):
    print("⚠️ Update ADMIN_API_KEY before running admin endpoints.")

# Mutable scratch space used to hand results from one cell to the next.
TEST_CONTEXT: Dict[str, Any] = dict()

## Helper functions


In [None]:
def print_json(data: Any) -> None:
    """Print *data* in a readable form.

    ``None`` is shown as the JSON literal ``null``; dicts and lists are
    rendered as indented JSON with sorted keys; any other value is handed to
    ``print`` unchanged.
    """
    if data is None:
        rendered = "null"
    elif isinstance(data, (dict, list)):
        rendered = json.dumps(data, indent=2, sort_keys=True)
    else:
        rendered = data
    print(rendered)


def upsert_purchase(
    email: str,
    tier: str = "pro",
    entitlements: Optional[List[str]] = None,
    features: Optional[Dict[str, Any]] = None,
    amount: Optional[int] = 0,
    currency: Optional[str] = "usd",
    metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """POST a synthetic ``notebook.purchase`` event to ``/admin/purchases``.

    Args:
        email: Sent as ``userEmail``.
        tier: Sent as ``tier``.
        entitlements: Sent as ``entitlements``; a falsy value becomes ``[]``.
        features: Sent as ``features``; a falsy value becomes ``{}``.
        amount: Sent as ``amount``.
        currency: Sent as ``currency``.
        metadata: Extra metadata; ``notebookRunAt`` (current Unix time in
            whole seconds) is always added, replacing any caller value.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: Propagated from ``raise_for_status()`` when the
            server answers with an error status.
    """
    # Fresh random identifiers on every call.
    event_id = f"evt-{uuid4().hex}"
    provider_object_id = f"sub-{uuid4().hex[:10]}"

    stamped_metadata = dict(metadata or {})
    stamped_metadata["notebookRunAt"] = int(time.time())

    body = {
        "id": event_id,
        "provider": "notebook",
        "providerObjectId": provider_object_id,
        "eventType": "notebook.purchase",
        "status": "completed",
        "amount": amount,
        "currency": currency,
        "userEmail": email,
        "tier": tier,
        "entitlements": entitlements if entitlements else [],
        "features": features if features else {},
        "expiresAt": None,
        "metadata": stamped_metadata,
        "rawEvent": {"source": "remote_testing.ipynb"},
    }

    endpoint = f"{BASE_URL}/admin/purchases"
    reply = session.post(endpoint, json=body, headers=ADMIN_HEADERS, timeout=15)
    reply.raise_for_status()
    return reply.json()


def get_license_by_email(email: str) -> Optional[Dict[str, Any]]:
    """Fetch a license record via ``GET /admin/licenses`` filtered by *email*.

    Returns:
        ``None`` when the server replies with HTTP 404, otherwise the decoded
        JSON body.

    Raises:
        requests.HTTPError: Propagated from ``raise_for_status()`` for any
            other error status.
    """
    reply = session.get(
        f"{BASE_URL}/admin/licenses",
        headers=ADMIN_HEADERS,
        params={"email": email},
        timeout=15,
    )
    if reply.status_code != 404:
        reply.raise_for_status()
        return reply.json()
    # A 404 is reported as "no record" rather than as an error.
    return None


def issue_token(
    license_key: str,
    email: Optional[str] = None,
    license_id: Optional[str] = None,
    device_id: str = DEVICE_ID,
) -> Dict[str, Any]:
    """Request a token from ``POST /auth/token``.

    Args:
        license_key: Sent as ``licenseKey``.
        email: Sent as ``email`` only when truthy.
        license_id: Sent as ``licenseId`` only when truthy.
        device_id: Sent as ``deviceId``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: Propagated from ``raise_for_status()`` when the
            server answers with an error status.
    """
    body = {"licenseKey": license_key, "deviceId": device_id}
    # Optional identifiers are left out of the request entirely when empty.
    optional_fields = {"email": email, "licenseId": license_id}
    body.update({name: value for name, value in optional_fields.items() if value})

    reply = session.post(f"{BASE_URL}/auth/token", json=body, timeout=15)
    reply.raise_for_status()
    return reply.json()


def refresh_token(license_id: str, device_id: str = DEVICE_ID) -> Dict[str, Any]:
    """Call ``POST /auth/refresh`` for the given license and device.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: Propagated from ``raise_for_status()`` when the
            server answers with an error status.
    """
    endpoint = f"{BASE_URL}/auth/refresh"
    body = dict(licenseId=license_id, deviceId=device_id)
    reply = session.post(endpoint, json=body, timeout=15)
    reply.raise_for_status()
    return reply.json()


def verify_jwt(
    token: str,
    issuer: str = ISSUER,
    audience: Optional[str] = TOKEN_AUDIENCE,
) -> Dict[str, Any]:
    """Verify an RS256-signed JWT with ``PUBLIC_KEY_PEM`` and return its claims.

    Args:
        token: The encoded JWT.
        issuer: Expected ``iss`` claim.
        audience: Expected ``aud`` claim. Pass ``None`` (or an empty string)
            to skip audience validation altogether.

    Returns:
        The decoded claim set.

    Raises:
        Whatever ``jose.jwt.decode`` raises for an invalid signature or
        failed claim validation.
    """
    decode_kwargs: Dict[str, Any] = {"issuer": issuer}
    if audience:
        decode_kwargs["audience"] = audience
    else:
        # python-jose verifies ``aud`` by default and rejects a token that
        # carries the claim when no expected audience is supplied, so the
        # check must be switched off explicitly to make it truly optional.
        decode_kwargs["options"] = {"verify_aud": False}
    return jwt.decode(token, PUBLIC_KEY_PEM, algorithms=["RS256"], **decode_kwargs)

## 1. Create or update a license via the admin API


In [None]:
# Fixed test address; nothing is interpolated, so a plain string literal is used.
test_email = "test@serp.co"
purchase_result = upsert_purchase(
    email=test_email,
    tier="pro",
    entitlements=["tts_app"],
    features={"seats": 1},
)

# Keep the identifiers from the response so the following cells can reuse them.
TEST_CONTEXT.update(
    {
        "email": test_email,
        "licenseId": purchase_result.get("licenseId"),
        "licenseKey": purchase_result.get("licenseKey"),
        "purchase": purchase_result,
    }
)

print(f"Created/updated license for {test_email}")
print_json(purchase_result)

## 2. Inspect the latest license record


In [None]:
# Look the license up by the e-mail stored in the shared context.
license_details = get_license_by_email(TEST_CONTEXT["email"])
if not license_details:
    raise RuntimeError("License lookup failed; check the admin API response.")

TEST_CONTEXT["license_details"] = license_details

# Only fill in identifiers that are not already known from an earlier step.
if not TEST_CONTEXT.get("licenseId"):
    TEST_CONTEXT["licenseId"] = license_details.get("licenseId")
if not TEST_CONTEXT.get("licenseKey"):
    TEST_CONTEXT["licenseKey"] = license_details.get("key")

print_json(license_details)

## 3. Request an access token


In [None]:
# A token cannot be requested without the license key gathered earlier.
license_key = TEST_CONTEXT.get("licenseKey")
if not license_key:
    raise RuntimeError("License key missing; ensure the license record includes `key`.")

token_response = issue_token(
    license_key,
    TEST_CONTEXT.get("email"),
    TEST_CONTEXT.get("licenseId"),
    DEVICE_ID,
)

# Keep the response in the shared context for later cells.
TEST_CONTEXT["token_response"] = token_response
print_json(token_response)

## 4. Validate JWT claims locally


In [None]:
# Decode and verify the issued token locally, then keep its claims for inspection.
TEST_CONTEXT["decoded_payload"] = decoded_payload = verify_jwt(
    TEST_CONTEXT["token_response"]["token"]
)
print_json(decoded_payload)

## 5. Refresh the session


In [None]:
# Prefer the license id already in the context; otherwise fall back to the
# one reported in the token response.
license_id_for_refresh = TEST_CONTEXT.get("licenseId")
if not license_id_for_refresh:
    license_id_for_refresh = TEST_CONTEXT["token_response"].get("licenseId")
if not license_id_for_refresh:
    raise RuntimeError("licenseId missing; cannot refresh token.")

refresh_response = refresh_token(license_id_for_refresh, DEVICE_ID)
TEST_CONTEXT["refresh_response"] = refresh_response
print_json(refresh_response)

## 6. Snapshot the persisted license


In [None]:
# Re-read the license record after the token and refresh calls, and keep it.
TEST_CONTEXT["latest_license"] = latest_license = get_license_by_email(
    TEST_CONTEXT["email"]
)
print_json(latest_license)