Skip to content

Commit

Permalink
GH-1675: Validation on @KafkaHandler Methods
Browse files Browse the repository at this point in the history
Resolves #1675

Add support for payload validation with `@KafkaHandler` methods.

* Fix javadoc.
# Conflicts:
#	src/reference/asciidoc/whats-new.adoc
  • Loading branch information
garyrussell authored and artembilan committed Jan 12, 2021
1 parent f2ecd40 commit bf5d38e
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 21 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 the original author or authors.
* Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -187,6 +187,10 @@ public void afterPropertiesSet() {
protected void registerAllEndpoints() {
synchronized (this.endpointDescriptors) {
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint
&& this.validator != null) {
((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
}
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.validation.Validator;

/**
* The {@link MethodKafkaListenerEndpoint} extension for several POJO methods
Expand All @@ -44,11 +45,16 @@ public class MultiMethodKafkaListenerEndpoint<K, V> extends MethodKafkaListenerE

private final Method defaultMethod;

private Validator validator;

/**
* Construct an instance for the provided methods and bean with no default method.
* @param methods the methods.
* @param bean the bean.
* @deprecated in favor of
* {@link #MultiMethodKafkaListenerEndpoint(List, Method, Object)}.
*/
@Deprecated
public MultiMethodKafkaListenerEndpoint(List<Method> methods, Object bean) {
this(methods, null, bean);
}
Expand All @@ -66,6 +72,15 @@ public MultiMethodKafkaListenerEndpoint(List<Method> methods, @Nullable Method d
setBean(bean);
}

/**
* Set a payload validator.
* @param validator the validator.
* @since 2.5.11
*/
public void setValidator(Validator validator) {
this.validator = validator;
}

@Override
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<InvocableHandlerMethod>();
Expand All @@ -79,7 +94,7 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte
}
}
DelegatingInvocableHandler delegatingHandler = new DelegatingInvocableHandler(invocableHandlerMethods,
defaultHandler, getBean(), getResolver(), getBeanExpressionContext(), getBeanFactory());
defaultHandler, getBean(), getResolver(), getBeanExpressionContext(), getBeanFactory(), this.validator);
return new HandlerAdapter(delegatingHandler);
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,11 +39,15 @@
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.HandlerMethod;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.validation.Validator;


/**
Expand All @@ -63,6 +67,9 @@ public class DelegatingInvocableHandler {

private final ConcurrentMap<Class<?>, InvocableHandlerMethod> cachedHandlers = new ConcurrentHashMap<>();

private final ConcurrentMap<InvocableHandlerMethod, MethodParameter> payloadMethodParameters =
new ConcurrentHashMap<>();

private final InvocableHandlerMethod defaultHandler;

private final Map<InvocableHandlerMethod, Expression> handlerSendTo = new HashMap<>();
Expand All @@ -77,17 +84,22 @@ public class DelegatingInvocableHandler {

private final ConfigurableListableBeanFactory beanFactory;

private final PayloadValidator validator;

/**
* Construct an instance with the supplied handlers for the bean.
* @param handlers the handlers.
* @param bean the bean.
* @param beanExpressionResolver the expression resolver.
* @param beanExpressionContext the expression context.
* @deprecated in favor of
* {@link #DelegatingInvocableHandler(List, InvocableHandlerMethod, Object, BeanExpressionResolver, BeanExpressionContext, BeanFactory, Validator)}
*/
@Deprecated
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers, Object bean,
BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) {

this(handlers, null, bean, beanExpressionResolver, beanExpressionContext);
this(handlers, null, bean, beanExpressionResolver, beanExpressionContext, null, null);
}

/**
Expand All @@ -97,13 +109,16 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers, Object
* @param bean the bean.
* @param beanExpressionResolver the resolver.
* @param beanExpressionContext the context.
* @deprecated in favor of
* {@link #DelegatingInvocableHandler(List, InvocableHandlerMethod, Object, BeanExpressionResolver, BeanExpressionContext, BeanFactory, Validator)}
* @since 2.1.3
*/
@Deprecated
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
@Nullable InvocableHandlerMethod defaultHandler,
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) {

this(handlers, defaultHandler, bean, beanExpressionResolver, beanExpressionContext, null);
this(handlers, defaultHandler, bean, beanExpressionResolver, beanExpressionContext, null, null);
}

