Skip to content

Commit

Permalink
Add channel QoS setting
Browse files Browse the repository at this point in the history
Fixes #11
  • Loading branch information
acogoluegnes committed Dec 30, 2016
1 parent 1ea0f7a commit 2a7ffc4
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 1 deletion.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

<properties>
<rabbitmq.version>4.0.0</rabbitmq.version>
<awaitility.version>2.0.0</awaitility.version>

<java-compile-version>1.6</java-compile-version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -116,6 +117,12 @@
<version>1.7.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
/** The time to wait for threads/messages to terminate during {@link Connection#close()} */
private volatile long terminationTimeout = Long.getLong("rabbit.jms.terminationTimeout", 15000);

/**
* QoS setting for channels created by this connection factory.
*
* @see com.rabbitmq.client.Channel#basicQos(int)
*/
private int channelsQos = RMQConnection.NO_CHANNEL_QOS;

/**
* Classes in these packages can be transferred via ObjectMessage.
*
Expand Down Expand Up @@ -99,6 +106,7 @@ public Connection createConnection(String username, String password) throws JMSE
.setTerminationTimeout(getTerminationTimeout())
.setQueueBrowserReadMax(getQueueBrowserReadMax())
.setOnMessageTimeoutMs(getOnMessageTimeoutMs())
.setChannelsQos(channelsQos)
);
conn.setTrustedPackages(this.trustedPackages);
logger.debug("Connection {} created.", conn);
Expand Down Expand Up @@ -553,4 +561,24 @@ public void setOnMessageTimeoutMs(int onMessageTimeoutMs){
if (onMessageTimeoutMs > 0) this.onMessageTimeoutMs = onMessageTimeoutMs;
else this.logger.warn("Cannot set onMessageTimeoutMs to non-positive value {} (on {})", onMessageTimeoutMs, this);
}

/**
* QoS setting for channels created by this connection factory.
*
* @see com.rabbitmq.client.Channel#basicQos(int)
*/
public int getChannelsQos() {
return channelsQos;
}

