DefaultKafkaConsumerFactory Should have a way to set the configs after instantiation #1678

Closed
jbotuck opened this issue Jan 13, 2021 · 3 comments · Fixed by #1696

jbotuck commented Jan 13, 2021

As of Spring Boot 2.3, it is possible to use customizers to configure some of Boot's auto-configured Kafka client beans:

https://docs.spring.io/spring-boot/docs/current/api/org/springframework/boot/autoconfigure/kafka/DefaultKafkaConsumerFactoryCustomizer.html

https://docs.spring.io/spring-boot/docs/current/api/org/springframework/boot/autoconfigure/kafka/DefaultKafkaProducerFactoryCustomizer.html

https://docs.spring.io/spring-boot/docs/current/api/org/springframework/boot/autoconfigure/kafka/StreamsBuilderFactoryBeanCustomizer.html

The problem is that the consumer customizer has limited utility, since it's currently impossible to modify the config in the consumer factory. Is it possible to add a way to modify the config, similar to the updateConfigs method in the producer factory? https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/core/ProducerFactory.html#updateConfigs(java.util.Map)

Contributor

garyrussell commented Jan 13, 2021

I agree it should be made symmetric.

However, the consumer factory has an overloaded createConsumer method:

	/**
	 * Create a consumer with an explicit group id; in addition, the
	 * client id suffix is appended to the clientIdPrefix which overrides the
	 * {@code client.id} property, if present. In addition, consumer properties can
	 * be overridden if the factory implementation supports it.
	 * @param groupId the group id.
	 * @param clientIdPrefix the prefix.
	 * @param clientIdSuffix the suffix.
	 * @param properties the properties to override.
	 * @return the consumer.
	 * @since 2.2.4
	 */
	Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
			@Nullable String clientIdSuffix, @Nullable Properties properties);

It is used by the listener container.

With the listener container, you can set overrides there:

container.getContainerProperties().setKafkaConsumerProperties(...);

	/**
	 * Set the consumer properties that will be merged with the consumer properties
	 * provided by the consumer factory; properties here will supersede any with the same
	 * name(s) in the consumer factory.
	 * {@code group.id} and {@code client.id} are ignored.
	 * Property keys must be {@link String}s.
	 * @param kafkaConsumerProperties the properties.
	 * @see org.apache.kafka.clients.consumer.ConsumerConfig
	 * @see #setGroupId(String)
	 * @see #setClientId(String)
	 */
	public void setKafkaConsumerProperties(Properties kafkaConsumerProperties) {
           ...
	}
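
The merge rule described in that Javadoc can be illustrated with a small stand-alone model. This is only a sketch using plain `java.util.Properties`: the class `ConsumerPropertyMerge` and its `merge` method are hypothetical names, not part of Spring Kafka, and the real container may differ in details.

```java
import java.util.Properties;
import java.util.Set;

// Simplified model of the documented merge rule; NOT Spring Kafka's implementation.
// Class and method names are hypothetical and exist only for illustration.
final class ConsumerPropertyMerge {

    // The Javadoc states that these two keys are ignored when passed as overrides.
    private static final Set<String> IGNORED_KEYS = Set.of("group.id", "client.id");

    // Returns a new Properties object: factory configs first, then the overrides on top.
    static Properties merge(Properties factoryConfigs, Properties containerOverrides) {
        Properties merged = new Properties();
        merged.putAll(factoryConfigs);
        for (String key : containerOverrides.stringPropertyNames()) {
            if (!IGNORED_KEYS.contains(key)) {
                // An override supersedes a factory property with the same name.
                merged.setProperty(key, containerOverrides.getProperty(key));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties factoryConfigs = new Properties();
        factoryConfigs.setProperty("group.id", "factoryGroup");
        factoryConfigs.setProperty("max.poll.records", "500");

        Properties overrides = new Properties();
        overrides.setProperty("group.id", "ignoredGroup");
        overrides.setProperty("max.poll.records", "50");

        Properties merged = merge(factoryConfigs, overrides);
        System.out.println(merged.getProperty("group.id"));
        System.out.println(merged.getProperty("max.poll.records"));
    }
}
```

In this model the override for `max.poll.records` wins, while the `group.id` override is dropped, which is why the Javadoc points to `setGroupId` and `setClientId` for those two values.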

garyrussell added this to the Backlog milestone Jan 13, 2021
Author

jbotuck commented Jan 14, 2021

Cool, I didn't know about that other way of setting the properties. But Boot doesn't really provide a great way to customize the auto-configured container factory bean.

My current workaround is to have my code create the ConsumerFactory bean manually, instead of my preferred flow of letting Boot create the factory while my code supplies the factory customizer.

Contributor

garyrussell commented Jan 14, 2021

You can add a customizer bean to customize Boot's container factory:

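import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.stereotype.Component;

@Component
class Customizer {

	public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
		factory.setContainerCustomizer(container -> {
			// Constant-first equals avoids an NPE when the container has no explicit group id.
			if ("slowGroup".equals(container.getContainerProperties().getGroupId())) {
				container.getContainerProperties().setIdleBetweenPolls(60_000);
			}
		});
	}

}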

This is safe because the only code in the factory's afterPropertiesSet() is some validation of the error handler type.

garyrussell self-assigned this Jan 27, 2021
garyrussell modified the milestones: Backlog, 2.7.0-M2 Jan 27, 2021
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jan 27, 2021
Resolves spring-projects#1678

Although the consumer factory can override properties using an overloaded
`createConsumer` method, add update/remove methods for properties to be
consistent with the `ProducerFactory`.

Change maps to concurrent to avoid possible `ConcurrentModificationException`.
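
Why mutable configs call for a concurrent map can be shown with a small stand-alone model. This is a sketch under stated assumptions, not the code from the commit: `ConfigHolder` is a hypothetical stand-in for a factory that holds its configs in a map, and the update that would normally come from another thread is simulated inside the loop so the outcome is deterministic.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a factory holding mutable configs; NOT Spring Kafka code.
final class ConfigHolder {

    private final Map<String, Object> configs;

    ConfigHolder(Map<String, Object> backingMap) {
        this.configs = backingMap;
    }

    // Adds or replaces config entries.
    void updateConfigs(Map<String, Object> updates) {
        this.configs.putAll(updates);
    }

    // Removes a single config entry.
    void removeConfig(String key) {
        this.configs.remove(key);
    }

    boolean contains(String key) {
        return this.configs.containsKey(key);
    }

    // Walks the configs while an update arrives mid-iteration.
    // Returns true if the walk completed, false if the iterator failed fast.
    boolean iterateWhileUpdating() {
        try {
            for (String key : this.configs.keySet()) {
                updateConfigs(Map.of("added.during.iteration", key));
            }
            return true;
        }
        catch (ConcurrentModificationException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> seed = Map.of("bootstrap.servers", "localhost:9092", "group.id", "demo");
        ConfigHolder plain = new ConfigHolder(new HashMap<>(seed));
        ConfigHolder concurrent = new ConfigHolder(new ConcurrentHashMap<>(seed));
        System.out.println("HashMap walk completed: " + plain.iterateWhileUpdating());
        System.out.println("ConcurrentHashMap walk completed: " + concurrent.iterateWhileUpdating());
    }
}
```

The `HashMap` iterator is fail-fast, so a structural change during the walk surfaces as a `ConcurrentModificationException`, whereas `ConcurrentHashMap` iterators are weakly consistent and tolerate the update.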
artembilan pushed a commit that referenced this issue Jan 27, 2021
Resolves #1678
