From b4e9ab19a6cd39ff99c891c72692adfb1658c2dd Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 28 Apr 2015 00:59:55 +0300 Subject: [PATCH] DSL-30: Add Reactive Streams support Fixes: https://github.com/spring-projects/spring-integration-java-dsl/issues/30 DSL-30: Add Reactive Streams support Fixes: https://github.com/spring-projects/spring-integration-java-dsl/issues/30 Fixes: https://github.com/spring-projects/spring-integration-java-dsl/issues/37 * Add Reactive Stream `Publisher` support * Upgrade to SI-4.2 * Upgrade to SIK-1.2.0 * Make compatible with IO-2.0 * Fix `vararg` compiler warning --- build.gradle | 26 +- .../dsl/IntegrationFlowDefinition.java | 78 ++++-- .../dsl/PublisherIntegrationFlow.java | 263 ++++++++++++++++++ .../dsl/StandardIntegrationFlow.java | 3 +- .../IntegrationFlowBeanPostProcessor.java | 8 + .../KafkaProducerMessageHandlerSpec.java | 132 +-------- .../integration/dsl/support/Transformers.java | 2 +- .../dsl/LambdaMessageProcessorTests.java | 1 + .../dsl/test/kafka/KafkaTests.java | 23 +- ...tPublisherIntegrationFlowVerification.java | 195 +++++++++++++ ...ePublisherIntegrationFlowVerification.java | 109 ++++++++ ...ePublisherIntegrationFlowVerification.java | 199 +++++++++++++ .../reactivestreams/ReactiveStreamsTests.java | 161 +++++++++++ .../integration/dsl/test/xml/XmlTests.java | 49 ++++ .../integration/dsl/test/xml/transformer.xslt | 19 ++ 15 files changed, 1106 insertions(+), 162 deletions(-) create mode 100644 src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java create mode 100644 src/test/java/org/springframework/integration/dsl/test/reactivestreams/AbstractPublisherIntegrationFlowVerification.java create mode 100644 src/test/java/org/springframework/integration/dsl/test/reactivestreams/PollablePublisherIntegrationFlowVerification.java create mode 100644 src/test/java/org/springframework/integration/dsl/test/reactivestreams/PublishSubscribePublisherIntegrationFlowVerification.java create mode 100644 src/test/java/org/springframework/integration/dsl/test/reactivestreams/ReactiveStreamsTests.java create mode 100644 src/test/resources/org/springframework/integration/dsl/test/xml/transformer.xslt diff --git a/build.gradle b/build.gradle index 75437852..118557d9 100644 --- a/build.gradle +++ b/build.gradle @@ -57,13 +57,16 @@ ext { jmsApiVersion = '1.1-rev-1' jrubyVersion = '1.7.19' jythonVersion = '2.5.3' - kafkaVersion = '0.8.1.1' + kafkaVersion = '0.8.2.1' mailVersion = '1.5.2' + reactiveStreamsVersion = '1.0.0' + reactorVersion = '2.0.4.RELEASE' scalaVersion = '2.10' - slf4jVersion = '1.7.11' - springIntegrationVersion = '4.1.3.RELEASE' - springIntegrationKafkaVersion = '1.1.1.RELEASE' - springBootVersion = '1.2.3.RELEASE' + slf4jVersion = '1.7.12' + springIntegrationVersion = '4.2.0.M2' + springIntegrationKafkaVersion = '1.2.0.RELEASE' + springBootVersion = '1.3.0.M1' + testNgVersion = '6.8.21' linkHomepage = 'https://github.com/spring-projects/spring-integration-java-dsl' linkCi = 'https://build.spring.io/browse/INTEXT-SIJD' @@ -82,7 +85,7 @@ configurations { } dependencies { - compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion" + compile("org.springframework.integration:spring-integration-core:$springIntegrationVersion") ['spring-integration-amqp' , 'spring-integration-event' @@ -117,10 +120,13 @@ dependencies { exclude group: 'com.yammer.metrics', module: 'metrics-annotation' exclude group: 'org.apache.velocity', module: 'velocity' exclude group: 'log4j', module: 'log4j' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' } compile("javax.jms:jms-api:$jmsApiVersion", provided) compile("javax.mail:javax.mail-api:$mailVersion", provided) + compile("org.reactivestreams:reactive-streams:$reactiveStreamsVersion", optional) + testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion" testCompile("org.springframework.boot:spring-boot-starter:$springBootVersion") { exclude module: 'spring-boot-starter-logging' @@ -129,9 +135,15 @@ dependencies { testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:$embedMongoVersion" testCompile "org.apache.ftpserver:ftpserver-core:$ftpServerVersion" testCompile "org.apache.sshd:sshd-core:$apacheSshdVersion" - testCompile ("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test") { + testCompile("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test") { exclude group: 'log4j', module: 'log4j' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + } + testCompile("org.reactivestreams:reactive-streams-tck:$reactiveStreamsVersion") { + exclude group: 'org.reactivestreams', module: 'reactive-streams-examples' } + testCompile "io.projectreactor:reactor-stream:$reactorVersion" + testCompile "org.testng:testng:$testNgVersion" testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion" testRuntime "org.apache.activemq:activemq-broker:$activeMqVersion" diff --git a/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java index 30df1568..d64f0681 100644 --- a/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java +++ b/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java @@ -21,6 +21,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import org.reactivestreams.Publisher; import org.springframework.aop.framework.Advised; import org.springframework.aop.support.AopUtils; @@ -33,6 +36,7 @@ import org.springframework.integration.channel.ChannelInterceptorAware; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.FixedSubscriberChannel; +import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.channel.interceptor.WireTap; import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean; import org.springframework.integration.core.GenericSelector; @@ -83,6 +87,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.PollableChannel; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -2442,6 +2447,37 @@ public B gateway(IntegrationFlow flow, Consumer endpointCon return gateway(requestChannel, endpointConfigurer); } + /** + * Represent an Integration Flow as a Reactive Streams {@link Publisher} bean. + * @param the {@code payload} type + * @return the Reactive Streams {@link Publisher} + */ + public Publisher> toReactivePublisher() { + return toReactivePublisher(Executors.newSingleThreadExecutor()); + } + + /** + * Represent an Integration Flow as a Reactive Streams {@link Publisher} bean. + * @param executor the managed {@link Executor} to be used for the background task to + * poll messages from the {@link PollableChannel}. + * Defaults to {@link Executors#newSingleThreadExecutor()}. + * @param the {@code payload} type + * @return the Reactive Streams {@link Publisher} + */ + @SuppressWarnings("unchecked") + public Publisher> toReactivePublisher(Executor executor) { + Assert.notNull(executor); + MessageChannel channelForPublisher = this.currentMessageChannel; + if (channelForPublisher == null) { + PublishSubscribeChannel publishSubscribeChannel = new PublishSubscribeChannel(); + publishSubscribeChannel.setMinSubscribers(1); + channelForPublisher = publishSubscribeChannel; + channel(channelForPublisher); + } + get(); + return new PublisherIntegrationFlow(this.integrationComponents, channelForPublisher, executor); + } + private > B register(S endpointSpec, Consumer endpointConfigurer) { if (endpointConfigurer != null) { endpointConfigurer.accept(endpointSpec); @@ -2544,27 +2580,6 @@ protected final B _this() { return (B) this; } - private static boolean isLambda(Object o) { - Class aClass = o.getClass(); - return aClass.isSynthetic() && !aClass.isAnonymousClass() && !aClass.isLocalClass(); - } - - private static Object extractProxyTarget(Object target) { - if (!(target instanceof Advised)) { - return target; - } - Advised advised = (Advised) target; - if (advised.getTargetSource() == null) { - return null; - } - try { - return extractProxyTarget(advised.getTargetSource().getTarget()); - } - catch (Exception e) { - throw new BeanCreationException("Could not extract target", e); - } - } - protected StandardIntegrationFlow get() { if (this.currentMessageChannel instanceof FixedSubscriberChannelPrototype) { throw new BeanCreationException("The 'currentMessageChannel' (" + this.currentMessageChannel + @@ -2588,4 +2603,25 @@ else if (this.currentMessageChannel != null) { return new StandardIntegrationFlow(this.integrationComponents); } + private static boolean isLambda(Object o) { + Class aClass = o.getClass(); + return aClass.isSynthetic() && !aClass.isAnonymousClass() && !aClass.isLocalClass(); + } + + private static Object extractProxyTarget(Object target) { + if (!(target instanceof Advised)) { + return target; + } + Advised advised = (Advised) target; + if (advised.getTargetSource() == null) { + return null; + } + try { + return extractProxyTarget(advised.getTargetSource().getTarget()); + } + catch (Exception e) { + throw new BeanCreationException("Could not extract target", e); + } + } + } diff --git a/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java b/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java new file mode 100644 index 00000000..4938910e --- /dev/null +++ b/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java @@ -0,0 +1,263 @@ +/* + * Copyright 2015 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl; + +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import org.springframework.context.Lifecycle; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageDeliveryException; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.SubscribableChannel; + +/** + * @author Artem Bilan + * @since 1.1 + */ +class PublisherIntegrationFlow extends StandardIntegrationFlow implements Publisher>, Lifecycle { + + private static final Subscription NO_OP_SUBSCRIPTION = new Subscription() { + + @Override + public void request(long n) { + } + + @Override + public void cancel() { + } + + }; + + private final Queue>> subscribers = + new LinkedBlockingQueue>>(); + + private final MessageChannel messageChannel; + + private final Executor executor; + + private volatile boolean running; + + PublisherIntegrationFlow(Set integrationComponents, MessageChannel messageChannel, Executor executor) { + super(integrationComponents); + this.messageChannel = messageChannel; + this.executor = executor; + start(); + } + + @Override + @SuppressWarnings("unchecked") + public void subscribe(Subscriber> subscriber) { + if (!this.running) { + //Reactive Streams Specification: https://github.com/reactive-streams/reactive-streams-jvm#1.4 + subscriber.onSubscribe(NO_OP_SUBSCRIPTION); + subscriber.onError( + new IllegalStateException("The Publisher must be started ('Lifecycle.start()') " + + "before accepting subscription.")); + return; + } + + this.subscribers.add(subscriber); + if (this.messageChannel instanceof SubscribableChannel) { + subscriber.onSubscribe(new MessageHandlerSubscription((Subscriber>) subscriber)); + } + else if (this.messageChannel instanceof PollableChannel) { + subscriber.onSubscribe(new PollableSubscription((Subscriber>) subscriber)); + } + else { + //Reactive Streams Specification: https://github.com/reactive-streams/reactive-streams-jvm#1.4 + subscriber.onSubscribe(NO_OP_SUBSCRIPTION); + subscriber.onError( + new IllegalStateException("Unsupported MessageChannel type [" + + this.messageChannel + "]. Must be 'SubscribableChannel' or 'PollableChannel'.")); + } + } + + @Override + public void start() { + this.running = true; + } + + @Override + public void stop() { + shutdown(); + } + + @Override + public boolean isRunning() { + return this.running; + } + + public void shutdown() { + Subscriber> subscriber; + while ((subscriber = this.subscribers.poll()) != null) { + subscriber.onComplete(); + } + this.running = false; + } + + + private abstract class SubscriberSubscription implements Subscription { + + protected final Subscriber> subscriber; + + protected volatile boolean terminated; + + protected SubscriberSubscription(Subscriber> subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + //Reactive Streams Specification: https://github.com/reactive-streams/reactive-streams-jvm#3.9 + if (n <= 0l) { + subscriber.onError( + new IllegalArgumentException("Spec. Rule 3.9 - " + + "Cannot request a non strictly positive number: " + n)); + } + //Reactive Streams Specification: https://github.com/reactive-streams/reactive-streams-jvm#3.6 + else if (!this.terminated && isRunning()) { + onRequest(n); + } + } + + @Override + public void cancel() { + PublisherIntegrationFlow.this.subscribers.remove(this.subscriber); + this.terminated = true; + } + + protected abstract void onRequest(long n); + + } + + private class MessageHandlerSubscription extends SubscriberSubscription implements MessageHandler { + + private final Queue pendingRequests = new LinkedBlockingQueue(); + + private final AtomicReference currentRequest = new AtomicReference(); + + private final AtomicLong count = new AtomicLong(); + + private volatile boolean unbounded; + + private MessageHandlerSubscription(Subscriber> subscriber) { + super(subscriber); + } + + @Override + public void onRequest(long n) { + if (n == Long.MAX_VALUE) { + this.unbounded = true; + this.pendingRequests.clear(); + this.currentRequest.set(null); + this.count.set(0); + } + else if (!this.unbounded) { + if (this.currentRequest.get() != null) { + this.pendingRequests.offer(n); + } + else { + this.currentRequest.set(n); + this.count.set(0); + } + } + ((SubscribableChannel) PublisherIntegrationFlow.this.messageChannel).subscribe(this); + } + + @Override + @SuppressWarnings("unchecked") + public void handleMessage(Message message) throws MessagingException { + if (this.terminated || !PublisherIntegrationFlow.this.isRunning()) { + ((SubscribableChannel) PublisherIntegrationFlow.this.messageChannel).unsubscribe(this); + throw new MessageDeliveryException(message); + } + + if (this.unbounded) { + this.subscriber.onNext(message); + } + else { + if (this.currentRequest.get() == null || this.count.getAndIncrement() == this.currentRequest.get()) { + this.currentRequest.set(this.pendingRequests.poll()); + this.count.set(0); + if (this.currentRequest.get() == null) { + ((SubscribableChannel) PublisherIntegrationFlow.this.messageChannel).unsubscribe(this); + throw new MessageDeliveryException(message); + } + } + this.subscriber.onNext(message); + } + } + + @Override + public void cancel() { + ((SubscribableChannel) PublisherIntegrationFlow.this.messageChannel).unsubscribe(this); + super.cancel(); + } + + } + + + private class PollableSubscription extends SubscriberSubscription { + + private PollableSubscription(Subscriber> subscriber) { + super(subscriber); + } + + @Override + public void onRequest(final long n) { + PublisherIntegrationFlow.this.executor.execute(new Runnable() { + + @Override + public void run() { + if (n == Long.MAX_VALUE) { + while (!terminated && isRunning()) { + Message receive = ((PollableChannel) messageChannel).receive(50); + if (receive != null) { + subscriber.onNext(receive); + } + } + } + else { + long i = 0; + while (!terminated && isRunning() && i < n) { + Message receive = ((PollableChannel) messageChannel).receive(50); + if (receive != null) { + subscriber.onNext(receive); + i++; + } + } + } + } + + }); + } + + } + +} diff --git a/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java b/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java index 04435a73..b12f1de8 100644 --- a/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java +++ b/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java @@ -16,6 +16,7 @@ package org.springframework.integration.dsl; +import java.util.LinkedHashSet; import java.util.Set; /** @@ -26,7 +27,7 @@ public class StandardIntegrationFlow implements IntegrationFlow { private final Set integrationComponents; StandardIntegrationFlow(Set integrationComponents) { - this.integrationComponents = integrationComponents; + this.integrationComponents = new LinkedHashSet(integrationComponents); } public Set getIntegrationComponents() { diff --git a/src/main/java/org/springframework/integration/dsl/config/IntegrationFlowBeanPostProcessor.java b/src/main/java/org/springframework/integration/dsl/config/IntegrationFlowBeanPostProcessor.java index fc7fb1f8..4ce7e531 100644 --- a/src/main/java/org/springframework/integration/dsl/config/IntegrationFlowBeanPostProcessor.java +++ b/src/main/java/org/springframework/integration/dsl/config/IntegrationFlowBeanPostProcessor.java @@ -28,12 +28,14 @@ import org.springframework.context.ApplicationListener; import org.springframework.context.event.ApplicationEventMulticaster; import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.FixedSubscriberChannel; import org.springframework.integration.config.ConsumerEndpointFactoryBean; import org.springframework.integration.config.IntegrationConfigUtils; import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlowBuilder; @@ -205,8 +207,14 @@ private Object processIntegrationFlowImpl(IntegrationFlow flow, String beanName) return processStandardIntegrationFlow(flowBuilder.get(), beanName); } + @SuppressWarnings("deprecation") private void registerComponent(Object component, String beanName) { this.beanFactory.registerSingleton(beanName, component); + if (component instanceof org.springframework.integration.expression.IntegrationEvaluationContextAware) { + StandardEvaluationContext evaluationContext = IntegrationContextUtils.getEvaluationContext(this.beanFactory); + ((org.springframework.integration.expression.IntegrationEvaluationContextAware) component) + .setIntegrationEvaluationContext(evaluationContext); + } this.beanFactory.initializeBean(component, beanName); } diff --git a/src/main/java/org/springframework/integration/dsl/kafka/KafkaProducerMessageHandlerSpec.java b/src/main/java/org/springframework/integration/dsl/kafka/KafkaProducerMessageHandlerSpec.java index 80c79a86..3955b612 100644 --- a/src/main/java/org/springframework/integration/dsl/kafka/KafkaProducerMessageHandlerSpec.java +++ b/src/main/java/org/springframework/integration/dsl/kafka/KafkaProducerMessageHandlerSpec.java @@ -23,6 +23,7 @@ import java.util.Properties; import org.springframework.integration.dsl.core.ComponentsRegistration; +import org.springframework.integration.dsl.core.IntegrationComponentSpec; import org.springframework.integration.dsl.core.MessageHandlerSpec; import org.springframework.integration.dsl.support.Consumer; import org.springframework.integration.dsl.support.Function; @@ -35,7 +36,9 @@ import org.springframework.messaging.Message; import org.springframework.util.Assert; -import kafka.javaapi.producer.Producer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.Serializer; + import kafka.producer.Partitioner; import kafka.serializer.Encoder; @@ -54,8 +57,8 @@ public class KafkaProducerMessageHandlerSpec private final Properties producerProperties; - private final Map producerConfigurations = - new HashMap(); + private final Map> producerConfigurations = + new HashMap>(); KafkaProducerMessageHandlerSpec(Properties producerProperties) { this.producerProperties = producerProperties; @@ -124,27 +127,20 @@ public

