-
-
Notifications
You must be signed in to change notification settings - Fork 0
feat(watermarks): track last seen message committable #137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see comments inline
sentry_streams/src/messages.rs
Outdated
#[pyclass] | ||
pub struct WatermarkMessage { | ||
pub timestamp: f64, | ||
pub expected_copies: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the expected_copies
is the right approach.
Think about this scenario:
- one source
- some processing
- then a broadcast
- then one of the branches has a broadcast.
All the watermarks going through one branch will have a expected_copies = 2, those going through the other branch will have expected_copies=3
.
I think a better option is to let message s flow through as they are, and make the final step aware of all the possible routes thus able to identifies it received a watermark through all routes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking was that the commit step would store the max of expected_copies
for each unique committable, but I agree if we can make the commit step aware of the # of branches when building the chain that would be better. I'll remove expected_copies
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we can make the commit step aware of the # of branches
This is relatively easy.
#[derive(Debug, Copy, Clone)] | ||
/// | ||
/// TODO: | ||
/// - reduce/broadcast/router steps need to handle WatermarkMessages instead of just forwarding them downstream immediately | ||
/// - comit policy needs to be aware of the total # of broadcast branches so it knows how many copies of a given WatermarkMessage | ||
/// to anticipate before it sends a CommitRequest | ||
#[derive(Debug, Clone, PartialEq)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't derive Copy
for a struct using BTreeMap
sentry_streams/src/watermark.rs
Outdated
for (partition, offset) in message.committable() { | ||
self.watermark_committable.insert(partition, offset); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should never happen but warning if the new offset is <= of the existing one would be a good defensive programming idea.
Based off of #133
This PR:
Watermark
step to track the committable of the last seen message through the stepWatermarkMessage
to contain the last seen committable of the Watermark step at the time of its creationWatermark PRs:
Watermark
step andWatermarkMessage
typeCommitRequest
when a watermark msg is seen by the policy