diff --git a/async/async-commons-api/async-commons-api.gradle b/async/async-commons-api/async-commons-api.gradle index 3e79429d..e1a26241 100644 --- a/async/async-commons-api/async-commons-api.gradle +++ b/async/async-commons-api/async-commons-api.gradle @@ -7,4 +7,6 @@ dependencies { api project(':domain-events-api') compileOnly 'io.projectreactor:reactor-core' testImplementation 'io.projectreactor:reactor-test' + implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0' + } diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java index 39ab22f5..49a67ebb 100644 --- a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/DirectAsyncGateway.java @@ -1,10 +1,13 @@ package org.reactivecommons.async.api; +import io.cloudevents.CloudEvent; import org.reactivecommons.api.domain.Command; import reactor.core.publisher.Mono; public interface DirectAsyncGateway { Mono sendCommand(Command command, String targetName); + Mono sendCommand(CloudEvent command, String targetName); Mono requestReply(AsyncQuery query, String targetName, Class type); + Mono requestReply(CloudEvent query, String targetName, Class type); Mono reply(T response, From from); } diff --git a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java index 325cd5f2..3c281028 100644 --- a/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java +++ b/async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java @@ -1,5 +1,8 @@ package org.reactivecommons.async.api; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; @@ -26,6 +29,7 @@ public class HandlerRegistry { private final List> handlers = new CopyOnWriteArrayList<>(); private final List> commandHandlers = new CopyOnWriteArrayList<>(); + public static HandlerRegistry register() { return new HandlerRegistry(); } @@ -75,7 +79,21 @@ public HandlerRegistry serveQuery(String resource, QueryHandler han } public HandlerRegistry serveQuery(String resource, QueryHandler handler, Class queryClass) { - handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass)); + if(queryClass == CloudEvent.class){ + handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> + { + CloudEvent query = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(message); + + return handler.handle((R) query); + + } , byte[].class)); + } + else{ + handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass)); + } return this; } diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java index 85ea576c..2202ab5b 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java @@ -1,5 +1,6 @@ package org.reactivecommons.async.commons.converters; +import io.cloudevents.CloudEvent; import org.reactivecommons.api.domain.Command; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.async.api.AsyncQuery; diff --git a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java index 4b43916d..f42a981b 100644 --- a/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java +++ b/async/async-rabbit-starter-eda/src/main/java/org/reactivecommons/async/rabbit/config/RabbitMqConfig.java @@ -26,6 +26,7 @@ import org.reactivecommons.async.rabbit.communications.TopologyCreator; import org.reactivecommons.async.rabbit.config.props.AsyncProps; import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps; +import org.reactivecommons.async.rabbit.converters.json.JacksonCloudEventMessageConverter; import org.reactivecommons.async.rabbit.converters.json.JacksonMessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -156,7 +157,7 @@ public ObjectMapperSupplier objectMapperSupplier() { @Bean @ConditionalOnMissingBean public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSupplier) { - return new JacksonMessageConverter(objectMapperSupplier.get()); + return new JacksonCloudEventMessageConverter(objectMapperSupplier.get()); } @Bean diff --git a/async/async-rabbit/async-rabbit.gradle b/async/async-rabbit/async-rabbit.gradle index 3ab1f6e7..5694cd10 100644 --- a/async/async-rabbit/async-rabbit.gradle +++ b/async/async-rabbit/async-rabbit.gradle @@ -14,4 +14,5 @@ dependencies { api 'com.rabbitmq:amqp-client' api 'com.fasterxml.jackson.core:jackson-databind' testImplementation 'io.projectreactor:reactor-test' + implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0' } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java index 7e0f25da..ccd3c16d 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDirectAsyncGateway.java @@ -1,5 +1,8 @@ package org.reactivecommons.async.rabbit; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import io.micrometer.core.instrument.MeterRegistry; import org.reactivecommons.api.domain.Command; import org.reactivecommons.async.api.AsyncQuery; @@ -56,6 +59,11 @@ public Mono sendCommand(Command command, String targetName) { return sender.sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands); } + @Override + public Mono sendCommand(CloudEvent command, String targetName) { + return sendCommand(new Command<>(command.getType(), command.getId(), command), targetName); + } + public Flux sendCommands(Flux> commands, String targetName) { return sender.sendWithConfirmBatch(commands, exchange, targetName, Collections.emptyMap(), persistentCommands); } @@ -84,6 +92,11 @@ public Mono requestReply(AsyncQuery query, String targetName, Class .tap(Micrometer.metrics(meterRegistry)); } + @Override + public Mono requestReply(CloudEvent query, String targetName, Class type) { + return requestReply(new AsyncQuery<>(query.getType(), query), targetName, type); + } + @Override public Mono reply(T response, From from) { final HashMap headers = new HashMap<>(); diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java index 6caadf9c..57f99a9d 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/RabbitDomainEventBus.java @@ -1,7 +1,9 @@ package org.reactivecommons.async.rabbit; +import io.cloudevents.CloudEvent; import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; import org.reactivecommons.async.commons.config.BrokerConfig; +import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import org.reactivecommons.api.domain.DomainEvent; import org.reactivecommons.api.domain.DomainEventBus; @@ -31,8 +33,8 @@ public Mono emit(DomainEvent event) { } @Override - public Mono emit(CloudEvent event) { - return emit(new DomainEvent(event.getName(), event.getId(), event)) + public Publisher emit(CloudEvent event) { + return emit(new DomainEvent<>(event.getType(), event.getId(), event)); } } diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExt.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExt.java new file mode 100644 index 00000000..7ae7e162 --- /dev/null +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/CloudEventBuilderExt.java @@ -0,0 +1,15 @@ +package org.reactivecommons.async.rabbit.converters.json; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import lombok.experimental.UtilityClass; + +@UtilityClass +public class CloudEventBuilderExt { + private static final ObjectMapper mapper = new ObjectMapper(); + + @SneakyThrows + public static byte[] asBytes(Object object) { + return mapper.writeValueAsBytes(object); + } +} diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverter.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverter.java new file mode 100644 index 00000000..08f8b092 --- /dev/null +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/converters/json/JacksonCloudEventMessageConverter.java @@ -0,0 +1,209 @@ +package org.reactivecommons.async.rabbit.converters.json; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; +import lombok.Data; +import org.reactivecommons.api.domain.Command; +import org.reactivecommons.api.domain.DomainEvent; +import org.reactivecommons.async.api.AsyncQuery; +import org.reactivecommons.async.commons.communications.Message; +import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.commons.exceptions.MessageConversionException; +import org.reactivecommons.async.rabbit.RabbitMessage; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +public class JacksonCloudEventMessageConverter implements MessageConverter { + private static final String CONTENT_TYPE = "application/json"; + + private final ObjectMapper objectMapper; + + + public JacksonCloudEventMessageConverter(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Override + public AsyncQuery readAsyncQuery(Message message, Class bodyClass) { + try { + final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class); + T value = extractData(bodyClass, asyncQueryJson.getQueryData()); + return new AsyncQuery<>(asyncQueryJson.getResource(), value); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + public DomainEvent readDomainEvent(Message message, Class bodyClass) { + try { + final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class); + + T value = extractData(bodyClass, domainEventJson.getData()); + + return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + public Command readCommand(Message message, Class bodyClass) { + try { + final CommandJson commandJson = readValue(message, CommandJson.class); + T value = extractData(bodyClass, commandJson.getData()); + return new Command<>(commandJson.getName(), commandJson.getCommandId(), value); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + public T readValue(Message message, Class valueClass) { + try { + if(valueClass == CloudEvent.class){ + + return (T) EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(objectMapper.readValue(message.getBody(), byte[].class)); + + } + return objectMapper.readValue(message.getBody(), valueClass); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + } + + @Override + @SuppressWarnings("unchecked") + public Command readCommandStructure(Message message) { + final CommandJson commandJson = readValue(message, CommandJson.class); + return new Command<>(commandJson.getName(), commandJson.getCommandId(), (T) commandJson.getData()); + } + + @Override + @SuppressWarnings("unchecked") + public DomainEvent readDomainEventStructure(Message message) { + final DomainEventJson eventJson = readValue(message, DomainEventJson.class); + return new DomainEvent<>(eventJson.getName(), eventJson.getEventId(), (T) eventJson.getData()); + } + + @Override + @SuppressWarnings("unchecked") + public AsyncQuery readAsyncQueryStructure(Message message) { + final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class); + return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData()); + } + + + public Message commandToMessage(Command object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getData()); + + return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data)); + } + + + public Message eventToMessage(DomainEvent object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getData()); + + return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data)); + } + + + public Message queryToMessage(AsyncQuery object) { + byte[] data = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize(object.getQueryData()); + + return getRabbitMessage(new AsyncQuery<>(object.getResource(), data)); + } + @Override + public Message toMessage(Object object) { + byte[] bytes; + if(object instanceof DomainEvent + && ((DomainEvent) object).getData() instanceof CloudEvent){ + + return eventToMessage((DomainEvent) object); + + } + if(object instanceof Command + && ((Command) object).getData() instanceof CloudEvent){ + + return commandToMessage((Command) object); + } + if(object instanceof AsyncQuery + && ((AsyncQuery) object).getQueryData() instanceof CloudEvent){ + + return queryToMessage((AsyncQuery) object); + } + + return getRabbitMessage(object); + } + + private RabbitMessage getRabbitMessage(Object object) { + byte[] bytes; + try { + + String jsonString = this.objectMapper.writeValueAsString(object); + bytes = jsonString.getBytes(StandardCharsets.UTF_8); + } catch (IOException e) { + throw new MessageConversionException("Failed to convert Message content", e); + } + RabbitMessage.RabbitMessageProperties props = new RabbitMessage.RabbitMessageProperties(); + props.setContentType(CONTENT_TYPE); + props.setContentEncoding(StandardCharsets.UTF_8.name()); + props.setContentLength(bytes.length); + return new RabbitMessage(bytes, props); + } + + private T extractData(Class bodyClass, JsonNode node) throws JsonProcessingException { + T value; + if(bodyClass == CloudEvent.class){ + + value = (T) EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .deserialize(Base64.getDecoder() + .decode(node.asText())); + + } + else{ + value = objectMapper.treeToValue(node, bodyClass); + } + return value; + } + + @Data + private static class AsyncQueryJson { + private String resource; + private JsonNode queryData; + } + + @Data + private static class DomainEventJson { + private String name; + private String eventId; + private JsonNode data; + } + + @Data + private static class CommandJson { + private String name; + private String commandId; + private JsonNode data; + } +} diff --git a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java index 2d5fa136..dca6c722 100644 --- a/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java +++ b/async/async-rabbit/src/main/java/org/reactivecommons/async/rabbit/listeners/ApplicationQueryListener.java @@ -1,6 +1,9 @@ package org.reactivecommons.async.rabbit.listeners; import com.rabbitmq.client.AMQP; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.jackson.JsonFormat; import lombok.extern.java.Log; import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler; import org.reactivecommons.async.commons.DiscardNotifier; @@ -149,8 +152,16 @@ protected Function, Mono> enrichPostProcess(Message msg) { final String correlationID = msg.getProperties().getHeaders().get(CORRELATION_ID).toString(); final HashMap headers = new HashMap<>(); headers.put(CORRELATION_ID, correlationID); + Object response = signal.get(); + if(response instanceof CloudEvent) { + byte[] serialized = EventFormatProvider + .getInstance() + .resolveFormat(JsonFormat.CONTENT_TYPE) + .serialize((CloudEvent) response); + return sender.sendNoConfirm(serialized, replyExchange, replyID, headers, false); + } - return sender.sendNoConfirm(signal.get(), replyExchange, replyID, headers, false); + return sender.sendNoConfirm(response, replyExchange, replyID, headers, false); }); } diff --git a/domain/domain-events/domain-events-api.gradle b/domain/domain-events/domain-events-api.gradle index cef707ef..2279536b 100644 --- a/domain/domain-events/domain-events-api.gradle +++ b/domain/domain-events/domain-events-api.gradle @@ -5,4 +5,5 @@ ext { dependencies { api 'org.reactivestreams:reactive-streams:1.0.4' + api 'io.cloudevents:cloudevents-api:2.5.0' } diff --git a/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java b/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java index 29bb17af..5e166bd8 100644 --- a/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java +++ b/domain/domain-events/src/main/java/org/reactivecommons/api/domain/DomainEventBus.java @@ -1,8 +1,11 @@ package org.reactivecommons.api.domain; +import io.cloudevents.CloudEvent; import org.reactivestreams.Publisher; public interface DomainEventBus { Publisher emit(DomainEvent event); - Publisher emit(CloudEvent event); + + Publisher emit(CloudEvent event); + } diff --git a/samples/async/receiver-responder/async-receiver-sample.gradle b/samples/async/receiver-responder/async-receiver-sample.gradle index 0e24f4c0..2675840d 100644 --- a/samples/async/receiver-responder/async-receiver-sample.gradle +++ b/samples/async/receiver-responder/async-receiver-sample.gradle @@ -4,4 +4,5 @@ dependencies { implementation project(":async-commons-rabbit-starter-eda") implementation 'org.springframework.boot:spring-boot-starter' runtimeOnly 'org.springframework.boot:spring-boot-devtools' + implementation 'io.cloudevents:cloudevents-core:2.5.0' } diff --git a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java index bdc0ca1e..494cefc3 100644 --- a/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java +++ b/samples/async/receiver-responder/src/main/java/sample/SampleReceiverApp.java @@ -1,5 +1,7 @@ package sample; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import lombok.AllArgsConstructor; import lombok.Data; import lombok.RequiredArgsConstructor; @@ -8,17 +10,22 @@ import org.reactivecommons.api.domain.DomainEventBus; import org.reactivecommons.async.api.DirectAsyncGateway; import org.reactivecommons.async.api.HandlerRegistry; -import org.reactivecommons.async.api.handlers.EventHandler; import org.reactivecommons.async.api.handlers.QueryHandler; import org.reactivecommons.async.impl.config.annotations.EnableDirectAsyncGateway; import org.reactivecommons.async.impl.config.annotations.EnableDomainEventBus; import org.reactivecommons.async.impl.config.annotations.EnableEventListeners; import org.reactivecommons.async.impl.config.annotations.EnableMessageListeners; +import org.reactivecommons.async.rabbit.converters.json.CloudEventBuilderExt; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import reactor.core.publisher.Mono; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.UUID; + import static org.reactivecommons.async.api.HandlerRegistry.register; import static reactor.core.publisher.Mono.just; @@ -51,10 +58,11 @@ public Mono handle(AddMemberCommand command) { @Bean public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/) { return HandlerRegistry.register() + // .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class) .listenEvent("fixed.event", message -> Mono.empty(), Object.class) .listenDomainEvent("accounts", "account.created", message -> Mono.empty(), Object.class) - .listenDomainEvent("deposits", "transfer.xxx", message -> Mono.empty(), Object.class); + .listenDomainEvent("deposits", "transfer.xxx", message -> Mono.empty(), Object.class) // .serveQuery("query1", message -> { // log.info("resolving from direct query"); // return just(new RespQuery1("Ok", message)); @@ -67,6 +75,37 @@ public HandlerRegistry handlerRegistrySubs(/*DirectAsyncGateway gateway*/) { // log.info("resolving from delegate query"); // return gateway.reply(new RespQuery1("Ok", message), from).then(); // }, Call.class); + + .handleDynamicEvents("dynamic.*", message -> Mono.empty(), Object.class) + .listenEvent("event", message -> { + log.info(message.getData().toString()); + return Mono.empty(); + }, CloudEvent.class) + .handleCommand("command", message -> { + log.info(message.getData().toString()); + return Mono.empty(); + }, CloudEvent.class) + .serveQuery("query1", message -> { + log.info("resolving from direct query" + message); + Map mapData = Map.of("1", "data"); + CloudEvent response = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("query1.response") // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(mapData)) + .build(); + return just(response); + }, CloudEvent.class) + .serveQuery("sample.query.*", message -> { + log.info("resolving from direct query"); + return just(new RespQuery1("Ok", message)); + }, Call.class); + /*.serveQuery("query2", (from, message) -> { + log.info("resolving from delegate query"); + return gateway.reply(new RespQuery1("Ok", message), from).then(); + }, Call.class); +*/ } //@Bean diff --git a/samples/async/sender-client/async-sender-client.gradle b/samples/async/sender-client/async-sender-client.gradle index 67a921a3..8c82b9e4 100644 --- a/samples/async/sender-client/async-sender-client.gradle +++ b/samples/async/sender-client/async-sender-client.gradle @@ -5,4 +5,6 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'io.micrometer:micrometer-registry-prometheus' + implementation 'io.cloudevents:cloudevents-core:2.5.0' + } diff --git a/samples/async/sender-client/src/main/java/sample/SampleRestController.java b/samples/async/sender-client/src/main/java/sample/SampleRestController.java index ff29616e..ef5ae3fe 100644 --- a/samples/async/sender-client/src/main/java/sample/SampleRestController.java +++ b/samples/async/sender-client/src/main/java/sample/SampleRestController.java @@ -1,10 +1,15 @@ package sample; +import com.fasterxml.jackson.core.JsonProcessingException; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.reactivecommons.api.domain.DomainEventBus; import org.reactivecommons.async.api.AsyncQuery; import org.reactivecommons.async.api.DirectAsyncGateway; +import org.reactivecommons.async.rabbit.converters.json.CloudEventBuilderExt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.PostMapping; @@ -13,11 +18,18 @@ import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.UUID; + @RestController public class SampleRestController { @Autowired private DirectAsyncGateway directAsyncGateway; + + @Autowired + private DomainEventBus domainEventBus; private final String queryName = "query1"; private final String queryName2 = "query2"; private final String target = "receiver"; @@ -25,11 +37,45 @@ public class SampleRestController { private final WebClient webClient = WebClient.builder().build(); @PostMapping(path = "/sample", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) - public Mono sampleService(@RequestBody Call call) { - AsyncQuery query = new AsyncQuery<>(queryName, call); - return directAsyncGateway.requestReply(query, target, RespQuery1.class); + public Mono sampleService(@RequestBody Call call) throws JsonProcessingException { +// AsyncQuery query = new AsyncQuery<>(queryName, call); + CloudEvent query = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType(queryName) // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(call)) + .build(); + + return directAsyncGateway.requestReply(query, target, CloudEvent.class); } + @PostMapping(path = "/sample/event", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) + public Mono sampleServiceEvent(@RequestBody Call call) throws JsonProcessingException { +// AsyncQuery query = new AsyncQuery<>(queryName, call); + CloudEvent event = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("event") // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(call)) + .build(); + return Mono.from(domainEventBus.emit(event)).thenReturn("event"); + } + + @PostMapping(path = "/sample/command", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) + public Mono sampleServiceCommand(@RequestBody Call call) throws JsonProcessingException { +// AsyncQuery query = new AsyncQuery<>(queryName, call); + CloudEvent command = CloudEventBuilder.v1() // + .withId(UUID.randomUUID().toString()) // + .withSource(URI.create("https://spring.io/foos"))// + .withType("command") // + .withTime(OffsetDateTime.now()) + .withData("application/json", CloudEventBuilderExt.asBytes(call)) + .build(); + + return directAsyncGateway.sendCommand(command, target).thenReturn("command"); + } @PostMapping(path = "/sample/match", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public Mono sampleServices(@RequestBody Call call) { AsyncQuery query = new AsyncQuery<>("sample.query.any.that.matches", call);