/**
Expand All @@ -114,13 +129,35 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
* @param beanExpressionResolver the resolver.
* @param beanExpressionContext the context.
* @param beanFactory the bean factory.
* @since 2.1.11
* @deprecated in favor of
* {@link #DelegatingInvocableHandler(List, InvocableHandlerMethod, Object, BeanExpressionResolver, BeanExpressionContext, BeanFactory, Validator)}
* @since 2.5.11
*/
@Deprecated
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
@Nullable InvocableHandlerMethod defaultHandler,
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext,
@Nullable BeanFactory beanFactory) {

this(handlers, defaultHandler, bean, beanExpressionResolver, beanExpressionContext, beanFactory, null);
}

/**
* Construct an instance with the supplied handlers for the bean.
* @param handlers the handlers.
* @param defaultHandler the default handler.
* @param bean the bean.
* @param beanExpressionResolver the resolver.
* @param beanExpressionContext the context.
* @param beanFactory the bean factory.
* @param validator the validator.
* @since 2.5.11
*/
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
@Nullable InvocableHandlerMethod defaultHandler,
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext,
@Nullable BeanFactory beanFactory, @Nullable Validator validator) {

this.handlers = new ArrayList<>();
for (InvocableHandlerMethod handler : handlers) {
this.handlers.add(wrapIfNecessary(handler));
Expand All @@ -132,6 +169,7 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory
? (ConfigurableListableBeanFactory) beanFactory
: null;
this.validator = validator == null ? null : new PayloadValidator(validator);
}

private InvocableHandlerMethod wrapIfNecessary(InvocableHandlerMethod handler) {
Expand Down Expand Up @@ -166,6 +204,12 @@ public Object getBean() {
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
Class<? extends Object> payloadClass = message.getPayload().getClass();
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
if (this.validator != null) {
MethodParameter parameter = this.payloadMethodParameters.get(handler);
if (parameter != null) {
this.validator.validate(message, parameter, message.getPayload());
}
}
Object result;
if (handler instanceof MetadataAwareInvocableHandlerMethod) {
Object[] args = new Object[providedArgs.length + 1];
Expand Down Expand Up @@ -279,23 +323,29 @@ protected boolean matchHandlerMethod(Class<? extends Object> payloadClass, Invoc
if ((methodParameter.getParameterAnnotations().length == 0
|| !methodParameter.hasParameterAnnotation(Header.class))
&& methodParameter.getParameterType().isAssignableFrom(payloadClass)) {
if (this.validator != null) {
this.payloadMethodParameters.put(handler, methodParameter);
}
return true;
}
}

boolean foundCandidate = false;
MethodParameter foundCandidate = null;
for (int i = 0; i < parameterAnnotations.length; i++) {
MethodParameter methodParameter = new MethodParameter(method, i);
if ((methodParameter.getParameterAnnotations().length == 0
|| !methodParameter.hasParameterAnnotation(Header.class))
&& methodParameter.getParameterType().isAssignableFrom(payloadClass)) {
if (foundCandidate) {
if (foundCandidate != null) {
throw new KafkaException("Ambiguous payload parameter for " + method.toGenericString());
}
foundCandidate = true;
foundCandidate = methodParameter;
}
}
return foundCandidate;
if (foundCandidate != null && this.validator != null) {
this.payloadMethodParameters.put(handler, foundCandidate);
}
return foundCandidate != null;
}

/**
Expand Down Expand Up @@ -325,4 +375,32 @@ private static final class MetadataAwareInvocableHandlerMethod extends Invocable

}

private static final class PayloadValidator extends PayloadMethodArgumentResolver {

PayloadValidator(Validator validator) {
super(new MessageConverter() { // Required but never used

@Override
@Nullable
public Message<?> toMessage(Object payload, @Nullable
MessageHeaders headers) {
return null;
}

@Override
@Nullable
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return null;
}

}, validator);
}

@Override
public void validate(Message<?> message, MethodParameter parameter, Object target) { // NOSONAR - public
super.validate(message, parameter, target);
}

}

}

0 comments on commit bf5d38e

Please sign in to comment.