Skip to content

Commit

Permalink
feat(connector): implement page pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
dovahcrow committed Sep 24, 2020
1 parent 5d4f5c5 commit 02c93b4
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 47 deletions.
50 changes: 25 additions & 25 deletions dataprep/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@
import sys
from asyncio import as_completed
from pathlib import Path
from typing import Any, Awaitable, Dict, List, Optional, Union
from typing import Any, Awaitable, Dict, List, Optional, Union, cast

import pandas as pd
from aiohttp import ClientSession
from jinja2 import Environment, StrictUndefined, Template, UndefinedError

from ..errors import UnreachableError
from .config_manager import config_directory, ensure_config
from .errors import InvalidParameterError, RequestError, UniversalParameterOverridden
from .implicit_database import ImplicitDatabase, ImplicitTable
from .int_ref import IntRef
from .ref import Ref
from .schema import ConfigDef, FieldDefUnion
from .throttler import OrderedThrottler, ThrottleSession

Expand Down Expand Up @@ -254,7 +253,7 @@ async def _query_imp( # pylint: disable=too-many-locals,too-many-branches
_throttler=throttler,
_auth=_auth,
_limit=count,
_offset=last_id - 1,
_anchor=last_id - 1,
)

if df is None:
Expand All @@ -267,11 +266,18 @@ async def _query_imp( # pylint: disable=too-many-locals,too-many-branches
last_id = int(df[pagdef.seek_id][len(df) - 1]) - 1
dfs.append(df)

elif pagdef.type == "offset":
elif pagdef.type in {"offset", "page"}:
resps_coros = []
allowed_page = IntRef(n_page)
allowed_page = Ref(n_page)
for i in range(n_page):
count = min(total - i * max_per_page, max_per_page)
if pagdef.type == "offset":
anchor = i * max_per_page
elif pagdef.type == "page":
anchor = i + 1
else:
raise ValueError(f"Unknown pagination type {pagdef.type}")

resps_coros.append(
self._fetch(
itable,
Expand All @@ -282,7 +288,7 @@ async def _query_imp( # pylint: disable=too-many-locals,too-many-branches
_allowed_page=allowed_page,
_auth=_auth,
_limit=count,
_offset=i * max_per_page,
_anchor=anchor,
)
)

Expand All @@ -291,7 +297,6 @@ async def _query_imp( # pylint: disable=too-many-locals,too-many-branches
df = await resp_coro
if df is not None:
dfs.append(df)

else:
raise NotImplementedError

Expand All @@ -307,12 +312,12 @@ async def _fetch( # pylint: disable=too-many-locals,too-many-branches,too-many-
_client: ClientSession,
_throttler: ThrottleSession,
_page: int = 0,
_allowed_page: Optional[IntRef] = None,
_allowed_page: Optional[Ref[int]] = None,
_limit: Optional[int] = None,
_offset: Optional[int] = None,
_anchor: Optional[int] = None,
_auth: Optional[Dict[str, Any]] = None,
) -> Optional[pd.DataFrame]:
if (_limit is None) != (_offset is None):
if (_limit is None) != (_anchor is None):
raise ValueError("_limit and _offset should both be None or not None")

reqdef = table.config.request
Expand Down Expand Up @@ -350,28 +355,23 @@ async def _fetch( # pylint: disable=too-many-locals,too-many-branches,too-many-
pagdef = reqdef.pagination
pag_type = pagdef.type
limit_key = pagdef.limit_key

if pag_type == "seek":
if pagdef.seek_key is None:
raise ValueError(
"pagination type is seek but no seek_key set in the configuration file."
)
offset_key = pagdef.seek_key
anchor = cast(str, pagdef.seek_key)
elif pag_type == "offset":
if pagdef.offset_key is None:
raise ValueError(
"pagination type is offset but no offset_key set in the configuration file."
)
offset_key = pagdef.offset_key
anchor = cast(str, pagdef.offset_key)
elif pag_type == "page":
anchor = cast(str, pagdef.page_key)
else:
raise UnreachableError()
raise ValueError(f"Unknown pagination type {pag_type}.")

if limit_key in req_data["params"]:
raise UniversalParameterOverridden(limit_key, "_limit")
req_data["params"][limit_key] = _limit

if offset_key in req_data["params"]:
raise UniversalParameterOverridden(offset_key, "_offset")
req_data["params"][offset_key] = _offset
if anchor in req_data["params"]:
raise UniversalParameterOverridden(anchor, "_offset")
req_data["params"][anchor] = _anchor

await _throttler.acquire(_page)

Expand Down
20 changes: 0 additions & 20 deletions dataprep/connector/int_ref.py

This file was deleted.

32 changes: 32 additions & 0 deletions dataprep/connector/ref.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""ref: defines a reference type of value."""

from typing import TypeVar, Generic

T = TypeVar("T")


class Ref(Generic[T]):
"""A reference to a value."""

__slots__ = ("val",)

val: T

def __init__(self, val: T) -> None:
self.val = val

def __int__(self) -> int:
return int(self.val)

def __bool__(self) -> bool:
return bool(self.val)

def set(self, val: T) -> None:
"""set the value."""
self.val = val

def __str__(self) -> str:
return str(self.val)

def __repr__(self) -> str:
return str(self.val)
16 changes: 14 additions & 2 deletions dataprep/connector/schema/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,31 @@
from typing import Any, Dict, Optional, Union

import requests
from pydantic import Field
from pydantic import Field, root_validator

from .base import BaseDef, BaseDefT


# pylint: disable=missing-class-docstring,missing-function-docstring
class PaginationDef(BaseDef):
type: str = Field(regex=r"^(offset|seek)$")
type: str = Field(regex=r"^(offset|seek|page)$")
max_count: int
offset_key: Optional[str]
limit_key: str
seek_id: Optional[str]
seek_key: Optional[str]
page_key: Optional[str]

@root_validator(pre=True)
def check_key_provided(cls, values: Dict[str, Any]) -> Dict[str, Any]:
if values["type"] == "offset" and "offsetKey" not in values:
raise ValueError("Pagination type is 'offset' but no offsetKey set.")
elif values["type"] == "seek" and "seekKey" not in values:
raise ValueError("Pagination type is seek but no seekKey set.")
elif values["type"] == "page" and "pageKey" not in values:
raise ValueError("Pagination type is page but no pageKey set.")
else:
return values


class FieldDef(BaseDef):
Expand Down

0 comments on commit 02c93b4

Please sign in to comment.