Skip to content

Commit

Permalink
Add support for publisher confirms
Browse files Browse the repository at this point in the history
Add an optional callback to the RMQConnectionFactory to be notified
of confirmed/nack-ed published messages. Setting this callback will
enable publisher confirms globally (for all the children objects created
by the RMQConnectionFactory instance).

Publisher confirms use a publishing sequence number to correlate
messages. As there is no such concept in JMS, the JMS client takes care
of mapping sequence numbers with original messages.

The confirm callback has one method with one context parameter. This
allows adding more information about published messages later (e.g. the
session or producer instance), without breaking changes. The context
consists now of the original message and a flag for confirmed or nacked.

Fixes #115
  • Loading branch information
acogoluegnes committed Aug 28, 2019
1 parent 5be302b commit 48382b2
Show file tree
Hide file tree
Showing 14 changed files with 508 additions and 32 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<junit.jupiter.version>5.5.1</junit.jupiter.version>
<mockito-core.version>3.0.0</mockito-core.version>
<awaitility.version>3.1.6</awaitility.version>
<assertj.version>3.13.2</assertj.version>
<spring.version>5.1.8.RELEASE</spring.version>

<maven.dependency.plugin.version>2.8</maven.dependency.plugin.version>
Expand Down Expand Up @@ -154,6 +155,12 @@
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
Expand Down
60 changes: 37 additions & 23 deletions src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,15 @@
import com.rabbitmq.client.Address;
import com.rabbitmq.client.MetricsCollector;
import com.rabbitmq.client.NoOpMetricsCollector;
import com.rabbitmq.jms.client.ConnectionParams;
import com.rabbitmq.jms.client.RMQConnection;
import com.rabbitmq.jms.client.RMQMessage;
import com.rabbitmq.jms.client.ReceivingContextConsumer;
import com.rabbitmq.jms.client.SendingContextConsumer;
import com.rabbitmq.jms.client.*;
import com.rabbitmq.jms.util.RMQJMSException;
import com.rabbitmq.jms.util.RMQJMSSecurityException;
import com.rabbitmq.jms.util.WhiteListObjectInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.NamingException;
import javax.naming.RefAddr;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import javax.jms.*;
import javax.naming.*;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import java.io.IOException;
Expand All @@ -45,9 +28,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static com.rabbitmq.jms.util.UriCodec.encHost;
import static com.rabbitmq.jms.util.UriCodec.encSegment;
import static com.rabbitmq.jms.util.UriCodec.encUserinfo;
import static com.rabbitmq.jms.util.UriCodec.*;

/**
* RabbitMQ Implementation of JMS {@link ConnectionFactory}
Expand Down Expand Up @@ -135,6 +116,21 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
*/
private ReceivingContextConsumer receivingContextConsumer = ctx -> {};

/**
* Callback to be notified of publisher confirms.
* <p>
* When this property is set, publisher confirms are enabled for all
* the underlying AMQP {@link com.rabbitmq.client.Channel}s created by
* this {@link ConnectionFactory}.
*
* @see <a href="https://www.rabbitmq.com/confirms.html#publisher-confirms">Publisher Confirms</a>
* @see <a href="https://www.rabbitmq.com/publishers.html#data-safety">Publisher Guide</a>
* @see ConfirmListener
* @since 1.13.0
*/
private ConfirmListener confirmListener;


/** Default not to use ssl */
private boolean ssl = false;
private String tlsProtocol;
Expand Down Expand Up @@ -260,6 +256,7 @@ protected Connection createConnection(String username, String password, Connecti
.setAmqpPropertiesCustomiser(amqpPropertiesCustomiser)
.setSendingContextConsumer(sendingContextConsumer)
.setReceivingContextConsumer(rcc)
.setConfirmListener(confirmListener)
);
conn.setTrustedPackages(this.trustedPackages);
logger.debug("Connection {} created.", conn);
Expand Down Expand Up @@ -907,6 +904,23 @@ public void setDeclareReplyToDestination(boolean declareReplyToDestination) {
this.declareReplyToDestination = declareReplyToDestination;
}

