diff --git a/build.gradle b/build.gradle index 6caee4a200e..f29be35d5aa 100644 --- a/build.gradle +++ b/build.gradle @@ -88,10 +88,10 @@ ext { mysqlVersion = '8.0.20' pahoMqttClientVersion = '1.2.2' postgresVersion = '42.2.12' - reactorVersion = 'Dysprosium-SR7' + reactorVersion = '2020.0.0-M1' resilience4jVersion = '1.4.0' romeToolsVersion = '1.12.2' - rsocketVersion = '1.0.0' + rsocketVersion = '1.0.1' saajVersion = '1.5.2' servletApiVersion = '4.0.1' smackVersion = '4.3.4' diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java index 7d5593beab4..2f2a6c621fc 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java @@ -24,10 +24,11 @@ import reactor.core.Disposable; import reactor.core.Disposables; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxIdentityProcessor; import reactor.core.publisher.FluxSink; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.Processors; +import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; /** @@ -43,16 +44,17 @@ public class FluxMessageChannel extends AbstractMessageChannel implements Publisher>, ReactiveStreamsSubscribableChannel { - private final EmitterProcessor> processor; + private final FluxIdentityProcessor> processor; private final FluxSink> sink; - private final ReplayProcessor subscribedSignal = ReplayProcessor.create(1); + private final Sinks.StandaloneFluxSink subscribedSignal = Sinks.replay(1); private final Disposable.Composite upstreamSubscriptions = Disposables.composite(); + @SuppressWarnings("deprecation") public FluxMessageChannel() { - this.processor = EmitterProcessor.create(1, false); + this.processor = Processors.more().multicast(1, false); this.sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER); } @@ -67,16 +69,16 @@ protected boolean doSend(Message message, long timeout) { @Override public void subscribe(Subscriber> subscriber) { this.processor - .doFinally((s) -> this.subscribedSignal.onNext(this.processor.hasDownstreams())) + .doFinally((s) -> this.subscribedSignal.next(this.processor.hasDownstreams())) .subscribe(subscriber); - this.subscribedSignal.onNext(this.processor.hasDownstreams()); + this.subscribedSignal.next(this.processor.hasDownstreams()); } @Override public void subscribeTo(Publisher> publisher) { this.upstreamSubscriptions.add( Flux.from(publisher) - .delaySubscription(this.subscribedSignal.filter(Boolean::booleanValue).next()) + .delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next()) .publishOn(Schedulers.boundedElastic()) .doOnNext((message) -> { try { @@ -91,7 +93,7 @@ public void subscribeTo(Publisher> publisher) { @Override public void destroy() { - this.subscribedSignal.onNext(false); + this.subscribedSignal.next(false); this.upstreamSubscriptions.dispose(); this.processor.onComplete(); super.destroy(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java index 1b9bb6ff72f..2f9161c2c87 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java @@ -30,9 +30,10 @@ import org.springframework.messaging.PollableChannel; import org.springframework.messaging.SubscribableChannel; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxIdentityProcessor; import reactor.core.publisher.Mono; +import reactor.core.publisher.Processors; import reactor.core.scheduler.Schedulers; /** @@ -101,7 +102,7 @@ public static Flux> messageSourceToFlux(MessageSource messageS * - a {@link org.springframework.integration.channel.FluxMessageChannel} * is returned as is because it is already a {@link Publisher}; * - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler} - * for the {@link EmitterProcessor#onNext(Object)} which is returned from this method; + * for the {@link FluxIdentityProcessor#onNext(Object)} which is returned from this method; * - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses * {@link #messageSourceToFlux(MessageSource)}. * @param messageChannel the {@link MessageChannel} to adapt. @@ -127,7 +128,7 @@ else if (messageChannel instanceof PollableChannel) { private static Flux> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) { return Flux.defer(() -> { - EmitterProcessor> publisher = EmitterProcessor.create(1); + FluxIdentityProcessor> publisher = Processors.more().multicast(1); @SuppressWarnings("unchecked") MessageHandler messageHandler = (message) -> publisher.onNext((Message) message); inputChannel.subscribe(messageHandler); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java index 4f04f28ee9c..1110596bdfe 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,8 +50,8 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import reactor.core.Disposable; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxIdentityProcessor; /** * @author Artem Bilan @@ -141,7 +141,7 @@ void testFluxMessageChannelCleanUp() throws InterruptedException { flowRegistration.destroy(); - assertThat(TestUtils.getPropertyValue(flux, "processor", EmitterProcessor.class).isTerminated()).isTrue(); + assertThat(TestUtils.getPropertyValue(flux, "processor", FluxIdentityProcessor.class).isTerminated()).isTrue(); } @Configuration diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java index 3d8d6baad48..5a031c4050b 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java @@ -57,9 +57,10 @@ import org.springframework.messaging.ReactiveMessageHandler; import org.springframework.messaging.support.GenericMessage; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxIdentityProcessor; import reactor.core.publisher.Mono; +import reactor.core.publisher.Processors; import reactor.test.StepVerifier; import reactor.util.Loggers; @@ -298,7 +299,7 @@ public void testReactiveStreamsConsumerViaConsumerEndpointFactoryBean() throws E public void testReactiveStreamsConsumerFluxMessageChannelReactiveMessageHandler() { FluxMessageChannel testChannel = new FluxMessageChannel(); - EmitterProcessor> processor = EmitterProcessor.create(2, false); + FluxIdentityProcessor> processor = Processors.more().multicast(2, false); ReactiveMessageHandler messageHandler = m -> { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java index 8a6a4e26dd6..6ece3aee1a9 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java @@ -84,7 +84,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; /** @@ -241,7 +241,8 @@ public void testReactiveMessageHandler() { this.reactiveMessageHandlerChannel.send(new GenericMessage<>("test")); StepVerifier.create( - this.contextConfiguration.messageMonoProcessor + this.contextConfiguration.messageMono + .asMono() .map(Message::getPayload) .cast(String.class)) .expectNext("test") @@ -291,7 +292,7 @@ public MessageChannel routerChannel() { } @Bean - @Router(inputChannel = "routerChannel", channelMappings = { "true=odd", "false=filter" }, suffix = "Channel") + @Router(inputChannel = "routerChannel", channelMappings = {"true=odd", "false=filter"}, suffix = "Channel") public MessageSelector router() { return new ExpressionEvaluatingSelector("payload % 2 == 0"); } @@ -373,7 +374,8 @@ public MessageHandler service() { @Filter(inputChannel = "skippedChannel5") @Profile("foo") public MessageHandler skippedMessageHandler() { - return m -> { }; + return m -> { + }; } @Bean @@ -427,7 +429,7 @@ public Consumer> messageConsumerAsService() { return collector()::add; } - MonoProcessor> messageMonoProcessor = MonoProcessor.create(); + Sinks.StandaloneMonoSink> messageMono = Sinks.promise(); @Bean MessageChannel reactiveMessageHandlerChannel() { @@ -438,8 +440,7 @@ MessageChannel reactiveMessageHandlerChannel() { @ServiceActivator(inputChannel = "reactiveMessageHandlerChannel") public ReactiveMessageHandler reactiveMessageHandlerService() { return (message) -> { - messageMonoProcessor.onNext(message); - messageMonoProcessor.onComplete(); + messageMono.success(message); return Mono.empty(); }; } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java index e4722277098..97d071b306e 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java @@ -62,7 +62,7 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; /** @@ -88,14 +88,14 @@ public void testOneWay() { Message result = channel.receive(10000); assertThat(result.getPayload()).isEqualTo("foo"); - MonoProcessor defaultMethodHandler = MonoProcessor.create(); + Sinks.StandaloneMonoSink defaultMethodHandler = Sinks.promise(); - this.errorChannel.subscribe(message -> defaultMethodHandler.onNext(message.getPayload())); + this.errorChannel.subscribe(message -> defaultMethodHandler.success(message.getPayload())); String defaultMethodPayload = "defaultMethodPayload"; service.defaultMethodGateway(defaultMethodPayload); - StepVerifier.create(defaultMethodHandler) + StepVerifier.create(defaultMethodHandler.asMono()) .expectNext(defaultMethodPayload) .verifyComplete(); } @@ -423,7 +423,7 @@ public void setBeanName(String beanName) { } @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({"rawtypes", "unchecked"}) public Future submit(Callable task) { try { Future result = super.submit(task); diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java index f198d21f333..190035f9e19 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java @@ -112,7 +112,7 @@ public IntegrationFlow rsocketUpperCaseFlow() { return IntegrationFlows .from(RSockets.inboundGateway("/uppercase") .interactionModels(RSocketInteractionModel.requestChannel)) - ., Flux>transform((flux) -> flux.map(String::toUpperCase)) + .>handle((payload, headers) -> payload.map(String::toUpperCase), e -> e.async(true)) .get(); } diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/inbound/RSocketInboundGatewayIntegrationTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/inbound/RSocketInboundGatewayIntegrationTests.java index 04b1a029f48..fb32b10c4ff 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/inbound/RSocketInboundGatewayIntegrationTests.java +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/inbound/RSocketInboundGatewayIntegrationTests.java @@ -50,7 +50,7 @@ import io.rsocket.transport.netty.server.TcpServerTransport; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; /** @@ -93,7 +93,7 @@ static void tearDown() { @BeforeEach void setupTest(TestInfo testInfo) { if (testInfo.getDisplayName().startsWith("server")) { - this.serverRsocketRequester = serverConfig.clientRequester.block(Duration.ofSeconds(10)); + this.serverRsocketRequester = serverConfig.clientRequester.asMono().block(Duration.ofSeconds(10)); } else { this.clientRsocketRequester = @@ -181,7 +181,7 @@ public Mono echoTransformation(Flux payload) { @EnableIntegration static class ServerConfig extends CommonConfig { - final MonoProcessor clientRequester = MonoProcessor.create(); + final Sinks.StandaloneMonoSink clientRequester = Sinks.promise(); @Bean public CloseableChannel rsocketServer() { @@ -204,7 +204,7 @@ public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler @EventListener public void onApplicationEvent(RSocketConnectedEvent event) { - this.clientRequester.onNext(event.getRequester()); + this.clientRequester.success(event.getRequester()); } } diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java index a90df41a8fa..3ebbbd76171 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java @@ -66,8 +66,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; /** @@ -133,7 +132,7 @@ static void tearDown() { @BeforeEach void setupTest(TestInfo testInfo) { if (testInfo.getDisplayName().startsWith("server")) { - this.serverRsocketRequester = serverController.clientRequester.block(Duration.ofSeconds(10)); + this.serverRsocketRequester = serverController.clientRequester.asMono().block(Duration.ofSeconds(10)); } } @@ -158,7 +157,7 @@ private void fireAndForget(MessageChannel inputChannel, FluxMessageChannel resul .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); - StepVerifier.create(controller.fireForgetPayloads) + StepVerifier.create(controller.fireForgetPayloads.asFlux()) .expectNext("Hello") .thenCancel() .verify(); @@ -537,13 +536,13 @@ public RSocketMessageHandler messageHandler() { @Controller static class TestController { - final ReplayProcessor fireForgetPayloads = ReplayProcessor.create(); + final Sinks.StandaloneFluxSink fireForgetPayloads = Sinks.replayAll(); - final MonoProcessor clientRequester = MonoProcessor.create(); + final Sinks.StandaloneMonoSink clientRequester = Sinks.promise(); @MessageMapping("receive") void receive(String payload) { - this.fireForgetPayloads.onNext(payload); + this.fireForgetPayloads.next(payload); } @MessageMapping("echo") @@ -595,7 +594,7 @@ Mono handleExceptionWithVoidReturnValue(IllegalStateException ex) { @ConnectMapping("clientConnect") void clientConnect(RSocketRequester requester) { - this.clientRequester.onNext(requester); + this.clientRequester.success(requester); } }