KafkaProducerMessageHandlerSpec messageKey(Function, ?> me /** * Add Kafka Producer to this {@link KafkaProducerMessageHandler} * for the provided {@code topic} and {@code brokerList}. - * @param topic the Kafka topic to send messages. + * @param producerMetadata the {@link ProducerMetadata} - options for Kafka {@link Producer}. * @param brokerList the Kafka brokers ({@code metadata.broker.list}) * in the format {@code host1:port1,host2:port2}. - * @param producerMetadataSpecConsumer the Producer metadata Java 8 Lambda. * @return the spec. - * @see KafkaProducerMessageHandlerSpec.ProducerMetadataSpec */ - public KafkaProducerMessageHandlerSpec addProducer(String topic, String brokerList, - Consumer producerMetadataSpecConsumer) { - Assert.hasText(topic); + public KafkaProducerMessageHandlerSpec addProducer(ProducerMetadata producerMetadata, String brokerList) { + Assert.notNull(producerMetadata); Assert.hasText(brokerList); - Assert.notNull(producerMetadataSpecConsumer); - ProducerMetadataSpec spec = new ProducerMetadataSpec(new ProducerMetadata(topic)); - producerMetadataSpecConsumer.accept(spec); try { - ProducerMetadata producerMetadata = spec.producerMetadata; - producerMetadata.afterPropertiesSet(); ProducerFactoryBean producerFactoryBean = new ProducerFactoryBean(producerMetadata, brokerList, this.producerProperties); Producer producer = producerFactoryBean.getObject(); - this.producerConfigurations.put(topic, new ProducerConfiguration(producerMetadata, producer)); + this.producerConfigurations.put(producerMetadata.getTopic(), + new ProducerConfiguration(producerMetadata, producer)); } catch (Exception e) { throw new IllegalStateException(e); @@ -163,111 +159,5 @@ protected KafkaProducerMessageHandler doGet() { throw new UnsupportedOperationException(); } - /** - * A helper class in the Builder pattern style to delegate options to the - * {@link ProducerMetadata}. - */ - public static class ProducerMetadataSpec { - - private final ProducerMetadata producerMetadata; - - ProducerMetadataSpec(ProducerMetadata producerMetadata) { - this.producerMetadata = producerMetadata; - } - - /** - * Specify an {@link Encoder} for Kafka message body. - * Can be used as Java 8 Lambda. - * @param valueEncoder the value encoder. - * @param the expected value type. - * @return the spec. - */ - public ProducerMetadataSpec valueEncoder(Encoder valueEncoder) { - this.producerMetadata.setValueEncoder(valueEncoder); - return this; - } - - /** - * Specify an {@link Encoder} for Kafka message key. - * Can be used as Java 8 Lambda. - * @param keyEncoder the key encoder. - * @param the expected key type. - * @return the spec. - */ - public ProducerMetadataSpec keyEncoder(Encoder keyEncoder) { - this.producerMetadata.setKeyEncoder(keyEncoder); - return this; - } - - /** - * Specify a {@link Class} for the message key. - * @param keyClassType the type for key to encode. - * @return the spec. - */ - public ProducerMetadataSpec keyClassType(Class keyClassType) { - this.producerMetadata.setKeyClassType(keyClassType); - return this; - } - - /** - * Specify a {@link Class} for the message body. - * @param valueClassType the type for message body to encode. - * @return the spec. - */ - public ProducerMetadataSpec valueClassType(Class valueClassType) { - this.producerMetadata.setValueClassType(valueClassType); - return this; - } - - /** - * Specify a compression codec constant. - * Valid values are: - *

    - *
  • none - *
  • gzip - *
  • snappy - *
- * @param compressionCodec the compression codec constant. - * @return the spec. - */ - public ProducerMetadataSpec compressionCodec(String compressionCodec) { - this.producerMetadata.setCompressionCodec(compressionCodec); - return this; - } - - /** - * Specify a {@link Partitioner} reference. - * Can be used as Java 8 Lambda. - * @param partitioner the partitioner. - * @return spec. - * @see Partitioner - */ - public ProducerMetadataSpec partitioner(Partitioner partitioner) { - this.producerMetadata.setPartitioner(partitioner); - return this; - } - - /** - * Specify a {@code sync/async} ({@code producer.type}) Producer behaviour. - * @param async the {@code boolean} flag to indicate the Producer behaviour. Defaults to {@code false}. - * @return the spec. - */ - public ProducerMetadataSpec async(boolean async) { - this.producerMetadata.setAsync(async); - return this; - } - - /** - * Specify a number of message property ({@code batch.num.messages}) for {@code async} Producer behaviour. - * @param batchNumMessages the number of message to batch. - * @return the spec. - */ - public ProducerMetadataSpec batchNumMessages(int batchNumMessages) { - this.producerMetadata.setBatchNumMessages("" + batchNumMessages); - return this; - } - - } - } diff --git a/src/main/java/org/springframework/integration/dsl/support/Transformers.java b/src/main/java/org/springframework/integration/dsl/support/Transformers.java index 9ae50b0f..234b329c 100644 --- a/src/main/java/org/springframework/integration/dsl/support/Transformers.java +++ b/src/main/java/org/springframework/integration/dsl/support/Transformers.java @@ -325,7 +325,7 @@ public static XPathTransformer xpath(String xpathExpression, XPathEvaluationType return transformer; } - @SuppressWarnings("unchecked") + @SafeVarargs public static XsltPayloadTransformer xslt(Resource xsltTemplate, Tuple2... xslParameterMappings) { XsltPayloadTransformer transformer = new XsltPayloadTransformer(xsltTemplate); diff --git a/src/test/java/org/springframework/integration/dsl/LambdaMessageProcessorTests.java b/src/test/java/org/springframework/integration/dsl/LambdaMessageProcessorTests.java index e4a8e7c8..c70a5fc0 100644 --- a/src/test/java/org/springframework/integration/dsl/LambdaMessageProcessorTests.java +++ b/src/test/java/org/springframework/integration/dsl/LambdaMessageProcessorTests.java @@ -35,6 +35,7 @@ public class LambdaMessageProcessorTests { @Test + @SuppressWarnings("divzero") public void testException() { try { handle ((m, h) -> 1 / 0); diff --git a/src/test/java/org/springframework/integration/dsl/test/kafka/KafkaTests.java b/src/test/java/org/springframework/integration/dsl/test/kafka/KafkaTests.java index 0b2f7b63..31312a80 100644 --- a/src/test/java/org/springframework/integration/dsl/test/kafka/KafkaTests.java +++ b/src/test/java/org/springframework/integration/dsl/test/kafka/KafkaTests.java @@ -31,6 +31,7 @@ import java.util.stream.Collectors; import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Test; import org.junit.runner.RunWith; @@ -51,9 +52,12 @@ import org.springframework.integration.kafka.core.DefaultConnectionFactory; import org.springframework.integration.kafka.core.ZookeeperConfiguration; import org.springframework.integration.kafka.listener.Acknowledgment; +import org.springframework.integration.kafka.listener.LongSerializerDecoder; import org.springframework.integration.kafka.listener.MetadataStoreOffsetManager; import org.springframework.integration.kafka.support.KafkaHeaders; +import org.springframework.integration.kafka.support.ProducerMetadata; import org.springframework.integration.kafka.support.ZookeeperConnect; +import org.springframework.integration.kafka.util.EncoderAdaptingSerializer; import org.springframework.integration.kafka.util.TopicUtils; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; @@ -69,6 +73,7 @@ import kafka.admin.AdminUtils; import kafka.api.OffsetRequest; +import kafka.serializer.Encoder; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.utils.IntEncoder; @@ -154,8 +159,7 @@ public ZkClient zookeeperClient(EmbeddedZookeeper zookeeper) { @Bean @DependsOn("zookeeper") public KafkaServer kafkaServer() { - Properties brokerConfigProperties = TestUtils.createBrokerConfig(1, TestUtils.choosePort()); - brokerConfigProperties.put("controlled.shutdown.enable", "false"); + Properties brokerConfigProperties = TestUtils.createBrokerConfig(1, TestUtils.choosePort(), false); return TestUtils.createServer(new KafkaConfig(brokerConfigProperties), SystemTime$.MODULE$); } @@ -222,18 +226,15 @@ public IntegrationFlow sendToKafkaFlow(String serverAddress) { .handle(kafkaMessageHandler(serverAddress)); } + @SuppressWarnings("unchecked") private KafkaProducerMessageHandlerSpec kafkaMessageHandler(String serverAddress) { + Encoder intEncoder = new IntEncoder(null); return Kafka.outboundChannelAdapter(props -> props.put("queue.buffering.max.ms", "15000")) .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)) - .addProducer(TEST_TOPIC, serverAddress, this::producer); - } - - private void producer(KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) { - metadata.async(true) - .batchNumMessages(10) - .valueClassType(String.class) - .valueEncoder(String::getBytes) - .keyEncoder(new IntEncoder(null)); + .addProducer(new ProducerMetadata<>(TEST_TOPIC, Integer.class, String.class, + new EncoderAdaptingSerializer<>((Encoder) intEncoder), new + StringSerializer()), + serverAddress); } @Bean diff --git a/src/test/java/org/springframework/integration/dsl/test/reactivestreams/AbstractPublisherIntegrationFlowVerification.java b/src/test/java/org/springframework/integration/dsl/test/reactivestreams/AbstractPublisherIntegrationFlowVerification.java new file mode 100644 index 00000000..01f18227 --- /dev/null +++ b/src/test/java/org/springframework/integration/dsl/test/reactivestreams/AbstractPublisherIntegrationFlowVerification.java @@ -0,0 +1,195 @@ +/* + * Copyright 2015 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.test.reactivestreams; + +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +import java.lang.reflect.Method; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.mockito.stubbing.Answer; +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.PublisherVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; + +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.Lifecycle; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.mock.env.MockEnvironment; + +/** + * @author Artem Bilan + * @since 1.1 + */ +public abstract class AbstractPublisherIntegrationFlowVerification extends PublisherVerification> { + + private static final String ELEMENTS = "publisher.elements"; + + private static final String FAILED_PUBLISHER = "failed.publisher"; + + private static final String REQUEST_COMPLETE = "request.complete"; + + private static final String TEST_METHOD = "test.method"; + + protected final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + + protected volatile AnnotationConfigApplicationContext applicationContext; + + protected volatile TestEnvironment originalEnvironment; + + protected boolean completionSignalRequired; + + protected String testName; + + public AbstractPublisherIntegrationFlowVerification() { + super(new TestEnvironment(2000), 1000); + this.originalEnvironment = TestUtils.getPropertyValue(this, "env", TestEnvironment.class); + } + + @BeforeMethod + @SuppressWarnings("unchecked") + public void setUp(Method method) throws Exception { + this.setUp(); + this.testName = method.getName(); + DirectFieldAccessor dfa = new DirectFieldAccessor(this); + dfa.setPropertyValue("env", this.originalEnvironment); + this.completionSignalRequired = false; + + if (!testName.equals("required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber")) { + if (testName.equals("stochastic_spec103_mustSignalOnMethodsSequentially")) { + this.completionSignalRequired = true; + } + else { + TestEnvironment env = new TestEnvironment(2000) { + + @Override + public ManualSubscriber newManualSubscriber(Publisher pub, long timeoutMillis) + throws InterruptedException { + ManualSubscriber subscriber = spy(super.newManualSubscriber(pub, timeoutMillis)); + Answer onCompleteAnswer = invocation -> { + applicationContext.getBean("publisher", Lifecycle.class).stop(); + return invocation.callRealMethod(); + }; + doAnswer(onCompleteAnswer).when(subscriber).expectCompletion(anyLong(), anyString()); + doAnswer(onCompleteAnswer).when(subscriber).requestEndOfStream(anyLong(), anyString()); + + doAnswer(invocation -> { + if (applicationContext.containsBean("input")) { + MessageChannel input = applicationContext.getBean("input", MessageChannel.class); + final Long n = (Long) invocation.getArguments()[0]; + scheduledExecutor.schedule(() -> { + for (int i = 0; i < n; i++) { + input.send(new GenericMessage<>(Math.random())); + } + }, + 100, TimeUnit.MILLISECONDS); + + } + return invocation.callRealMethod(); + }).when(subscriber).request(anyLong()); + + return subscriber; + } + + }; + dfa.setPropertyValue("env", env); + } + } + } + + @AfterMethod + public void teardown() { + if (this.applicationContext != null) { + this.applicationContext.close(); + } + } + + @Override + public Publisher> createPublisher(long elements) { + MockEnvironment environment = new MockEnvironment() + .withProperty(ELEMENTS, "" + elements) + .withProperty(TEST_METHOD, this.testName) + .withProperty(REQUEST_COMPLETE, "" + this.completionSignalRequired); + return doCreatePublisher(environment); + } + + @Override + public Publisher> createFailedPublisher() { + MockEnvironment environment = new MockEnvironment() + .withProperty(FAILED_PUBLISHER, "true"); + return doCreatePublisher(environment); + } + + @SuppressWarnings("unchecked") + private Publisher> doCreatePublisher(ConfigurableEnvironment environment) { + AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(); + applicationContext.register(getConfigClass()); + applicationContext.setEnvironment(environment); + applicationContext.refresh(); + this.applicationContext = applicationContext; + return this.applicationContext.getBean(Publisher.class); + } + + + protected abstract Class getConfigClass(); + + + public static class PublisherConfiguration { + + protected final AtomicBoolean invoked = new AtomicBoolean(); + + @Value("${" + ELEMENTS + ":" + Long.MAX_VALUE + "}") + protected long elements; + + @Value("${" + FAILED_PUBLISHER + ":false}") + protected boolean failedPublisher; + + @Value("${" + REQUEST_COMPLETE + ":false}") + protected boolean completionSignalRequired; + + @Value("${" + TEST_METHOD + ":}") + protected String testMethod; + + @Autowired + protected ConfigurableBeanFactory beanFactory; + + @Bean + public static PropertySourcesPlaceholderConfigurer propertyPlaceholderConfigurer() { + return new PropertySourcesPlaceholderConfigurer(); + } + + } + +} diff --git a/src/test/java/org/springframework/integration/dsl/test/reactivestreams/PollablePublisherIntegrationFlowVerification.java b/src/test/java/org/springframework/integration/dsl/test/reactivestreams/PollablePublisherIntegrationFlowVerification.java new file mode 100644 index 00000000..13fecaf9 --- /dev/null +++ b/src/test/java/org/springframework/integration/dsl/test/reactivestreams/PollablePublisherIntegrationFlowVerification.java @@ -0,0 +1,109 @@ +/* + * Copyright 2015 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.test.reactivestreams; + +import static org.mockito.Mockito.mock; + +import java.util.Date; + +import org.reactivestreams.Publisher; +import org.testng.SkipException; + +import org.springframework.context.Lifecycle; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.ChannelInterceptorAdapter; +import org.springframework.messaging.support.GenericMessage; + +/** + * @author Artem Bilan + * @since 1.1 + */ +@org.testng.annotations.Test +public class PollablePublisherIntegrationFlowVerification extends AbstractPublisherIntegrationFlowVerification { + + @Override + public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() throws Throwable { + throw new SkipException("The Spring Integration Publisher supports " + + "'TheSameElementsInTheSameSequenceToAllOfItsSubscribers' only in case of 'PublishSubscribeChannel' " + + "and unbounded (Long.MAX_VALUE) request(n)."); + } + + @Override + public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() throws Throwable { + optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne(); + } + + @Override + public void optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() throws Throwable { + optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne(); + } + + @Override + protected Class getConfigClass() { + return PollablePublisherConfiguration.class; + } + + @Configuration + @EnableIntegration + public static class PollablePublisherConfiguration extends PublisherConfiguration { + + @Bean + public MessageChannel reactiveChannel() { + if (this.failedPublisher) { + return mock(MessageChannel.class); + } + else { + QueueChannel queueChannel = new QueueChannel(); + if (this.completionSignalRequired) { + queueChannel.addInterceptor(new ChannelInterceptorAdapter() { + + private long count; + + @Override + public void afterReceiveCompletion(Message message, MessageChannel channel, Exception ex) { + super.afterReceiveCompletion(message, channel, ex); + if (count++ == elements) { + beanFactory.getBean("publisher", Lifecycle.class).stop(); + } + } + + }); + } + return queueChannel; + } + } + + @Bean + public Publisher> publisher() { + return IntegrationFlows + .from(() -> new GenericMessage<>(Math.random()), + e -> e.poller(p -> + p.trigger(ctx -> this.invoked.getAndSet(true) ? null : new Date()) + .maxMessagesPerPoll(this.elements))) + .channel(reactiveChannel()) + .toReactivePublisher(); + } + + } + +} diff --git a/src/test/java/org/springframework/integration/dsl/test/reactivestreams/PublishSubscribePublisherIntegrationFlowVerification.java b/src/test/java/org/springframework/integration/dsl/test/reactivestreams/PublishSubscribePublisherIntegrationFlowVerification.java new file mode 100644 index 00000000..83b8bf07 --- /dev/null +++ b/src/test/java/org/springframework/integration/dsl/test/reactivestreams/PublishSubscribePublisherIntegrationFlowVerification.java @@ -0,0 +1,199 @@ +/* + * Copyright 2015 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.test.reactivestreams; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.testng.annotations.BeforeMethod; + +import org.springframework.context.Lifecycle; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.ChannelInterceptorAdapter; +import org.springframework.messaging.support.GenericMessage; + +/** + * @author Artem Bilan + * @since 1.1 + */ +@org.testng.annotations.Test +public class PublishSubscribePublisherIntegrationFlowVerification extends AbstractPublisherIntegrationFlowVerification { + + + @BeforeMethod + @Override + public void setUp(Method method) throws Exception { + super.setUp(method); + if (this.testName.equals("required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber") || + this.testName.equals("required_spec303_mustNotAllowUnboundedRecursion")) { + if (this.testName.equals("required_spec303_mustNotAllowUnboundedRecursion")) { + this.completionSignalRequired = true; + } + this.scheduledExecutor.schedule(() -> { + MessageChannel input = applicationContext.getBean("input", MessageChannel.class); + for (int i = 0; i < 10; i++) { + input.send(new GenericMessage<>("foo")); + } + }, + 500, TimeUnit.MILLISECONDS); + } + } + + @Override + protected Class getConfigClass() { + return PublishSubscribePublisherConfiguration.class; + } + + @Configuration + @EnableIntegration + public static class PublishSubscribePublisherConfiguration extends PublisherConfiguration { + + @Bean + public MessageChannel reactiveChannel() { + if (this.failedPublisher) { + return mock(MessageChannel.class); + } + else { + PublishSubscribeChannel channel = new PublishSubscribeChannel(); + + + channel.addInterceptor(new ChannelInterceptorAdapter() { + + private final AtomicLong count = new AtomicLong(); + + @Override + public Message preSend(Message message, MessageChannel channel) { + if (this.count.get() < elements) { + return super.preSend(message, channel); + } + else { + return null; + } + } + + @Override + public void afterSendCompletion(Message message, MessageChannel channel, boolean sent, + Exception ex) { + super.afterSendCompletion(message, channel, sent, ex); + if (this.count.incrementAndGet() == elements) { + if (completionSignalRequired) { + beanFactory.getBean("publisher", Lifecycle.class).stop(); + } + } + } + + }); + + return channel; + } + } + + @Bean + @SuppressWarnings("unchecked") + public Publisher> publisher() { + Publisher> publisher = getPublisher(); + if (this.testMethod.equals("stochastic_spec103_mustSignalOnMethodsSequentially")) { + publisher = spy(publisher); + doAnswer(invocation -> { + Subscriber subscriber = (Subscriber) invocation.getArguments()[0]; + invocation.getArguments()[0] = new WrappedSubscriber(subscriber); + return invocation.callRealMethod(); + }).when(publisher).subscribe(any(Subscriber.class)); + } + return publisher; + } + + private Publisher> getPublisher() { + return IntegrationFlows + .from("input") + .channel(reactiveChannel()) + .toReactivePublisher(); + } + + private class WrappedSubscriber implements Subscriber { + + private final Subscriber delegate; + + private WrappedSubscriber(Subscriber delegate) { + this.delegate = delegate; + } + + @Override + public void onSubscribe(Subscription s) { + this.delegate.onSubscribe(new WrappedSubscription(s)); + } + + @Override + public void onNext(Object o) { + this.delegate.onNext(o); + } + + @Override + public void onError(Throwable t) { + this.delegate.onError(t); + } + + @Override + public void onComplete() { + this.delegate.onComplete(); + } + + } + + private class WrappedSubscription implements Subscription { + + private final Subscription delegate; + + private final MessageChannel input = beanFactory.getBean("input", MessageChannel.class); + + private WrappedSubscription(Subscription delegate) { + this.delegate = delegate; + } + + @Override + public void request(long n) { + delegate.request(n); + for (int i = 0; i < n; i++) { + this.input.send(new GenericMessage<>(Math.random())); + } + } + + @Override + public void cancel() { + delegate.cancel(); + } + + } + + } + +} diff --git a/src/test/java/org/springframework/integration/dsl/test/reactivestreams/ReactiveStreamsTests.java b/src/test/java/org/springframework/integration/dsl/test/reactivestreams/ReactiveStreamsTests.java new file mode 100644 index 00000000..dad1a672 --- /dev/null +++ b/src/test/java/org/springframework/integration/dsl/test/reactivestreams/ReactiveStreamsTests.java @@ -0,0 +1,161 @@ +/* + * Copyright 2015 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.test.reactivestreams; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.reactivestreams.Publisher; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.Lifecycle; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.Channels; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import reactor.rx.Streams; + + +/** + * @author Artem Bilan + * @since 1.1 + */ +@ContextConfiguration +@RunWith(SpringJUnit4ClassRunner.class) +@DirtiesContext +public class ReactiveStreamsTests { + + @Autowired + @Qualifier("reactiveFlow") + private Publisher> publisher; + + @Autowired + @Qualifier("pollableReactiveFlow") + private Publisher> pollablePublisher; + + @Autowired + @Qualifier("reactiveSteamsMessageSource") + private Lifecycle messageSource; + + @Autowired + @Qualifier("inputChannel") + private MessageChannel inputChannel; + + @Test + public void testReactiveFlow() throws InterruptedException { + List results = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(6); + Streams.wrap(this.publisher) + .map(m -> m.getPayload().toUpperCase()) + .consume(p -> { + results.add(p); + latch.countDown(); + }); + this.messageSource.start(); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + String[] strings = results.toArray(new String[results.size()]); + assertArrayEquals(new String[]{"A", "B", "C", "D", "E", "F"}, strings); + } + + @Test + public void testPollableReactiveFlow() throws InterruptedException, TimeoutException, ExecutionException { + this.inputChannel.send(new GenericMessage<>("1,2,3,4,5")); + + CountDownLatch latch = new CountDownLatch(6); + + Streams.wrap(this.pollablePublisher) + .filter(m -> m.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)) + .observe(p -> latch.countDown()) + .consume(6); + + Future> future = + Executors.newSingleThreadExecutor().submit(() -> + Streams.from(new String[]{"11,12,13"}) + .map(v -> v.split(",")) + .map(Arrays::asList) + .split() + .map(Integer::parseInt) + .>map(GenericMessage::new) + .concatWith(this.pollablePublisher) + .map(Message::getPayload) + .toList(7) + .await(5, TimeUnit.SECONDS)); + + this.inputChannel.send(new GenericMessage<>("6,7,8,9,10")); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + List integers = future.get(10, TimeUnit.SECONDS); + assertNotNull(integers); + assertEquals(7, integers.size()); + } + + @Configuration + @EnableIntegration + public static class ContextConfiguration { + + private final AtomicBoolean invoked = new AtomicBoolean(); + + @Bean + public Publisher> reactiveFlow() { + return IntegrationFlows + .from(() -> new GenericMessage<>("a,b,c,d,e,f"), + e -> e.poller(p -> p.trigger(ctx -> this.invoked.getAndSet(true) ? null : new Date())) + .autoStartup(false) + .id("reactiveSteamsMessageSource")) + .split(String.class, p -> p.split(",")) + .toReactivePublisher(); + } + + @Bean + public Publisher> pollableReactiveFlow() { + return IntegrationFlows + .from("inputChannel") + .split(e -> e.get().getT2().setDelimiters(",")) + .transform(Integer::parseInt) + .channel(Channels::queue) + .toReactivePublisher(); + } + + } + +} diff --git a/src/test/java/org/springframework/integration/dsl/test/xml/XmlTests.java b/src/test/java/org/springframework/integration/dsl/test/xml/XmlTests.java index 6dbda7b2..493dbb10 100644 --- a/src/test/java/org/springframework/integration/dsl/test/xml/XmlTests.java +++ b/src/test/java/org/springframework/integration/dsl/test/xml/XmlTests.java @@ -16,21 +16,35 @@ package org.springframework.integration.dsl.test.xml; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.Resource; +import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.support.FunctionExpression; +import org.springframework.integration.dsl.support.Transformers; +import org.springframework.integration.dsl.support.tuple.Tuple; +import org.springframework.integration.dsl.support.tuple.Tuples; import org.springframework.integration.router.AbstractMappingMessageRouter; +import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.xml.router.XPathRouter; import org.springframework.integration.xml.selector.StringValueTestXPathMessageSelector; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; @@ -49,6 +63,10 @@ public class XmlTests { @Autowired private MessageChannel inputChannel; + @Autowired + @Qualifier("xsltFlow.input") + private MessageChannel xsltFlowInput; + @Autowired private PollableChannel wrongMessagesChannel; @@ -73,10 +91,29 @@ public void testXpathFlow() { assertNotNull(this.receivedChannel.receive(10000)); } + @Test + public void testXsltFlow() { + String doc = "test"; + this.xsltFlowInput.send(MessageBuilder.withPayload(doc) + .setHeader("testParam", "testParamValue") + .setHeader("testParam2", "FOO") + .build()); + Message resultMessage = this.receivedChannel.receive(10000); + assertEquals("Wrong payload type", String.class, resultMessage.getPayload().getClass()); + String payload = (String) resultMessage.getPayload(); + assertThat(payload, containsString("testParamValue")); + assertThat(payload, containsString("FOO")); + assertThat(payload, containsString("hello")); + } + + @Configuration @EnableIntegration public static class ContextConfiguration { + @Value("org/springframework/integration/dsl/test/xml/transformer.xslt") + private Resource xslt; + @Bean public PollableChannel wrongMessagesChannel() { return new QueueChannel(); @@ -112,6 +149,18 @@ public AbstractMappingMessageRouter xpathRouter() { return router; } + @Bean + public IntegrationFlow xsltFlow() { + return f -> f + .transform(Transformers.xslt(this.xslt, + Tuples.of("testParam", new FunctionExpression>(m -> m.getHeaders().get("testParam"))), + Tuples.of("testParam2", new FunctionExpression>(m -> m.getHeaders().get("testParam2"))), + Tuples.of("unresolved", new FunctionExpression>(m -> m.getHeaders().get("foo"))), + Tuples.of("testParam3", new LiteralExpression("hello")) + )) + .channel(receivedChannel()); + } + } } diff --git a/src/test/resources/org/springframework/integration/dsl/test/xml/transformer.xslt b/src/test/resources/org/springframework/integration/dsl/test/xml/transformer.xslt new file mode 100644 index 00000000..419f8c1f --- /dev/null +++ b/src/test/resources/org/springframework/integration/dsl/test/xml/transformer.xslt @@ -0,0 +1,19 @@ + + + + + + + test + + + + + + + + + + +