Skip to content

Commit

Permalink
Fix MessagingGateway for reactive reply type
Browse files Browse the repository at this point in the history
* Upgrade dependencies to be ready for release
  • Loading branch information
artembilan committed Jun 23, 2020
1 parent 3bb445e commit 9367d7f
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 28 deletions.
28 changes: 14 additions & 14 deletions build.gradle
Expand Up @@ -45,24 +45,24 @@ ext {
modifiedFiles =
files(grgit.status().unstaged.modified).filter{ f -> f.name.endsWith('.java') || f.name.endsWith('.kt') }

activeMqVersion = '5.15.12'
apacheSshdVersion = '2.4.0'
activeMqVersion = '5.15.13'
apacheSshdVersion = '2.5.0'
aspectjVersion = '1.9.5'
assertjVersion = '3.16.1'
assertkVersion = '0.22'
avroVersion = '1.9.2'
awaitilityVersion = '4.0.3'
commonsDbcp2Version = '2.7.0'
commonsIoVersion = '2.6'
commonsIoVersion = '2.7'
commonsNetVersion = '3.6'
curatorVersion = '4.3.0'
derbyVersion = '10.14.2.0'
ftpServerVersion = '1.1.1'
googleJsr305Version = '3.0.2'
groovyVersion = '3.0.3'
groovyVersion = '3.0.4'
hamcrestVersion = '2.2'
hazelcastVersion = '4.0.1'
hibernateVersion = '5.4.14.Final'
hibernateVersion = '5.4.18.Final'
hsqldbVersion = '2.5.0'
h2Version = '1.4.200'
jacksonVersion = '2.11.0'
Expand All @@ -77,19 +77,19 @@ ext {
jschVersion = '0.1.55'
jsonpathVersion = '2.4.0'
junit4Version = '4.13'
junitJupiterVersion = '5.6.2'
junitJupiterVersion = '5.7.0-M1'
jythonVersion = '2.7.2'
kryoShadedVersion = '4.0.2'
lettuceVersion = '5.3.0.RELEASE'
log4jVersion = '2.13.2'
micrometerVersion = '1.5.1'
lettuceVersion = '6.0.0.M1'
log4jVersion = '2.13.3'
micrometerVersion = '1.5.2'
mockitoVersion = '3.3.3'
mongoDriverVersion = '4.0.3'
mongoDriverVersion = '4.0.4'
mysqlVersion = '8.0.20'
pahoMqttClientVersion = '1.2.2'
postgresVersion = '42.2.12'
postgresVersion = '42.2.14'
reactorVersion = '2020.0.0-M1'
resilience4jVersion = '1.4.0'
resilience4jVersion = '1.5.0'
romeToolsVersion = '1.12.2'
rsocketVersion = '1.0.1'
saajVersion = '1.5.2'
Expand All @@ -102,7 +102,7 @@ ext {
springRetryVersion = '1.3.0'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.3.0-SNAPSHOT'
springWsVersion = '3.0.9.RELEASE'
tomcatVersion = "9.0.35"
tomcatVersion = "9.0.36"
xstreamVersion = '1.4.12'

javaProjects = subprojects - project(':spring-integration-bom')
Expand Down Expand Up @@ -325,7 +325,7 @@ configure(javaProjects) { subproject ->

checkstyle {
configDirectory.set(rootProject.file("src/checkstyle"))
toolVersion = project.hasProperty('checkstyleVersion') ? project.checkstyleVersion : '8.32'
toolVersion = project.hasProperty('checkstyleVersion') ? project.checkstyleVersion : '8.33'
}

jar {
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.4.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -875,8 +875,7 @@ public boolean send(Message<?> message, long timeout) {

@Override
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
this.replyMono.switchIfEmpty(Mono.from(publisher));
this.replyMono.onComplete();
publisher.subscribe(this.replyMono);
}

}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -299,21 +299,26 @@ else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
}

private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHeaders, Object reply,
@Nullable Object replyChannel) {
@Nullable Object replyChannelArg) {

Object replyChannel = replyChannelArg;
if (replyChannel == null) {
replyChannel = getOutputChannel();
}

if (this.async && (reply instanceof ListenableFuture<?> || reply instanceof Publisher<?>)) {
MessageChannel messageChannel = getOutputChannel();
if (reply instanceof ListenableFuture<?> ||
!(messageChannel instanceof ReactiveStreamsSubscribableChannel)) {
asyncNonReactiveReply(requestMessage, reply, replyChannel);
}
else {
((ReactiveStreamsSubscribableChannel) messageChannel)
if (reply instanceof Publisher<?> &&
replyChannel instanceof ReactiveStreamsSubscribableChannel) {

((ReactiveStreamsSubscribableChannel) replyChannel)
.subscribeTo(
Flux.from((Publisher<?>) reply)
.doOnError((ex) -> sendErrorMessage(requestMessage, ex))
.map(result -> createOutputMessage(result, requestHeaders)));
}
else {
asyncNonReactiveReply(requestMessage, reply, replyChannel);
}
}
else {
sendOutput(createOutputMessage(reply, requestHeaders), replyChannel, false);
Expand Down
Expand Up @@ -112,7 +112,7 @@ public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>>handle((payload, headers) -> payload.map(String::toUpperCase), e -> e.async(true))
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}

Expand Down

0 comments on commit 9367d7f

Please sign in to comment.