From 3077bff8e758575e8f60958e788ed08f88df661e Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Fri, 26 May 2023 13:25:55 +0300 Subject: [PATCH] fix: add a look back window for klaviyo events stream (#119) * fix: add a look back window for klaviyo events stream * feat: pass look back window from config of the source * Update airbyte-integrations/connectors/source-klaviyo/source_klaviyo/spec.json Co-authored-by: a-rampalli <110388330+a-rampalli@users.noreply.github.com> * feat: avoid error if no events_look_back_window is given * chore: set lookback window to a constant of 30 mins --------- Co-authored-by: a-rampalli <110388330+a-rampalli@users.noreply.github.com> --- .../source-klaviyo/source_klaviyo/streams.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index 7a97e535515a0..b1493157fd7c9 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -8,6 +8,7 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import pendulum +from datetime import timedelta import requests from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.http import HttpStream @@ -97,6 +98,15 @@ def cursor_field(self) -> Union[str, List[str]]: :return str: The name of the cursor field. """ + @property + def look_back_window_in_seconds(self) -> Optional[int]: + """ + How long in the past we can re fetch data to ensure we don't miss records + + :returns int: The window in seconds + """ + return None + def request_params(self, stream_state=None, **kwargs): """Add incremental filters""" stream_state = stream_state or {} @@ -104,6 +114,8 @@ def request_params(self, stream_state=None, **kwargs): if not params.get("since"): # skip state filter if already have one from pagination state_ts = int(stream_state.get(self.cursor_field, 0)) + if state_ts > 0 and self.look_back_window_in_seconds: + state_ts -= self.look_back_window_in_seconds params["since"] = max(state_ts, self._start_ts) params["sort"] = "asc" @@ -273,6 +285,10 @@ class Events(IncrementalKlaviyoStream): cursor_field = "timestamp" + @property + def look_back_window_in_seconds(self) -> Optional[int]: + return timedelta(minutes=30).seconds + def path(self, **kwargs) -> str: return "metrics/timeline"