/**
* QoS setting for channels created by this connection factory.
*
* @see com.rabbitmq.client.Channel#basicQos(int)
* @param channelsQos maximum number of messages that the server
* will deliver, 0 if unlimited
*/
public void setChannelsQos(int channelsQos) {
this.channelsQos = channelsQos;
}
}
16 changes: 16 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ public class ConnectionParams {
/** How long to wait for onMessage to return, in milliseconds */
private int onMessageTimeoutMs;

/**
* QoS setting for channels
*
* @see com.rabbitmq.client.Channel#basicQos(int)
*/
private int channelsQos = RMQConnection.NO_CHANNEL_QOS;

public Connection getRabbitConnection() {
return rabbitConnection;
}
Expand Down Expand Up @@ -55,4 +62,13 @@ public ConnectionParams setOnMessageTimeoutMs(int onMessageTimeoutMs) {
this.onMessageTimeoutMs = onMessageTimeoutMs;
return this;
}

public int getChannelsQos() {
return channelsQos;
}

public ConnectionParams setChannelsQos(int channelsQos) {
this.channelsQos = channelsQos;
return this;
}
}
15 changes: 14 additions & 1 deletion src/main/java/com/rabbitmq/jms/client/RMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
*/
public class RMQConnection implements Connection, QueueConnection, TopicConnection {

private final Logger logger = LoggerFactory.getLogger(RMQConnection.class);;
public static final int NO_CHANNEL_QOS = -1;

private final Logger logger = LoggerFactory.getLogger(RMQConnection.class);

/** the TCP connection wrapper to the RabbitMQ broker */
private final com.rabbitmq.client.Connection rabbitConnection;
Expand Down Expand Up @@ -81,6 +83,13 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
/** This is used for JMSCTS test cases, as ClientID should only be configurable right after the connection has been created */
private volatile boolean canSetClientID = true;

/**
* QoS setting for channels created by this connection.
*
* @see com.rabbitmq.client.Channel#basicQos(int)
*/
private final int channelsQos;

/**
* Classes in these packages can be transferred via ObjectMessage.
*
Expand All @@ -100,6 +109,7 @@ public RMQConnection(ConnectionParams connectionParams) {
this.terminationTimeout = connectionParams.getTerminationTimeout();
this.queueBrowserReadMax = connectionParams.getQueueBrowserReadMax();
this.onMessageTimeoutMs = connectionParams.getOnMessageTimeoutMs();
this.channelsQos = connectionParams.getChannelsQos();
}

/**
Expand Down Expand Up @@ -326,6 +336,9 @@ private void closeAllSessions() {

Channel createRabbitChannel(boolean transactional) throws IOException {
Channel channel = this.rabbitConnection.createChannel();
if(this.channelsQos != NO_CHANNEL_QOS) {
channel.basicQos(channelsQos);
}
if (transactional) {
channel.txSelect();
}
Expand Down
169 changes: 169 additions & 0 deletions src/test/java/com/rabbitmq/integration/tests/RabbitMqBasicQosIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/* Copyright (c) 2013 Pivotal Software, Inc. All rights reserved. */
package com.rabbitmq.integration.tests;

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.client.RMQConnection;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Test;

import javax.jms.*;
import java.lang.IllegalStateException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.awaitility.Awaitility.*;
import static org.awaitility.Awaitility.waitAtMost;
import static org.awaitility.Duration.*;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;

/**
* Asynchronous Consumer integration test.
*/
public class RabbitMqBasicQosIT extends AbstractITQueue {
private static final String QUEUE_NAME = "test.queue." + RabbitMqBasicQosIT.class.getCanonicalName();
private static final String MESSAGE = "Hello " + RabbitMqBasicQosIT.class.getName();

@Test
public void messagesAreDispatchedToMultipleListenersWithQos() throws Exception {
sendMessages();
QueueConnection connection = null;
try {
connection = connection(1);
QueueSession queueSession = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(QUEUE_NAME);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);

CountDownLatch latch = new CountDownLatch(1);
CountDownLatch ackLatch = new CountDownLatch(1);

MessageListener listener = new MessageListener(latch, ackLatch);
queueReceiver.setMessageListener(listener);

// waiting for message to be received
latch.await(1000, TimeUnit.MILLISECONDS);
// message is in first listener, but not yet ack-ed
// no other message should be delivered to first listener

// start another listener
QueueSession queueSession2 = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
QueueReceiver queueReceiver2 = queueSession2.createReceiver(queue);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch ackLatch2 = new CountDownLatch(1);
MessageListener listener2 = new MessageListener(latch2, ackLatch2);
queueReceiver2.setMessageListener(listener2);

// wait for second message in second listener
latch2.await(1000L, TimeUnit.MILLISECONDS);
// let's ack first message in first listener
ackLatch.countDown();
// let's ack second message in second listener
ackLatch2.countDown();

assertEquals(1, listener.getMessageCount());
assertEquals(1, listener2.getMessageCount());
} finally {
if(connection != null) {
connection.close();
}
}
}

@Test
public void messagesAreNotDispatchedToMultipleListenersWithoutQos() throws Exception {
sendMessages();
QueueConnection connection = null;
try {
connection = connection(RMQConnection.NO_CHANNEL_QOS);
QueueSession queueSession = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(QUEUE_NAME);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);

CountDownLatch latch = new CountDownLatch(1);
CountDownLatch ackLatch = new CountDownLatch(1);

MessageListener listener = new MessageListener(latch, ackLatch);
queueReceiver.setMessageListener(listener);

// waiting for message to be received
latch.await(1000, TimeUnit.MILLISECONDS);
// message is in first listener, but not yet ack-ed
// second message should arrive in first listener internal queue

// start another listener, but it should see anything
QueueSession queueSession2 = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
QueueReceiver queueReceiver2 = queueSession2.createReceiver(queue);
CountDownLatch latch2 = new CountDownLatch(1);
CountDownLatch ackLatch2 = new CountDownLatch(1);
MessageListener listener2 = new MessageListener(latch2, ackLatch2);
queueReceiver2.setMessageListener(listener2);

// ack first message
ackLatch.countDown();
// second message should be delivered to first listener
waitAtMost(ONE_SECOND).untilCall(to(listener).getMessageCount(), equalTo(2));

assertEquals(2, listener.getMessageCount());
assertEquals(0, listener2.getMessageCount());
} finally {
if(connection != null) {
connection.close();
}
}


}

private void sendMessages() throws Exception {
try {
queueConn.start();
QueueSession queueSession = queueConn.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(QUEUE_NAME);
QueueSender queueSender = queueSession.createSender(queue);
queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = queueSession.createTextMessage(MESSAGE);
queueSender.send(message);
queueSender.send(message);
} finally {
reconnect();
}
}

private QueueConnection connection(int qos) throws Exception {
RMQConnectionFactory connectionFactory = (RMQConnectionFactory) AbstractTestConnectionFactory.getTestConnectionFactory().getConnectionFactory();
connectionFactory.setChannelsQos(qos);
QueueConnection queueConnection = connectionFactory.createQueueConnection();
queueConnection.start();
return queueConnection;
}

private static class MessageListener implements javax.jms.MessageListener {
private final AtomicInteger messageCount = new AtomicInteger(0);
private final CountDownLatch latch;
private final CountDownLatch ackLatch;
public MessageListener(CountDownLatch latch, CountDownLatch ackLatch) {
this.latch = latch;
this.ackLatch = ackLatch;
}
@Override
public void onMessage(Message message) {
messageCount.incrementAndGet();
this.latch.countDown();
try {
this.ackLatch.await(1000L, TimeUnit.MILLISECONDS);
message.acknowledge();
} catch (Exception e) {
throw new RuntimeException("Error in message processing", e);
}

}
public int getMessageCount() {
return this.messageCount.get();
}
}
}

0 comments on commit 2a7ffc4

Please sign in to comment.