diff --git a/src/main/java/com/rabbitmq/client/impl/StrictExceptionHandler.java b/src/main/java/com/rabbitmq/client/impl/StrictExceptionHandler.java index 2c722d5d76..ab373e15fb 100644 --- a/src/main/java/com/rabbitmq/client/impl/StrictExceptionHandler.java +++ b/src/main/java/com/rabbitmq/client/impl/StrictExceptionHandler.java @@ -55,25 +55,36 @@ public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) { - handleChannelKiller(channel, exception, "Consumer " + consumer - + " (" + consumerTag + ")" - + " method " + methodName - + " for channel " + channel); + String logMessage = "Consumer " + consumer + + " (" + consumerTag + ")" + + " method " + methodName + + " for channel " + channel; + String closeMessage = "Consumer" + + " (" + consumerTag + ")" + + " method " + methodName + + " for channel " + channel; + handleChannelKiller(channel, exception, logMessage, closeMessage); } @Override protected void handleChannelKiller(Channel channel, Throwable exception, String what) { - log(what + " threw an exception for channel " + channel, exception); + handleChannelKiller(channel, exception, what, what); + } + + protected void handleChannelKiller(Channel channel, Throwable exception, String logMessage, String closeMessage) { + log(logMessage + " threw an exception for channel " + channel, exception); try { - channel.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what); + channel.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + closeMessage); } catch (AlreadyClosedException ace) { // noop } catch (TimeoutException ace) { // noop } catch (IOException ioe) { log("Failure during close of channel " + channel + " after " + exception, ioe); - channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + what); + channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + closeMessage); } } + + } diff --git a/src/test/java/com/rabbitmq/client/test/ClientTests.java b/src/test/java/com/rabbitmq/client/test/ClientTests.java index 08a8d02259..16545e9003 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTests.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTests.java @@ -53,7 +53,8 @@ RecoveryDelayHandlerTest.class, FrameBuilderTest.class, PropertyFileInitialisationTest.class, - ClientVersionTest.class + ClientVersionTest.class, + StrictExceptionHandlerTest.class }) public class ClientTests { diff --git a/src/test/java/com/rabbitmq/client/test/StrictExceptionHandlerTest.java b/src/test/java/com/rabbitmq/client/test/StrictExceptionHandlerTest.java new file mode 100644 index 0000000000..5cc68d1eb3 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/StrictExceptionHandlerTest.java @@ -0,0 +1,83 @@ +// Copyright (c) 2018-Present Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.impl.StrictExceptionHandler; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +public class StrictExceptionHandlerTest { + + @Test + public void tooLongClosingMessage() throws Exception { + ConnectionFactory cf = TestUtils.connectionFactory(); + final CountDownLatch latch = new CountDownLatch(1); + cf.setExceptionHandler(new StrictExceptionHandler() { + @Override + public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) { + try { + super.handleConsumerException(channel, exception, consumer, consumerTag, methodName); + } catch (IllegalArgumentException e) { + fail("No exception should caught"); + } + latch.countDown(); + } + }); + Connection c = null; + try { + c = cf.newConnection(); + Channel channel = c.createChannel(); + String queue = channel.queueDeclare().getQueue(); + channel.basicConsume(queue, + new VeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryLongClassName( + channel + )); + channel.basicPublish("", queue, null, new byte[0]); + assertThat(latch.await(5, TimeUnit.SECONDS), is(true)); + } finally { + if (c != null) { + c.close(); + } + } + } + + static class VeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryLongClassName + extends DefaultConsumer { + + public VeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryLongClassName( + Channel channel) { + super(channel); + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + throw new RuntimeException(); + } + } +}