Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ subprojects { subproject ->
assertkVersion = '0.12'
googleJsr305Version = '3.0.2'
hamcrestVersion = '1.3'
hibernateValidationVersion = '6.0.12.Final'
jacksonVersion = '2.9.6'
jaywayJsonPathVersion = '2.4.0'
junit4Version = '4.12'
Expand Down Expand Up @@ -205,6 +206,7 @@ project ('spring-kafka') {
compile ("com.jayway.jsonpath:json-path:$jaywayJsonPathVersion", optional)

testCompile project (":spring-kafka-test")
testCompile "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion"
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,8 +49,8 @@
* The {@code KafkaListenerContainerFactory} is responsible to create the listener
* container for a particular endpoint. Typical implementations, as the
* {@link org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
* ConcurrentKafkaListenerContainerFactory} used in the sample above, provides the necessary
* configuration options that are supported by the underlying
* ConcurrentKafkaListenerContainerFactory} used in the sample above, provides the
* necessary configuration options that are supported by the underlying
* {@link org.springframework.kafka.listener.MessageListenerContainer
* MessageListenerContainer}.
*
Expand Down Expand Up @@ -113,8 +113,7 @@
* Annotated methods can use a flexible signature; in particular, it is possible to use
* the {@link org.springframework.messaging.Message Message} abstraction and related
* annotations, see {@link KafkaListener} Javadoc for more details. For instance, the
* following would inject the content of the message and the kafka partition
* header:
* following would inject the content of the message and the kafka partition header:
*
* <pre class="code">
* &#064;KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
Expand Down Expand Up @@ -165,9 +164,10 @@
* {@link org.springframework.kafka.config.KafkaListenerEndpointRegistry
* KafkaListenerEndpointRegistry} in case you need more control on the way the containers
* are created and managed. The example below also demonstrates how to customize the
* {@link org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory} to use with a custom
* {@link org.springframework.validation.Validator Validator} so that payloads annotated
* with {@link org.springframework.validation.annotation.Validated Validated} are first
* {@link org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory}
* as well as how to supply a custom {@link org.springframework.validation.Validator
* Validator} so that payloads annotated with
* {@link org.springframework.validation.annotation.Validated Validated} are first
* validated against a custom {@code Validator}.
*
* <pre class="code">
Expand All @@ -180,6 +180,7 @@
* public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
* registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
* registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory);
* registrar.setValidator(new MyValidator());
* }
*
* &#064;Bean
Expand All @@ -190,7 +191,7 @@
* &#064;Bean
* public MessageHandlerMethodFactory myMessageHandlerMethodFactory() {
* DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
* factory.setValidator(new MyValidator());
* // factory configuration
* return factory;
* }
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.validation.Validator;

/**
* Bean post-processor that registers methods annotated with {@link KafkaListener}
Expand Down Expand Up @@ -750,6 +751,10 @@ private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {

private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
if (validator != null) {
defaultFactory.setValidator(validator);
}
defaultFactory.setBeanFactory(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory);

ConfigurableBeanFactory cbf =
Expand All @@ -768,7 +773,7 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
// Type-based argument resolution
final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
argumentResolvers.add(new PayloadArgumentResolver(messageConverter) {
argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {

@Override
protected boolean isEmptyPayload(Object payload) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,8 +22,10 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.lang.Nullable;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.util.Assert;
import org.springframework.validation.Validator;

/**
* Helper bean for registering {@link KafkaListenerEndpoint} with
Expand Down Expand Up @@ -52,6 +54,8 @@ public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, Initial

private boolean startImmediately;

private Validator validator;

/**
* Set the {@link KafkaListenerEndpointRegistry} instance to use.
* @param endpointRegistry the {@link KafkaListenerEndpointRegistry} instance to use.
Expand Down Expand Up @@ -82,6 +86,8 @@ public KafkaListenerEndpointRegistry getEndpointRegistry() {
* @param kafkaHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.
*/
public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory) {
Assert.isNull(this.validator,
"A validator cannot be provided with a custom message handler factory");
this.messageHandlerMethodFactory = kafkaHandlerMethodFactory;
}

