Skip to content

Commit

Permalink
do not use stream state for generating request params!
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d committed Jul 17, 2023
1 parent 2835add commit 443781e
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ acceptance_tests:
export: ["time"]
funnels: ["41833532", "date"]
revenue: ["date"]
cohort_members": ["last_seen"]
engage: ["last_seen"]
cohort_members: ["last_seen"]
timeout_seconds: 9000
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
"type": "STREAM",
"stream": {
"stream_state": { "36152117": { "date": "2030-01-01" } },
"stream_state": { "41833532": { "date": "2030-01-01" } },
"stream_descriptor": { "name": "funnels" }
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,25 +153,25 @@ def stream_slices(
date_slices: list = []

# use the latest date between self.start_date and stream_state
start_date = self.start_date
start_date = pendulum.datetime(self.start_date.year, self.start_date.month, self.start_date.day, tz=self.project_timezone)
# end_date cannot be later than today
end_date = pendulum.datetime(self.end_date.year, self.end_date.month, self.end_date.day, tz=self.project_timezone)
end_date = min(end_date, pendulum.today(tz=self.project_timezone))
if stream_state and self.cursor_field and self.cursor_field in stream_state:
# Remove the time part from the state because the API accepts the 'from_date' param in date format only ('YYYY-MM-DD')
# It also means that sync returns duplicated entries for the date from the state (date range is inclusive)
stream_state_date = pendulum.parse(stream_state[self.cursor_field]).date()
start_date = max(start_date, stream_state_date)
stream_state_date = pendulum.parse(stream_state[self.cursor_field])
start_date = max(start_date, stream_state_date).in_tz(self.project_timezone)

# move start_date back <attribution_window> days to sync data since that time as well
start_date = start_date - timedelta(days=self.attribution_window)

# end_date cannot be later than today
end_date = min(self.end_date, pendulum.today(tz=self.project_timezone).date())

while start_date <= end_date:
current_end_date = start_date + timedelta(days=self.date_window_size - 1) # -1 is needed because dates are inclusive
date_slices.append(
{
"start_date": str(start_date),
"end_date": str(min(current_end_date, end_date)),
"start_date": start_date,
"end_date": min(current_end_date, end_date),
}
)
# add 1 additional day because date range is inclusive
Expand All @@ -185,8 +185,8 @@ def request_params(
params = super().request_params(stream_state, stream_slice, next_page_token)
return {
**params,
"from_date": stream_slice["start_date"],
"to_date": stream_slice["end_date"],
"from_date": str(stream_slice["start_date"].date()),
"to_date": str(stream_slice["end_date"].date()),
}


Expand All @@ -196,6 +196,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
if updated_state:
state_value = current_stream_state.get(self.cursor_field)
if state_value:
updated_state = max(updated_state, state_value)
current_stream_state[self.cursor_field] = updated_state
updated_state = max(pendulum.parse(updated_state), pendulum.parse(state_value))
current_stream_state[self.cursor_field] = str(updated_state.in_tz(self.project_timezone))
return current_stream_state
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,9 @@ def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
mapping = super().request_params(stream_state, stream_slice, next_page_token)
if stream_state and "date" in stream_state:
timestamp = int(pendulum.parse(stream_state["date"]).timestamp())
mapping["where"] = f'properties["$time"]>=datetime({timestamp})'
after_timestamp = int((stream_slice["start_date"]).timestamp())
before_timestamp = int((stream_slice["end_date"]).timestamp())
mapping["where"] = f'properties["$time"]>=datetime({after_timestamp}) and properties["$time"]<datetime({before_timestamp})'
return mapping

def request_kwargs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def wrapper(self, config):
if bool(os.environ.get("PATCH_FUNNEL_SLICES", "")):
Funnels.funnel_slices = funnel_slices_patched
return func(self, config)

return wrapper


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""
from unittest.mock import MagicMock

import pendulum
import pytest
from airbyte_cdk.models import SyncMode
from source_mixpanel.streams import Export
Expand Down Expand Up @@ -39,7 +40,7 @@ def test_export_stream_conflict_names(requests_mock, export_response, config):
stream.reqs_per_hour_limit = 0
requests_mock.register_uri("GET", get_url_to_mock(stream), export_response)

stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"}
stream_slice = {"start_date": pendulum.parse("2017-01-25T00:00:00Z"), "end_date": pendulum.parse("2017-02-25T00:00:00Z")}
# read records for single slice
records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice)
records = [record for record in records]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_updated_state(patch_incremental_base_class, config):
current_stream_state={"date": "2021-01-25T00:00:00Z"}, latest_record={"date": "2021-02-25T00:00:00Z"}
)

assert updated_state == {"date": "2021-02-25T00:00:00Z"}
assert updated_state == {"date": "2021-02-25T00:00:00+00:00"}


@pytest.fixture
Expand Down Expand Up @@ -165,7 +165,7 @@ def test_engage_stream_incremental(requests_mock, engage_response, config):
records = list(read_incremental(stream, stream_state, cursor_field=["created"]))

assert len(records) == 1
assert stream.get_updated_state(current_stream_state=stream_state, latest_record=records[-1]) == {"created": "2008-12-12T11:20:47"}
assert stream.get_updated_state(current_stream_state=stream_state, latest_record=records[-1]) == {"created": "2008-12-12T11:20:47+00:00"}


def test_cohort_members_stream_incremental(requests_mock, engage_response, cohorts_response, config):
Expand All @@ -181,7 +181,7 @@ def test_cohort_members_stream_incremental(requests_mock, engage_response, cohor

records = [item for item in records]
assert len(records) == 1
assert stream.get_updated_state(current_stream_state=stream_state, latest_record=records[-1]) == {"created": "2008-12-12T11:20:47"}
assert stream.get_updated_state(current_stream_state=stream_state, latest_record=records[-1]) == {"created": "2008-12-12T11:20:47+00:00"}


@pytest.fixture
Expand Down Expand Up @@ -334,7 +334,7 @@ def test_annotations_stream(requests_mock, annotations_response, config):
stream = Annotations(authenticator=MagicMock(), **config)
requests_mock.register_uri("GET", get_url_to_mock(stream), annotations_response)

stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"}
stream_slice = {"start_date": pendulum.parse("2017-01-25T00:00:00Z"), "end_date": pendulum.parse("2017-02-25T00:00:00Z")}
# read records for single slice
records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)

Expand Down Expand Up @@ -364,7 +364,7 @@ def test_revenue_stream(requests_mock, revenue_response, config):
stream = Revenue(authenticator=MagicMock(), **config)
requests_mock.register_uri("GET", get_url_to_mock(stream), revenue_response)

stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"}
stream_slice = {"start_date": pendulum.parse("2017-01-25T00:00:00Z"), "end_date": pendulum.parse("2017-02-25T00:00:00Z")}
# read records for single slice
records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice)

