Skip to content

Commit

Permalink
feat(connector): rename pagination types
Browse files Browse the repository at this point in the history
Rename the pagination types according to the terminology used in
https://nordicapis.com/everything-you-need-to-know-about-api-pagination/
Also convert all configuration key strings to camelCase.
  • Loading branch information
dovahcrow committed Sep 24, 2020
1 parent 53059f9 commit 500ce13
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 50 deletions.
57 changes: 28 additions & 29 deletions dataprep/connector/connector.py
Expand Up @@ -97,7 +97,6 @@ async def query( # pylint: disable=too-many-locals
table: str,
_auth: Optional[Dict[str, Any]] = None,
_count: Optional[int] = None,
_concurrency: Optional[int] = None,
**where: Any,
) -> Union[Awaitable[pd.DataFrame], pd.DataFrame]:
"""
Expand Down Expand Up @@ -232,10 +231,10 @@ async def _query_imp( # pylint: disable=too-many-locals,too-many-branches
total = _count
n_page = math.ceil(total / max_per_page)

if pag_type == "cursor":
if pag_type == "seek":
last_id = 0
dfs = []
# No way to parallelize for cursor type
# No way to parallelize for seek type
for i in range(n_page):
count = min(total - i * max_per_page, max_per_page)

Expand All @@ -245,8 +244,8 @@ async def _query_imp( # pylint: disable=too-many-locals,too-many-branches
_client=client,
_throttler=throttler,
_auth=_auth,
_count=count,
_cursor=last_id - 1,
_limit=count,
_offset=last_id - 1,
)

if df is None:
Expand All @@ -256,10 +255,10 @@ async def _query_imp( # pylint: disable=too-many-locals,too-many-branches
# The API returns empty for this page, maybe we've reached the end
break

last_id = int(df[itable.pag_params.cursor_id][len(df) - 1]) - 1
last_id = int(df[itable.pag_params.seek_id][len(df) - 1]) - 1
dfs.append(df)

elif pag_type == "limit":
elif pag_type == "offset":
resps_coros = []
allowed_page = IntRef(n_page)
for i in range(n_page):
Expand All @@ -273,8 +272,8 @@ async def _query_imp( # pylint: disable=too-many-locals,too-many-branches
_page=i,
_allowed_page=allowed_page,
_auth=_auth,
_count=count,
_cursor=i * max_per_page,
_limit=count,
_offset=i * max_per_page,
)
)

Expand All @@ -300,12 +299,12 @@ async def _fetch( # pylint: disable=too-many-locals,too-many-branches
_throttler: ThrottleSession,
_page: int = 0,
_allowed_page: Optional[IntRef] = None,
_count: Optional[int] = None,
_cursor: Optional[int] = None,
_limit: Optional[int] = None,
_offset: Optional[int] = None,
_auth: Optional[Dict[str, Any]] = None,
) -> Optional[pd.DataFrame]:
if (_count is None) != (_cursor is None):
raise ValueError("_cursor and _count should both be None or not None")
if (_limit is None) != (_offset is None):
raise ValueError("_limit and _offset should both be None or not None")

method = table.method
url = table.url
Expand Down Expand Up @@ -336,31 +335,31 @@ async def _fetch( # pylint: disable=too-many-locals,too-many-branches
else:
raise NotImplementedError(table.body_ctype)

if table.pag_params is not None and _count is not None:
if table.pag_params is not None and _limit is not None:
pag_type = table.pag_params.type
count_key = table.pag_params.count_key
if pag_type == "cursor":
if table.pag_params.cursor_key is None:
limit_key = table.pag_params.limit_key
if pag_type == "seek":
if table.pag_params.seek_key is None:
raise ValueError(
"pagination type is cursor but no cursor_key set in the configuration file."
"pagination type is seek but no seek_key set in the configuration file."
)
cursor_key = table.pag_params.cursor_key
elif pag_type == "limit":
if table.pag_params.anchor_key is None:
offset_key = table.pag_params.seek_key
elif pag_type == "offset":
if table.pag_params.offset_key is None:
raise ValueError(
"pagination type is limit but no anchor_key set in the configuration file."
"pagination type is offset but no offset_key set in the configuration file."
)
cursor_key = table.pag_params.anchor_key
offset_key = table.pag_params.offset_key
else:
raise UnreachableError()

if count_key in req_data["params"]:
raise UniversalParameterOverridden(count_key, "_count")
req_data["params"][count_key] = _count
if limit_key in req_data["params"]:
raise UniversalParameterOverridden(limit_key, "_limit")
req_data["params"][limit_key] = _limit

if cursor_key in req_data["params"]:
raise UniversalParameterOverridden(cursor_key, "_cursor")
req_data["params"][cursor_key] = _cursor
if offset_key in req_data["params"]:
raise UniversalParameterOverridden(offset_key, "_offset")
req_data["params"][offset_key] = _offset

await _throttler.acquire(_page)

Expand Down
24 changes: 10 additions & 14 deletions dataprep/connector/implicit_database.py
Expand Up @@ -27,35 +27,31 @@


class SchemaField(NamedTuple):
"""
Schema of one table field
"""
"""Schema of one table field."""

target: str
type: str
description: Optional[str]


class Pagination:
"""
Schema of Pagination field
"""
"""Schema of Pagination field."""

type: str
count_key: str
limit_key: str
max_count: int
anchor_key: Optional[str]
cursor_id: Optional[str]
cursor_key: Optional[str]
offset_key: Optional[str]
seek_id: Optional[str]
seek_key: Optional[str]

def __init__(self, pdef: Dict[str, Any]) -> None:

self.type = pdef["type"]
self.max_count = pdef["max_count"]
self.count_key = pdef["count_key"]
self.anchor_key = pdef.get("anchor_key")
self.cursor_id = pdef.get("cursor_id")
self.cursor_key = pdef.get("cursor_key")
self.limit_key = pdef["limit_key"]
self.offset_key = pdef.get("offset_key")
self.seek_id = pdef.get("seek_id")
self.seek_key = pdef.get("seek_key")


class ImplicitTable: # pylint: disable=too-many-instance-attributes
Expand Down
14 changes: 7 additions & 7 deletions dataprep/connector/schema.json
Expand Up @@ -68,29 +68,29 @@
"type": {
"type": "string"
},
"max_count": {
"maxCount": {
"type": "integer"
},
"anchor_key": {
"offsetKey": {
"type": "string",
"optional": true
},
"count_key": {
"limitKey": {
"type": "string"
},
"cursor_id": {
"seekId": {
"type": "string",
"optional": true
},
"cursor_key": {
"seekKey": {
"type": "string",
"optional": true
}
},
"required": [
"count_key",
"limitKey",
"type",
"max_count"
"maxCount"
],
"additionalProperties": false
},
Expand Down

0 comments on commit 500ce13

Please sign in to comment.