-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
I am using 2.1.3.RELEASE, I have created
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@KafkaListener
public @interface MyListener{
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "groupId")
String groupId() default "";
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] value() default {};
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
and using like:
@MyListener(id="My-Id", groupId = "group_id",value = "Kafka_Example")
public void consume(ConsumerRecord<?, Map<String, String>> message) {
try {
Thread.sleep(3000);
}catch(Exception e){
message.offset();
}
System.out.println("Consumed message: " + message.value());
}
But getting:
java.lang.IllegalArgumentException: An array of topicPartitions must be provided
at org.springframework.util.Assert.notEmpty(Assert.java:361) ~[spring-core-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.kafka.listener.config.ContainerProperties.(ContainerProperties.java:184) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.(AbstractMessageListenerContainer.java:133) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.(ConcurrentMessageListenerContainer.java:71) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.createContainerInstance(ConcurrentKafkaListenerContainerFactory.java:70) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.createContainerInstance(ConcurrentKafkaListenerContainerFactory.java:40) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:206) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:49) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:183) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:155) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:129) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry$$FastClassBySpringCGLIB$$fd6b2fcf.invoke() ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:746) ~[spring-aop-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88) ~[spring-aop-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at com.techprimers.kafka.springbootkafkaconsumerexample.EnableKafkaOffsetResetAspect.wrapConsumerFactory(EnableKafkaOffsetResetAspect.java:36) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644) ~[spring-aop-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633) ~[spring-aop-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) ~[spring-aop-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry$$EnhancerBySpringCGLIB$$d58089c2.registerListenerContainer() ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:138) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:132) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:243) ~[spring-kafka-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:776) ~[spring-beans-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:548) ~[spring-context-5.0.10.RELEASE.jar:5.0.10.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) [spring-boot-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:386) [spring-boot-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) [spring-boot-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1242) [spring-boot-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1230) [spring-boot-2.0.6.RELEASE.jar:2.0.6.RELEASE]
at com.techprimers.kafka.springbootkafkaconsumerexample.SpringBootKafkaConsumerExampleApplication.main(SpringBootKafkaConsumerExampleApplication.java:11) [classes/:na]