-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-129: Implement Consumer Seek #173
Conversation
if (theListener instanceof MessageListener) { | ||
this.listener = (MessageListener<K, V>) theListener; | ||
if (this.theListener instanceof MessageListener) { | ||
this.listener = (MessageListener<K, V>) this.theListener; | ||
this.batchListener = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I see, this code is a part of constructor. In this case initializing object's field with null
is unnecessary - java does this. So now it's just extra code, which makes the method longer and less readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is incorrect - since the field is marked final
, the compiler requires it to be explicitly initialized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, missed final
.
Must be rebased to the latest |
@@ -896,6 +921,11 @@ private void commitIfNecessary() { | |||
} | |||
} | |||
|
|||
@Override | |||
public void seek(String topic, int partition, long offset) { | |||
this.seeks.add(new TopicPartitionInitialOffset(topic, partition, offset)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why we can't reach the same via just top-level container operation.
Moreover expose it to JMX.
What am I missing?
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it somehow needs to be directed to the right consumer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaMessageListenerContainer
has 1-1
relationship with the ListenerConsumer
. So, I believe, the question is closed here.
The ConcurrentMessageListenerContainer
builds a set of containers
based on the partitionSubset(containerProperties, i)
. Together with the Collection<TopicPartition> getAssignedPartitions()
, it seems for me we can reach a uniqueness and correctly assign requested seek
to the appropriate KafkaMessageListenerContainer
.
No ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My concern is that the topics/partitions can move when the broker does a rebalance of the assignment - I want to be sure that the seek is done in the context of the current state at the time the listener is called. That way, if the consumer no longer owns that partition the seek operation will fail.
The solution looks cool, but I'm curious what drove you away of the simple With good valuable argument I can merge it as is or let you go ahead with Docs 😄 |
} | ||
Assert.state(!this.isBatchListener || !this.isRecordAck, "Cannot use AckMode.RECORD with a batch listener"); | ||
if (this.autoCommit && this.theListener instanceof ConsumerSeekAware) { | ||
((ConsumerSeekAware) this.theListener).registerSeekCallback(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this one if we registerSeekCallback()
any way in the ListenerInvoker.run()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because with autoCommit = true
, the listener is invoked on the consumer thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I wasn't able to step in here in debug mode during testing. Maybe we just don't have any test on the matter?..
- Having your condition, "the listener is invoked on the consumer thread." and "listeners should store the callback in a
{@code ThreadLocal}
.", plus the test-case in theEnableKafkaIntegrationTests
with exactly aThreadLocal
, I dug further and found this code in theKafkaMessageListenerContainer.doStart()
:
this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
setRunning(true);
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
So, we create ListenerConsumer
instance in the KafkaMessageListenerContainer.doStart()
Thread, but run()
it in a different Thread.
My concern that this .registerSeekCallback(this)
for the autoCommit = true
is done in a wrong Thread. Therefore I won't be able to get a proper ThreadLocal
value, when I consume messages.
Am I missing anything?
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we don't have a test case; will add one - good catch - I had a similar problem with the other case - I called from the ctor instead of the run method in the ListenerInvoker
. See the second commit where I moved that code.
1413897
to
e6be94d
Compare
Hmmm - no travis check - weird. |
Can't that be a reason? |
* @param <T> the delegate type. | ||
* | ||
* @author Gary Russell | ||
* @since 5.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1.1
😄
Fine with me. Do you still have something else to do here, e.g. Docs? |
Will add a docs commit tomorrow. |
/** | ||
* Identifies the current position of a TopicPartition. | ||
* | ||
* @author Artem Bilan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is your class and @since 1.1
😄
Although I still try to understand if we need it...
Maybe simple Map<TopicPartition, Long>
could be enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe we can just rename TopicPartitionInitialOffset
to TopicPartitionOffset
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to a Map.
I think we need |
Rebase is needed as well. |
That's all for this round. |
Needs rebase |
- support seek from a `@KafkaListener` - pass the callback through the adapters - set the callback on the correct thread when not auto-commit
Also rebased.
I think I'm fine with the fix. Polling locally for the last review round and probably merge... |
|
||
/** | ||
* Register the callback to use when seeking at some arbitrary time. When used with a | ||
* {@code ConcurrentMessageListenerContainer} or the same listener instance in multupe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo: multiple
Merged as b18ef3e |
Resolves #129
Review only @mbogoevici @artembilan
TODO: docs, pass callback through adapters.