/**
* Set the callback to be notified of publisher confirms.
* <p>
* When this property is set, publisher confirms are enabled for all
* the underlying AMQP {@link com.rabbitmq.client.Channel}s created by
* this {@link ConnectionFactory}.
*
* @param confirmListener the callback
* @see <a href="https://www.rabbitmq.com/confirms.html#publisher-confirms">Publisher Confirms</a>
* @see <a href="https://www.rabbitmq.com/publishers.html#data-safety">Publisher Guide</a>
* @see ConfirmListener
* @since 1.13.0
*/
public void setConfirmListener(ConfirmListener confirmListener) {
this.confirmListener = confirmListener;
}

@FunctionalInterface
private interface ConnectionCreator {
com.rabbitmq.client.Connection create(com.rabbitmq.client.ConnectionFactory cf) throws Exception;
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/ConfirmListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* Copyright (c) 2019 Pivotal Software, Inc. All rights reserved. */
package com.rabbitmq.jms.client;

/**
* Listener to be notified of publisher confirms.
* <p>
* Publisher Confirms are a RabbitMQ extension to the AMQP protocol.
* In the context of the JMS client, they allow to be asynchronously
* notified when an outbound message has been confirmed by the broker
* (meaning the message made it safely to the broker).
* <p>
* Note publisher confirms are enabled at the {@link com.rabbitmq.jms.admin.RMQConnectionFactory} level,
* by setting a {@link ConfirmListener} instance with
* {@link com.rabbitmq.jms.admin.RMQConnectionFactory#setConfirmListener(ConfirmListener)}. This
* means all the AMQP {@link com.rabbitmq.client.Channel} created from the {@link com.rabbitmq.jms.admin.RMQConnectionFactory}
* instance will have publisher confirms enabled.
*
* @see <a href="https://www.rabbitmq.com/confirms.html#publisher-confirms">Publisher Confirms</a>
* @see <a href="https://www.rabbitmq.com/publishers.html#data-safety">Publisher Guide</a>
* @see com.rabbitmq.jms.admin.RMQConnectionFactory#setConfirmListener(ConfirmListener)
* @since 1.13.0
*/
@FunctionalInterface
public interface ConfirmListener {

/**
* Callback invoked when an outbound message is confirmed.
*
* @param context information about the confirmed message
*/
void handle(PublisherConfirmContext context);

}
16 changes: 16 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ public class ConnectionParams {
*/
private ReceivingContextConsumer receivingContextConsumer;

/**
* Callback for publisher confirms.
*
* @since 1.13.0
*/
private ConfirmListener confirmListener;

public Connection getRabbitConnection() {
return rabbitConnection;
}
Expand Down Expand Up @@ -176,4 +183,13 @@ public ConnectionParams setReceivingContextConsumer(ReceivingContextConsumer rec
this.receivingContextConsumer = receivingContextConsumer;
return this;
}

public ConnectionParams setConfirmListener(ConfirmListener confirmListener) {
this.confirmListener = confirmListener;
return this;
}

public ConfirmListener getConfirmListener() {
return confirmListener;
}
}
39 changes: 39 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/PublisherConfirmContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* Copyright (c) 2019 Pivotal Software, Inc. All rights reserved. */
package com.rabbitmq.jms.client;

import javax.jms.Message;

/**
* Information an outbound message being confirmed.
*
* @see ConfirmListener
* @since 1.13.0
*/
public class PublisherConfirmContext {

private final Message message;
private final boolean ack;

PublisherConfirmContext(Message message, boolean ack) {
this.message = message;
this.ack = ack;
}

/**
* The message being confirmed.
*
* @return the confirmed message
*/
public Message getMessage() {
return message;
}

/**
* Whether the message is confirmed or nack-ed (considered lost).
*
* @return true if confirmed, false if nack-ed
*/
public boolean isAck() {
return ack;
}
}
90 changes: 90 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/PublisherConfirmsUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/* Copyright (c) 2019 Pivotal Software, Inc. All rights reserved. */
package com.rabbitmq.jms.client;

import com.rabbitmq.client.Channel;

import javax.jms.Message;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
* Utility class to handle publisher confirms.
*
* @since 1.13.0
*/
class PublisherConfirmsUtils {

/**
* Enables publisher confirms support.
* <p>
* Adds a {@link com.rabbitmq.client.ConfirmListener} to the AMQP {@link Channel}
* and notifies the user-provided {@link ConfirmListener} when confirms notified
* arrive.
* <p>
* Returns a {@link PublishingListener} that must be called whenever a message is published.
*
* @param channel
* @param confirmListener
* @return
*/
static PublishingListener configurePublisherConfirmsSupport(Channel channel, ConfirmListener confirmListener) {
final Map<Long, Message> outstandingConfirms = new ConcurrentHashMap<>();
final AtomicLong multipleLowerBound = new AtomicLong(1);
PublishingListener publishingListener = (message, sequenceNumber) -> {
outstandingConfirms.put(sequenceNumber, message);
};
channel.addConfirmListener(new com.rabbitmq.client.ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
cleanPublisherConfirmsCorrelation(
outstandingConfirms, multipleLowerBound,
deliveryTag, multiple, message -> confirmListener.handle(new PublisherConfirmContext(message, true))
);
}

@Override
public void handleNack(long deliveryTag, boolean multiple) {
cleanPublisherConfirmsCorrelation(
outstandingConfirms, multipleLowerBound,
deliveryTag, multiple, message -> confirmListener.handle(new PublisherConfirmContext(message, false))
);
}
});
return publishingListener;
}

