Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,69 +16,29 @@

package org.springframework.integration.channel;

import java.time.Duration;

import org.reactivestreams.Publisher;

import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Utilities for adaptation {@link MessageChannel}s to the {@link Publisher}s.
*
* @author Artem Bilan
*
* @since 5.0
*
* @deprecated since 5.3 in favor of {@link IntegrationReactiveUtils}.
*/
@Deprecated
public final class MessageChannelReactiveUtils {

private MessageChannelReactiveUtils() {
}

@SuppressWarnings("unchecked")
public static <T> Publisher<Message<T>> toPublisher(MessageChannel messageChannel) {
if (messageChannel instanceof Publisher) {
return (Publisher<Message<T>>) messageChannel;
}
else if (messageChannel instanceof SubscribableChannel) {
return adaptSubscribableChannelToPublisher((SubscribableChannel) messageChannel);
}
else if (messageChannel instanceof PollableChannel) {
return adaptPollableChannelToPublisher((PollableChannel) messageChannel);
}
else {
throw new IllegalArgumentException("The 'messageChannel' must be an instance of Publisher, " +
"SubscribableChannel or PollableChannel, not: " + messageChannel);
}
}

private static <T> Publisher<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
return Flux.defer(() -> {
EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
@SuppressWarnings("unchecked")
MessageHandler messageHandler = (message) -> publisher.onNext((Message<T>) message);
inputChannel.subscribe(messageHandler);
return publisher
.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));
});
}

