From 6366e4a55fd03511217de73a97f0cb1f2da4a94a Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Fri, 27 Jan 2023 18:48:21 +0200 Subject: [PATCH] Source Amplitude: handle null values and empty strings in datetime fields (#21957) * #1347 source Amplitude: handle null values and empty strings in datetime fields * #1347 source Amplitude: upd changelog * #1347 source amplitude: fix tests * auto-bump connector version --------- Co-authored-by: Octavia Squidington III --- .../src/main/resources/seed/source_definitions.yaml | 2 +- .../init/src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-amplitude/Dockerfile | 2 +- .../integration_tests/acceptance.py | 13 +++++++++++-- .../source-amplitude/source_amplitude/api.py | 11 ++++++++--- .../source-amplitude/source_amplitude/source.py | 2 +- .../source-amplitude/unit_tests/test_api.py | 4 +++- docs/integrations/sources/amplitude.md | 9 +++++---- 8 files changed, 31 insertions(+), 14 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 867c293e56fefc..bc3f6c523c1934 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -81,7 +81,7 @@ - name: Amplitude sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396 dockerRepository: airbyte/source-amplitude - dockerImageTag: 0.1.19 + dockerImageTag: 0.1.20 documentationUrl: https://docs.airbyte.com/integrations/sources/amplitude icon: amplitude.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 4a7a7cb364afc4..7f8901940c8164 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -1157,7 +1157,7 @@ supportsNormalization: false supportsDBT: false 
supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-amplitude:0.1.19" +- dockerImage: "airbyte/source-amplitude:0.1.20" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/amplitude" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-amplitude/Dockerfile b/airbyte-integrations/connectors/source-amplitude/Dockerfile index d1a4466b530537..97c29296b0ecfa 100644 --- a/airbyte-integrations/connectors/source-amplitude/Dockerfile +++ b/airbyte-integrations/connectors/source-amplitude/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.19 +LABEL io.airbyte.version=0.1.20 LABEL io.airbyte.name=airbyte/source-amplitude diff --git a/airbyte-integrations/connectors/source-amplitude/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-amplitude/integration_tests/acceptance.py index 480378d7bf2037..1302b2f57e10ec 100644 --- a/airbyte-integrations/connectors/source-amplitude/integration_tests/acceptance.py +++ b/airbyte-integrations/connectors/source-amplitude/integration_tests/acceptance.py @@ -3,5 +3,14 @@ # -def test_dummy_test(): - assert True +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + # TODO: setup test dependencies if needed. 
otherwise remove the TODO comments + yield + # TODO: clean up test dependencies diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py index e7439b0ea0c232..edf1c72c925634 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py +++ b/airbyte-integrations/connectors/source-amplitude/source_amplitude/api.py @@ -88,14 +88,19 @@ def _get_date_time_items_from_schema(self): result.append(key) return result - def _date_time_to_rfc3339(self, record: Mapping[str, Any]) -> Mapping[str, Any]: + def _date_time_to_rfc3339(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: """ Transform 'date-time' items to RFC3339 format """ date_time_fields = self._get_date_time_items_from_schema() for item in record: if item in date_time_fields: - record[item] = pendulum.parse(record[item]).to_rfc3339_string() + dt_value = record[item] + if not dt_value: + # either null or empty string, leave it as it + record[item] = dt_value + else: + record[item] = pendulum.parse(dt_value).to_rfc3339_string() return record def _get_end_date(self, current_date: pendulum, end_date: pendulum = pendulum.now()): @@ -183,7 +188,7 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str, record[user_id_key] = record.pop("user_id") yield self._date_time_to_rfc3339(record) # transform all `date-time` to RFC3339 - def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]: + def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[MutableMapping]: with gzip.open(zip_file) as file: for record in file: yield json.loads(record) diff --git a/airbyte-integrations/connectors/source-amplitude/source_amplitude/source.py b/airbyte-integrations/connectors/source-amplitude/source_amplitude/source.py index e77f6a32107256..721d2f2ebd36d0 100644 --- a/airbyte-integrations/connectors/source-amplitude/source_amplitude/source.py +++ 
b/airbyte-integrations/connectors/source-amplitude/source_amplitude/source.py @@ -26,7 +26,7 @@ def _convert_auth_to_token(self, username: str, password: str) -> str: def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]: try: auth = TokenAuthenticator(token=self._convert_auth_to_token(config["api_key"], config["secret_key"]), auth_method="Basic") - list(Cohorts(authenticator=auth, data_region=config["data_region"]).read_records(SyncMode.full_refresh)) + list(Cohorts(authenticator=auth, data_region=config.get("data_region", "Standard Server")).read_records(SyncMode.full_refresh)) return True, None except Exception as error: return False, f"Unable to connect to Amplitude API with the provided credentials - {repr(error)}" diff --git a/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py b/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py index 1f781aa95a9439..0d9a2e35a79e99 100644 --- a/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py +++ b/airbyte-integrations/connectors/source-amplitude/unit_tests/test_api.py @@ -237,8 +237,10 @@ def test_get_date_time_items_from_schema(self): [ ({}, {}), ({"event_time": "2021-05-27 11:59:53.710000"}, {"event_time": "2021-05-27T11:59:53.710000+00:00"}), + ({"event_time": None}, {"event_time": None}), + ({"event_time": ""}, {"event_time": ""}), ], - ids=["empty_record", "transformed_record"], + ids=["empty_record", "transformed_record", "null_value", "empty_value"], ) def test_date_time_to_rfc3339(self, record, expected): stream = Events(pendulum.now().isoformat(), data_region="Standard Server") diff --git a/docs/integrations/sources/amplitude.md b/docs/integrations/sources/amplitude.md index 265226a6ed7434..a4e89b0821950a 100644 --- a/docs/integrations/sources/amplitude.md +++ b/docs/integrations/sources/amplitude.md @@ -43,10 +43,11 @@ The Amplitude connector ideally should gracefully handle Amplitude API limitatio | Version 
| Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------| -| 0.1.19 | 2022-12-09 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Remove `data_region` as required | -| 0.1.18 | 2022-12-08 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Add parameter to select region | -| 0.1.17 | 2022-10-31 | [18684](https://github.com/airbytehq/airbyte/pull/18684) | Add empty `series` validation for `AverageSessionLength` stream | -| 0.1.16 | 2022-10-11 | [17854](https://github.com/airbytehq/airbyte/pull/17854) | Add empty `series` validation for `ActtiveUsers` steam | +| 0.1.20 | 2023-01-27 | [21957](https://github.com/airbytehq/airbyte/pull/21957) | Handle null values and empty strings in date-time fields | +| 0.1.19 | 2022-12-09 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Remove `data_region` as required | +| 0.1.18 | 2022-12-08 | [19727](https://github.com/airbytehq/airbyte/pull/19727) | Add parameter to select region | +| 0.1.17 | 2022-10-31 | [18684](https://github.com/airbytehq/airbyte/pull/18684) | Add empty `series` validation for `AverageSessionLength` stream | +| 0.1.16 | 2022-10-11 | [17854](https://github.com/airbytehq/airbyte/pull/17854) | Add empty `series` validation for `ActtiveUsers` steam | | 0.1.15 | 2022-10-03 | [17320](https://github.com/airbytehq/airbyte/pull/17320) | Add validation `start_date` filed if it's in the future | | 0.1.14 | 2022-09-28 | [17326](https://github.com/airbytehq/airbyte/pull/17326) | Migrate to per-stream states. | | 0.1.13 | 2022-08-31 | [16185](https://github.com/airbytehq/airbyte/pull/16185) | Re-release on new `airbyte_cdk==0.1.81` |