Expand Down Expand Up @@ -428,7 +428,7 @@ def test_export_stream(requests_mock, export_response, config):

stream = Export(authenticator=MagicMock(), **config)
requests_mock.register_uri("GET", get_url_to_mock(stream), export_response)
stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"}
stream_slice = {"start_date": pendulum.parse("2017-01-25T00:00:00Z"), "end_date": pendulum.parse("2017-02-25T00:00:00Z")}
# read records for single slice
records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice)

Expand All @@ -438,19 +438,16 @@ def test_export_stream(requests_mock, export_response, config):

def test_export_stream_request_params(config):
stream = Export(authenticator=MagicMock(), **config)
stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"}
stream_slice = {"start_date": pendulum.parse("2017-01-25T00:00:00Z"), "end_date": pendulum.parse("2023-02-25T00:00:00Z")}
stream_state = {"date": "2021-06-16T17:00:00"}

request_params = stream.request_params(stream_state=None, stream_slice=stream_slice)
assert "where" not in request_params

request_params = stream.request_params(stream_state={}, stream_slice=stream_slice)
assert "where" not in request_params

request_params = stream.request_params(stream_state=stream_state, stream_slice=stream_slice)
assert "where" in request_params
timestamp = int(pendulum.parse("2021-06-16T17:00:00Z").timestamp())
assert request_params.get("where") == f'properties["$time"]>=datetime({timestamp})'
# stream state should be ignored in request params to not trigger race conditions!
# stream state only affects stream slices.
start_timestamp = int(pendulum.parse("2017-01-25T00:00:00Z").timestamp())
end_timestamp = int(pendulum.parse("2023-02-25T00:00:00Z").timestamp())
assert request_params.get("where") == f'properties["$time"]>=datetime({start_timestamp}) and properties["$time"]<datetime({end_timestamp})'


def test_export_terminated_early(requests_mock, config):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from datetime import date, timedelta
from datetime import timedelta

import pendulum
from airbyte_cdk.sources.streams.http.auth import NoAuth
Expand Down Expand Up @@ -63,55 +63,66 @@ def test_date_slices():
# Test with start_date end_date range
stream_slices = Annotations(
authenticator=NoAuth(),
start_date=date.fromisoformat("2021-07-01"),
end_date=date.fromisoformat("2021-07-01"),
start_date=pendulum.datetime(2021, 7, 1, tz="US/Pacific"),
end_date=pendulum.datetime(2021, 7, 1, tz="US/Pacific"),
date_window_size=1,
region="US",
project_timezone="US/Pacific",
).stream_slices(sync_mode="any")
assert [{"start_date": "2021-07-01", "end_date": "2021-07-01"}] == stream_slices
assert [
{"start_date": pendulum.datetime(2021, 7, 1, tz="US/Pacific"), "end_date": pendulum.datetime(2021, 7, 1, tz="US/Pacific")}
] == stream_slices

