Skip to content

Starting paused ConcurrentMessageListenerContainer results in messages being consumed #1111

@bgK

Description

@bgK

Affects Version(s): 2.2.6 (probably 2.3.x as well)

Use case:
I need to manually control the start offset for topic partitions while still using topic subscriptions; with multiple instances of the application sharing the same groupId.

The solution I plan to use is as follows:

  • Start all the instances of the application but without starting the CMLCs.
  • Have an external system transmit the desired start offsets to all the instances. Pause all the CMLCs. Start them all. In a ConsumerSeekAware message listener, seek the topic partitions in the onPartitionsAssigned callback.
  • Once the consumer group is stabilized, resume all the CMLCs to start message consumption.

This works mostly fine but I have encountered two issues:

The paused state is not propagated to KafkaMessageListenerContainer on start:

ConcurrentMessageListenerContainer cmlc = ...;
cmlc.pause();
cmlc.start();

After executing this code, ConcurrentMessageListenerContainer is paused, but the created KafkaMessageListenerContainer are not.

It's seemingly possible to fix this issue by applying the following modification in ConcurrentMessageListenerContainer:

protected void doStart() {
...
	KafkaMessageListenerContainer<K, V> container = new KafkaMessageListenerContainer<>();
...
	if (isPaused()) {
		container.pause();
	}
...
	container.start();
	this.containers.add(container);
}

The first call to KafkaConsumer#poll still returns messages:
KafkaMessageListenerContainer.ListenerConsumer.ListenerConsumerRebalanceListener#onPartitionsAssigned is indirectly called by KafkaMessageListenerContainer.ListenerConsumer#pollAndInvoke, via consumer.poll.

In my case, the KMLC is paused when the rebalance occurs and thus this code is executed:

	ListenerConsumer.this.consumerPaused = false;
	ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
		+ "the container will pause again before polling, unless the container's "
		+ "'paused' property is reset by a custom rebalance listener");

However, the call to consumer.poll that caused the rebalance listener to be called still returns messages.

As a quick test, I changed the code above to:

	ListenerConsumer.this.consumer.pause(partitions);

This yields the desired behavior. The call to consumer.poll does not return messages.
There may be other implications to this change I don't understand ...

(I wonder if there is a similar issue if a seek is performed from ConsumerSeekAware#onPartitionsAssigned while the KMLC is not paused -- messages from before the seek being returned by the call to consumer.poll. I did not test that case though.)

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions