From a7be49b8a426ba371d6e9d58a5f78353ec99f490 Mon Sep 17 00:00:00 2001 From: Guillaume Mornet Date: Thu, 8 Sep 2016 10:47:57 +0200 Subject: [PATCH 1/2] Provide a way to configure onMessageTimeoutMs The time limit for onMessage() processing defaults to 2 seconds. This patch makes onMessageTimeoutMs configurable as as JNDI parameter with a 2s default. --- doc-notes/docupdates1.3.2.md | 7 +++++ .../jms/admin/RMQConnectionFactory.java | 26 ++++++++++++++++--- .../rabbitmq/jms/admin/RMQObjectFactory.java | 2 ++ .../rabbitmq/jms/client/RMQConnection.java | 14 +++++++--- .../com/rabbitmq/jms/client/RMQSession.java | 9 ++++--- .../jms/admin/RMQConnectionFactoryTest.java | 5 ++++ 6 files changed, 52 insertions(+), 11 deletions(-) diff --git a/doc-notes/docupdates1.3.2.md b/doc-notes/docupdates1.3.2.md index afd034e9..6f88ddcf 100644 --- a/doc-notes/docupdates1.3.2.md +++ b/doc-notes/docupdates1.3.2.md @@ -90,6 +90,13 @@ Here is a complete list of the attributes/properties available:   RabbitMQ port used for connections. The default is “5672” unless this is an SSL connection, in which case the default is “5671”. + + onMessageTimeoutMs +   + + How long to wait for onMessage to return, in milliseconds. Non-positive values are rejected. The default is “2000”. + + queueBrowserReadMax   diff --git a/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java b/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java index 529aef7d..64b61946 100644 --- a/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java +++ b/src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java @@ -45,6 +45,8 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S private String host = "localhost"; /** Default port NOT SET - determined by the type of connection (ssl or non-ssl) */ private int port = -1; + /** How long to wait for onMessage to return, in milliseconds */ + private int onMessageTimeoutMs = 2000; /** Default not to use ssl */ private boolean ssl = false; @@ -91,7 +93,7 @@ public Connection createConnection(String username, String password) throws JMSE setRabbitUri(logger, this, factory, this.getUri()); com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(factory); - RMQConnection conn = new RMQConnection(rabbitConnection, getTerminationTimeout(), getQueueBrowserReadMax()); + RMQConnection conn = new RMQConnection(rabbitConnection, getTerminationTimeout(), getQueueBrowserReadMax(), getOnMessageTimeoutMs()); conn.setTrustedPackages(this.trustedPackages); logger.debug("Connection {} created.", conn); return conn; @@ -106,7 +108,7 @@ public Connection createConnection(String username, String password, ListonMessageTimeoutMs: the time in milliseconds {@link MessageListener#onMessage(Message)} can take to process a message. + * Non-positive values are rejected. + * @param onMessageTimeoutMs - duration in milliseconds + */ + public void setOnMessageTimeoutMs(int onMessageTimeoutMs){ + if (onMessageTimeoutMs > 0) this.onMessageTimeoutMs = onMessageTimeoutMs; + else this.logger.warn("Cannot set onMessageTimeoutMs to non-positive value {} (on {})", onMessageTimeoutMs, this); + } } diff --git a/src/main/java/com/rabbitmq/jms/admin/RMQObjectFactory.java b/src/main/java/com/rabbitmq/jms/admin/RMQObjectFactory.java index 659cd3e7..fb63f0db 100644 --- a/src/main/java/com/rabbitmq/jms/admin/RMQObjectFactory.java +++ b/src/main/java/com/rabbitmq/jms/admin/RMQObjectFactory.java @@ -77,6 +77,7 @@ *
  • password
  • *
  • port
  • *
  • queueBrowserReadMax
  • + *
  • onMessageTimeoutMs
  • *
  • ssl
  • *
  • terminationTimeout
  • *
  • username
  • @@ -165,6 +166,7 @@ public Object createConnectionFactory(Reference ref, Name name) throws NamingExc f.setPassword (getStringProperty (ref, "password", true, f.getPassword() )); f.setPort (getIntProperty (ref, "port", true, f.getPort() )); f.setQueueBrowserReadMax(getIntProperty (ref, "queueBrowserReadMax", true, f.getQueueBrowserReadMax())); + f.setOnMessageTimeoutMs (getIntProperty (ref, "onMessageTimeoutMs", true, f.getOnMessageTimeoutMs() )); f.setSsl (getBooleanProperty(ref, "ssl", true, f.isSsl() )); f.setTerminationTimeout (getLongProperty (ref, "terminationTimeout", true, f.getTerminationTimeout() )); f.setUsername (getStringProperty (ref, "username", true, f.getUsername() )); diff --git a/src/main/java/com/rabbitmq/jms/client/RMQConnection.java b/src/main/java/com/rabbitmq/jms/client/RMQConnection.java index c60f9fc2..be844f4d 100644 --- a/src/main/java/com/rabbitmq/jms/client/RMQConnection.java +++ b/src/main/java/com/rabbitmq/jms/client/RMQConnection.java @@ -70,6 +70,9 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti /** max number of messages to read from a browsed queue */ private final int queueBrowserReadMax; + /** How long to wait for onMessage to return, in milliseconds */ + private final int onMessageTimeoutMs; + private static ConcurrentHashMap CLIENT_IDS = new ConcurrentHashMap(); /** List of all our durable subscriptions so we can track them on a per connection basis (maintained by sessions).*/ @@ -90,23 +93,26 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti * @param rabbitConnection the TCP connection wrapper to the RabbitMQ broker * @param terminationTimeout timeout for close in milliseconds * @param queueBrowserReadMax maximum number of messages to read from a QueueBrowser (before filtering) + * @param onMessageTimeoutMs how long to wait for onMessage to return, in milliseconds */ - public RMQConnection(com.rabbitmq.client.Connection rabbitConnection, long terminationTimeout, int queueBrowserReadMax) { + public RMQConnection(com.rabbitmq.client.Connection rabbitConnection, long terminationTimeout, int queueBrowserReadMax, int onMessageTimeoutMs) { rabbitConnection.addShutdownListener(new RMQConnectionShutdownListener()); this.rabbitConnection = rabbitConnection; this.terminationTimeout = terminationTimeout; this.queueBrowserReadMax = queueBrowserReadMax; + this.onMessageTimeoutMs = onMessageTimeoutMs; } private static final long FIFTEEN_SECONDS_MS = 15000; + private static final int TWO_SECONDS_MS = 2000; /** - * Creates an RMQConnection object, with default termination timeout of 15 seconds, and unlimited reads from QueueBrowsers. + * Creates an RMQConnection object, with default termination timeout of 15 seconds, a 2 seconds timeout for onMessage, and unlimited reads from QueueBrowsers. * @param rabbitConnection the TCP connection wrapper to the RabbitMQ broker */ public RMQConnection(com.rabbitmq.client.Connection rabbitConnection) { - this(rabbitConnection, FIFTEEN_SECONDS_MS, 0); + this(rabbitConnection, FIFTEEN_SECONDS_MS, 0, TWO_SECONDS_MS); } /** For RMQSession to retrieve */ @@ -120,7 +126,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS logger.trace("transacted={}, acknowledgeMode={}", transacted, acknowledgeMode); illegalStateExceptionIfClosed(); freezeClientID(); - RMQSession session = new RMQSession(this, transacted, acknowledgeMode, this.subscriptions); + RMQSession session = new RMQSession(this, transacted, onMessageTimeoutMs, acknowledgeMode, this.subscriptions); session.setTrustedPackages(this.trustedPackages); this.sessions.add(session); return session; diff --git a/src/main/java/com/rabbitmq/jms/client/RMQSession.java b/src/main/java/com/rabbitmq/jms/client/RMQSession.java index 19526d90..4873c3eb 100644 --- a/src/main/java/com/rabbitmq/jms/client/RMQSession.java +++ b/src/main/java/com/rabbitmq/jms/client/RMQSession.java @@ -142,9 +142,7 @@ private static Map generateJMSTypeIdents() { private static final String JMS_TOPIC_SELECTOR_EXCHANGE_TYPE = "x-jms-topic"; - private static final long ON_MESSAGE_EXECUTOR_TIMEOUT_MS = 2000; // 2 seconds - - private final DeliveryExecutor deliveryExecutor = new DeliveryExecutor(ON_MESSAGE_EXECUTOR_TIMEOUT_MS); + private final DeliveryExecutor deliveryExecutor; /** The channels we use for browsing queues (there may be more than one in operation at a time) */ private Set browsingChannels = new HashSet(); // @GuardedBy(bcLock) @@ -161,15 +159,18 @@ private static Map generateJMSTypeIdents() { * Creates a session object associated with a connection * @param connection the connection that we will send data on * @param transacted whether this session is transacted or not + * @param onMessageTimeoutMs how long to wait for onMessage to return, in milliseconds * @param mode the (fixed) acknowledgement mode for this session * @param subscriptions the connection's subscriptions, shared with all sessions * @throws JMSException if we fail to create a {@link Channel} object on the connection, or if the acknowledgement mode is incorrect */ - public RMQSession(RMQConnection connection, boolean transacted, int mode, Map subscriptions) throws JMSException { + public RMQSession(RMQConnection connection, boolean transacted, int onMessageTimeoutMs, int mode, Map subscriptions) throws JMSException { if (mode<0 || mode>CLIENT_INDIVIDUAL_ACKNOWLEDGE) throw new JMSException(String.format("cannot create session with acknowledgement mode = %d.", mode)); this.connection = connection; this.transacted = transacted; this.subscriptions = subscriptions; + this.deliveryExecutor = new DeliveryExecutor(onMessageTimeoutMs); + if (transacted) { this.acknowledgeMode = Session.SESSION_TRANSACTED; this.isIndividualAck = false; diff --git a/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java b/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java index cf07b659..c95942f5 100644 --- a/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java +++ b/src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java @@ -19,6 +19,7 @@ public class RMQConnectionFactoryTest { RMQConnectionFactory defaultFact = new RMQConnectionFactory(); defaultProps.setProperty("uri", defaultFact.getUri()); defaultProps.setProperty("queueBrowserReadMax", "0"); + defaultProps.setProperty("onMessageTimeoutMs", "2000"); } private static Properties getProps(Reference ref) { @@ -79,6 +80,7 @@ public void testUpdatedConnectionFactoryReference() throws Exception { connFactory.setPassword("my-password"); connFactory.setPort(42); connFactory.setQueueBrowserReadMax(52); + connFactory.setOnMessageTimeoutMs(62); connFactory.setSsl(true); connFactory.setTerminationTimeout(1234567890123456789L); connFactory.setUsername("fred"); @@ -89,6 +91,7 @@ public void testUpdatedConnectionFactoryReference() throws Exception { assertEquals("Not the correct uri", "amqps://fred:my-password@sillyHost:42/bill", newProps.getProperty("uri")); assertEquals("Not the correct queueBrowserReadMax", "52", newProps.getProperty("queueBrowserReadMax")); + assertEquals("Not the correct onMessageTimeoutMs", "62", newProps.getProperty("onMessageTimeoutMs")); } @Test @@ -133,6 +136,7 @@ public void testConnectionFactoryReferenceUpdated() throws Exception { addStringRefProperty(ref, "password", "my-password"); addStringRefProperty(ref, "port", "42"); addStringRefProperty(ref, "queueBrowserReadMax", "52"); // duplicates don't overwrite + addStringRefProperty(ref, "onMessageTimeoutMs", "62"); addStringRefProperty(ref, "ssl", "true"); addStringRefProperty(ref, "terminationTimeout","1234567890123456789"); addStringRefProperty(ref, "username", "fred"); @@ -144,6 +148,7 @@ public void testConnectionFactoryReferenceUpdated() throws Exception { assertEquals("Not the correct password", "my-password", newFactory.getPassword()); assertEquals("Not the correct port", 42, newFactory.getPort()); assertEquals("Not the correct queueBrowserReadMax", 52, newFactory.getQueueBrowserReadMax()); + assertEquals("Not the correct onMessageTimeoutMs", 62, newFactory.getOnMessageTimeoutMs()); assertEquals("Not the correct ssl", true, newFactory.isSsl()); assertEquals("Not the correct terminationTimeout", 1234567890123456789L, newFactory.getTerminationTimeout()); From 7425192c172f728057709396636f5b2816a3c0f9 Mon Sep 17 00:00:00 2001 From: Guillaume Mornet Date: Thu, 8 Sep 2016 12:34:55 +0200 Subject: [PATCH 2/2] Keep docupdates1.3.2.md unchanged --- doc-notes/docupdates1.3.2.md | 7 ------- 1 file changed, 7 deletions(-) diff --git a/doc-notes/docupdates1.3.2.md b/doc-notes/docupdates1.3.2.md index 6f88ddcf..afd034e9 100644 --- a/doc-notes/docupdates1.3.2.md +++ b/doc-notes/docupdates1.3.2.md @@ -90,13 +90,6 @@ Here is a complete list of the attributes/properties available:   RabbitMQ port used for connections. The default is “5672” unless this is an SSL connection, in which case the default is “5671”. - - onMessageTimeoutMs -   - - How long to wait for onMessage to return, in milliseconds. Non-positive values are rejected. The default is “2000”. - - queueBrowserReadMax