feat(watermarks): track last seen message committable #137

Merged
merged 20 commits into from
Jun 19, 2025

Conversation

bmckerry
Member

@bmckerry bmckerry commented Jun 16, 2025

Based off of #133

This PR:

  • updates the Watermark step to track the committable of the last seen message through the step
  • updates the payload of WatermarkMessage to contain the last seen committable of the Watermark step at the time of its creation
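As a rough illustration of the two changes above, the following sketch tracks the last seen committable and snapshots it into a watermark. `Partition`, `WatermarkTracker`, `observe`, and `emit` are hypothetical names chosen for this example, and the `committable` field layout is an assumption; the actual types in the PR may differ.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the partition type used by the real step.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Partition {
    pub topic: String,
    pub index: u16,
}

/// Assumed payload shape: creation time plus the last seen committable.
#[derive(Debug, Clone, PartialEq)]
pub struct WatermarkMessage {
    pub timestamp: f64,
    pub committable: BTreeMap<Partition, u64>,
}

/// Hypothetical helper holding the state the Watermark step would keep.
#[derive(Debug, Default)]
pub struct WatermarkTracker {
    watermark_committable: BTreeMap<Partition, u64>,
}

impl WatermarkTracker {
    /// Record the committable of a message passing through the step.
    /// Later offsets for the same partition overwrite earlier ones.
    pub fn observe(&mut self, committable: impl IntoIterator<Item = (Partition, u64)>) {
        for (partition, offset) in committable {
            self.watermark_committable.insert(partition, offset);
        }
    }

    /// Build a watermark carrying a snapshot of everything seen so far.
    pub fn emit(&self, timestamp: f64) -> WatermarkMessage {
        WatermarkMessage {
            timestamp,
            committable: self.watermark_committable.clone(),
        }
    }
}
```

Cloning the map at emit time means a watermark keeps the offsets as they were when it was created, even if the step sees more messages afterwards.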

Watermark PRs:

  • pr1 (feat(rust_arroyo): add watermark step #133): introduce Watermark step and WatermarkMessage type
  • pr2 (this PR): watermark message body - tracking offsets for messages sent before watermark messages, etc
  • pr3: fix reduce/broadcast/router - each needs to have special behaviour when it sees a watermark msg
  • pr4: commit policy - building a custom commit policy that only sends a CommitRequest when a watermark msg is seen by the policy

@bmckerry bmckerry requested review from fpacifici and ayirr7 June 16, 2025 20:41
Collaborator

@fpacifici fpacifici left a comment

Please see comments inline

#[pyclass]
pub struct WatermarkMessage {
    pub timestamp: f64,
    pub expected_copies: usize,
Collaborator

I don't think the expected_copies is the right approach.
Think about this scenario:

  • one source
  • some processing
  • then a broadcast
  • then one of the branches has a broadcast.

All the watermarks going through one branch will have expected_copies = 2; those going through the other branch will have expected_copies = 3.

I think a better option is to let messages flow through as they are and make the final step aware of all the possible routes, so that it can identify when it has received a watermark through every route.
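To make the scenario concrete, here is a minimal sketch of how the number of routes could be derived from the pipeline shape when the chain is built. The `Pipeline` enum and `route_count` function are hypothetical and only model sinks and broadcasts; they are not part of the PR.

```rust
/// Hypothetical pipeline shape: a chain either ends in a sink
/// or fans out through a broadcast into several sub-pipelines.
pub enum Pipeline {
    Sink,
    Broadcast(Vec<Pipeline>),
}

/// Number of distinct routes from this point down to a sink.
pub fn route_count(pipeline: &Pipeline) -> usize {
    match pipeline {
        Pipeline::Sink => 1,
        Pipeline::Broadcast(branches) => branches.iter().map(route_count).sum(),
    }
}

/// The scenario from the comment: a broadcast where one branch
/// contains a second broadcast.
pub fn nested_broadcast_example() -> Pipeline {
    Pipeline::Broadcast(vec![
        Pipeline::Sink,
        Pipeline::Broadcast(vec![Pipeline::Sink, Pipeline::Sink]),
    ])
}
```

For the nested example, `route_count` yields 3, which is a single value known up front for the whole graph rather than a per-message field that differs between branches.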

Member Author

My thinking was that the commit step would store the max of expected_copies for each unique committable, but I agree if we can make the commit step aware of the # of branches when building the chain that would be better. I'll remove expected_copies.

Collaborator

if we can make the commit step aware of the # of branches

This is relatively easy.
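One possible shape for a branch-aware commit step is sketched below. `WatermarkCollector` and `record` are hypothetical names, and keying copies by the bit pattern of the watermark timestamp is an assumption made for this example; the commit policy planned for pr4 may identify watermarks differently.

```rust
use std::collections::HashMap;

/// Hypothetical collector that waits for one copy of a watermark per route.
pub struct WatermarkCollector {
    expected_routes: usize,
    // Copies seen so far, keyed by the bit pattern of the watermark timestamp
    // (an assumption: f64 is not hashable, so to_bits() is used as the key).
    seen: HashMap<u64, usize>,
}

impl WatermarkCollector {
    pub fn new(expected_routes: usize) -> Self {
        Self { expected_routes, seen: HashMap::new() }
    }

    /// Record one copy of a watermark. Returns true once the watermark has
    /// arrived through every route, which is when a commit could be requested.
    pub fn record(&mut self, timestamp: f64) -> bool {
        let key = timestamp.to_bits();
        let complete = {
            let count = self.seen.entry(key).or_insert(0);
            *count += 1;
            *count >= self.expected_routes
        };
        if complete {
            // Forget the watermark so the map does not grow without bound.
            self.seen.remove(&key);
        }
        complete
    }
}
```

With `expected_routes` set to 3, the first two copies of a watermark return `false` and the third returns `true`.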

@bmckerry bmckerry changed the base branch from 74/watermark-step to main June 17, 2025 18:02
Comment on lines -90 to +98
#[derive(Debug, Copy, Clone)]
///
/// TODO:
/// - reduce/broadcast/router steps need to handle WatermarkMessages instead of just forwarding them downstream immediately
/// - commit policy needs to be aware of the total # of broadcast branches so it knows how many copies of a given WatermarkMessage
/// to anticipate before it sends a CommitRequest
#[derive(Debug, Clone, PartialEq)]
Member Author

can't derive Copy for a struct using BTreeMap
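A small standalone illustration of the point: `BTreeMap` owns heap-allocated data and does not implement `Copy`, so a struct containing one can only derive `Clone`. `WatermarkPayload` and `duplicate` are hypothetical names for this sketch, and `String` keys are used only to keep it self-contained.

```rust
use std::collections::BTreeMap;

// Adding `Copy` to this derive list would be rejected by the compiler,
// because the `committable` field is a BTreeMap, which is not `Copy`.
#[derive(Debug, Clone, PartialEq)]
pub struct WatermarkPayload {
    pub timestamp: f64,
    pub committable: BTreeMap<String, u64>,
}

/// Duplicating the payload therefore needs an explicit clone.
pub fn duplicate(payload: &WatermarkPayload) -> WatermarkPayload {
    payload.clone()
}
```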

Comment on lines 78 to 80
for (partition, offset) in message.committable() {
    self.watermark_committable.insert(partition, offset);
}
Collaborator

It should never happen, but logging a warning if the new offset is <= the existing one would be a good piece of defensive programming.
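The suggested check could look roughly like the sketch below. `record_offset` is a hypothetical helper, partitions are reduced to a `u16` index for brevity, and the warning is returned as a string so the caller can pass it to whatever logging facility the project uses.

```rust
use std::collections::BTreeMap;

/// Insert the new offset and return a warning message if it does not
/// advance past the offset already stored for that partition.
pub fn record_offset(
    committable: &mut BTreeMap<u16, u64>,
    partition: u16,
    offset: u64,
) -> Option<String> {
    let warning = match committable.get(&partition) {
        Some(&existing) if offset <= existing => Some(format!(
            "partition {partition}: new offset {offset} is <= existing offset {existing}"
        )),
        _ => None,
    };
    // The insert stays unconditional, matching the loop under review;
    // the check only surfaces the anomaly.
    committable.insert(partition, offset);
    warning
}
```

Whether a non-advancing offset should also be rejected, rather than only reported, is a separate design decision that this sketch does not make.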

@bmckerry bmckerry merged commit 7e733c3 into main Jun 19, 2025
17 checks passed
2 participants