Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions async/async-commons-api/async-commons-api.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ dependencies {
api project(':domain-events-api')
compileOnly 'io.projectreactor:reactor-core'
testImplementation 'io.projectreactor:reactor-test'
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package org.reactivecommons.async.api;

import io.cloudevents.CloudEvent;
import org.reactivecommons.api.domain.Command;
import reactor.core.publisher.Mono;

public interface DirectAsyncGateway {
<T> Mono<Void> sendCommand(Command<T> command, String targetName);
Mono<Void> sendCommand(CloudEvent command, String targetName);
<T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type);
<R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type);
<T> Mono<Void> reply(T response, From from);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.reactivecommons.async.api;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand All @@ -26,6 +29,7 @@ public class HandlerRegistry {
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
private final List<RegisteredCommandHandler<?>> commandHandlers = new CopyOnWriteArrayList<>();


public static HandlerRegistry register() {
return new HandlerRegistry();
}
Expand Down Expand Up @@ -75,7 +79,21 @@ public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> han
}

public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass) {
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
if(queryClass == CloudEvent.class){
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) ->
{
CloudEvent query = EventFormatProvider
.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(message);

return handler.handle((R) query);

} , byte[].class));
}
else{
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.reactivecommons.async.commons.converters;

import io.cloudevents.CloudEvent;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.async.api.AsyncQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import org.reactivecommons.async.rabbit.config.props.AsyncProps;
import org.reactivecommons.async.rabbit.config.props.BrokerConfigProps;
import org.reactivecommons.async.rabbit.converters.json.JacksonCloudEventMessageConverter;
import org.reactivecommons.async.rabbit.converters.json.JacksonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -156,7 +157,7 @@ public ObjectMapperSupplier objectMapperSupplier() {
@Bean
@ConditionalOnMissingBean
public MessageConverter messageConverter(ObjectMapperSupplier objectMapperSupplier) {
return new JacksonMessageConverter(objectMapperSupplier.get());
return new JacksonCloudEventMessageConverter(objectMapperSupplier.get());
}

@Bean
Expand Down
1 change: 1 addition & 0 deletions async/async-rabbit/async-rabbit.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ dependencies {
api 'com.rabbitmq:amqp-client'
api 'com.fasterxml.jackson.core:jackson-databind'
testImplementation 'io.projectreactor:reactor-test'
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.reactivecommons.async.rabbit;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import io.micrometer.core.instrument.MeterRegistry;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.async.api.AsyncQuery;
Expand Down Expand Up @@ -56,6 +59,11 @@ public <T> Mono<Void> sendCommand(Command<T> command, String targetName) {
return sender.sendWithConfirm(command, exchange, targetName, Collections.emptyMap(), persistentCommands);
}

@Override
public Mono<Void> sendCommand(CloudEvent command, String targetName) {
return sendCommand(new Command<>(command.getType(), command.getId(), command), targetName);
}

public <T> Flux<OutboundMessageResult> sendCommands(Flux<Command<T>> commands, String targetName) {
return sender.sendWithConfirmBatch(commands, exchange, targetName, Collections.emptyMap(), persistentCommands);
}
Expand Down Expand Up @@ -84,6 +92,11 @@ public <T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class
.tap(Micrometer.metrics(meterRegistry));
}

@Override
public <R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type) {
return requestReply(new AsyncQuery<>(query.getType(), query), targetName, type);
}

@Override
public <T> Mono<Void> reply(T response, From from) {
final HashMap<String, Object> headers = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.reactivecommons.async.rabbit;

import io.cloudevents.CloudEvent;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.async.commons.config.BrokerConfig;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.api.domain.DomainEventBus;
Expand Down Expand Up @@ -31,8 +33,8 @@ public <T> Mono<Void> emit(DomainEvent<T> event) {
}

@Override
public <T> Mono<Void> emit(CloudEvent<T> event) {
return emit(new DomainEvent(event.getName(), event.getId(), event))
public Publisher<Void> emit(CloudEvent event) {
return emit(new DomainEvent<>(event.getType(), event.getId(), event));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.reactivecommons.async.rabbit.converters.json;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;

@UtilityClass
public class CloudEventBuilderExt {
private static final ObjectMapper mapper = new ObjectMapper();

@SneakyThrows
public static byte[] asBytes(Object object) {
return mapper.writeValueAsBytes(object);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package org.reactivecommons.async.rabbit.converters.json;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import lombok.Data;
import org.reactivecommons.api.domain.Command;
import org.reactivecommons.api.domain.DomainEvent;
import org.reactivecommons.async.api.AsyncQuery;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.exceptions.MessageConversionException;
import org.reactivecommons.async.rabbit.RabbitMessage;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class JacksonCloudEventMessageConverter implements MessageConverter {
private static final String CONTENT_TYPE = "application/json";

private final ObjectMapper objectMapper;


public JacksonCloudEventMessageConverter(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

@Override
public <T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass) {
try {
final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class);
T value = extractData(bodyClass, asyncQueryJson.getQueryData());
return new AsyncQuery<>(asyncQueryJson.getResource(), value);
} catch (IOException e) {
throw new MessageConversionException("Failed to convert Message content", e);
}
}

@Override
public <T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass) {
try {
final DomainEventJson domainEventJson = readValue(message, DomainEventJson.class);

T value = extractData(bodyClass, domainEventJson.getData());

return new DomainEvent<>(domainEventJson.getName(), domainEventJson.getEventId(), value);
} catch (IOException e) {
throw new MessageConversionException("Failed to convert Message content", e);
}
}

@Override
public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
try {
final CommandJson commandJson = readValue(message, CommandJson.class);
T value = extractData(bodyClass, commandJson.getData());
return new Command<>(commandJson.getName(), commandJson.getCommandId(), value);
} catch (IOException e) {
throw new MessageConversionException("Failed to convert Message content", e);
}
}

@Override
public <T> T readValue(Message message, Class<T> valueClass) {
try {
if(valueClass == CloudEvent.class){

return (T) EventFormatProvider
.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(objectMapper.readValue(message.getBody(), byte[].class));

}
return objectMapper.readValue(message.getBody(), valueClass);
} catch (IOException e) {
throw new MessageConversionException("Failed to convert Message content", e);
}
}

@Override
@SuppressWarnings("unchecked")
public <T> Command<T> readCommandStructure(Message message) {
final CommandJson commandJson = readValue(message, CommandJson.class);
return new Command<>(commandJson.getName(), commandJson.getCommandId(), (T) commandJson.getData());
}

@Override
@SuppressWarnings("unchecked")
public <T> DomainEvent<T> readDomainEventStructure(Message message) {
final DomainEventJson eventJson = readValue(message, DomainEventJson.class);
return new DomainEvent<>(eventJson.getName(), eventJson.getEventId(), (T) eventJson.getData());
}

@Override
@SuppressWarnings("unchecked")
public <T> AsyncQuery<T> readAsyncQueryStructure(Message message) {
final AsyncQueryJson asyncQueryJson = readValue(message, AsyncQueryJson.class);
return new AsyncQuery<>(asyncQueryJson.getResource(), (T) asyncQueryJson.getQueryData());
}


public Message commandToMessage(Command<CloudEvent> object) {
byte[] data = EventFormatProvider
.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.serialize(object.getData());

return getRabbitMessage(new Command<>(object.getName(), object.getCommandId(), data));
}


public Message eventToMessage(DomainEvent<CloudEvent> object) {
byte[] data = EventFormatProvider
.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.serialize(object.getData());

return getRabbitMessage(new DomainEvent<>(object.getName(), object.getEventId(), data));
}


public Message queryToMessage(AsyncQuery<CloudEvent> object) {
byte[] data = EventFormatProvider
.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.serialize(object.getQueryData());

return getRabbitMessage(new AsyncQuery<>(object.getResource(), data));
}
@Override
public Message toMessage(Object object) {
byte[] bytes;
if(object instanceof DomainEvent
&& ((DomainEvent) object).getData() instanceof CloudEvent){

return eventToMessage((DomainEvent) object);

}
if(object instanceof Command
&& ((Command) object).getData() instanceof CloudEvent){

return commandToMessage((Command) object);
}
if(object instanceof AsyncQuery
&& ((AsyncQuery) object).getQueryData() instanceof CloudEvent){

return queryToMessage((AsyncQuery) object);
}

return getRabbitMessage(object);
}

private RabbitMessage getRabbitMessage(Object object) {
byte[] bytes;
try {

String jsonString = this.objectMapper.writeValueAsString(object);
bytes = jsonString.getBytes(StandardCharsets.UTF_8);
} catch (IOException e) {
throw new MessageConversionException("Failed to convert Message content", e);
}
RabbitMessage.RabbitMessageProperties props = new RabbitMessage.RabbitMessageProperties();
props.setContentType(CONTENT_TYPE);
props.setContentEncoding(StandardCharsets.UTF_8.name());
props.setContentLength(bytes.length);
return new RabbitMessage(bytes, props);
}

private <T> T extractData(Class<T> bodyClass, JsonNode node) throws JsonProcessingException {
T value;
if(bodyClass == CloudEvent.class){

value = (T) EventFormatProvider
.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(Base64.getDecoder()
.decode(node.asText()));

}
else{
value = objectMapper.treeToValue(node, bodyClass);
}
return value;
}

@Data
private static class AsyncQueryJson {
private String resource;
private JsonNode queryData;
}

@Data
private static class DomainEventJson {
private String name;
private String eventId;
private JsonNode data;
}

@Data
private static class CommandJson {
private String name;
private String commandId;
private JsonNode data;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.reactivecommons.async.rabbit.listeners;

import com.rabbitmq.client.AMQP;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import lombok.extern.java.Log;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.commons.DiscardNotifier;
Expand Down Expand Up @@ -149,8 +152,16 @@ protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
final String correlationID = msg.getProperties().getHeaders().get(CORRELATION_ID).toString();
final HashMap<String, Object> headers = new HashMap<>();
headers.put(CORRELATION_ID, correlationID);
Object response = signal.get();
if(response instanceof CloudEvent) {
byte[] serialized = EventFormatProvider
.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.serialize((CloudEvent) response);
return sender.sendNoConfirm(serialized, replyExchange, replyID, headers, false);
}

return sender.sendNoConfirm(signal.get(), replyExchange, replyID, headers, false);
return sender.sendNoConfirm(response, replyExchange, replyID, headers, false);
});
}

Expand Down
Loading