Skip to content

Commit

Permalink
SATs: allow new records in a sequential read for full refresh test (a…
Browse files Browse the repository at this point in the history
…irbytehq#17660)

* SATs: allow new records in a sequential read for full refresh test

* SATs: upd changelog

* SATs: change the output when failing full refresh test

* SATs: upd according to code review
  • Loading branch information
davydov-d authored and jhammarstedt committed Oct 31, 2022
1 parent b0b56e2 commit eb44846
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 64 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.8
Make full refresh tests tolerant to new records in a sequential read.[#17660](https://github.com/airbytehq/airbyte/pull/17660/)

## 0.2.7
Fix a bug when a state is evaluated once before used in a loop of `test_read_sequential_slices` [#17757](https://github.com/airbytehq/airbyte/pull/17757/)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,14 @@ def test_sequential_reads(
serializer = partial(make_hashable, exclude_fields=ignored_fields.get(stream))
stream_records_1 = records_by_stream_1.get(stream)
stream_records_2 = records_by_stream_2.get(stream)
# Using
output_diff = set(map(serializer, stream_records_1)).symmetric_difference(set(map(serializer, stream_records_2)))
if output_diff:
if not set(map(serializer, stream_records_1)).issubset(set(map(serializer, stream_records_2))):
missing_records = set(map(serializer, stream_records_1)) - (set(map(serializer, stream_records_2)))
msg = f"{stream}: the two sequential reads should produce either equal set of records or one of them is a strict subset of the other"
detailed_logger.info(msg)
detailed_logger.info("First read")
detailed_logger.log_json_list(stream_records_1)
detailed_logger.info("Second read")
detailed_logger.log_json_list(stream_records_2)
detailed_logger.info("Difference")
detailed_logger.log_json_list(output_diff)
detailed_logger.info("Missing records")
detailed_logger.log_json_list(missing_records)
pytest.fail(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from contextlib import nullcontext as does_not_raise
from typing import Dict, List
from unittest.mock import MagicMock

import pytest
from _pytest.outcomes import Failed
Expand All @@ -16,15 +16,51 @@ class ReadTestConfigWithIgnoreFields(ConnectionTestConfig):
ignored_fields: Dict[str, List[str]] = {"test_stream": ["ignore_me", "ignore_me_too"]}


test_cases = [
(
def record_message_from_record(records: List[Dict]) -> List[AirbyteMessage]:
return [
AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=111),
)
for record in records
]


def get_default_catalog(schema, **kwargs):
configured_catalog_kwargs = {"sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}
primary_key = kwargs.get("primary_key")
if primary_key:
configured_catalog_kwargs["primary_key"] = primary_key
return ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(
name="test_stream",
json_schema=schema,
supported_sync_modes=["full_refresh"],
),
**configured_catalog_kwargs,
)
]
)


fail_context = pytest.raises(
Failed,
match="the two sequential reads should produce either equal set of records or one of them is a strict subset of the other",
)
no_fail_context = does_not_raise()


ignored_fields_test_cases = [
pytest.param(
{"type": "object", "properties": {"created": {"type": "string"}}},
{"created": "23"},
{"created": "23"},
False,
"no_ignored_fields_present",
no_fail_context,
id="no_ignored_fields_present",
),
(
pytest.param(
{
"type": "object",
"properties": {
Expand All @@ -34,10 +70,10 @@ class ReadTestConfigWithIgnoreFields(ConnectionTestConfig):
},
{"created": "23"},
{"created": "23", "ignore_me": "23"},
False,
"with_ignored_field",
no_fail_context,
id="with_ignored_field",
),
(
pytest.param(
{
"type": "object",
"required": ["created", "DONT_ignore_me"],
Expand All @@ -49,76 +85,131 @@ class ReadTestConfigWithIgnoreFields(ConnectionTestConfig):
},
{"created": "23"},
{"created": "23", "DONT_ignore_me": "23", "ignore_me": "hello"},
True,
"ignore_field_present_but_a_required_is_not",
fail_context,
id="ignore_field_present_but_a_required_is_not",
),
]


@pytest.mark.parametrize(
"schema, record, expected_record, should_fail, test_case_name",
test_cases,
ids=[test_case[-1] for test_case in test_cases],
"schema, record, expected_record, fail_context",
ignored_fields_test_cases,
)
def test_read_with_ignore_fields(schema, record, expected_record, should_fail, test_case_name):
def test_read_with_ignore_fields(mocker, schema, record, expected_record, fail_context):
catalog = get_default_catalog(schema)
input_config = ReadTestConfigWithIgnoreFields()
docker_runner_mock = MagicMock()

def record_message_from_record(record_: Dict) -> List[AirbyteMessage]:
return [
AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(stream="test_stream", data=record_, emitted_at=111),
)
]
docker_runner_mock = mocker.MagicMock()

sequence_of_docker_callread_results = [record, expected_record]

# Ignored fields should work both ways
for pair in (
for first, second in (
sequence_of_docker_callread_results,
list(reversed(sequence_of_docker_callread_results)),
):
docker_runner_mock.call_read.side_effect = [
record_message_from_record(pair[0]),
record_message_from_record(pair[1]),
]
docker_runner_mock.call_read.side_effect = [record_message_from_record([first]), record_message_from_record([second])]

t = _TestFullRefresh()
if should_fail:
with pytest.raises(
Failed,
match="the two sequential reads should produce either equal set of records or one of them is a strict subset of the other",
):
t.test_sequential_reads(
inputs=input_config,
connector_config=MagicMock(),
configured_catalog=catalog,
docker_runner=docker_runner_mock,
detailed_logger=MagicMock(),
)
else:
with fail_context:
t.test_sequential_reads(
inputs=input_config,
connector_config=MagicMock(),
connector_config=mocker.MagicMock(),
configured_catalog=catalog,
docker_runner=docker_runner_mock,
detailed_logger=MagicMock(),
detailed_logger=mocker.MagicMock(),
)


def get_default_catalog(schema):
return ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=AirbyteStream(
name="test_stream",
json_schema=schema,
supported_sync_modes=["full_refresh"],
),
sync_mode="full_refresh",
destination_sync_mode="overwrite",
)
]
)
recordset_comparison_test_cases = [
pytest.param(
[["id"]],
[{"id": 1, "first_name": "Thomas", "last_name": "Edison"}, {"id": 2, "first_name": "Nicola", "last_name": "Tesla"}],
[{"id": 1, "first_name": "Albert", "last_name": "Einstein"}, {"id": 2, "first_name": "Joseph", "last_name": "Lagrange"}],
no_fail_context,
id="pk_sets_equal_success",
),
pytest.param(
[["id"]],
[
{"id": 1, "first_name": "Thomas", "last_name": "Edison"},
],
[{"id": 1, "first_name": "Albert", "last_name": "Einstein"}, {"id": 2, "first_name": "Joseph", "last_name": "Lagrange"}],
no_fail_context,
id="pk_first_is_subset_success",
),
pytest.param(
[["id"]],
[{"id": 1, "first_name": "Thomas", "last_name": "Edison"}, {"id": 2, "first_name": "Nicola", "last_name": "Tesla"}],
[{"id": 1, "first_name": "Albert", "last_name": "Einstein"}],
fail_context,
id="pk_second_is_subset_fail",
),
pytest.param(
[["id"]],
[{"id": 1, "first_name": "Thomas", "last_name": "Edison"}, {"id": 2, "first_name": "Nicola", "last_name": "Tesla"}],
[{"id": 2, "first_name": "Thomas", "last_name": "Edison"}, {"id": 3, "first_name": "Nicola", "last_name": "Tesla"}],
fail_context,
id="pk_no_subsets_fail",
),
pytest.param(
None,
[{"id": 1, "first_name": "Thomas", "last_name": "Edison"}, {"id": 2, "first_name": "Nicola", "last_name": "Tesla"}],
[{"id": 1, "first_name": "Thomas", "last_name": "Edison"}, {"id": 2, "first_name": "Nicola", "last_name": "Tesla"}],
no_fail_context,
id="no_pk_sets_equal_success",
),
pytest.param(
None,
[
{"id": 1, "first_name": "Thomas", "last_name": "Edison"},
],
[{"id": 1, "first_name": "Thomas", "last_name": "Edison"}, {"id": 2, "first_name": "Nicola", "last_name": "Tesla"}],
no_fail_context,
id="no_pk_first_is_subset_success",
),
pytest.param(
None,
[{"id": 1, "first_name": "Thomas", "last_name": "Edison"}, {"id": 2, "first_name": "Nicola", "last_name": "Tesla"}],
[
{"id": 1, "first_name": "Thomas", "last_name": "Edison"},
],
fail_context,
id="no_pk_second_is_subset_fail",
),
pytest.param(
None,
[{"id": 1, "first_name": "Thomas", "last_name": "Edison"}, {"id": 2, "first_name": "Nicola", "last_name": "Tesla"}],
[{"id": 2, "first_name": "Nicola", "last_name": "Tesla"}, {"id": 3, "first_name": "Albert", "last_name": "Einstein"}],
fail_context,
id="no_pk_no_subsets_fail",
),
]


@pytest.mark.parametrize(
"primary_key, first_read_records, second_read_records, fail_context",
recordset_comparison_test_cases,
)
def test_recordset_comparison(mocker, primary_key, first_read_records, second_read_records, fail_context):
schema = {
"type": "object",
"properties": {"id": {"type": "integer"}, "first_name": {"type": "string"}, "last_name": {"type": "string"}},
}
catalog = get_default_catalog(schema, primary_key=primary_key)
input_config = ReadTestConfigWithIgnoreFields()
docker_runner_mock = mocker.MagicMock()

docker_runner_mock.call_read.side_effect = [
record_message_from_record(first_read_records),
record_message_from_record(second_read_records),
]

t = _TestFullRefresh()
with fail_context:
t.test_sequential_reads(
inputs=input_config,
connector_config=mocker.MagicMock(),
configured_catalog=catalog,
docker_runner=docker_runner_mock,
detailed_logger=mocker.MagicMock(),
)

0 comments on commit eb44846

Please sign in to comment.