fix: Finalize and write last state message with dedupe #1708

Merged
merged 17 commits into from
Jul 14, 2023

Contributor

@kgpayne kgpayne commented May 16, 2023

Closes #1704

@codecov

codecov bot commented May 16, 2023

Codecov Report

Merging #1708 (32b3d96) into main (7f33209) will increase coverage by 0.02%.
The diff coverage is 94.11%.

@@            Coverage Diff             @@
##             main    #1708      +/-   ##
==========================================
+ Coverage   86.53%   86.56%   +0.02%     
==========================================
  Files          59       59              
  Lines        4990     5000      +10     
  Branches      811      811              
==========================================
+ Hits         4318     4328      +10     
  Misses        476      476              
  Partials      196      196              
Impacted Files                    Coverage Δ
singer_sdk/tap_base.py            80.71% <ø> (-0.09%) ⬇️
singer_sdk/testing/suites.py      100.00% <ø> (ø)
singer_sdk/streams/core.py        83.96% <90.90%> (+0.19%) ⬆️
singer_sdk/testing/tap_tests.py   84.49% <100.00%> (+0.75%) ⬆️


@aaronsteers
Contributor

aaronsteers commented May 16, 2023

@kgpayne - I did some digging and found that Stream.finalize_state_progress_markers calls a black-box helper function, finalize_state_progress_markers(), which does not return a value indicating whether the state changed. As of now, nothing tracks whether changes are made:

def finalize_state_progress_markers(stream_or_partition_state: dict) -> dict | None:
    """Promote or wipe progress markers once sync is complete.

    This marks any non-resumable progress markers as finalized. If there are
    valid bookmarks present, they will be promoted to be resumable.
    """
    signpost_value = stream_or_partition_state.pop(SIGNPOST_MARKER, None)
    stream_or_partition_state.pop(STARTING_MARKER, None)
    if (
        is_state_non_resumable(stream_or_partition_state)
        and "replication_key" in stream_or_partition_state[PROGRESS_MARKERS]
    ):
        # Replication keys valid (only) after sync is complete
        progress_markers = stream_or_partition_state[PROGRESS_MARKERS]
        stream_or_partition_state["replication_key"] = progress_markers.pop(
            "replication_key",
        )
        new_rk_value = progress_markers.pop("replication_key_value")
        if signpost_value and _greater_than_signpost(signpost_value, new_rk_value):
            new_rk_value = signpost_value
        stream_or_partition_state["replication_key_value"] = new_rk_value
    # Wipe and return any markers that have not been promoted
    return reset_state_progress_markers(stream_or_partition_state)

Tracking changes here is probably more scope than a fix for the underlying bug needs, given that sending an extra STATE message (as I think this PR may sometimes do) is relatively harmless, and was the prior behavior as well. One option for a longer-term fix might be to migrate the helper function logic directly into the class, so that the method can update the internal Stream._is_state_flushed property whenever the state object is known to have changed. Alternatively, the helper function could return a boolean that is True when changes were applied, and the stream class could use that return value. Complicating matters (and expanding scope), though, is the fact that many write operations in the above helper function are not behind conditional checks.
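A minimal sketch of the "return a boolean" option, for illustration only. The function name finalize_and_report_change and the constant values are hypothetical stand-ins, and the logic is deliberately simplified (it omits the signpost comparison and the is_state_non_resumable check). Because many writes in the real helper are unconditional, this sketch detects changes by comparing the state against a snapshot taken before finalization rather than by tracking each write:

```python
import copy

# Hypothetical stand-ins for the SDK's marker constants; real values may differ.
PROGRESS_MARKERS = "progress_markers"
SIGNPOST_MARKER = "replication_key_signpost"
STARTING_MARKER = "starting_replication_value"


def finalize_and_report_change(state: dict) -> bool:
    """Finalize a simplified state dict in place; return True if it was modified."""
    # Snapshot first, so unconditional pops and writes need no individual tracking.
    before = copy.deepcopy(state)

    state.pop(SIGNPOST_MARKER, None)
    state.pop(STARTING_MARKER, None)

    # Promote the replication key out of the progress markers, if present.
    markers = state.pop(PROGRESS_MARKERS, None) or {}
    if "replication_key" in markers:
        state["replication_key"] = markers["replication_key"]
        state["replication_key_value"] = markers.get("replication_key_value")

    return state != before
```

The snapshot costs one deep copy per finalization, which fits the point below that this runs per finalization rather than per record.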

Regarding performance, at least this operation runs once per state finalization and not once per record write. Otherwise this would be higher-stakes code to update, performance-wise. 🤷

@kgpayne kgpayne changed the title from "fix: force write last state message" to "fix: finalise and write last message with dedupe" May 16, 2023
@kgpayne kgpayne changed the title from "fix: finalise and write last message with dedupe" to "fix: finalise and write last state message with dedupe" May 16, 2023
@kgpayne
Contributor Author

kgpayne commented May 16, 2023

@aaronsteers the root cause was indeed missing self._is_state_flushed = False assignments. We were not marking state as needing a flush after calling finalize_state_progress_markers, which meant the final calls in stream and tap to _write_state_message() were skipped by the if statement in that method.

For now I was able to 'fix' the issue @laurentS is seeing in tap-github by i) setting self._is_state_flushed = False after every call to the finalize_state_progress_markers helper function (as state may have changed) and ii) deduping in Stream._write_state_message() to eliminate duplicates in cases where state wasn't changed. It's not elegant, but I wanted to verify I understood the root cause 🙂
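The two steps above can be sketched roughly as follows. This is not the SDK's code: the class StateWriterSketch, its attributes, and the emitted list (standing in for STATE messages written to stdout) are all hypothetical, and finalize() is only a placeholder for the real helper.

```python
import copy


class StateWriterSketch:
    """Hypothetical stand-in for a stream's state-writing logic."""

    def __init__(self):
        self.state = {}
        self._is_state_flushed = True
        self._last_emitted_state = None
        self.emitted = []  # stands in for STATE messages written to stdout

    def finalize(self):
        # Placeholder for the finalize helper; state may or may not change here.
        self.state.pop("progress_markers", None)
        # (i) Always mark state as unflushed, since a change cannot be ruled out.
        self._is_state_flushed = False

    def write_state_message(self):
        if self._is_state_flushed:
            return
        # (ii) Dedupe: emit only when state differs from the last emitted copy.
        if self.state != self._last_emitted_state:
            snapshot = copy.deepcopy(self.state)
            self.emitted.append(snapshot)
            self._last_emitted_state = snapshot
        self._is_state_flushed = True
```

With this shape, calling finalize() and write_state_message() twice in a row on unchanged state produces a single emitted message, which is the duplicate the dedupe step is meant to suppress.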

Re: a refactor: we use the finalize_state_progress_markers helper function within the stream and the Stream.finalize_state_progress_markers from the Tap. Agree this could and probably should be refactored so that the Stream.finalize_state_progress_markers is the only one used, allowing us to toggle _is_state_flushed inline. Will keep hacking at this PR tomorrow.

FYI @edgarrmondragon - open to all feedback and suggestions 🙂

@aaronsteers
Contributor

@kgpayne - Yeah, nice. Agreed with all the above. 👍

@kgpayne kgpayne marked this pull request as ready for review May 17, 2023 10:30
@edgarrmondragon edgarrmondragon changed the title from "fix: finalise and write last state message with dedupe" to "fix: Finalize and write last state message with dedupe" May 17, 2023
@laurentS
Contributor

@kgpayne @edgarrmondragon @aaronsteers is it worth bringing some form of the test that fails in tap-github upstream in the sdk to avoid regressions like this in the future? I'm thinking some simplified variant of this one.

@edgarrmondragon
Collaborator

> @kgpayne @edgarrmondragon @aaronsteers is it worth bringing some form of the test that fails in tap-github upstream in the sdk to avoid regressions like this in the future? I'm thinking some simplified variant of this one.

@laurentS I agree, we should take the opportunity to guard against future regressions 👍
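As a rough idea of what such a regression check could look like (an assumption, not the actual tap-github test, whose contents are not shown in this thread): parse the tap's Singer output, take the last STATE message, and assert that no unfinalized progress_markers remain in it. The helper names last_state_value and has_progress_markers are hypothetical.

```python
import json


def last_state_value(output_lines):
    """Return the `value` of the last STATE message in Singer output, or None."""
    last = None
    for line in output_lines:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        if message.get("type") == "STATE":
            last = message.get("value")
    return last


def has_progress_markers(node):
    """Recursively check a state value for any leftover `progress_markers` key."""
    if isinstance(node, dict):
        if "progress_markers" in node:
            return True
        return any(has_progress_markers(child) for child in node.values())
    if isinstance(node, list):
        return any(has_progress_markers(child) for child in node)
    return False
```

A test built on these helpers would capture the tap's stdout for a full sync and assert that has_progress_markers(last_state_value(lines)) is False.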

@edgarrmondragon edgarrmondragon self-requested a review July 14, 2023 18:09
Collaborator

@edgarrmondragon edgarrmondragon left a comment


Ken's work is 👌 here. I only added an automatic test so I'm confident merging.

Successfully merging this pull request may close these issues.

bug: possible regression in state messages and progress_markers handling
4 participants