diff --git a/bom/pom.xml b/bom/pom.xml
index 2f2b3f32..85fb8abd 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -43,6 +43,7 @@
0.3.0
1.0
3.0.0
+ 4.0.1
2.10.0
@@ -183,6 +184,12 @@
${mockito.version}
test
+
+ org.awaitility
+ awaitility
+ ${awaitility.version}
+ test
+
org.apache.activemq
diff --git a/broker/src/test/java/util/Broker.java b/broker/src/test/java/util/Broker.java
index a8fa36cf..754ce6f3 100644
--- a/broker/src/test/java/util/Broker.java
+++ b/broker/src/test/java/util/Broker.java
@@ -19,11 +19,15 @@
package util;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.SimpleLayout;
import org.junit.jupiter.api.extension.ExtensionContext;
import java.io.IOException;
@@ -42,11 +46,21 @@ public class Broker implements AutoCloseable, ExtensionContext.Store.CloseableRe
public Configuration configuration = new ConfigurationImpl();
public Broker() {
- tempDir = null;
+ this(null);
}
public Broker(Path tempDir) {
this.tempDir = tempDir;
+ configureBroker(configuration);
+ }
+
+ /**
+ * Set configuration for the test broker. Delta from broker's default configuration.
+ *
+ * @param configuration configuration to apply the delta to
+ */
+ private static void configureBroker(Configuration configuration) {
+ configuration.setMaxDiskUsage(100); // my laptop is constantly running out of disk space
}
public void startBroker() {
@@ -68,6 +82,17 @@ public void close() {
}
}
+ /**
+ * Configures a log4j appender if there isn't any, so that log messages flood the stdout
+ */
+ public static void configureLogging() {
+ if (LogManager.getRootLogger().getAllAppenders().hasMoreElements()) {
+ return;
+ }
+ ConsoleAppender consoleAppender = new ConsoleAppender(new SimpleLayout(), ConsoleAppender.SYSTEM_OUT);
+ LogManager.getRootLogger().addAppender(consoleAppender);
+ }
+
/**
* @return port where the acceptor listens
*/
@@ -75,7 +100,7 @@ public int addAMQPAcceptor() {
Exception lastException = null;
for (int i = 0; i < 10; i++) {
try {
- int port = findRandomOpenPortOnAllLocalInterfaces();
+ int port = findRandomAvailablePortOnAllLocalInterfaces();
Acceptor acceptor = embeddedBroker.getActiveMQServer().getRemotingService().createAcceptor("amqp", "tcp://127.0.0.1:" + port + "?protocols=AMQP");
acceptor.start(); // this will throw if the port is not available
return port;
@@ -90,7 +115,7 @@ int addCoreAcceptor() {
Exception lastException = null;
for (int i = 0; i < 10; i++) {
try {
- int port = findRandomOpenPortOnAllLocalInterfaces();
+ int port = findRandomAvailablePortOnAllLocalInterfaces();
Acceptor acceptor = embeddedBroker.getActiveMQServer().getRemotingService().createAcceptor("core", "tcp://127.0.0.1:" + port + "?protocols=CORE");
acceptor.start(); // this will throw if the port is not available
return port;
@@ -120,7 +145,7 @@ public int addAMQPSAcceptor(InputStream keyStore) {
Exception lastException = null;
for (int i = 0; i < 10; i++) {
try {
- int port = findRandomOpenPortOnAllLocalInterfaces();
+ int port = findRandomAvailablePortOnAllLocalInterfaces();
Acceptor acceptor = embeddedBroker.getActiveMQServer().getRemotingService().createAcceptor("amqps",
"tcp://0.0.0.0:" + port + "?sslEnabled=true;keyStorePath=" + keyStorePath + ";keyStorePassword=secureexample;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300");
acceptor.start(); // this will throw if the port is not available
@@ -132,12 +157,16 @@ public int addAMQPSAcceptor(InputStream keyStore) {
throw new RuntimeException("Failed to bind to an available port", lastException);
}
+ public org.apache.activemq.artemis.core.server.Queue getProxyToQueue(String queueName) {
+ return embeddedBroker.getActiveMQServer().locateQueue(SimpleString.toSimpleString(queueName));
+ }
+
/**
* @return port number (there is a race so it may not be available anymore)
* @throws IOException
*/
// https://stackoverflow.com/questions/2675362/how-to-find-an-available-port
- private int findRandomOpenPortOnAllLocalInterfaces() throws IOException {
+ private int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
try (ServerSocket socket = new ServerSocket()) {
socket.setReuseAddress(true);
socket.bind(new InetSocketAddress(0));
diff --git a/cli-qpid-jms/pom.xml b/cli-qpid-jms/pom.xml
index 0016405c..582d8e4e 100644
--- a/cli-qpid-jms/pom.xml
+++ b/cli-qpid-jms/pom.xml
@@ -60,6 +60,11 @@
junit-pioneer
test
+
+ org.awaitility
+ awaitility
+ test
+
com.redhat.cli-java
diff --git a/cli-qpid-jms/src/test/java/ENTMQCL1860.java b/cli-qpid-jms/src/test/java/ENTMQCL1860.java
new file mode 100644
index 00000000..a70e851d
--- /dev/null
+++ b/cli-qpid-jms/src/test/java/ENTMQCL1860.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2019 Red Hat, Inc.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import util.Broker;
+import util.BrokerFixture;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth.assertWithMessage;
+import static org.awaitility.Awaitility.await;
+
+// Originally created by Andy Taylor,
+// available from https://github.com/andytaylor/activemq-artemis/commits/ENTMQBR-2677
+class ENTMQCL1860 {
+ @BeforeAll
+ static void configureLogging() {
+ // turn on extra verbose logging
+ //Broker.configureLogging();
+ }
+
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ @ExtendWith(BrokerFixture.class)
+ void testAckWithSessionClose(@BrokerFixture.TempBroker Broker broker) throws Exception {
+ broker.configuration.setSecurityEnabled(false);
+ broker.configuration.setPersistenceEnabled(false); // this, or tmpdir, otherwise test runs interact
+ broker.startBroker();
+
+ String brokerUrl = "amqp://localhost:" + broker.addAMQPAcceptor();
+ JmsConnectionFactory connectionFactory = new JmsConnectionFactory(brokerUrl);
+
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createMessage());
+ connection.close();
+ Queue queueView = broker.getProxyToQueue(getQueueName());
+
+ await().untilAsserted(() -> assertThat(queueView.getMessageCount()).isEqualTo(1));
+
+ // Now create a new connection and receive and acknowledge
+ for (int i = 0; i < 10; i++) {
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
+ Message message = consumer.receive();
+ assertWithMessage("Message is null during for loop iteration i = %s", i)
+ .that(message).isNotNull();
+ connection.close();
+ if (i > 0) {
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
+ }
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
+ Message message = consumer.receiveNoWait();
+ Assert.assertNull(message);
+ connection.close();
+ } finally {
+ connection.close();
+ }
+ }
+
+ private String getQueueName() {
+ return "someQueue";
+ }
+}