Skip to content

Commit

Permalink
update bookmarking for interrupted sync
Browse files Browse the repository at this point in the history
  • Loading branch information
RushiT0122 committed May 5, 2023
1 parent 13dd806 commit 5df807f
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions tap_shopify/streams/base.py
Expand Up @@ -183,6 +183,11 @@ def get_since_id(self):
self.name,
'since_id')

def get_updated_at_max(self):
return utils.strptime_with_tz(singer.get_bookmark(Context.state,
self.name,
'updated_at_max'))

def update_bookmark(self, bookmark_value, bookmark_key=None):
# NOTE: Bookmarking can never be updated to not get the most
# recent thing it saw the next time you run, because the querying
Expand Down Expand Up @@ -216,6 +221,7 @@ def get_query_params(self, since_id, status_key, updated_at_min, updated_at_max)
}

def get_objects(self):
last_sync_interrupted_at = self.get_updated_at_max()
updated_at_min = self.get_bookmark()
max_bookmark = updated_at_min

Expand All @@ -235,7 +241,14 @@ def get_objects(self):
# think it has something to do with how the API treats
# microseconds on its date windows. Maybe it's possible to
# drop data due to rounding errors or something like that?
updated_at_max = updated_at_min + datetime.timedelta(days=date_window_size)
# If last sync was interrupted, set updated_at_max to
# updated_at_max bookmarked in the interrupted sync.
# This will make sure that records with lower id than since_id
# which got updated later won't be missed
updated_at_max = last_sync_interrupted_at or updated_at_min + \
datetime.timedelta(days=date_window_size)
last_sync_interrupted_at = None

if updated_at_max > stop_time:
updated_at_max = stop_time
while True:
Expand Down Expand Up @@ -267,6 +280,8 @@ def get_objects(self):
# window and can move forward. Also remove the since_id because we want to
# restart at 1.
Context.state.get('bookmarks', {}).get(self.name, {}).pop('since_id', None)
Context.state.get('bookmarks', {}).get(
self.name, {}).pop('updated_at_max', None)
self.update_bookmark(utils.strftime(updated_at_max))
break

Expand All @@ -278,8 +293,9 @@ def get_objects(self):
objects[-1].id, max([o.id for o in objects])))
since_id = objects[-1].id

# Put since_id into the state.
# Put since_id and updated_at_max into the state.
self.update_bookmark(since_id, bookmark_key='since_id')
self.update_bookmark(utils.strftime(updated_at_max), bookmark_key='updated_at_max')

updated_at_min = updated_at_max
bookmark = max(min(stop_time,
Expand Down

0 comments on commit 5df807f

Please sign in to comment.