Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

spring.kafka.listener properties are not applied to Kafka.messageDrivenChannelAdapter #204

Closed
mrusinak opened this issue May 9, 2018 · 11 comments

Comments

@mrusinak
Copy link

mrusinak commented May 9, 2018

In version 3.0.1.RELEASE, listener properties (such as concurrency, poll-timeout/thresholds, etc) are not applied when using spring-integration/java-dsl to create Kafka consumers.

While it is possible to apply them manually as a workaround, (https://stackoverflow.com/questions/50235548/spring-kafka-concurrency-with-spring-integration), it gets complicated to do so for a couple reasons:

  1. The KafkaMessageDrivenChannelAdapterListenerContainerSpec does not have parity to all the options, so it winds up being required to use a Kafka.messageDrivenChannelAdapter variant that takes a ContainerProperties - none of which allow specifying the Topic separately
  2. There is no publicly usable method as far as I can tell to create a ContainerProperties out of KafkaProperties
  3. Copying ContainerProperties is buried in configuring a new instance of a container, requiring playing games by extending classes, or by the application copy/pasting the logic - neither option feels good

Ideally the listener properties should be applied correctly without the application needing to do anything, but otherwise having a way for an application to easily create ContainerProperties from KafkaProperties would be a huge help.

@artembilan
Copy link
Contributor

Unfortunately you are missing the fact that KafkaProperties properties is a part of Spring Boot project: https://projects.spring.io/spring-boot/. This one is a pure Spring Integration implementation for the Spring for Kafka project. That is a convention in Spring Boot to auto-configuration something for us based on the configuration properties. Here we deal with regular Spring Inversion of Control container: there is no any auto-configurations.

The idea about ContainerProperties extracting from the KafkaProperties does not make sense as well.
See it's ctors:

public ContainerProperties(String... topics) {
}

public ContainerProperties(Pattern topicPattern) {
}

public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
}

So, the topic <-> ContainerProperties is one-to-one relationship.

I know it might be a bit complicated to figure how to configure everything, but that's was the best we could come up for Spring-based code style.

@mrusinak
Copy link
Author

mrusinak commented May 9, 2018

I did completely miss that KakaProperties was from springboot, that's quite unfortunate.

In that case, would it be possible to add a variation of Kafka.messageDrivenChannelAdapter that can accept a ConcurrentKafkaListenerContainerFactory (in place of a generic ConsumerFactory) so that internally when it's building it's KafkaMessageListenerContainerSpec, it could copy the ContainerProperties and concurrency defined in that factory?

Then a springboot application could easily autowire in the ConcurrentKafkaListenerContainerFactory, and construct an integration flow via Kafka.messageDrivenChannelAdapter(factory, "some-topic") from the auto-configuration already done by springboot.

@garyrussell
Copy link
Contributor

garyrussell commented May 9, 2018

Unfortunately, ConcurrentKafkaListenerContainerFactory is specifically for building containers for @KafkaListener annotations together with KafkaListenerEndpoints. So it wouldn't be that simple; the factory would have to be changed to be more generic.

@mrusinak
Copy link
Author

mrusinak commented May 9, 2018

Ahh, I was thinking the code would just use the getConsumerFactory and getContainerProperties to essentially just continue on with the existing variants of Kafka.messageDrivenChannelAdapter (just after copying correctly). Is the consumerFactory supplied in that bean not appropriate to use for integration flows?

@garyrussell
Copy link
Contributor

garyrussell commented May 9, 2018

Interesting, yes, I guess that would work; it's a bit of an abuse, though, since it's not really a factory in that case, just a mechanism for passing properties around.

