Skip to content

Commit

Permalink
feat(connector): implement generator
Browse files Browse the repository at this point in the history
  • Loading branch information
juandavidospina authored and dovahcrow committed Sep 24, 2020
1 parent 673f898 commit 7a93ea0
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 12 deletions.
4 changes: 3 additions & 1 deletion dataprep/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ async def _query_imp( # pylint: disable=too-many-locals,too-many-branches,too-m
# The API returns empty for this page, maybe we've reached the end
break

last_id = int(df.iloc[-1, df.columns.get_loc(pagdef.seek_id)]) - 1 # type: ignore
cid = df.columns.get_loc(pagdef.seek_id)
last_id = int(df.iloc[-1, cid]) - 1 # type: ignore

dfs.append(df)
elif isinstance(pagdef, TokenPaginationDef):
next_token = None
Expand Down
5 changes: 5 additions & 0 deletions dataprep/connector/generator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""ConfigGenerator"""
from .generator import ConfigGenerator
from .ui import ConfigGeneratorUI

__all__ = ["ConfigGenerator", "ConfigGeneratorUI"]
168 changes: 168 additions & 0 deletions dataprep/connector/generator/generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""This module implements the generation of connector configuration files."""

from dataprep.connector.schema.base import BaseDef
from pathlib import Path
from typing import Any, Dict, Optional, Union
from urllib.parse import parse_qs, urlparse, urlunparse

import requests

from ..schema import (
AuthorizationDef,
ConfigDef,
PaginationDef,
)
from .state import ConfigState
from .table import gen_schema_from_path, search_table_path

# Shape of the ``example`` dict accepted by ``ConfigGenerator.add_example``:
#
# class Example(TypedDict):
#     url: str
#     method: str
#     params: Dict[str, str]
#     authorization: Tuple[Dict[str, Any], Dict[str, Any]]
#     pagination: Dict[str, Any]


class ConfigGenerator:
    """Generate a connector config from example requests.

    Every example handed to ``add_example`` is sent to its endpoint; the
    config inferred from the response is merged into the config accumulated
    so far.

    Parameters
    ----------
    config
        Initialize the config generator with existing config file.
    """

    config: ConfigState
    storage: Dict[str, Any]  # for auth usage

    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        if config is None:
            self.config = ConfigState(None)
        else:
            self.config = ConfigState(ConfigDef(**config))
        self.storage = {}

    def add_example(  # pylint: disable=too-many-locals
        self, example: Dict[str, Any]
    ) -> None:
        """Add an example request to the generator.

        The example is a dictionary of the following shape::

            class Example(TypedDict):
                url: str
                method: str
                params: Dict[str, str]
                # 0 for def and 1 for params
                authorization: Optional[Tuple[Dict[str, Any], Dict[str, Any]]]
                pagination: Optional[Dict[str, Any]]

        Query parameters embedded in ``url`` are moved into the request
        parameters before the request is sent.

        Parameters
        ----------
        example
            The request example. Its ``params`` mapping is copied, so the
            caller's dictionary is not written to by the query-string merge.

        Raises
        ------
        ValueError
            If the method is not one of ``POST``, ``GET`` or ``PUT``, or if a
            parameter appears both in the url and in ``params`` with
            different values.
        NotImplementedError
            If the method is allowed but is not ``GET``.
        """
        url = example["url"]
        method = example["method"]
        if method not in {"POST", "GET", "PUT"}:
            raise ValueError(f"{method} not allowed.")
        if method != "GET":
            raise NotImplementedError(f"{method} not implemented.")

        # Copy the mapping (treating a missing or ``None`` entry as empty) so
        # the merge below never mutates the dictionary owned by the caller.
        params = dict(example.get("params") or {})

        # Move url params to params
        parsed = urlparse(url)

        # ``parse_qs`` maps each key to a non-empty list; only the first
        # value of a repeated key is kept.
        query_string = parse_qs(parsed.query)
        for key, (val, *_) in query_string.items():
            if key in params and params[key] != val:
                raise ValueError(
                    f"{key} appears in both url and params, but have different values."
                )
            params[key] = val

        # Rebuild the url without its query component.
        url = urlunparse(parsed._replace(query=""))
        req = {
            "method": method,
            "url": url,
            "headers": {},
            "params": params,
        }

        # Parse authorization and build authorization into request
        authdef: Optional[AuthorizationDef] = None
        authparams: Optional[Dict[str, Any]] = None
        auth_example = example.get("authorization")
        if auth_example is not None:
            authorization, authparams = auth_example
            authdef = AuthUnion(val=authorization).val

        if authdef is not None and authparams is not None:
            authdef.build(req, authparams, self.storage)

        # Send out request and construct config
        config = _create_config(req)

        # Add pagination information into the config
        pagination = example.get("pagination")
        if pagination is not None:
            config.request.pagination = PageUnion(val=pagination).val

        self.config += config

    def to_string(self) -> str:
        """Output the string format of the current config."""
        return str(self.config)

    def save(self, path: Union[str, Path]) -> None:
        """Save the current config to a file.

        Parameters
        ----------
        path
            The path to the saved file, with the file extension.
        """
        # Pin the encoding so the written file does not depend on the
        # platform's default locale encoding.
        Path(path).write_text(self.to_string(), encoding="utf-8")


def _create_config(req: Dict[str, Any]) -> ConfigDef:
    """Send ``req`` and derive a config from the JSON response.

    Parameters
    ----------
    req
        Request description; the keys ``method``, ``url``, ``params`` and
        ``headers`` are read.

    Returns
    -------
    ConfigDef
        A version 1 config whose request section mirrors ``req`` and whose
        response section holds the detected table path and its schema.

    Raises
    ------
    RuntimeError
        If the endpoint answers with a status code other than 200.
    """
    # NOTE(review): no timeout is passed to ``requests.request`` here.
    response = requests.request(
        req["method"].lower(),
        req["url"],
        params=req["params"],
        headers=req["headers"],
    )
    if response.status_code != 200:
        raise RuntimeError(
            f"Request to HTTP endpoint not successful: {response.status_code}: {response.text}"
        )

    body = response.json()
    table_path = search_table_path(body)

    request_section: Dict[str, Any] = {
        "url": req["url"],
        "method": req["method"],
        # Every parameter name seen in the example is recorded with ``False``.
        "params": dict.fromkeys(req["params"], False),
    }
    response_section: Dict[str, Any] = {
        "ctype": "application/json",
        "orient": "records",
        "tablePath": table_path,
        "schema": gen_schema_from_path(table_path, body),
    }

    return ConfigDef(version=1, request=request_section, response=response_section)


class AuthUnion(BaseDef):
    """Single-field wrapper model around an ``AuthorizationDef``.

    ``ConfigGenerator.add_example`` constructs it from the authorization
    definition of an example and reads ``val`` back.
    """

    # NOTE(review): presumably ``BaseDef`` validation selects the matching
    # authorization variant for the raw dict -- confirm against the schema.
    val: AuthorizationDef


class PageUnion(BaseDef):
    """Single-field wrapper model around a ``PaginationDef``.

    ``ConfigGenerator.add_example`` constructs it from the pagination entry
    of an example and reads ``val`` back.
    """

    # NOTE(review): presumably ``BaseDef`` validation selects the matching
    # pagination variant for the raw dict -- confirm against the schema.
    val: PaginationDef
26 changes: 26 additions & 0 deletions dataprep/connector/generator/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Defines ConfigState."""

from typing import Optional

from dataprep.connector.schema.defs import ConfigDef


class ConfigState:
    """Holder for the config accumulated so far, which may still be empty.

    Adding a ``ConfigDef`` yields a new ``ConfigState``: the added config
    itself when nothing was held before, otherwise the result of merging it
    into the held config.
    """

    config: Optional[ConfigDef] = None

    def __init__(self, config: Optional[ConfigDef]) -> None:
        self.config = config

    def __add__(self, rhs: ConfigDef) -> "ConfigState":
        held = self.config
        combined = rhs if held is None else held.merge(rhs)
        return ConfigState(combined)

    def __str__(self) -> str:
        # Delegates to the held config; an empty state renders as "None".
        return str(self.config)

    def __repr__(self) -> str:
        return str(self)
76 changes: 76 additions & 0 deletions dataprep/connector/generator/table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Table parsing utilities."""

from collections import defaultdict
from operator import itemgetter
from typing import Any, Dict, Set, Tuple

from jsonpath_ng import parse as jparse

from ..schema import SchemaFieldDef


def search_table_path(val: Dict[str, Any]) -> str:
    """Search the path of the largest table in a json dict.

    Parameters
    ----------
    val
        The decoded JSON payload to search.

    Returns
    -------
    str
        The path of the table node with the most rows. When several tables
        share the largest row count, the smallest path in string order is
        returned, so the same payload always yields the same path.

    Raises
    ------
    ValueError
        If no table node is found.
    """

    paths = _search_table_path("$", val)
    if not paths:
        raise ValueError("No tables found.")
    # ``paths`` is a set, so its iteration order is not stable between runs,
    # and ``max`` keeps the first maximal item it meets. Sorting first makes
    # the winner among equally sized tables deterministic.
    return max(sorted(paths), key=itemgetter(1))[0]


def _search_table_path(base: str, val: Dict[str, Any]) -> Set[Tuple[str, int]]:
    """Collect ``(path, row_count)`` pairs for every table node below ``val``.

    ``base`` is the path of ``val`` itself. Table nodes contribute
    ``"<path>[*]"`` together with their length; nested dicts are searched
    recursively; every other value is ignored.
    """
    found: Set[Tuple[str, int]] = set()
    for name, child in val.items():
        child_path = f"{base}.{name}"
        if is_table_node(child):
            found.add((f"{child_path}[*]", len(child)))
        elif isinstance(child, dict):
            found |= _search_table_path(child_path, child)

    return found


def is_table_node(node: Any) -> bool:
    """Tell whether ``node`` is a table node.

    A table node is a list whose elements are all dicts with string-only
    keys. An empty list qualifies as well.
    """
    if not isinstance(node, list):
        return False

    # NOTE: rows are not required to share one key set; different rows might
    # legitimately carry different keys.
    return all(
        isinstance(row, dict) and all(isinstance(key, str) for key in row)
        for row in node
    )


def gen_schema_from_path(path: str, val: Dict[str, Any]) -> Dict[str, SchemaFieldDef]:
    """Build the table schema for the rows matched by ``path`` in ``val``.

    Each key becomes one schema field targeting ``$.<key>``. The field type
    comes from the first value seen for that key; later rows never change a
    field that is already present.
    """
    schema: Dict[str, SchemaFieldDef] = {}

    for match in jparse(path).find(val):
        for name, cell in match.value.items():
            if name not in schema:
                schema[name] = SchemaFieldDef(
                    target=f"$.{name}",
                    type=_TYPE_MAPPING[type(cell)],
                    description="auto generated",
                )

    return schema


# Maps the Python type of a JSON value to its schema type name. Lookups are
# done by exact type (``type(value)``), so ``bool`` resolves to "boolean"
# rather than "int". Any type not listed falls back to "object"; being a
# ``defaultdict``, the mapping also stores that fallback entry on first lookup.
_TYPE_MAPPING = defaultdict(
    lambda: "object", {int: "int", str: "string", float: "float", bool: "boolean",}
)
14 changes: 4 additions & 10 deletions dataprep/connector/implicit_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ def from_json(self, data: str) -> Dict[str, List[Any]]:
if respdef.orient == "records": # pylint: disable=no-member
data_rows = [match.value for match in table_expr.find(data)]

for (
column_name,
column_def,
) in respdef.schema_.items(): # pylint: disable=no-member
for (column_name, column_def,) in respdef.schema_.items():
column_target = column_def.target
column_type = column_def.type

Expand Down Expand Up @@ -107,13 +104,10 @@ def from_xml(self, data: str) -> Dict[str, List[Any]]:
data = data.replace('<?xml version="1.0" encoding="UTF-8"?>', "")

root = etree.parse(StringIO(data))
data_rows = root.xpath(respdef.table_path) # pylint: disable=no-member
data_rows = root.xpath(respdef.table_path)

if respdef.orient == "records": # pylint: disable=no-member
for (
column_name,
column_def,
) in respdef.schema_.items(): # pylint: disable=no-member
if respdef.orient == "records":
for (column_name, column_def,) in respdef.schema_.items():
column_target = column_def.target
column_type = column_def.type

Expand Down
2 changes: 1 addition & 1 deletion dataprep/connector/schema/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def merge(self, rhs: BaseDefT) -> BaseDefT:
"string": None,
"float": "string",
"int": "float",
"bool": "string",
"boolean": "string",
}


Expand Down

0 comments on commit 7a93ea0

Please sign in to comment.