Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package org.springframework.cloud.stream.binder.kafka;

import javax.validation.constraints.Min;

import org.springframework.cloud.stream.binder.ConsumerProperties;

/**
* @author Marius Bogoevici
*/
Expand All @@ -35,6 +39,7 @@ public void setMinPartitionCount(int minPartitionCount) {
this.minPartitionCount = minPartitionCount;
}

@Min(value = 1, message = "Min Partition Count should be greater than zero.")
public int getMinPartitionCount() {
return minPartitionCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.cloud.stream.binder.kafka;

import javax.validation.constraints.NotNull;

import org.springframework.integration.kafka.support.ProducerMetadata;

/**
Expand All @@ -41,6 +43,7 @@ public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}

@NotNull
public ProducerMetadata.CompressionType getCompressionType() {
return compressionType;
}
Expand All @@ -57,6 +60,7 @@ public void setSync(boolean sync) {
this.sync = sync;
}

@NotNull
public KafkaMessageChannelBinder.Mode getMode() {
return mode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.cloud.stream.binder.rabbit;

import javax.validation.constraints.Min;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -73,6 +75,7 @@ public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) {
this.acknowledgeMode = acknowledgeMode;
}

@Min(value = 1, message = "Max Concurrency should be greater than zero.")
public int getMaxConcurrency() {
return maxConcurrency;
}
Expand All @@ -81,6 +84,7 @@ public void setMaxConcurrency(int maxConcurrency) {
this.maxConcurrency = maxConcurrency;
}

@Min(value = 1, message = "Prefetch should be greater than zero.")
public int getPrefetch() {
return prefetch;
}
Expand All @@ -97,6 +101,7 @@ public void setRequestHeaderPatterns(String[] requestHeaderPatterns) {
this.requestHeaderPatterns = requestHeaderPatterns;
}

@Min(value = 1, message = "Tx Size should be greater than zero.")
public int getTxSize() {
return txSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.cloud.stream.binder.rabbit;

import javax.validation.constraints.Min;

import org.springframework.amqp.core.MessageDeliveryMode;

/**
Expand Down Expand Up @@ -102,6 +104,7 @@ public void setBatchingEnabled(boolean batchingEnabled) {
this.batchingEnabled = batchingEnabled;
}

@Min(value = 1, message = "Batch Size should be greater than zero.")
public int getBatchSize() {
return batchSize;
}
Expand All @@ -110,6 +113,7 @@ public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

@Min(value = 1, message = "Batch Buffer Limit should be greater than zero.")
public int getBatchBufferLimit() {
return batchBufferLimit;
}
Expand All @@ -118,6 +122,7 @@ public void setBatchBufferLimit(int batchBufferLimit) {
this.batchBufferLimit = batchBufferLimit;
}

@Min(value = 1, message = "Batch Timeout should be greater than zero.")
public int getBatchTimeout() {
return batchTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

package org.springframework.cloud.stream.binder;

import javax.validation.constraints.Min;

/**
* Common consumer properties.
*
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
*/
public class ConsumerProperties {

Expand All @@ -39,6 +42,7 @@ public class ConsumerProperties {

private double backOffMultiplier = 2.0;

@Min(value = 1, message = "Concurrency should be greater than zero.")
public int getConcurrency() {
return concurrency;
}
Expand All @@ -55,6 +59,7 @@ public void setPartitioned(boolean partitioned) {
this.partitioned = partitioned;
}

@Min(value = 1, message = "Instance Count should be greater than zero.")
public int getInstanceCount() {
return instanceCount;
}
Expand All @@ -63,6 +68,7 @@ public void setInstanceCount(int instanceCount) {
this.instanceCount = instanceCount;
}

@Min(value = 0, message = "Instance Index should be greater than or equal to 0")
public int getInstanceIndex() {
return instanceIndex;
}
Expand All @@ -75,6 +81,7 @@ public void setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
}

@Min(value = 1, message = "Max attempts should be greater than zero.")
public int getMaxAttempts() {
return maxAttempts;
}
Expand All @@ -83,6 +90,7 @@ public void setBackOffInitialInterval(int backOffInitialInterval) {
this.backOffInitialInterval = backOffInitialInterval;
}

@Min(value = 1, message = "BackOff initial interval should be greater than zero.")
public int getBackOffInitialInterval() {
return backOffInitialInterval;
}
Expand All @@ -91,6 +99,7 @@ public void setBackOffMaxInterval(int backOffMaxInterval) {
this.backOffMaxInterval = backOffMaxInterval;
}

@Min(value = 1, message = "Backoff max interval should be greater than zero.")
public int getBackOffMaxInterval() {
return backOffMaxInterval;
}
Expand All @@ -99,6 +108,7 @@ public void setBackOffMultiplier(double backOffMultiplier) {
this.backOffMultiplier = backOffMultiplier;
}

@Min(value = 1, message = "Backoff multiplier should be greater than zero.")
public double getBackOffMultiplier() {
return backOffMultiplier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

package org.springframework.cloud.stream.binder;

import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.Min;

import org.springframework.expression.Expression;

/**
* Common producer properties.
*
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
*/
public class ProducerProperties {

Expand Down Expand Up @@ -73,6 +77,7 @@ public void setPartitionSelectorExpression(Expression partitionSelectorExpressio
this.partitionSelectorExpression = partitionSelectorExpression;
}

@Min(value = 1, message = "Partition Count should be greater than zero.")
public int getPartitionCount() {
return partitionCount;
}
Expand All @@ -89,4 +94,14 @@ public void setRequiredGroups(String... requiredGroups) {
this.requiredGroups = requiredGroups;
}

@AssertTrue(message = "PartitionKeyExpression and PartitionKeyExtractorClass properties are mutually exclusive.")
public boolean isValidPartitionKeyProperty() {
return (this.partitionKeyExpression == null) || (this.partitionKeyExtractorClass == null);
}

@AssertTrue(message = "PartitionSelectorClass and PartitionSelectorExpression properties are mutually exclusive.")
public boolean isValidPartitionSelectorProperty() {
return (this.partitionSelectorClass == null) || (this.partitionSelectorExpression == null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.BeanUtils;
import org.springframework.boot.bind.RelaxedDataBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.Binding;
Expand All @@ -38,6 +39,7 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.validation.beanvalidation.CustomValidatorBean;

/**
* Handles the operations related to channel binding including binding of input/output channels by delegating
Expand Down Expand Up @@ -83,6 +85,7 @@ public Collection<Binding<MessageChannel>> bindConsumer(MessageChannel inputChan
BeanUtils.copyProperties(consumerProperties, extendedConsumerProperties);
consumerProperties = extendedConsumerProperties;
}
ProducerConsumerPropertiesValidator.validate(consumerProperties);
for (String target : channelBindingTargets) {
Binding<MessageChannel> binding = binder.bindConsumer(target, channelBindingServiceProperties.getGroup(inputChannelName), inputChannel, consumerProperties);
bindings.add(binding);
Expand All @@ -104,6 +107,7 @@ public Binding<MessageChannel> bindProducer(MessageChannel outputChannel, String
BeanUtils.copyProperties(producerProperties, extendedProducerProperties);
producerProperties = extendedProducerProperties;
}
ProducerConsumerPropertiesValidator.validate(producerProperties);
Binding<MessageChannel> binding = binder.bindProducer(channelBindingTarget, outputChannel, producerProperties);
this.producerBindings.put(outputChannelName, binding);
return binding;
Expand Down Expand Up @@ -135,4 +139,23 @@ else if (log.isWarnEnabled()) {
String transport = this.channelBindingServiceProperties.getBinder(channelName);
return binderFactory.getBinder(transport);
}

private static class ProducerConsumerPropertiesValidator {

private static CustomValidatorBean validator;

static {
validator = new CustomValidatorBean();
validator.afterPropertiesSet();
}

static void validate(Object properties) {
RelaxedDataBinder dataBinder = new RelaxedDataBinder(properties);
dataBinder.setValidator(validator);
dataBinder.validate();
if (dataBinder.getBindingResult().hasErrors()) {
throw new IllegalStateException(dataBinder.getBindingResult().toString());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import java.util.Properties;
import java.util.TreeMap;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -38,6 +35,9 @@
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;

/**
* @author Dave Syer
* @author Marius Bogoevici
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
Expand Down Expand Up @@ -105,7 +106,6 @@ public void testDefaultGroup() throws Exception {
public void testMultipleConsumerBindings() throws Exception {
ChannelBindingServiceProperties properties = new ChannelBindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();

BindingProperties props = new BindingProperties();
props.setDestination("foo,bar");
final String inputChannelName = "input";
Expand Down Expand Up @@ -241,4 +241,57 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
}

@Test
public void testProducerPropertiesValidation() {
ChannelBindingServiceProperties serviceProperties = new ChannelBindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();
BindingProperties props = new BindingProperties();
ProducerProperties producerProperties = new ProducerProperties();
producerProperties.setPartitionCount(0);
props.setDestination("foo");
props.setProducer(producerProperties);
final String outputChannelName = "output";
bindingProperties.put(outputChannelName, props);
serviceProperties.setBindings(bindingProperties);
DefaultBinderFactory<MessageChannel> binderFactory =
new DefaultBinderFactory<>(Collections.singletonMap("mock",
new BinderConfiguration(new BinderType("mock", new Class[]{MockBinderConfiguration.class}),
new Properties(), true)));
ChannelBindingService service = new ChannelBindingService(serviceProperties, binderFactory);
MessageChannel outputChannel = new DirectChannel();
try {
service.bindProducer(outputChannel, outputChannelName);
fail("Producer properties should be validated.");
}
catch (IllegalStateException e) {
assertTrue(e.getMessage().contains("Partition Count should be greater than zero."));
}
}

@Test
public void testConsumerPropertiesValidation() {
ChannelBindingServiceProperties serviceProperties = new ChannelBindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();
BindingProperties props = new BindingProperties();
ConsumerProperties consumerProperties = new ConsumerProperties();
consumerProperties.setConcurrency(0);
props.setDestination("foo");
props.setConsumer(consumerProperties);
final String inputChannelName = "input";
bindingProperties.put(inputChannelName, props);
serviceProperties.setBindings(bindingProperties);
DefaultBinderFactory<MessageChannel> binderFactory =
new DefaultBinderFactory<>(Collections.singletonMap("mock",
new BinderConfiguration(new BinderType("mock", new Class[]{MockBinderConfiguration.class}),
new Properties(), true)));
ChannelBindingService service = new ChannelBindingService(serviceProperties, binderFactory);
MessageChannel inputChannel = new DirectChannel();
try {
service.bindConsumer(inputChannel, inputChannelName);
fail("Consumer properties should be validated.");
}
catch (IllegalStateException e) {
assertTrue(e.getMessage().contains("Concurrency should be greater than zero."));
}
}
}