Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions async/async-commons-api/async-commons-api.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ dependencies {
api project(':domain-events-api')
compileOnly 'io.projectreactor:reactor-core'
testImplementation 'io.projectreactor:reactor-test'
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
package org.reactivecommons.async.api;

import io.cloudevents.CloudEvent;
import org.reactivecommons.api.domain.Command;
import reactor.core.publisher.Mono;

public interface DirectAsyncGateway {
<T> Mono<Void> sendCommand(Command<T> command, String targetName);

<T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain);

Mono<Void> sendCommand(CloudEvent command, String targetName);

Mono<Void> sendCommand(CloudEvent command, String targetName, String domain);

<T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type);

<T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type, String domain);

<R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type);

<R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type, String domain);

<T> Mono<Void> reply(T response, From from);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.reactivecommons.async.api;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand All @@ -13,24 +16,36 @@

import java.lang.reflect.ParameterizedType;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

@Getter
@NoArgsConstructor(access = AccessLevel.PACKAGE)
public class HandlerRegistry {

private final List<RegisteredEventListener<?>> eventListeners = new CopyOnWriteArrayList<>();
public static final String DEFAULT_DOMAIN = "app";
private final Map<String, List<RegisteredEventListener<?>>> domainEventListeners = new ConcurrentHashMap<>();
private final List<RegisteredEventListener<?>> dynamicEventHandlers = new CopyOnWriteArrayList<>();
private final List<RegisteredEventListener<?>> eventNotificationListener = new CopyOnWriteArrayList<>();
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
private final List<RegisteredCommandHandler<?>> commandHandlers = new CopyOnWriteArrayList<>();


public static HandlerRegistry register() {
return new HandlerRegistry();
HandlerRegistry instance = new HandlerRegistry();
instance.domainEventListeners.put(DEFAULT_DOMAIN, new CopyOnWriteArrayList<>());
return instance;
}

public <T> HandlerRegistry listenDomainEvent(String domain, String eventName, EventHandler<T> handler, Class<T> eventClass) {
domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>())
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
return this;
}

public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler, Class<T> eventClass) {
eventListeners.add(new RegisteredEventListener<>(eventName, handler, eventClass));
domainEventListeners.computeIfAbsent(DEFAULT_DOMAIN, ignored -> new CopyOnWriteArrayList<>())
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
return this;
}

Expand Down Expand Up @@ -67,7 +82,21 @@ public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> han
}

