This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Add seek management support for the Kafka Inbound Channel Adapters #212

Closed
alpert opened this issue Sep 5, 2018 · 4 comments

@alpert

alpert commented Sep 5, 2018

Hi.

This may be a generic Kafka question rather than one specific to Spring Integration, but: without manual offset management, what will happen if I stop and restart my application (which uses Spring Integration Kafka)? Will it continue where it left off, or start from the beginning or from the end? What should I do if I want it to read from the beginning, from the end, or from where it left off?

@artembilan
Contributor

Spring Integration Kafka is now fully based on Spring for Apache Kafka: https://spring.io/projects/spring-kafka

The `KafkaMessageDrivenChannelAdapter` and `KafkaInboundGateway` fully rely on the `MessageListenerContainer`, so all the offset management comes from there: https://docs.spring.io/spring-kafka/docs/2.1.9.RELEASE/reference/html/_reference.html#committing-offsets.
If you use `enable.auto.commit = true`, be aware that the Kafka client commits offsets asynchronously, so after closing and restarting the app you may indeed start consuming from an earlier offset.
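To make the restart behaviour concrete, here is a minimal, dependency-free model of how a consumer group chooses where to start reading a partition. It assumes the standard Kafka consumer semantics: a committed offset (the offset of the next record to read) wins while it is still within the retained log; otherwise the `auto.offset.reset` property decides (`earliest`, `latest`, which is the client default, or `none`). The class and method names are hypothetical; this is not part of the Kafka client or the Spring API.

```java
import java.util.OptionalLong;

// Hypothetical model, not the Kafka client: how a consumer group picks its start offset.
final class StartPositionModel {

    private StartPositionModel() {
    }

    /**
     * @param committed       the group's committed offset (next record to read), if any
     * @param autoOffsetReset value of the "auto.offset.reset" consumer property
     * @param logStart        first offset still retained in the partition
     * @param logEnd          offset that the next produced record will get
     */
    static long startOffset(OptionalLong committed, String autoOffsetReset,
                            long logStart, long logEnd) {
        // A committed offset wins as long as it is still within the retained log.
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStart && offset <= logEnd) {
                return offset;
            }
        }
        // No usable committed offset: the reset policy decides.
        switch (autoOffsetReset) {
            case "earliest":
                return logStart;
            case "latest": // the Kafka client's default
                return logEnd;
            default: // "none": the real client throws rather than choosing a position
                throw new IllegalStateException(
                        "no valid committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    /**
     * Records processed after the last successful commit are delivered again after a restart.
     *
     * @param lastCommitted       committed offset, i.e. the next record to read
     * @param lastProcessedOffset offset of the last record the application finished
     */
    static long redeliveredAfterRestart(long lastCommitted, long lastProcessedOffset) {
        return Math.max(0L, lastProcessedOffset + 1 - lastCommitted);
    }
}
```

So with no committed offset yet, the default `latest` means a new group skips the records already in the topic; and if the last successful asynchronous commit stored offset 100 while records up to offset 104 had been processed before shutdown, records 100 to 104 (five of them) are consumed again on restart.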

To re-read from the beginning, you need to use a `ConsumerSeekCallback`: https://docs.spring.io/spring-kafka/docs/2.1.9.RELEASE/reference/html/_reference.html#seek.
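As a rough illustration of what seeking on assignment does, the following self-contained Java model lets a hook override the committed positions at the moment partitions are assigned. The `SeekCallback` interface, the class, and the string partition keys are hypothetical stand-ins chosen only to keep the sketch free of dependencies; they are not the Spring Kafka types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, dependency-free model of seeking when partitions are assigned.
final class AssignmentSeekModel {

    /** Mirrors the shape of a seek callback; NOT the Spring Kafka interface itself. */
    interface SeekCallback {
        void seek(String partition, long offset);

        void seekToBeginning(String partition);

        void seekToEnd(String partition);
    }

    private final Map<String, Long> logStart = new HashMap<>();
    private final Map<String, Long> logEnd = new HashMap<>();
    private final Map<String, Long> position = new HashMap<>();

    void addPartition(String partition, long start, long end, long committed) {
        logStart.put(partition, start);
        logEnd.put(partition, end);
        position.put(partition, committed); // starts at the committed offset
    }

    /** Simulates an assignment: the hook sees the committed positions and may override them. */
    Map<String, Long> assign(BiConsumer<Map<String, Long>, SeekCallback> onAssigned) {
        SeekCallback callback = new SeekCallback() {
            @Override
            public void seek(String partition, long offset) {
                position.put(partition, offset);
            }

            @Override
            public void seekToBeginning(String partition) {
                position.put(partition, logStart.get(partition));
            }

            @Override
            public void seekToEnd(String partition) {
                position.put(partition, logEnd.get(partition));
            }
        };
        onAssigned.accept(new HashMap<>(position), callback);
        return new HashMap<>(position); // positions the next poll would use
    }
}
```

In Spring for Apache Kafka itself, the equivalent is a `@KafkaListener` class implementing `ConsumerSeekAware` and calling `callback.seekToBeginning(topic, partition)` from `onPartitionsAssigned(Map<TopicPartition, Long>, ConsumerSeekCallback)`, as described in the linked reference.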

Well, that is the mechanism, but it looks like we don't provide any hooks for it in `KafkaMessageDrivenChannelAdapter` and `KafkaInboundGateway`.
So it sounds like we need to give those channel adapters something like `setConsumerSeekAware()`, so that you can do the same as what can be done now with `@KafkaListener` and a `ConsumerSeekAware` implementation on that class.

That said, as a workaround for the time being, I'd recommend using a `@KafkaListener` and sending messages to the channel from the method implementation, or delegating to a `@MessagingGateway`.

Does it make sense?

@garyrussell, WDYT?

@garyrussell
Contributor

Yes; we should add seek capability to the adapter.

@artembilan artembilan self-assigned this Sep 5, 2018
@artembilan artembilan changed the title Question about offsets Add seek management support for the Kafka Inbound Channel Adapters Sep 5, 2018
@artembilan artembilan added this to the 3.1.M2 milestone Sep 5, 2018
artembilan added a commit to artembilan/spring-integration-kafka that referenced this issue Sep 5, 2018
artembilan added a commit to artembilan/spring-integration-kafka that referenced this issue Sep 6, 2018
Fixes spring-attic#212

* Introduce a new `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK`
header to be populated to messages for sending to the channel
* Populate that header from the `KafkaInboundGateway` and
`KafkaMessageDrivenChannelAdapter` into the message from the
`seekCallBack` property if `ListenerContainer` is single-threaded or
from the `ThreadLocal<ConsumerSeekAware.ConsumerSeekCallback>` otherwise;
and only if newly introduced `setAdditionalHeaders` is `true`
* Populate `seekCallBack` property or `ThreadLocal<?>` from the
`registerSeekCallback()` implementation from the internal listeners
* Add `setOnPartitionsAssignedSeekCallback(BiConsumer)` and
`setOnIdleSeekCallback(BiConsumer)` options to react to the corresponding
events from the underlying container and perform seek management accordingly
* Add new options to the DSL classes and cover them with tests, including
check for new `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK` header
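The field-versus-`ThreadLocal` bookkeeping described in the commit above can be sketched in plain Java as follows. This is a hypothetical illustration, not the actual adapter code: `SeekCallbackHolder`, `register()`, `headers()` and the `addSeekHeader` flag are invented names, and `SEEK_CALLBACK_HEADER` merely stands in for `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK` (its real value is not assumed). The point is that with several consumer threads, each thread must see its own callback, which a `ThreadLocal` provides.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: capture a per-consumer-thread seek callback for message headers.
final class SeekCallbackHolder<C> {

    /** Stand-in for IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK; value is made up. */
    static final String SEEK_CALLBACK_HEADER = "seekCallback";

    private final boolean singleThreaded;
    private final ThreadLocal<C> perThread = new ThreadLocal<>();
    private volatile C single;

    SeekCallbackHolder(boolean singleThreaded) {
        this.singleThreaded = singleThreaded;
    }

    /** Called once on each consumer thread, in the spirit of registerSeekCallback(). */
    void register(C callback) {
        if (singleThreaded) {
            single = callback; // only one consumer thread, a plain field is enough
        } else {
            perThread.set(callback); // each consumer thread keeps its own callback
        }
    }

    /** Called on the consumer thread while turning a record into a message. */
    Map<String, Object> headers(boolean addSeekHeader) {
        Map<String, Object> headers = new HashMap<>();
        C callback = singleThreaded ? single : perThread.get();
        if (addSeekHeader && callback != null) {
            headers.put(SEEK_CALLBACK_HEADER, callback);
        }
        return headers;
    }
}
```

A thread that never registered a callback (for example, a downstream thread after an executor channel) simply gets no header from the `ThreadLocal` path.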

**Cherry-pick to 3.0.x**
garyrussell pushed a commit that referenced this issue Sep 7, 2018
* GH-212: Add ConsumerSeekAware impl to Inbounds

Fixes #212

**Cherry-pick to 3.0.x**

* GH-212: Add ConsumerSeekAware impl to Inbounds

Fixes #212

* Introduce a new `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK`
header to be populated to messages for sending to the channel
* Populate that header from the `KafkaInboundGateway` and
`KafkaMessageDrivenChannelAdapter` into the message from the
`seekCallBack` property if `ListenerContainer` is single-threaded or
from the `ThreadLocal<ConsumerSeekAware.ConsumerSeekCallback>` otherwise;
and only if newly introduced `setAdditionalHeaders` is `true`
* Populate `seekCallBack` property or `ThreadLocal<?>` from the
`registerSeekCallback()` implementation from the internal listeners
* Add `setOnPartitionsAssignedSeekCallback(BiConsumer)` and
`setOnIdleSeekCallback(BiConsumer)` options to react to the corresponding
events from the underlying container and perform seek management accordingly
* Add new options to the DSL classes and cover them with tests, including
check for new `IntegrationKafkaHeaders.CONSUMER_SEEK_CALLBACK` header

**Cherry-pick to 3.0.x**

* Address PR comments: remove unnecessary API

* Polishing:
  * `setOnPartitionsAssignedSeekCallback()` JavaDocs
  * Close producers in the `KafkaProducerMessageHandlerTests`
garyrussell pushed a commit that referenced this issue Sep 7, 2018
garyrussell pushed a commit to garyrussell/spring-integration that referenced this issue Jun 24, 2020
…-projects#213)

artembilan added a commit to spring-projects/spring-integration that referenced this issue Jun 25, 2020
@harshita-mangly-mohan

Hi, @garyrussell @artembilan,

I am using a `ConcurrentMessageListenerContainer` with a `KafkaMessageDrivenChannelAdapter` in an application. To replay older messages from a Kafka topic, I plan to find the offset by timestamp, seek to it, and then continue polling from that offset.
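The "find the offset by timestamp" step follows the contract of the Kafka client's `KafkaConsumer.offsetsForTimes()`: for each partition it returns the earliest offset whose timestamp is greater than or equal to the requested one, or no result if there is no such record. A toy, in-memory model of that lookup over a single partition follows; `TimestampLookup` and `offsetForTime` are hypothetical names, and the linear scan stands in for the broker's time index.

```java
import java.util.OptionalLong;

// Hypothetical in-memory model of a timestamp-to-offset lookup over one partition.
final class TimestampLookup {

    private TimestampLookup() {
    }

    /**
     * @param firstOffset offset of the record whose timestamp is timestamps[0]
     * @param timestamps  record timestamps in offset order
     * @param target      requested timestamp
     * @return earliest offset whose timestamp is >= target, or empty if there is none
     */
    static OptionalLong offsetForTime(long firstOffset, long[] timestamps, long target) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= target) {
                return OptionalLong.of(firstOffset + i);
            }
        }
        return OptionalLong.empty(); // no record at or after the requested time
    }
}
```

Seeking to the returned offset makes the next poll resume from the first record at or after the requested time.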

I was using `ConsumerSeekCallback.seekToTimestamp()` on the callback provided through the `setOnPartitionsAssignedSeekCallback()` hook, and I get an error about multi-threaded access to the consumer (`ConcurrentModificationException`). I believe the callback passed to `setOnPartitionsAssignedSeekCallback()` should only be used within that hook's scope, so that the seek is triggered on the respective consumer thread. Wouldn't we need the `ListenerConsumer` reference passed to `registerSeekCallback()`, which is not available with `KafkaMessageDrivenChannelAdapter`, to perform a seek from the corresponding consumer thread at a later point in time?

Or am I missing something in following this example from the docs? https://docs.spring.io/spring-kafka/reference/html/#seek
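The `ConcurrentModificationException` is the Kafka client's guard against multi-threaded access: `KafkaConsumer` is not thread-safe, so a seek has to be executed on the thread that polls. A common way to allow seeks requested "at a later point in time" from other threads is to queue them and let the consumer thread apply them before its next poll. The sketch below is a hypothetical, dependency-free illustration of that pattern (`DeferredSeeks` and its methods are invented names); it does not describe what the adapter from this issue provides.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: seeks may be requested from any thread but are applied only on the consumer thread.
final class DeferredSeeks {

    private static final class Seek {
        final int partition;
        final long offset;

        Seek(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Queue<Seek> pending = new ConcurrentLinkedQueue<>();
    private final Map<Integer, Long> positions = new HashMap<>(); // consumer thread only

    /** Safe to call from any thread: it only enqueues the request. */
    void requestSeek(int partition, long offset) {
        pending.add(new Seek(partition, offset));
    }

    /** Called by the consumer thread at the top of each poll-loop iteration. */
    void applyPendingSeeks() {
        Seek seek;
        while ((seek = pending.poll()) != null) {
            positions.put(seek.partition, seek.offset); // stands in for consumer.seek(...)
        }
    }

    /** Read by the consumer thread; null if no seek has been applied for the partition. */
    Long position(int partition) {
        return positions.get(partition);
    }
}
```

Because only the consumer thread calls `applyPendingSeeks()` and reads `positions`, the plain `HashMap` is never shared; the `ConcurrentLinkedQueue` is the only cross-thread hand-off.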

@garyrussell
Contributor

This enhancement was only meant to support seeks during partition assignment, not at arbitrary later times.

More work is required to support that; I suggest you open a new issue for a new feature.


4 participants