Skip to content

Commit

Permalink
Add support for glob patterns in source stream names
Browse files Browse the repository at this point in the history
  • Loading branch information
DouweM committed Aug 1, 2023
1 parent f1f960e commit 09ea77d
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 39 deletions.
82 changes: 44 additions & 38 deletions singer_sdk/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import hashlib
import logging
import typing as t
import fnmatch

from singer_sdk.exceptions import MapExpressionError, StreamMapConfigError
from singer_sdk.helpers import _simpleeval as simpleeval
Expand Down Expand Up @@ -691,59 +692,64 @@ def register_raw_stream_schema( # noqa: PLR0912, C901
if isinstance(stream_map_val, dict)
else stream_map_val
)
stream_alias: str = stream_map_key
source_stream: str = stream_map_key
if isinstance(stream_def, str) and stream_def != NULL_STRING:
if stream_name == stream_map_key:
# TODO: Add any expected cases for str expressions (currently none)
pass

msg = f"Option '{stream_map_key}:{stream_def}' is not expected."
raise StreamMapConfigError(msg)
stream_alias: str = stream_map_key

if stream_def is None or stream_def == NULL_STRING:
if stream_name != stream_map_key:
continue
is_source_stream_primary = True
if isinstance(stream_def, dict):
if MAPPER_SOURCE_OPTION in stream_def:
# <alias>: __source__: <source>
source_stream = stream_def.pop(MAPPER_SOURCE_OPTION)
is_source_stream_primary = False
elif MAPPER_ALIAS_OPTION in stream_def:
# <source>: __alias__: <alias>
stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION)

if stream_name == source_stream:
# Exact match
pass
elif fnmatch.fnmatch(stream_name, source_stream):
# Wildcard match
if stream_alias == source_stream:
stream_alias = stream_name
source_stream = stream_name
else:
continue

self.stream_maps[stream_map_key][0] = RemoveRecordTransform(
stream_alias=stream_map_key,
if isinstance(stream_def, dict):
mapper = CustomStreamMap(
stream_alias=stream_alias,
map_transform=stream_def,
map_config=self.map_config,
raw_schema=schema,
key_properties=key_properties,
flattening_options=self.flattening_options,
)
elif stream_def is None or (isinstance(stream_def, str) and stream_def == NULL_STRING):
mapper = RemoveRecordTransform(
stream_alias=stream_alias,
raw_schema=schema,
key_properties=None,
flattening_options=self.flattening_options,
)
logging.info("Set null tansform as default for '%s'", stream_name)
continue
logging.info(f"Set null transform as default for '{stream_name}'")

elif isinstance(stream_def, str):
# Non-NULL string values are not currently supported
msg = f"Option '{stream_map_key}:{stream_def}' is not expected."
raise StreamMapConfigError(msg)

if not isinstance(stream_def, dict):
else:
msg = (
f"Unexpected stream definition type. Expected str, dict, or None. "
f"Got '{type(stream_def).__name__}'."
)
raise StreamMapConfigError(msg)

if MAPPER_SOURCE_OPTION in stream_def:
source_stream = stream_def.pop(MAPPER_SOURCE_OPTION)

if source_stream != stream_name:
# Not a match
continue

if MAPPER_ALIAS_OPTION in stream_def:
stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION)

mapper = CustomStreamMap(
stream_alias=stream_alias,
map_transform=stream_def,
map_config=self.map_config,
raw_schema=schema,
key_properties=key_properties,
flattening_options=self.flattening_options,
)

if source_stream == stream_map_key:
if is_source_stream_primary:
# Zero-th mapper should be the same-keyed mapper.
# Override the default mapper with this custom map.
self.stream_maps[stream_name][0] = mapper
self.stream_maps[source_stream][0] = mapper
else:
# Additional mappers for aliasing and multi-projection:
self.stream_maps[stream_name].append(mapper)
self.stream_maps[source_stream].append(mapper)
80 changes: 79 additions & 1 deletion tests/core/test_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ def sample_catalog_dict() -> dict:
Property("name", StringType),
Property("owner_email", StringType),
Property("description", StringType),
Property("description", StringType),
Property("create_date", StringType),
).to_dict()
foobars_schema = PropertiesList(
Property("the", StringType),
Property("brown", StringType),
).to_dict()
singular_schema = PropertiesList(Property("foo", StringType)).to_dict()
return {
"streams": [
{
Expand All @@ -64,6 +65,11 @@ def sample_catalog_dict() -> dict:
"tap_stream_id": "foobars",
"schema": foobars_schema,
},
{
"stream": "singular",
"tap_stream_id": "singular",
"schema": singular_schema,
},
],
}

Expand Down Expand Up @@ -106,6 +112,9 @@ def sample_stream():
{"the": "quick"},
{"brown": "fox"},
],
"singular": [
{"foo": "bar"},
],
}


Expand Down Expand Up @@ -181,6 +190,7 @@ def transformed_result(stream_map_config):
{"the": "quick"},
{"brown": "fox"},
],
"singular": [{"foo": "bar"}], # should be unchanged
}


Expand All @@ -200,6 +210,9 @@ def transformed_schemas():
Property("the", StringType),
Property("brown", StringType),
).to_dict(),
"singular": PropertiesList(
Property("foo", StringType),
).to_dict(),
}


Expand Down Expand Up @@ -231,6 +244,7 @@ def cloned_and_aliased_schemas():
Property("name", StringType),
Property("owner_email", StringType),
Property("description", StringType),
Property("create_date", StringType),
).to_dict()
return {
"repositories_aliased": properties,
Expand Down Expand Up @@ -277,6 +291,51 @@ def filtered_schemas():
return {"repositories": PropertiesList(Property("name", StringType)).to_dict()}


# Wildcard


@pytest.fixture
def wildcard_stream_maps():
return {
"*s": {
"db_name": "'database'",
},
}


@pytest.fixture
def wildcard_result(sample_stream):
return {
"repositories": [
{**record, "db_name": "database"}
for record in sample_stream["repositories"]
],
"foobars": [
{**record, "db_name": "database"} for record in sample_stream["foobars"]
],
"singular": sample_stream["singular"],
}


@pytest.fixture
def wildcard_schemas(sample_catalog_dict):
return {
"repositories": PropertiesList(
Property("name", StringType),
Property("owner_email", StringType),
Property("description", StringType),
Property("create_date", StringType),
Property("db_name", StringType),
).to_dict(),
"foobars": PropertiesList(
Property("the", StringType),
Property("brown", StringType),
Property("db_name", StringType), # added
).to_dict(),
"singular": PropertiesList(Property("foo", StringType)).to_dict(), # unchanged
}


def test_map_transforms(
sample_stream,
sample_catalog_obj,
Expand Down Expand Up @@ -354,6 +413,25 @@ def test_filter_transforms_w_error(
)


def test_wildcard_transforms(
sample_stream,
sample_catalog_obj,
wildcard_stream_maps,
stream_map_config,
wildcard_result,
wildcard_schemas,
):
_test_transform(
"wildcard",
stream_maps=wildcard_stream_maps,
stream_map_config=stream_map_config,
expected_result=wildcard_result,
expected_schemas=wildcard_schemas,
sample_stream=sample_stream,
sample_catalog_obj=sample_catalog_obj,
)


def _test_transform(
test_name: str,
*,
Expand Down

0 comments on commit 09ea77d

Please sign in to comment.