diff --git a/async/async-commons-api/async-commons-api.gradle b/async/async-commons-api/async-commons-api.gradle index 3e79429d..4a48317e 100644 --- a/async/async-commons-api/async-commons-api.gradle +++ b/async/async-commons-api/async-commons-api.gradle @@ -7,4 +7,5 @@ dependencies { api project(':domain-events-api') compileOnly 'io.projectreactor:reactor-core' testImplementation 'io.projectreactor:reactor-test' + implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0' } diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java index 39ab22f5..13b6d0b3 100644 --- a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java @@ -1,10 +1,25 @@ package org.reactivecommons.async.api; +import io.cloudevents.CloudEvent; import org.reactivecommons.api.domain.Command; import reactor.core.publisher.Mono; public interface DirectAsyncGateway { Mono sendCommand(Command command, String targetName); + + Mono sendCommand(Command command, String targetName, String domain); + + Mono sendCommand(CloudEvent command, String targetName); + + Mono sendCommand(CloudEvent command, String targetName, String domain); + Mono requestReply(AsyncQuery query, String targetName, Class type); + + Mono requestReply(AsyncQuery query, String targetName, Class type, String domain); + + Mono requestReply(CloudEvent query, String targetName, Class type); + + Mono requestReply(CloudEvent query, String targetName, Class type, String domain); + Mono reply(T response, From from); } diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java index aca8879d..d5e35fc3 100644 --- a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java @@ -1,5 +1,8 @@ package org.reactivecommons.async.api; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; @@ -13,24 +16,36 @@ import java.lang.reflect.ParameterizedType; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @Getter @NoArgsConstructor(access = AccessLevel.PACKAGE) public class HandlerRegistry { - - private final List> eventListeners = new CopyOnWriteArrayList<>(); + public static final String DEFAULT_DOMAIN = "app"; + private final Map>> domainEventListeners = new ConcurrentHashMap<>(); private final List> dynamicEventHandlers = new CopyOnWriteArrayList<>(); private final List> eventNotificationListener = new CopyOnWriteArrayList<>(); private final List> handlers = new CopyOnWriteArrayList<>(); private final List> commandHandlers = new CopyOnWriteArrayList<>(); + public static HandlerRegistry register() { - return new HandlerRegistry(); + HandlerRegistry instance = new HandlerRegistry(); + instance.domainEventListeners.put(DEFAULT_DOMAIN, new CopyOnWriteArrayList<>()); + return instance; + } + + public HandlerRegistry listenDomainEvent(String domain, String eventName, EventHandler handler, Class eventClass) { + domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>()) + .add(new RegisteredEventListener<>(eventName, handler, eventClass)); + return this; } public HandlerRegistry listenEvent(String eventName, EventHandler handler, Class eventClass) { - eventListeners.add(new RegisteredEventListener<>(eventName, handler, eventClass)); + domainEventListeners.computeIfAbsent(DEFAULT_DOMAIN, ignored -> new CopyOnWriteArrayList<>()) + .add(new RegisteredEventListener<>(eventName, handler, eventClass)); return this; } @@ -67,7 +82,21 @@ public HandlerRegistry serveQuery(String resource, QueryHandler han } public HandlerRegistry serveQuery(String resource, QueryHandler handler, Class queryClass) { - handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass)); + if(queryClass == CloudEvent.class){ + handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> + { + CloudEvent query = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(message); + + return handler.handle((R) query); + + } , byte[].class)); + } + else{ + handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass)); + } return this; } diff --git a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java index 31416ac0..121feeb8 100644 --- a/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java +++ b/async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java @@ -16,6 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; class HandlerRegistryTest { private final HandlerRegistry registry = HandlerRegistry.register(); @@ -27,7 +28,7 @@ void shouldListenEventWithTypeInferenceWhenClassInstanceIsUsed() { registry.listenEvent(name, eventHandler); - assertThat(registry.getEventListeners()) + assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler) .containsExactly(name, SomeDataClass.class, eventHandler)).hasSize(1); @@ -43,7 +44,7 @@ void shouldRegisterPatternEventHandlerWithTypeInference() { RegisteredEventListener expectedRegisteredEventListener = new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class); - assertThat(registry.getEventListeners()) + assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN)) .anySatisfy(registeredEventListener -> assertThat(registeredEventListener) .usingRecursiveComparison() .isEqualTo(expectedRegisteredEventListener)); @@ -62,7 +63,7 @@ void shouldRegisterPatternEventHandler() { RegisteredEventListener expectedRegisteredEventListener = new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class); - assertThat(registry.getEventListeners()) + assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN)) .anySatisfy(registeredEventListener -> assertThat(registeredEventListener) .usingRecursiveComparison() .isEqualTo(expectedRegisteredEventListener)); @@ -84,7 +85,7 @@ public void listenEvent() { EventHandler handler = mock(EventHandler.class); registry.listenEvent(name, handler, SomeDataClass.class); - assertThat(registry.getEventListeners()) + assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN)) .anySatisfy(registered -> assertThat(registered) .extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler) .containsExactly(name, SomeDataClass.class, handler)).hasSize(1); diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java index 85ea576c..2202ab5b 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java @@ -1,5 +1,6 @@ package org.reactivecommons.async.commons.converters; +import io.cloudevents.CloudEvent; import org.reactivecommons.api.domain.Command; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.async.api.AsyncQuery; diff --git a/async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/matcher/KeyMatcherPerformanceWildcardTest.java b/async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/matcher/KeyMatcherPerformanceWildcardTest.java index 48bca1f6..b607233d 100755 --- a/async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/matcher/KeyMatcherPerformanceWildcardTest.java +++ b/async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/matcher/KeyMatcherPerformanceWildcardTest.java @@ -8,7 +8,12 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; class KeyMatcherPerformanceWildcardTest { @@ -27,13 +32,13 @@ public void setUp() { File file = new File(classLoader.getResource("wildcard_names_for_matching.txt").getFile()); File file2 = new File(classLoader.getResource("concrete_names_for_matching.txt").getFile()); try { - Set names = new HashSet<>(Files + Set names = new HashSet<>(Files .readAllLines(Paths.get(file.getAbsolutePath()))); candidates = names.stream() .collect(Collectors.toMap(name -> name, name -> name)); testList = new ArrayList<>(new HashSet<>(Files - .readAllLines(Paths.get(file2.getAbsolutePath())))); - testResultList = new ArrayList<>(testList.size()*10); + .readAllLines(Paths.get(file2.getAbsolutePath())))); + testResultList = new ArrayList<>(testList.size() * 10); } catch (IOException e) { e.printStackTrace(); } @@ -43,14 +48,14 @@ public void setUp() { void keyMatcherLookupShouldPerformInLessThan30Micros() { final int size = testList.size(); final long init = System.currentTimeMillis(); - for (int i = 0; i< size*10; ++i){ - testResultList.add(keyMatcher.match(candidates.keySet(), testList.get(i%size))); + for (int i = 0; i < size * 10; ++i) { + testResultList.add(keyMatcher.match(candidates.keySet(), testList.get(i % size))); } final long end = System.currentTimeMillis(); final long total = end - init; - final double microsPerLookup = ((total+0.0)/testResultList.size())*1000; + final double microsPerLookup = ((total + 0.0) / testResultList.size()) * 1000; System.out.println("Performed Lookups: " + testResultList.size()); System.out.println("Total Execution Time: " + total + "ms"); System.out.println("Microseconds per lookup: " + microsPerLookup + "us"); diff --git a/async/async-rabbit-starter-eda/async-commons-rabbit-starter-eda.gradle b/async/async-rabbit-starter-eda/async-commons-rabbit-starter-eda.gradle new file mode 100644 index 00000000..d80988a7 --- /dev/null +++ b/async/async-rabbit-starter-eda/async-commons-rabbit-starter-eda.gradle @@ -0,0 +1,15 @@ +ext { + artifactId = 'async-commons-rabbit-starter-eda' + artifactDescription = 'Async Commons Starter EDA' +} + +dependencies { + api project(':async-rabbit') + compileOnly 'org.springframework.boot:spring-boot-starter' + compileOnly 'org.springframework.boot:spring-boot-starter-actuator' + + annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' + + testImplementation 'io.projectreactor:reactor-test' + testImplementation 'org.springframework.boot:spring-boot-starter-actuator' +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/RabbitEDADirectAsyncGateway.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/RabbitEDADirectAsyncGateway.java new file mode 100644 index 00000000..f4418ec8 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/RabbitEDADirectAsyncGateway.java @@ -0,0 +1,25 @@ +package org.reactivecommons.async; + +import io.micrometer.core.instrument.MeterRegistry; +import org.reactivecommons.async.commons.config.BrokerConfig; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.reply.ReactiveReplyRouter; +import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; +import org.reactivecommons.async.rabbit.config.ConnectionManager; + +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +public class RabbitEDADirectAsyncGateway extends RabbitDirectAsyncGateway { + private final ConnectionManager manager; + + public RabbitEDADirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ConnectionManager manager, String exchange, MessageConverter converter, MeterRegistry meterRegistry) { + super(config, router, manager.getSender(DEFAULT_DOMAIN), exchange, converter, meterRegistry); + this.manager = manager; + } + + @Override + protected ReactiveMessageSender resolveSender(String domain) { + return manager.getSender(domain); + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableCommandListeners.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableCommandListeners.java new file mode 100644 index 00000000..fcbe9833 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableCommandListeners.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.impl.config.annotations; + +import org.reactivecommons.async.rabbit.config.CommandListenersConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import(CommandListenersConfig.class) +@Configuration +public @interface EnableCommandListeners { +} + + + diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDirectAsyncGateway.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDirectAsyncGateway.java new file mode 100644 index 00000000..359913fd --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDirectAsyncGateway.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.impl.config.annotations; + +import org.reactivecommons.async.rabbit.config.DirectAsyncGatewayConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import(DirectAsyncGatewayConfig.class) +@Configuration +public @interface EnableDirectAsyncGateway { +} + + + diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDomainEventBus.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDomainEventBus.java new file mode 100644 index 00000000..6820ab50 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableDomainEventBus.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.impl.config.annotations; + +import org.reactivecommons.async.rabbit.config.EventBusConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import(EventBusConfig.class) +@Configuration +public @interface EnableDomainEventBus { +} + + + diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableEventListeners.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableEventListeners.java new file mode 100644 index 00000000..87791c20 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableEventListeners.java @@ -0,0 +1,18 @@ +package org.reactivecommons.async.impl.config.annotations; + +import org.reactivecommons.async.rabbit.config.EventListenersConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import(EventListenersConfig.class) +@Configuration +public @interface EnableEventListeners { +} + + + diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableMessageListeners.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableMessageListeners.java new file mode 100644 index 00000000..51aebd4b --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableMessageListeners.java @@ -0,0 +1,26 @@ +package org.reactivecommons.async.impl.config.annotations; + +import org.reactivecommons.async.rabbit.config.CommandListenersConfig; +import org.reactivecommons.async.rabbit.config.EventListenersConfig; +import org.reactivecommons.async.rabbit.config.NotificacionListenersConfig; +import org.reactivecommons.async.rabbit.config.QueryListenerConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + +/** + * This annotation enables all messages listeners (Query, Commands, Events). If you want to enable separately, please use + * EnableCommandListeners, EnableQueryListeners or EnableEventListeners. + * + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import({CommandListenersConfig.class, QueryListenerConfig.class, EventListenersConfig.class, NotificacionListenersConfig.class}) +@Configuration +public @interface EnableMessageListeners { +} + + + diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableNotificationListener.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableNotificationListener.java new file mode 100644 index 00000000..66f97f02 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableNotificationListener.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.impl.config.annotations; + +import org.reactivecommons.async.rabbit.config.NotificacionListenersConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import(NotificacionListenersConfig.class) +@Configuration +public @interface EnableNotificationListener { +} + + + diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableQueryListeners.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableQueryListeners.java new file mode 100644 index 00000000..6eb878b0 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/impl/config/annotations/EnableQueryListeners.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.impl.config.annotations; + +import org.reactivecommons.async.rabbit.config.QueryListenerConfig; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +@Documented +@Import(QueryListenerConfig.class) +@Configuration +public @interface EnableQueryListeners { +} + + + diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java new file mode 100644 index 00000000..d3f54eee --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/CommandListenersConfig.java @@ -0,0 +1,37 @@ +package org.reactivecommons.async.rabbit.config; + +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; +import org.reactivecommons.async.rabbit.listeners.ApplicationCommandListener; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +@Configuration +@RequiredArgsConstructor +@Import(RabbitMqConfig.class) +public class CommandListenersConfig { + + @Value("${spring.application.name}") + private String appName; + + private final AsyncProps asyncProps; + + @Bean + public ApplicationCommandListener applicationCommandListener(ConnectionManager manager, + MessageConverter converter, + CustomReporter errorReporter) { + ApplicationCommandListener commandListener = new ApplicationCommandListener(manager.getListener(DEFAULT_DOMAIN), appName, manager.getHandlerResolver(DEFAULT_DOMAIN), + asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), + asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter); + + commandListener.startListener(); + + return commandListener; + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/ConnectionManager.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/ConnectionManager.java new file mode 100644 index 00000000..ca1b6be2 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/ConnectionManager.java @@ -0,0 +1,69 @@ +package org.reactivecommons.async.rabbit.config; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import org.reactivecommons.async.commons.DiscardNotifier; +import org.reactivecommons.async.rabbit.HandlerResolver; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; + +import java.util.Map; +import java.util.TreeMap; +import java.util.function.BiConsumer; + +public class ConnectionManager { + private final Map connections = new TreeMap<>(); + + @Builder + @Getter + public static class DomainConnections { + private final ReactiveMessageListener listener; + private final ReactiveMessageSender sender; + private final HandlerResolver handlerResolver; + private final ConnectionFactoryProvider provider; + @Setter + private DiscardNotifier discardNotifier; + } + + public void forSender(BiConsumer consumer) { + connections.forEach((key, conn) -> consumer.accept(key, conn.getSender())); + } + + public void forListener(BiConsumer consumer) { + connections.forEach((key, conn) -> consumer.accept(key, conn.getListener())); + } + + public void setDiscardNotifier(String domain, DiscardNotifier discardNotifier) { + connections.get(domain).setDiscardNotifier(discardNotifier); + } + + public ConnectionManager addDomain(String domain, ReactiveMessageListener listener, ReactiveMessageSender sender, + HandlerResolver resolver) { + connections.put(domain, DomainConnections.builder() + .listener(listener) + .sender(sender) + .handlerResolver(resolver) + .build()); + return this; + } + + public ReactiveMessageSender getSender(String domain) { + return connections.get(domain).getSender(); + } + + public ReactiveMessageListener getListener(String domain) { + return connections.get(domain).getListener(); + } + + public DiscardNotifier getDiscardNotifier(String domain) { + return connections.get(domain).getDiscardNotifier(); + } + + public HandlerResolver getHandlerResolver(String domain) { + return connections.get(domain).getHandlerResolver(); + } + public ConnectionFactoryProvider getProvider(String domain) { + return connections.get(domain).getProvider(); + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java new file mode 100644 index 00000000..a2ef6f65 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/DirectAsyncGatewayConfig.java @@ -0,0 +1,63 @@ +package org.reactivecommons.async.rabbit.config; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.RabbitEDADirectAsyncGateway; +import org.reactivecommons.async.commons.config.BrokerConfig; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.reply.ReactiveReplyRouter; +import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; +import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps; +import org.reactivecommons.async.rabbit.listeners.ApplicationReplyListener; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +@Configuration +@Import(RabbitMqConfig.class) +@RequiredArgsConstructor +public class DirectAsyncGatewayConfig { + + private final BrokerConfigProps props; + + @Bean + public RabbitDirectAsyncGateway rabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ConnectionManager manager, MessageConverter converter, MeterRegistry meterRegistry) throws Exception { + return new RabbitEDADirectAsyncGateway(config, router, manager, props.getDirectMessagesExchangeName(), converter, meterRegistry); + } + + @Bean + public ApplicationReplyListener msgListener(ReactiveReplyRouter router, AsyncProps asyncProps, BrokerConfig config, ConnectionManager manager) { + asyncProps.getListenRepliesFrom().add(DEFAULT_DOMAIN); + AtomicReference localListener = new AtomicReference<>(); + + asyncProps.getListenRepliesFrom().forEach(domain -> { + + final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, manager.getListener(domain), props.getReplyQueue()); + replyListener.startListening(config.getRoutingKey()); + + if (DEFAULT_DOMAIN.equals(domain)) { + localListener.set(replyListener); + } + }); + + return localListener.get(); + } + + @Bean + public ReactiveReplyRouter router() { + return new ReactiveReplyRouter(); + } + + @Bean + @ConditionalOnMissingBean(MeterRegistry.class) + public MeterRegistry defaultRabbitMeterRegistry() { + return new SimpleMeterRegistry(); + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventBusConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventBusConfig.java new file mode 100644 index 00000000..bfa7ec90 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventBusConfig.java @@ -0,0 +1,36 @@ +package org.reactivecommons.async.rabbit.config; + +import org.reactivecommons.api.domain.DomainEventBus; +import org.reactivecommons.async.commons.DiscardNotifier; +import org.reactivecommons.async.commons.config.BrokerConfig; +import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier; +import org.reactivecommons.async.rabbit.RabbitDiscardNotifier; +import org.reactivecommons.async.rabbit.RabbitDomainEventBus; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; +import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; +import static reactor.rabbitmq.ExchangeSpecification.exchange; + +@Configuration +@Import(RabbitMqConfig.class) +public class EventBusConfig { + + @Bean // app connection + public DomainEventBus domainEventBus(ConnectionManager manager, BrokerConfigProps props, BrokerConfig config, + ObjectMapperSupplier objectMapperSupplier) { + ReactiveMessageSender sender = manager.getSender(DEFAULT_DOMAIN); + final String exchangeName = props.getDomainEventsExchangeName(); + sender.getTopologyCreator().declare(exchange(exchangeName).durable(true).type("topic")).subscribe(); + DomainEventBus domainEventBus = new RabbitDomainEventBus(sender, exchangeName, config); + manager.setDiscardNotifier(DEFAULT_DOMAIN, createDiscardNotifier(domainEventBus, objectMapperSupplier)); + return domainEventBus; + } + + private DiscardNotifier createDiscardNotifier(DomainEventBus domainEventBus, ObjectMapperSupplier objectMapperSupplier) { + return new RabbitDiscardNotifier(domainEventBus, objectMapperSupplier.get()); + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventListenersConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventListenersConfig.java new file mode 100644 index 00000000..0b29ffda --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/EventListenersConfig.java @@ -0,0 +1,51 @@ +package org.reactivecommons.async.rabbit.config; + +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; +import org.reactivecommons.async.rabbit.listeners.ApplicationEventListener; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +@Configuration +@RequiredArgsConstructor +@Import(RabbitMqConfig.class) +public class EventListenersConfig { + + @Value("${spring.application.name}") + private String appName; + + private final AsyncProps asyncProps; + + @Bean + public ApplicationEventListener eventListener(MessageConverter messageConverter, + ConnectionManager manager, CustomReporter errorReporter) { + AtomicReference external = new AtomicReference<>(); + manager.forListener((domain, receiver) -> { + final ApplicationEventListener listener = new ApplicationEventListener(receiver, + appName + ".subsEvents", + manager.getHandlerResolver(domain), + asyncProps.getDomain().getEvents().getExchange(), + messageConverter, asyncProps.getWithDLQRetry(), + asyncProps.getMaxRetries(), + asyncProps.getRetryDelay(), + asyncProps.getDomain().getEvents().getMaxLengthBytes(), + manager.getDiscardNotifier(domain), + errorReporter, + appName); + if (DEFAULT_DOMAIN.equals(domain)) { + external.set(listener); + } + listener.startListener(); + }); + + return external.get(); + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/HandlerResolverBuilder.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/HandlerResolverBuilder.java new file mode 100644 index 00000000..ed4f7e3f --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/HandlerResolverBuilder.java @@ -0,0 +1,97 @@ +package org.reactivecommons.async.rabbit.config; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.reactivecommons.async.api.DefaultCommandHandler; +import org.reactivecommons.async.api.HandlerRegistry; +import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler; +import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener; +import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; +import org.reactivecommons.async.rabbit.HandlerResolver; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Stream; + +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class HandlerResolverBuilder { + + public static HandlerResolver buildResolver(String domain, + Map registries, + final DefaultCommandHandler defaultCommandHandler) { + + if (DEFAULT_DOMAIN.equals(domain)) { + final ConcurrentMap> queryHandlers = registries + .values().stream() + .flatMap(r -> r.getHandlers().stream()) + .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), + ConcurrentHashMap::putAll); + + final ConcurrentMap> commandHandlers = registries + .values().stream() + .flatMap(r -> r.getCommandHandlers().stream()) + .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), + ConcurrentHashMap::putAll); + + final ConcurrentMap> eventNotificationListener = registries + .values() + .stream() + .flatMap(r -> r.getEventNotificationListener().stream()) + .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), + ConcurrentHashMap::putAll); + + final ConcurrentMap> eventsToBind = getEventsToBind(domain, registries); + + final ConcurrentMap> eventHandlers = getEventHandlersWithDynamics(domain, registries); + + return new HandlerResolver(queryHandlers, eventHandlers, eventsToBind, eventNotificationListener, commandHandlers) { + @Override + @SuppressWarnings("unchecked") + public RegisteredCommandHandler getCommandHandler(String path) { + final RegisteredCommandHandler handler = super.getCommandHandler(path); + return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class); + } + }; + } + + + final ConcurrentMap> eventsToBind = getEventsToBind(domain, registries); + final ConcurrentMap> eventHandlers = getEventHandlersWithDynamics(domain, registries); + + return new HandlerResolver(new ConcurrentHashMap<>(), eventHandlers, eventsToBind, new ConcurrentHashMap<>(), new ConcurrentHashMap<>()) { + @Override + @SuppressWarnings("unchecked") + public RegisteredCommandHandler getCommandHandler(String path) { + final RegisteredCommandHandler handler = super.getCommandHandler(path); + return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class); + } + }; + } + + private static ConcurrentMap> getEventHandlersWithDynamics(String domain, Map registries) { + // event handlers and dynamic handlers + return registries + .values().stream() + .flatMap(r -> Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r))) + .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), + ConcurrentHashMap::putAll); + } + + private static Stream> getDynamics(String domain, HandlerRegistry r) { + if (DEFAULT_DOMAIN.equals(domain)) { + return r.getDynamicEventHandlers().stream(); + } + return Stream.of(); + } + + private static ConcurrentMap> getEventsToBind(String domain, Map registries) { + return registries + .values().stream() + .flatMap(r -> r.getDomainEventListeners().get(domain).stream()) + .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), + ConcurrentHashMap::putAll); + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfig.java new file mode 100644 index 00000000..7dd88c07 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfig.java @@ -0,0 +1,40 @@ +package org.reactivecommons.async.rabbit.config; + +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; +import org.reactivecommons.async.rabbit.listeners.ApplicationNotificationListener; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +@Configuration +@RequiredArgsConstructor +@Import(RabbitMqConfig.class) +public class NotificacionListenersConfig { + + @Value("${spring.application.name}") + private String appName; + + private final AsyncProps asyncProps; + + @Bean + public ApplicationNotificationListener eventNotificationListener(ConnectionManager manager, + MessageConverter messageConverter, + CustomReporter errorReporter) { + final ApplicationNotificationListener listener = new ApplicationNotificationListener( + manager.getListener(DEFAULT_DOMAIN), + asyncProps.getDomain().getEvents().getExchange(), + asyncProps.getNotificationProps().getQueueName(appName), + manager.getHandlerResolver(DEFAULT_DOMAIN), + messageConverter, + manager.getDiscardNotifier(DEFAULT_DOMAIN), + errorReporter); + listener.startListener(); + return listener; + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/QueryListenerConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/QueryListenerConfig.java new file mode 100644 index 00000000..695e6407 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/QueryListenerConfig.java @@ -0,0 +1,39 @@ +package org.reactivecommons.async.rabbit.config; + +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; +import org.reactivecommons.async.rabbit.listeners.ApplicationQueryListener; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +@Configuration +@RequiredArgsConstructor +@Import(RabbitMqConfig.class) +public class QueryListenerConfig { + + @Value("${spring.application.name}") + private String appName; + + private final AsyncProps asyncProps; + + @Bean + public ApplicationQueryListener queryListener(MessageConverter converter, + ConnectionManager manager, + CustomReporter errorReporter) { + final ApplicationQueryListener listener = new ApplicationQueryListener(manager.getListener(DEFAULT_DOMAIN), + appName + ".query", manager.getHandlerResolver(DEFAULT_DOMAIN), manager.getSender(DEFAULT_DOMAIN), asyncProps.getDirect().getExchange(), converter, + asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), + asyncProps.getRetryDelay(), asyncProps.getGlobal().getMaxLengthBytes(), + asyncProps.getDirect().isDiscardTimeoutQueries(), manager.getDiscardNotifier(DEFAULT_DOMAIN), errorReporter); + + listener.startListener(); + + return listener; + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitHealthConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitHealthConfig.java new file mode 100644 index 00000000..b1c6f5aa --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitHealthConfig.java @@ -0,0 +1,19 @@ +package org.reactivecommons.async.rabbit.config; + +import org.reactivecommons.async.rabbit.health.RabbitReactiveHealthIndicator; +import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +@Configuration +@ConditionalOnClass(AbstractReactiveHealthIndicator.class) +public class RabbitHealthConfig { + + @Bean + public RabbitReactiveHealthIndicator rabbitHealthIndicator(ConnectionManager manager) { + return new RabbitReactiveHealthIndicator(manager.getProvider(DEFAULT_DOMAIN)); // TODO: Check every domain connection + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java new file mode 100644 index 00000000..32cbf3b5 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java @@ -0,0 +1,216 @@ +package org.reactivecommons.async.rabbit.config; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.java.Log; +import org.reactivecommons.api.domain.Command; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.async.api.AsyncQuery; +import org.reactivecommons.async.api.DefaultCommandHandler; +import org.reactivecommons.async.api.DefaultQueryHandler; +import org.reactivecommons.async.api.DynamicRegistry; +import org.reactivecommons.async.api.HandlerRegistry; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.config.BrokerConfig; +import org.reactivecommons.async.commons.config.IBrokerConfigProps; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.converters.json.DefaultObjectMapperSupplier; +import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.rabbit.DynamicRegistryImp; +import org.reactivecommons.async.rabbit.HandlerResolver; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; +import org.reactivecommons.async.rabbit.communications.TopologyCreator; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; +import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps; +import org.reactivecommons.async.rabbit.converters.json.JacksonCloudEventMessageConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.ChannelPool; +import reactor.rabbitmq.ChannelPoolFactory; +import reactor.rabbitmq.ChannelPoolOptions; +import reactor.rabbitmq.RabbitFlux; +import reactor.rabbitmq.Receiver; +import reactor.rabbitmq.ReceiverOptions; +import reactor.rabbitmq.Sender; +import reactor.rabbitmq.SenderOptions; +import reactor.rabbitmq.Utils; +import reactor.util.retry.Retry; + +import java.time.Duration; +import java.util.Map; +import java.util.logging.Level; + +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +@Log +@Configuration +@RequiredArgsConstructor +@EnableConfigurationProperties({ + RabbitProperties.class, + AsyncProps.class +}) +@Import({BrokerConfigProps.class, RabbitHealthConfig.class}) +public class RabbitMqConfig { + + private static final String LISTENER_TYPE = "listener"; + + private static final String SENDER_TYPE = "sender"; + + + @Bean + public ConnectionManager buildConnectionManager(@Value("${spring.application.name}") String appName, + AsyncProps props, + RabbitProperties defaultAppProps, + MessageConverter converter, + ApplicationContext context, + DefaultCommandHandler commandHandler) { + ConnectionManager connectionManager = new ConnectionManager(); + final Map registries = context.getBeansOfType(HandlerRegistry.class); + props.getConnections().computeIfAbsent(DEFAULT_DOMAIN, k -> defaultAppProps); + props.getConnections() + .forEach((domain, properties) -> { + ConnectionFactoryProvider provider = createConnectionFactoryProvider(properties); + ReactiveMessageSender sender = createMessageSender(appName, provider, properties, converter); + ReactiveMessageListener listener = createMessageListener(appName, provider, props); + HandlerResolver resolver = HandlerResolverBuilder.buildResolver(domain, registries, commandHandler); + connectionManager.addDomain(domain, listener, sender, resolver); + }); + return connectionManager; + } + + + private ReactiveMessageSender createMessageSender(String appName, + ConnectionFactoryProvider provider, + RabbitProperties properties, + MessageConverter converter) { + final Sender sender = RabbitFlux.createSender(reactiveCommonsSenderOptions(appName, provider, properties)); + return new ReactiveMessageSender(sender, appName, converter, new TopologyCreator(sender)); + } + + private SenderOptions reactiveCommonsSenderOptions(String appName, ConnectionFactoryProvider provider, RabbitProperties rabbitProperties) { + final Mono senderConnection = createConnectionMono(provider.getConnectionFactory(), appName, SENDER_TYPE); + final ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions(); + final PropertyMapper map = PropertyMapper.get(); + + map.from(rabbitProperties.getCache().getChannel()::getSize).whenNonNull() + .to(channelPoolOptions::maxCacheSize); + + final ChannelPool channelPool = ChannelPoolFactory.createChannelPool( + senderConnection, + channelPoolOptions + ); + + return new SenderOptions() + .channelPool(channelPool) + .resourceManagementChannelMono(channelPool.getChannelMono() + .transform(Utils::cache)); + } + + public ReactiveMessageListener createMessageListener(String appName, ConnectionFactoryProvider provider, AsyncProps props) { + final Mono connection = + createConnectionMono(provider.getConnectionFactory(), appName, LISTENER_TYPE); + final Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connection)); + final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection)); + + return new ReactiveMessageListener(receiver, + new TopologyCreator(sender), + props.getFlux().getMaxConcurrency(), + props.getPrefetchCount()); + } + + @SneakyThrows + public ConnectionFactoryProvider createConnectionFactoryProvider(RabbitProperties properties) { + final ConnectionFactory factory = new ConnectionFactory(); + PropertyMapper map = PropertyMapper.get(); + map.from(properties::determineHost).whenNonNull().to(factory::setHost); + map.from(properties::determinePort).to(factory::setPort); + map.from(properties::determineUsername).whenNonNull().to(factory::setUsername); + map.from(properties::determinePassword).whenNonNull().to(factory::setPassword); + map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost); + factory.useNio(); + if (properties.getSsl() != null && properties.getSsl().isEnabled()) { + factory.useSslProtocol(); + } + return () -> factory; + } + + @Bean + @ConditionalOnMissingBean + public BrokerConfig brokerConfig() { + return new BrokerConfig(); + } + + @Bean + @ConditionalOnMissingBean + public ObjectMapperSupplier objectMapperSupplier() { + return new DefaultObjectMapperSupplier(); + } + + @Bean + @ConditionalOnMissingBean + public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSupplier) { + return new JacksonCloudEventMessageConverter(objectMapperSupplier.get()); + } + + @Bean + @ConditionalOnMissingBean + public CustomReporter reactiveCommonsCustomErrorReporter() { + return new CustomReporter() { + @Override + public Mono reportError(Throwable ex, Message rawMessage, Command message, boolean redelivered) { + return Mono.empty(); + } + + @Override + public Mono reportError(Throwable ex, Message rawMessage, DomainEvent message, boolean redelivered) { + return Mono.empty(); + } + + @Override + public Mono reportError(Throwable ex, Message rawMessage, AsyncQuery message, boolean redelivered) { + return Mono.empty(); + } + }; + } + + Mono createConnectionMono(ConnectionFactory factory, String connectionPrefix, String connectionType) { + return Mono.fromCallable(() -> factory.newConnection(connectionPrefix + " " + connectionType)) + .doOnError(err -> + log.log(Level.SEVERE, "Error creating connection to RabbitMq Broker. Starting retry process...", err) + ) + .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(300)) + .maxBackoff(Duration.ofMillis(3000))) + .cache(); + } + + @Bean + public DynamicRegistry dynamicRegistry(ConnectionManager connectionManager, IBrokerConfigProps props) { + return new DynamicRegistryImp(connectionManager.getHandlerResolver(DEFAULT_DOMAIN), + connectionManager.getListener(DEFAULT_DOMAIN).getTopologyCreator(), props); + } + + @Bean + @ConditionalOnMissingBean + public DefaultQueryHandler defaultHandler() { + return (DefaultQueryHandler) command -> + Mono.error(new RuntimeException("No Handler Registered")); + } + + @Bean + @ConditionalOnMissingBean + public DefaultCommandHandler defaultCommandHandler() { + return message -> Mono.error(new RuntimeException("No Handler Registered")); + } + +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitProperties.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitProperties.java new file mode 100644 index 00000000..1ed9934e --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitProperties.java @@ -0,0 +1,967 @@ +package org.reactivecommons.async.rabbit.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.convert.DurationUnit; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +@ConfigurationProperties(prefix = "spring.rabbitmq") +public class RabbitProperties { + + /** + * RabbitMQ host. + */ + private String host = "localhost"; + + /** + * RabbitMQ port. + */ + private int port = 5672; + + /** + * Login user to authenticate to the communications. + */ + private String username = "guest"; + + /** + * Login to authenticate against the communications. + */ + private String password = "guest"; + + /** + * SSL configuration. + */ + private final Ssl ssl = new Ssl(); + + /** + * Virtual host to use when connecting to the communications. + */ + private String virtualHost; + + /** + * Comma-separated list of addresses to which the client should connect. + */ + private String addresses; + + /** + * Requested heartbeat timeout; zero for none. If a duration suffix is not specified, + * seconds will be used. + */ + @DurationUnit(ChronoUnit.SECONDS) + private Duration requestedHeartbeat; + + /** + * Whether to enable publisher confirms. + */ + private boolean publisherConfirms; + + /** + * Whether to enable publisher returns. + */ + private boolean publisherReturns; + + /** + * Connection timeout. Set it to zero to wait forever. + */ + private Duration connectionTimeout; + + /** + * Cache configuration. + */ + private final Cache cache = new Cache(); + + /** + * Listener container configuration. + */ + private final Listener listener = new Listener(); + + private final Template template = new Template(); + + private List
parsedAddresses; + + public String getHost() { + return this.host; + } + + /** + * Returns the host from the first address, or the configured host if no addresses + * have been set. + * @return the host + * @see #setAddresses(String) + * @see #getHost() + */ + public String determineHost() { + if (CollectionUtils.isEmpty(this.parsedAddresses)) { + return getHost(); + } + return this.parsedAddresses.get(0).host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return this.port; + } + + /** + * Returns the port from the first address, or the configured port if no addresses + * have been set. + * @return the port + * @see #setAddresses(String) + * @see #getPort() + */ + public int determinePort() { + if (CollectionUtils.isEmpty(this.parsedAddresses)) { + return getPort(); + } + Address address = this.parsedAddresses.get(0); + return address.port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getAddresses() { + return this.addresses; + } + + /** + * Returns the comma-separated addresses or a single address ({@code host:port}) + * created from the configured host and port if no addresses have been set. + * @return the addresses + */ + public String determineAddresses() { + if (CollectionUtils.isEmpty(this.parsedAddresses)) { + return this.host + ":" + this.port; + } + List addressStrings = new ArrayList<>(); + for (Address parsedAddress : this.parsedAddresses) { + addressStrings.add(parsedAddress.host + ":" + parsedAddress.port); + } + return StringUtils.collectionToCommaDelimitedString(addressStrings); + } + + public void setAddresses(String addresses) { + this.addresses = addresses; + this.parsedAddresses = parseAddresses(addresses); + } + + private List
parseAddresses(String addresses) { + List
parsedAddresses = new ArrayList<>(); + for (String address : StringUtils.commaDelimitedListToStringArray(addresses)) { + parsedAddresses.add(new Address(address)); + } + return parsedAddresses; + } + + public String getUsername() { + return this.username; + } + + /** + * If addresses have been set and the first address has a username it is returned. + * Otherwise returns the result of calling {@code getUsername()}. + * @return the username + * @see #setAddresses(String) + * @see #getUsername() + */ + public String determineUsername() { + if (CollectionUtils.isEmpty(this.parsedAddresses)) { + return this.username; + } + Address address = this.parsedAddresses.get(0); + return (address.username != null) ? address.username : this.username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return this.password; + } + + /** + * If addresses have been set and the first address has a password it is returned. + * Otherwise returns the result of calling {@code getPassword()}. + * @return the password or {@code null} + * @see #setAddresses(String) + * @see #getPassword() + */ + public String determinePassword() { + if (CollectionUtils.isEmpty(this.parsedAddresses)) { + return getPassword(); + } + Address address = this.parsedAddresses.get(0); + return (address.password != null) ? address.password : getPassword(); + } + + public void setPassword(String password) { + this.password = password; + } + + public Ssl getSsl() { + return this.ssl; + } + + public String getVirtualHost() { + return this.virtualHost; + } + + /** + * If addresses have been set and the first address has a virtual host it is returned. + * Otherwise returns the result of calling {@code getVirtualHost()}. + * @return the virtual host or {@code null} + * @see #setAddresses(String) + * @see #getVirtualHost() + */ + public String determineVirtualHost() { + if (CollectionUtils.isEmpty(this.parsedAddresses)) { + return getVirtualHost(); + } + Address address = this.parsedAddresses.get(0); + return (address.virtualHost != null) ? address.virtualHost : getVirtualHost(); + } + + public void setVirtualHost(String virtualHost) { + this.virtualHost = "".equals(virtualHost) ? "/" : virtualHost; + } + + public Duration getRequestedHeartbeat() { + return this.requestedHeartbeat; + } + + public void setRequestedHeartbeat(Duration requestedHeartbeat) { + this.requestedHeartbeat = requestedHeartbeat; + } + + public boolean isPublisherConfirms() { + return this.publisherConfirms; + } + + public void setPublisherConfirms(boolean publisherConfirms) { + this.publisherConfirms = publisherConfirms; + } + + public boolean isPublisherReturns() { + return this.publisherReturns; + } + + public void setPublisherReturns(boolean publisherReturns) { + this.publisherReturns = publisherReturns; + } + + public Duration getConnectionTimeout() { + return this.connectionTimeout; + } + + public void setConnectionTimeout(Duration connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public Cache getCache() { + return this.cache; + } + + public Listener getListener() { + return this.listener; + } + + public Template getTemplate() { + return this.template; + } + + public static class Ssl { + + /** + * Whether to enable SSL support. + */ + private boolean enabled; + + /** + * Path to the key store that holds the SSL certificate. + */ + private String keyStore; + + /** + * Key store type. + */ + private String keyStoreType = "PKCS12"; + + /** + * Password used to access the key store. + */ + private String keyStorePassword; + + /** + * Trust store that holds SSL certificates. + */ + private String trustStore; + + /** + * Trust store type. + */ + private String trustStoreType = "JKS"; + + /** + * Password used to access the trust store. + */ + private String trustStorePassword; + + /** + * SSL algorithm to use. By default, configured by the Rabbit client library. + */ + private String algorithm; + + /** + * Whether to enable server side certificate validation. + */ + private boolean validateServerCertificate = true; + + /** + * Whether to enable hostname verification. + */ + private boolean verifyHostname = true; + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getKeyStore() { + return this.keyStore; + } + + public void setKeyStore(String keyStore) { + this.keyStore = keyStore; + } + + public String getKeyStoreType() { + return this.keyStoreType; + } + + public void setKeyStoreType(String keyStoreType) { + this.keyStoreType = keyStoreType; + } + + public String getKeyStorePassword() { + return this.keyStorePassword; + } + + public void setKeyStorePassword(String keyStorePassword) { + this.keyStorePassword = keyStorePassword; + } + + public String getTrustStore() { + return this.trustStore; + } + + public void setTrustStore(String trustStore) { + this.trustStore = trustStore; + } + + public String getTrustStoreType() { + return this.trustStoreType; + } + + public void setTrustStoreType(String trustStoreType) { + this.trustStoreType = trustStoreType; + } + + public String getTrustStorePassword() { + return this.trustStorePassword; + } + + public void setTrustStorePassword(String trustStorePassword) { + this.trustStorePassword = trustStorePassword; + } + + public String getAlgorithm() { + return this.algorithm; + } + + public void setAlgorithm(String sslAlgorithm) { + this.algorithm = sslAlgorithm; + } + + public boolean isValidateServerCertificate() { + return this.validateServerCertificate; + } + + public void setValidateServerCertificate(boolean validateServerCertificate) { + this.validateServerCertificate = validateServerCertificate; + } + + public boolean getVerifyHostname() { + return this.verifyHostname; + } + + public void setVerifyHostname(boolean verifyHostname) { + this.verifyHostname = verifyHostname; + } + + } + + public static class Cache { + + private final Cache.Channel channel = new Cache.Channel(); + + private final Cache.Connection connection = new Cache.Connection(); + + public Cache.Channel getChannel() { + return this.channel; + } + + public Cache.Connection getConnection() { + return this.connection; + } + + public static class Channel { + + /** + * Number of channels to retain in the cache. When "check-timeout" > 0, max + * channels per connection. + */ + private Integer size; + + /** + * Duration to wait to obtain a channel if the cache size has been reached. If + * 0, always create a new channel. + */ + private Duration checkoutTimeout; + + public Integer getSize() { + return this.size; + } + + public void setSize(Integer size) { + this.size = size; + } + + public Duration getCheckoutTimeout() { + return this.checkoutTimeout; + } + + public void setCheckoutTimeout(Duration checkoutTimeout) { + this.checkoutTimeout = checkoutTimeout; + } + + } + + public static class Connection { + + /** + * Connection factory cache mode. + */ + private String mode = "CHANNEL"; + + /** + * Number of connections to cache. Only applies when mode is CONNECTION. + */ + private Integer size; + + public String getMode() { + return this.mode; + } + + public void setMode(String mode) { + this.mode = mode; + } + + public Integer getSize() { + return this.size; + } + + public void setSize(Integer size) { + this.size = size; + } + + } + + } + + public enum ContainerType { + + /** + * Container where the RabbitMQ consumer dispatches messages to an invoker thread. + */ + SIMPLE, + + /** + * Container where the listener is invoked directly on the RabbitMQ consumer + * thread. + */ + DIRECT + + } + + public static class Listener { + + /** + * Listener container type. + */ + private ContainerType type = ContainerType.SIMPLE; + + private final SimpleContainer simple = new SimpleContainer(); + + private final DirectContainer direct = new DirectContainer(); + + public ContainerType getType() { + return this.type; + } + + public void setType(ContainerType containerType) { + this.type = containerType; + } + + public SimpleContainer getSimple() { + return this.simple; + } + + public DirectContainer getDirect() { + return this.direct; + } + + } + + public abstract static class AmqpContainer { + + /** + * Whether to start the container automatically on startup. + */ + private boolean autoStartup = true; + + /** + * Acknowledge mode of container. + */ + private Integer acknowledgeMode; + + /** + * Maximum number of unacknowledged messages that can be outstanding at each + * consumer. + */ + private Integer prefetch; + + /** + * Whether rejected deliveries are re-queued by default. + */ + private Boolean defaultRequeueRejected; + + /** + * How often idle container events should be published. + */ + private Duration idleEventInterval; + + /** + * Optional properties for a retry interceptor. + */ + private final ListenerRetry retry = new ListenerRetry(); + + public boolean isAutoStartup() { + return this.autoStartup; + } + + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + + public Integer getAcknowledgeMode() { + return this.acknowledgeMode; + } + + public void setAcknowledgeMode(Integer acknowledgeMode) { + this.acknowledgeMode = acknowledgeMode; + } + + public Integer getPrefetch() { + return this.prefetch; + } + + public void setPrefetch(Integer prefetch) { + this.prefetch = prefetch; + } + + public Boolean getDefaultRequeueRejected() { + return this.defaultRequeueRejected; + } + + public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) { + this.defaultRequeueRejected = defaultRequeueRejected; + } + + public Duration getIdleEventInterval() { + return this.idleEventInterval; + } + + public void setIdleEventInterval(Duration idleEventInterval) { + this.idleEventInterval = idleEventInterval; + } + + public abstract boolean isMissingQueuesFatal(); + + public ListenerRetry getRetry() { + return this.retry; + } + + } + + /** + * Configuration properties for {@code SimpleMessageListenerContainer}. + */ + public static class SimpleContainer extends AmqpContainer { + + /** + * Minimum number of listener invoker threads. + */ + private Integer concurrency; + + /** + * Maximum number of listener invoker threads. + */ + private Integer maxConcurrency; + + /** + * Number of messages to be processed between acks when the acknowledge mode is + * AUTO. If larger than prefetch, prefetch will be increased to this value. + */ + private Integer transactionSize; + + /** + * Whether to fail if the queues declared by the container are not available on + * the communications and/or whether to stop the container if one or more queues are + * deleted at runtime. + */ + private boolean missingQueuesFatal = true; + + public Integer getConcurrency() { + return this.concurrency; + } + + public void setConcurrency(Integer concurrency) { + this.concurrency = concurrency; + } + + public Integer getMaxConcurrency() { + return this.maxConcurrency; + } + + public void setMaxConcurrency(Integer maxConcurrency) { + this.maxConcurrency = maxConcurrency; + } + + public Integer getTransactionSize() { + return this.transactionSize; + } + + public void setTransactionSize(Integer transactionSize) { + this.transactionSize = transactionSize; + } + + @Override + public boolean isMissingQueuesFatal() { + return this.missingQueuesFatal; + } + + public void setMissingQueuesFatal(boolean missingQueuesFatal) { + this.missingQueuesFatal = missingQueuesFatal; + } + + } + + /** + * Configuration properties for {@code DirectMessageListenerContainer}. + */ + public static class DirectContainer extends AmqpContainer { + + /** + * Number of consumers per queue. + */ + private Integer consumersPerQueue; + + /** + * Whether to fail if the queues declared by the container are not available on + * the communications. + */ + private boolean missingQueuesFatal = false; + + public Integer getConsumersPerQueue() { + return this.consumersPerQueue; + } + + public void setConsumersPerQueue(Integer consumersPerQueue) { + this.consumersPerQueue = consumersPerQueue; + } + + @Override + public boolean isMissingQueuesFatal() { + return this.missingQueuesFatal; + } + + public void setMissingQueuesFatal(boolean missingQueuesFatal) { + this.missingQueuesFatal = missingQueuesFatal; + } + + } + + public static class Template { + + private final Retry retry = new Retry(); + + /** + * Whether to enable mandatory messages. + */ + private Boolean mandatory; + + /** + * Timeout for `receive()` operations. + */ + private Duration receiveTimeout; + + /** + * Timeout for `sendAndReceive()` operations. + */ + private Duration replyTimeout; + + /** + * Name of the default exchange to use for send operations. + */ + private String exchange = ""; + + /** + * Value of a default routing key to use for send operations. + */ + private String routingKey = ""; + + /** + * Name of the default queue to receive messages from when none is specified + * explicitly. + */ + private String queue; + + public Retry getRetry() { + return this.retry; + } + + public Boolean getMandatory() { + return this.mandatory; + } + + public void setMandatory(Boolean mandatory) { + this.mandatory = mandatory; + } + + public Duration getReceiveTimeout() { + return this.receiveTimeout; + } + + public void setReceiveTimeout(Duration receiveTimeout) { + this.receiveTimeout = receiveTimeout; + } + + public Duration getReplyTimeout() { + return this.replyTimeout; + } + + public void setReplyTimeout(Duration replyTimeout) { + this.replyTimeout = replyTimeout; + } + + public String getExchange() { + return this.exchange; + } + + public void setExchange(String exchange) { + this.exchange = exchange; + } + + public String getRoutingKey() { + return this.routingKey; + } + + public void setRoutingKey(String routingKey) { + this.routingKey = routingKey; + } + + public String getQueue() { + return this.queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + } + + public static class Retry { + + /** + * Whether publishing retries are enabled. + */ + private boolean enabled; + + /** + * Maximum number of attempts to deliver a message. + */ + private int maxAttempts = 3; + + /** + * Duration between the first and second attempt to deliver a message. + */ + private Duration initialInterval = Duration.ofMillis(1000); + + /** + * Multiplier to apply to the previous retry interval. + */ + private double multiplier = 1.0; + + /** + * Maximum duration between attempts. + */ + private Duration maxInterval = Duration.ofMillis(10000); + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public int getMaxAttempts() { + return this.maxAttempts; + } + + public void setMaxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + } + + public Duration getInitialInterval() { + return this.initialInterval; + } + + public void setInitialInterval(Duration initialInterval) { + this.initialInterval = initialInterval; + } + + public double getMultiplier() { + return this.multiplier; + } + + public void setMultiplier(double multiplier) { + this.multiplier = multiplier; + } + + public Duration getMaxInterval() { + return this.maxInterval; + } + + public void setMaxInterval(Duration maxInterval) { + this.maxInterval = maxInterval; + } + + } + + public static class ListenerRetry extends Retry { + + /** + * Whether retries are stateless or stateful. + */ + private boolean stateless = true; + + public boolean isStateless() { + return this.stateless; + } + + public void setStateless(boolean stateless) { + this.stateless = stateless; + } + + } + + private static final class Address { + + private static final String PREFIX_AMQP = "amqp://"; + + private static final int DEFAULT_PORT = 5672; + + private String host; + + private int port; + + private String username; + + private String password; + + private String virtualHost; + + private Address(String input) { + input = input.trim(); + input = trimPrefix(input); + input = parseUsernameAndPassword(input); + input = parseVirtualHost(input); + parseHostAndPort(input); + } + + private String trimPrefix(String input) { + if (input.startsWith(PREFIX_AMQP)) { + input = input.substring(PREFIX_AMQP.length()); + } + return input; + } + + private String parseUsernameAndPassword(String input) { + if (input.contains("@")) { + String[] split = StringUtils.split(input, "@"); + if (split == null) return input; + String creds = split[0]; + input = split[1]; + split = StringUtils.split(creds, ":"); + if (split != null) { + this.username = split[0]; + if (split.length > 1) { + this.password = split[1]; + } + } + } + return input; + } + + private String parseVirtualHost(String input) { + int hostIndex = input.indexOf('/'); + if (hostIndex >= 0) { + this.virtualHost = input.substring(hostIndex + 1); + if (this.virtualHost.isEmpty()) { + this.virtualHost = "/"; + } + input = input.substring(0, hostIndex); + } + return input; + } + + private void parseHostAndPort(String input) { + int portIndex = input.indexOf(':'); + if (portIndex == -1) { + this.host = input; + this.port = DEFAULT_PORT; + } + else { + this.host = input.substring(0, portIndex); + this.port = Integer.valueOf(input.substring(portIndex + 1)); + } + } + + } + +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java new file mode 100644 index 00000000..906a65ca --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java @@ -0,0 +1,48 @@ +package org.reactivecommons.async.rabbit.config.props; + +import lombok.Getter; +import lombok.Setter; +import org.reactivecommons.async.rabbit.config.RabbitProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.NestedConfigurationProperty; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + + +@Getter +@Setter +@ConfigurationProperties(prefix = "app.async") +public class AsyncProps { + + @NestedConfigurationProperty + private FluxProps flux = new FluxProps(); + + @NestedConfigurationProperty + private DomainProps domain = new DomainProps(); + + @NestedConfigurationProperty + private DirectProps direct = new DirectProps(); + + @NestedConfigurationProperty + private GlobalProps global = new GlobalProps(); + + @NestedConfigurationProperty + private NotificationProps notificationProps = new NotificationProps(); + + @NestedConfigurationProperty + private Map connections = new TreeMap<>(); + + private List listenRepliesFrom = new ArrayList<>(); + + private Integer maxRetries = 10; + + private Integer prefetchCount = 250; + + private Integer retryDelay = 1000; + + private Boolean withDLQRetry = false; + +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/BrokerConfigProps.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/BrokerConfigProps.java new file mode 100644 index 00000000..faf2ec48 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/BrokerConfigProps.java @@ -0,0 +1,61 @@ +package org.reactivecommons.async.rabbit.config.props; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.commons.config.IBrokerConfigProps; +import org.reactivecommons.async.commons.utils.NameGenerator; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.atomic.AtomicReference; + + +@Getter +@Configuration +@RequiredArgsConstructor +public class BrokerConfigProps implements IBrokerConfigProps { + + @Value("${spring.application.name}") + private String appName; + private final AsyncProps asyncProps; + private final AtomicReference replyQueueName = new AtomicReference<>(); + + @Override + public String getEventsQueue() { + return appName + ".subsEvents"; + } + + @Override + public String getQueriesQueue() { + return appName + ".query"; + } + + @Override + public String getCommandsQueue() { + return appName; + } + + @Override + public String getReplyQueue() { + final String name = replyQueueName.get(); + if (name == null) { + final String replyName = NameGenerator.generateNameFrom(appName, "replies"); + if (replyQueueName.compareAndSet(null, replyName)) { + return replyName; + } else { + return replyQueueName.get(); + } + } + return name; + } + + @Override + public String getDomainEventsExchangeName() { + return asyncProps.getDomain().getEvents().getExchange(); + } + + @Override + public String getDirectMessagesExchangeName() { + return asyncProps.getDirect().getExchange(); + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/DirectProps.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/DirectProps.java new file mode 100644 index 00000000..371747e6 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/DirectProps.java @@ -0,0 +1,18 @@ +package org.reactivecommons.async.rabbit.config.props; + +import lombok.Getter; +import lombok.Setter; + +import java.util.Optional; + +@Getter +@Setter +public class DirectProps { + + private String exchange = "directMessages"; + + private Optional maxLengthBytes = Optional.empty(); + + private boolean discardTimeoutQueries = false; + +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/DomainProps.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/DomainProps.java new file mode 100644 index 00000000..555b4ab6 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/DomainProps.java @@ -0,0 +1,14 @@ +package org.reactivecommons.async.rabbit.config.props; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.NestedConfigurationProperty; + +@Getter +@Setter +public class DomainProps { + + @NestedConfigurationProperty + private EventsProps events = new EventsProps(); + +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/EventsProps.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/EventsProps.java new file mode 100644 index 00000000..6cfe1715 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/EventsProps.java @@ -0,0 +1,16 @@ +package org.reactivecommons.async.rabbit.config.props; + +import lombok.Getter; +import lombok.Setter; + +import java.util.Optional; + +@Getter +@Setter +public class EventsProps { + + private String exchange = "domainEvents"; + + private Optional maxLengthBytes = Optional.empty(); + +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/FluxProps.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/FluxProps.java new file mode 100644 index 00000000..23d1f8d4 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/FluxProps.java @@ -0,0 +1,12 @@ +package org.reactivecommons.async.rabbit.config.props; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class FluxProps { + + private Integer maxConcurrency = 250; + +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/GlobalProps.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/GlobalProps.java new file mode 100644 index 00000000..4bba4f5a --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/GlobalProps.java @@ -0,0 +1,16 @@ +package org.reactivecommons.async.rabbit.config.props; + +import lombok.Getter; +import lombok.Setter; + +import java.util.Optional; + +@Getter +@Setter +public class GlobalProps { + + private String exchange = "globalReply"; + + private Optional maxLengthBytes = Optional.empty(); + +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/NotificationProps.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/NotificationProps.java new file mode 100644 index 00000000..aa433acd --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/props/NotificationProps.java @@ -0,0 +1,28 @@ +package org.reactivecommons.async.rabbit.config.props; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.reactivecommons.async.commons.utils.NameGenerator; + +import java.util.concurrent.atomic.AtomicReference; + +@Getter +@RequiredArgsConstructor +public class NotificationProps { + + private final AtomicReference queueName = new AtomicReference<>(); + private final String queueSuffix = "notification"; + + public String getQueueName(String applicationName) { + final String name = this.queueName.get(); + if(name == null) return getGeneratedName(applicationName); + return name; + } + + private String getGeneratedName(String applicationName) { + String generatedName = NameGenerator.generateNameFrom(applicationName, queueSuffix); + return this.queueName + .compareAndSet(null, generatedName) ? + generatedName : this.queueName.get(); + } +} diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicator.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicator.java new file mode 100644 index 00000000..ddb38a8a --- /dev/null +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicator.java @@ -0,0 +1,35 @@ +package org.reactivecommons.async.rabbit.health; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider; +import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import reactor.core.publisher.Mono; + +@RequiredArgsConstructor +public class RabbitReactiveHealthIndicator extends AbstractReactiveHealthIndicator { + private static final String VERSION = "version"; + private final ConnectionFactoryProvider provider; + + @Override + protected Mono doHealthCheck(Health.Builder builder) { + return Mono.defer(this::getVersion) + .map(version -> builder.up().withDetail(VERSION, version).build()); + } + + private Mono getVersion() { + return Mono.just(provider) + .map(ConnectionFactoryProvider::getConnectionFactory) + .map(this::getRawVersion); + } + + @SneakyThrows + private String getRawVersion(ConnectionFactory factory) { + try (Connection connection = factory.newConnection()) { + return connection.getServerProperties().get(VERSION).toString(); + } + } +} diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java new file mode 100644 index 00000000..0135d5a8 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/CommandListenersConfigTest.java @@ -0,0 +1,64 @@ +package org.reactivecommons.async.rabbit.config; + +import com.rabbitmq.client.AMQP; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.rabbit.HandlerResolver; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener; +import org.reactivecommons.async.rabbit.communications.TopologyCreator; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; +import org.reactivecommons.async.rabbit.listeners.ApplicationCommandListener; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.ConsumeOptions; +import reactor.rabbitmq.ExchangeSpecification; +import reactor.rabbitmq.Receiver; + +import java.lang.reflect.Field; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +@ExtendWith(MockitoExtension.class) +class CommandListenersConfigTest { + + + private final AsyncProps props = new AsyncProps(); + private CommandListenersConfig config = new CommandListenersConfig(props); + private final ReactiveMessageListener listener = mock(ReactiveMessageListener.class); + private final TopologyCreator creator = mock(TopologyCreator.class); + private final HandlerResolver handlerResolver = mock(HandlerResolver.class); + private final MessageConverter messageConverter = mock(MessageConverter.class); + private final CustomReporter customReporter = mock(CustomReporter.class); + private final Receiver receiver = mock(Receiver.class); + private final ConnectionManager manager = new ConnectionManager(); + + @BeforeEach + public void init() throws NoSuchFieldException, IllegalAccessException { + final Field appName = CommandListenersConfig.class.getDeclaredField("appName"); + appName.setAccessible(true); + appName.set(config, "queue"); + when(creator.bind(any(BindingSpecification.class))).thenReturn(Mono.just(mock(AMQP.Queue.BindOk.class))); + when(creator.declare(any(ExchangeSpecification.class))).thenReturn(Mono.just(mock(AMQP.Exchange.DeclareOk.class))); + when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(listener.getTopologyCreator()).thenReturn(creator); + when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never()); + when(listener.getReceiver()).thenReturn(receiver); + when(listener.getMaxConcurrency()).thenReturn(20); + manager.addDomain(DEFAULT_DOMAIN, listener, null, handlerResolver); + } + + @Test + void applicationCommandListener() { + final ApplicationCommandListener commandListener = config.applicationCommandListener(manager, messageConverter, customReporter); + Assertions.assertThat(commandListener).isNotNull(); + } +} diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java new file mode 100644 index 00000000..de8c406b --- /dev/null +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/EventListenersConfigTest.java @@ -0,0 +1,70 @@ +package org.reactivecommons.async.rabbit.config; + +import com.rabbitmq.client.AMQP; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.reactivecommons.async.api.HandlerRegistry; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.rabbit.HandlerResolver; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; +import org.reactivecommons.async.rabbit.communications.TopologyCreator; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; +import org.reactivecommons.async.rabbit.listeners.ApplicationEventListener; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.ConsumeOptions; +import reactor.rabbitmq.ExchangeSpecification; +import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.Receiver; + +import java.util.Collections; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class EventListenersConfigTest { + + private final AsyncProps props = new AsyncProps(); + private final EventListenersConfig config = new EventListenersConfig(props); + private final ReactiveMessageListener listener = mock(ReactiveMessageListener.class); + private final ReactiveMessageSender sender = mock(ReactiveMessageSender.class); + private final TopologyCreator creator = mock(TopologyCreator.class); + private final HandlerResolver handlerResolver = mock(HandlerResolver.class); + private final MessageConverter messageConverter = mock(MessageConverter.class); + private final CustomReporter customReporter = mock(CustomReporter.class); + private final Receiver receiver = mock(Receiver.class); + private ConnectionManager connectionManager; + + @BeforeEach + public void init() { + when(handlerResolver.getEventListeners()).thenReturn(Collections.emptyList()); + when(creator.bind(any(BindingSpecification.class))).thenReturn(Mono.just(mock(AMQP.Queue.BindOk.class))); + when(creator.declare(any(ExchangeSpecification.class))).thenReturn(Mono.just(mock(AMQP.Exchange.DeclareOk.class))); + when(creator.declare(any(QueueSpecification.class))).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(creator.declareDLQ(any(String.class), any(String.class), any(Integer.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(creator.declareQueue(any(String.class), any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(listener.getTopologyCreator()).thenReturn(creator); + when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never()); + when(listener.getReceiver()).thenReturn(receiver); + when(listener.getMaxConcurrency()).thenReturn(20); + connectionManager = new ConnectionManager(); + connectionManager.addDomain(HandlerRegistry.DEFAULT_DOMAIN, listener, sender, handlerResolver); + } + + @Test + void eventListener() { + final ApplicationEventListener eventListener = config.eventListener( + messageConverter, + connectionManager, + customReporter + ); + + Assertions.assertThat(eventListener).isNotNull(); + } +} diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfigTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfigTest.java new file mode 100644 index 00000000..9328d049 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/NotificacionListenersConfigTest.java @@ -0,0 +1,65 @@ +package org.reactivecommons.async.rabbit.config; + +import com.rabbitmq.client.AMQP; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.reactivecommons.async.commons.DiscardNotifier; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.rabbit.HandlerResolver; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener; +import org.reactivecommons.async.rabbit.communications.TopologyCreator; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; +import org.reactivecommons.async.rabbit.listeners.ApplicationNotificationListener; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.ConsumeOptions; +import reactor.rabbitmq.ExchangeSpecification; +import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.Receiver; + +import java.util.Collections; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +class NotificacionListenersConfigTest { + + private final AsyncProps props = new AsyncProps(); + private final NotificacionListenersConfig config = new NotificacionListenersConfig(props); + private final ReactiveMessageListener listener = mock(ReactiveMessageListener.class); + private final TopologyCreator creator = mock(TopologyCreator.class); + private final HandlerResolver handlerResolver = mock(HandlerResolver.class); + private final MessageConverter messageConverter = mock(MessageConverter.class); + private final DiscardNotifier discardNotifier = mock(DiscardNotifier.class); + private final CustomReporter customReporter = mock(CustomReporter.class); + private final Receiver receiver = mock(Receiver.class); + private final ConnectionManager manager = new ConnectionManager(); + + @BeforeEach + public void init() { + when(handlerResolver.getEventListeners()).thenReturn(Collections.emptyList()); + when(creator.bind(any(BindingSpecification.class))).thenReturn(Mono.just(mock(AMQP.Queue.BindOk.class))); + when(creator.declare(any(ExchangeSpecification.class))).thenReturn(Mono.just(mock(AMQP.Exchange.DeclareOk.class))); + when(creator.declare(any(QueueSpecification.class))).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(creator.declareDLQ(any(String.class), any(String.class), any(Integer.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(creator.declareQueue(any(String.class), any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(listener.getTopologyCreator()).thenReturn(creator); + when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never()); + when(listener.getReceiver()).thenReturn(receiver); + when(listener.getMaxConcurrency()).thenReturn(20); + manager.addDomain(DEFAULT_DOMAIN, listener, null, handlerResolver); + } + + @Test + void eventNotificationListener() { + final ApplicationNotificationListener applicationEventListener = + config.eventNotificationListener(manager, messageConverter, customReporter); + Assertions.assertThat(applicationEventListener).isNotNull(); + } +} diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/QueryListenerConfigTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/QueryListenerConfigTest.java new file mode 100644 index 00000000..e6fa51de --- /dev/null +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/QueryListenerConfigTest.java @@ -0,0 +1,64 @@ +package org.reactivecommons.async.rabbit.config; + +import com.rabbitmq.client.AMQP; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.ext.CustomReporter; +import org.reactivecommons.async.rabbit.HandlerResolver; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener; +import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; +import org.reactivecommons.async.rabbit.communications.TopologyCreator; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; +import org.reactivecommons.async.rabbit.listeners.ApplicationQueryListener; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.ConsumeOptions; +import reactor.rabbitmq.ExchangeSpecification; +import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.Receiver; + +import java.util.Collections; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; + +class QueryListenerConfigTest { + + private final AsyncProps props = new AsyncProps(); + private final QueryListenerConfig config = new QueryListenerConfig(props); + private final ReactiveMessageListener listener = mock(ReactiveMessageListener.class); + private final TopologyCreator creator = mock(TopologyCreator.class); + private final HandlerResolver handlerResolver = mock(HandlerResolver.class); + private final MessageConverter messageConverter = mock(MessageConverter.class); + private final CustomReporter customReporter = mock(CustomReporter.class); + private final Receiver receiver = mock(Receiver.class); + private final ReactiveMessageSender sender = mock(ReactiveMessageSender.class); + private final ConnectionManager manager = new ConnectionManager(); + + @BeforeEach + public void init() { + when(handlerResolver.getEventListeners()).thenReturn(Collections.emptyList()); + when(creator.bind(any(BindingSpecification.class))).thenReturn(Mono.just(mock(AMQP.Queue.BindOk.class))); + when(creator.declare(any(ExchangeSpecification.class))).thenReturn(Mono.just(mock(AMQP.Exchange.DeclareOk.class))); + when(creator.declare(any(QueueSpecification.class))).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(creator.declareDLQ(any(String.class), any(String.class), any(Integer.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(creator.declareQueue(any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(creator.declareQueue(any(String.class), any(String.class), any())).thenReturn(Mono.just(mock(AMQP.Queue.DeclareOk.class))); + when(listener.getTopologyCreator()).thenReturn(creator); + when(receiver.consumeManualAck(any(String.class), any(ConsumeOptions.class))).thenReturn(Flux.never()); + when(listener.getReceiver()).thenReturn(receiver); + when(listener.getMaxConcurrency()).thenReturn(20); + manager.addDomain(DEFAULT_DOMAIN, listener, sender, handlerResolver); + } + + @Test + void queryListener() { + final ApplicationQueryListener queryListener = config.queryListener(messageConverter, manager, customReporter); + Assertions.assertThat(queryListener).isNotNull(); + } +} diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/RabbitMqConfigTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/RabbitMqConfigTest.java new file mode 100644 index 00000000..450d2f2a --- /dev/null +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/config/RabbitMqConfigTest.java @@ -0,0 +1,67 @@ +package org.reactivecommons.async.rabbit.config; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import org.junit.jupiter.api.Test; +import org.reactivecommons.api.domain.Command; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.async.api.AsyncQuery; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.ext.CustomReporter; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class RabbitMqConfigTest { + + RabbitMqConfig config = new RabbitMqConfig(); + + @Test + void retryInitialConnection() throws IOException, TimeoutException { + final String connectionType = "sender"; + final String appName = "appName"; + final String connectionName = "appName sender"; + + final AtomicInteger count = new AtomicInteger(); + final Connection connection = mock(Connection.class); + ConnectionFactory factory = mock(ConnectionFactory.class); + when(factory.newConnection(connectionName)).thenAnswer(invocation -> { + if(count.incrementAndGet() == 10){ + return connection; + } + throw new RuntimeException(); + }); + StepVerifier.withVirtualTime(() -> config.createConnectionMono(factory, appName, connectionType)) + .thenAwait(Duration.ofMinutes(2)) + .expectNext(connection).verifyComplete(); + } + + @Test + void shouldCreateDefaultErrorReporter() { + final CustomReporter errorReporter = config.reactiveCommonsCustomErrorReporter(); + assertThat(errorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(Command.class), true)).isNotNull(); + assertThat(errorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(DomainEvent.class), true)).isNotNull(); + assertThat(errorReporter.reportError(mock(Throwable.class), mock(Message.class), mock(AsyncQuery.class), true)).isNotNull(); + } + + @Test + void shouldGenerateDefaultReeporter() { + final CustomReporter customReporter = config.reactiveCommonsCustomErrorReporter(); + final Mono r1 = customReporter.reportError(mock(Throwable.class), mock(Message.class), mock(Command.class), true); + final Mono r2 = customReporter.reportError(mock(Throwable.class), mock(Message.class), mock(DomainEvent.class), true); + final Mono r3 = customReporter.reportError(mock(Throwable.class), mock(Message.class), mock(AsyncQuery.class), true); + + assertThat(r1).isNotNull(); + assertThat(r2).isNotNull(); + assertThat(r3).isNotNull(); + + } +} diff --git a/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicatorTest.java b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicatorTest.java new file mode 100644 index 00000000..2d173758 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/test/java/org/reactivecommons/async/rabbit/health/RabbitReactiveHealthIndicatorTest.java @@ -0,0 +1,71 @@ +package org.reactivecommons.async.rabbit.health; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Health.Builder; +import org.springframework.boot.actuate.health.Status; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class RabbitReactiveHealthIndicatorTest { + @Mock + private ConnectionFactoryProvider provider; + @Mock + private ConnectionFactory factory; + @Mock + private Connection connection; + @InjectMocks + private RabbitReactiveHealthIndicator indicator; + + @BeforeEach + void setup() { + when(provider.getConnectionFactory()).thenReturn(factory); + } + + @Test + void shouldBeUp() throws IOException, TimeoutException { + // Arrange + Map properties = new TreeMap<>(); + properties.put("version", "1.2.3"); + when(factory.newConnection()).thenReturn(connection); + when(connection.getServerProperties()).thenReturn(properties); + // Act + Mono result = indicator.doHealthCheck(new Builder()); + // Assert + StepVerifier.create(result) + .assertNext(health -> { + assertEquals("1.2.3", health.getDetails().get("version")); + assertEquals(Status.UP, health.getStatus()); + }) + .verifyComplete(); + } + + @Test + void shouldBeDown() throws IOException, TimeoutException { + // Arrange + when(factory.newConnection()).thenThrow(new TimeoutException("Connection timeout")); + // Act + Mono result = indicator.doHealthCheck(new Builder()); + // Assert + StepVerifier.create(result) + .expectError(TimeoutException.class) + .verify(); + } +} diff --git a/async/async-rabbit-starter-eda/src/test/resources/application.properties b/async/async-rabbit-starter-eda/src/test/resources/application.properties new file mode 100644 index 00000000..3d8d7db0 --- /dev/null +++ b/async/async-rabbit-starter-eda/src/test/resources/application.properties @@ -0,0 +1 @@ +spring.application.name=test-app \ No newline at end of file diff --git a/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java b/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java index cc89ace6..790724f4 100644 --- a/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java +++ b/async/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java @@ -62,6 +62,7 @@ import java.util.logging.Level; import java.util.stream.Stream; +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; import static reactor.rabbitmq.ExchangeSpecification.exchange; @Log @@ -213,14 +214,14 @@ public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandle final ConcurrentMap> eventsToBind = registries .values().stream() - .flatMap(r -> r.getEventListeners().stream()) + .flatMap(r -> r.getDomainEventListeners().get(DEFAULT_DOMAIN).stream()) .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll); // event handlers and dynamic handlers final ConcurrentMap> eventHandlers = registries .values().stream() - .flatMap(r -> Stream.concat(r.getEventListeners().stream(), r.getDynamicEventHandlers().stream())) + .flatMap(r -> Stream.concat(r.getDomainEventListeners().get(DEFAULT_DOMAIN).stream(), r.getDynamicEventHandlers().stream())) .collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler), ConcurrentHashMap::putAll); diff --git a/async/async-rabbit/async-rabbit.gradle b/async/async-rabbit/async-rabbit.gradle index 3ab1f6e7..5694cd10 100644 --- a/async/async-rabbit/async-rabbit.gradle +++ b/async/async-rabbit/async-rabbit.gradle @@ -14,4 +14,5 @@ dependencies { api 'com.rabbitmq:amqp-client' api 'com.fasterxml.jackson.core:jackson-databind' testImplementation 'io.projectreactor:reactor-test' + implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0' } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java index 7e0f25da..9fa35e10 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java @@ -1,5 +1,6 @@ package org.reactivecommons.async.rabbit; +import io.cloudevents.CloudEvent; import io.micrometer.core.instrument.MeterRegistry; import org.reactivecommons.api.domain.Command; import org.reactivecommons.async.api.AsyncQuery; @@ -22,7 +23,12 @@ import java.util.concurrent.TimeoutException; import static java.lang.Boolean.TRUE; -import static org.reactivecommons.async.commons.Headers.*; +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; +import static org.reactivecommons.async.commons.Headers.COMPLETION_ONLY_SIGNAL; +import static org.reactivecommons.async.commons.Headers.CORRELATION_ID; +import static org.reactivecommons.async.commons.Headers.REPLY_ID; +import static org.reactivecommons.async.commons.Headers.REPLY_TIMEOUT_MILLIS; +import static org.reactivecommons.async.commons.Headers.SERVED_QUERY_ID; import static reactor.core.publisher.Mono.fromCallable; public class RabbitDirectAsyncGateway implements DirectAsyncGateway { @@ -53,7 +59,22 @@ public RabbitDirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, @Override public Mono sendCommand(Command command, String targetName) { - return sender.sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands); + return sendCommand(command, targetName, DEFAULT_DOMAIN); + } + + @Override + public Mono sendCommand(Command command, String targetName, String domain) { + return resolveSender(domain).sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands); + } + + @Override + public Mono sendCommand(CloudEvent command, String targetName) { + return sendCommand(new Command<>(command.getType(), command.getId(), command), targetName); + } + + @Override + public Mono sendCommand(CloudEvent command, String targetName, String domain) { + return sendCommand(new Command<>(command.getType(), command.getId(), command), targetName, domain); } public Flux sendCommands(Flux> commands, String targetName) { @@ -63,6 +84,11 @@ public Flux sendCommands(Flux> commands, S @Override public Mono requestReply(AsyncQuery query, String targetName, Class type) { + return requestReply(query, targetName, type, DEFAULT_DOMAIN); + } + + @Override + public Mono requestReply(AsyncQuery query, String targetName, Class type, String domain) { final String correlationID = UUID.randomUUID().toString().replaceAll("-", ""); final Mono replyHolder = router.register(correlationID) @@ -76,7 +102,7 @@ public Mono requestReply(AsyncQuery query, String targetName, Class headers.put(CORRELATION_ID, correlationID); headers.put(REPLY_TIMEOUT_MILLIS, replyTimeout.toMillis()); - return sender.sendNoConfirm(query, exchange, targetName + ".query", headers, persistentQueries) + return resolveSender(domain).sendNoConfirm(query, exchange, targetName + ".query", headers, persistentQueries) .then(replyHolder) .name("async_query") .tag("operation", query.getResource()) @@ -84,6 +110,16 @@ public Mono requestReply(AsyncQuery query, String targetName, Class .tap(Micrometer.metrics(meterRegistry)); } + @Override + public Mono requestReply(CloudEvent query, String targetName, Class type) { + return requestReply(new AsyncQuery<>(query.getType(), query), targetName, type); + } + + @Override + public Mono requestReply(CloudEvent query, String targetName, Class type, String domain) { + return requestReply(new AsyncQuery<>(query.getType(), query), targetName, type, domain); + } + @Override public Mono reply(T response, From from) { final HashMap headers = new HashMap<>(); @@ -96,4 +132,8 @@ public Mono reply(T response, From from) { return sender.sendNoConfirm(response, "globalReply", from.getReplyID(), headers, false); } + protected ReactiveMessageSender resolveSender(String domain) { // NOSONAR + return sender; + } + } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java index d3e75e18..57f99a9d 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java @@ -1,7 +1,9 @@ package org.reactivecommons.async.rabbit; +import io.cloudevents.CloudEvent; import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; import org.reactivecommons.async.commons.config.BrokerConfig; +import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.api.domain.DomainEventBus; @@ -30,4 +32,9 @@ public Mono emit(DomainEvent event) { .onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err)); } + @Override + public Publisher emit(CloudEvent event) { + return emit(new DomainEvent<>(event.getType(), event.getId(), event)); + } + } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExt.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExt.java new file mode 100644 index 00000000..7ae7e162 --- /dev/null +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExt.java @@ -0,0 +1,15 @@ +package org.reactivecommons.async.rabbit.converters.json; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import lombok.experimental.UtilityClass; + +@UtilityClass +public class CloudEventBuilderExt { + private static final ObjectMapper mapper = new ObjectMapper(); + + @SneakyThrows + public static byte[] asBytes(Object object) { + return mapper.writeValueAsBytes(object); + } +} diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverter.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverter.java new file mode 100644 index 00000000..b4791e53 --- /dev/null +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverter.java @@ -0,0 +1,205 @@ +package org.reactivecommons.async.rabbit.converters.json; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; +import lombok.Data; +import org.reactivecommons.api.domain.Command; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.async.api.AsyncQuery; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.exceptions.MessageConversionException; +import org.reactivecommons.async.rabbit.RabbitMessage; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +public class JacksonCloudEventMessageConverter implements MessageConverter { + private static final String CONTENT_TYPE = "application/json"; + public static final String FAILED_TO_CONVERT_MESSAGE_CONTENT = "Failed to convert Message content"; + + private final ObjectMapper objectMapper; + + + public JacksonCloudEventMessageConverter(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public AsyncQuery readAsyncQuery(Message message, Class bodyClass) { + try { + final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class); + T value = extractData(bodyClass, asyncQueryJson.getQueryData()); + return new AsyncQuery<>(asyncQueryJson.getResource(), value); + } catch (IOException e) { + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); + } + } + + @Override + public DomainEvent readDomainEvent(Message message, Class bodyClass) { + try { + final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class); + + T value = extractData(bodyClass, domainEventJson.getData()); + + return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value); + } catch (IOException e) { + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); + } + } + + @Override + public Command readCommand(Message message, Class bodyClass) { + try { + final CommandJson commandJson = readValue(message, CommandJson.class); + T value = extractData(bodyClass, commandJson.getData()); + return new Command<>(commandJson.getName(), commandJson.getCommandId(), value); + } catch (IOException e) { + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); + } + } + + @Override + @SuppressWarnings("unchecked") + public T readValue(Message message, Class valueClass) { + try { + if(valueClass == CloudEvent.class){ + + return (T) EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(objectMapper.readValue(message.getBody(), byte[].class)); + + } + return objectMapper.readValue(message.getBody(), valueClass); + } catch (IOException e) { + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); + } + } + + @Override + @SuppressWarnings("unchecked") + public Command readCommandStructure(Message message) { + final CommandJson commandJson = readValue(message, CommandJson.class); + return new Command<>(commandJson.getName(), commandJson.getCommandId(), (T) commandJson.getData()); + } + + @Override + @SuppressWarnings("unchecked") + public DomainEvent readDomainEventStructure(Message message) { + final DomainEventJson eventJson = readValue(message, DomainEventJson.class); + return new DomainEvent<>(eventJson.getName(), eventJson.getEventId(), (T) eventJson.getData()); + } + + @Override + @SuppressWarnings("unchecked") + public AsyncQuery readAsyncQueryStructure(Message message) { + final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class); + return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData()); + } + + + public Message commandToMessage(Command object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getData()); + + return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data)); + } + + + public Message eventToMessage(DomainEvent object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getData()); + + return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data)); + } + + + public Message queryToMessage(AsyncQuery object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getQueryData()); + + return getRabbitMessage(new AsyncQuery<>(object.getResource(), data)); + } + @Override + public Message toMessage(Object object) { + if(object instanceof DomainEvent + && ((DomainEvent) object).getData() instanceof CloudEvent){ + return eventToMessage((DomainEvent) object); + + } + if(object instanceof Command + && ((Command) object).getData() instanceof CloudEvent){ + return commandToMessage((Command) object); + } + if(object instanceof AsyncQuery + && ((AsyncQuery) object).getQueryData() instanceof CloudEvent){ + return queryToMessage((AsyncQuery) object); + } + return getRabbitMessage(object); + } + + private RabbitMessage getRabbitMessage(Object object) { + byte[] bytes; + try { + String jsonString = this.objectMapper.writeValueAsString(object); + bytes = jsonString.getBytes(StandardCharsets.UTF_8); + } catch (IOException e) { + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); + } + RabbitMessage.RabbitMessageProperties props = new RabbitMessage.RabbitMessageProperties(); + props.setContentType(CONTENT_TYPE); + props.setContentEncoding(StandardCharsets.UTF_8.name()); + props.setContentLength(bytes.length); + return new RabbitMessage(bytes, props); + } + + private T extractData(Class bodyClass, JsonNode node) throws JsonProcessingException { + T value; + if(bodyClass == CloudEvent.class){ + + value = (T) EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(Base64.getDecoder() + .decode(node.asText())); + + } + else{ + value = objectMapper.treeToValue(node, bodyClass); + } + return value; + } + + @Data + private static class AsyncQueryJson { + private String resource; + private JsonNode queryData; + } + + @Data + private static class DomainEventJson { + private String name; + private String eventId; + private JsonNode data; + } + + @Data + private static class CommandJson { + private String name; + private String commandId; + private JsonNode data; + } +} diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java index bd3a6539..0872dd85 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java @@ -16,6 +16,7 @@ public class JacksonMessageConverter implements MessageConverter { private static final String CONTENT_TYPE = "application/json"; + public static final String FAILED_TO_CONVERT_MESSAGE_CONTENT = "Failed to convert Message content"; private final ObjectMapper objectMapper; @@ -31,7 +32,7 @@ public AsyncQuery readAsyncQuery(Message message, Class bodyClass) { final T value = objectMapper.treeToValue(asyncQueryJson.getQueryData(), bodyClass); return new AsyncQuery<>(asyncQueryJson.getResource(), value); } catch (IOException e) { - throw new MessageConversionException("Failed to convert Message content", e); + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); } } @@ -42,7 +43,7 @@ public DomainEvent readDomainEvent(Message message, Class bodyClass) { final T value = objectMapper.treeToValue(domainEventJson.getData(), bodyClass); return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value); } catch (IOException e) { - throw new MessageConversionException("Failed to convert Message content", e); + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); } } @@ -53,7 +54,7 @@ public Command readCommand(Message message, Class bodyClass) { final T value = objectMapper.treeToValue(commandJson.getData(), bodyClass); return new Command<>(commandJson.getName(), commandJson.getCommandId(), value); } catch (IOException e) { - throw new MessageConversionException("Failed to convert Message content", e); + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); } } @@ -62,7 +63,7 @@ public T readValue(Message message, Class valueClass) { try { return objectMapper.readValue(message.getBody(), valueClass); } catch (IOException e) { - throw new MessageConversionException("Failed to convert Message content", e); + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); } } @@ -94,7 +95,7 @@ public Message toMessage(Object object) { String jsonString = this.objectMapper.writeValueAsString(object); bytes = jsonString.getBytes(StandardCharsets.UTF_8); } catch (IOException e) { - throw new MessageConversionException("Failed to convert Message content", e); + throw new MessageConversionException(FAILED_TO_CONVERT_MESSAGE_CONTENT, e); } RabbitMessage.RabbitMessageProperties props = new RabbitMessage.RabbitMessageProperties(); props.setContentType(CONTENT_TYPE); diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java index 2d5fa136..dca6c722 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java @@ -1,6 +1,9 @@ package org.reactivecommons.async.rabbit.listeners; import com.rabbitmq.client.AMQP; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import lombok.extern.java.Log; import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; import org.reactivecommons.async.commons.DiscardNotifier; @@ -149,8 +152,16 @@ protected Function, Mono> enrichPostProcess(Message msg) { final String correlationID = msg.getProperties().getHeaders().get(CORRELATION_ID).toString(); final HashMap headers = new HashMap<>(); headers.put(CORRELATION_ID, correlationID); + Object response = signal.get(); + if(response instanceof CloudEvent) { + byte[] serialized = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize((CloudEvent) response); + return sender.sendNoConfirm(serialized, replyExchange, replyID, headers, false); + } - return sender.sendNoConfirm(signal.get(), replyExchange, replyID, headers, false); + return sender.sendNoConfirm(response, replyExchange, replyID, headers, false); }); } diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExtTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExtTest.java new file mode 100644 index 00000000..03335367 --- /dev/null +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExtTest.java @@ -0,0 +1,21 @@ +package org.reactivecommons.async.rabbit.converters.json; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +import java.util.Date; + +import static org.assertj.core.api.Assertions.assertThat; + +public class CloudEventBuilderExtTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + void asBytes(){ + Date date = new Date(); + SampleClass result = new SampleClass("35", "name1", date); + byte[] arrayByte = CloudEventBuilderExt.asBytes(result); + assertThat(arrayByte).isNotEmpty(); + } +} diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverterTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverterTest.java new file mode 100644 index 00000000..71da4ecc --- /dev/null +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverterTest.java @@ -0,0 +1,171 @@ +package org.reactivecommons.async.rabbit.converters.json; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import org.junit.jupiter.api.Test; +import org.reactivecommons.api.domain.Command; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.async.api.AsyncQuery; +import org.reactivecommons.async.commons.communications.Message; + +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; +import java.util.Date; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +public class JacksonCloudEventMessageConverterTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + private final JacksonCloudEventMessageConverter converter = new JacksonCloudEventMessageConverter(objectMapper); + + @Test + void readAsyncQuery() throws JsonProcessingException { + Date date = new Date(); + CloudEvent query = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("query") + .withData("application/json", CloudEventBuilderExt.asBytes(new SampleClass("35", "name1", date))) + .build(); + final Message message = converter.toMessage(new AsyncQuery<>(query.getType(), query)); + final AsyncQuery value = converter.readAsyncQuery(message, CloudEvent.class); + byte[] bytes = value.getQueryData().getData().toBytes(); + String stringData = new String(bytes, StandardCharsets.UTF_8); + SampleClass result = objectMapper.readValue(stringData, SampleClass.class); + assertThat(result) + .extracting(SampleClass::getId, SampleClass::getName, SampleClass::getDate) + .containsExactly("35", "name1", date); + + } + + @Test + void readDomainEvent() throws JsonProcessingException { + Date date = new Date(); + CloudEvent event = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("event") + .withData("application/json", CloudEventBuilderExt.asBytes(new SampleClass("35", "name1", date))) + .build(); + final Message message = converter.toMessage(new DomainEvent<>(event.getType(), event.getId(), event)); + final DomainEvent value = converter.readDomainEvent(message, CloudEvent.class); + byte[] bytes = value.getData().getData().toBytes(); + String stringData = new String(bytes, StandardCharsets.UTF_8); + SampleClass result = objectMapper.readValue(stringData, SampleClass.class); + assertThat(result) + .extracting(SampleClass::getId, SampleClass::getName, SampleClass::getDate) + .containsExactly("35", "name1", date); + } + + @Test + void readCommand() throws JsonProcessingException { + Date date = new Date(); + CloudEvent command = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("command") + .withData("application/json", CloudEventBuilderExt.asBytes(new SampleClass("35", "name1", date))) + .build(); + final Message message = converter.toMessage(new Command<>(command.getType(), command.getId(), command)); + final Command value = converter.readCommand(message, CloudEvent.class); + byte[] bytes = value.getData().getData().toBytes(); + String stringData = new String(bytes, StandardCharsets.UTF_8); + SampleClass result = objectMapper.readValue(stringData, SampleClass.class); + assertThat(result) + .extracting(SampleClass::getId, SampleClass::getName, SampleClass::getDate) + .containsExactly("35", "name1", date); + } + @Test + void toMessage() { + final Message message = converter.toMessage(new SampleClass("42", "Daniel", new Date())); + assertThat(new String(message.getBody())).contains("42").contains("Daniel"); + } + + @Test + void toMessageWhenDataIsNull() throws IOException { + final Message message = converter.toMessage(null); + + final JsonNode jsonNode = objectMapper.readTree(message.getBody()); + assertThat(jsonNode.isNull()).isTrue(); + } + + @Test + void toMessageWhenDataIsEmpty() throws IOException { + final Message message = converter.toMessage(""); + + final JsonNode jsonNode = objectMapper.readTree(message.getBody()); + assertThat(jsonNode.asText()).isEmpty(); + } + + @Test + void readValue() { + Date date = new Date(); + final Message message = converter.toMessage(new SampleClass("35", "name1", date)); + final SampleClass value = converter.readValue(message, SampleClass.class); + assertThat(value).extracting(SampleClass::getId, SampleClass::getName, SampleClass::getDate) + .containsExactly("35", "name1", date); + } + + @Test + void readValueString() { + final Message message = converter.toMessage("Hi!"); + final String value = converter.readValue(message, String.class); + assertThat(value).isEqualTo("Hi!"); + } + + @Test + void shouldConvertToCommandStructure() { + final SampleClass data = new SampleClass("35", "name1", new Date()); + final Message message = converter.toMessage(new Command<>("cmd.name", "42", data)); + final Command command = converter.readCommandStructure(message); + + assertThat(command.getData()).isInstanceOf(JsonNode.class); + assertThat(command.getName()).isEqualTo("cmd.name"); + } + + @Test + void shouldConvertToDomainEventStructure() { + final SampleClass data = new SampleClass("35", "name1", new Date()); + final Message message = converter.toMessage(new DomainEvent<>("event.name", "42", data)); + final DomainEvent event = converter.readDomainEventStructure(message); + + assertThat(event.getData()).isInstanceOf(JsonNode.class); + assertThat(event.getName()).isEqualTo("event.name"); + final JsonNode jsonNode = (JsonNode) event.getData(); + assertThat(jsonNode.findValue("name").asText()).isEqualTo("name1"); + } + + @Test + void shouldConvertToQueryStructure() { + final SampleClass data = new SampleClass("35", "sample1", new Date()); + final Message message = converter.toMessage(new AsyncQuery<>("query.name", data)); + final AsyncQuery query = converter.readAsyncQueryStructure(message); + + assertThat(query.getQueryData()).isInstanceOf(JsonNode.class); + assertThat(query.getResource()).isEqualTo("query.name"); + final JsonNode jsonNode = (JsonNode) query.getQueryData(); + assertThat(jsonNode.findValue("name").asText()).isEqualTo("sample1"); + } + + @Test + void shouldNotFailWithTilde() { + // Arrange + final String name = "example with word containing tilde áéíóúñ"; + final SampleClass data = new SampleClass("35", name, new Date()); + final Message message = converter.toMessage(new AsyncQuery<>("query.name", data)); + // Act + final AsyncQuery query = converter.readAsyncQueryStructure(message); + // Assert + assertThat(query.getQueryData()).isInstanceOf(JsonNode.class); + assertThat(query.getResource()).isEqualTo("query.name"); + final JsonNode jsonNode = (JsonNode) query.getQueryData(); + assertThat(jsonNode.findValue("name").asText()).isEqualTo(name); + } +} diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java index 077adefa..7e9b5ad2 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/listeners/ListenerReporterTestSuperClass.java @@ -50,6 +50,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN; import static reactor.core.publisher.Mono.empty; import static reactor.core.publisher.Mono.just; @@ -131,8 +132,8 @@ protected Flux createSource(Function rout protected abstract GenericMessageListener createMessageListener(final HandlerResolver handlerResolver); private HandlerResolver createHandlerResolver(final HandlerRegistry registry) { - final Map> eventHandlers = Stream.concat(registry.getDynamicEventHandlers().stream(), registry.getEventListeners().stream()).collect(toMap(RegisteredEventListener::getPath, identity())); - final Map> eventsToBind = registry.getEventListeners().stream().collect(toMap(RegisteredEventListener::getPath, identity())); + final Map> eventHandlers = Stream.concat(registry.getDynamicEventHandlers().stream(), registry.getDomainEventListeners().get(DEFAULT_DOMAIN).stream()).collect(toMap(RegisteredEventListener::getPath, identity())); + final Map> eventsToBind = registry.getDomainEventListeners().get(DEFAULT_DOMAIN).stream().collect(toMap(RegisteredEventListener::getPath, identity())); final Map> notificationHandlers = registry.getEventNotificationListener().stream().collect(toMap(RegisteredEventListener::getPath, identity())); final Map> queryHandlers = registry.getHandlers().stream().collect(toMap(RegisteredQueryHandler::getPath, identity())); final Map> commandHandlers = registry.getCommandHandlers().stream().collect(toMap(RegisteredCommandHandler::getPath, identity())); diff --git a/domain/domain-events/domain-events-api.gradle b/domain/domain-events/domain-events-api.gradle index cef707ef..2279536b 100644 --- a/domain/domain-events/domain-events-api.gradle +++ b/domain/domain-events/domain-events-api.gradle @@ -5,4 +5,5 @@ ext { dependencies { api 'org.reactivestreams:reactive-streams:1.0.4' + api 'io.cloudevents:cloudevents-api:2.5.0' } diff --git a/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java b/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java index 0c713d73..5e166bd8 100644 --- a/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java +++ b/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java @@ -1,7 +1,11 @@ package org.reactivecommons.api.domain; +import io.cloudevents.CloudEvent; import org.reactivestreams.Publisher; public interface DomainEventBus { Publisher emit(DomainEvent event); + + Publisher emit(CloudEvent event); + } diff --git a/gradle.properties b/gradle.properties index a7ef7071..7a342ada 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,4 +2,4 @@ version=2.0.0 springBootVersion=3.0.2 reactorRabbitVersion=1.5.5 gradleVersionsVersion=0.36.0 -toPublish=async-commons,async-commons-api,async-commons-rabbit-standalone,async-commons-rabbit-starter,domain-events-api,async-rabbit +toPublish=async-commons,async-commons-api,async-commons-rabbit-standalone,async-commons-rabbit-starter,async-commons-rabbit-starter-eda,domain-events-api,async-rabbit diff --git a/samples/async/receiver-responder/async-receiver-sample.gradle b/samples/async/receiver-responder/async-receiver-sample.gradle index d6bd1c51..2675840d 100644 --- a/samples/async/receiver-responder/async-receiver-sample.gradle +++ b/samples/async/receiver-responder/async-receiver-sample.gradle @@ -1,7 +1,8 @@ apply plugin: 'org.springframework.boot' dependencies { - implementation project(":async-commons-rabbit-starter") + implementation project(":async-commons-rabbit-starter-eda") implementation 'org.springframework.boot:spring-boot-starter' runtimeOnly 'org.springframework.boot:spring-boot-devtools' + implementation 'io.cloudevents:cloudevents-core:2.5.0' } diff --git a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java index 6aa12a70..9aade845 100644 --- a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java +++ b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java @@ -1,5 +1,7 @@ package sample; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import lombok.AllArgsConstructor; import lombok.Data; import lombok.RequiredArgsConstructor; @@ -8,21 +10,29 @@ import org.reactivecommons.api.domain.DomainEventBus; import org.reactivecommons.async.api.DirectAsyncGateway; import org.reactivecommons.async.api.HandlerRegistry; -import org.reactivecommons.async.api.handlers.EventHandler; import org.reactivecommons.async.api.handlers.QueryHandler; +import org.reactivecommons.async.impl.config.annotations.EnableCommandListeners; import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway; import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus; +import org.reactivecommons.async.impl.config.annotations.EnableEventListeners; import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners; +import org.reactivecommons.async.rabbit.converters.json.CloudEventBuilderExt; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import reactor.core.publisher.Mono; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.UUID; + import static org.reactivecommons.async.api.HandlerRegistry.register; import static reactor.core.publisher.Mono.just; @SpringBootApplication -@EnableMessageListeners +@EnableEventListeners +@EnableCommandListeners @EnableDomainEventBus @EnableDirectAsyncGateway @Log @@ -48,22 +58,56 @@ public Mono handle(AddMemberCommand command) { } @Bean - public HandlerRegistry handlerRegistrySubs(DirectAsyncGateway gateway) { + public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/) { return HandlerRegistry.register() - .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class) - .listenEvent("fixed.event", message -> Mono.empty(), Object.class) + +// .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class) +// .listenEvent("fixed.event", message -> Mono.empty(), Object.class) +// .listenDomainEvent("accounts", "account.created", message -> Mono.empty(), Object.class) +// .listenDomainEvent("deposits", "transfer.xxx", message -> Mono.empty(), Object.class) +// .serveQuery("query1", message -> { +// log.info("resolving from direct query"); +// return just(new RespQuery1("Ok", message)); +// }, Call.class) +// .serveQuery("sample.query.*", message -> { +// log.info("resolving from direct query"); +// return just(new RespQuery1("Ok", message)); +// }, Call.class) +// .serveQuery("query2", (from, message) -> { +// log.info("resolving from delegate query"); +// return gateway.reply(new RespQuery1("Ok", message), from).then(); +// }, Call.class); + +// .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class) +// .listenEvent("event", message -> { +// log.info(message.getData().toString()); +// return Mono.empty(); +// }, CloudEvent.class) + .handleCommand("command", message -> { + log.info(message.getData().toString()); + return Mono.empty(); + }, CloudEvent.class) .serveQuery("query1", message -> { - log.info("resolving from direct query"); - return just(new RespQuery1("Ok", message)); - }, Call.class) - .serveQuery("sample.query.*", message -> { - log.info("resolving from direct query"); - return just(new RespQuery1("Ok", message)); - }, Call.class) - .serveQuery("query2", (from, message) -> { + log.info("resolving from direct query" + message); + Map mapData = Map.of("1", "data"); + CloudEvent response = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("query1.response") // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(mapData)) + .build(); + return just(response); + }, CloudEvent.class); +// .serveQuery("sample.query.*", message -> { +// log.info("resolving from direct query"); +// return just(new RespQuery1("Ok", message)); +// }, Call.class); + /*.serveQuery("query2", (from, message) -> { log.info("resolving from delegate query"); return gateway.reply(new RespQuery1("Ok", message), from).then(); }, Call.class); +*/ } //@Bean diff --git a/samples/async/receiver-responder/src/main/resources/application.properties b/samples/async/receiver-responder/src/main/resources/application.properties deleted file mode 100644 index 2f677350..00000000 --- a/samples/async/receiver-responder/src/main/resources/application.properties +++ /dev/null @@ -1,2 +0,0 @@ -spring.application.name=receiver -spring.rabbitmq.virtual-host=test diff --git a/samples/async/receiver-responder/src/main/resources/application.yaml b/samples/async/receiver-responder/src/main/resources/application.yaml new file mode 100644 index 00000000..3d282ae3 --- /dev/null +++ b/samples/async/receiver-responder/src/main/resources/application.yaml @@ -0,0 +1,17 @@ +spring: + application: + name: receiver + rabbitmq: + virtual-host: test +app: + async: + max-retries: 10 + with-d-l-q-retry: true + retry-delay: 1000 # son milisegundos + connections: + app: + virtualHost: domain-a +# accounts: +# virtualHost: domain-a +# deposits: +# virtualHost: domain-a diff --git a/samples/async/sender-client/async-sender-client.gradle b/samples/async/sender-client/async-sender-client.gradle index 047f45b2..8c82b9e4 100644 --- a/samples/async/sender-client/async-sender-client.gradle +++ b/samples/async/sender-client/async-sender-client.gradle @@ -1,8 +1,10 @@ apply plugin: 'org.springframework.boot' dependencies { - implementation project(':async-commons-rabbit-starter') + implementation project(':async-commons-rabbit-starter-eda') implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'io.micrometer:micrometer-registry-prometheus' + implementation 'io.cloudevents:cloudevents-core:2.5.0' + } diff --git a/samples/async/sender-client/src/main/java/sample/SampleRestController.java b/samples/async/sender-client/src/main/java/sample/SampleRestController.java index ff29616e..cb365daf 100644 --- a/samples/async/sender-client/src/main/java/sample/SampleRestController.java +++ b/samples/async/sender-client/src/main/java/sample/SampleRestController.java @@ -1,10 +1,15 @@ package sample; +import com.fasterxml.jackson.core.JsonProcessingException; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.reactivecommons.api.domain.DomainEventBus; import org.reactivecommons.async.api.AsyncQuery; import org.reactivecommons.async.api.DirectAsyncGateway; +import org.reactivecommons.async.rabbit.converters.json.CloudEventBuilderExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.PostMapping; @@ -13,11 +18,18 @@ import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.UUID; + @RestController public class SampleRestController { @Autowired private DirectAsyncGateway directAsyncGateway; + + @Autowired + private DomainEventBus domainEventBus; private final String queryName = "query1"; private final String queryName2 = "query2"; private final String target = "receiver"; @@ -25,11 +37,45 @@ public class SampleRestController { private final WebClient webClient = WebClient.builder().build(); @PostMapping(path = "/sample", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) - public Mono sampleService(@RequestBody Call call) { - AsyncQuery query = new AsyncQuery<>(queryName, call); - return directAsyncGateway.requestReply(query, target, RespQuery1.class); + public Mono sampleService(@RequestBody Call call) throws JsonProcessingException { +// AsyncQuery query = new AsyncQuery<>(queryName, call); + CloudEvent query = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType(queryName) // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(call)) + .build(); + + return directAsyncGateway.requestReply(query, target, CloudEvent.class, "accounts"); } + @PostMapping(path = "/sample/event", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) + public Mono sampleServiceEvent(@RequestBody Call call) throws JsonProcessingException { +// AsyncQuery query = new AsyncQuery<>(queryName, call); + CloudEvent event = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("event") // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(call)) + .build(); + return Mono.from(domainEventBus.emit(event)).thenReturn("event"); + } + + @PostMapping(path = "/sample/command", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) + public Mono sampleServiceCommand(@RequestBody Call call) throws JsonProcessingException { +// AsyncQuery query = new AsyncQuery<>(queryName, call); + CloudEvent command = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("command") // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(call)) + .build(); + + return directAsyncGateway.sendCommand(command, target, "accounts").thenReturn("command"); + } @PostMapping(path = "/sample/match", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public Mono sampleServices(@RequestBody Call call) { AsyncQuery query = new AsyncQuery<>("sample.query.any.that.matches", call); diff --git a/samples/async/sender-client/src/main/resources/application.properties b/samples/async/sender-client/src/main/resources/application.properties deleted file mode 100644 index 5fb947ad..00000000 --- a/samples/async/sender-client/src/main/resources/application.properties +++ /dev/null @@ -1,5 +0,0 @@ -spring.application.name=sender -server.port=4001 -spring.rabbitmq.virtual-host=test -management.endpoint.health.show-details=always -management.endpoints.web.exposure.include=health,prometheus diff --git a/samples/async/sender-client/src/main/resources/application.yaml b/samples/async/sender-client/src/main/resources/application.yaml new file mode 100644 index 00000000..09861b2f --- /dev/null +++ b/samples/async/sender-client/src/main/resources/application.yaml @@ -0,0 +1,24 @@ +spring: + application: + name: sender + rabbitmq: + virtual-host: test +server: + port: 4001 +management: + endpoint: + health: + show-details: always + endpoints: + web: + exposure: + include: health,prometheus +app: + async: + listenRepliesFrom: [accounts] + connections: + app: + virtualHost: / + accounts: + virtualHost: domain-a +