If we were to do this, I would prefer to pull that functionality (properties/consumer factory holder) to a super class (e.g. ContainerConfigurationHolder and have the current factory be a subclass of that and have the DSL accept a ContainerConfigurationHolder.

@mrusinak
Copy link
Author

mrusinak commented May 9, 2018

True. I was just looking at what springboot currently autoconfigures, to see if there anything existing that could be passed to spring-integration-kafka that wouldn't have a dependency on sprintboot itself. Moving the ConsumerFactory / ContainerProperty holder into a superclass/interface makes perfect sense to me.

Edit It also seems like being it's own interface/superclass would open up springboot being able to autoconfigure a bean on just @EnableIntegration being applied, and not require @EnableKafka

Also, if there is a better project to file a feature request I am also more than happy to move the request - it just seemed like spring-kafka & springboot work together correctly just fine, so I thought this would be the best choice to to start in.

@garyrussell
Copy link
Contributor

I am not comfortable with adding a DSL method that takes a ConcurrentKafkaListenerContainerFactory since it's not really used as a factory.

The refactoring would be in spring-kafka, and the DSL improvement would need to be here. I don't think we need another issue over there.

Give us some time to discuss internally and we'll get back to you in a day or so.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue May 11, 2018
Resolves spring-attic/spring-integration-kafka#204

With this in place, we could pass a container factory into the KMDCA and/or a DSL spec,
along with topic configuration.

This would enable boot properties to be used to configure the container (and any container
that is not used for a `@KafkaListener`.

With the integration DSL, something like

   IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(containerFactory, String... topics)
                   .handle(...)
                   ...

where the `ConsumerFactory` and `ContainerProperties` are provided by the factory.
@garyrussell
Copy link
Contributor

I think I've come up with a reasonable compromise - extend the container factory so it can create any container based on the boot properties, even for containers that aren't used for @KafkaListeners.

PoC here: garyrussell/spring-kafka@59b96ca

@artembilan WDYT?

@artembilan
Copy link
Contributor

Let's give it a shot, Gary!

Looks very promising. I guess we would need to fix JavaDocs on that factory and also say something in the Docs.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue May 11, 2018
See spring-attic/spring-integration-kafka#204

With this in place, we could pass a container factory into the KMDCA and/or a DSL spec,
along with topic configuration.

This would enable boot properties to be used to configure the container (and any container
that is not used for a `@KafkaListener`.

With the integration DSL, something like

   IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(containerFactory, String... topics)
                   .handle(...)
                   ...

where the `ConsumerFactory` and `ContainerProperties` are provided by the factory.
artembilan pushed a commit to spring-projects/spring-kafka that referenced this issue May 14, 2018
See spring-attic/spring-integration-kafka#204

With this in place, we could pass a container factory into the KMDCA and/or a DSL spec,
along with topic configuration.

This would enable boot properties to be used to configure the container (and any container
that is not used for a `@KafkaListener`.

With the integration DSL, something like

```
   IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(containerFactory, String... topics)
                   .handle(...)
```
where the `ConsumerFactory` and `ContainerProperties` are provided by the factory.

* Javadoc polishing.

* Polishing - pull methods up to abstract factory

* Polishing - add setAutoStartup to container interface
garyrussell added a commit to garyrussell/spring-integration-kafka that referenced this issue May 14, 2018
Fixes spring-attic#204

When an external container is provided to the DSL, register it as a bean
if it is not already a bean.
artembilan pushed a commit that referenced this issue May 15, 2018
Fixes #204

When an external container is provided to the DSL, register it as a bean
if it is not already a bean.

Polishing id from PR comments; add gateway support too.

* Polishing JavaDocs and omissions in the test-case

# Conflicts:
#	src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java
denis554 added a commit to denis554/spring-kafka that referenced this issue Mar 27, 2019
See spring-attic/spring-integration-kafka#204

With this in place, we could pass a container factory into the KMDCA and/or a DSL spec,
along with topic configuration.

This would enable boot properties to be used to configure the container (and any container
that is not used for a `@KafkaListener`.

With the integration DSL, something like

```
   IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(containerFactory, String... topics)
                   .handle(...)
```
where the `ConsumerFactory` and `ContainerProperties` are provided by the factory.

* Javadoc polishing.

* Polishing - pull methods up to abstract factory

* Polishing - add setAutoStartup to container interface
garyrussell added a commit to garyrussell/spring-integration that referenced this issue Jun 24, 2020
Fixes spring-attic/spring-integration-kafka#204

When an external container is provided to the DSL, register it as a bean
if it is not already a bean.

Polishing id from PR comments; add gateway support too.

* Polishing JavaDocs and omissions in the test-case
artembilan pushed a commit to spring-projects/spring-integration that referenced this issue Jun 25, 2020
Fixes spring-attic/spring-integration-kafka#204

When an external container is provided to the DSL, register it as a bean
if it is not already a bean.

Polishing id from PR comments; add gateway support too.

* Polishing JavaDocs and omissions in the test-case
@rajeshsharmaacs
Copy link

rajeshsharmaacs commented Jul 7, 2020

// sorry for the trouble. Removed.

@garyrussell
Copy link
Contributor

Please don't piggy-back on 2 year old issues; this is clearly unrelated to the subject of this issue.

It's better to ask such questions on stack overflow, tagged with spring-integration and spring-kafka.

You need to show ALL the dependencies and the full stack trace.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Development

No branches or pull requests

4 participants