
Multiple Listeners throws javax.management.InstanceAlreadyExistsException #398

Closed
collabintel opened this issue Aug 21, 2017 · 11 comments

@collabintel
I get javax.management.InstanceAlreadyExistsException when I have multiple listeners. It works fine in 1.2.2.RELEASE, but throws this exception in 1.3.0.M1 and 1.3.0.BUILD-SNAPSHOT. I think the problem is that the consumers no longer get a generated id such as 'consumer-1', 'consumer-2':

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=null-0
 	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.8.0_111]
 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.8.0_111]
 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.8.0_111]
 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.8.0_111]
 	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.8.0_111]
 	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.8.0_111]
 	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) ~[kafka-clients-0.11.0.0.jar!/:na]
 	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:757) [kafka-clients-0.11.0.0.jar!/:na]
 	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:602) [kafka-clients-0.11.0.0.jar!/:na]
 	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:118) [spring-kafka-1.3.0.BUILD-SNAPSHOT.jar!/:na]
 	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:113) [spring-kafka-1.3.0.BUILD-SNAPSHOT.jar!/:na]
 	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:93) [spring-kafka-1.3.0.BUILD-SNAPSHOT.jar!/:na]
 	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:320) [spring-kafka-1.3.0.BUILD-SNAPSHOT.jar!/:na]
 	at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:192) [spring-kafka-1.3.0.BUILD-SNAPSHOT.jar!/:na]
 	at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) [spring-kafka-1.3.0.BUILD-SNAPSHOT.jar!/:na]
 	at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:125) [spring-kafka-1.3.0.BUILD-SNAPSHOT.jar!/:na]
 	at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) [spring-kafka-1.3.0.BUILD-SNAPSHOT.jar!/:na]
 	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:287) [spring-kafka-1.3.0.BUILD-SNAPSHOT.jar!/:na]
 	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:236) [spring-kafka-1.3.0.BUILD-SNAPSHOT.jar!/:na]
 	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:175) [spring-context-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
 	at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50) [spring-context-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
 	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348) [spring-context-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
 	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151) [spring-context-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
 	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114) [spring-context-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
 	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) [spring-context-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
 	at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.finishRefresh(EmbeddedWebApplicationContext.java:144) [spring-boot-1.5.3.RELEASE.jar!/:1.5.3.RELEASE]
 	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:545) [spring-context-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
 	at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) [spring-boot-1.5.3.RELEASE.jar!/:1.5.3.RELEASE]
 	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737) [spring-boot-1.5.3.RELEASE.jar!/:1.5.3.RELEASE]
 	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:370) [spring-boot-1.5.3.RELEASE.jar!/:1.5.3.RELEASE]
 	at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) [spring-boot-1.5.3.RELEASE.jar!/:1.5.3.RELEASE]
 	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1162) [spring-boot-1.5.3.RELEASE.jar!/:1.5.3.RELEASE]
 	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1151) [spring-boot-1.5.3.RELEASE.jar!/:1.5.3.RELEASE]
 	at org.micro.commerce.product.ProductCommandApplication.main(ProductCommandApplication.java:10) [classes!/:na]
 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_111]
 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_111]
 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_111]
 	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_111]
 	at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [app.jar:na]
 	at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [app.jar:na]
 	at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [app.jar:na]
 	at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [app.jar:na]
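For context on what is being rejected here: the Kafka client registers an `app-info` MBean named after the client id, and the platform MBean server refuses a second registration under the same ObjectName. A minimal, Kafka-free sketch of that mechanism (class and method names are made up for illustration):

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {

    // Minimal standard MBean: empty management interface, matching impl name.
    public interface DemoMBean { }
    public static class Demo implements DemoMBean { }

    // Registers the same ObjectName twice; returns true if the second
    // registration is rejected with InstanceAlreadyExistsException.
    public static boolean duplicateRejected(String objectName) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(objectName);
        server.registerMBean(new Demo(), name);     // first registration succeeds
        try {
            server.registerMBean(new Demo(), name); // same name again
            return false;
        } catch (InstanceAlreadyExistsException e) {
            return true;                            // what the Kafka consumer hit
        } finally {
            server.unregisterMBean(name);           // clean up
        }
    }

    public static void main(String[] args) throws Exception {
        // Same naming pattern as in the stack trace above.
        System.out.println(duplicateRejected("kafka.consumer:type=app-info,id=null-0"));
    }
}
```

This is why two consumers whose client id resolves to the same value (here `null-0`) collide as soon as the second one starts.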
@Component
public class ProductEventsConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProductEventsConsumer.class);

    private final CountDownLatch latch1 = new CountDownLatch(1);

    private final CountDownLatch latch2 = new CountDownLatch(1);

    private final CountDownLatch latch3 = new CountDownLatch(1);

    @KafkaListener(topics = "${kafka.topic.product-creation-requested}")
    public void receiveProductCreationRequestedEvent(ProductCreationRequestedEvent event) {
        LOGGER.info("isProductCreationRequestedEvent: {} received payload='{}'", event instanceof ProductCreationRequestedEvent, event.getModel());
        latch1.countDown();
    }

    @KafkaListener(topics = "${kafka.topic.product-creation-validated}")
    public void receiveProductCreationValidatedEvent(ProductCreationValidatedEvent event) {
        LOGGER.info("isProductCreationValidatedEvent: {} received payload='{}'", event instanceof ProductCreationValidatedEvent, event.getModel());
        latch2.countDown();
    }

    @KafkaListener(topics = "${kafka.topic.product-created}")
    public void receiveProductCreatedEvent(ProductCreatedEvent event) {
        LOGGER.info("isProductCreatedEvent: {} received payload='{}'", event instanceof ProductCreatedEvent, event.getModel());
        latch3.countDown();
    }

}
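The latches in the class above are presumably there so a test can await delivery. A plain-Java sketch of that pattern (hypothetical names, no Spring or Kafka involved):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchAwaitDemo {

    private final CountDownLatch latch = new CountDownLatch(1);

    // Stand-in for a listener method: counts down when a record arrives.
    public void onRecord(String payload) {
        latch.countDown();
    }

    // Test side: block until the record arrives or the timeout elapses.
    public boolean received(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        LatchAwaitDemo demo = new LatchAwaitDemo();
        demo.onRecord("test");
        System.out.println(demo.received(1, TimeUnit.SECONDS));
    }
}
```

In a real integration test, the `onRecord` side would be the `@KafkaListener` method and the test would assert `received(...)` after sending a record.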
@Configuration
@EnableKafka
public class EventsConsumerConfiguration {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // allows a pool of processes to divide the work of consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "product-command-events");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());

        return factory;
    }


}
@garyrussell
Contributor

Looks like a regression - I will create M2 today.

@garyrussell
Contributor

We failed to backport this commit.

@garyrussell
Contributor

This is fixed in 1.3.0.BUILD-SNAPSHOT; I will release M2 soon.

@garyrussell
Contributor

1.3.0.M2 is available in the milestone repo.

@collabintel
Author

I tested with 1.3.0.M2 version and it is fixed. Thanks

@thomasmey

Hi,

with org.springframework.kafka:spring-kafka:jar:2.1.8.RELEASE:compile I see the same error:

stack trace is:

Repository.addMBean(DynamicMBean, ObjectName, Repository$RegistrationContext) line: 437
DefaultMBeanServerInterceptor.registerWithRepository(Object, DynamicMBean, ObjectName) line: 1898
DefaultMBeanServerInterceptor.registerDynamicMBean(String, DynamicMBean, ObjectName) line: 966
DefaultMBeanServerInterceptor.registerObject(String, Object, ObjectName) line: 900
DefaultMBeanServerInterceptor.registerMBean(Object, ObjectName) line: 324
JmxMBeanServer.registerMBean(Object, ObjectName) line: 522
AppInfoParser.registerAppInfo(String, String, Metrics) line: 62
KafkaConsumer<K,V>.(ConsumerConfig, Deserializer, Deserializer) line: 781
KafkaConsumer<K,V>.(Map<String,Object>, Deserializer, Deserializer) line: 608
DefaultKafkaConsumerFactory<K,V>.createKafkaConsumer(Map<String,Object>) line: 139
DefaultKafkaConsumerFactory<K,V>.createKafkaConsumer(String, String, String) line: 134
DefaultKafkaConsumerFactory<K,V>.createConsumer(String, String, String) line: 102
KafkaMessageListenerContainer$ListenerConsumer.(GenericMessageListener<?>, ListenerType) line: 425
KafkaMessageListenerContainer<K,V>.doStart() line: 259
KafkaMessageListenerContainer<K,V>(AbstractMessageListenerContainer<K,V>).start() line: 269
ConcurrentMessageListenerContainer<K,V>.doStart() line: 164
ConcurrentMessageListenerContainer<K,V>(AbstractMessageListenerContainer<K,V>).start() line: 269
KafkaListenerEndpointRegistry.startIfNecessary(MessageListenerContainer) line: 289
KafkaListenerEndpointRegistry.start() line: 238
DefaultLifecycleProcessor.doStart(Map<String,Lifecycle>, String, boolean) line: 181

I think part of the problem is that org.springframework.kafka.config.KafkaListenerEndpointRegistry.start() creates a separate container for each registered endpoint (i.e. each @KafkaListener annotation), but DefaultKafkaConsumerFactory uses the same clientId (or clientId prefix/suffix) for all containers it creates, causing AppInfoParser to register the same MBean id.

I'm not sure what a good solution for this problem would be; I ended up with this hack:

```java
private static AtomicInteger containerCount = new AtomicInteger();

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<String, String>(....) {
        @Override
        public Consumer<String, String> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix) {
            String cid = clientIdPrefix + containerCount.getAndIncrement();
            return super.createConsumer(groupId, cid, clientIdSuffix);
        }
    };
}
```


@thomasmey

@garyrussell do you see comments like the one above on an already-closed issue?

@garyrussell
Contributor

garyrussell commented Apr 15, 2021

This will only happen if you explicitly set the client id in the application properties/yml (otherwise the kafka clients generate the client id).

If you have multiple listeners created from the same factory, use the clientIdPrefix property on the annotation:

@SpringBootApplication
public class Kgh398Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh398Application.class, args);
	}

	@KafkaListener(id = "kgh3981", topics = "kgh398", clientIdPrefix = "${spring.kafka.consumer.client-id}-listen1")
	public void listen1(String in) {
		System.out.println(in);
	}

	@KafkaListener(id = "kgh3982", topics = "kgh398", clientIdPrefix = "${spring.kafka.consumer.client-id}-listen2")
	public void listen2(String in) {
		System.out.println(in);
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name("kgh398").partitions(1).replicas(1).build();
	}

}

2.1.x is no longer supported; 2.3.x is the oldest supported version.

@thomasmey

Mhhh... we do not set clientId anywhere; the default seems to be consumer-0, etc.
But both "containers" use consumer-x and try to register an MBean with that name. It doesn't actually hurt, because the exception is caught and the flow continues, but it is at least ugly.

We need to update our components anyway, so I will try to update one component next week, and retest.

One question:
Is it mandatory to set id on @KafkaListener when having multiple listeners, or am I misunderstanding something here?

@garyrussell
Contributor

garyrussell commented Apr 15, 2021

There are 4 places it can be specified:

Consumer factory
Container factory container properties
Endpoint (populated from @KafkaListener)
Container properties (after the container is created)

If it's not specified anywhere, it's generated by the kafka-clients. With recent clients, it is constructed from the group id and either the group instance id or an incrementing atomic integer; I think that with older clients, yes, it was just consumer-n. It looks like the change was made in 2.6.0:

String groupInstanceIdPart = groupInstanceId != null ? groupInstanceId : CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() + "";
String generatedClientId = String.format("consumer-%s-%s", groupId, groupInstanceIdPart);
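As a standalone illustration of the generation logic quoted above (this is a sketch mirroring those two lines, not the actual kafka-clients source):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ClientIdDemo {

    // Mirrors the quoted kafka-clients logic: fall back to a process-wide
    // sequence when no group.instance.id is configured.
    private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    static String generateClientId(String groupId, String groupInstanceId) {
        String groupInstanceIdPart = groupInstanceId != null
                ? groupInstanceId
                : CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() + "";
        return String.format("consumer-%s-%s", groupId, groupInstanceIdPart);
    }

    public static void main(String[] args) {
        // Two listeners in the same group still get distinct generated client ids.
        System.out.println(generateClientId("product-command-events", null));
        System.out.println(generateClientId("product-command-events", null));
        System.out.println(generateClientId("product-command-events", "instance-a"));
    }
}
```

Because the sequence increments per consumer, generated ids never collide within a process; the MBean collision only appears when an explicit, fixed client id is configured.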

If it is specified in more than one of those four places, the last one is supposed to win, but with the #1770 bug it is set to an empty string by the endpoint (if not set there), overriding the container factory property.
If the result is an empty string, we fall back to whatever is configured in the consumer factory.

So, if you explicitly set a client id (anywhere) and you have multiple listeners, then yes, it needs to be made unique for each listener, either by setting it on the @KafkaListener or with a container customizer added to the factory, as described on #1770:

factory.setContainerCustomizer(container -> {
	container.getContainerProperties().setClientId("client-for-"
			+ container.getContainerProperties().getTopics()[0]);
});

@thomasmey

Many thanks for the explanation, very helpful.
