Impossible to create Listener Containers with different Kafka consumer properties #1808

Closed
sergeylemekhov opened this issue May 21, 2021 · 3 comments · Fixed by #1809

@sergeylemekhov

Affects Version(s): 2.7.0

🎁 Enhancement

When a listener container is created manually through the factory method org.springframework.kafka.config.AbstractKafkaListenerContainerFactory#createContainer(java.lang.String...), it is initialized with properties from the factory.
The org.springframework.kafka.config.AbstractKafkaListenerContainerFactory#initializeContainer method sets the listener container's kafkaConsumerProperties to a reference to the factory's consumerProperties rather than a copy.
When calling code then changes the kafkaConsumerProperties of the created container, the change affects all other listener containers, which is the problem.

Because of this, it is impossible to create two listener containers with, for example, different offset reset policies:

ConcurrentMessageListenerContainer<String, String> container1 = listenerContainerFactory.createContainer("topic");
container1.getContainerProperties().setGroupId("group1");
container1.getContainerProperties().getKafkaConsumerProperties().setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

ConcurrentMessageListenerContainer<String, String> container2 = listenerContainerFactory.createContainer("topic");
container2.getContainerProperties().setGroupId("group2");
container2.getContainerProperties().getKafkaConsumerProperties().setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Both listener containers in the snippet end up with the "earliest" offset reset policy.
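
The aliasing described above can be reproduced without Spring at all. The following is a minimal sketch, assuming a simplified stand-in class (`FakeContainerProperties`, hypothetical, not the real `ContainerProperties`) and a `createContainer` helper that hands out the factory's `Properties` object by reference, as the report describes:

```java
import java.util.Properties;

final class SharedPropertiesDemo {

    /** Hypothetical, simplified stand-in for ContainerProperties (not the Spring class). */
    static final class FakeContainerProperties {
        private Properties kafkaConsumerProperties = new Properties();

        Properties getKafkaConsumerProperties() {
            return this.kafkaConsumerProperties;
        }

        void setKafkaConsumerProperties(Properties kafkaConsumerProperties) {
            this.kafkaConsumerProperties = kafkaConsumerProperties;
        }
    }

    /** Mimics the reported behaviour: the new instance receives the factory's own Properties object. */
    static FakeContainerProperties createContainer(FakeContainerProperties factoryProperties) {
        FakeContainerProperties instance = new FakeContainerProperties();
        // Reference copy, not a clone: this is the source of the cross-talk.
        instance.setKafkaConsumerProperties(factoryProperties.getKafkaConsumerProperties());
        return instance;
    }

    /** Returns the offset reset policy seen by container 1 and container 2, in that order. */
    static String[] offsetResetPolicies() {
        FakeContainerProperties factoryProperties = new FakeContainerProperties();

        FakeContainerProperties container1 = createContainer(factoryProperties);
        container1.getKafkaConsumerProperties().setProperty("auto.offset.reset", "latest");

        FakeContainerProperties container2 = createContainer(factoryProperties);
        container2.getKafkaConsumerProperties().setProperty("auto.offset.reset", "earliest");

        return new String[] {
            container1.getKafkaConsumerProperties().getProperty("auto.offset.reset"),
            container2.getKafkaConsumerProperties().getProperty("auto.offset.reset")
        };
    }

    public static void main(String[] args) {
        String[] policies = offsetResetPolicies();
        System.out.println(policies[0] + " / " + policies[1]); // prints "earliest / earliest"
    }
}
```

The second `setProperty` call overwrites the first because both containers hold the same `Properties` instance.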

One way to fix this would be to copy the factory's consumerProperties before setting them on the listener container.
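
A defensive copy of this kind could look roughly like the sketch below. The `copyOf` helper is a hypothetical name and uses only `java.util.Properties`; it is not the change that was eventually made in the library:

```java
import java.util.Properties;

final class PropertiesCopy {

    /**
     * Returns an independent copy of the given properties.
     * Note: putAll copies only the entries stored directly in 'source';
     * a defaults chain passed to the Properties constructor is not copied.
     */
    static Properties copyOf(Properties source) {
        Properties copy = new Properties();
        copy.putAll(source);
        return copy;
    }

    public static void main(String[] args) {
        Properties factoryProperties = new Properties();
        factoryProperties.setProperty("max.poll.records", "100");

        // Each container gets its own copy, so later changes stay local.
        Properties container1 = copyOf(factoryProperties);
        Properties container2 = copyOf(factoryProperties);
        container1.setProperty("auto.offset.reset", "latest");
        container2.setProperty("auto.offset.reset", "earliest");

        System.out.println(container1.getProperty("auto.offset.reset")); // latest
        System.out.println(container2.getProperty("auto.offset.reset")); // earliest
        System.out.println(factoryProperties.getProperty("auto.offset.reset")); // null
    }
}
```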

@garyrussell
Contributor

garyrussell commented May 21, 2021

Something is amiss here; each container gets a new ContainerProperties:

if (!topics.isEmpty()) { // NOSONAR
	ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
	return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);

We then copy properties from the factory's properties into the instance properties:

	ContainerProperties properties = instance.getContainerProperties();
	BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
			"messageListener", "ackCount", "ackTime", "subBatchPerPartition");
	JavaUtils.INSTANCE
			.acceptIfNotNull(this.afterRollbackProcessor, instance::setAfterRollbackProcessor)
			.acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(),
					properties::setAckCount)
			.acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(),
					properties::setAckTime)
			.acceptIfNotNull(this.containerProperties.getSubBatchPerPartition(),
					properties::setSubBatchPerPartition)
			.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler)
			.acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal);
	Boolean autoStart = endpoint.getAutoStartup();
	if (autoStart != null) {
		instance.setAutoStartup(autoStart);
	}
	else if (this.autoStartup != null) {
		instance.setAutoStartup(this.autoStartup);
	}
	instance.setRecordInterceptor(this.recordInterceptor);
	instance.setBatchInterceptor(this.batchInterceptor);
	JavaUtils.INSTANCE
			.acceptIfNotNull(this.phase, instance::setPhase)
			.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
			.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
			.acceptIfHasText(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
			.acceptIfHasText(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
			.acceptIfNotNull(endpoint.getConsumerProperties(),
					instance.getContainerProperties()::setKafkaConsumerProperties);
}

So it's not at all clear how we could get cross-talk between container properties.

@garyrussell
Contributor

Oh; I see - it's the containerProperties.kafkaConsumerProperties that is the same - it should be omitted from the BeanUtils copy.

As a workaround, give each container a new Properties instead of getting it from the container properties.
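
A rough sketch of that workaround follows. To keep it runnable without Spring on the classpath, the setter is passed in as a `Consumer<Properties>`; the `applyOverrides` helper and the array holders in `main` are hypothetical names used only for illustration:

```java
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;

final class FreshConsumerProperties {

    /**
     * Builds a brand-new Properties object from the given overrides and hands it
     * to the supplied setter, so nothing is shared with the factory or with
     * other containers.
     */
    static Properties applyOverrides(Consumer<Properties> setter, Map<String, String> overrides) {
        Properties fresh = new Properties();
        overrides.forEach(fresh::setProperty);
        setter.accept(fresh);
        return fresh;
    }

    public static void main(String[] args) {
        // Stand-ins for two containers: each slot records what its "setter" received.
        Properties[] container1 = new Properties[1];
        Properties[] container2 = new Properties[1];

        applyOverrides(p -> container1[0] = p, Map.of("auto.offset.reset", "latest"));
        applyOverrides(p -> container2[0] = p, Map.of("auto.offset.reset", "earliest"));

        System.out.println(container1[0].getProperty("auto.offset.reset")); // latest
        System.out.println(container2[0].getProperty("auto.offset.reset")); // earliest
    }
}
```

With spring-kafka, the setter argument would presumably be the method reference `container1.getContainerProperties()::setKafkaConsumerProperties`, the setter quoted in this comment.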

	/**
	 * Set the consumer properties that will be merged with the consumer properties
	 * provided by the consumer factory; properties here will supersede any with the same
	 * name(s) in the consumer factory.
	 * {@code group.id} and {@code client.id} are ignored.
	 * Property keys must be {@link String}s.
	 * @param kafkaConsumerProperties the properties.
	 * @see org.apache.kafka.clients.consumer.ConsumerConfig
	 * @see #setGroupId(String)
	 * @see #setClientId(String)
	 */
	public void setKafkaConsumerProperties(Properties kafkaConsumerProperties) {
		Assert.notNull(kafkaConsumerProperties, "'kafkaConsumerProperties' cannot be null");
		this.kafkaConsumerProperties = kafkaConsumerProperties;
	}
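
The merge rule stated in that Javadoc (overrides win, `group.id` and `client.id` are skipped) can be illustrated with plain collections. This is only a sketch of the documented semantics; the `merge` helper is hypothetical and the thread does not show how the library implements the merge internally:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class ConsumerPropertyMerge {

    /**
     * Returns the factory configs with the overrides applied on top.
     * Overrides for group.id and client.id are skipped, as the Javadoc says
     * they are ignored; those are set through dedicated setters instead.
     */
    static Map<String, Object> merge(Map<String, Object> factoryConfigs, Properties overrides) {
        Map<String, Object> merged = new HashMap<>(factoryConfigs);
        for (String name : overrides.stringPropertyNames()) {
            if (name.equals("group.id") || name.equals("client.id")) {
                continue;
            }
            merged.put(name, overrides.getProperty(name));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> factoryConfigs = new HashMap<>();
        factoryConfigs.put("auto.offset.reset", "earliest");
        factoryConfigs.put("group.id", "factoryGroup");

        Properties overrides = new Properties();
        overrides.setProperty("auto.offset.reset", "latest");
        overrides.setProperty("group.id", "ignoredGroup");

        Map<String, Object> merged = merge(factoryConfigs, overrides);
        System.out.println(merged.get("auto.offset.reset")); // latest
        System.out.println(merged.get("group.id"));          // factoryGroup
    }
}
```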

garyrussell added this to the 2.7.2 milestone May 21, 2021
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue May 21, 2021
Resolves spring-projects#1808

Each container got a reference to the same `Properties` object in
`kafkaConsumerProperties`, unless it was created via a `@KafkaListener`
with property overrides.

Do not copy the property from the factory's properties.

**Cherry-pick to all supported branches**
@sergeylemekhov
Author

@garyrussell I've also created a PR #1810
I've added a wrapper for Properties instance that delegates default values to factory instance and added a separate test.
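
The code of PR #1810 is not shown in this thread, so the following is not its implementation. It is only a sketch of one way a per-container `Properties` could fall back to factory-level values, using the defaults constructor that `java.util.Properties` already provides; `forContainer` is a hypothetical helper name:

```java
import java.util.Properties;

final class DefaultsBackedProperties {

    /** Creates an empty Properties whose lookups fall back to the given factory-level values. */
    static Properties forContainer(Properties factoryProperties) {
        return new Properties(factoryProperties);
    }

    public static void main(String[] args) {
        Properties factoryProperties = new Properties();
        factoryProperties.setProperty("max.poll.records", "100");

        Properties container1 = forContainer(factoryProperties);
        Properties container2 = forContainer(factoryProperties);
        container1.setProperty("auto.offset.reset", "latest");
        container2.setProperty("auto.offset.reset", "earliest");

        System.out.println(container1.getProperty("auto.offset.reset")); // latest
        System.out.println(container2.getProperty("auto.offset.reset")); // earliest
        System.out.println(container1.getProperty("max.poll.records"));  // 100, from the defaults
        System.out.println(factoryProperties.getProperty("auto.offset.reset")); // null, factory untouched
    }
}
```

One caveat with this approach: only `getProperty`, `propertyNames` and `stringPropertyNames` consult the defaults. `get`, `containsKey` and `putAll` see just the entries set directly on the container's object, so code that copies the properties with `putAll` would silently drop the factory values.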

artembilan pushed a commit that referenced this issue May 21, 2021
Resolves #1808

Each container got a reference to the same `Properties` object in
`kafkaConsumerProperties`, unless it was created via a `@KafkaListener`
with property overrides.

Do not copy the property from the factory's properties.

**Cherry-pick to all supported branches**
artembilan added a commit that referenced this issue May 21, 2021
Related to #1808

Turns out we also need to ignore a deprecated `setConsumerProperties`
when `BeanUtils.copyProperties()` still sets the value by reflection

**Cherry-pick to `2.3.x`**