Skip to content

Commit

Permalink
feat(mappers): Added support for glob patterns in source stream names (
Browse files Browse the repository at this point in the history
…#1888)

* Don't rebuild stream_maps[stream_name] if stream is already known

* Don't emit SCHEMA messages for skipped streams

* Add support for glob patterns in source stream names

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix wildcard test

* Fix types

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Edgar Ramírez-Mondragón <edgarrm358@gmail.com>
Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com>
  • Loading branch information
4 people committed Apr 11, 2024
1 parent ff1100a commit 7eebef9
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 43 deletions.
97 changes: 55 additions & 42 deletions singer_sdk/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import ast
import copy
import datetime
import fnmatch
import hashlib
import importlib.util
import logging
Expand Down Expand Up @@ -714,11 +715,14 @@ def register_raw_stream_schema( # noqa: PLR0912, C901
if stream_name in self.stream_maps:
primary_mapper = self.stream_maps[stream_name][0]
if (
primary_mapper.raw_schema != schema
or primary_mapper.raw_key_properties != key_properties
isinstance(primary_mapper, self.default_mapper_type)
and primary_mapper.raw_schema == schema
and primary_mapper.raw_key_properties == key_properties
):
# Unload/reset stream maps if schema or key properties have changed.
self.stream_maps.pop(stream_name)
return

# Unload/reset stream maps if schema or key properties have changed.
self.stream_maps.pop(stream_name)

if stream_name not in self.stream_maps:
# The 0th mapper should be the same-named treatment.
Expand All @@ -738,60 +742,69 @@ def register_raw_stream_schema( # noqa: PLR0912, C901
if isinstance(stream_map_val, dict)
else stream_map_val
)
stream_alias: str = stream_map_key
source_stream: str = stream_map_key
if isinstance(stream_def, str) and stream_def != NULL_STRING:
if stream_name == stream_map_key:
# TODO: Add any expected cases for str expressions (currently none)
pass
stream_alias: str = stream_map_key

msg = f"Option '{stream_map_key}:{stream_def}' is not expected."
raise StreamMapConfigError(msg)
is_source_stream_primary = True
if isinstance(stream_def, dict):
if MAPPER_SOURCE_OPTION in stream_def:
# <alias>: __source__: <source>
source_stream = stream_def.pop(MAPPER_SOURCE_OPTION)
is_source_stream_primary = False
elif MAPPER_ALIAS_OPTION in stream_def:
# <source>: __alias__: <alias>
stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION)

if stream_name == source_stream:
# Exact match
pass
elif fnmatch.fnmatch(stream_name, source_stream):
# Wildcard match
if stream_alias == source_stream:
stream_alias = stream_name
source_stream = stream_name
else:
continue

if stream_def is None or stream_def == NULL_STRING:
if stream_name != stream_map_key:
continue
mapper: CustomStreamMap | RemoveRecordTransform

self.stream_maps[stream_map_key][0] = RemoveRecordTransform(
stream_alias=stream_map_key,
if isinstance(stream_def, dict):
mapper = CustomStreamMap(
stream_alias=stream_alias,
map_transform=stream_def,
map_config=self.map_config,
faker_config=self.faker_config,
raw_schema=schema,
key_properties=key_properties,
flattening_options=self.flattening_options,
)
elif stream_def is None or (
isinstance(stream_def, str) and stream_def == NULL_STRING
):
mapper = RemoveRecordTransform(
stream_alias=stream_alias,
raw_schema=schema,
key_properties=None,
flattening_options=self.flattening_options,
)
logging.info("Set null tansform as default for '%s'", stream_name)
continue
logging.info("Set null transform as default for '%s'", stream_name)

elif isinstance(stream_def, str):
# Non-NULL string values are not currently supported
msg = f"Option '{stream_map_key}:{stream_def}' is not expected."
raise StreamMapConfigError(msg)

if not isinstance(stream_def, dict):
else:
msg = (
f"Unexpected stream definition type. Expected str, dict, or None. "
f"Got '{type(stream_def).__name__}'."
)
raise StreamMapConfigError(msg)

if MAPPER_SOURCE_OPTION in stream_def:
source_stream = stream_def.pop(MAPPER_SOURCE_OPTION)

if source_stream != stream_name:
# Not a match
continue

if MAPPER_ALIAS_OPTION in stream_def:
stream_alias = stream_def.pop(MAPPER_ALIAS_OPTION)

mapper = CustomStreamMap(
stream_alias=stream_alias,
map_transform=stream_def,
map_config=self.map_config,
faker_config=self.faker_config,
raw_schema=schema,
key_properties=key_properties,
flattening_options=self.flattening_options,
)

if source_stream == stream_map_key:
if is_source_stream_primary:
# Zero-th mapper should be the same-keyed mapper.
# Override the default mapper with this custom map.
self.stream_maps[stream_name][0] = mapper
self.stream_maps[source_stream][0] = mapper
else:
# Additional mappers for aliasing and multi-projection:
self.stream_maps[stream_name].append(mapper)
self.stream_maps[source_stream].append(mapper)
94 changes: 93 additions & 1 deletion tests/core/test_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from singer_sdk.typing import (
ArrayType,
BooleanType,
CustomType,
IntegerType,
NumberType,
ObjectType,
Expand Down Expand Up @@ -51,12 +52,13 @@ def sample_catalog_dict() -> dict:
Property("name", StringType),
Property("owner_email", StringType),
Property("description", StringType),
Property("description", StringType),
Property("create_date", StringType),
).to_dict()
foobars_schema = PropertiesList(
Property("the", StringType),
Property("brown", StringType),
).to_dict()
singular_schema = PropertiesList(Property("foo", StringType)).to_dict()
nested_jellybean_schema = PropertiesList(
Property("id", IntegerType),
Property(
Expand All @@ -81,6 +83,11 @@ def sample_catalog_dict() -> dict:
"tap_stream_id": "foobars",
"schema": foobars_schema,
},
{
"stream": "singular",
"tap_stream_id": "singular",
"schema": singular_schema,
},
{
"stream": "nested_jellybean",
"tap_stream_id": "nested_jellybean",
Expand Down Expand Up @@ -128,6 +135,9 @@ def sample_stream():
{"the": "quick"},
{"brown": "fox"},
],
"singular": [
{"foo": "bar"},
],
"nested_jellybean": [
{
"id": 123,
Expand Down Expand Up @@ -240,6 +250,7 @@ def transformed_result(stream_map_config):
{"the": "quick"},
{"brown": "fox"},
],
"singular": [{"foo": "bar"}], # should be unchanged
"nested_jellybean": [
{
"id": 123,
Expand Down Expand Up @@ -273,6 +284,9 @@ def transformed_schemas():
Property("the", StringType),
Property("brown", StringType),
).to_dict(),
"singular": PropertiesList(
Property("foo", StringType),
).to_dict(),
"nested_jellybean": PropertiesList(
Property("id", IntegerType),
Property("custom_field_1", StringType),
Expand Down Expand Up @@ -310,6 +324,7 @@ def cloned_and_aliased_schemas():
Property("name", StringType),
Property("owner_email", StringType),
Property("description", StringType),
Property("create_date", StringType),
).to_dict()
return {
"repositories_aliased": properties,
Expand Down Expand Up @@ -356,6 +371,64 @@ def filtered_schemas():
return {"repositories": PropertiesList(Property("name", StringType)).to_dict()}


# Wildcard


@pytest.fixture
def wildcard_stream_maps():
return {
"*s": {
"db_name": "'database'",
},
}


@pytest.fixture
def wildcard_result(sample_stream):
return {
"repositories": [
{**record, "db_name": "database"}
for record in sample_stream["repositories"]
],
"foobars": [
{**record, "db_name": "database"} for record in sample_stream["foobars"]
],
"singular": sample_stream["singular"],
"nested_jellybean": sample_stream["nested_jellybean"],
}


@pytest.fixture
def wildcard_schemas():
return {
"repositories": PropertiesList(
Property("name", StringType),
Property("owner_email", StringType),
Property("description", StringType),
Property("create_date", StringType),
Property("db_name", StringType),
).to_dict(),
"foobars": PropertiesList(
Property("the", StringType),
Property("brown", StringType),
Property("db_name", StringType), # added
).to_dict(),
"singular": PropertiesList(Property("foo", StringType)).to_dict(), # unchanged
"nested_jellybean": PropertiesList( # unchanged
Property("id", IntegerType),
Property(
"custom_fields",
ArrayType(
ObjectType(
Property("id", IntegerType),
Property("value", CustomType({})),
),
),
),
).to_dict(),
}


def test_map_transforms(
sample_stream,
sample_catalog_obj,
Expand Down Expand Up @@ -433,6 +506,25 @@ def test_filter_transforms_w_error(
)


def test_wildcard_transforms(
sample_stream,
sample_catalog_obj,
wildcard_stream_maps,
stream_map_config,
wildcard_result,
wildcard_schemas,
):
_test_transform(
"wildcard",
stream_maps=wildcard_stream_maps,
stream_map_config=stream_map_config,
expected_result=wildcard_result,
expected_schemas=wildcard_schemas,
sample_stream=sample_stream,
sample_catalog_obj=sample_catalog_obj,
)


def _run_transform(
*,
stream_maps,
Expand Down

0 comments on commit 7eebef9

Please sign in to comment.