Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Sink schema comparison before adding metadata columns #1778

Merged
merged 6 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import abc
import copy
import datetime
import json
import time
Expand Down Expand Up @@ -67,6 +68,7 @@ def __init__(
"Initializing target sink for stream '%s'...",
stream_name,
)
self.original_schema = copy.deepcopy(schema)
self.schema = schema
if self.include_sdc_metadata_properties:
self._add_sdc_metadata_to_schema()
Expand Down
2 changes: 1 addition & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def get_sink(
return self.add_sink(stream_name, schema, key_properties)

if (
existing_sink.schema != schema
existing_sink.original_schema != schema
or existing_sink.key_properties != key_properties
):
self.logger.info(
Expand Down
52 changes: 52 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

import pytest

from singer_sdk import typing as th
from singer_sdk.sinks import BatchSink
from singer_sdk.target_base import Target

if t.TYPE_CHECKING:
from _pytest.config import Config

Expand Down Expand Up @@ -54,3 +58,51 @@ def outdir() -> t.Generator[str, None, None]:
def snapshot_dir() -> pathlib.Path:
"""Return the path to the snapshot directory."""
return pathlib.Path("tests/snapshots/")


class BatchSinkMock(BatchSink):
"""A mock Sink class."""

name = "batch-sink-mock"

def __init__(
self,
target: TargetMock,
stream_name: str,
schema: dict,
key_properties: list[str] | None,
):
"""Create the Mock batch-based sink."""
super().__init__(target, stream_name, schema, key_properties)
self.target = target

def process_record(self, record: dict, context: dict) -> None:
"""Tracks the count of processed records."""
self.target.num_records_processed += 1
super().process_record(record, context)

def process_batch(self, context: dict) -> None:
"""Write to mock trackers."""
self.target.records_written.extend(context["records"])
self.target.num_batches_processed += 1


class TargetMock(Target):
"""A mock Target class."""

name = "target-mock"
config_jsonschema = th.PropertiesList().to_dict()
default_sink_class = BatchSinkMock

def __init__(self, *args, **kwargs):
"""Create the Mock target sync."""
super().__init__(*args, **kwargs)
self.state_messages_written: list[dict] = []
self.records_written: list[dict] = []
self.num_records_processed: int = 0
self.num_batches_processed: int = 0

def _write_state_message(self, state: dict):
"""Emit the stream's latest state."""
super()._write_state_message(state)
self.state_messages_written.append(state)
30 changes: 30 additions & 0 deletions tests/core/test_target_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import annotations

import copy

from tests.conftest import BatchSinkMock, TargetMock


def test_get_sink():
input_schema_1 = {
"properties": {
"id": {
"type": ["string", "null"],
},
"col_ts": {
"format": "date-time",
"type": ["string", "null"],
},
},
}
input_schema_2 = copy.deepcopy(input_schema_1)
key_properties = []
target = TargetMock(config={"add_record_metadata": True})
sink = BatchSinkMock(target, "foo", input_schema_1, key_properties)
target._sinks_active["foo"] = sink
sink_returned = target.get_sink(
"foo",
schema=input_schema_2,
key_properties=key_properties,
)
assert sink_returned == sink
54 changes: 2 additions & 52 deletions tests/samples/test_target_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@
from samples.sample_mapper.mapper import StreamTransform
from samples.sample_tap_countries.countries_tap import SampleTapCountries
from samples.sample_target_csv.csv_target import SampleTargetCSV
from singer_sdk import typing as th
from singer_sdk.sinks import BatchSink
from singer_sdk.target_base import Target
from singer_sdk.testing import (
get_target_test_class,
sync_end_to_end,
tap_sync_test,
tap_to_target_sync_test,
target_sync_test,
)
from tests.conftest import TargetMock

TEST_OUTPUT_DIR = Path(f".output/test_{uuid.uuid4()}/")
SAMPLE_CONFIG = {"target_folder": f"{TEST_OUTPUT_DIR}/"}
Expand Down Expand Up @@ -55,54 +53,6 @@ def resource(self, test_output_dir):
}


class BatchSinkMock(BatchSink):
"""A mock Sink class."""

name = "batch-sink-mock"

def __init__(
self,
target: TargetMock,
stream_name: str,
schema: dict,
key_properties: list[str] | None,
):
"""Create the Mock batch-based sink."""
super().__init__(target, stream_name, schema, key_properties)
self.target = target

def process_record(self, record: dict, context: dict) -> None:
"""Tracks the count of processed records."""
self.target.num_records_processed += 1
super().process_record(record, context)

def process_batch(self, context: dict) -> None:
"""Write to mock trackers."""
self.target.records_written.extend(context["records"])
self.target.num_batches_processed += 1


class TargetMock(Target):
"""A mock Target class."""

name = "target-mock"
config_jsonschema = th.PropertiesList().to_dict()
default_sink_class = BatchSinkMock

def __init__(self):
"""Create the Mock target sync."""
super().__init__(config={})
self.state_messages_written: list[dict] = []
self.records_written: list[dict] = []
self.num_records_processed: int = 0
self.num_batches_processed: int = 0

def _write_state_message(self, state: dict):
"""Emit the stream's latest state."""
super()._write_state_message(state)
self.state_messages_written.append(state)


def test_countries_to_csv(csv_config: dict):
tap = SampleTapCountries(config=SAMPLE_TAP_CONFIG, state=None)
target = SampleTargetCSV(config=csv_config)
Expand Down Expand Up @@ -133,7 +83,7 @@ def test_target_batching():
countries_record_count = 257

with freeze_time(mocked_starttime):
target = TargetMock()
target = TargetMock(config={})
target.max_parallelism = 1 # Limit unit test to 1 process
assert target.num_records_processed == 0
assert len(target.records_written) == 0
Expand Down