Merge pull request #54 from rossumai/oh/enums-instead-of-strings
refactor: Use an enum instead of strings to identify API resources
bara-m committed Nov 2, 2023
2 parents a89e6e6 + acbf731 commit 2c24376
Showing 15 changed files with 246 additions and 168 deletions.
113 changes: 88 additions & 25 deletions rossum_api/api_client.py
@@ -7,6 +7,7 @@
 import logging
 import random
 import typing
+from enum import Enum
 
 import httpx
 
@@ -20,6 +21,25 @@
 logger = logging.getLogger(__name__)
 
 
+class Resource(Enum):
+    """Convenient representation of resources provided by Elis API.
+
+    Value is always the corresponding URL part.
+    """
+
+    Annotation = "annotations"
+    Auth = "auth"
+    Connector = "connectors"
+    Group = "groups"
+    Hook = "hooks"
+    Inbox = "inboxes"
+    Organization = "organizations"
+    Queue = "queues"
+    Schema = "schemas"
+    User = "users"
+    Workspace = "workspaces"
+
+
 class APIClientError(Exception):
     def __init__(self, status_code, error):
         self.status_code = status_code
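The enum members map one-to-one onto URL parts, so call sites gain type safety without changing the requests that go over the wire. A minimal sketch of the new calling convention; the APIClient class name and its constructor arguments are assumptions here, since the constructor is collapsed in this diff:

    import asyncio

    from rossum_api.api_client import APIClient, Resource  # APIClient name assumed

    async def main() -> None:
        client = APIClient(token="...")  # constructor arguments assumed
        assert Resource.Queue.value == "queues"  # the value is the URL part
        queue = await client.fetch_one(Resource.Queue, 123)  # GET queues/123
        print(queue["id"])

    asyncio.run(main())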
@@ -126,13 +146,13 @@ def __init__(
     def _headers(self):
         return {"Authorization": f"token {self.token}"}
 
-    async def fetch_one(self, resource: str, id_: Union[int, str]) -> Dict[str, Any]:
+    async def fetch_one(self, resource: Resource, id_: Union[int, str]) -> Dict[str, Any]:
         """Retrieve a single object in a specific resource."""
-        return await self.request_json("GET", f"{resource}/{id_}")
+        return await self.request_json("GET", f"{resource.value}/{id_}")
 
     async def fetch_all(
         self,
-        resource: str,
+        resource: Resource,
         ordering: Sequence[str] = (),
         sideloads: Sequence[str] = (),
         content_schema_ids: Sequence[str] = (),
@@ -165,6 +185,53 @@ async def fetch_all(
         filters
             mapping from resource field to value used to filter records
         """
+        async for result in self.fetch_all_by_url(
+            resource.value,
+            ordering,
+            sideloads,
+            content_schema_ids,
+            method,
+            max_pages,
+            json,
+            **filters,
+        ):
+            yield result
+
+    async def fetch_all_by_url(
+        self,
+        url: str,
+        ordering: Sequence[str] = (),
+        sideloads: Sequence[str] = (),
+        content_schema_ids: Sequence[str] = (),
+        method: str = "GET",
+        max_pages: Optional[int] = None,
+        json: Optional[dict] = None,
+        **filters: Any,
+    ) -> AsyncIterator[Dict[str, Any]]:
+        """Retrieve a list of objects from a specified URL.
+
+        Arguments
+        ---------
+        url
+            url relative to the Elis API domain, e.g. "/annotations/search"
+        ordering
+            comma-delimited fields of the resource, prepend the field with - for descending
+        sideloads
+            A sequence of resources to be fetched along with the requested resource,
+            e.g. ["content", "automation_blockers"] when fetching `annotations` resource.
+        content_schema_ids
+            sideloads only particular `content` fields when fetching `annotations` resource,
+            has no effect when fetching other resources
+        method
+            export endpoints have different semantics when POST is used, allow customization of
+            method so that export() can re-use fetch_all() implementation
+        max_pages
+            maximum number of pages to fetch
+        json
+            json payload sent with the request. Used for POST requests.
+        filters
+            mapping from resource field to value used to filter records
+        """
         query_params = {
             "page_size": 100,
             "ordering": ",".join(ordering),
@@ -173,7 +240,7 @@
             **filters,
         }
         results, total_pages = await self._fetch_page(
-            f"{resource}", method, query_params, sideloads, json=json
+            url, method, query_params, sideloads, json=json
         )
         # Fire async tasks to fetch the rest of the pages and start yielding results from page 1
         last_page = min(total_pages, max_pages or total_pages)
@@ -183,11 +250,7 @@
         async def _fetch_page(page_number):
             async with in_flight_guard:
                 return await self._fetch_page(
-                    f"{resource}",
-                    method,
-                    {**query_params, "page": page_number},
-                    sideloads,
-                    json=json,
+                    url, method, {**query_params, "page": page_number}, sideloads, json=json
                 )
 
         page_requests = [asyncio.create_task(_fetch_page(i)) for i in range(2, last_page + 1)]
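The loop above yields page 1 immediately and fans the remaining pages out as semaphore-guarded tasks, so the number of in-flight requests stays bounded. A generic sketch of the same pattern, independent of this client (in_flight_guard mirrors the name used in the diff; the real implementation's completion handling is collapsed here):

    import asyncio

    async def fetch_remaining_pages(fetch_page, last_page: int, max_in_flight: int = 4):
        # Cap simultaneous requests, as the in_flight_guard semaphore does above.
        in_flight_guard = asyncio.Semaphore(max_in_flight)

        async def bounded(page: int):
            async with in_flight_guard:
                return await fetch_page(page)

        tasks = [asyncio.create_task(bounded(p)) for p in range(2, last_page + 1)]
        for next_done in asyncio.as_completed(tasks):
            yield await next_done  # yield each page's results as soon as it arrives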
@@ -202,13 +265,13 @@ async def _fetch_page(page_number):
 
     async def _fetch_page(
         self,
-        resource: str,
+        url: str,
         method: str,
         query_params: Dict[str, Any],
         sideload_groups: Sequence[str],
         json: Optional[dict] = None,
     ) -> Tuple[List[Dict[str, Any]], int]:
-        data = await self.request_json(method, resource, params=query_params, json=json)
+        data = await self.request_json(method, url, params=query_params, json=json)
         self._embed_sideloads(data, sideload_groups)
         return data["results"], data["pagination"]["total_pages"]
 
@@ -248,28 +311,28 @@ def annotation_id(datapoint):
                 sideload_id, []
             )  # `content` can have 0 datapoints, use [] default value in this case
 
-    async def create(self, resource: str, data: Dict[str, Any]) -> Dict[str, Any]:
+    async def create(self, resource: Resource, data: Dict[str, Any]) -> Dict[str, Any]:
         """Create a new object."""
-        return await self.request_json("POST", resource, json=data)
+        return await self.request_json("POST", resource.value, json=data)
 
-    async def replace(self, resource: str, id_: int, data: Dict[str, Any]) -> Dict[str, Any]:
+    async def replace(self, resource: Resource, id_: int, data: Dict[str, Any]) -> Dict[str, Any]:
         "Modify an entire existing object."
-        return await self.request_json("PUT", f"{resource}/{id_}", json=data)
+        return await self.request_json("PUT", f"{resource.value}/{id_}", json=data)
 
-    async def update(self, resource: str, id_: int, data: Dict[str, Any]) -> Dict[str, Any]:
+    async def update(self, resource: Resource, id_: int, data: Dict[str, Any]) -> Dict[str, Any]:
         "Modify particular fields of an existing object."
-        return await self.request_json("PATCH", f"{resource}/{id_}", json=data)
+        return await self.request_json("PATCH", f"{resource.value}/{id_}", json=data)
 
-    async def delete(self, resource: str, id_: int) -> None:
+    async def delete(self, resource: Resource, id_: int) -> None:
         """Delete a particular object.
 
         Use with caution: For some objects, it triggers a cascade delete of related objects.
         """
-        await self._request("DELETE", f"{resource}/{id_}")
+        await self._request("DELETE", f"{resource.value}/{id_}")
 
     async def upload(
         self,
-        resource: str,
+        resource: Resource,
         id_: int,
         fp: AsyncBufferedReader,
         filename: str,
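All CRUD helpers now share the same pattern: the enum supplies the path prefix via resource.value, and the HTTP verb encodes the operation. A hedged sketch, to be run inside an async function; the hook payload fields are illustrative:

    hook = await client.create(Resource.Hook, {"name": "My hook"})           # POST hooks
    await client.update(Resource.Hook, hook["id"], {"name": "Renamed"})      # PATCH hooks/{id}
    await client.replace(Resource.Hook, hook["id"], {**hook, "name": "X"})   # PUT hooks/{id}
    await client.delete(Resource.Hook, hook["id"])  # DELETE hooks/{id}, may cascade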
@@ -288,7 +351,7 @@ async def upload(
             may be used to initialize values of the object created from the uploaded file,
             semantics is different for each resource
         """
-        url = f"{resource}/{id_}/upload"
+        url = f"{resource.value}/{id_}/upload"
         files = {"content": (filename, await fp.read(), "application/octet-stream")}
 
         # Filename of values and metadata must be "", otherwise Elis API returns HTTP 400 with body
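Since fp is an aiofiles AsyncBufferedReader, a typical call opens the file asynchronously as well. A minimal sketch, assuming a queue upload and eliding the values/metadata arguments whose handling is collapsed below:

    import aiofiles

    # Run inside an async function; uploads to POST queues/123/upload.
    async with aiofiles.open("invoice.pdf", "rb") as fp:
        await client.upload(Resource.Queue, 123, fp, "invoice.pdf")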
@@ -301,7 +364,7 @@
 
     async def export(
         self,
-        resource: str,
+        resource: Resource,
         id_: int,
         export_format: str,
         columns: Sequence[str] = (),
@@ -313,13 +376,13 @@
         query_params = {**query_params, **filters}
         if columns:
             query_params["columns"] = ",".join(columns)
-        url = f"{resource}/{id_}/export"
+        url = f"{resource.value}/{id_}/export"
         # to_status parameter is valid only in POST requests, we can use GET in all other cases
         method = "POST" if "to_status" in filters else "GET"
         if export_format == "json":
             # JSON export is paginated just like a regular fetch_all, it abuses **filters kwargs of
-            # fetch_all to pass export-specific query params
-            async for result in self.fetch_all(url, method=method, **query_params):  # type: ignore
+            # fetch_all_by_url to pass export-specific query params
+            async for result in self.fetch_all_by_url(url, method=method, **query_params):  # type: ignore
                 yield result
         else:
             # In CSV/XML/XLSX case, all annotations are returned, i.e. the response can be large,
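In other words, JSON exports stream through the same pagination machinery as listings, while the other formats arrive in a single response. A hedged sketch, to be run inside an async function; the to_status filter is the one named in the comment above:

    # Paginated JSON export, yielded item by item:
    async for annotation in client.export(Resource.Queue, 123, "json"):
        ...

    # Passing to_status forces POST per the comment above. CSV/XML/XLSX exports
    # return one potentially large payload, whose handling is collapsed in this diff.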
