From 9367d7f4de829f3c96ea85e8fa1bad80c1cc4618 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 23 Jun 2020 14:26:11 -0400 Subject: [PATCH] Fix MessagingGateway for reactive reply type * Upgrade dependencies to be ready for release --- build.gradle | 28 +++++++++---------- gradle/wrapper/gradle-wrapper.properties | 2 +- .../gateway/MessagingGatewaySupport.java | 5 ++-- .../AbstractMessageProducingHandler.java | 23 +++++++++------ .../rsocket/dsl/RSocketDslTests.java | 2 +- 5 files changed, 32 insertions(+), 28 deletions(-) diff --git a/build.gradle b/build.gradle index f29be35d5aa..3f234324e76 100644 --- a/build.gradle +++ b/build.gradle @@ -45,24 +45,24 @@ ext { modifiedFiles = files(grgit.status().unstaged.modified).filter{ f -> f.name.endsWith('.java') || f.name.endsWith('.kt') } - activeMqVersion = '5.15.12' - apacheSshdVersion = '2.4.0' + activeMqVersion = '5.15.13' + apacheSshdVersion = '2.5.0' aspectjVersion = '1.9.5' assertjVersion = '3.16.1' assertkVersion = '0.22' avroVersion = '1.9.2' awaitilityVersion = '4.0.3' commonsDbcp2Version = '2.7.0' - commonsIoVersion = '2.6' + commonsIoVersion = '2.7' commonsNetVersion = '3.6' curatorVersion = '4.3.0' derbyVersion = '10.14.2.0' ftpServerVersion = '1.1.1' googleJsr305Version = '3.0.2' - groovyVersion = '3.0.3' + groovyVersion = '3.0.4' hamcrestVersion = '2.2' hazelcastVersion = '4.0.1' - hibernateVersion = '5.4.14.Final' + hibernateVersion = '5.4.18.Final' hsqldbVersion = '2.5.0' h2Version = '1.4.200' jacksonVersion = '2.11.0' @@ -77,19 +77,19 @@ ext { jschVersion = '0.1.55' jsonpathVersion = '2.4.0' junit4Version = '4.13' - junitJupiterVersion = '5.6.2' + junitJupiterVersion = '5.7.0-M1' jythonVersion = '2.7.2' kryoShadedVersion = '4.0.2' - lettuceVersion = '5.3.0.RELEASE' - log4jVersion = '2.13.2' - micrometerVersion = '1.5.1' + lettuceVersion = '6.0.0.M1' + log4jVersion = '2.13.3' + micrometerVersion = '1.5.2' mockitoVersion = '3.3.3' - mongoDriverVersion = '4.0.3' + mongoDriverVersion = '4.0.4' mysqlVersion = '8.0.20' pahoMqttClientVersion = '1.2.2' - postgresVersion = '42.2.12' + postgresVersion = '42.2.14' reactorVersion = '2020.0.0-M1' - resilience4jVersion = '1.4.0' + resilience4jVersion = '1.5.0' romeToolsVersion = '1.12.2' rsocketVersion = '1.0.1' saajVersion = '1.5.2' @@ -102,7 +102,7 @@ ext { springRetryVersion = '1.3.0' springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.3.0-SNAPSHOT' springWsVersion = '3.0.9.RELEASE' - tomcatVersion = "9.0.35" + tomcatVersion = "9.0.36" xstreamVersion = '1.4.12' javaProjects = subprojects - project(':spring-integration-bom') @@ -325,7 +325,7 @@ configure(javaProjects) { subproject -> checkstyle { configDirectory.set(rootProject.file("src/checkstyle")) - toolVersion = project.hasProperty('checkstyleVersion') ? project.checkstyleVersion : '8.32' + toolVersion = project.hasProperty('checkstyleVersion') ? project.checkstyleVersion : '8.33' } jar { diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index a4f0001d203..622ab64a3cb 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.4.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java index 0e41ea96b5d..5dc9dfe18fa 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -875,8 +875,7 @@ public boolean send(Message message, long timeout) { @Override public void subscribeTo(Publisher> publisher) { - this.replyMono.switchIfEmpty(Mono.from(publisher)); - this.replyMono.onComplete(); + publisher.subscribe(this.replyMono); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java index df685485c44..4b44ebd5992 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 the original author or authors. + * Copyright 2014-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -299,21 +299,26 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) { } private void doProduceOutput(Message requestMessage, MessageHeaders requestHeaders, Object reply, - @Nullable Object replyChannel) { + @Nullable Object replyChannelArg) { + + Object replyChannel = replyChannelArg; + if (replyChannel == null) { + replyChannel = getOutputChannel(); + } if (this.async && (reply instanceof ListenableFuture || reply instanceof Publisher)) { - MessageChannel messageChannel = getOutputChannel(); - if (reply instanceof ListenableFuture || - !(messageChannel instanceof ReactiveStreamsSubscribableChannel)) { - asyncNonReactiveReply(requestMessage, reply, replyChannel); - } - else { - ((ReactiveStreamsSubscribableChannel) messageChannel) + if (reply instanceof Publisher && + replyChannel instanceof ReactiveStreamsSubscribableChannel) { + + ((ReactiveStreamsSubscribableChannel) replyChannel) .subscribeTo( Flux.from((Publisher) reply) .doOnError((ex) -> sendErrorMessage(requestMessage, ex)) .map(result -> createOutputMessage(result, requestHeaders))); } + else { + asyncNonReactiveReply(requestMessage, reply, replyChannel); + } } else { sendOutput(createOutputMessage(reply, requestHeaders), replyChannel, false); diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java index 190035f9e19..f198d21f333 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java @@ -112,7 +112,7 @@ public IntegrationFlow rsocketUpperCaseFlow() { return IntegrationFlows .from(RSockets.inboundGateway("/uppercase") .interactionModels(RSocketInteractionModel.requestChannel)) - .>handle((payload, headers) -> payload.map(String::toUpperCase), e -> e.async(true)) + ., Flux>transform((flux) -> flux.map(String::toUpperCase)) .get(); }