Skip to content

Commit

Permalink
fix: add a look back window for klaviyo events stream (#119)
Browse files Browse the repository at this point in the history
* fix: add a look back window for klaviyo events stream

* feat: pass look back window from config of the source

* Update airbyte-integrations/connectors/source-klaviyo/source_klaviyo/spec.json

Co-authored-by: a-rampalli <110388330+a-rampalli@users.noreply.github.com>

* feat: avoid error if no events_look_back_window is given

* chore: set lookback window to a constant of  30 mins

---------

Co-authored-by: a-rampalli <110388330+a-rampalli@users.noreply.github.com>
  • Loading branch information
am6010 and a-rampalli committed May 26, 2023
1 parent 7df69b0 commit 3077bff
Showing 1 changed file with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import pendulum
from datetime import timedelta
import requests
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream
Expand Down Expand Up @@ -97,13 +98,24 @@ def cursor_field(self) -> Union[str, List[str]]:
:return str: The name of the cursor field.
"""

@property
def look_back_window_in_seconds(self) -> Optional[int]:
"""
How long in the past we can re fetch data to ensure we don't miss records
:returns int: The window in seconds
"""
return None

def request_params(self, stream_state=None, **kwargs):
"""Add incremental filters"""
stream_state = stream_state or {}
params = super().request_params(stream_state=stream_state, **kwargs)

if not params.get("since"): # skip state filter if already have one from pagination
state_ts = int(stream_state.get(self.cursor_field, 0))
if state_ts > 0 and self.look_back_window_in_seconds:
state_ts -= self.look_back_window_in_seconds
params["since"] = max(state_ts, self._start_ts)
params["sort"] = "asc"

Expand Down Expand Up @@ -273,6 +285,10 @@ class Events(IncrementalKlaviyoStream):

cursor_field = "timestamp"

@property
def look_back_window_in_seconds(self) -> Optional[int]:
return timedelta(minutes=30).seconds

def path(self, **kwargs) -> str:
return "metrics/timeline"

Expand Down

0 comments on commit 3077bff

Please sign in to comment.