Skip to content

Commit

Permalink
Merge 94f4da0 into c2335f5
Browse files Browse the repository at this point in the history
  • Loading branch information
chezou committed Dec 4, 2019
2 parents c2335f5 + 94f4da0 commit 7e5c3da
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 15 deletions.
10 changes: 0 additions & 10 deletions docs/api/misc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,3 @@ tdclient.util
:members:
:undoc-members:
:show-inheritance:



tdclient.pseudo\_certifi
-------------------------------

.. automodule:: tdclient.pseudo_certifi
:members:
:undoc-members:
:show-inheritance:
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
msgpack==0.6.1
msgpack==0.6.2
python-dateutil==2.7.3
urllib3>=1.24.1
63 changes: 60 additions & 3 deletions tdclient/connector_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import json

from .util import create_url
from .util import create_url, normalize_connector_config


class ConnectorAPI:
Expand All @@ -16,12 +16,69 @@ def connector_guess(self, job):
Args:
job (dict): :class:`dict` representation of `seed.yml`
See Also: https://www.embulk.org/docs/built-in.html#guess-executor
Returns:
:class:`dict`
:class:`dict`: The configuration of the Data Connector.
Examples:
>>> config = {
... "in": {
... "type": "s3",
... "bucket": "your-bucket",
... "path_prefix": "logs/csv-",
... "access_key_id": "YOUR-AWS-ACCESS-KEY",
... "secret_access_key": "YOUR-AWS-SECRET-KEY"
... },
... "out": {"mode": "append"},
... "exec": {"guess_plugins": ["json", "query_string"]},
... }
>>> td.api.connector_guess(config)
{'config': {'in': {'type': 's3',
'bucket': 'your-bucket',
'path_prefix': 'logs/csv-',
'access_key_id': 'YOUR-AWS-ACCESS-KEY',
'secret_access_key': 'YOU-AWS-SECRET-KEY',
'parser': {'charset': 'UTF-8',
'newline': 'LF',
'type': 'csv',
'delimiter': ',',
'quote': '"',
'escape': '"',
'trim_if_not_quoted': False,
'skip_header_lines': 1,
'allow_extra_columns': False,
'allow_optional_columns': False,
'columns': [{'name': 'sepal.length', 'type': 'double'},
{'name': 'sepal.width', 'type': 'double'},
{'name': 'petal.length', 'type': 'double'},
{'name': 'petal.width', 'type': 'string'},
{'name': 'variety', 'type': 'string'}]}},
'out': {'mode': 'append'},
'exec': {'guess_plugin': ['json', 'query_string']},
'filters': [{'rules': [{'rule': 'upper_to_lower'},
{'pass_types': ['a-z', '0-9'],
'pass_characters': '_',
'replace': '_',
'rule': 'character_types'},
{'pass_types': ['a-z'],
'pass_characters': '_',
'prefix': '_',
'rule': 'first_character_types'},
{'rule': 'unique_number_suffix', 'max_length': 128}],
'type': 'rename'},
{'from_value': {'mode': 'upload_time'},
'to_column': {'name': 'time'},
'type': 'add_time'}]}}
"""
headers = {"content-type": "application/json; charset=utf-8"}
payload = json.dumps(job).encode("utf-8") if isinstance(job, dict) else job
if isinstance(job, dict):
job = {"config": normalize_connector_config(job)}
payload = json.dumps(job).encode("utf-8")
else:
# Not checking the format. Assuming the right format
payload = job

with self.post("/v3/bulk_loads/guess", payload, headers=headers) as res:
code, body = res.status, res.read()
if code != 200:
Expand Down
16 changes: 16 additions & 0 deletions tdclient/test/connector_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@ def test_connector_guess_success():
td = api.API("APIKEY")
td.post = mock.MagicMock(return_value=make_response(200, dumps(config)))
res = td.connector_guess(seed)
seed["config"]["exec"] = {}
seed["config"]["filters"] = []
assert res == config
td.post.assert_called_with(
"/v3/bulk_loads/guess",
dumps(seed),
headers={"content-type": "application/json; charset=utf-8"},
)


def test_connector_guess_without_config_success():
td = api.API("APIKEY")
td.post = mock.MagicMock(return_value=make_response(200, dumps(config)))
res = td.connector_guess(seed["config"])
seed["config"]["exec"] = {}
seed["config"]["filters"] = []
assert res == config
td.post.assert_called_with(
"/v3/bulk_loads/guess",
Expand Down
66 changes: 66 additions & 0 deletions tdclient/test/util_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import pytest

from tdclient.util import normalize_connector_config


def test_normalize_connector_config():
config = {"in": {"type": "s3"}}
assert normalize_connector_config(config) == {
"in": {"type": "s3"},
"out": {},
"exec": {},
"filters": [],
}


def test_normalize_connector_config_with_expected_keys():
config = {
"in": {"type": "s3"},
"out": {"mode": "append"},
"exec": {"guess_plugins": ["json"]},
"filters": [{"type": "speedometer"}],
}
assert normalize_connector_config(config) == config


def test_nomalize_connector_with_config_key():
config = {
"config": {
"in": {"type": "s3"},
"out": {"mode": "append"},
"exec": {"guess_plugins": ["json"]},
"filters": [{"type": "speedometer"}],
}
}
assert normalize_connector_config(config) == config["config"]


def test_normalize_connector_without_in_key():
config = {"config": {"type": "s3"}}

assert normalize_connector_config(config) == {
"in": config["config"],
"out": {},
"exec": {},
"filters": [],
}


def test_normalize_connector_witout_in_and_config_keys():
config = {"type": "s3"}
assert normalize_connector_config(config) == {
"in": config,
"out": {},
"exec": {},
"filters": [],
}


def test_normalize_conector_has_sibling_keys():
config = {
"config": {"in": {"type": "s3"}},
"sibling_key": {"out": {"mode": "append"}},
}

with pytest.raises(ValueError):
normalize_connector_config(config)
52 changes: 51 additions & 1 deletion tdclient/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import dateutil.parser
import msgpack


log = logging.getLogger(__name__)


Expand Down Expand Up @@ -134,3 +133,54 @@ def parse_date(s):
except ValueError:
log.warning("Failed to parse date string: %s", s)
return None


def normalize_connector_config(config):
"""Normalize connector config
This is porting of TD CLI's ConnectorConfigNormalizer#normalized_config.
see also: https://github.com/treasure-data/td/blob/15495f12d8645a7b3f6804098f8f8aca72de90b9/lib/td/connector_config_normalizer.rb#L7-L30
Args:
config (dict): A config to be normalized
Returns:
dict: Normalized configuration
Examples:
Only with ``in`` key in a config.
>>> config = {"in": {"type": "s3"}}
>>> normalize_connector_config(config)
{'in': {'type': 's3'}, 'out': {}, 'exec': {}, 'filters': []}
With ``in``, ``out``, ``exec``, and ``filters`` in a config.
>>> config = {
... "in": {"type": "s3"},
... "out": {"mode": "append"},
... "exec": {"guess_plugins": ["json"]},
... "filters": [{"type": "speedometer"}],
... }
>>> normalize_connector_config(config)
{'in': {'type': 's3'},
'out': {'mode': 'append'},
'exec': {'guess_plugins': ['json']},
'filters': [{'type': 'speedometer'}]}
"""
if "in" in config:
return {
"in": config["in"],
"out": config.get("out", {}),
"exec": config.get("exec", {}),
"filters": config.get("filters", []),
}
elif "config" in config:
if len(config) != 1:
raise ValueError(
"Setting sibling keys with 'config' key isn't support. "
"Set within the 'config' key, or put all the settings without 'config'"
"key."
)

return normalize_connector_config(config["config"])
else:
return {"in": config, "out": {}, "exec": {}, "filters": []}

0 comments on commit 7e5c3da

Please sign in to comment.