Skip to content

Commit

Permalink
Merge pull request #173 from rudderlabs/fix.ETL-239
Browse files Browse the repository at this point in the history
fix: set last state only for incremental syncs
  • Loading branch information
nidhilashkari17 committed Jan 8, 2024
2 parents 19cf201 + f10a7fd commit ff49b5b
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,6 @@
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "application_fees_refunds",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
"primary_key": [["id"]]
},
{
"stream": {
"name": "application_fees",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["created"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["created"],
"primary_key": [["id"]]
},
{
"stream": {
"name": "authorizations",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,6 @@
"cursor_field": ["created"],
"primary_key": [["id"]]
},
{
"stream": {
"name": "application_fees",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["created"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["created"],
"primary_key": [["id"]]
},
{
"stream": {
"name": "invoices",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from source_stripe.streams import (
Accounts,
ApplicationFees,
ApplicationFeesRefunds,
Authorizations,
BalanceTransactions,
BankAccounts,
Expand Down Expand Up @@ -79,8 +77,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
incremental_args = {**args, "lookback_window_days": config.get("lookback_window_days")}
return [
Accounts(**args),
ApplicationFees(**incremental_args),
ApplicationFeesRefunds(**args),
Authorizations(**incremental_args),
BalanceTransactions(**incremental_args),
BankAccounts(**args),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ class IncrementalStripeStreamWithUpdates(IncrementalStripeStream):
last_stream_slice = None
last_record = None
def isLastSlice(self,currentSlice):
return self.last_stream_slice == currentSlice
return self.last_stream_slice["created[lte]"] == currentSlice["created[lte]"]

def setLastSlice(self,stream_state):
if(self.last_stream_slice==None):
*_, last = self.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state)
Expand Down

0 comments on commit ff49b5b

Please sign in to comment.