@SuppressWarnings("unchecked")
private static <T> Publisher<Message<T>> adaptPollableChannelToPublisher(PollableChannel inputChannel) {
return Mono.<Message<T>>create(monoSink ->
monoSink.onRequest(value ->
monoSink.success((Message<T>) inputChannel.receive(0))))
.subscribeOn(Schedulers.boundedElastic())
.repeatWhenEmpty(it -> it.delayElements(Duration.ofMillis(100))) // NOSONAR - magic
.repeat();
return IntegrationReactiveUtils.messageChannelToFlux(messageChannel);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.MessageChannelReactiveUtils;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean;
Expand Down Expand Up @@ -86,6 +85,7 @@
import org.springframework.integration.transformer.MethodInvokingTransformer;
import org.springframework.integration.transformer.Transformer;
import org.springframework.integration.util.ClassUtils;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
Expand Down Expand Up @@ -2920,7 +2920,7 @@ protected <T> Publisher<Message<T>> toReactivePublisher() {
if (channelForPublisher != null && components.size() > 1
&& !(channelForPublisher instanceof MessageChannelReference) &&
!(channelForPublisher instanceof FixedSubscriberChannelPrototype)) {
publisher = MessageChannelReactiveUtils.toPublisher(channelForPublisher);
publisher = IntegrationReactiveUtils.messageChannelToFlux(channelForPublisher);
}
else {
MessageChannel reactiveChannel = new FluxMessageChannel();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.endpoint;

import java.time.Duration;

import org.springframework.integration.core.MessageSource;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

import reactor.core.publisher.Flux;

/**
* The {@link MessageProducerSupport} to adapt a provided {@link MessageSource}
* into a {@link Flux} and let it be subscribed in the {@link #subscribeToPublisher}
*
* @author Artem Bilan
*
* @since 5.3
*/
public class ReactiveMessageSourceProducer extends MessageProducerSupport {

private final Flux<? extends Message<?>> messageFlux;

private Duration delayWhenEmpty = IntegrationReactiveUtils.DEFAULT_DELAY_WHEN_EMPTY;

/**
* Create an instance based on the provided {@link MessageSource}.
* @param messageSource the {@link MessageSource} to pull for messages.
*/
public ReactiveMessageSourceProducer(MessageSource<?> messageSource) {
Assert.notNull(messageSource, "'messageSource' must not be null");
this.messageFlux =
IntegrationReactiveUtils.messageSourceToFlux(messageSource)
.subscriberContext((ctx) ->
ctx.put(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY, this.delayWhenEmpty));
}

/**
* Configure a {@link Duration} to delay next pull request when the previous one
* was empty. Defaults to {@link IntegrationReactiveUtils#DEFAULT_DELAY_WHEN_EMPTY}.
* @param delayWhenEmpty the {@link Duration} to use.
*/
public void setDelayWhenEmpty(Duration delayWhenEmpty) {
Assert.notNull(delayWhenEmpty, "'delayWhenEmpty' must not be null");
this.delayWhenEmpty = delayWhenEmpty;
}

@Override
protected void doStart() {
subscribeToPublisher(this.messageFlux);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@

import org.springframework.context.Lifecycle;
import org.springframework.integration.channel.ChannelUtils;
import org.springframework.integration.channel.MessageChannelReactiveUtils;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
import org.springframework.integration.router.MessageRouter;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
Expand Down Expand Up @@ -89,7 +89,7 @@ public ReactiveStreamsConsumer(MessageChannel inputChannel, Subscriber<Message<?
"it doesn't forward messages sent to it. A NullChannel is the end of the flow.");
}

this.publisher = MessageChannelReactiveUtils.toPublisher(inputChannel);
this.publisher = IntegrationReactiveUtils.messageChannelToFlux(inputChannel);
this.subscriber = subscriber;
this.lifecycleDelegate = subscriber instanceof Lifecycle ? (Lifecycle) subscriber : null;
if (subscriber instanceof MessageHandlerSubscriber) {
Expand All @@ -115,7 +115,7 @@ public ReactiveStreamsConsumer(MessageChannel inputChannel, ReactiveMessageHandl
this.inputChannel = inputChannel;
this.handler = new ReactiveMessageHandlerAdapter(reactiveMessageHandler);
this.reactiveMessageHandler = reactiveMessageHandler;
this.publisher = MessageChannelReactiveUtils.toPublisher(inputChannel);
this.publisher = IntegrationReactiveUtils.messageChannelToFlux(inputChannel);
this.subscriber = null;
this.lifecycleDelegate =
reactiveMessageHandler instanceof Lifecycle ? (Lifecycle) reactiveMessageHandler : null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.util;

import java.time.Duration;
import java.util.function.Function;

import org.reactivestreams.Publisher;

import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AckUtils;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Utilities for adapting integration components to/from reactive types.
*
* @author Artem Bilan
*
* @since 5.3
*/
public final class IntegrationReactiveUtils {

/**
* The subscriber context entry for {@link Flux#delayElements}
* from the {@link Mono#repeatWhenEmpty(Function)}.
*/
public static final String DELAY_WHEN_EMPTY_KEY = "DELAY_WHEN_EMPTY_KEY";

/**
* A default delay before repeating an empty source {@link Mono} as 1 second {@link Duration}.
*/
public static final Duration DEFAULT_DELAY_WHEN_EMPTY = Duration.ofSeconds(1);

private IntegrationReactiveUtils() {
}

/**
* Wrap a provided {@link MessageSource} into a {@link Flux} for pulling the on demand.
* When {@link MessageSource#receive()} returns {@code null}, the source {@link Mono}
* goes to the {@link Mono#repeatWhenEmpty} state and performs a {@code delay}
* based on the {@link #DELAY_WHEN_EMPTY_KEY} {@link Duration} entry in the subscriber context
* or falls back to 1 second duration.
* If a produced message has an
* {@link org.springframework.integration.IntegrationMessageHeaderAccessor#ACKNOWLEDGMENT_CALLBACK} header
* it is ack'ed in the {@link Mono#doOnSuccess} and nack'ed in the {@link Mono#doOnError}.
* @param messageSource the {@link MessageSource} to adapt.
* @param <T> the expected payload type.
* @return a {@link Flux} which pulls messages from the {@link MessageSource} on demand.
*/
public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource) {
return Mono.
<Message<T>>create(monoSink ->
monoSink.onRequest(value ->
monoSink.success(messageSource.receive())))
.doOnSuccess((message) ->
AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message)))
.doOnError(MessagingException.class,
(ex) -> {
Message<?> failedMessage = ex.getFailedMessage();
if (failedMessage != null) {
AckUtils.autoNack(StaticMessageHeaderAccessor.getAcknowledgmentCallback(failedMessage));
}
})
.subscribeOn(Schedulers.boundedElastic())
.repeatWhenEmpty((repeat) ->
repeat.flatMap((increment) ->
Mono.subscriberContext()
.flatMap(ctx ->
Mono.delay(ctx.getOrDefault(DELAY_WHEN_EMPTY_KEY,
DEFAULT_DELAY_WHEN_EMPTY)))))
.repeat()
.retry();
}

/**
* Adapt a provided {@link MessageChannel} into a {@link Flux} source:
* - a {@link org.springframework.integration.channel.FluxMessageChannel}
* is returned as is because it is already a {@link Publisher};
* - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
* for the {@link EmitterProcessor#onNext(Object)} which is returned from this method;
* - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
* {@link #messageSourceToFlux(MessageSource)}.
* @param messageChannel the {@link MessageChannel} to adapt.
* @param <T> the expected payload type.
* @return a {@link Flux} which uses a provided {@link MessageChannel} as a source for events to publish.
*/
@SuppressWarnings("unchecked")
public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {
if (messageChannel instanceof Publisher) {
return Flux.from((Publisher<Message<T>>) messageChannel);
}
else if (messageChannel instanceof SubscribableChannel) {
return adaptSubscribableChannelToPublisher((SubscribableChannel) messageChannel);
}
else if (messageChannel instanceof PollableChannel) {
return messageSourceToFlux(() -> (Message<T>) ((PollableChannel) messageChannel).receive(0));
}
else {
throw new IllegalArgumentException("The 'messageChannel' must be an instance of Publisher, " +
"SubscribableChannel or PollableChannel, not: " + messageChannel);
}
}

private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
return Flux.defer(() -> {
EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
@SuppressWarnings("unchecked")
MessageHandler messageHandler = (message) -> publisher.onNext((Message<T>) message);
inputChannel.subscribe(messageHandler);
return publisher
.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.junit.jupiter.api.Test;

import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.support.GenericMessage;

import reactor.core.Disposable;
Expand All @@ -45,7 +46,7 @@ void testBackpressureWithSubscribableChannel() {
try {
DirectChannel channel = new DirectChannel();
int initialRequest = 10;
StepVerifier.create(MessageChannelReactiveUtils.toPublisher(channel), initialRequest)
StepVerifier.create(IntegrationReactiveUtils.messageChannelToFlux(channel), initialRequest)
.expectSubscription()
.then(() -> {
compositeDisposable.add(
Expand Down Expand Up @@ -77,7 +78,7 @@ void testOverproducingWithSubscribableChannel() {
AtomicInteger sendCount = new AtomicInteger();
try {
int initialRequest = 10;
StepVerifier.create(MessageChannelReactiveUtils.toPublisher(channel), initialRequest)
StepVerifier.create(IntegrationReactiveUtils.messageChannelToFlux(channel), initialRequest)
.expectSubscription()
.then(() ->
compositeDisposable.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.MessageChannelReactiveUtils;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
Expand Down Expand Up @@ -104,7 +104,7 @@ void testMessageChannelReactiveAdaptation() throws InterruptedException {
List<String> results = new ArrayList<>();

Disposable disposable =
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
IntegrationReactiveUtils.<String>messageChannelToFlux(this.queueChannel)
.map(Message::getPayload)
.map(String::toUpperCase)
.doOnNext(results::add)
Expand Down
Loading