consumer.pause(Collection<TopicPartition> var1) is delayed #479

Closed
messaoudi-mounir opened this issue Oct 23, 2018 · 2 comments

Comments

@messaoudi-mounir

messaoudi-mounir commented Oct 23, 2018

With default settings, the following happens (Kafka, topic with only 1 partition).
The partition already contains messages that have not yet been consumed:

  • msg1
  • msg2
  • msg3
  • ...
  • msg100

Running the sample example from the docs (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples)

Using spring-cloud-stream-binder-kafka 2.0.1.RELEASE

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		// pause fetching from partition 0 of myTopic after handling the message
		consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
	}
}

Expected result:
The stream listener consumes the first message (msg1) and then pauses.
Actual result after running the example:
The stream listener consumes the first message (msg1) and keeps consuming the rest of the messages.
After some time "X", if I write messages to the topic, they are not consumed.

I think the problem is that when the pause method is called on the thread that invokes the @StreamListener, the pause is delayed (because the container waits for that thread to exit for "X" seconds).

Any quick response will be much appreciated!

@garyrussell
Contributor

Pausing the consumer doesn't discard any messages that have already been fetched by the previous poll(); it simply prevents new messages from being fetched on subsequent polls. You would need to discard those messages and perform a seek on the consumer so the discarded records will be refetched when you resume the consumer.
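
For what it's worth, here is a minimal sketch of that discard-and-seek approach (not from the original thread). The paused flag and the extra KafkaHeaders.RECEIVED_TOPIC / RECEIVED_PARTITION_ID / OFFSET headers are assumptions added for illustration:

	// Hypothetical flag (not in the original code) used to discard records the
	// container already fetched before pause() took effect.
	private volatile boolean paused;

	@StreamListener(Sink.INPUT)
	public void in(String in,
			@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
			@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
			@Header(KafkaHeaders.OFFSET) long offset,
			@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {

		if (this.paused) {
			// Records fetched by the previous poll() are still delivered after pause();
			// drop them here. The seek below ensures they are fetched again on resume.
			return;
		}
		System.out.println(in);
		TopicPartition tp = new TopicPartition(topic, partition);
		consumer.pause(Collections.singleton(tp));
		// Re-position just past the record we processed so the discarded records
		// are re-fetched when the partition is resumed.
		consumer.seek(tp, offset + 1);
		this.paused = true;
	}

Resuming the partition (and clearing the flag) when you are ready to continue is left out of the sketch.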

You can also set max.poll.records to 1 so only one record is fetched by each poll.
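
As a hedged example, with the Kafka binder that setting could be passed to the consumer through the binding's configuration map in application.properties; the binding name input (from Sink) and the exact property path are assumptions to verify against the binder documentation:

spring.cloud.stream.bindings.input.destination=myTopic
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=1

With max.poll.records=1, each poll() returns at most one record, so the pause takes effect before any further records are delivered to the listener.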

@messaoudi-mounir
Author

Thank you @garyrussell for the quick reply.
It worked with max.poll.records=1.

Closing.