Expand Down Expand Up @@ -126,6 +132,26 @@ public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}

/**
* Get the validator, if supplied.
* @return the validator.
* @since 2.2
*/
@Nullable
public Validator getValidator() {
return this.validator;
}

/**
* Set the validator to use if the default message handler factory is used.
* @param validator the validator.
* @since 2.2
*/
public void setValidator(Validator validator) {
Assert.isNull(this.messageHandlerMethodFactory,
"A validator cannot be provided with a custom message handler factory");
this.validator = validator;
}

@Override
public void afterPropertiesSet() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.validation.Valid;
import javax.validation.ValidationException;
import javax.validation.constraints.Max;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
Expand All @@ -57,6 +61,7 @@
import org.springframework.core.convert.converter.Converter;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
Expand Down Expand Up @@ -110,6 +115,8 @@
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;

/**
* @author Gary Russell
Expand All @@ -135,7 +142,7 @@ public class EnableKafkaIntegrationTests {
"annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply",
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
"annotated34");
"annotated34", "annotated35");

private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();

Expand Down Expand Up @@ -507,6 +514,13 @@ public void testListenerErrorHandler() throws Exception {
assertThat(this.listener.latch16.await(60, TimeUnit.SECONDS)).isTrue();
}

@Test
public void testValidation() throws Exception {
template.send("annotated35", 0, "{\"bar\":42}");
assertThat(this.listener.validationLatch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.validationException).isInstanceOf(ValidationException.class);
}

@Test
public void testReplyingListener() throws Exception {
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
Expand Down Expand Up @@ -686,7 +700,7 @@ public void testAutoConfigTm() {
@Configuration
@EnableKafka
@EnableTransactionManagement(proxyTargetClass = true)
public static class Config {
public static class Config implements KafkaListenerConfigurer {

private final CountDownLatch spyLatch = new CountDownLatch(2);

Expand Down Expand Up @@ -1042,6 +1056,15 @@ public KafkaListenerErrorHandler consumeException(Listener listener) {
};
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler(Listener listener) {
return (m, e) -> {
listener.validationException = (Exception) e.getCause();
listener.validationLatch.countDown();
return null;
};
}

@Bean
public KafkaListenerErrorHandler replyErrorHandler() {
return (m, e) -> ((String) m.getPayload()).toLowerCase();
Expand Down Expand Up @@ -1156,6 +1179,23 @@ public FooConverter fooConverter() {
return new FooConverter();
}

@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(new Validator() {

@Override
public void validate(Object target, Errors errors) {
throw new ValidationException();
}

@Override
public boolean supports(Class<?> clazz) {
return ValidatedClass.class.isAssignableFrom(clazz);
}

});
}

}

@Component
Expand Down Expand Up @@ -1209,6 +1249,10 @@ static class Listener implements ConsumerSeekAware {

private final CountDownLatch latch21 = new CountDownLatch(1);

private final CountDownLatch validationLatch = new CountDownLatch(1);

private Exception validationException;

private final CountDownLatch eventLatch = new CountDownLatch(1);

private volatile Integer partition;
Expand Down Expand Up @@ -1491,6 +1535,12 @@ public void pollResults(ConsumerRecords<?, ?> records) {
this.latch21.countDown();
}

@KafkaListener(id = "validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
// NOSONAR
}

@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.seekCallBack.set(callback);
Expand Down Expand Up @@ -1725,4 +1775,20 @@ public Foo convert(String source) {
return delegate.convert(source);
}
}

public static class ValidatedClass {

@Max(10)
private int bar;

public int getBar() {
return this.bar;
}

public void setBar(int bar) {
this.bar = bar;
}

}

}
Loading