-
Notifications
You must be signed in to change notification settings - Fork 180
GH-212: Add ConsumerSeekAware impl to Inbounds #213
Conversation
Fixes spring-attic#212 **Cherry-pick to 3.0.x**
Do not merge yet: the proposed solution is slightly useless. Stay tuned! |
Fixes spring-attic#212 * Introduce a new `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK` header to be populated to messages for sending to the channel * Populate that header from the `KafkaInboundGateway` and `KafkaMessageDrivenChannelAdapter` into the message from the `seekCallBack` property if `ListenerContainer` is single-threaded or from the `ThreadLocal<ConsumerSeekAware.ConsumerSeekCallback>` otherwise; and only if newly introduced `setAdditionalHeaders` is `true` * Populate `seekCallBack` property or `ThreadLocal<?>` from the `registerSeekCallback()` implementation from the internal listeners * Add `setOnPartitionsAssignedSeekCallback(BiConsumer)` and `setOnIdleSeekCallback(BiConsumer)` options to react for the appropriate event from the underlying container and perform appropriate seek management * Add new options to the DSL classes and cover them with tests, including check for new `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK` header **Cherry-pick to 3.0.x**
Ready for review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we only need the onPartitionsAssigned
code to allow an initial seek at that time; the Consumer
is provided in a header so they can seek on it directly. Also, as long as they use the normal event multicaster, they can seek from an event listener for the idle event.
this.seekCallback = callback; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure we need this extra complexity - why not always use the ThreadLocal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it's not end-user API, so I don't see reason to worry about complexity in our code since with the single-threaded container we will avoid ThreadLocal
overhead altogether.
* @since 3.0.4 | ||
* @see IntegrationKafkaHeaders | ||
*/ | ||
public void setAdditionalRequestHeaders(boolean setAdditionalHeaders) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name seems too general; it is specifically related to adding the seek callback to the headers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, for now. I'm open for better name.
Or do you think it would be better to add a boolean
setter for any new headers which may come in the future?
Did you see my general review comment? |
Good point about What else should I pay attention in your review? Thanks |
I mean I don't think we need to add the callback as a header, since the Consumer is already there as one. |
Huh? So, we don't need I'll fix all your concern. And having the argument about |
Indeed 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's all.
KafkaInboundGateway.this.onPartitionsAssignedSeekCallback.accept(assignments, callback); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why only from the record listener?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh? There is no IntegrationBatchMessageListener
in the gateway.
Sorry, I'm not sure how to proceed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On other thought the batch processing does not make sense in the request-reply scenarios.
So, that's fully fine that we don't support batch
there.
Please, elaborate with your concern.
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doh; sorry; you are correct; I guess I was confused by the Javadocs iin the MDCA (which is wrong).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! Will fix that JavaDoc in a few.
* Specify a {@link BiConsumer} for seeks management during | ||
* {@link ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)} | ||
* call from the {@link org.springframework.kafka.listener.KafkaMessageListenerContainer}. | ||
* This is called from the internal {@link RecordMessagingMessageListenerAdapter} implementation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, it's called from both.
*Close producers in the `KafkaProducerMessageHandlerTests`
* GH-212: Add ConsumerSeekAware impl to Inbounds Fixes #212 **Cherry-pick to 3.0.x** * GH-212: Add ConsumerSeekAware impl to Inbounds Fixes #212 * Introduce a new `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK` header to be populated to messages for sending to the channel * Populate that header from the `KafkaInboundGateway` and `KafkaMessageDrivenChannelAdapter` into the message from the `seekCallBack` property if `ListenerContainer` is single-threaded or from the `ThreadLocal<ConsumerSeekAware.ConsumerSeekCallback>` otherwise; and only if newly introduced `setAdditionalHeaders` is `true` * Populate `seekCallBack` property or `ThreadLocal<?>` from the `registerSeekCallback()` implementation from the internal listeners * Add `setOnPartitionsAssignedSeekCallback(BiConsumer)` and `setOnIdleSeekCallback(BiConsumer)` options to react for the appropriate event from the underlying container and perform appropriate seek management * Add new options to the DSL classes and cover them with tests, including check for new `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK` header **Cherry-pick to 3.0.x** * Address PR comments: remove unnecessary API * *Polishing `setOnPartitionsAssignedSeekCallback()` JavaDocs *Close producers in the `KafkaProducerMessageHandlerTests`
Fixes #212
Cherry-pick to 3.0.x