stream_slices = Annotations(
authenticator=NoAuth(),
start_date=date.fromisoformat("2021-07-01"),
end_date=date.fromisoformat("2021-07-02"),
start_date=pendulum.datetime(2021, 7, 1, tz="US/Pacific"),
end_date=pendulum.datetime(2021, 7, 2, tz="US/Pacific"),
date_window_size=1,
region="EU",
project_timezone="US/Pacific",
).stream_slices(sync_mode="any")
assert [{"start_date": "2021-07-01", "end_date": "2021-07-01"}, {"start_date": "2021-07-02", "end_date": "2021-07-02"}] == stream_slices
assert [
{"start_date": pendulum.datetime(2021, 7, 1, tz="US/Pacific"), "end_date": pendulum.datetime(2021, 7, 1, tz="US/Pacific")},
{"start_date": pendulum.datetime(2021, 7, 2, tz="US/Pacific"), "end_date": pendulum.datetime(2021, 7, 2, tz="US/Pacific")}
] == stream_slices

stream_slices = Annotations(
authenticator=NoAuth(),
start_date=date.fromisoformat("2021-07-01"),
end_date=date.fromisoformat("2021-07-03"),
start_date=pendulum.datetime(2021, 7, 1, tz="US/Pacific"),
end_date=pendulum.datetime(2021, 7, 3, tz="US/Pacific"),
date_window_size=1,
region="US",
project_timezone="US/Pacific",
).stream_slices(sync_mode="any")
assert [
{"start_date": "2021-07-01", "end_date": "2021-07-01"},
{"start_date": "2021-07-02", "end_date": "2021-07-02"},
{"start_date": "2021-07-03", "end_date": "2021-07-03"},
{"start_date": pendulum.datetime(2021, 7, 1, tz="US/Pacific"), "end_date": pendulum.datetime(2021, 7, 1, tz="US/Pacific")},
{"start_date": pendulum.datetime(2021, 7, 2, tz="US/Pacific"), "end_date": pendulum.datetime(2021, 7, 2, tz="US/Pacific")},
{"start_date": pendulum.datetime(2021, 7, 3, tz="US/Pacific"), "end_date": pendulum.datetime(2021, 7, 3, tz="US/Pacific")},
] == stream_slices

stream_slices = Annotations(
authenticator=NoAuth(),
start_date=date.fromisoformat("2021-07-01"),
end_date=date.fromisoformat("2021-07-03"),
start_date=pendulum.datetime(2021, 7, 1, tz="US/Pacific"),
end_date=pendulum.datetime(2021, 7, 3, tz="US/Pacific"),
date_window_size=2,
region="US",
project_timezone="US/Pacific",
).stream_slices(sync_mode="any")
assert [{"start_date": "2021-07-01", "end_date": "2021-07-02"}, {"start_date": "2021-07-03", "end_date": "2021-07-03"}] == stream_slices
assert [
{"start_date": pendulum.datetime(2021, 7, 1, tz="US/Pacific"), "end_date": pendulum.datetime(2021, 7, 2, tz="US/Pacific")},
{"start_date": pendulum.datetime(2021, 7, 3, tz="US/Pacific"), "end_date": pendulum.datetime(2021, 7, 3, tz="US/Pacific")}
] == stream_slices

# test with stream_state
stream_slices = Export(
authenticator=NoAuth(),
start_date=date.fromisoformat("2021-07-01"),
end_date=date.fromisoformat("2021-07-03"),
start_date=pendulum.datetime(2021, 7, 1, tz="US/Pacific"),
end_date=pendulum.datetime(2021, 7, 3, tz="US/Pacific"),
date_window_size=1,
region="US",
project_timezone="US/Pacific",
).stream_slices(sync_mode="any", stream_state={"time": "2021-07-02T00:00:00Z"})
assert [{"start_date": "2021-07-02", "end_date": "2021-07-02"}, {"start_date": "2021-07-03", "end_date": "2021-07-03"}] == stream_slices
).stream_slices(sync_mode="any", stream_state={"time": "2021-07-02T00:00:00-07:00"})
assert [
{"start_date": pendulum.datetime(2021, 7, 2, tz="US/Pacific"), "end_date": pendulum.datetime(2021, 7, 2, tz="US/Pacific")},
{"start_date": pendulum.datetime(2021, 7, 3, tz="US/Pacific"), "end_date": pendulum.datetime(2021, 7, 3, tz="US/Pacific")}
] == stream_slices

0 comments on commit 443781e

Please sign in to comment.