-
Notifications
You must be signed in to change notification settings - Fork 521
fix: consumer offset end #4500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: consumer offset end #4500
Conversation
a1f17e9 to
453bda7
Compare
| debug!(consumer_id, consumer_offset, "consumer offset"); | ||
| partition_response.start_offset = consumer_offset + 1; | ||
| partition_response.consumer_offset = Some(consumer_offset + 1); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should have integration test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some tests at crates/fluvio-test/src/tests/consumer_offsets/ that should cover it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or do you mean another kind of integration test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's E2E tests, good job on there.
The integration test should go here: https://github.com/infinyon/fluvio/tree/master/crates/fluvio-spu/src/services/public/tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added integration test
453bda7 to
3f426be
Compare
3f426be to
6b5366b
Compare
|
|
||
| /// Last committed consumer offset | ||
| #[fluvio(min_version = 24)] | ||
| pub consumer_offset: Option<i64>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. There is already FetchConsumerOffsetsRequest ? Why is needed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only requested when the stream is created, we can also change to request FetchConsumerOffsetsRequest there, but it will need 2 requests.
As we were handling consumer offset directly at start_offset of FetchOffsetPartitionResponse, I kept with it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use FetchConsumerOffsetsRequest. otherwise we have to maintain at two different places. Making 2 requests are fine as this is only at beginning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems that FetchConsumerOffsetsRequest return all consumers and doesn't have parameters for filtering.
Do you still think that is better use it? If yes, should we add a parameter to filter the replica/consumer_id that we want in the request or in memory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We solve it in the client, so I think it makes sense to return the consumer offset state separately from the start/end of the topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Problem is that semantics is unclear. Maybe then logic should be into the SPU consumer processing not at client.
I would expect that when I get offset, it is valid. SPU should make sure it's always valid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense, I can change back to just changing the start_offset, but sending in the fetch request the offset (begin, end, ...), and resolve it on SPU.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It already has following info:
/// First readable offset.
pub start_offset: i64,
/// Last readable offset
pub last_stable_offset: i64,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed to send two requests on stream creation.
One for topic offset and another one for the consumer_offset.
We're not handling it via forwarding now.
6b5366b to
0c5cc26
Compare
d5f240b to
eae8f7a
Compare
eae8f7a to
2f40a6d
Compare
8772bf3 to
d9efb09
Compare
f772120 to
e9e5b74
Compare
sehz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Good job
e9e5b74 to
0466704
Compare
369b130

Should solve #4325