Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
private String host = "localhost";
/** Default port NOT SET - determined by the type of connection (ssl or non-ssl) */
private int port = -1;
/** How long to wait for onMessage to return, in milliseconds */
private int onMessageTimeoutMs = 2000;

/** Default not to use ssl */
private boolean ssl = false;
Expand Down Expand Up @@ -91,7 +93,7 @@ public Connection createConnection(String username, String password) throws JMSE
setRabbitUri(logger, this, factory, this.getUri());
com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(factory);

RMQConnection conn = new RMQConnection(rabbitConnection, getTerminationTimeout(), getQueueBrowserReadMax());
RMQConnection conn = new RMQConnection(rabbitConnection, getTerminationTimeout(), getQueueBrowserReadMax(), getOnMessageTimeoutMs());
conn.setTrustedPackages(this.trustedPackages);
logger.debug("Connection {} created.", conn);
return conn;
Expand All @@ -106,7 +108,7 @@ public Connection createConnection(String username, String password, List<Addres
maybeEnableTLS(cf);
com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(cf, endpoints);

RMQConnection conn = new RMQConnection(rabbitConnection, getTerminationTimeout(), getQueueBrowserReadMax());
RMQConnection conn = new RMQConnection(rabbitConnection, getTerminationTimeout(), getQueueBrowserReadMax(), getOnMessageTimeoutMs());
conn.setTrustedPackages(this.trustedPackages);
logger.debug("Connection {} created.", conn);
return conn;
Expand Down Expand Up @@ -185,7 +187,8 @@ public String toString() {
.append(", host='").append(this.host)
.append("', port=").append(this.getPort())
.append(", virtualHost='").append(this.virtualHost)
.append("', queueBrowserReadMax=").append(this.queueBrowserReadMax)
.append("', onMessageTimeoutMs=").append(this.onMessageTimeoutMs)
.append(", queueBrowserReadMax=").append(this.queueBrowserReadMax)
.append('}').toString();
}

Expand Down Expand Up @@ -319,6 +322,7 @@ public Reference getReference() throws NamingException {
Reference ref = new Reference(RMQConnectionFactory.class.getName());
addStringRefProperty(ref, "uri", this.getUri());
addIntegerRefProperty(ref, "queueBrowserReadMax", this.getQueueBrowserReadMax());
addIntegerRefProperty(ref, "onMessageTimeoutMs", this.getOnMessageTimeoutMs());
return ref;
}

Expand Down Expand Up @@ -522,4 +526,20 @@ public int getQueueBrowserReadMax() {
public void setQueueBrowserReadMax(int queueBrowserReadMax) {
this.queueBrowserReadMax = Math.max(0, queueBrowserReadMax);
}

/**
* Returns the time in milliseconds {@link MessageListener#onMessage(Message)} can take to process a message
* @return the time in milliseconds {@link MessageListener#onMessage(Message)} can take to process a message
*/
public int getOnMessageTimeoutMs() { return this.onMessageTimeoutMs; }

/**
* Sets <i>onMessageTimeoutMs</i>: the time in milliseconds {@link MessageListener#onMessage(Message)} can take to process a message.
* Non-positive values are rejected.
* @param onMessageTimeoutMs - duration in milliseconds
*/
public void setOnMessageTimeoutMs(int onMessageTimeoutMs){
if (onMessageTimeoutMs > 0) this.onMessageTimeoutMs = onMessageTimeoutMs;
else this.logger.warn("Cannot set onMessageTimeoutMs to non-positive value {} (on {})", onMessageTimeoutMs, this);
}
}
2 changes: 2 additions & 0 deletions src/main/java/com/rabbitmq/jms/admin/RMQObjectFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
* <li>password</li>
* <li>port</li>
* <li>queueBrowserReadMax</li>
* <li>onMessageTimeoutMs</li>
* <li>ssl</li>
* <li>terminationTimeout</li>
* <li>username</li>
Expand Down Expand Up @@ -165,6 +166,7 @@ public Object createConnectionFactory(Reference ref, Name name) throws NamingExc
f.setPassword (getStringProperty (ref, "password", true, f.getPassword() ));
f.setPort (getIntProperty (ref, "port", true, f.getPort() ));
f.setQueueBrowserReadMax(getIntProperty (ref, "queueBrowserReadMax", true, f.getQueueBrowserReadMax()));
f.setOnMessageTimeoutMs (getIntProperty (ref, "onMessageTimeoutMs", true, f.getOnMessageTimeoutMs() ));
f.setSsl (getBooleanProperty(ref, "ssl", true, f.isSsl() ));
f.setTerminationTimeout (getLongProperty (ref, "terminationTimeout", true, f.getTerminationTimeout() ));
f.setUsername (getStringProperty (ref, "username", true, f.getUsername() ));
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/com/rabbitmq/jms/client/RMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
/** max number of messages to read from a browsed queue */
private final int queueBrowserReadMax;

/** How long to wait for onMessage to return, in milliseconds */
private final int onMessageTimeoutMs;

private static ConcurrentHashMap<String, String> CLIENT_IDS = new ConcurrentHashMap<String, String>();

/** List of all our durable subscriptions so we can track them on a per connection basis (maintained by sessions).*/
Expand All @@ -90,23 +93,26 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
* @param rabbitConnection the TCP connection wrapper to the RabbitMQ broker
* @param terminationTimeout timeout for close in milliseconds
* @param queueBrowserReadMax maximum number of messages to read from a QueueBrowser (before filtering)
* @param onMessageTimeoutMs how long to wait for onMessage to return, in milliseconds
*/
public RMQConnection(com.rabbitmq.client.Connection rabbitConnection, long terminationTimeout, int queueBrowserReadMax) {
public RMQConnection(com.rabbitmq.client.Connection rabbitConnection, long terminationTimeout, int queueBrowserReadMax, int onMessageTimeoutMs) {

rabbitConnection.addShutdownListener(new RMQConnectionShutdownListener());

this.rabbitConnection = rabbitConnection;
this.terminationTimeout = terminationTimeout;
this.queueBrowserReadMax = queueBrowserReadMax;
this.onMessageTimeoutMs = onMessageTimeoutMs;
}

private static final long FIFTEEN_SECONDS_MS = 15000;
private static final int TWO_SECONDS_MS = 2000;
/**
* Creates an RMQConnection object, with default termination timeout of 15 seconds, and unlimited reads from QueueBrowsers.
* Creates an RMQConnection object, with default termination timeout of 15 seconds, a 2 seconds timeout for onMessage, and unlimited reads from QueueBrowsers.
* @param rabbitConnection the TCP connection wrapper to the RabbitMQ broker
*/
public RMQConnection(com.rabbitmq.client.Connection rabbitConnection) {
this(rabbitConnection, FIFTEEN_SECONDS_MS, 0);
this(rabbitConnection, FIFTEEN_SECONDS_MS, 0, TWO_SECONDS_MS);
}

/** For RMQSession to retrieve */
Expand All @@ -120,7 +126,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
logger.trace("transacted={}, acknowledgeMode={}", transacted, acknowledgeMode);
illegalStateExceptionIfClosed();
freezeClientID();
RMQSession session = new RMQSession(this, transacted, acknowledgeMode, this.subscriptions);
RMQSession session = new RMQSession(this, transacted, onMessageTimeoutMs, acknowledgeMode, this.subscriptions);
session.setTrustedPackages(this.trustedPackages);
this.sessions.add(session);
return session;
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/com/rabbitmq/jms/client/RMQSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ private static Map<String, SqlExpressionType> generateJMSTypeIdents() {

private static final String JMS_TOPIC_SELECTOR_EXCHANGE_TYPE = "x-jms-topic";

private static final long ON_MESSAGE_EXECUTOR_TIMEOUT_MS = 2000; // 2 seconds

private final DeliveryExecutor deliveryExecutor = new DeliveryExecutor(ON_MESSAGE_EXECUTOR_TIMEOUT_MS);
private final DeliveryExecutor deliveryExecutor;

/** The channels we use for browsing queues (there may be more than one in operation at a time) */
private Set<Channel> browsingChannels = new HashSet<Channel>(); // @GuardedBy(bcLock)
Expand All @@ -161,15 +159,18 @@ private static Map<String, SqlExpressionType> generateJMSTypeIdents() {
* Creates a session object associated with a connection
* @param connection the connection that we will send data on
* @param transacted whether this session is transacted or not
* @param onMessageTimeoutMs how long to wait for onMessage to return, in milliseconds
* @param mode the (fixed) acknowledgement mode for this session
* @param subscriptions the connection's subscriptions, shared with all sessions
* @throws JMSException if we fail to create a {@link Channel} object on the connection, or if the acknowledgement mode is incorrect
*/
public RMQSession(RMQConnection connection, boolean transacted, int mode, Map<String, RMQMessageConsumer> subscriptions) throws JMSException {
public RMQSession(RMQConnection connection, boolean transacted, int onMessageTimeoutMs, int mode, Map<String, RMQMessageConsumer> subscriptions) throws JMSException {
if (mode<0 || mode>CLIENT_INDIVIDUAL_ACKNOWLEDGE) throw new JMSException(String.format("cannot create session with acknowledgement mode = %d.", mode));
this.connection = connection;
this.transacted = transacted;
this.subscriptions = subscriptions;
this.deliveryExecutor = new DeliveryExecutor(onMessageTimeoutMs);

if (transacted) {
this.acknowledgeMode = Session.SESSION_TRANSACTED;
this.isIndividualAck = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class RMQConnectionFactoryTest {
RMQConnectionFactory defaultFact = new RMQConnectionFactory();
defaultProps.setProperty("uri", defaultFact.getUri());
defaultProps.setProperty("queueBrowserReadMax", "0");
defaultProps.setProperty("onMessageTimeoutMs", "2000");
}

private static Properties getProps(Reference ref) {
Expand Down Expand Up @@ -79,6 +80,7 @@ public void testUpdatedConnectionFactoryReference() throws Exception {
connFactory.setPassword("my-password");
connFactory.setPort(42);
connFactory.setQueueBrowserReadMax(52);
connFactory.setOnMessageTimeoutMs(62);
connFactory.setSsl(true);
connFactory.setTerminationTimeout(1234567890123456789L);
connFactory.setUsername("fred");
Expand All @@ -89,6 +91,7 @@ public void testUpdatedConnectionFactoryReference() throws Exception {

assertEquals("Not the correct uri", "amqps://fred:my-password@sillyHost:42/bill", newProps.getProperty("uri"));
assertEquals("Not the correct queueBrowserReadMax", "52", newProps.getProperty("queueBrowserReadMax"));
assertEquals("Not the correct onMessageTimeoutMs", "62", newProps.getProperty("onMessageTimeoutMs"));
}

@Test
Expand Down Expand Up @@ -133,6 +136,7 @@ public void testConnectionFactoryReferenceUpdated() throws Exception {
addStringRefProperty(ref, "password", "my-password");
addStringRefProperty(ref, "port", "42");
addStringRefProperty(ref, "queueBrowserReadMax", "52"); // duplicates don't overwrite
addStringRefProperty(ref, "onMessageTimeoutMs", "62");
addStringRefProperty(ref, "ssl", "true");
addStringRefProperty(ref, "terminationTimeout","1234567890123456789");
addStringRefProperty(ref, "username", "fred");
Expand All @@ -144,6 +148,7 @@ public void testConnectionFactoryReferenceUpdated() throws Exception {
assertEquals("Not the correct password", "my-password", newFactory.getPassword());
assertEquals("Not the correct port", 42, newFactory.getPort());
assertEquals("Not the correct queueBrowserReadMax", 52, newFactory.getQueueBrowserReadMax());
assertEquals("Not the correct onMessageTimeoutMs", 62, newFactory.getOnMessageTimeoutMs());
assertEquals("Not the correct ssl", true, newFactory.isSsl());

assertEquals("Not the correct terminationTimeout", 1234567890123456789L, newFactory.getTerminationTimeout());
Expand Down