Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/layerv_qurl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
QURL,
AccessGrant,
AccessPolicy,
AccessToken,
CreateOutput,
ListOutput,
MintOutput,
Expand Down Expand Up @@ -47,6 +48,7 @@
"QURLStatus",
"AccessGrant",
"AccessPolicy",
"AccessToken",
"CreateOutput",
"ListOutput",
"MintOutput",
Expand Down
55 changes: 40 additions & 15 deletions src/layerv_qurl/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
QURL,
AccessGrant,
AccessPolicy,
AccessToken,
CreateOutput,
ListOutput,
MintOutput,
Expand Down Expand Up @@ -98,31 +99,55 @@ def build_body(kwargs: dict[str, Any]) -> dict[str, Any]:
return body


def _parse_access_policy(data: dict[str, Any]) -> AccessPolicy:
"""Parse an AccessPolicy from API response data."""
return AccessPolicy(
ip_allowlist=data.get("ip_allowlist"),
ip_denylist=data.get("ip_denylist"),
geo_allowlist=data.get("geo_allowlist"),
geo_denylist=data.get("geo_denylist"),
user_agent_allow_regex=data.get("user_agent_allow_regex"),
user_agent_deny_regex=data.get("user_agent_deny_regex"),
)


def _parse_access_token(data: dict[str, Any]) -> AccessToken:
"""Parse an AccessToken from API response data."""
policy = None
if data.get("access_policy") is not None:
policy = _parse_access_policy(data["access_policy"])
return AccessToken(
qurl_id=data["qurl_id"],
status=data["status"],
one_time_use=data.get("one_time_use", False),
max_sessions=data.get("max_sessions", 0),
session_duration=data.get("session_duration", 0),
use_count=data.get("use_count", 0),
label=data.get("label"),
qurl_site=data.get("qurl_site"),
access_policy=policy,
created_at=_parse_dt(data.get("created_at")),
expires_at=_parse_dt(data.get("expires_at")),
)


def parse_qurl(data: dict[str, Any]) -> QURL:
"""Parse a QURL resource from API response data."""
policy = None
if data.get("access_policy"):
p = data["access_policy"]
policy = AccessPolicy(
ip_allowlist=p.get("ip_allowlist"),
ip_denylist=p.get("ip_denylist"),
geo_allowlist=p.get("geo_allowlist"),
geo_denylist=p.get("geo_denylist"),
user_agent_allow_regex=p.get("user_agent_allow_regex"),
user_agent_deny_regex=p.get("user_agent_deny_regex"),
)
tokens = None
# API returns "qurls" array; SDK exposes as "access_tokens" for clarity.
raw_tokens = data.get("qurls") if "qurls" in data else data.get("access_tokens")
if raw_tokens is not None:
tokens = [_parse_access_token(t) for t in raw_tokens]
return QURL(
resource_id=data["resource_id"],
target_url=data["target_url"],
status=data["status"],
created_at=_parse_dt(data.get("created_at")),
expires_at=_parse_dt(data.get("expires_at")),
one_time_use=data.get("one_time_use", False),
max_sessions=data.get("max_sessions"),
description=data.get("description"),
qurl_site=data.get("qurl_site"),
qurl_link=data.get("qurl_link"),
access_policy=policy,
qurl_count=data.get("qurl_count"),
access_tokens=tokens,
)


Expand Down
23 changes: 19 additions & 4 deletions src/layerv_qurl/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@ class AccessPolicy:
user_agent_deny_regex: str | None = None


@dataclass
class AccessToken:
"""An individual access token within a QURL."""

qurl_id: str
status: QURLStatus
one_time_use: bool = False
max_sessions: int = 0
session_duration: int = 0
use_count: int = 0
label: str | None = None
qurl_site: str | None = None
access_policy: AccessPolicy | None = None
created_at: datetime | None = None
expires_at: datetime | None = None


@dataclass
class QURL:
"""A QURL resource as returned by the API."""
Expand All @@ -41,12 +58,10 @@ class QURL:
status: QURLStatus
created_at: datetime | None = None
expires_at: datetime | None = None
one_time_use: bool = False
max_sessions: int | None = None
description: str | None = None
qurl_site: str | None = None
qurl_link: str | None = None
access_policy: AccessPolicy | None = None
qurl_count: int | None = None
access_tokens: list[AccessToken] | None = None