/**
* Cleans the data structure used to correlate publishing sequence numbers to messages when a confirm comes in.
* <p>
* Invoke a provided callback for each message confirmed/nack-ed.
*
* @param outstandingConfirms
* @param multipleLowerBound
* @param deliveryTag
* @param multiple
* @param messageConsumer
*/
private static void cleanPublisherConfirmsCorrelation(Map<Long, Message> outstandingConfirms, AtomicLong multipleLowerBound,
long deliveryTag, boolean multiple, Consumer<Message> messageConsumer) {
Long lowerBound = multipleLowerBound.get();
if (multiple) {
for (long i = lowerBound; i <= deliveryTag; i++) {
Message message = outstandingConfirms.remove(i);
if (message != null) {
messageConsumer.accept(message);
}
}
} else {
Message message = outstandingConfirms.remove(deliveryTag);
if (message != null) {
messageConsumer.accept(message);
}
if (deliveryTag == lowerBound + 1) {
multipleLowerBound.compareAndSet(lowerBound, deliveryTag);
}
}
}

}
15 changes: 15 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/PublishingListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/* Copyright (c) 2019 Pivotal Software, Inc. All rights reserved. */
package com.rabbitmq.jms.client;

import javax.jms.Message;

/**
* Internal interface to notify about a published message when publisher confirms are enabled.
*
* @since 1.13.0
*/
interface PublishingListener {

void publish(Message message, long sequenceNumber);

}
12 changes: 12 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/RMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti

private final ReceivingContextConsumer receivingContextConsumer;

/**
* Callback for publisher confirms.
*
* @since 1.13.0
*/
private final ConfirmListener confirmListener;

/**
* Classes in these packages can be transferred via ObjectMessage.
*
Expand All @@ -144,6 +151,7 @@ public RMQConnection(ConnectionParams connectionParams) {
this.amqpPropertiesCustomiser = connectionParams.getAmqpPropertiesCustomiser();
this.sendingContextConsumer = connectionParams.getSendingContextConsumer();
this.receivingContextConsumer = connectionParams.getReceivingContextConsumer();
this.confirmListener = connectionParams.getConfirmListener();
}

/**
Expand Down Expand Up @@ -195,6 +203,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
.setAmqpPropertiesCustomiser(this.amqpPropertiesCustomiser)
.setSendingContextConsumer(this.sendingContextConsumer)
.setReceivingContextConsumer(this.receivingContextConsumer)
.setConfirmListener(this.confirmListener)
);
session.setTrustedPackages(this.trustedPackages);
this.sessions.add(session);
Expand Down Expand Up @@ -382,6 +391,9 @@ Channel createRabbitChannel(boolean transactional) throws IOException {
if (transactional) {
channel.txSelect();
}
if (this.confirmListener != null) {
channel.confirmSelect();
}
return channel;
}

Expand Down
Loading

0 comments on commit 48382b2

Please sign in to comment.