diff --git a/async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java b/async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java index b3fcf755..11d9974c 100644 --- a/async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java +++ b/async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/RabbitMqConfig.java @@ -55,14 +55,16 @@ public ReactiveMessageSender messageSender(ConnectionFactoryProvider provider, M final Sender sender = RabbitFlux.createSender(new SenderOptions().channelPool(channelPool)); - return new ReactiveMessageSender(sender, brokerConfigProps.getAppName(), converter, new TopologyCreator(senderConnection)); + return new ReactiveMessageSender(sender, brokerConfigProps.getAppName(), converter, new TopologyCreator(sender)); } @Bean public ReactiveMessageListener messageListener(ConnectionFactoryProvider provider) { final Mono connection = createSenderConnectionMono(provider.getConnectionFactory(), "listener"); Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connection)); - return new ReactiveMessageListener(receiver, new TopologyCreator(connection), maxConcurrency); + final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection)); + + return new ReactiveMessageListener(receiver, new TopologyCreator(sender), maxConcurrency); } @Bean @@ -75,9 +77,6 @@ public ConnectionFactoryProvider connectionFactory(RabbitProperties properties) map.from(properties::determineUsername).whenNonNull().to(factory::setUsername); map.from(properties::determinePassword).whenNonNull().to(factory::setPassword); map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost); - map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat); - factory.setAutomaticRecoveryEnabled(true); - factory.setTopologyRecoveryEnabled(true); factory.useNio(); return () -> factory; } diff --git a/async/async-commons/async-commons.gradle b/async/async-commons/async-commons.gradle index 9613ddf0..d0c7b30e 100644 --- a/async/async-commons/async-commons.gradle +++ b/async/async-commons/async-commons.gradle @@ -76,7 +76,7 @@ dependencies { compile project(":domain-events-api") api 'io.projectreactor:reactor-core' - api "io.projectreactor.rabbitmq:reactor-rabbitmq:1.2.0.RELEASE" + api "io.projectreactor.rabbitmq:reactor-rabbitmq:1.4.0.RC1" api 'com.fasterxml.jackson.core:jackson-databind:2.9.7' testImplementation 'io.projectreactor:reactor-test' } \ No newline at end of file diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/ReactiveMessageSender.java b/async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/ReactiveMessageSender.java index 37214678..7a645dc6 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/ReactiveMessageSender.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/ReactiveMessageSender.java @@ -19,8 +19,6 @@ public class ReactiveMessageSender { - private static final Scheduler senderScheduler = Schedulers.newElastic("reactive-message-sender"); - private final Sender sender; private final String sourceApplication; private final MessageConverter messageConverter; @@ -35,7 +33,6 @@ public ReactiveMessageSender(Sender sender, String sourceApplication, MessageCon public Mono sendWithConfirm(T message, String exchange, String routingKey, Map headers) { return just(toOutboundMessage(message, exchange, routingKey, headers)) - .publishOn(senderScheduler) .map(Mono::just) .flatMapMany(sender::sendWithPublishConfirms) .flatMap(result -> result.isAck() ? diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/TopologyCreator.java b/async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/TopologyCreator.java index b5fb0372..4f7afc4f 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/TopologyCreator.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/impl/communications/TopologyCreator.java @@ -8,6 +8,7 @@ import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.Sender; import java.io.IOException; import java.time.Duration; @@ -19,63 +20,35 @@ */ public class TopologyCreator { - private final Mono channel; + private final Sender sender; - public TopologyCreator(Mono connectionMono) { - this.channel = connectionMono.map(connection -> { - try { - return connection.createChannel(); - } catch (IOException e) { - throw new TopologyDefException("Fail to create channel", e); - } - }).doOnError(e -> log.log(Level.SEVERE, e.getMessage(), e)) - .retryBackoff(5, Duration.ofMillis(500)) - .cache(); + public TopologyCreator(Sender sender) { + this.sender = sender; } - public Mono declare(ExchangeSpecification exchange){ - return channel.map(ch -> { - try { - return ch.exchangeDeclare(exchange.getName(), exchange.getType(), exchange.isDurable(), exchange.isAutoDelete(), exchange.getArguments()); - } catch (IOException e) { - throw new TopologyDefException("Fail to declare exchange: " + exchange.getName(), e); - } - }); + public Mono declare(ExchangeSpecification exchange) { + return sender.declare(exchange) + .onErrorMap(TopologyDefException::new); } - public Mono declare(QueueSpecification queue){ - return channel.map(ch -> { - try { - return ch.queueDeclare(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), queue.getArguments()); - } catch (IOException e) { - throw new TopologyDefException("Fail to declare queue: " + queue.getName(), e); - } - }); + public Mono declare(QueueSpecification queue) { + return sender.declare(queue) + .onErrorMap(TopologyDefException::new); } - public Mono bind(BindingSpecification binding){ - return channel.map(ch -> { - try { - return ch.queueBind(binding.getQueue(), binding.getExchange(), binding.getRoutingKey(), binding.getArguments()); - } catch (IOException e) { - throw new TopologyDefException("Fail to bind queue: " + binding.getQueue(), e); - } - }); + public Mono bind(BindingSpecification binding) { + return sender.bind(binding) + .onErrorMap(TopologyDefException::new); } public Mono unbind(BindingSpecification binding) { - return channel.map(ch -> { - try { - return ch.queueUnbind(binding.getQueue(), binding.getExchange(), binding.getRoutingKey(), binding.getArguments()); - } catch (IOException e) { - throw new TopologyDefException("Fail to unbind queue: " + binding.getQueue(), e); - } - }) ; + return sender.unbind(binding) + .onErrorMap(TopologyDefException::new); } public static class TopologyDefException extends RuntimeException { - public TopologyDefException(String message, Throwable cause) { - super(message, cause); + public TopologyDefException(Throwable cause) { + super(cause); } } } diff --git a/build.gradle b/build.gradle index 44f7a7e6..f2c3f231 100644 --- a/build.gradle +++ b/build.gradle @@ -6,6 +6,7 @@ buildscript { } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") + classpath("com.github.ben-manes:gradle-versions-plugin:$gradleVersionsVersion") } } @@ -14,5 +15,4 @@ plugins { } apply from: './main.gradle' - - +apply plugin: 'com.github.ben-manes.versions' diff --git a/gradle.properties b/gradle.properties index d6767c95..f15ddf05 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,4 @@ version=0.2.1-beta1 -springBootVersion=2.1.1.RELEASE +springBootVersion=2.2.1.RELEASE +gradleVersionsVersion=0.27.0 +