From edbf27b2f6812bc15f8313d7a3e22a5e7dab2afe Mon Sep 17 00:00:00 2001 From: Juan Carlos Galvis Zuluaga Date: Thu, 18 May 2023 15:03:16 -0500 Subject: [PATCH 1/3] start cloud events --- .../async/api/DirectAsyncGateway.java | 3 +++ async/async-rabbit/async-rabbit.gradle | 1 + .../rabbit/RabbitDirectAsyncGateway.java | 17 ++++++++++++++ .../async/rabbit/RabbitDomainEventBus.java | 6 +++++ .../converters/json/CloudEventBuilderExt.java | 15 +++++++++++++ .../listeners/ApplicationQueryListener.java | 13 ++++++++++- domain/domain-events/domain-events-api.gradle | 1 + .../api/domain/DomainEventBus.java | 3 +++ .../async-receiver-sample.gradle | 1 + .../main/java/sample/SampleReceiverApp.java | 21 ++++++++++++++---- .../sender-client/async-sender-client.gradle | 2 ++ .../java/sample/SampleRestController.java | 22 ++++++++++++++++--- 12 files changed, 97 insertions(+), 8 deletions(-) create mode 100644 async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExt.java diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java index 39ab22f5..49a67ebb 100644 --- a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java @@ -1,10 +1,13 @@ package org.reactivecommons.async.api; +import io.cloudevents.CloudEvent; import org.reactivecommons.api.domain.Command; import reactor.core.publisher.Mono; public interface DirectAsyncGateway { Mono sendCommand(Command command, String targetName); + Mono sendCommand(CloudEvent command, String targetName); Mono requestReply(AsyncQuery query, String targetName, Class type); + Mono requestReply(CloudEvent query, String targetName, Class type); Mono reply(T response, From from); } diff --git a/async/async-rabbit/async-rabbit.gradle b/async/async-rabbit/async-rabbit.gradle index 3ab1f6e7..5694cd10 100644 --- a/async/async-rabbit/async-rabbit.gradle +++ b/async/async-rabbit/async-rabbit.gradle @@ -14,4 +14,5 @@ dependencies { api 'com.rabbitmq:amqp-client' api 'com.fasterxml.jackson.core:jackson-databind' testImplementation 'io.projectreactor:reactor-test' + implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0' } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java index 7e0f25da..6979ba4c 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java @@ -1,5 +1,8 @@ package org.reactivecommons.async.rabbit; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import io.micrometer.core.instrument.MeterRegistry; import org.reactivecommons.api.domain.Command; import org.reactivecommons.async.api.AsyncQuery; @@ -56,6 +59,11 @@ public Mono sendCommand(Command command, String targetName) { return sender.sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands); } + @Override + public Mono sendCommand(CloudEvent command, String targetName) { + return sendCommand(new Command<>(command.getType(), command.getId(), command), targetName); + } + public Flux sendCommands(Flux> commands, String targetName) { return sender.sendWithConfirmBatch(commands, exchange, targetName, Collections.emptyMap(), persistentCommands); } @@ -84,6 +92,15 @@ public Mono requestReply(AsyncQuery query, String targetName, Class .tap(Micrometer.metrics(meterRegistry)); } + @Override + public Mono requestReply(CloudEvent query, String targetName, Class type) { + byte[] serialized = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(query); + return requestReply(new AsyncQuery<>(query.getType(), serialized), targetName, type); + } + @Override public Mono reply(T response, From from) { final HashMap headers = new HashMap<>(); diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java index d3e75e18..e03e8e4f 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java @@ -1,7 +1,9 @@ package org.reactivecommons.async.rabbit; +import io.cloudevents.CloudEvent; import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; import org.reactivecommons.async.commons.config.BrokerConfig; +import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.api.domain.DomainEventBus; @@ -30,4 +32,8 @@ public Mono emit(DomainEvent event) { .onErrorMap(err -> new RuntimeException("Event send failure: " + event.getName(), err)); } + @Override + public Publisher emit(CloudEvent event) { + return emit(new DomainEvent<>(event.getType(), event.getId(), event)); + } } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExt.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExt.java new file mode 100644 index 00000000..7ae7e162 --- /dev/null +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExt.java @@ -0,0 +1,15 @@ +package org.reactivecommons.async.rabbit.converters.json; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import lombok.experimental.UtilityClass; + +@UtilityClass +public class CloudEventBuilderExt { + private static final ObjectMapper mapper = new ObjectMapper(); + + @SneakyThrows + public static byte[] asBytes(Object object) { + return mapper.writeValueAsBytes(object); + } +} diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java index 2d5fa136..dca6c722 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java @@ -1,6 +1,9 @@ package org.reactivecommons.async.rabbit.listeners; import com.rabbitmq.client.AMQP; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import lombok.extern.java.Log; import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; import org.reactivecommons.async.commons.DiscardNotifier; @@ -149,8 +152,16 @@ protected Function, Mono> enrichPostProcess(Message msg) { final String correlationID = msg.getProperties().getHeaders().get(CORRELATION_ID).toString(); final HashMap headers = new HashMap<>(); headers.put(CORRELATION_ID, correlationID); + Object response = signal.get(); + if(response instanceof CloudEvent) { + byte[] serialized = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize((CloudEvent) response); + return sender.sendNoConfirm(serialized, replyExchange, replyID, headers, false); + } - return sender.sendNoConfirm(signal.get(), replyExchange, replyID, headers, false); + return sender.sendNoConfirm(response, replyExchange, replyID, headers, false); }); } diff --git a/domain/domain-events/domain-events-api.gradle b/domain/domain-events/domain-events-api.gradle index cef707ef..2279536b 100644 --- a/domain/domain-events/domain-events-api.gradle +++ b/domain/domain-events/domain-events-api.gradle @@ -5,4 +5,5 @@ ext { dependencies { api 'org.reactivestreams:reactive-streams:1.0.4' + api 'io.cloudevents:cloudevents-api:2.5.0' } diff --git a/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java b/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java index 0c713d73..df697403 100644 --- a/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java +++ b/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java @@ -1,7 +1,10 @@ package org.reactivecommons.api.domain; +import io.cloudevents.CloudEvent; import org.reactivestreams.Publisher; public interface DomainEventBus { Publisher emit(DomainEvent event); + + Publisher emit(CloudEvent event); } diff --git a/samples/async/receiver-responder/async-receiver-sample.gradle b/samples/async/receiver-responder/async-receiver-sample.gradle index d6bd1c51..a0b30457 100644 --- a/samples/async/receiver-responder/async-receiver-sample.gradle +++ b/samples/async/receiver-responder/async-receiver-sample.gradle @@ -4,4 +4,5 @@ dependencies { implementation project(":async-commons-rabbit-starter") implementation 'org.springframework.boot:spring-boot-starter' runtimeOnly 'org.springframework.boot:spring-boot-devtools' + implementation 'io.cloudevents:cloudevents-core:2.5.0' } diff --git a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java index 6aa12a70..8cb8557e 100644 --- a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java +++ b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java @@ -1,5 +1,7 @@ package sample; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import lombok.AllArgsConstructor; import lombok.Data; import lombok.RequiredArgsConstructor; @@ -8,16 +10,20 @@ import org.reactivecommons.api.domain.DomainEventBus; import org.reactivecommons.async.api.DirectAsyncGateway; import org.reactivecommons.async.api.HandlerRegistry; -import org.reactivecommons.async.api.handlers.EventHandler; import org.reactivecommons.async.api.handlers.QueryHandler; import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway; import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus; import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners; +import org.reactivecommons.async.rabbit.converters.json.CloudEventBuilderExt; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import reactor.core.publisher.Mono; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.UUID; + import static org.reactivecommons.async.api.HandlerRegistry.register; import static reactor.core.publisher.Mono.just; @@ -53,9 +59,16 @@ public HandlerRegistry handlerRegistrySubs(DirectAsyncGateway gateway) { .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class) .listenEvent("fixed.event", message -> Mono.empty(), Object.class) .serveQuery("query1", message -> { - log.info("resolving from direct query"); - return just(new RespQuery1("Ok", message)); - }, Call.class) + log.info("resolving from direct query" + message); + CloudEvent response = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("query1.response") // + .withTime(OffsetDateTime.now()) + .withData("application/json", "result".getBytes()) + .build(); + return just(response); + }, byte[].class) .serveQuery("sample.query.*", message -> { log.info("resolving from direct query"); return just(new RespQuery1("Ok", message)); diff --git a/samples/async/sender-client/async-sender-client.gradle b/samples/async/sender-client/async-sender-client.gradle index 047f45b2..68b28959 100644 --- a/samples/async/sender-client/async-sender-client.gradle +++ b/samples/async/sender-client/async-sender-client.gradle @@ -5,4 +5,6 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'io.micrometer:micrometer-registry-prometheus' + implementation 'io.cloudevents:cloudevents-core:2.5.0' + } diff --git a/samples/async/sender-client/src/main/java/sample/SampleRestController.java b/samples/async/sender-client/src/main/java/sample/SampleRestController.java index ff29616e..d4e620e0 100644 --- a/samples/async/sender-client/src/main/java/sample/SampleRestController.java +++ b/samples/async/sender-client/src/main/java/sample/SampleRestController.java @@ -1,10 +1,14 @@ package sample; +import com.fasterxml.jackson.core.JsonProcessingException; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.reactivecommons.async.api.AsyncQuery; import org.reactivecommons.async.api.DirectAsyncGateway; +import org.reactivecommons.async.rabbit.converters.json.CloudEventBuilderExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.PostMapping; @@ -13,6 +17,10 @@ import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.UUID; + @RestController public class SampleRestController { @@ -25,9 +33,17 @@ public class SampleRestController { private final WebClient webClient = WebClient.builder().build(); @PostMapping(path = "/sample", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) - public Mono sampleService(@RequestBody Call call) { - AsyncQuery query = new AsyncQuery<>(queryName, call); - return directAsyncGateway.requestReply(query, target, RespQuery1.class); + public Mono sampleService(@RequestBody Call call) throws JsonProcessingException { +// AsyncQuery query = new AsyncQuery<>(queryName, call); + CloudEvent query = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType(queryName) // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(call)) + .build(); + + return directAsyncGateway.requestReply(query, target, CloudEvent.class); } @PostMapping(path = "/sample/match", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) From ed50a370b8c7f46c0fedd196fdb646cbce0b4943 Mon Sep 17 00:00:00 2001 From: Jorge Espinosa Date: Thu, 1 Jun 2023 15:02:12 -0500 Subject: [PATCH 2/3] Modificacion libreria con CloudEvents --- .../async-commons-api.gradle | 2 + .../async/api/HandlerRegistry.java | 20 ++++- .../commons/converters/MessageConverter.java | 1 + .../rabbit/RabbitDirectAsyncGateway.java | 6 +- .../json/JacksonMessageConverter.java | 90 ++++++++++++++++++- .../main/java/sample/SampleReceiverApp.java | 15 +++- .../src/main/resources/application.properties | 2 +- .../java/sample/SampleRestController.java | 30 +++++++ .../src/main/resources/application.properties | 1 - 9 files changed, 153 insertions(+), 14 deletions(-) diff --git a/async/async-commons-api/async-commons-api.gradle b/async/async-commons-api/async-commons-api.gradle index 3e79429d..e1a26241 100644 --- a/async/async-commons-api/async-commons-api.gradle +++ b/async/async-commons-api/async-commons-api.gradle @@ -7,4 +7,6 @@ dependencies { api project(':domain-events-api') compileOnly 'io.projectreactor:reactor-core' testImplementation 'io.projectreactor:reactor-test' + implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0' + } diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java index aca8879d..3e306efd 100644 --- a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java @@ -1,5 +1,8 @@ package org.reactivecommons.async.api; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; @@ -25,6 +28,7 @@ public class HandlerRegistry { private final List> handlers = new CopyOnWriteArrayList<>(); private final List> commandHandlers = new CopyOnWriteArrayList<>(); + public static HandlerRegistry register() { return new HandlerRegistry(); } @@ -67,7 +71,21 @@ public HandlerRegistry serveQuery(String resource, QueryHandler han } public HandlerRegistry serveQuery(String resource, QueryHandler handler, Class queryClass) { - handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass)); + if(queryClass == CloudEvent.class){ + handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> + { + CloudEvent query = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(message); + + return handler.handle((R) query); + + } , byte[].class)); + } + else{ + handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass)); + } return this; } diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java index 85ea576c..2202ab5b 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java @@ -1,5 +1,6 @@ package org.reactivecommons.async.commons.converters; +import io.cloudevents.CloudEvent; import org.reactivecommons.api.domain.Command; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.async.api.AsyncQuery; diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java index 6979ba4c..ccd3c16d 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java @@ -94,11 +94,7 @@ public Mono requestReply(AsyncQuery query, String targetName, Class @Override public Mono requestReply(CloudEvent query, String targetName, Class type) { - byte[] serialized = EventFormatProvider - .getInstance() - .resolveFormat(JsonFormat.CONTENT_TYPE) - .serialize(query); - return requestReply(new AsyncQuery<>(query.getType(), serialized), targetName, type); + return requestReply(new AsyncQuery<>(query.getType(), query), targetName, type); } @Override diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java index bd3a6539..b2a54bd3 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java @@ -1,7 +1,11 @@ package org.reactivecommons.async.rabbit.converters.json; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import lombok.Data; import org.reactivecommons.api.domain.Command; import org.reactivecommons.api.domain.DomainEvent; @@ -13,6 +17,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Base64; public class JacksonMessageConverter implements MessageConverter { private static final String CONTENT_TYPE = "application/json"; @@ -28,7 +33,7 @@ public JacksonMessageConverter(ObjectMapper objectMapper) { public AsyncQuery readAsyncQuery(Message message, Class bodyClass) { try { final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class); - final T value = objectMapper.treeToValue(asyncQueryJson.getQueryData(), bodyClass); + T value = extractData(bodyClass, asyncQueryJson.getQueryData()); return new AsyncQuery<>(asyncQueryJson.getResource(), value); } catch (IOException e) { throw new MessageConversionException("Failed to convert Message content", e); @@ -39,7 +44,9 @@ public AsyncQuery readAsyncQuery(Message message, Class bodyClass) { public DomainEvent readDomainEvent(Message message, Class bodyClass) { try { final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class); - final T value = objectMapper.treeToValue(domainEventJson.getData(), bodyClass); + + T value = extractData(bodyClass, domainEventJson.getData()); + return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value); } catch (IOException e) { throw new MessageConversionException("Failed to convert Message content", e); @@ -50,7 +57,7 @@ public DomainEvent readDomainEvent(Message message, Class bodyClass) { public Command readCommand(Message message, Class bodyClass) { try { final CommandJson commandJson = readValue(message, CommandJson.class); - final T value = objectMapper.treeToValue(commandJson.getData(), bodyClass); + T value = extractData(bodyClass, commandJson.getData()); return new Command<>(commandJson.getName(), commandJson.getCommandId(), value); } catch (IOException e) { throw new MessageConversionException("Failed to convert Message content", e); @@ -60,6 +67,14 @@ public Command readCommand(Message message, Class bodyClass) { @Override public T readValue(Message message, Class valueClass) { try { + if(valueClass == CloudEvent.class){ + + return (T) EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(objectMapper.readValue(message.getBody(), byte[].class)); + + } return objectMapper.readValue(message.getBody(), valueClass); } catch (IOException e) { throw new MessageConversionException("Failed to convert Message content", e); @@ -87,10 +102,62 @@ public AsyncQuery readAsyncQueryStructure(Message message) { return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData()); } + + public Message commandToMessage(Command object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getData()); + + return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data)); + } + + + public Message eventToMessage(DomainEvent object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getData()); + + return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data)); + } + + + public Message queryToMessage(AsyncQuery object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getQueryData()); + + return getRabbitMessage(new AsyncQuery<>(object.getResource(), data)); + } @Override public Message toMessage(Object object) { + byte[] bytes; + if(object instanceof DomainEvent + && ((DomainEvent) object).getData() instanceof CloudEvent){ + + return eventToMessage((DomainEvent) object); + + } + if(object instanceof Command + && ((Command) object).getData() instanceof CloudEvent){ + + return commandToMessage((Command) object); + } + if(object instanceof AsyncQuery + && ((AsyncQuery) object).getQueryData() instanceof CloudEvent){ + + return queryToMessage((AsyncQuery) object); + } + + return getRabbitMessage(object); + } + + private RabbitMessage getRabbitMessage(Object object) { byte[] bytes; try { + String jsonString = this.objectMapper.writeValueAsString(object); bytes = jsonString.getBytes(StandardCharsets.UTF_8); } catch (IOException e) { @@ -103,6 +170,23 @@ public Message toMessage(Object object) { return new RabbitMessage(bytes, props); } + private T extractData(Class bodyClass, JsonNode node) throws JsonProcessingException { + T value; + if(bodyClass == CloudEvent.class){ + + value = (T) EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(Base64.getDecoder() + .decode(node.asText())); + + } + else{ + value = objectMapper.treeToValue(node, bodyClass); + } + return value; + } + @Data private static class AsyncQueryJson { private String resource; diff --git a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java index 8cb8557e..94e47e60 100644 --- a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java +++ b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java @@ -22,6 +22,7 @@ import java.net.URI; import java.time.OffsetDateTime; +import java.util.Map; import java.util.UUID; import static org.reactivecommons.async.api.HandlerRegistry.register; @@ -57,18 +58,26 @@ public Mono handle(AddMemberCommand command) { public HandlerRegistry handlerRegistrySubs(DirectAsyncGateway gateway) { return HandlerRegistry.register() .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class) - .listenEvent("fixed.event", message -> Mono.empty(), Object.class) + .listenEvent("event", message -> { + log.info(message.getData().toString()); + return Mono.empty(); + }, CloudEvent.class) + .handleCommand("command", message -> { + log.info(message.getData().toString()); + return Mono.empty(); + }, CloudEvent.class) .serveQuery("query1", message -> { log.info("resolving from direct query" + message); + Map mapData = Map.of("1", "data"); CloudEvent response = CloudEventBuilder.v1() // .withId(UUID.randomUUID().toString()) // .withSource(URI.create("https://spring.io/foos"))// .withType("query1.response") // .withTime(OffsetDateTime.now()) - .withData("application/json", "result".getBytes()) + .withData("application/json", CloudEventBuilderExt.asBytes(mapData)) .build(); return just(response); - }, byte[].class) + }, CloudEvent.class) .serveQuery("sample.query.*", message -> { log.info("resolving from direct query"); return just(new RespQuery1("Ok", message)); diff --git a/samples/async/receiver-responder/src/main/resources/application.properties b/samples/async/receiver-responder/src/main/resources/application.properties index 2f677350..5b074d10 100644 --- a/samples/async/receiver-responder/src/main/resources/application.properties +++ b/samples/async/receiver-responder/src/main/resources/application.properties @@ -1,2 +1,2 @@ spring.application.name=receiver -spring.rabbitmq.virtual-host=test + diff --git a/samples/async/sender-client/src/main/java/sample/SampleRestController.java b/samples/async/sender-client/src/main/java/sample/SampleRestController.java index d4e620e0..ef5ae3fe 100644 --- a/samples/async/sender-client/src/main/java/sample/SampleRestController.java +++ b/samples/async/sender-client/src/main/java/sample/SampleRestController.java @@ -6,6 +6,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.reactivecommons.api.domain.DomainEventBus; import org.reactivecommons.async.api.AsyncQuery; import org.reactivecommons.async.api.DirectAsyncGateway; import org.reactivecommons.async.rabbit.converters.json.CloudEventBuilderExt; @@ -26,6 +27,9 @@ public class SampleRestController { @Autowired private DirectAsyncGateway directAsyncGateway; + + @Autowired + private DomainEventBus domainEventBus; private final String queryName = "query1"; private final String queryName2 = "query2"; private final String target = "receiver"; @@ -45,7 +49,33 @@ public Mono sampleService(@RequestBody Call call) throws JsonProcess return directAsyncGateway.requestReply(query, target, CloudEvent.class); } + @PostMapping(path = "/sample/event", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) + public Mono sampleServiceEvent(@RequestBody Call call) throws JsonProcessingException { +// AsyncQuery query = new AsyncQuery<>(queryName, call); + CloudEvent event = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("event") // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(call)) + .build(); + + return Mono.from(domainEventBus.emit(event)).thenReturn("event"); + } + @PostMapping(path = "/sample/command", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) + public Mono sampleServiceCommand(@RequestBody Call call) throws JsonProcessingException { +// AsyncQuery query = new AsyncQuery<>(queryName, call); + CloudEvent command = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("command") // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(call)) + .build(); + + return directAsyncGateway.sendCommand(command, target).thenReturn("command"); + } @PostMapping(path = "/sample/match", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public Mono sampleServices(@RequestBody Call call) { AsyncQuery query = new AsyncQuery<>("sample.query.any.that.matches", call); diff --git a/samples/async/sender-client/src/main/resources/application.properties b/samples/async/sender-client/src/main/resources/application.properties index 5fb947ad..166d9f7b 100644 --- a/samples/async/sender-client/src/main/resources/application.properties +++ b/samples/async/sender-client/src/main/resources/application.properties @@ -1,5 +1,4 @@ spring.application.name=sender server.port=4001 -spring.rabbitmq.virtual-host=test management.endpoint.health.show-details=always management.endpoints.web.exposure.include=health,prometheus From a7f59ac9a8c4668fe2ec75656347ad6f43ea98dd Mon Sep 17 00:00:00 2001 From: Jorge Espinosa Date: Thu, 8 Jun 2023 11:32:32 -0500 Subject: [PATCH 3/3] Refinamiento codigo --- .../async/rabbit/config/RabbitMqConfig.java | 3 +- .../JacksonCloudEventMessageConverter.java | 209 ++++++++++++++++++ .../json/JacksonMessageConverter.java | 90 +------- 3 files changed, 214 insertions(+), 88 deletions(-) create mode 100644 async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverter.java diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java index 4b43916d..f42a981b 100644 --- a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java @@ -26,6 +26,7 @@ import org.reactivecommons.async.rabbit.communications.TopologyCreator; import org.reactivecommons.async.rabbit.config.props.AsyncProps; import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps; +import org.reactivecommons.async.rabbit.converters.json.JacksonCloudEventMessageConverter; import org.reactivecommons.async.rabbit.converters.json.JacksonMessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -156,7 +157,7 @@ public ObjectMapperSupplier objectMapperSupplier() { @Bean @ConditionalOnMissingBean public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSupplier) { - return new JacksonMessageConverter(objectMapperSupplier.get()); + return new JacksonCloudEventMessageConverter(objectMapperSupplier.get()); } @Bean diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverter.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverter.java new file mode 100644 index 00000000..08f8b092 --- /dev/null +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverter.java @@ -0,0 +1,209 @@ +package org.reactivecommons.async.rabbit.converters.json; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; +import lombok.Data; +import org.reactivecommons.api.domain.Command; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.async.api.AsyncQuery; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.exceptions.MessageConversionException; +import org.reactivecommons.async.rabbit.RabbitMessage; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +public class JacksonCloudEventMessageConverter implements MessageConverter { + private static final String CONTENT_TYPE = "application/json"; + + private final ObjectMapper objectMapper; + + + public JacksonCloudEventMessageConverter(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public AsyncQuery readAsyncQuery(Message message, Class bodyClass) { + try { + final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class); + T value = extractData(bodyClass, asyncQueryJson.getQueryData()); + return new AsyncQuery<>(asyncQueryJson.getResource(), value); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + public DomainEvent readDomainEvent(Message message, Class bodyClass) { + try { + final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class); + + T value = extractData(bodyClass, domainEventJson.getData()); + + return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + public Command readCommand(Message message, Class bodyClass) { + try { + final CommandJson commandJson = readValue(message, CommandJson.class); + T value = extractData(bodyClass, commandJson.getData()); + return new Command<>(commandJson.getName(), commandJson.getCommandId(), value); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + public T readValue(Message message, Class valueClass) { + try { + if(valueClass == CloudEvent.class){ + + return (T) EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(objectMapper.readValue(message.getBody(), byte[].class)); + + } + return objectMapper.readValue(message.getBody(), valueClass); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + @SuppressWarnings("unchecked") + public Command readCommandStructure(Message message) { + final CommandJson commandJson = readValue(message, CommandJson.class); + return new Command<>(commandJson.getName(), commandJson.getCommandId(), (T) commandJson.getData()); + } + + @Override + @SuppressWarnings("unchecked") + public DomainEvent readDomainEventStructure(Message message) { + final DomainEventJson eventJson = readValue(message, DomainEventJson.class); + return new DomainEvent<>(eventJson.getName(), eventJson.getEventId(), (T) eventJson.getData()); + } + + @Override + @SuppressWarnings("unchecked") + public AsyncQuery readAsyncQueryStructure(Message message) { + final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class); + return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData()); + } + + + public Message commandToMessage(Command object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getData()); + + return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data)); + } + + + public Message eventToMessage(DomainEvent object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getData()); + + return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data)); + } + + + public Message queryToMessage(AsyncQuery object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getQueryData()); + + return getRabbitMessage(new AsyncQuery<>(object.getResource(), data)); + } + @Override + public Message toMessage(Object object) { + byte[] bytes; + if(object instanceof DomainEvent + && ((DomainEvent) object).getData() instanceof CloudEvent){ + + return eventToMessage((DomainEvent) object); + + } + if(object instanceof Command + && ((Command) object).getData() instanceof CloudEvent){ + + return commandToMessage((Command) object); + } + if(object instanceof AsyncQuery + && ((AsyncQuery) object).getQueryData() instanceof CloudEvent){ + + return queryToMessage((AsyncQuery) object); + } + + return getRabbitMessage(object); + } + + private RabbitMessage getRabbitMessage(Object object) { + byte[] bytes; + try { + + String jsonString = this.objectMapper.writeValueAsString(object); + bytes = jsonString.getBytes(StandardCharsets.UTF_8); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + RabbitMessage.RabbitMessageProperties props = new RabbitMessage.RabbitMessageProperties(); + props.setContentType(CONTENT_TYPE); + props.setContentEncoding(StandardCharsets.UTF_8.name()); + props.setContentLength(bytes.length); + return new RabbitMessage(bytes, props); + } + + private T extractData(Class bodyClass, JsonNode node) throws JsonProcessingException { + T value; + if(bodyClass == CloudEvent.class){ + + value = (T) EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(Base64.getDecoder() + .decode(node.asText())); + + } + else{ + value = objectMapper.treeToValue(node, bodyClass); + } + return value; + } + + @Data + private static class AsyncQueryJson { + private String resource; + private JsonNode queryData; + } + + @Data + private static class DomainEventJson { + private String name; + private String eventId; + private JsonNode data; + } + + @Data + private static class CommandJson { + private String name; + private String commandId; + private JsonNode data; + } +} diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java index b2a54bd3..bd3a6539 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonMessageConverter.java @@ -1,11 +1,7 @@ package org.reactivecommons.async.rabbit.converters.json; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import io.cloudevents.CloudEvent; -import io.cloudevents.core.provider.EventFormatProvider; -import io.cloudevents.jackson.JsonFormat; import lombok.Data; import org.reactivecommons.api.domain.Command; import org.reactivecommons.api.domain.DomainEvent; @@ -17,7 +13,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Base64; public class JacksonMessageConverter implements MessageConverter { private static final String CONTENT_TYPE = "application/json"; @@ -33,7 +28,7 @@ public JacksonMessageConverter(ObjectMapper objectMapper) { public AsyncQuery readAsyncQuery(Message message, Class bodyClass) { try { final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class); - T value = extractData(bodyClass, asyncQueryJson.getQueryData()); + final T value = objectMapper.treeToValue(asyncQueryJson.getQueryData(), bodyClass); return new AsyncQuery<>(asyncQueryJson.getResource(), value); } catch (IOException e) { throw new MessageConversionException("Failed to convert Message content", e); @@ -44,9 +39,7 @@ public AsyncQuery readAsyncQuery(Message message, Class bodyClass) { public DomainEvent readDomainEvent(Message message, Class bodyClass) { try { final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class); - - T value = extractData(bodyClass, domainEventJson.getData()); - + final T value = objectMapper.treeToValue(domainEventJson.getData(), bodyClass); return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value); } catch (IOException e) { throw new MessageConversionException("Failed to convert Message content", e); @@ -57,7 +50,7 @@ public DomainEvent readDomainEvent(Message message, Class bodyClass) { public Command readCommand(Message message, Class bodyClass) { try { final CommandJson commandJson = readValue(message, CommandJson.class); - T value = extractData(bodyClass, commandJson.getData()); + final T value = objectMapper.treeToValue(commandJson.getData(), bodyClass); return new Command<>(commandJson.getName(), commandJson.getCommandId(), value); } catch (IOException e) { throw new MessageConversionException("Failed to convert Message content", e); @@ -67,14 +60,6 @@ public Command readCommand(Message message, Class bodyClass) { @Override public T readValue(Message message, Class valueClass) { try { - if(valueClass == CloudEvent.class){ - - return (T) EventFormatProvider - .getInstance() - .resolveFormat(JsonFormat.CONTENT_TYPE) - .deserialize(objectMapper.readValue(message.getBody(), byte[].class)); - - } return objectMapper.readValue(message.getBody(), valueClass); } catch (IOException e) { throw new MessageConversionException("Failed to convert Message content", e); @@ -102,62 +87,10 @@ public AsyncQuery readAsyncQueryStructure(Message message) { return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData()); } - - public Message commandToMessage(Command object) { - byte[] data = EventFormatProvider - .getInstance() - .resolveFormat(JsonFormat.CONTENT_TYPE) - .serialize(object.getData()); - - return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data)); - } - - - public Message eventToMessage(DomainEvent object) { - byte[] data = EventFormatProvider - .getInstance() - .resolveFormat(JsonFormat.CONTENT_TYPE) - .serialize(object.getData()); - - return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data)); - } - - - public Message queryToMessage(AsyncQuery object) { - byte[] data = EventFormatProvider - .getInstance() - .resolveFormat(JsonFormat.CONTENT_TYPE) - .serialize(object.getQueryData()); - - return getRabbitMessage(new AsyncQuery<>(object.getResource(), data)); - } @Override public Message toMessage(Object object) { - byte[] bytes; - if(object instanceof DomainEvent - && ((DomainEvent) object).getData() instanceof CloudEvent){ - - return eventToMessage((DomainEvent) object); - - } - if(object instanceof Command - && ((Command) object).getData() instanceof CloudEvent){ - - return commandToMessage((Command) object); - } - if(object instanceof AsyncQuery - && ((AsyncQuery) object).getQueryData() instanceof CloudEvent){ - - return queryToMessage((AsyncQuery) object); - } - - return getRabbitMessage(object); - } - - private RabbitMessage getRabbitMessage(Object object) { byte[] bytes; try { - String jsonString = this.objectMapper.writeValueAsString(object); bytes = jsonString.getBytes(StandardCharsets.UTF_8); } catch (IOException e) { @@ -170,23 +103,6 @@ private RabbitMessage getRabbitMessage(Object object) { return new RabbitMessage(bytes, props); } - private T extractData(Class bodyClass, JsonNode node) throws JsonProcessingException { - T value; - if(bodyClass == CloudEvent.class){ - - value = (T) EventFormatProvider - .getInstance() - .resolveFormat(JsonFormat.CONTENT_TYPE) - .deserialize(Base64.getDecoder() - .decode(node.asText())); - - } - else{ - value = objectMapper.treeToValue(node, bodyClass); - } - return value; - } - @Data private static class AsyncQueryJson { private String resource;