Add an option so that KafkaMessageListenerContainer.stop() takes effect after the current record instead of the current batch #1672

Closed
yamunachukka opened this issue Jan 11, 2021 · 2 comments · Fixed by #1676

Comments

@yamunachukka

Use case: Given a Kafka topic with 100 messages, I want to read the messages from offset 10 to offset 20. I am able to start fetching from the beginning offset. When I reach the end offset, my code stops the container. Even after that code has executed, the consumer keeps consuming further messages (from offset 21). It stops only after reading all the messages in the topic.


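@Service
public class Consumer1 implements MessageListener<String, GenericRecord> {

  // `log` and `feedService` are declared elsewhere (not shown in this report).

  @Override
  public void onMessage(ConsumerRecord<String, GenericRecord> data) {
    log.info("feed record {}", data);
    // Offset 20 is the last record of interest: ask the container to stop.
    if (data.offset() == 20) {
      feedService.stopConsumer();
    }
  }
}

@Service
public class FeedService {

  // `kafkaMessageListenerContainer` is declared elsewhere (not shown in this report).

  public void startConsumer() {
    kafkaMessageListenerContainer.start();
  }

  public void stopConsumer() {
    kafkaMessageListenerContainer.stop();
  }
}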

Note: I am using the latest spring-kafka version (2.6.4). One observation: the container's stop() method is executed, but the consumer is not closed, and there are no errors in the output.

@artembilan
Member

You already asked the same question on Stack Overflow, just 4 hours ago: https://stackoverflow.com/questions/65664927/kafkamessagelistenercontainer-stop-is-not-stopping-consumption-of-messages-in.
Please bear with us while we are out for the weekend.

We will come back to you over there.

@garyrussell garyrussell reopened this Jan 11, 2021
@garyrussell garyrussell changed the title from "KafkaMessageListenerContainer.stop() is not stopping consumption of messages in message listener" to "Add an option so that KafkaMessageListenerContainer.stop() takes effect after the current record instead of the current batch" Jan 11, 2021
@garyrussell garyrussell self-assigned this Jan 11, 2021
@garyrussell garyrussell added this to the 2.6.5 milestone Jan 11, 2021
@garyrussell
Contributor

Regardless of this behavior, you should not call stop() on the listener thread: it causes a delay, because stop() waits up to shutdownTimeout for the listener thread to exit, and that thread is the one blocked inside stop(). Call stop(() -> { }) instead.
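
To illustrate why the blocking call is slow on the listener thread, here is a minimal sketch in plain Java. It is a toy model, not the spring-kafka implementation: `ToyContainer`, `Listener`, and `listenerCallMillis` are hypothetical names invented for this example, and the only behavior borrowed from the comment above is that `stop()` waits up to a shutdown timeout for the listener thread to exit, while `stop(Runnable)` returns at once and runs the callback later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Toy model only; all names here are hypothetical, not spring-kafka classes. */
public class StopFromListenerDemo {

    interface Listener {
        void onMessage(ToyContainer container) throws InterruptedException;
    }

    static class ToyContainer {
        private final long shutdownTimeoutMs;
        private final CountDownLatch exited = new CountDownLatch(1);
        private volatile boolean running = true;
        private volatile Runnable callback = () -> { };

        ToyContainer(long shutdownTimeoutMs) {
            this.shutdownTimeoutMs = shutdownTimeoutMs;
        }

        /** Blocking stop: waits for the listener thread to exit, up to the timeout. */
        void stop() throws InterruptedException {
            running = false;
            exited.await(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
        }

        /** Non-blocking stop: returns immediately; the callback runs once the thread exits. */
        void stop(Runnable onStopped) {
            this.callback = onStopped;
            running = false;
        }

        /** Delivers one record on a fresh "listener thread"; returns how long the listener call took. */
        long runListener(Listener listener) throws InterruptedException {
            long[] elapsedMs = new long[1];
            Thread listenerThread = new Thread(() -> {
                long start = System.nanoTime();
                try {
                    listener.onMessage(this);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                elapsedMs[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                // Only now can the thread signal that it has exited.
                exited.countDown();
                callback.run();
            });
            listenerThread.start();
            listenerThread.join();
            return elapsedMs[0];
        }
    }

    /** Measures one listener invocation that stops the container from inside the listener. */
    public static long listenerCallMillis(boolean useCallbackStop, long shutdownTimeoutMs) {
        ToyContainer container = new ToyContainer(shutdownTimeoutMs);
        try {
            return container.runListener(c -> {
                if (useCallbackStop) {
                    c.stop(() -> { });   // returns immediately
                } else {
                    c.stop();            // waits for its own thread to exit: always times out
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking stop():  " + listenerCallMillis(false, 300) + " ms");
        System.out.println("stop(Runnable):   " + listenerCallMillis(true, 300) + " ms");
    }
}
```

In this model the blocking variant always burns the full timeout, because the latch it waits on can only be released by the thread that is doing the waiting. The callback variant avoids that self-wait.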

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jan 11, 2021
Resolves spring-projects#1672

Previously (and still, by default), stopping the listener container does
not take effect until the records from the previous poll are all processed.

Add an option to stop after the current record, instead.

**cherry-pick to 2.6.x, 1.5.x**
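
The difference between the two behaviors described in this commit message can be sketched with a toy poll loop in plain Java. This is a simplified model under stated assumptions, not the container's actual code: `StopSemanticsDemo` and `consume` are hypothetical names, the topic is modeled as offsets 0 to 99, and the batch size of 500 mirrors the Kafka consumer's default `max.poll.records`, which would let a single poll return the whole 100-message topic from the report. In released versions the new option appears to be exposed as `ContainerProperties.setStopImmediate(true)`; check the reference documentation for your version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy model of a poll loop; hypothetical names, NOT the spring-kafka implementation. */
public class StopSemanticsDemo {

    /**
     * Simulates consuming offsets 0..99 in polled batches. The "listener" requests a stop
     * when it sees stopAtOffset. Returns every offset that was delivered to the listener.
     */
    public static List<Long> consume(int batchSize, long stopAtOffset, boolean stopImmediate) {
        AtomicBoolean running = new AtomicBoolean(true);
        List<Long> delivered = new ArrayList<>();
        final long end = 100;
        long next = 0;
        while (running.get() && next < end) {
            // "poll": fetch up to batchSize records starting at `next`
            long batchEnd = Math.min(next + batchSize, end);
            for (long offset = next; offset < batchEnd; offset++) {
                // With the option on, the flag is checked before each record.
                if (stopImmediate && !running.get()) {
                    break;
                }
                delivered.add(offset);
                if (offset == stopAtOffset) {
                    running.set(false); // the listener asks the container to stop
                }
            }
            next = batchEnd;
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Long> afterBatch = consume(500, 20, false);
        List<Long> afterRecord = consume(500, 20, true);
        // prints "default: last offset = 99"
        System.out.println("default: last offset = " + afterBatch.get(afterBatch.size() - 1));
        // prints "stop after current record: last offset = 20"
        System.out.println("stop after current record: last offset = "
                + afterRecord.get(afterRecord.size() - 1));
    }
}
```

With the default behavior the stop flag is only consulted between polls, so every record already fetched is still delivered. That matches the reported symptom of consumption continuing to the end of the topic.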
artembilan pushed a commit that referenced this issue Jan 11, 2021
artembilan pushed a commit that referenced this issue Jan 11, 2021

# Conflicts:
#	src/reference/asciidoc/whats-new.adoc
artembilan pushed a commit that referenced this issue Jan 11, 2021

# Conflicts:
#	src/reference/asciidoc/whats-new.adoc

garyrussell added a commit that referenced this issue Jan 12, 2021
garyrussell added a commit that referenced this issue Jan 12, 2021