Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JMS: WIP: Initial commit for fault strategies #2599

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.smallrye.config.SmallRyeConfigProviderResolver;
import io.smallrye.config.inject.ConfigExtension;
import io.smallrye.reactive.messaging.jms.JmsConnector;
import io.smallrye.reactive.messaging.jms.fault.JmsFailStop;
import io.smallrye.reactive.messaging.jms.fault.JmsIgnoreFailure;
import io.smallrye.reactive.messaging.json.jackson.JacksonMapping;
import io.smallrye.reactive.messaging.json.jackson.ObjectMapperProvider;
import io.smallrye.reactive.messaging.providers.MediatorFactory;
Expand Down Expand Up @@ -96,7 +98,8 @@ protected Weld initWithoutConnectionFactory() {
weld.addBeanClass(EmitterFactoryImpl.class);
weld.addBeanClass(MutinyEmitterFactoryImpl.class);
weld.addBeanClass(LegacyEmitterFactoryImpl.class);

weld.addBeanClass(JmsFailStop.Factory.class);
weld.addBeanClass(JmsIgnoreFailure.Factory.class);
weld.addBeanClass(JmsConnector.class);
weld.addBeanClass(ObjectMapperProvider.class);
weld.addBeanClass(JacksonMapping.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.reactive.messaging.jms;

import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex;
import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
Expand All @@ -12,20 +13,24 @@
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.messaging.Message<T> {
public class IncomingJmsMessage<T> implements ContextAwareMessage<T> {
private final Message delegate;
private final Executor executor;
private final Class<T> clazz;
private final JsonMapping jsonMapping;
private final IncomingJmsMessageMetadata jmsMetadata;
private final Metadata metadata;
private final JmsFailureHandler failureHandler;

IncomingJmsMessage(Message message, Executor executor, JsonMapping jsonMapping) {
IncomingJmsMessage(Message message, Executor executor, JsonMapping jsonMapping, JmsFailureHandler failureHandler) {
this.delegate = message;
this.jsonMapping = jsonMapping;
this.executor = executor;
this.failureHandler = failureHandler;
String cn = null;
try {
cn = message.getStringProperty("_classname");
Expand All @@ -42,7 +47,7 @@ public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.
}

this.jmsMetadata = new IncomingJmsMessageMetadata(message);
this.metadata = Metadata.of(this.jmsMetadata);
this.metadata = captureContextMetadata(this.jmsMetadata);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -119,6 +124,7 @@ public CompletionStage<Void> ack(Metadata metadata) {
}
})
.runSubscriptionOn(executor)
.emitOn(this::runOnMessageContext)
.subscribeAsCompletionStage();
}

Expand All @@ -127,6 +133,11 @@ public Metadata getMetadata() {
return metadata;
}

@Override
public CompletionStage<Void> nack(Throwable reason, Metadata metadata) {
return failureHandler.handle(this, reason, metadata).subscribeAsCompletionStage();
}

@SuppressWarnings({ "unchecked" })
@Override
public <C> C unwrap(Class<C> unwrapType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;

@ApplicationScoped
Expand Down Expand Up @@ -66,6 +68,9 @@
@ConnectorAttribute(name = "retry.initial-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The initial delay for the retry.", type = "string", defaultValue = "PT1S")
@ConnectorAttribute(name = "retry.max-delay", direction = Direction.INCOMING_AND_OUTGOING, description = "The maximum delay", type = "string", defaultValue = "PT10S")
@ConnectorAttribute(name = "retry.jitter", direction = Direction.INCOMING_AND_OUTGOING, description = "How much the delay jitters as a multiplier between 0 and 1. The formula is current delay * jitter. For example, with a current delay of 2H, a jitter of 0.5 will result in an actual delay somewhere between 1H and 3H.", type = "double", defaultValue = "0.5")
@ConnectorAttribute(name = "failure-strategy", type = "string", direction = Direction.INCOMING, description = "Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be `fail` (default), `ignore`, or `dead-letter-queue`", defaultValue = "fail")
@ConnectorAttribute(name = "dead-letter-queue.topic", type = "string", direction = Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates on which queue the message is sent. Defaults is `dead-letter-topic-$channel`")
@ConnectorAttribute(name = "dead-letter-queue.producer-client-id", type = "string", direction = Direction.INCOMING, description = "When the `failure-strategy` is set to `dead-letter-queue` indicates what client id the generated producer should use. Defaults is `jms-dead-letter-topic-producer-$client-id`")
public class JmsConnector implements InboundConnector, OutboundConnector {

/**
Expand Down Expand Up @@ -100,6 +105,13 @@ public class JmsConnector implements InboundConnector, OutboundConnector {
@ConfigProperty(name = "smallrye.jms.threads.ttl", defaultValue = DEFAULT_THREAD_TTL)
int ttl;

@Inject
@Any
Instance<JmsFailureHandler.Factory> failureHandlerFactories;

@Inject
ExecutionHolder executionHolder;

private ExecutorService executor;
private JsonMapping jsonMapping;
private final List<JmsSource> sources = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -134,7 +146,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config);
JmsResourceHolder<JMSConsumer> holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic));
contexts.add(holder);
JmsSource source = new JmsSource(holder, ic, jsonMapping, executor);
JmsSource source = new JmsSource(executionHolder.vertx(), holder, ic, jsonMapping, executor, failureHandlerFactories);
sources.add(source);
return source.getSource();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,50 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.inject.Instance;
import jakarta.jms.*;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.jms.fault.JmsFailureHandler;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

class JmsSource {

private final Multi<IncomingJmsMessage<?>> source;
private final JmsResourceHolder<JMSConsumer> resourceHolder;
private final List<Throwable> failures = new ArrayList<>();

private final JmsPublisher publisher;

JmsSource(JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping,
Executor executor) {
private final Instance<JmsFailureHandler.Factory> failureHandlerFactories;
private final JmsConnectorIncomingConfiguration config;
private final JmsFailureHandler failureHandler;
private final Context context;

JmsSource(Vertx vertx, JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config,
JsonMapping jsonMapping,
Executor executor, Instance<JmsFailureHandler.Factory> failureHandlerFactories) {
String channel = config.getChannel();
final String destinationName = config.getDestination().orElseGet(config::getChannel);
String selector = config.getSelector().orElse(null);
boolean nolocal = config.getNoLocal();
boolean durable = config.getDurable();
String type = config.getDestinationType();
boolean retry = config.getRetry();
this.config = config;
this.resourceHolder = resourceHolder.configure(r -> getDestination(r.getContext(), destinationName, type),
r -> {
if (durable) {
Expand All @@ -47,8 +62,12 @@ class JmsSource {
});
resourceHolder.getClient();
this.publisher = new JmsPublisher(resourceHolder);
this.failureHandlerFactories = failureHandlerFactories;
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.failureHandler = createFailureHandler();
source = Multi.createFrom().publisher(publisher)
.<IncomingJmsMessage<?>> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping))
.emitOn(r -> context.runOnContext(r))
.<IncomingJmsMessage<?>> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping, failureHandler))
.onFailure(t -> {
log.terminalErrorOnChannel(channel);
this.resourceHolder.close();
Expand Down Expand Up @@ -92,6 +111,30 @@ Multi<IncomingJmsMessage<?>> getSource() {
return source;
}

private JmsFailureHandler createFailureHandler() {
String strategy = config.getFailureStrategy();
Instance<JmsFailureHandler.Factory> failureHandlerFactory = failureHandlerFactories
.select(Identifier.Literal.of(strategy));
if (failureHandlerFactory.isResolvable()) {
return failureHandlerFactory.get().create(config, this::reportFailure);
} else {
throw ex.illegalArgumentInvalidFailureStrategy(strategy);
}
}

public synchronized void reportFailure(Throwable failure, boolean fatal) {
//log.failureReported(topics, failure);
// Don't keep all the failures, there are only there for reporting.
if (failures.size() == 10) {
failures.remove(0);
}
failures.add(failure);

if (fatal) {
close();
}
}

@SuppressWarnings("PublisherImplementation")
private static class JmsPublisher implements Flow.Publisher<Message>, Flow.Subscription {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.smallrye.reactive.messaging.jms.fault;

import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.IncomingJmsMessage;
import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration;

public class JmsFailStop implements JmsFailureHandler {

private final String channel;
private final BiConsumer<Throwable, Boolean> reportFailure;

@ApplicationScoped
@Identifier(Strategy.FAIL)
public static class Factory implements JmsFailureHandler.Factory {

@Override
public JmsFailureHandler create(JmsConnectorIncomingConfiguration config,
BiConsumer<Throwable, Boolean> reportFailure) {
return new JmsFailStop(config.getChannel(), reportFailure);
}
}

public <K, V> JmsFailStop(String channel, BiConsumer<Throwable, Boolean> reportFailure) {
this.channel = channel;
this.reportFailure = reportFailure;
}

@Override
public <T> Uni<Void> handle(IncomingJmsMessage<T> message, Throwable reason, Metadata metadata) {
// We don't commit, we just fail and stop the client.
log.messageNackedFailStop(channel);
// report failure to the connector for health check
reportFailure.accept(reason, true);
return Uni.createFrom().<Void> failure(reason)
.emitOn(message::runOnMessageContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.smallrye.reactive.messaging.jms.fault;

import java.util.function.BiConsumer;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.IncomingJmsMessage;
import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration;

/**
* Jms Failure handling strategy
*/
@Experimental("Experimental API")
public interface JmsFailureHandler {

/**
* Identifiers of default failure strategies
*/
interface Strategy {
String FAIL = "fail";
String IGNORE = "ignore";
String DEAD_LETTER_QUEUE = "dead-letter-queue";

}

/**
* Factory interface for {@link JmsFailureHandler}
*/
interface Factory {
JmsFailureHandler create(
JmsConnectorIncomingConfiguration config,
BiConsumer<Throwable, Boolean> reportFailure);
}

/**
* Handle message negative-acknowledgment
*
* @param record incoming jms message
* @param reason nack reason
* @param metadata associated metadata with negative-acknowledgment
* @param <T> type of payload
* @return a completion stage completed when the message is negative-acknowledgement has completed.
*/
<T> Uni<Void> handle(IncomingJmsMessage<T> record, Throwable reason, Metadata metadata);

/**
* Called on channel shutdown
*/
default void terminate() {
// do nothing by default
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.smallrye.reactive.messaging.jms.fault;

import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.IncomingJmsMessage;
import io.smallrye.reactive.messaging.jms.JmsConnectorIncomingConfiguration;

public class JmsIgnoreFailure implements JmsFailureHandler {

private final String channel;

@ApplicationScoped
@Identifier(Strategy.IGNORE)
public static class Factory implements JmsFailureHandler.Factory {

@Override
public JmsFailureHandler create(JmsConnectorIncomingConfiguration config,
BiConsumer<Throwable, Boolean> reportFailure) {
return new JmsIgnoreFailure(config.getChannel());
}
}

public JmsIgnoreFailure(String channel) {
this.channel = channel;
}

@Override
public <T> Uni<Void> handle(IncomingJmsMessage<T> message, Throwable reason, Metadata metadata) {
// We commit the message, log and continue
log.messageNackedIgnore(channel, reason.getMessage());
log.messageNackedFullIgnored(reason);
return Uni.createFrom().completionStage(message.ack())
.emitOn(message::runOnMessageContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ public interface JmsExceptions {
@Message(id = 15613, value = "There is already a subscriber")
IllegalStateException illegalStateAlreadySubscriber();

@Message(id = 18614, value = "Invalid failure strategy: %s")
IllegalArgumentException illegalArgumentInvalidFailureStrategy(String strategy);

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,16 @@ public interface JmsLogging extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 15806, value = "JMS Exception occurred. Closing the JMS context %s")
void jmsException(String channelName, @Cause Throwable e);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 15807, value = "A message sent to channel `%s` has been nacked, ignored failure is: %s.")
void messageNackedIgnore(String channel, String message);

@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 15808, value = "The full ignored failure is")
void messageNackedFullIgnored(@Cause Throwable reason);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 15809, value = "A message sent to channel `%s` has been nacked, fail-stop")
void messageNackedFailStop(String channel);
}
Loading
Loading