Skip to content

Commit

Permalink
Fix deprecations from Reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Jun 22, 2020
1 parent 498f42d commit 3bb445e
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 44 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Expand Up @@ -88,10 +88,10 @@ ext {
mysqlVersion = '8.0.20'
pahoMqttClientVersion = '1.2.2'
postgresVersion = '42.2.12'
reactorVersion = 'Dysprosium-SR7'
reactorVersion = '2020.0.0-M1'
resilience4jVersion = '1.4.0'
romeToolsVersion = '1.12.2'
rsocketVersion = '1.0.0'
rsocketVersion = '1.0.1'
saajVersion = '1.5.2'
servletApiVersion = '4.0.1'
smackVersion = '4.3.4'
Expand Down
Expand Up @@ -24,10 +24,11 @@

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxIdentityProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.Processors;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

/**
Expand All @@ -43,16 +44,17 @@
public class FluxMessageChannel extends AbstractMessageChannel
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {

private final EmitterProcessor<Message<?>> processor;
private final FluxIdentityProcessor<Message<?>> processor;

private final FluxSink<Message<?>> sink;

private final ReplayProcessor<Boolean> subscribedSignal = ReplayProcessor.create(1);
private final Sinks.StandaloneFluxSink<Boolean> subscribedSignal = Sinks.replay(1);

private final Disposable.Composite upstreamSubscriptions = Disposables.composite();

@SuppressWarnings("deprecation")
public FluxMessageChannel() {
this.processor = EmitterProcessor.create(1, false);
this.processor = Processors.more().multicast(1, false);
this.sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
}

Expand All @@ -67,16 +69,16 @@ protected boolean doSend(Message<?> message, long timeout) {
@Override
public void subscribe(Subscriber<? super Message<?>> subscriber) {
this.processor
.doFinally((s) -> this.subscribedSignal.onNext(this.processor.hasDownstreams()))
.doFinally((s) -> this.subscribedSignal.next(this.processor.hasDownstreams()))
.subscribe(subscriber);
this.subscribedSignal.onNext(this.processor.hasDownstreams());
this.subscribedSignal.next(this.processor.hasDownstreams());
}

@Override
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
this.upstreamSubscriptions.add(
Flux.from(publisher)
.delaySubscription(this.subscribedSignal.filter(Boolean::booleanValue).next())
.delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next())
.publishOn(Schedulers.boundedElastic())
.doOnNext((message) -> {
try {
Expand All @@ -91,7 +93,7 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {

@Override
public void destroy() {
this.subscribedSignal.onNext(false);
this.subscribedSignal.next(false);
this.upstreamSubscriptions.dispose();
this.processor.onComplete();
super.destroy();
Expand Down
Expand Up @@ -30,9 +30,10 @@
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxIdentityProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Processors;
import reactor.core.scheduler.Schedulers;

/**
Expand Down Expand Up @@ -101,7 +102,7 @@ public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageS
* - a {@link org.springframework.integration.channel.FluxMessageChannel}
* is returned as is because it is already a {@link Publisher};
* - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
* for the {@link EmitterProcessor#onNext(Object)} which is returned from this method;
* for the {@link FluxIdentityProcessor#onNext(Object)} which is returned from this method;
* - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
* {@link #messageSourceToFlux(MessageSource)}.
* @param messageChannel the {@link MessageChannel} to adapt.
Expand All @@ -127,7 +128,7 @@ else if (messageChannel instanceof PollableChannel) {

private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
return Flux.defer(() -> {
EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
FluxIdentityProcessor<Message<T>> publisher = Processors.more().multicast(1);
@SuppressWarnings("unchecked")
MessageHandler messageHandler = (message) -> publisher.onNext((Message<T>) message);
inputChannel.subscribe(messageHandler);
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,8 +50,8 @@
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxIdentityProcessor;

/**
* @author Artem Bilan
Expand Down Expand Up @@ -141,7 +141,7 @@ void testFluxMessageChannelCleanUp() throws InterruptedException {

flowRegistration.destroy();

assertThat(TestUtils.getPropertyValue(flux, "processor", EmitterProcessor.class).isTerminated()).isTrue();
assertThat(TestUtils.getPropertyValue(flux, "processor", FluxIdentityProcessor.class).isTerminated()).isTrue();
}

@Configuration
Expand Down
Expand Up @@ -57,9 +57,10 @@
import org.springframework.messaging.ReactiveMessageHandler;
import org.springframework.messaging.support.GenericMessage;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxIdentityProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Processors;
import reactor.test.StepVerifier;
import reactor.util.Loggers;

Expand Down Expand Up @@ -298,7 +299,7 @@ public void testReactiveStreamsConsumerViaConsumerEndpointFactoryBean() throws E
public void testReactiveStreamsConsumerFluxMessageChannelReactiveMessageHandler() {
FluxMessageChannel testChannel = new FluxMessageChannel();

EmitterProcessor<Message<?>> processor = EmitterProcessor.create(2, false);
FluxIdentityProcessor<Message<?>> processor = Processors.more().multicast(2, false);

ReactiveMessageHandler messageHandler =
m -> {
Expand Down
Expand Up @@ -84,7 +84,7 @@
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

/**
Expand Down Expand Up @@ -241,7 +241,8 @@ public void testReactiveMessageHandler() {
this.reactiveMessageHandlerChannel.send(new GenericMessage<>("test"));

StepVerifier.create(
this.contextConfiguration.messageMonoProcessor
this.contextConfiguration.messageMono
.asMono()
.map(Message::getPayload)
.cast(String.class))
.expectNext("test")
Expand Down Expand Up @@ -291,7 +292,7 @@ public MessageChannel routerChannel() {
}

@Bean
@Router(inputChannel = "routerChannel", channelMappings = { "true=odd", "false=filter" }, suffix = "Channel")
@Router(inputChannel = "routerChannel", channelMappings = {"true=odd", "false=filter"}, suffix = "Channel")
public MessageSelector router() {
return new ExpressionEvaluatingSelector("payload % 2 == 0");
}
Expand Down Expand Up @@ -373,7 +374,8 @@ public MessageHandler service() {
@Filter(inputChannel = "skippedChannel5")
@Profile("foo")
public MessageHandler skippedMessageHandler() {
return m -> { };
return m -> {
};
}

@Bean
Expand Down Expand Up @@ -427,7 +429,7 @@ public Consumer<Message<?>> messageConsumerAsService() {
return collector()::add;
}

MonoProcessor<Message<?>> messageMonoProcessor = MonoProcessor.create();
Sinks.StandaloneMonoSink<Message<?>> messageMono = Sinks.promise();

@Bean
MessageChannel reactiveMessageHandlerChannel() {
Expand All @@ -438,8 +440,7 @@ MessageChannel reactiveMessageHandlerChannel() {
@ServiceActivator(inputChannel = "reactiveMessageHandlerChannel")
public ReactiveMessageHandler reactiveMessageHandlerService() {
return (message) -> {
messageMonoProcessor.onNext(message);
messageMonoProcessor.onComplete();
messageMono.success(message);
return Mono.empty();
};
}
Expand Down
Expand Up @@ -62,7 +62,7 @@
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

/**
Expand All @@ -88,14 +88,14 @@ public void testOneWay() {
Message<?> result = channel.receive(10000);
assertThat(result.getPayload()).isEqualTo("foo");

MonoProcessor<Object> defaultMethodHandler = MonoProcessor.create();
Sinks.StandaloneMonoSink<Object> defaultMethodHandler = Sinks.promise();

this.errorChannel.subscribe(message -> defaultMethodHandler.onNext(message.getPayload()));
this.errorChannel.subscribe(message -> defaultMethodHandler.success(message.getPayload()));

String defaultMethodPayload = "defaultMethodPayload";
service.defaultMethodGateway(defaultMethodPayload);

StepVerifier.create(defaultMethodHandler)
StepVerifier.create(defaultMethodHandler.asMono())
.expectNext(defaultMethodPayload)
.verifyComplete();
}
Expand Down Expand Up @@ -423,7 +423,7 @@ public void setBeanName(String beanName) {
}

@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({"rawtypes", "unchecked"})
public <T> Future<T> submit(Callable<T> task) {
try {
Future<?> result = super.submit(task);
Expand Down
Expand Up @@ -112,7 +112,7 @@ public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.<Flux<String>>handle((payload, headers) -> payload.map(String::toUpperCase), e -> e.async(true))
.get();
}

Expand Down
Expand Up @@ -50,7 +50,7 @@
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

/**
Expand Down Expand Up @@ -93,7 +93,7 @@ static void tearDown() {
@BeforeEach
void setupTest(TestInfo testInfo) {
if (testInfo.getDisplayName().startsWith("server")) {
this.serverRsocketRequester = serverConfig.clientRequester.block(Duration.ofSeconds(10));
this.serverRsocketRequester = serverConfig.clientRequester.asMono().block(Duration.ofSeconds(10));
}
else {
this.clientRsocketRequester =
Expand Down Expand Up @@ -181,7 +181,7 @@ public Mono<String> echoTransformation(Flux<String> payload) {
@EnableIntegration
static class ServerConfig extends CommonConfig {

final MonoProcessor<RSocketRequester> clientRequester = MonoProcessor.create();
final Sinks.StandaloneMonoSink<RSocketRequester> clientRequester = Sinks.promise();

@Bean
public CloseableChannel rsocketServer() {
Expand All @@ -204,7 +204,7 @@ public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
this.clientRequester.onNext(event.getRequester());
this.clientRequester.success(event.getRequester());
}

}
Expand Down
Expand Up @@ -66,8 +66,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

/**
Expand Down Expand Up @@ -133,7 +132,7 @@ static void tearDown() {
@BeforeEach
void setupTest(TestInfo testInfo) {
if (testInfo.getDisplayName().startsWith("server")) {
this.serverRsocketRequester = serverController.clientRequester.block(Duration.ofSeconds(10));
this.serverRsocketRequester = serverController.clientRequester.asMono().block(Duration.ofSeconds(10));
}
}

Expand All @@ -158,7 +157,7 @@ private void fireAndForget(MessageChannel inputChannel, FluxMessageChannel resul
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
.build());

StepVerifier.create(controller.fireForgetPayloads)
StepVerifier.create(controller.fireForgetPayloads.asFlux())
.expectNext("Hello")
.thenCancel()
.verify();
Expand Down Expand Up @@ -537,13 +536,13 @@ public RSocketMessageHandler messageHandler() {
@Controller
static class TestController {

final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
final Sinks.StandaloneFluxSink<String> fireForgetPayloads = Sinks.replayAll();

final MonoProcessor<RSocketRequester> clientRequester = MonoProcessor.create();
final Sinks.StandaloneMonoSink<RSocketRequester> clientRequester = Sinks.promise();

@MessageMapping("receive")
void receive(String payload) {
this.fireForgetPayloads.onNext(payload);
this.fireForgetPayloads.next(payload);
}

@MessageMapping("echo")
Expand Down Expand Up @@ -595,7 +594,7 @@ Mono<Void> handleExceptionWithVoidReturnValue(IllegalStateException ex) {

@ConnectMapping("clientConnect")
void clientConnect(RSocketRequester requester) {
this.clientRequester.onNext(requester);
this.clientRequester.success(requester);
}

}
Expand Down

0 comments on commit 3bb445e

Please sign in to comment.