public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass) {
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
if(queryClass == CloudEvent.class){
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) ->
{
CloudEvent query = EventFormatProvider
.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(message);

return handler.handle((R) query);

} , byte[].class));
}
else{
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;

class HandlerRegistryTest {
private final HandlerRegistry registry = HandlerRegistry.register();
Expand All @@ -27,7 +28,7 @@ void shouldListenEventWithTypeInferenceWhenClassInstanceIsUsed() {

registry.listenEvent(name, eventHandler);

assertThat(registry.getEventListeners())
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
.anySatisfy(registered -> assertThat(registered)
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
.containsExactly(name, SomeDataClass.class, eventHandler)).hasSize(1);
Expand All @@ -43,7 +44,7 @@ void shouldRegisterPatternEventHandlerWithTypeInference() {
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);

assertThat(registry.getEventListeners())
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
.usingRecursiveComparison()
.isEqualTo(expectedRegisteredEventListener));
Expand All @@ -62,7 +63,7 @@ void shouldRegisterPatternEventHandler() {
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);

assertThat(registry.getEventListeners())
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
.usingRecursiveComparison()
.isEqualTo(expectedRegisteredEventListener));
Expand All @@ -84,7 +85,7 @@ public void listenEvent() {
EventHandler<SomeDataClass> handler = mock(EventHandler.class);
registry.listenEvent(name, handler, SomeDataClass.class);

assertThat(registry.getEventListeners())
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
.anySatisfy(registered -> assertThat(registered)
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
.containsExactly(name, SomeDataClass.class, handler)).hasSize(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.reactivecommons.async.commons.converters;

import io.cloudevents.CloudEvent;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.async.api.AsyncQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class KeyMatcherPerformanceWildcardTest {
Expand All @@ -27,13 +32,13 @@ public void setUp() {
File file = new File(classLoader.getResource("wildcard_names_for_matching.txt").getFile());
File file2 = new File(classLoader.getResource("concrete_names_for_matching.txt").getFile());
try {
Set<String> names = new HashSet<>(Files
Set<String> names = new HashSet<>(Files
.readAllLines(Paths.get(file.getAbsolutePath())));
candidates = names.stream()
.collect(Collectors.toMap(name -> name, name -> name));
testList = new ArrayList<>(new HashSet<>(Files
.readAllLines(Paths.get(file2.getAbsolutePath()))));
testResultList = new ArrayList<>(testList.size()*10);
.readAllLines(Paths.get(file2.getAbsolutePath()))));
testResultList = new ArrayList<>(testList.size() * 10);
} catch (IOException e) {
e.printStackTrace();
}
Expand All @@ -43,14 +48,14 @@ public void setUp() {
void keyMatcherLookupShouldPerformInLessThan30Micros() {
final int size = testList.size();
final long init = System.currentTimeMillis();
for (int i = 0; i< size*10; ++i){
testResultList.add(keyMatcher.match(candidates.keySet(), testList.get(i%size)));
for (int i = 0; i < size * 10; ++i) {
testResultList.add(keyMatcher.match(candidates.keySet(), testList.get(i % size)));
}
final long end = System.currentTimeMillis();


final long total = end - init;
final double microsPerLookup = ((total+0.0)/testResultList.size())*1000;
final double microsPerLookup = ((total + 0.0) / testResultList.size()) * 1000;
System.out.println("Performed Lookups: " + testResultList.size());
System.out.println("Total Execution Time: " + total + "ms");
System.out.println("Microseconds per lookup: " + microsPerLookup + "us");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
ext {
artifactId = 'async-commons-rabbit-starter-eda'
artifactDescription = 'Async Commons Starter EDA'
}

dependencies {
api project(':async-rabbit')
compileOnly 'org.springframework.boot:spring-boot-starter'
compileOnly 'org.springframework.boot:spring-boot-starter-actuator'

annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'

testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.springframework.boot:spring-boot-starter-actuator'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.reactivecommons.async;

import io.micrometer.core.instrument.MeterRegistry;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.async.rabbit.config.ConnectionManager;

import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;

public class RabbitEDADirectAsyncGateway extends RabbitDirectAsyncGateway {
private final ConnectionManager manager;

public RabbitEDADirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ConnectionManager manager, String exchange, MessageConverter converter, MeterRegistry meterRegistry) {
super(config, router, manager.getSender(DEFAULT_DOMAIN), exchange, converter, meterRegistry);
this.manager = manager;
}

@Override
protected ReactiveMessageSender resolveSender(String domain) {
return manager.getSender(domain);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.rabbit.config.CommandListenersConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;


@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import(CommandListenersConfig.class)
@Configuration
public @interface EnableCommandListeners {
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.rabbit.config.DirectAsyncGatewayConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;


@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import(DirectAsyncGatewayConfig.class)
@Configuration
public @interface EnableDirectAsyncGateway {
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.rabbit.config.EventBusConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;


@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import(EventBusConfig.class)
@Configuration
public @interface EnableDomainEventBus {
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.rabbit.config.EventListenersConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import(EventListenersConfig.class)
@Configuration
public @interface EnableEventListeners {
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.rabbit.config.CommandListenersConfig;
import org.reactivecommons.async.rabbit.config.EventListenersConfig;
import org.reactivecommons.async.rabbit.config.NotificacionListenersConfig;
import org.reactivecommons.async.rabbit.config.QueryListenerConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;

/**
* This annotation enables all messages listeners (Query, Commands, Events). If you want to enable separately, please use
* EnableCommandListeners, EnableQueryListeners or EnableEventListeners.
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({CommandListenersConfig.class, QueryListenerConfig.class, EventListenersConfig.class, NotificacionListenersConfig.class})
@Configuration
public @interface EnableMessageListeners {
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.rabbit.config.NotificacionListenersConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;


@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import(NotificacionListenersConfig.class)
@Configuration
public @interface EnableNotificationListener {
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.reactivecommons.async.impl.config.annotations;

import org.reactivecommons.async.rabbit.config.QueryListenerConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;


@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import(QueryListenerConfig.class)
@Configuration
public @interface EnableQueryListeners {
}



Loading