From bf5d38ec30aad02dd84cc54eb869d1aed0341156 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 12 Jan 2021 13:41:51 -0500 Subject: [PATCH] GH-1675: Validation on @KafkaHandler Methods Resolves https://github.com/spring-projects/spring-kafka/issues/1675 Add support for payload validation with `@KafkaHandler` methods. * Fix javadoc. # Conflicts: # src/reference/asciidoc/whats-new.adoc --- .../KafkaListenerEndpointRegistrar.java | 6 +- .../MultiMethodKafkaListenerEndpoint.java | 19 +++- .../adapter/DelegatingInvocableHandler.java | 94 +++++++++++++++++-- .../EnableKafkaIntegrationTests.java | 56 +++++++++-- src/reference/asciidoc/kafka.adoc | 4 + 5 files changed, 158 insertions(+), 21 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java index e4ace48960..d88af5212f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2020 the original author or authors. + * Copyright 2014-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -187,6 +187,10 @@ public void afterPropertiesSet() { protected void registerAllEndpoints() { synchronized (this.endpointDescriptors) { for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) { + if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint + && this.validator != null) { + ((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator); + } this.endpointRegistry.registerListenerContainer( descriptor.endpoint, resolveContainerFactory(descriptor)); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java index e090657553..e027a20685 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.lang.Nullable; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; +import org.springframework.validation.Validator; /** * The {@link MethodKafkaListenerEndpoint} extension for several POJO methods @@ -44,11 +45,16 @@ public class MultiMethodKafkaListenerEndpoint extends MethodKafkaListenerE private final Method defaultMethod; + private Validator validator; + /** * Construct an instance for the provided methods and bean with no default method. * @param methods the methods. * @param bean the bean. + * @deprecated in favor of + * {@link #MultiMethodKafkaListenerEndpoint(List, Method, Object)}. */ + @Deprecated public MultiMethodKafkaListenerEndpoint(List methods, Object bean) { this(methods, null, bean); } @@ -66,6 +72,15 @@ public MultiMethodKafkaListenerEndpoint(List methods, @Nullable Method d setBean(bean); } + /** + * Set a payload validator. + * @param validator the validator. + * @since 2.5.11 + */ + public void setValidator(Validator validator) { + this.validator = validator; + } + @Override protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) { List invocableHandlerMethods = new ArrayList(); @@ -79,7 +94,7 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte } } DelegatingInvocableHandler delegatingHandler = new DelegatingInvocableHandler(invocableHandlerMethods, - defaultHandler, getBean(), getResolver(), getBeanExpressionContext(), getBeanFactory()); + defaultHandler, getBean(), getResolver(), getBeanExpressionContext(), getBeanFactory(), this.validator); return new HandlerAdapter(delegatingHandler); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index 955839ad9c..bf87332a37 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,11 +39,15 @@ import org.springframework.kafka.support.KafkaUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.handler.HandlerMethod; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; import org.springframework.util.Assert; +import org.springframework.validation.Validator; /** @@ -63,6 +67,9 @@ public class DelegatingInvocableHandler { private final ConcurrentMap, InvocableHandlerMethod> cachedHandlers = new ConcurrentHashMap<>(); + private final ConcurrentMap payloadMethodParameters = + new ConcurrentHashMap<>(); + private final InvocableHandlerMethod defaultHandler; private final Map handlerSendTo = new HashMap<>(); @@ -77,17 +84,22 @@ public class DelegatingInvocableHandler { private final ConfigurableListableBeanFactory beanFactory; + private final PayloadValidator validator; + /** * Construct an instance with the supplied handlers for the bean. * @param handlers the handlers. * @param bean the bean. * @param beanExpressionResolver the expression resolver. * @param beanExpressionContext the expression context. + * @deprecated in favor of + * {@link #DelegatingInvocableHandler(List, InvocableHandlerMethod, Object, BeanExpressionResolver, BeanExpressionContext, BeanFactory, Validator)} */ + @Deprecated public DelegatingInvocableHandler(List handlers, Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) { - this(handlers, null, bean, beanExpressionResolver, beanExpressionContext); + this(handlers, null, bean, beanExpressionResolver, beanExpressionContext, null, null); } /** @@ -97,13 +109,16 @@ public DelegatingInvocableHandler(List handlers, Object * @param bean the bean. * @param beanExpressionResolver the resolver. * @param beanExpressionContext the context. + * @deprecated in favor of + * {@link #DelegatingInvocableHandler(List, InvocableHandlerMethod, Object, BeanExpressionResolver, BeanExpressionContext, BeanFactory, Validator)} * @since 2.1.3 */ + @Deprecated public DelegatingInvocableHandler(List handlers, @Nullable InvocableHandlerMethod defaultHandler, Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) { - this(handlers, defaultHandler, bean, beanExpressionResolver, beanExpressionContext, null); + this(handlers, defaultHandler, bean, beanExpressionResolver, beanExpressionContext, null, null); } /** @@ -114,13 +129,35 @@ public DelegatingInvocableHandler(List handlers, * @param beanExpressionResolver the resolver. * @param beanExpressionContext the context. * @param beanFactory the bean factory. - * @since 2.1.11 + * @deprecated in favor of + * {@link #DelegatingInvocableHandler(List, InvocableHandlerMethod, Object, BeanExpressionResolver, BeanExpressionContext, BeanFactory, Validator)} + * @since 2.5.11 */ + @Deprecated public DelegatingInvocableHandler(List handlers, @Nullable InvocableHandlerMethod defaultHandler, Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext, @Nullable BeanFactory beanFactory) { + this(handlers, defaultHandler, bean, beanExpressionResolver, beanExpressionContext, beanFactory, null); + } + + /** + * Construct an instance with the supplied handlers for the bean. + * @param handlers the handlers. + * @param defaultHandler the default handler. + * @param bean the bean. + * @param beanExpressionResolver the resolver. + * @param beanExpressionContext the context. + * @param beanFactory the bean factory. + * @param validator the validator. + * @since 2.5.11 + */ + public DelegatingInvocableHandler(List handlers, + @Nullable InvocableHandlerMethod defaultHandler, + Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext, + @Nullable BeanFactory beanFactory, @Nullable Validator validator) { + this.handlers = new ArrayList<>(); for (InvocableHandlerMethod handler : handlers) { this.handlers.add(wrapIfNecessary(handler)); @@ -132,6 +169,7 @@ public DelegatingInvocableHandler(List handlers, this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory ? (ConfigurableListableBeanFactory) beanFactory : null; + this.validator = validator == null ? null : new PayloadValidator(validator); } private InvocableHandlerMethod wrapIfNecessary(InvocableHandlerMethod handler) { @@ -166,6 +204,12 @@ public Object getBean() { public Object invoke(Message message, Object... providedArgs) throws Exception { //NOSONAR Class payloadClass = message.getPayload().getClass(); InvocableHandlerMethod handler = getHandlerForPayload(payloadClass); + if (this.validator != null) { + MethodParameter parameter = this.payloadMethodParameters.get(handler); + if (parameter != null) { + this.validator.validate(message, parameter, message.getPayload()); + } + } Object result; if (handler instanceof MetadataAwareInvocableHandlerMethod) { Object[] args = new Object[providedArgs.length + 1]; @@ -279,23 +323,29 @@ protected boolean matchHandlerMethod(Class payloadClass, Invoc if ((methodParameter.getParameterAnnotations().length == 0 || !methodParameter.hasParameterAnnotation(Header.class)) && methodParameter.getParameterType().isAssignableFrom(payloadClass)) { + if (this.validator != null) { + this.payloadMethodParameters.put(handler, methodParameter); + } return true; } } - boolean foundCandidate = false; + MethodParameter foundCandidate = null; for (int i = 0; i < parameterAnnotations.length; i++) { MethodParameter methodParameter = new MethodParameter(method, i); if ((methodParameter.getParameterAnnotations().length == 0 || !methodParameter.hasParameterAnnotation(Header.class)) && methodParameter.getParameterType().isAssignableFrom(payloadClass)) { - if (foundCandidate) { + if (foundCandidate != null) { throw new KafkaException("Ambiguous payload parameter for " + method.toGenericString()); } - foundCandidate = true; + foundCandidate = methodParameter; } } - return foundCandidate; + if (foundCandidate != null && this.validator != null) { + this.payloadMethodParameters.put(handler, foundCandidate); + } + return foundCandidate != null; } /** @@ -325,4 +375,32 @@ private static final class MetadataAwareInvocableHandlerMethod extends Invocable } + private static final class PayloadValidator extends PayloadMethodArgumentResolver { + + PayloadValidator(Validator validator) { + super(new MessageConverter() { // Required but never used + + @Override + @Nullable + public Message toMessage(Object payload, @Nullable + MessageHeaders headers) { + return null; + } + + @Override + @Nullable + public Object fromMessage(Message message, Class targetClass) { + return null; + } + + }, validator); + } + + @Override + public void validate(Message message, MethodParameter parameter, Object target) { // NOSONAR - public + super.validate(message, parameter, target); + } + + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index dfad8aae51..18100d850c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,7 +46,6 @@ import java.util.stream.Collectors; import javax.validation.Valid; -import javax.validation.ValidationException; import javax.validation.constraints.Max; import org.aopalliance.intercept.MethodInterceptor; @@ -140,6 +139,7 @@ import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException; import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; @@ -437,13 +437,16 @@ public void testMultiJson() throws Exception { this.kafkaJsonTemplate.send(new GenericMessage<>(new Foo("one"))); this.kafkaJsonTemplate.send(new GenericMessage<>(new Baz("two"))); this.kafkaJsonTemplate.send(new GenericMessage<>(new Qux("three"))); + this.kafkaJsonTemplate.send(new GenericMessage<>(new ValidatedClass(5))); assertThat(this.multiJsonListener.latch1.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.multiJsonListener.latch2.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.multiJsonListener.latch3.await(60, TimeUnit.SECONDS)).isTrue(); + assertThat(this.multiJsonListener.latch4.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.multiJsonListener.foo.getBar()).isEqualTo("one"); assertThat(this.multiJsonListener.baz.getBar()).isEqualTo("two"); assertThat(this.multiJsonListener.bar.getBar()).isEqualTo("three"); assertThat(this.multiJsonListener.bar).isInstanceOf(Qux.class); + assertThat(this.multiJsonListener.validated.isValidated()).isTrue(); } @Test @@ -617,7 +620,7 @@ public void testListenerErrorHandler() throws Exception { public void testValidation() throws Exception { template.send("annotated35", 0, "{\"bar\":42}"); assertThat(this.listener.validationLatch.await(60, TimeUnit.SECONDS)).isTrue(); - assertThat(this.listener.validationException).isInstanceOf(ValidationException.class); + assertThat(this.listener.validationException).isInstanceOf(MethodArgumentNotValidException.class); } @Test @@ -1558,7 +1561,12 @@ public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { @Override public void validate(Object target, Errors errors) { - throw new ValidationException(); + if (target instanceof ValidatedClass && ((ValidatedClass) target).getBar() > 10) { + errors.reject("bar too large"); + } + else { + ((ValidatedClass) target).setValidated(true); + } } @Override @@ -2168,17 +2176,21 @@ public void foo(String bar) { @KafkaListener(id = "multiJson", topics = "annotated33", containerFactory = "kafkaJsonListenerContainerFactory2") static class MultiJsonListenerBean { - private final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch1 = new CountDownLatch(1); - private final CountDownLatch latch2 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); - private final CountDownLatch latch3 = new CountDownLatch(1); + final CountDownLatch latch3 = new CountDownLatch(1); - private Foo foo; + final CountDownLatch latch4 = new CountDownLatch(1); - private Baz baz; + volatile Foo foo; + + volatile Baz baz; + + volatile Bar bar; - private Bar bar; + volatile ValidatedClass validated; @KafkaHandler public void bar(Foo foo) { @@ -2192,6 +2204,12 @@ public void bar(Baz baz) { this.latch2.countDown(); } + @KafkaHandler + public void bar(@Valid ValidatedClass val) { + this.validated = val; + this.latch4.countDown(); + } + @KafkaHandler(isDefault = true) public void defaultHandler(Bar bar) { this.bar = bar; @@ -2328,6 +2346,16 @@ public static class ValidatedClass { @Max(10) private int bar; + private volatile boolean validated; + + + public ValidatedClass() { + } + + public ValidatedClass(int bar) { + this.bar = bar; + } + public int getBar() { return this.bar; } @@ -2336,6 +2364,14 @@ public void setBar(int bar) { this.bar = bar; } + public boolean isValidated() { + return this.validated; + } + + public void setValidated(boolean validated) { + this.validated = validated; + } + } interface ProjectionSample { diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 924d5deb4e..75482a4e24 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -1761,6 +1761,7 @@ public class Config implements KafkaListenerConfigurer { public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(new MyValidator()); } + } ---- ==== @@ -1824,6 +1825,9 @@ public KafkaListenerErrorHandler validationErrorHandler() { ---- ==== +Starting with version 2.5.11, validation now works on payloads for `@KafkaHandler` methods in a class-level listener. +See <>. + [[rebalance-listeners]] ===== Rebalancing Listeners