pausing and resuming consumption #261

Closed
nitin02 opened this issue Nov 16, 2017 · 7 comments

@nitin02

nitin02 commented Nov 16, 2017

Is there a way to pause consumption of messages from Kafka on an API call (or in some other external way) and to resume it whenever necessary? Also, to reset the consumer offset to fetch the latest messages, I can see a property being set, but is there a way to do it in real time?

I'm currently using Kafka (0.10+) with autoCommit and all defaults.

What would be the best way to achieve this: to pause/resume the consumer from an external API call and to flush the offset from an external API call?

@garyrussell
Contributor

Pause/Resume are not currently supported in Spring Cloud Stream.

You can pause() and seek() using Elmhurst (2.0) because the Consumer is available as a message header; however, we'd need to expose idleEventInterval as a property to enable an event listener to resume() the consumer; otherwise the application will never be called for a paused consumer.

These features are available today if you use spring-kafka directly.

@garyrussell garyrussell self-assigned this Nov 16, 2017
@garyrussell garyrussell added this to the 2.0.0.M4 milestone Nov 16, 2017
garyrussell added a commit to garyrussell/spring-cloud-stream-binder-kafka that referenced this issue Nov 16, 2017
@nitin02
Author

nitin02 commented Nov 17, 2017

Hello @garyrussell, thank you for your quick reply. I have seen the idleEventInterval property, which helps with the resume() functionality. But how does it work internally?

An event is published after idleEventInterval has elapsed, at which point resume() can be called. Does this mean the interval is fixed and cannot be set at runtime?

Is there any way to call resume() only when some other service returns true, signalling that consumption should resume?

@garyrussell
Contributor

See the pull request and the documentation update included therein.

The interval is fixed, but you don't have to act on every idle event. The event is emitted each time the interval elapses, so you can, of course, call some other service to decide whether you actually want to resume; if you don't resume on the first event, you'll get another one later. I chose 30 seconds as the default interval.

This is the application I used to test it...

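import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
@EnableBinding(Sink.class)
public class KbGh261Application {

	public static void main(String[] args) {
		SpringApplication.run(KbGh261Application.class, args);
	}

	// Print each message, then pause partition 0 of topic "gh261".
	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		consumer.pause(Collections.singleton(new TopicPartition("gh261", 0)));
	}

	// On each idle event, resume whatever partitions are currently paused.
	@Bean
	public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
		return event -> {
			System.out.println(event + " paused:" + event.getConsumer().paused());
			if (!event.getConsumer().paused().isEmpty()) {
				event.getConsumer().resume(event.getConsumer().paused());
			}
		};
	}

}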

And this is the result...

xxx
ListenerContainerIdleEvent [idleTime=30.082s, listenerId=gh261.container-0, container=KafkaMessageListenerContainer [id=gh261.container-0, clientIndex=-0, topicPartitions=[gh261-0]], topicPartitions=[gh261-0]] paused:[gh261-0]
yyy
ListenerContainerIdleEvent [idleTime=30.111s, listenerId=gh261.container-0, container=KafkaMessageListenerContainer [id=gh261.container-0, clientIndex=-0, topicPartitions=[gh261-0]], topicPartitions=[gh261-0]] paused:[gh261-0]
ListenerContainerIdleEvent [idleTime=60.217s, listenerId=gh261.container-0, container=KafkaMessageListenerContainer [id=gh261.container-0, clientIndex=-0, topicPartitions=[gh261-0]], topicPartitions=[gh261-0]] paused:[]
ListenerContainerIdleEvent [idleTime=90.301s, listenerId=gh261.container-0, container=KafkaMessageListenerContainer [id=gh261.container-0, clientIndex=-0, topicPartitions=[gh261-0]], topicPartitions=[gh261-0]] paused:[]

(See the idleTime in the event).
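
For the "only resume when some other service says so" case, here is a minimal sketch of the check in plain Java, so it runs without Spring or Kafka on the classpath. It is not part of the pull request. PausableConsumer, GatedResumer, FakeConsumer and GatedResumerDemo are hypothetical names made up for illustration; PausableConsumer only stands in for the paused() and resume() methods of the Kafka Consumer, and the BooleanSupplier stands in for whatever external check you want to make.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the paused()/resume() methods of the Kafka Consumer. */
interface PausableConsumer {
    Set<String> paused();
    void resume(Set<String> partitions);
}

/** Resumes paused partitions on an idle event only when an external check allows it. */
final class GatedResumer {
    private final BooleanSupplier mayResume; // assumption: wraps a call to some other service

    GatedResumer(BooleanSupplier mayResume) {
        this.mayResume = mayResume;
    }

    /** Call this from the idle-event listener; returns true if partitions were resumed. */
    boolean onIdle(PausableConsumer consumer) {
        Set<String> paused = consumer.paused();
        if (paused.isEmpty() || !mayResume.getAsBoolean()) {
            return false; // do nothing now; the next idle event will ask again
        }
        consumer.resume(paused);
        return true;
    }
}

/** In-memory consumer used only to exercise the sketch. */
final class FakeConsumer implements PausableConsumer {
    private final Set<String> paused = new HashSet<>();

    void pause(String partition) {
        paused.add(partition);
    }

    @Override
    public Set<String> paused() {
        return new HashSet<>(paused); // copy, so callers cannot mutate internal state
    }

    @Override
    public void resume(Set<String> partitions) {
        paused.removeAll(partitions);
    }
}

final class GatedResumerDemo {
    public static void main(String[] args) {
        AtomicBoolean flag = new AtomicBoolean(false); // could be flipped by a REST endpoint
        GatedResumer resumer = new GatedResumer(flag::get);
        FakeConsumer consumer = new FakeConsumer();
        consumer.pause("gh261-0");

        System.out.println(resumer.onIdle(consumer)); // prints "false": check not satisfied yet
        flag.set(true);
        System.out.println(resumer.onIdle(consumer)); // prints "true": partition resumed
    }
}
```

In the test application above, the idleListener bean could apply the same kind of check before calling event.getConsumer().resume(...). Because idle events keep arriving at the fixed interval, a check that returns false simply defers the decision to the next event.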

@nitin02
Author

nitin02 commented Nov 17, 2017

@garyrussell, thank you for the explanation. It seems perfect for my use case.

But how can I use it? I'm using Spring Boot 1.4.1.RELEASE, and I can see from the release info at
https://github.com/spring-cloud/spring-cloud-stream-starters/releases that I need at least 2.0.0.M5 or later to test it, along with other version updates.

Are the milestone releases stable for production use? If yes, how should I go about updating the versions of all dependencies?

@garyrussell
Contributor

It requires spring-kafka 2.0 or higher - earlier versions do not provide access to the Consumer object, so you can't pause with earlier releases.

No, milestones are not suitable for production use.

If you can't wait for the 2.0 release of Spring Cloud Stream, you have no choice but to use spring-kafka 2.0.1.RELEASE directly.

@nitin02
Author

nitin02 commented Nov 18, 2017

I currently extend AbstractMessageConverter for my conversion logic. I can see from the docs that I should use MessagingMessageListenerAdapter.

Is there any standard way to make this switch?

@garyrussell
Contributor

@nitin02 Sorry for the delay in replying; I somehow missed the notification.

It's not clear what you are asking; sorry.

@olegz olegz closed this as completed in ae269e7 Jan 4, 2018
@sabbyanandan sabbyanandan removed the in pr label Jan 4, 2018