Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update bookmarking for interrupted sync #166

Merged
merged 6 commits into from May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 1.7.3
* Update interrupted sync bookmark strategy [#166](https://github.com/singer-io/tap-shopify/pull/166)

## 1.7.2
* Add URLError (connection reset by peer) to retry logic [#165](https://github.com/singer-io/tap-shopify/pull/165)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -3,7 +3,7 @@

setup(
name="tap-shopify",
version="1.7.2",
version="1.7.3",
description="Singer.io tap for extracting Shopify data",
author="Stitch",
url="http://github.com/singer-io/tap-shopify",
Expand Down
21 changes: 18 additions & 3 deletions tap_shopify/streams/base.py
Expand Up @@ -183,6 +183,10 @@ def get_since_id(self):
self.name,
'since_id')

def get_updated_at_max(self):
updated_at_max = Context.state.get('bookmarks', {}).get(self.name, {}).get('updated_at_max')
return utils.strptime_with_tz(updated_at_max) if updated_at_max else None

def update_bookmark(self, bookmark_value, bookmark_key=None):
# NOTE: Bookmarking can never be updated to not get the most
# recent thing it saw the next time you run, because the querying
Expand Down Expand Up @@ -216,6 +220,7 @@ def get_query_params(self, since_id, status_key, updated_at_min, updated_at_max)
}

def get_objects(self):
last_sync_interrupted_at = self.get_updated_at_max()
updated_at_min = self.get_bookmark()
max_bookmark = updated_at_min

Expand All @@ -235,7 +240,14 @@ def get_objects(self):
# think it has something to do with how the API treats
# microseconds on its date windows. Maybe it's possible to
# drop data due to rounding errors or something like that?
updated_at_max = updated_at_min + datetime.timedelta(days=date_window_size)
# If last sync was interrupted, set updated_at_max to
# updated_at_max bookmarked in the interrupted sync.
# This will make sure that records with lower id than since_id
# which got updated later won't be missed
updated_at_max = (last_sync_interrupted_at
or updated_at_min + datetime.timedelta(days=date_window_size))
last_sync_interrupted_at = None

if updated_at_max > stop_time:
updated_at_max = stop_time
while True:
Expand Down Expand Up @@ -266,7 +278,9 @@ def get_objects(self):
# Save the updated_at_max as our bookmark as we've synced all rows up in our
# window and can move forward. Also remove the since_id because we want to
# restart at 1.
Context.state.get('bookmarks', {}).get(self.name, {}).pop('since_id', None)
stream_bookmarks = Context.state.get('bookmarks', {}).get(self.name, {})
stream_bookmarks.pop('since_id', None)
stream_bookmarks.pop('updated_at_max', None)
self.update_bookmark(utils.strftime(updated_at_max))
break

Expand All @@ -278,8 +292,9 @@ def get_objects(self):
objects[-1].id, max([o.id for o in objects])))
since_id = objects[-1].id

# Put since_id into the state.
# Put since_id and updated_at_max into the state.
self.update_bookmark(since_id, bookmark_key='since_id')
self.update_bookmark(utils.strftime(updated_at_max), bookmark_key='updated_at_max')

updated_at_min = updated_at_max
bookmark = max(min(stop_time,
Expand Down
56 changes: 50 additions & 6 deletions tests/unittests/test_updated_bookmark.py
Expand Up @@ -19,21 +19,65 @@ def to_dict(self):
return {"id": self.id, "updated_at": self.updated_at}


CUSTOMER_1 = Customer(2, "2021-08-11T01:57:05-04:00")
CUSTOMER_2 = Customer(3, "2021-08-12T01:57:05-04:00")
class DummyShopifyError(Exception):
def __init__(self, error, msg=''):
super().__init__('{}\n{}'.format(error.__class__.__name__, msg))


CUSTOMER_1 = Customer(20, "2021-08-11T01:57:05-04:00")
CUSTOMER_2 = Customer(30, "2021-08-12T01:57:05-04:00")

NOW_TIME = '2021-08-16T01:56:05-04:00'
class TestUpdateBookmark(unittest.TestCase):

@mock.patch("tap_shopify.streams.base.Stream.call_api")
@mock.patch("tap_shopify.streams.base.Stream.call_api", return_value = [CUSTOMER_1, CUSTOMER_2])
@mock.patch("tap_shopify.streams.base.Stream.get_bookmark", return_value=strptime_to_utc('2021-08-11T01:55:05-04:00'))
@mock.patch("tap_shopify.streams.base.Stream.update_bookmark")
@mock.patch("tap_shopify.streams.base.Stream.get_query_params")
@mock.patch("singer.utils.now", return_value=strptime_to_utc(NOW_TIME))
def test_update_bookmark(self, mock_now, mock_get_query_params, mock_update_bookmark, mock_get_bookmark, mock_call_api):
"""Verify that the update_bookmark() is called with correct argument"""
mock_call_api.return_value = [CUSTOMER_1, CUSTOMER_2]

customers = list(CUSTOMER_OBJECT.get_objects())
list(CUSTOMER_OBJECT.get_objects())
# Verify that the min-max evaluation returns correct results and provided in the update_bookmark()
mock_update_bookmark.assert_called_with(strftime(strptime_to_utc(NOW_TIME) - datetime.timedelta(DATE_WINDOW_SIZE)))

@mock.patch("tap_shopify.streams.base.Stream.call_api", return_value=[CUSTOMER_1, CUSTOMER_2])
@mock.patch("singer.utils.now", return_value=strptime_to_utc(NOW_TIME))
def test_since_id_and_updated_at_max_deleted(self, mock_now, mock_call_api):
"""Verify after successful sync since_id and updated_at_max keys are deleted from the state"""
Context.state = {"bookmarks": {
"currently_sync_stream": 'customers',
"customers": {
"updated_at": "2021-03-27T00:00:00.000000Z",
"since_id": 15,
"updated_at_max": "2021-04-26T00:00:00.000000Z"}}}

list(CUSTOMER_OBJECT.get_objects())

# Verify keys
self.assertIn("updated_at", Context.state["bookmarks"]["customers"])
self.assertNotIn("since_id", Context.state["bookmarks"]["customers"])
self.assertNotIn("updated_at_max", Context.state["bookmarks"]["customers"])

@mock.patch("tap_shopify.streams.base.Stream.call_api", side_effect=DummyShopifyError("Dummy Shopify exception..."))
@mock.patch("singer.utils.now", return_value=strptime_to_utc(NOW_TIME))
def test_interrupted_sync(self, mock_now, mock_call_api):
"""Verify if sync is interrrupted twice in a row then since_id and updated_at_max keys are not deleted from the state"""
Context.state = {"bookmarks": {
"currently_sync_stream": 'customers',
"customers": {
"updated_at": "2021-03-27T00:00:00.000000Z",
"since_id": 15,
"updated_at_max": "2021-04-26T00:00:00.000000Z"}}}

with self.assertRaises(DummyShopifyError):
list(CUSTOMER_OBJECT.get_objects())

# Verify keys exist
self.assertIn("since_id", Context.state["bookmarks"]["customers"])
self.assertIn("updated_at_max", Context.state["bookmarks"]["customers"])

# Verify bookmark key values are as expected
self.assertEqual(Context.state["bookmarks"]["customers"]['since_id'], 15)
self.assertEqual(Context.state["bookmarks"]["customers"]['updated_at_max'], "2021-04-26T00:00:00.000000Z")