ConsumerSeekAware broken with upgrade from 1.1.5 to 1.1.6 #403

Closed
tbinias opened this issue Aug 27, 2017 · 6 comments

tbinias commented Aug 27, 2017

Having a listener which implements ConsumerSeekAware annotated with @KafkaListener works fine in 1.1.5 but not in 1.1.6:

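import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;

@Component
public class EntityChangedEventConsumer implements ConsumerSeekAware {

    @KafkaListener(topics = "${kafka.consumer.topic}")
    public void onMessage(ConsumerRecord<String, EntityChangedEvent> data) { ... }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        ...
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        ...
    }
}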

None of the ConsumerSeekAware callbacks are called in 1.1.6.

Since #324, ConsumerSeekAware implementations are detected on this.genericListener.

But this.genericListener is always assigned from listener:
line 354 KafkaMessageListenerContainer -> this.genericListener = listener;

So the case of having a GenericAcknowledgingMessageListener (ackListener) is lost, even though theListener handles it:
line 331 -> this.theListener = listener == null ? ackListener : listener;
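
To make the reasoning above concrete, here is a minimal sketch of the two assignments side by side. It does not use the real spring-kafka classes: `SeekAware`, `PlainListener`, `AckListener`, `SeekingAckListener` and both method names are hypothetical stand-ins, and only the two assignment expressions are taken from the lines quoted above.

```java
// Hypothetical stand-ins for the spring-kafka types named in this issue; these
// are NOT the real interfaces, only enough structure to show the logic.
interface SeekAware { }
interface PlainListener { }
interface AckListener { }

// An acknowledging listener that also wants seek callbacks, i.e. the case
// reported as lost.
class SeekingAckListener implements AckListener, SeekAware { }

final class SeekAwareDetectionSketch {

    // Mirrors the assignment quoted from line 354: only `listener` is considered.
    static boolean detectedAsReported(PlainListener listener, AckListener ackListener) {
        Object genericListener = listener;
        // `null instanceof X` is always false, so an ack-only listener is never detected.
        return genericListener instanceof SeekAware;
    }

    // Mirrors the fallback quoted from line 331: use ackListener when listener is null.
    static boolean detectedWithFallback(PlainListener listener, AckListener ackListener) {
        Object theListener = listener == null ? ackListener : listener;
        return theListener instanceof SeekAware;
    }

    public static void main(String[] args) {
        AckListener ack = new SeekingAckListener();
        System.out.println(detectedAsReported(null, ack));   // prints "false"
        System.out.println(detectedWithFallback(null, ack)); // prints "true"
    }
}
```

With only an acknowledging listener present, the first check sees `null` and skips the seek callbacks, while a check against the fallback value would find the ConsumerSeekAware implementation. Whether the actual fix takes this shape is not shown here; the sketch only illustrates the reported cause.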

artembilan pushed a commit that referenced this issue Sep 1, 2017
Resolves #403

remove test duplicates and use dedicated topic for testSeekAck

* Remove in the `ListenerConsumer` redundant `kafkaDataListener`
property in favor of `theListener`
* Some simple polishing
artembilan pushed a commit that referenced this issue Sep 1, 2017
Resolves #403

remove test duplicates and use dedicated topic for testSeekAck

* Remove in the `ListenerConsumer` redundant `kafkaDataListener`
property in favor of `theListener`
* Some simple polishing

Fix Checkstyle violation for annotation position

(cherry picked from commit b02ecd6)
artembilan added a commit to artembilan/counter that referenced this issue Sep 1, 2017
Resolves spring-projects/spring-kafka#403

With Java 8, Spring Boot auto-configures `BufferCounterService`, which
is inconsistent with `DefaultCounterService`: the latter
checks for the `counter.` prefix, but `BufferCounterService` checks just
for `counter`.
This is a problem because `spring.application.name` is populated as
`counter-sink`, so the `counter.` prefix is not present in the case of `BufferCounterService`.
As a result, SCDF can't access our counter because it reads counters as
`spring.metrics.counter.*`

* While Spring Boot is fixing the consistency of the `counter.` prefix, we provide
a workaround by explicitly setting the `counter.` prefix in the `CounterSinkConfiguration`
@artembilan
Member

Fixed via b02ecd6

@guillaumedrolet21

Hi, I am using version 1.2.2.RELEASE and the methods are still not invoked.

I am using a ConcurrentKafkaListenerContainerFactory and a listener annotated with @KafkaListener that implements ConsumerSeekAware, as described in this issue.

As @tbinias described, when execution reaches the condition below in the createRebalanceListener method of KafkaMessageListenerContainer, genericListener is null.

if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
    seekPartitions(partitions, false);
}

Also, I added the idle interval config as @garyrussell suggested in this Stack Overflow thread: https://stackoverflow.com/questions/46321483/spring-kafka-consumer-seek-aware-methods-not-invoked/46325685?newreg=5487961b397b4c7387b56694c6798884

I am using auto commit and I have auto offset reset set to "latest", if that changes anything.

Is there something I am missing?

Thank you very much for your help!

@artembilan
Member

Well, the fix is already on the 1.2.x branch; it was committed after the 1.2.2 release: https://github.com/spring-projects/spring-kafka/commits/1.2.x
We can release 1.2.3 if that is a strong requirement.

@guillaumedrolet21

OK, thank you for the quick response.

@garyrussell
Contributor

garyrussell commented Sep 20, 2017

My mistake, sorry; my local 1.2.x branch was out of date; I somehow reasoned it wasn't an issue on that branch since I tested with a snapshot.

@artembilan Please go ahead and release 1.2.3 tomorrow, while I am in transit; thanks.

@artembilan
Member

Spring Kafka 1.2.3 has been released.
You can obtain it from https://repo.spring.io/release/org/springframework/kafka/spring-kafka/1.2.3.RELEASE/

It will be in Maven Central in a couple of hours.
