How can I set a consumer's topic in code? #132

Closed
dragontree101 opened this issue Jul 1, 2016 · 21 comments

Comments

@dragontree101 dragontree101 commented Jul 1, 2016

I want to set the consumer's topic from my code at runtime. First I get the topic from another interface, and then I consume that topic's messages.

Right now I have to use @KafkaListener to set the topic.

How can I solve this problem?

Contributor

@stepio stepio commented Jul 1, 2016

Hi @dragontree101,

Maybe the following extract from the reference documentation will be helpful for your case:
http://docs.spring.io/spring-kafka/docs/1.0.0.RC1/reference/htmlsingle/

    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    // Set the listener before the container is created.
    containerProps.setMessageListener(new MessageListener<Integer, String>() {

        @Override
        public void onMessage(ConsumerRecord<Integer, String> message) {
            logger.info("received: " + message);
        }

    });
    // createContainer(...) is a helper defined elsewhere (not shown here).
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
    container.setBeanName("testAuto");
    container.start();

Member

@artembilan artembilan commented Jul 1, 2016

@dragontree101, as you know, annotation configuration is static and linked at compile time.
So you can't create a @KafkaListener dynamically in your code according to some runtime logic.
Independently of that, the KafkaMessageListenerContainer doesn't support dynamic assignment of topics, patterns, or partitions, and I'm afraid it won't. Any such change to the environment triggers a hard rebalance on Kafka for the existing consumers.

So the only solution for you, as @stepio already pointed out, is to create and start a fresh KafkaMessageListenerContainer manually.

On the other hand, to be honest, your logic doesn't fit Kafka's purpose. The real goal is to have a distributed, clustered application with many consumers on the same topic, grouped so that each message is handled by only one of them. So your dynamically created listener, and therefore its consumer, won't quite fit normal Kafka client behavior.

Contributor

@stepio stepio commented Jul 1, 2016

By the way, @dragontree101, if you just need to make the topic's name configurable, you can still use a property placeholder in the annotation and specify the exact topic name(s) in a property file, e.g.:

@Component
public class FirstListener {
    @KafkaListener(containerFactory = "firstContainerFactory", topics = "${kafka.topic.first}")
    public void onMessage(FirstTO message) throws SomeSpecificException {
        ...
    }
    ...
}

@Component
public class SecondListener {
    @KafkaListener(containerFactory = "secondContainerFactory", topics = "${kafka.topic.second}")
    public void onMessage(SecondTO message) throws SomeSpecificException {
        ...
    }
    ...
}

And your application.properties should contain:

kafka.topic.first=first-type-messages
kafka.topic.second=second-type-messages

@artembilan, speaking of best practices... Am I correct to use different topics for different message types (with different serialisers/deserialisers)?

Member

@artembilan artembilan commented Jul 1, 2016

@stepio,

Am I correct to use different topics for different message types (with different serialisers/deserialisers)?

Yes, it looks that way. With that split you can customize your downstream logic independently for each topic. You could also draw a similar distinction within a single topic by using different partitions, and configure your listener containers accordingly.

And yes, your advice about property placeholders is good, but somehow I feel that @dragontree101 would like to create the annotation configuration at runtime. That might be possible anyway: it is called a "microservice". You have a separate, simple Spring Boot application, feed it the desired properties, and start it from another application. I haven't tried it, but maybe you can even do that with new SpringApplicationBuilder().child(MyListenerConfiguration.class). In other words: start a separate child application context with the desired environment (Kafka topics).

Author

@dragontree101 dragontree101 commented Jul 1, 2016

@stepio @artembilan thanks very much.
@artembilan I have "microservice" A, and when A starts it fetches the Kafka topic from another "microservice" B and then uses this topic to consume messages.

So I will try to create my own KafkaMessageListenerContainer and set the topic.

Member

@artembilan artembilan commented Jul 1, 2016

You are welcome, @dragontree101 !

And thank you for the company, @stepio 👍 !

@artembilan artembilan closed this Jul 1, 2016

@deitch deitch commented Feb 27, 2017

if you just need to make the topic's name configurable, you can still use a property placeholder in the annotation and specify the exact topic name(s) in a property file

@stepio: Spring Boot usually lets you override any property with an env var. Is there any way to get it to respect that here as well? I find that ${property.name} works only if the property is in the config file.

Member

@artembilan artembilan commented Feb 27, 2017

@deitch,

Spring Boot can do that only for @ConfigurationProperties. If it is just an arbitrary property like kafka.topic.first, it can't be picked up from an ENV var.

And that is really a question for Spring Boot, not related to Spring Kafka at all.

Moreover, this issue has already been closed.

@deitch deitch commented Feb 27, 2017

Spring Boot can do that only for @ConfigurationProperties

Ah, is that how it works? I already have @ConfigurationProperties set up, so is there any way to consume those in the @KafkaListener annotation?

Member

@artembilan artembilan commented Feb 27, 2017

M-m-m.
Something like this:

@KafkaListener(topics = "#{myCustomKafkaProperties.topics}")

@deitch deitch commented Feb 27, 2017

Thanks @artembilan, will try. :-)

@deitch deitch commented Feb 27, 2017

That really helped. I had to use #{myCustomKafkaProperties.getTopics()}, but it worked.

Thank you.

@elarbi elarbi commented Mar 31, 2017

On the other hand, to be honest, your logic doesn't fit Kafka's purpose. The real goal is to have a distributed, clustered application with many consumers on the same topic, grouped so that each message is handled by only one of them. So your dynamically created listener, and therefore its consumer, won't quite fit normal Kafka client behavior.

@artembilan hi, I'm pretty new to Kafka, and I was wondering why we cannot subscribe dynamically. Is the rebalance algorithm very "costly"?
We have a use case where a running microservice needs to listen to a newly created topic, so we need to subscribe to this topic programmatically. It would be perfect (for us) if the listener container managed that for us :)

Member

@artembilan artembilan commented Mar 31, 2017

@elarbi,

Thank you for your curiosity!
Well, as far as I can see, there is simply no such functionality.

The KafkaMessageListenerContainer is based on the ContainerProperties, and the latter takes topics or topicPattern as a final constructor property.

So the best you can do is create a new KafkaMessageListenerContainer.

The current implementation simply has no way to add or remove topics or partitions.
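
To make the "one new container per topic" approach a bit more concrete, here is a minimal sketch in plain Java. It is not Spring Kafka API: `TopicContainer` and `TopicContainerRegistry` are hypothetical names made up for illustration, and the factory stands in for whatever code actually builds a listener container for a single topic.

```java
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical minimal lifecycle that a listener container is assumed to expose. */
interface TopicContainer {
    void start();
    void stop();
}

/**
 * Keeps one container per topic. A topic is never added to a running container:
 * subscribing creates and starts a fresh one, unsubscribing stops and discards it.
 */
final class TopicContainerRegistry {

    private final Map<String, TopicContainer> containers = new ConcurrentHashMap<>();
    private final Function<String, TopicContainer> factory;

    TopicContainerRegistry(Function<String, TopicContainer> factory) {
        this.factory = factory;
    }

    /** Returns true if a new container was created and started for the topic. */
    boolean subscribe(String topic) {
        boolean[] created = {false};
        containers.computeIfAbsent(topic, t -> {
            TopicContainer container = factory.apply(t);
            container.start();
            created[0] = true;
            return container;
        });
        return created[0];
    }

    /** Returns true if a container existed for the topic and was stopped. */
    boolean unsubscribe(String topic) {
        TopicContainer container = containers.remove(topic);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    /** Snapshot of the currently subscribed topics, sorted by name. */
    SortedSet<String> topics() {
        return new TreeSet<>(containers.keySet());
    }

    public static void main(String[] args) {
        TopicContainerRegistry registry = new TopicContainerRegistry(topic -> new TopicContainer() {
            @Override public void start() { System.out.println("started container for " + topic); }
            @Override public void stop() { System.out.println("stopped container for " + topic); }
        });
        registry.subscribe("new-topic");   // creates and starts a fresh container
        registry.subscribe("new-topic");   // no-op: already subscribed
        registry.unsubscribe("new-topic"); // stops and discards it
    }
}
```

With Spring Kafka, the factory would presumably wrap something like new KafkaMessageListenerContainer<>(consumerFactory, containerProps), where containerProps is a ContainerProperties built for that one topic with its message listener already set.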

We might reconsider such a feature, but that belongs in a new GitHub issue.

Thank you again!

@elarbi elarbi commented Mar 31, 2017

Thanks for your reply. Actually this is what we have done, so I don't need to worry any more :)

@maxceban maxceban commented Apr 4, 2017

@deitch how did you manage to get myCustomKafkaProperties working? I constantly get SpEL expression errors. Could you post a code example?

@deitch deitch commented Apr 4, 2017

@maxceban yeah.

    @Autowired
    KafkaConsumerProperties kafkaConsumerProperties;

    @KafkaListener(topics = "#{kafkaConsumerProperties.getTopic()}")
    public void receiveMessage(BaseMessage message) throws JsonProcessingException {
        ...
    }

and then in KafkaConsumerProperties.java:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "kafka.consumer")
public class KafkaConsumerProperties {

    private String bootstrap;
    private String group;
    private String topic;

    public String getBootstrap() {
        return bootstrap;
    }

    public void setBootstrap(String bootstrap) {
        this.bootstrap = bootstrap;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }
}

And the rest is set by Spring...

@maxceban maxceban commented Apr 5, 2017

Thanks @deitch! It was a silly mistake on my part: I forgot to add @Component to my properties class.

@deitch deitch commented Apr 5, 2017

It has been years since I did intensive Java work. You don't want to know how long it took me to figure that out... so I am glad it helps you!

@DanielJyc DanielJyc commented May 31, 2018

Hi @stepio

Can this approach provide multi-threaded consumption?

    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    // Set the listener before the container is created.
    containerProps.setMessageListener(new MessageListener<Integer, String>() {

        @Override
        public void onMessage(ConsumerRecord<Integer, String> message) {
            logger.info("received: " + message);
        }

    });
    // createContainer(...) is a helper defined elsewhere (not shown here).
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
    container.setBeanName("testAuto");
    container.start();

Just like factory.setConcurrency(concurrency) does here:

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.setConcurrency(concurrency); // Specify the container concurrency
    factory.getContainerProperties().setPollTimeout(1500);

Member

@artembilan artembilan commented May 31, 2018

@DanielJyc,

Please don't ask questions in closed issues.
Moreover, questions should go to the community forums, like StackOverflow.

Anyway, you need to use a ConcurrentMessageListenerContainer instead of the single-threaded KafkaMessageListenerContainer. More info is here: https://docs.spring.io/spring-kafka/docs/2.1.6.RELEASE/reference/html/_reference.html#message-listener-container
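
For illustration, a rough sketch of what the manually created container from the question might look like with ConcurrentMessageListenerContainer. It assumes spring-kafka and kafka-clients are on the classpath. The class name, broker address, and group id are placeholders rather than values from this thread, and the ContainerProperties import matches Spring Kafka 2.2 and later (older versions keep it in org.springframework.kafka.listener.config).

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
// Spring Kafka 2.2+; earlier versions: org.springframework.kafka.listener.config.ContainerProperties
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;

public class ConcurrentContainerSketch {

    /** Builds and starts a container that consumes the given topics on several threads. */
    public static ConcurrentMessageListenerContainer<Integer, String> start(int concurrency, String... topics) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");            // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        DefaultKafkaConsumerFactory<Integer, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);

        // Topics are fixed when the ContainerProperties are constructed.
        ContainerProperties containerProps = new ContainerProperties(topics);
        containerProps.setMessageListener((MessageListener<Integer, String>) record ->
                System.out.println("received: " + record));

        ConcurrentMessageListenerContainer<Integer, String> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
        container.setConcurrency(concurrency); // number of consumer threads to run
        container.setBeanName("testAuto");
        container.start();
        return container;
    }
}
```

Calling ConcurrentContainerSketch.start(3, "topic1", "topic2") would request three consumer threads. Keep in mind that the useful parallelism is bounded by the number of partitions, so any consumers beyond that count stay idle.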