@dataclass
Expand Down
104 changes: 97 additions & 7 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
ServerError,
ValidationError,
)
from layerv_qurl.types import AccessPolicy
from layerv_qurl.types import AccessPolicy, AccessToken

BASE_URL = "https://api.test.layerv.ai"

Expand Down Expand Up @@ -209,7 +209,22 @@ def test_get(client: QURLClient) -> None:
"status": "active",
"created_at": "2026-03-10T10:00:00Z",
"expires_at": "2026-03-15T10:00:00Z",
"one_time_use": False,
"qurl_count": 2,
# API wire format uses "qurls"; parse_qurl maps to access_tokens
"qurls": [
{
"qurl_id": "at_abc",
"status": "active",
"one_time_use": True,
"max_sessions": 5,
"session_duration": 300,
"use_count": 1,
"label": "test token",
"qurl_site": "https://r_abc123def45.qurl.site",
"created_at": "2026-03-10T10:00:00Z",
"expires_at": "2026-03-15T10:00:00Z",
},
],
},
"meta": {"request_id": "req_2"},
},
Expand All @@ -221,6 +236,85 @@ def test_get(client: QURLClient) -> None:
assert result.status == "active"
assert isinstance(result.created_at, datetime)
assert result.created_at == datetime(2026, 3, 10, 10, 0, 0, tzinfo=timezone.utc)
assert result.qurl_count == 2
assert result.access_tokens is not None
assert len(result.access_tokens) == 1
token = result.access_tokens[0]
assert isinstance(token, AccessToken)
assert token.qurl_id == "at_abc"
assert token.one_time_use is True
assert token.max_sessions == 5
assert token.session_duration == 300
assert token.use_count == 1
assert token.label == "test token"


@respx.mock
def test_get_token_with_access_policy(client: QURLClient) -> None:
respx.get(f"{BASE_URL}/v1/qurls/r_abc123def45").mock(
return_value=httpx.Response(
200,
json={
"data": {
"resource_id": "r_abc123def45",
"target_url": "https://example.com",
"status": "active",
"created_at": "2026-03-10T10:00:00Z",
"qurls": [
{
"qurl_id": "q_abc12345678",
"status": "active",
"access_policy": {
"ip_allowlist": ["10.0.0.0/8"],
"geo_denylist": ["CN"],
},
},
],
},
"meta": {"request_id": "req_p"},
},
)
)

result = client.get("r_abc123def45")
assert result.access_tokens is not None
token = result.access_tokens[0]
assert token.access_policy is not None
assert token.access_policy.ip_allowlist == ["10.0.0.0/8"]
assert token.access_policy.geo_denylist == ["CN"]
# Verify defaults on sparse token
assert token.one_time_use is False
assert token.max_sessions == 0
assert token.session_duration == 0
assert token.use_count == 0
assert token.label is None
assert token.qurl_site is None
assert token.created_at is None
assert token.expires_at is None


@respx.mock
def test_get_without_tokens(client: QURLClient) -> None:
respx.get(f"{BASE_URL}/v1/qurls/r_abc123def45").mock(
return_value=httpx.Response(
200,
json={
"data": {
"resource_id": "r_abc123def45",
"target_url": "https://example.com",
"status": "active",
"created_at": "2026-03-10T10:00:00Z",
"qurl_count": 0,
},
"meta": {"request_id": "req_x"},
},
)
)

result = client.get("r_abc123def45")
assert result.resource_id == "r_abc123def45"
assert result.qurl_count == 0
assert result.access_tokens is None


@respx.mock
Expand Down Expand Up @@ -1116,17 +1210,13 @@ def test_access_policy_in_update(client: QURLClient) -> None:
"target_url": "https://example.com",
"status": "active",
"created_at": "2026-03-10T10:00:00Z",
"access_policy": {
"user_agent_deny_regex": "curl.*",
},
},
},
)
)

policy = AccessPolicy(user_agent_deny_regex="curl.*")
result = client.update("r_abc", access_policy=policy)
assert result.access_policy is not None
assert result.access_policy.user_agent_deny_regex == "curl.*"
assert result.resource_id == "r_abc"
body = json.loads(route.calls[0].request.content)
assert body["access_policy"] == {"user_agent_deny_regex": "curl.*"}
Loading