Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions src/main/java/com/rabbitmq/client/impl/StrictExceptionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,36 @@ public void handleConsumerException(Channel channel, Throwable exception,
Consumer consumer, String consumerTag,
String methodName)
{
handleChannelKiller(channel, exception, "Consumer " + consumer
+ " (" + consumerTag + ")"
+ " method " + methodName
+ " for channel " + channel);
String logMessage = "Consumer " + consumer
+ " (" + consumerTag + ")"
+ " method " + methodName
+ " for channel " + channel;
String closeMessage = "Consumer"
+ " (" + consumerTag + ")"
+ " method " + methodName
+ " for channel " + channel;
handleChannelKiller(channel, exception, logMessage, closeMessage);
}

@Override
protected void handleChannelKiller(Channel channel, Throwable exception, String what) {
log(what + " threw an exception for channel " + channel, exception);
handleChannelKiller(channel, exception, what, what);
}

protected void handleChannelKiller(Channel channel, Throwable exception, String logMessage, String closeMessage) {
log(logMessage + " threw an exception for channel " + channel, exception);
try {
channel.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what);
channel.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + closeMessage);
} catch (AlreadyClosedException ace) {
// noop
} catch (TimeoutException ace) {
// noop
} catch (IOException ioe) {
log("Failure during close of channel " + channel + " after " + exception, ioe);
channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + what);
channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + closeMessage);
}
}



}
3 changes: 2 additions & 1 deletion src/test/java/com/rabbitmq/client/test/ClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
RecoveryDelayHandlerTest.class,
FrameBuilderTest.class,
PropertyFileInitialisationTest.class,
ClientVersionTest.class
ClientVersionTest.class,
StrictExceptionHandlerTest.class
})
public class ClientTests {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2018-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

package com.rabbitmq.client.test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.StrictExceptionHandler;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

public class StrictExceptionHandlerTest {

@Test
public void tooLongClosingMessage() throws Exception {
ConnectionFactory cf = TestUtils.connectionFactory();
final CountDownLatch latch = new CountDownLatch(1);
cf.setExceptionHandler(new StrictExceptionHandler() {
@Override
public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) {
try {
super.handleConsumerException(channel, exception, consumer, consumerTag, methodName);
} catch (IllegalArgumentException e) {
fail("No exception should caught");
}
latch.countDown();
}
});
Connection c = null;
try {
c = cf.newConnection();
Channel channel = c.createChannel();
String queue = channel.queueDeclare().getQueue();
channel.basicConsume(queue,
new VeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryLongClassName(
channel
));
channel.basicPublish("", queue, null, new byte[0]);
assertThat(latch.await(5, TimeUnit.SECONDS), is(true));
} finally {
if (c != null) {
c.close();
}
}
}

static class VeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryLongClassName
extends DefaultConsumer {

public VeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryLongClassName(
Channel channel) {
super(channel);
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
throw new RuntimeException();
}
}
}