Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update bookmarking for interrupted sync #166

Merged
merged 6 commits into from May 10, 2023

Conversation

RushiT0122
Copy link
Contributor

@RushiT0122 RushiT0122 commented May 5, 2023

Description of change

Consider we have following records to sync from the source,

(order_id | replication_key):

  • (1, 101),
  • (2, 102),
  • (3, 103),
  • (4, 104),
  • (5, 105),
  • (6, 106),
  • (7, 107),
  • (8, 108),
  • (7, 109), (9, 109), --> order_id#7 gets updated here
  • (10, 110)

Before sync starts,

  • Sync starts at: 110
  • Window size: 5
  • Bookmark=100
  • First window: 100-105

Scenario

  • Sync interrupted at: since_id=8 (replication_key=108)
  • Next sync starts at: 115
  • Record updated before next sync: order_id=7 (replication_key=109)

On completion of next sync,

  • since_id = 7
  • Windows: (100-105), (105-110), (110-115)
  • Bookmark = min(max_bookmark_value, stop_time) = 110

In this scenario, on resuming sync updated record id#7 with (replication_key=109) will not sync with current implementation.

Solution:

Add updated_at_max value along with since_id and on resuming interrupted sync set window between (bookmark, updated_at_max). 

  • Sync interrupted at: since_id=8 (replication_key=108)
  • Next sync starts at: 115
  • Record updated before next sync: order_id=7 (replication_key=109)

On completion of next sync,

  • since_id = 1  --> pop() the since_id=8 from the bookmark
  • Starting window: (105-108), (108-113), (113-115)
  • Bookmark = min(max_bookmark_value, stop_time) = 110

This solution will make sure on resuming the interrupted sync, record id#7 (replication_key=109) will be synced.

QA steps

  • automated tests passing
  • manual qa steps passing (list below)

Risks

Rollback steps

  • revert this branch

@RushiT0122 RushiT0122 force-pushed the TDL-22898-fix-intermediate-bookmark branch 2 times, most recently from 5df807f to 3d91ae9 Compare May 5, 2023 05:12
@RushiT0122 RushiT0122 requested a review from luandy64 May 5, 2023 07:05
@RushiT0122 RushiT0122 force-pushed the TDL-22898-fix-intermediate-bookmark branch from 3d91ae9 to 0156a80 Compare May 5, 2023 09:27
Copy link
Contributor

@luandy64 luandy64 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code changes look okay, but the tests could be improved I think

Comment on lines 247 to 248
updated_at_max = last_sync_interrupted_at or updated_at_min + \
datetime.timedelta(days=date_window_size)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style suggestion:
https://pep8.org/#break-before-or-after-binary-operator

Suggested change
updated_at_max = last_sync_interrupted_at or updated_at_min + \
datetime.timedelta(days=date_window_size)
updated_at_max = (last_sync_interrupted_at
or updated_at_min + datetime.timedelta(days=date_window_size))

The preview doesn't look great, but the line is 91 characters long, so it should be fine

@mock.patch("singer.utils.now", return_value=strptime_to_utc(NOW_TIME))
def test_since_id_and_updated_at_max_deleted(self, mock_now, mock_get_query_params, mock_update_bookmark, mock_get_bookmark, mock_call_api):
"""Verify after successful sync since_id and updated_at_max keys are deleted from the state"""
mock_call_api.return_value = [CUSTOMER_1, CUSTOMER_2]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this return value set here, but some of the other patches are set above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pattern was duplicated from the existing test for conistency, will refactor the code at both the places.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My opinion on consistency is only copy it when it's good. Otherwise, we should improve the code when we touch it

"since_id": 15,
"updated_at_max": "2021-04-26T00:00:00.000000Z"}}}

customers = list(CUSTOMER_OBJECT.get_objects())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we don't use customers after this. Can this just be

CUSTOMER_OBJECT.get_objects()

@mock.patch("singer.utils.now", return_value=strptime_to_utc(NOW_TIME))
def test_interrupted_sync(self, mock_now, mock_get_query_params, mock_update_bookmark, mock_get_bookmark, mock_call_api):
"""Verify if sync is interrrupted twice in a row then since_id and updated_at_max keys are not deleted from the state"""
mock_call_api.side_effect = Exception("Dummy exception...")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as before

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it.

Comment on lines 77 to 80
try:
customers = list(CUSTOMER_OBJECT.get_objects())
except:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like we should catch our dummy exception because in the current implementation, if an exception is thrown or not, we handle it the same.

How do we distinguish a syntax error in the test from the dummy exception, for example?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added dummy exception to distinguish dummy exception from code exception and used it as a side effect.

Comment on lines 67 to 68
def test_interrupted_sync(self, mock_now, mock_get_query_params, mock_update_bookmark, mock_get_bookmark, mock_call_api):
"""Verify if sync is interrrupted twice in a row then since_id and updated_at_max keys are not deleted from the state"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test seems poorly implemented.

  1. We set the state
  2. We mock all of the code that would alter the state
  3. We assert the state is what we expect it to be

Shouldn't we start with an empty state, mock the API call so that state looks like

{"bookmarks": {
            "currently_sync_stream": 'customers',
            "customers": {
                "updated_at": "2021-03-27T00:00:00.000000Z",
                "since_id": 15,
                "updated_at_max": "2021-04-26T00:00:00.000000Z"}}}

then force the tap to throw an exception, and then assert the state has a since_id and an updated_at_max?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the code implementation, we are poping since_id and updated_at_max. This test is verifying that we are not poping these intermediate bookmarks too early before we successfully sync till updated_at_max in the bookmark. Early deletion of intermitent bookmarks will cause the undesired results in case sync is interrupted.

@RushiT0122 RushiT0122 force-pushed the TDL-22898-fix-intermediate-bookmark branch from cda0f9a to 7fa474d Compare May 8, 2023 07:52
Copy link
Contributor

@luandy64 luandy64 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one style change. It's up to you if you want to include it

tap_shopify/streams/base.py Outdated Show resolved Hide resolved
RushiT0122 and others added 2 commits May 10, 2023 11:50
Co-authored-by: Andy Lu <andy@stitchdata.com>
@RushiT0122 RushiT0122 merged commit 6939afb into master May 10, 2023
1 check passed
RushiT0122 added a commit that referenced this pull request May 22, 2023
* update bookmarking for interrupted sync
* add unit tests
* minor fix
* fix reviews

* fix style suggestion
Co-authored-by: Andy Lu <andy@stitchdata.com>

* bump version 1.7.3

---------

Co-authored-by: RushiT0122 <rtodkar@stitchdata-talend.com>
Co-authored-by: Andy Lu <andy@stitchdata.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants