Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,31 @@ app.async.retryDelay=1000
app.async.maxRetries=10

```
### Domain custom Configuration (RabbitMQ)


```
app.async.domain.events.exchange=exchangeCustomName
app.async.domain.events.maxLengthBytes=125000000

```

### Direct custom Configuration (RabbitMQ)


```
app.async.direct.exchange=exchangeCustomName
app.async.direct.maxLengthBytes=125000000
```

### Global custom Configuration (RabbitMQ)


```
app.async.global.exchange=exchangeCustomName
app.async.global.maxLengthBytes=125000000
```

* withDLQRetry: Wheter to enable or not the new Retry DLQ Strategy
* retryDelay: Delay retry value in ms
* maxRetries: Max number of retries in case of error in adition to the one automatic retry per queue.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.reactivecommons.async.impl.config;

import lombok.RequiredArgsConstructor;
import org.reactivecommons.async.impl.DiscardNotifier;
import org.reactivecommons.async.impl.HandlerResolver;
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
import org.reactivecommons.async.impl.config.props.AsyncProps;
import org.reactivecommons.async.impl.converters.MessageConverter;
import org.reactivecommons.async.impl.listeners.ApplicationCommandListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@RequiredArgsConstructor
@Import(RabbitMqConfig.class)
public class CommandListenersConfig {

@Value("${spring.application.name}")
private String appName;

private final AsyncProps asyncProps;

@Bean
public ApplicationCommandListener applicationCommandListener(ReactiveMessageListener listener,
HandlerResolver resolver, MessageConverter converter,
DiscardNotifier discardNotifier) {
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier);

commandListener.startListener();

return commandListener;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.reactivecommons.async.impl.config;

import lombok.RequiredArgsConstructor;
import org.reactivecommons.async.impl.DiscardNotifier;
import org.reactivecommons.async.impl.HandlerResolver;
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
import org.reactivecommons.async.impl.config.props.AsyncProps;
import org.reactivecommons.async.impl.converters.MessageConverter;
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@RequiredArgsConstructor
@Import(RabbitMqConfig.class)
public class EventListenersConfig {

@Value("${spring.application.name}")
private String appName;

private final AsyncProps asyncProps;

@Bean
public ApplicationEventListener eventListener(HandlerResolver resolver, MessageConverter messageConverter,
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {

final ApplicationEventListener listener = new ApplicationEventListener(receiver,
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),asyncProps.getDomain().getEvents().getMaxLengthBytes(),
discardNotifier);

listener.startListener();

return listener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

@Configuration
@RequiredArgsConstructor
@Deprecated
@Import(RabbitMqConfig.class)
public class MessageListenersConfig {

Expand All @@ -48,14 +49,14 @@ public ApplicationEventListener eventListener(HandlerResolver resolver, MessageC
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),
discardNotifier);
asyncProps.getDomain().getEvents().getMaxLengthBytes(), discardNotifier);
listener.startListener();
return listener;
}

@Bean
public ApplicationNotificationListener eventNotificationListener(HandlerResolver resolver, MessageConverter messageConverter,
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
receiver,
asyncProps.getDomain().getEvents().getExchange(),
Expand All @@ -73,8 +74,8 @@ public ApplicationQueryListener queryListener(MessageConverter converter, Handle
DiscardNotifier discardNotifier) {
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
"globalReply", asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
asyncProps.getRetryDelay(), discardNotifier);
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
asyncProps.getRetryDelay(), asyncProps.getGlobal().getMaxLengthBytes(), discardNotifier);
listener.startListener();
return listener;
}
Expand All @@ -85,7 +86,7 @@ public ApplicationCommandListener applicationCommandListener(ReactiveMessageList
DiscardNotifier discardNotifier) {
ApplicationCommandListener commandListener = new ApplicationCommandListener(listener, appName, resolver,
asyncProps.getDirect().getExchange(), converter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
asyncProps.getRetryDelay(), discardNotifier);
asyncProps.getRetryDelay(), asyncProps.getDirect().getMaxLengthBytes(), discardNotifier);
commandListener.startListener();
return commandListener;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.reactivecommons.async.impl.config;

import lombok.RequiredArgsConstructor;
import org.reactivecommons.async.impl.DiscardNotifier;
import org.reactivecommons.async.impl.HandlerResolver;
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
import org.reactivecommons.async.impl.config.props.AsyncProps;
import org.reactivecommons.async.impl.converters.MessageConverter;
import org.reactivecommons.async.impl.listeners.ApplicationEventListener;
import org.reactivecommons.async.impl.listeners.ApplicationNotificationListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@RequiredArgsConstructor
@Import(RabbitMqConfig.class)
public class NotificacionListenersConfig {

@Value("${spring.application.name}")
private String appName;

private final AsyncProps asyncProps;

@Bean
public ApplicationNotificationListener eventNotificationListener(HandlerResolver resolver, MessageConverter messageConverter,
ReactiveMessageListener receiver, DiscardNotifier discardNotifier) {
final ApplicationNotificationListener listener = new ApplicationNotificationListener(
receiver,
asyncProps.getDomain().getEvents().getExchange(),
asyncProps.getNotificationProps().getQueueName(appName),
resolver,
messageConverter,
discardNotifier);
listener.startListener();
return listener;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.reactivecommons.async.impl.config;

import lombok.RequiredArgsConstructor;
import org.reactivecommons.async.impl.DiscardNotifier;
import org.reactivecommons.async.impl.HandlerResolver;
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
import org.reactivecommons.async.impl.config.props.AsyncProps;
import org.reactivecommons.async.impl.converters.MessageConverter;
import org.reactivecommons.async.impl.listeners.ApplicationQueryListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@RequiredArgsConstructor
@Import(RabbitMqConfig.class)
public class QueryListenerConfig {

@Value("${spring.application.name}")
private String appName;

private final AsyncProps asyncProps;

@Bean
public ApplicationQueryListener queryListener(MessageConverter converter, HandlerResolver resolver,
ReactiveMessageSender sender, ReactiveMessageListener rlistener,
DiscardNotifier discardNotifier) {
final ApplicationQueryListener listener = new ApplicationQueryListener(rlistener,
appName + ".query", resolver, sender, asyncProps.getDirect().getExchange(), converter,
asyncProps.getGlobal().getExchange(), asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(),
asyncProps.getRetryDelay(),asyncProps.getGlobal().getMaxLengthBytes(), discardNotifier);

listener.startListener();

return listener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.reactivecommons.api.domain.DomainEventBus;
import org.reactivecommons.async.impl.DiscardNotifier;
import org.reactivecommons.async.impl.RabbitDiscardNotifier;
import org.reactivecommons.async.impl.RabbitDomainEventBus;
import org.reactivecommons.async.api.DefaultCommandHandler;
import org.reactivecommons.async.api.DefaultQueryHandler;
import org.reactivecommons.async.api.DynamicRegistry;
import org.reactivecommons.async.api.HandlerRegistry;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.impl.*;
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
import org.reactivecommons.async.impl.communications.ReactiveMessageSender;
import org.reactivecommons.async.impl.communications.TopologyCreator;
Expand All @@ -21,6 +26,7 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
Expand All @@ -29,6 +35,9 @@
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;

import static reactor.rabbitmq.ExchangeSpecification.exchange;
Expand Down Expand Up @@ -137,4 +146,61 @@ Mono<Connection> createConnectionMono(ConnectionFactory factory, String connecti
.cache();
}

@Bean
public HandlerResolver resolver(ApplicationContext context, DefaultCommandHandler defaultCommandHandler) {
final Map<String, HandlerRegistry> registries = context.getBeansOfType(HandlerRegistry.class);

final ConcurrentMap<String, RegisteredQueryHandler> handlers = registries
.values().stream()
.flatMap(r -> r.getHandlers().stream())
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
ConcurrentHashMap::putAll);

final ConcurrentMap<String, RegisteredEventListener> eventListeners = registries
.values().stream()
.flatMap(r -> r.getEventListeners().stream())
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
ConcurrentHashMap::putAll);

final ConcurrentMap<String, RegisteredCommandHandler> commandHandlers = registries
.values().stream()
.flatMap(r -> r.getCommandHandlers().stream())
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
ConcurrentHashMap::putAll);

final ConcurrentMap<String, RegisteredEventListener> eventNotificationListener = registries
.values()
.stream()
.flatMap(r -> r.getEventNotificationListener().stream())
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
ConcurrentHashMap::putAll);

return new HandlerResolver(handlers, eventListeners, commandHandlers, eventNotificationListener) {
@Override
@SuppressWarnings("unchecked")
public <T> RegisteredCommandHandler<T> getCommandHandler(String path) {
final RegisteredCommandHandler<T> handler = super.getCommandHandler(path);
return handler != null ? handler : new RegisteredCommandHandler<>("", defaultCommandHandler, Object.class);
}
};
}

@Bean
public DynamicRegistry dynamicRegistry(HandlerResolver resolver, ReactiveMessageListener listener, IBrokerConfigProps props) {
return new DynamicRegistryImp(resolver, listener.getTopologyCreator(), props);
}

@Bean
@ConditionalOnMissingBean
public DefaultQueryHandler defaultHandler() {
return (DefaultQueryHandler<Object, Object>) command ->
Mono.error(new RuntimeException("No Handler Registered"));
}

@Bean
@ConditionalOnMissingBean
public DefaultCommandHandler defaultCommandHandler() {
return message -> Mono.error(new RuntimeException("No Handler Registered"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.impl.config.CommandListenersConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;


@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import(CommandListenersConfig.class)
@Configuration
public @interface EnableCommandListeners {
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.impl.config.EventListenersConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import(EventListenersConfig.class)
@Configuration
public @interface EnableEventListeners {
}



Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.impl.config.MessageListenersConfig;
import org.reactivecommons.async.impl.config.CommandListenersConfig;
import org.reactivecommons.async.impl.config.EventListenersConfig;
import org.reactivecommons.async.impl.config.NotificacionListenersConfig;
import org.reactivecommons.async.impl.config.QueryListenerConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;


/**
* Actualmente se utiliza EnableMessageListeners para habilitar Comandos, querys y eventos al mismo tiempo,
* se han separado en 3 EnableCommandListeners, EnableQueryListeners y EnableEventListeners, estos se pueden utilizar
* todos juntos o de manera individual segun necesidad
* @deprecated Use EnableCommandListeners, EnableQueryListeners, EnableEventListeners
*/
@Deprecated
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import(MessageListenersConfig.class)
@Import({CommandListenersConfig.class, QueryListenerConfig.class, EventListenersConfig.class, NotificacionListenersConfig.class})
@Configuration
public @interface EnableMessageListeners {
}
Expand Down
Loading