diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandler.java index a7a1a82b13..a6942e8125 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 the original author or authors. + * Copyright 2015-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,4 +48,12 @@ @Documented public @interface KafkaHandler { + /** + * When true, designate that this is the default fallback method if the payload type + * matches no other {@link KafkaHandler} method. Only one method can be so designated. + * @return true if this is the default method. + * @since 2.1.3 + */ + boolean isDefault() default false; + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 9b57e464c3..4f11adcb8d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -333,12 +333,20 @@ private Set findListenerAnnotations(Method method) { private void processMultiMethodListeners(Collection classLevelListeners, List multiMethods, Object bean, String beanName) { List checkedMethods = new ArrayList(); + Method defaultMethod = null; for (Method method : multiMethods) { - checkedMethods.add(checkProxy(method, bean)); + Method checked = checkProxy(method, bean); + if (AnnotationUtils.findAnnotation(method, KafkaHandler.class).isDefault()) { + final Method toAssert = defaultMethod; + Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: " + + toAssert.toString() + " and " + method.toString()); + defaultMethod = checked; + } + checkedMethods.add(checked); } for (KafkaListener classLevelListener : classLevelListeners) { - MultiMethodKafkaListenerEndpoint endpoint = new MultiMethodKafkaListenerEndpoint(checkedMethods, - bean); + MultiMethodKafkaListenerEndpoint endpoint = + new MultiMethodKafkaListenerEndpoint(checkedMethods, defaultMethod, bean); endpoint.setBeanFactory(this.beanFactory); processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName); } @@ -680,7 +688,7 @@ private Collection getBeansOfType(Class type) { */ private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory { - private DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService(); + private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService(); private MessageHandlerMethodFactory messageHandlerMethodFactory; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java index bfd595cccb..8f42f87349 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2017 the original author or authors. + * Copyright 2016-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import org.springframework.kafka.listener.adapter.DelegatingInvocableHandler; import org.springframework.kafka.listener.adapter.HandlerAdapter; import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; +import org.springframework.lang.Nullable; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; /** @@ -41,20 +42,44 @@ public class MultiMethodKafkaListenerEndpoint extends MethodKafkaListenerE private final List methods; + private final Method defaultMethod; + + /** + * Construct an instance for the provided methods and bean with no default method. + * @param methods the methods. + * @param bean the bean. + */ public MultiMethodKafkaListenerEndpoint(List methods, Object bean) { + this(methods, null, bean); + } + + /** + * Construct an instance for the provided methods, default method and bean. + * @param methods the methods. + * @param defaultMethod the default method. + * @param bean the bean. + * @since 2.1.3 + */ + public MultiMethodKafkaListenerEndpoint(List methods, @Nullable Method defaultMethod, Object bean) { this.methods = methods; + this.defaultMethod = defaultMethod; setBean(bean); } @Override protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) { List invocableHandlerMethods = new ArrayList(); + InvocableHandlerMethod defaultHandler = null; for (Method method : this.methods) { - invocableHandlerMethods.add(getMessageHandlerMethodFactory() - .createInvocableHandlerMethod(getBean(), method)); + InvocableHandlerMethod handler = getMessageHandlerMethodFactory() + .createInvocableHandlerMethod(getBean(), method); + invocableHandlerMethods.add(handler); + if (method.equals(this.defaultMethod)) { + defaultHandler = handler; + } } DelegatingInvocableHandler delegatingHandler = new DelegatingInvocableHandler(invocableHandlerMethods, - getBean(), getResolver(), getBeanExpressionContext()); + defaultHandler, getBean(), getResolver(), getBeanExpressionContext()); return new HandlerAdapter(delegatingHandler); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index fa53bbcede..18ea4becff 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2017 the original author or authors. + * Copyright 2016-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.springframework.expression.common.TemplateParserContext; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.kafka.KafkaException; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; @@ -61,6 +62,8 @@ public class DelegatingInvocableHandler { private final ConcurrentMap, InvocableHandlerMethod> cachedHandlers = new ConcurrentHashMap<>(); + private final InvocableHandlerMethod defaultHandler; + private final Map handlerSendTo = new HashMap<>(); private final Object bean; @@ -78,7 +81,23 @@ public class DelegatingInvocableHandler { */ public DelegatingInvocableHandler(List handlers, Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) { + this(handlers, null, bean, beanExpressionResolver, beanExpressionContext); + } + + /** + * Construct an instance with the supplied handlers for the bean. + * @param handlers the handlers. + * @param defaultHandler the default handler. + * @param bean the bean. + * @param beanExpressionResolver the resolver. + * @param beanExpressionContext the context. + * @since 2.1.3 + */ + public DelegatingInvocableHandler(List handlers, + @Nullable InvocableHandlerMethod defaultHandler, + Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) { this.handlers = new ArrayList<>(handlers); + this.defaultHandler = defaultHandler; this.bean = bean; this.resolver = beanExpressionResolver; this.beanExpressionContext = beanExpressionContext; @@ -174,13 +193,19 @@ protected InvocableHandlerMethod findHandlerForPayload(Class p for (InvocableHandlerMethod handler : this.handlers) { if (matchHandlerMethod(payloadClass, handler)) { if (result != null) { - throw new KafkaException("Ambiguous methods for payload type: " + payloadClass + ": " + - result.getMethod().getName() + " and " + handler.getMethod().getName()); + boolean resultIsDefault = result.equals(this.defaultHandler); + if (!handler.equals(this.defaultHandler) && !resultIsDefault) { + throw new KafkaException("Ambiguous methods for payload type: " + payloadClass + ": " + + result.getMethod().getName() + " and " + handler.getMethod().getName()); + } + if (!resultIsDefault) { + continue; // otherwise replace the result with the actual match + } } result = handler; } } - return result; + return result != null ? result : this.defaultHandler; } protected boolean matchHandlerMethod(Class payloadClass, InvocableHandlerMethod handler) { @@ -221,4 +246,8 @@ public String getMethodNameFor(Object payload) { return handlerForPayload == null ? "no match" : handlerForPayload.getMethod().toGenericString(); //NOSONAR } + public boolean hasDefaultHandler() { + return this.defaultHandler != null; + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java index e21c725e00..c6565530b5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2016 the original author or authors. + * Copyright 2015-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,13 @@ public Object invoke(Message message, Object... providedArgs) throws Exceptio if (this.invokerHandlerMethod != null) { return this.invokerHandlerMethod.invoke(message, providedArgs); } + else if (this.delegatingHandler.hasDefaultHandler()) { + // Needed to avoid returning raw Message which matches Object + Object[] args = new Object[providedArgs.length + 1]; + args[0] = message.getPayload(); + System.arraycopy(providedArgs, 0, args, 1, providedArgs.length); + return this.delegatingHandler.invoke(message, args); + } else { return this.delegatingHandler.invoke(message, providedArgs); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 5eb4c37f32..2f3c744b89 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -83,6 +83,7 @@ import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper; import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper; +import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper.TypePrecedence; import org.springframework.kafka.support.converter.StringJsonMessageConverter; import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.kafka.test.utils.KafkaTestUtils; @@ -92,6 +93,7 @@ import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; import org.springframework.retry.support.RetryTemplate; import org.springframework.test.annotation.DirtiesContext; @@ -124,7 +126,7 @@ public class EnableKafkaIntegrationTests { "annotated18", "annotated19", "annotated20", "annotated21", "annotated21reply", "annotated22", "annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply", "annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28", - "annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32"); + "annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33"); // @Rule // public Log4jLevelAdjuster adjuster = new Log4jLevelAdjuster(Level.TRACE, @@ -142,6 +144,9 @@ public class EnableKafkaIntegrationTests { @Autowired public MultiListenerBean multiListener; + @Autowired + public MultiJsonListenerBean multiJsonListener; + @Autowired public KafkaTemplate template; @@ -303,6 +308,21 @@ public void testMulti() throws Exception { assertThat(this.multiListener.latch2.await(60, TimeUnit.SECONDS)).isTrue(); } + @Test + public void testMultiJson() throws Exception { + this.kafkaJsonTemplate.setDefaultTopic("annotated33"); + this.kafkaJsonTemplate.send(new GenericMessage<>(new Foo("one"))); + this.kafkaJsonTemplate.send(new GenericMessage<>(new Baz("two"))); + this.kafkaJsonTemplate.send(new GenericMessage<>(new Qux("three"))); + assertThat(this.multiJsonListener.latch1.await(60, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiJsonListener.latch2.await(60, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiJsonListener.latch3.await(60, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiJsonListener.foo.getBar()).isEqualTo("one"); + assertThat(this.multiJsonListener.baz.getBar()).isEqualTo("two"); + assertThat(this.multiJsonListener.bar.getBar()).isEqualTo("three"); + assertThat(this.multiJsonListener.bar).isInstanceOf(Qux.class); + } + @Test public void testTx() throws Exception { template.send("annotated9", 0, "foo"); @@ -312,8 +332,7 @@ public void testTx() throws Exception { @Test public void testJson() throws Exception { - Foo foo = new Foo(); - foo.setBar("bar"); + Foo foo = new Foo("bar"); kafkaJsonTemplate.send(MessageBuilder.withPayload(foo) .setHeader(KafkaHeaders.TOPIC, "annotated10") .setHeader(KafkaHeaders.PARTITION_ID, 0) @@ -339,8 +358,7 @@ public void testJsonHeaders() throws Exception { "messageConverter.typeMapper", DefaultJackson2JavaTypeMapper.class); typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID); assertThat(container).isNotNull(); - Foo foo = new Foo(); - foo.setBar("bar"); + Foo foo = new Foo("bar"); this.kafkaJsonTemplate.send(MessageBuilder.withPayload(foo) .setHeader(KafkaHeaders.TOPIC, "annotated31") .setHeader(KafkaHeaders.PARTITION_ID, 0) @@ -611,7 +629,7 @@ public void testConverterBean() throws Exception { Converter converterDelegate = mock(Converter.class); fooConverter.setDelegate(converterDelegate); - Foo foo = new Foo(); + Foo foo = new Foo("foo"); willReturn(foo).given(converterDelegate).convert("{'bar':'foo'}"); template.send("annotated32", 0, 1, "{'bar':'foo'}"); assertThat(this.listener.latch20.await(10, TimeUnit.SECONDS)).isTrue(); @@ -690,6 +708,23 @@ public KafkaListenerContainerFactory kafkaJsonListenerContainerFactory() { return factory; } + /* + * Uses Type_Id header + */ + @Bean + public KafkaListenerContainerFactory kafkaJsonListenerContainerFactory2() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + StringJsonMessageConverter converter = new StringJsonMessageConverter(); + DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); + typeMapper.addTrustedPackages("*"); + typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID); + converter.setTypeMapper(typeMapper); + factory.setMessageConverter(converter); + return factory; + } + @Bean public KafkaListenerContainerFactory batchFactory() { ConcurrentKafkaListenerContainerFactory factory = @@ -843,6 +878,11 @@ public MultiListenerBean multiListener() { return new MultiListenerBean(); } + @Bean + public MultiJsonListenerBean multiJsonListener() { + return new MultiJsonListenerBean(); + } + @Bean public MultiListenerSendTo multiListenerSendTo() { return new MultiListenerSendTo(); @@ -1414,12 +1454,12 @@ static class MultiListenerBean { @KafkaHandler public void bar(@NonNull String bar) { - latch1.countDown(); + this.latch1.countDown(); } @KafkaHandler public void bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { - latch2.countDown(); + this.latch2.countDown(); } public void foo(String bar) { @@ -1427,6 +1467,41 @@ public void foo(String bar) { } + @KafkaListener(id = "multiJson", topics = "annotated33", containerFactory = "kafkaJsonListenerContainerFactory2") + static class MultiJsonListenerBean { + + private final CountDownLatch latch1 = new CountDownLatch(1); + + private final CountDownLatch latch2 = new CountDownLatch(1); + + private final CountDownLatch latch3 = new CountDownLatch(1); + + private Foo foo; + + private Baz baz; + + private Bar bar; + + @KafkaHandler + public void bar(Foo foo) { + this.foo = foo; + this.latch1.countDown(); + } + + @KafkaHandler + public void bar(Baz baz) { + this.baz = baz; + this.latch2.countDown(); + } + + @KafkaHandler(isDefault = true) + public void defaultHandler(Bar bar) { + this.bar = bar; + this.latch3.countDown(); + } + + } + @KafkaListener(id = "multiSendTo", topics = "annotated25") @SendTo("annotated25reply1") static class MultiListenerSendTo { @@ -1447,12 +1522,70 @@ public String bar(@Payload(required = false) KafkaNull nul, public interface Bar { + String getBar(); + } public static class Foo implements Bar { private String bar; + + public Foo() { + super(); + } + + public Foo(String bar) { + this.bar = bar; + } + + @Override + public String getBar() { + return this.bar; + } + + public void setBar(String bar) { + this.bar = bar; + } + + } + + public static class Baz implements Bar { + + private String bar; + + public Baz() { + super(); + } + + public Baz(String bar) { + this.bar = bar; + } + + @Override + public String getBar() { + return this.bar; + } + + public void setBar(String bar) { + this.bar = bar; + } + + } + + public static class Qux implements Bar { + + private String bar; + + public Qux() { + super(); + } + + public Qux(String bar) { + this.bar = bar; + } + + @Override public String getBar() { return this.bar; } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 4cd88499d1..8681956d6f 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -945,7 +945,7 @@ For the `ConcurrentMessageListenerContainer`, the `` part of the threa So, with a bean name of `container`, threads in this container will be named `container-0-C-1`, `container-1-C-1` etc., after the container is started the first time; `container-0-C-2`, `container-1-C-2` etc., after a stop/start. [[class-level-kafkalistener]] -===== @KafkaListener on a class +===== @KafkaListener on a Class When using `@KafkaListener` at the class-level, you specify `@KafkaHandler` at the method level. When messages are delivered, the converted message payload type is used to determine which method to call. @@ -965,9 +965,19 @@ static class MultiListenerBean { ... } + @KafkaHandler(isDefault = true`) + public void listenDefault(Object object) { + ... + } + } ---- +Starting with _version 2.1.3_, a `@KafkaHandler` method can be designated as the default method which is invoked if there is no match on other methods. +At most one method can be so designated. +When using `@KafkaHandler` methods, the payload must have already been converted to the domain object (so the match can be performed). +Use a custom deserializer, the `JsonDeserializer` or the `StringJsonMessageConverter` with its `TypePrecedence` set to `TYPE_ID` - see <> for more information. + [[kafkalistener-lifecycle]] ===== @KafkaListener Lifecycle Management @@ -1437,6 +1447,7 @@ By default, the type for the conversion is inferred from the listener argument. If you configure the `StringJsonMessageConverter` with a `DefaultJackson2TypeMapper` that has its `TypePrecedence` set to `TYPE_ID` (instead of the default `INFERRED`), then the converter will use type information in headers (if present) instead. This allows, for example, listener methods to be declared with interfaces instead of concrete classes. Also, the type converter supports mapping so the deserialization can be to a different type than the source (as long as the data is compatible). +This is also useful when using <> where the payload must have already been converted, to determine which method to invoke. [source, java] ---- diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index cf5aa2d9fa..86542c5e03 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -32,6 +32,10 @@ By default, logging of topic offset commits is performed with the DEBUG logging Starting with _version 2.1.2_, there is a new property in `ContainerProperties` called `commitLogLevel` which allows you to specify the log level for these messages. See <> for more information. +==== Default @KafkaHandler + +Starting with _version 2.1.3_, one of the `@KafkaHandler` s on a class-level `@KafkaListener` can be designated as the default. +See <> for more information. ==== ReplyingKafkaTemplate