Skip to content
Permalink
Browse files

GH-935: Handle all exceptions in handleDelivery

Fixes #935

- Don't call basicCancel if already canceled
- Catch all `Exception`s
  • Loading branch information
garyrussell authored and artembilan committed Mar 19, 2019
1 parent cc36ede commit 9c241c1bab294bea0007dc87650effa4502c29b3
@@ -827,11 +827,13 @@ public String toString() {

private final class InternalConsumer extends DefaultConsumer {

private final String queue;
private final String queueName;

boolean canceled;

InternalConsumer(Channel channel, String queue) {
super(channel);
this.queue = queue;
this.queueName = queue;
}

@Override
@@ -842,7 +844,7 @@ public void handleConsumeOk(String consumerTag) {
}
if (BlockingQueueConsumer.this.applicationEventPublisher != null) {
BlockingQueueConsumer.this.applicationEventPublisher
.publishEvent(new ConsumeOkEvent(this, this.queue, consumerTag));
.publishEvent(new ConsumeOkEvent(this, this.queueName, consumerTag));
}
}

@@ -866,10 +868,10 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
public void handleCancel(String consumerTag) {
if (logger.isWarnEnabled()) {
logger.warn("Cancel received for " + consumerTag + " ("
+ this.queue
+ this.queueName
+ "); " + BlockingQueueConsumer.this);
}
BlockingQueueConsumer.this.consumers.remove(this.queue);
BlockingQueueConsumer.this.consumers.remove(this.queueName);
if (!BlockingQueueConsumer.this.consumers.isEmpty()) {
basicCancel(false);
}
@@ -882,14 +884,15 @@ public void handleCancel(String consumerTag) {
public void handleCancelOk(String consumerTag) {
if (logger.isDebugEnabled()) {
logger.debug("Received cancelOk for tag " + consumerTag + " ("
+ this.queue
+ this.queueName
+ "); " + BlockingQueueConsumer.this);
}
this.canceled = true;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) {
if (logger.isDebugEnabled()) {
logger.debug("Storing delivery for consumerTag: '"
+ consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "
@@ -898,36 +901,40 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
try {
if (BlockingQueueConsumer.this.abortStarted > 0) {
if (!BlockingQueueConsumer.this.queue.offer(
new Delivery(consumerTag, envelope, properties, body, this.queue),
new Delivery(consumerTag, envelope, properties, body, this.queueName),
BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {

Channel channelToClose = super.getChannel();
RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
// Defensive - should never happen
BlockingQueueConsumer.this.queue.clear();
channelToClose.basicNack(envelope.getDeliveryTag(), true, true);
channelToClose.basicCancel(consumerTag);
if (!this.canceled) {
getChannel().basicCancel(consumerTag);
}
try {
channelToClose.close();
}
catch (TimeoutException e) {
catch (@SuppressWarnings("unused") TimeoutException e) {
// no-op
}
}
}
else {
BlockingQueueConsumer.this.queue
.put(new Delivery(consumerTag, envelope, properties, body, this.queue));
.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
}
}
catch (InterruptedException e) {
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (Exception e) {
BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
}
}

@Override
public String toString() {
return "InternalConsumer{" + "queue='" + this.queue + '\'' +
return "InternalConsumer{" + "queue='" + this.queueName + '\'' +
", consumerTag='" + getConsumerTag() + '\'' +
'}';
}
@@ -42,8 +42,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.logging.log4j.Level;
import org.junit.Rule;
@@ -302,7 +304,7 @@ public void testAlwaysCancelAutoRecoverConsumer() throws IOException {
}

@Test
public void testDrainAndReject() throws IOException {
public void testDrainAndReject() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
ChannelProxy channel = mock(ChannelProxy.class);
@@ -315,12 +317,18 @@ public void testDrainAndReject() throws IOException {
doReturn(isOpen.get()).when(channel).isOpen();
when(channel.queueDeclarePassive(anyString()))
.then(invocation -> mock(AMQP.Queue.DeclareOk.class));
doAnswer(i -> {
((Consumer) i.getArgument(6)).handleConsumeOk("consumerTag");
AtomicReference<Consumer> theConsumer = new AtomicReference<>();
doAnswer(inv -> {
Consumer consumer = inv.getArgument(6);
consumer.handleConsumeOk("consumerTag");
theConsumer.set(consumer);
return "consumerTag";
}).when(channel).basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(),
anyMap(), any(Consumer.class));

doAnswer(inv -> {
theConsumer.get().handleCancelOk("consumerTag");
return null;
}).when(channel).basicCancel("consumerTag");
BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(connectionFactory,
new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<BlockingQueueConsumer>(),
AcknowledgeMode.AUTO, true, 2, "test");
@@ -346,9 +354,7 @@ public void testDrainAndReject() throws IOException {
envelope = new Envelope(3, false, "foo", "bar");
consumer.handleDelivery("consumerTag", envelope, props, new byte[0]);
assertThat(TestUtils.getPropertyValue(blockingQueueConsumer, "queue", BlockingQueue.class).size(), equalTo(0));
verify(channel).basicNack(3, true, true);
verify(channel, times(2)).basicCancel("consumerTag");
verify(channel, times(1)).basicCancel("consumerTag");
}


}

0 comments on commit 9c241c1

Please sign in to comment.
You can’t perform that action at this time.