Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ jobs:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v6
- run: uv python install ${{ matrix.python }}
- run: uv run ruff check src/obelisk/
- run: uv run ruff format --check src/obelisk/
- run: uv run ruff check
- run: uv run ruff format
- run: uv run mypy
test:
name: Run tests
Expand Down
3 changes: 2 additions & 1 deletion hooks/pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
# Redirect output to stderr.
exec 1>&2

uv run ruff format --check src/obelisk/
uv run ruff format --check
uv run ruff check
uv run mypy
8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,11 @@ packages = ["src/obelisk"]
[tool.mypy]
files = "src/obelisk"
strict = true

[tool.ruff]
include = ["src/obelisk/**/*.py"]

[tool.ruff.lint]
select = ["E4", "E7", "E9", "F", "ASYNC", "S", "B", "FIX", "SIM", "C90", "N", "PERF", "UP"]
# Ignore N815, camelcase field names are usually for serialisation reasons
ignore = ["N815"]
18 changes: 9 additions & 9 deletions src/obelisk/asynchronous/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime, timedelta
import logging
import base64
from typing import Any, Optional
from typing import Any

import httpx

Expand All @@ -19,9 +19,9 @@ class BaseClient:
_client: str = ""
_secret: str = ""

_token: Optional[str] = None
_token: str | None = None
"""Current authentication token"""
_token_expires: Optional[datetime] = None
_token_expires: datetime | None = None
"""Deadline after which token is no longer useable"""

grace_period: timedelta = timedelta(seconds=10)
Expand All @@ -35,7 +35,7 @@ def __init__(
self,
client: str,
secret: str,
retry_strategy: RetryStrategy = NoRetryStrategy(),
retry_strategy: RetryStrategy = NoRetryStrategy(), # noqa: B008 # This is fine to bew shared
kind: ObeliskKind = ObeliskKind.CLASSIC,
) -> None:
self._client = client
Expand All @@ -47,7 +47,7 @@ def __init__(

async def _get_token(self) -> None:
auth_string = str(
base64.b64encode(f"{self._client}:{self._secret}".encode("utf-8")), "utf-8"
base64.b64encode(f"{self._client}:{self._secret}".encode()), "utf-8"
)
headers = {
"Authorization": f"Basic {auth_string}",
Expand All @@ -74,7 +74,7 @@ async def _get_token(self) -> None:
)

response = request.json()
except Exception as e:
except Exception as e: # noqa: PERF203 # retry strategy should add delay
last_error = e
self.log.error(e)
continue
Expand All @@ -88,7 +88,7 @@ async def _get_token(self) -> None:
if request.status_code != 200:
if "error" in response:
self.log.warning(f"Could not authenticate, {response['error']}")
raise AuthenticationError
raise AuthenticationError

self._token = response["access_token"]
self._token_expires = datetime.now() + timedelta(
Expand All @@ -113,7 +113,7 @@ async def _verify_token(self) -> None:
continue

async def http_post(
self, url: str, data: Any = None, params: Optional[dict[str, str]] = None
self, url: str, data: Any = None, params: dict[str, str] | None = None
) -> httpx.Response:
"""
Send an HTTP POST request to Obelisk,
Expand Down Expand Up @@ -162,7 +162,7 @@ async def http_post(
return response

async def http_get(
self, url: str, params: Optional[dict[str, str]] = None
self, url: str, params: dict[str, str] | None = None
) -> httpx.Response:
"""
Send an HTTP GET request to Obelisk,
Expand Down
61 changes: 31 additions & 30 deletions src/obelisk/asynchronous/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import json
from datetime import datetime, timedelta
from math import floor
from typing import Any, AsyncGenerator, List, Literal, Optional
from typing import Any, Literal
from collections.abc import AsyncGenerator

import httpx
from pydantic import ValidationError
Expand All @@ -23,16 +24,16 @@ class Obelisk(BaseClient):

async def fetch_single_chunk(
self,
datasets: List[str],
metrics: Optional[List[str]] = None,
fields: Optional[List[str]] = None,
from_timestamp: Optional[int] = None,
to_timestamp: Optional[int] = None,
order_by: Optional[dict[str, Any]] = None,
filter_: Optional[dict[str, Any]] = None,
limit: Optional[int] = None,
limit_by: Optional[dict[str, Any]] = None,
cursor: Optional[str] = None,
datasets: list[str],
metrics: list[str] | None = None,
fields: list[str] | None = None,
from_timestamp: int | None = None,
to_timestamp: int | None = None,
order_by: dict[str, Any] | None = None,
filter_: dict[str, Any] | None = None,
limit: int | None = None,
limit_by: dict[str, Any] | None = None,
cursor: str | None = None,
) -> QueryResult:
"""
Queries one chunk of events from Obelisk for given parameters,
Expand Down Expand Up @@ -103,24 +104,24 @@ async def fetch_single_chunk(
except json.JSONDecodeError as e:
msg = f"Obelisk response is not a JSON object: {e}"
self.log.warning(msg)
raise ObeliskError(msg)
raise ObeliskError(msg) from e
except ValidationError as e:
msg = f"Response cannot be validated: {e}"
self.log.warning(msg)
raise ObeliskError(msg)
raise ObeliskError(msg) from e

async def query(
self,
datasets: List[str],
metrics: Optional[List[str]] = None,
fields: Optional[List[str]] = None,
from_timestamp: Optional[int] = None,
to_timestamp: Optional[int] = None,
order_by: Optional[dict[str, Any]] = None,
filter_: Optional[dict[str, Any]] = None,
limit: Optional[int] = None,
limit_by: Optional[dict[str, Any]] = None,
) -> List[Datapoint]:
datasets: list[str],
metrics: list[str] | None = None,
fields: list[str] | None = None,
from_timestamp: int | None = None,
to_timestamp: int | None = None,
order_by: dict[str, Any] | None = None,
filter_: dict[str, Any] | None = None,
limit: int | None = None,
limit_by: dict[str, Any] | None = None,
) -> list[Datapoint]:
"""
Queries data from obelisk,
automatically iterating when a cursor is returned.
Expand Down Expand Up @@ -157,8 +158,8 @@ async def query(
to a specified maximum number.
"""

cursor: Optional[str] | Literal[True] = True
result_set: List[Datapoint] = []
cursor: str | None | Literal[True] = True
result_set: list[Datapoint] = []

while cursor:
actual_cursor = cursor if cursor is not True else None
Expand Down Expand Up @@ -191,14 +192,14 @@ async def query(

async def query_time_chunked(
self,
datasets: List[str],
metrics: List[str],
datasets: list[str],
metrics: list[str],
from_time: datetime,
to_time: datetime,
jump: timedelta,
filter_: Optional[dict[str, Any]] = None,
filter_: dict[str, Any] | None = None,
direction: Literal["asc", "desc"] = "asc",
) -> AsyncGenerator[List[Datapoint], None]:
) -> AsyncGenerator[list[Datapoint], None]:
"""
Fetches all data matching the provided filters,
yielding one chunk at a time.
Expand Down Expand Up @@ -239,7 +240,7 @@ async def query_time_chunked(
async def send(
self,
dataset: str,
data: List[dict[str, Any]],
data: list[dict[str, Any]],
precision: TimestampPrecision = TimestampPrecision.MILLISECONDS,
mode: IngestMode = IngestMode.DEFAULT,
) -> httpx.Response:
Expand Down
Loading