Skip to content
This repository

Fix channel leakage when broker sends consumer cancel notifications #119

Closed
wants to merge 3 commits into from

2 participants

Daniel White Mike Hadlow
Daniel White

The simplest way that this can occur is when deleting the queue (i.e. IAdvancedBus.QueueDelete).

What ends up happening is that the transient queue is removed from the ConsumerFactory, the underlying resources are never cleaned up and so the channels end up building up until the connection is lost.

Two potential issues:

  • PersistentConsumer currently isn't included in this since it seems as though it should handle cancellation differently.
  • The disposal of the model is a little funky. I'm not sure if the underlying cause is the same bug I've seen reported before: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2012-October/023410.html. By disposing via the queue the model seems to get disposed successfully when connected to RabbitMQ.
danielwhite added some commits
Daniel White danielwhite Provide means to pass in an alternative queue for consumer tests d176279
Daniel White danielwhite Extend MockBuilder to allow for consumer cancellation
This allows us to test the behaviour of a channel when RabbitMQ cancels
the consumer.
6cba50b
Daniel White danielwhite Ensure transient consumer is disposed when the queue is deleted
Previously, the underlying factory would simply remove it from its list,
upon the receipt of 'basic.cancel' from the server.  Unfortunately, it
would not request for resources to be cleaned up and would end up leaving
consumers (and channels by extension) open indefinitely until the
underlying connection was lost.

It is yet to be determined if this should apply to PersistentConsumer, but
since it does very little when cancel occurs, it doesn't seem to apply.
0b36c1f
Mike Hadlow
Owner

Daniel, thanks for doing this, excellent work as usual. I want to test it thoroughly before pulling into master. How do you do integration test? By that I mean, how can you get RabbitMQ to send basic.cancel to a consumer?

Mike Hadlow
Owner

I guess if a queue that a persistent consumer is consuming from is deleted, then the correct behavior is for the channel to be closed and the consumer disposed.

Daniel White

Looks like I missed the integration tests before. I'll add them and update the pull request. At present, I'm triggering the behaviour by calling QueueDelete on the same bus that established the consumer.

As for the persistent consumers, my main concern is that the delete could come from numerous places. At least for transient consumers (i.e. exclusive queues), you can be sure that something on your connection triggered the delete. I might be missing something, but it appears that there isn't a means to determine if or when a consumer has been cancelled.

Mike Hadlow
Owner

I've taken your ideas and done my own implementation now. I think I'm going to take the opinionated stance that a deleted queue is a deleted queue, and EasyNetQ will not attempt to re-create it. Any consumers consuming from it will be closed and their channels shut down.

It turns out that implementing cancellation for this scenario also makes it easy to implement user cancellation, so I'll be releasing that at the same time, probably later today when I'm happy I haven't broken anything obvious.

Thanks so much for raising this issue and and your excellent work on the pull request, which I stole from shamelessly. I'm just sorry I took so long to pay it any attention.

Daniel White

I'm a happy man if the problem ends up solved. :)

Mike Hadlow
Owner

Closing. Thanks again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 3 unique commits by 1 author.

Nov 06, 2013
Daniel White danielwhite Provide means to pass in an alternative queue for consumer tests d176279
Daniel White danielwhite Extend MockBuilder to allow for consumer cancellation
This allows us to test the behaviour of a channel when RabbitMQ cancels
the consumer.
6cba50b
Daniel White danielwhite Ensure transient consumer is disposed when the queue is deleted
Previously, the underlying factory would simply remove it from its list,
upon the receipt of 'basic.cancel' from the server.  Unfortunately, it
would not request for resources to be cleaned up and would end up leaving
consumers (and channels by extension) open indefinitely until the
underlying connection was lost.

It is yet to be determined if this should apply to PersistentConsumer, but
since it does very little when cancel occurs, it doesn't seem to apply.
0b36c1f
This page is out of date. Refresh to see the latest.
8 Source/EasyNetQ.Tests/ConsumeTests/ConsumerTestBase.cs
@@ -52,8 +52,14 @@ protected void SetUp()
52 52
53 53 protected void StartConsumer(Action<byte[], MessageProperties, MessageReceivedInfo> handler)
54 54 {
55   - ConsumerWasInvoked = false;
56 55 var queue = new Queue("my_queue", false);
  56 +
  57 + StartConsumer(handler, queue);
  58 + }
  59 +
  60 + protected void StartConsumer(Action<byte[], MessageProperties, MessageReceivedInfo> handler, Queue queue)
  61 + {
  62 + ConsumerWasInvoked = false;
57 63 MockBuilder.Bus.Advanced.Consume(queue, (body, properties, messageInfo) => Task.Factory.StartNew(() =>
58 64 {
59 65 DeliveredMessageBody = body;
27 Source/EasyNetQ.Tests/ConsumeTests/When_consume_is_cancelled_for_transient_consumer.cs
... ... @@ -0,0 +1,27 @@
  1 +// ReSharper disable InconsistentNaming
  2 +
  3 +using EasyNetQ.Topology;
  4 +using NUnit.Framework;
  5 +using Rhino.Mocks;
  6 +
  7 +namespace EasyNetQ.Tests.ConsumeTests
  8 +{
  9 + [TestFixture]
  10 + class When_consume_is_cancelled_for_transient_consumer : ConsumerTestBase
  11 + {
  12 + protected override void AdditionalSetUp()
  13 + {
  14 + StartConsumer((body, properties, info) => { }, new Queue("my_queue", true));
  15 + }
  16 +
  17 + [Test]
  18 + public void Should_dispose_channel()
  19 + {
  20 + MockBuilder.Channels[0].BasicCancel(ConsumerTag);
  21 +
  22 + MockBuilder.Channels[0].AssertWasCalled(x => x.Dispose());
  23 + }
  24 + }
  25 +}
  26 +
  27 +// ReSharper restore InconsistentNaming
1  Source/EasyNetQ.Tests/EasyNetQ.Tests.csproj
@@ -83,6 +83,7 @@
83 83 <Compile Include="ConsumeTests\When_a_message_is_delivered_to_the_consumer.cs" />
84 84 <Compile Include="ConsumeTests\When_a_polymorphic_message_is_delivered_to_the_consumer.cs" />
85 85 <Compile Include="ConsumeTests\When_consume_is_called.cs" />
  86 + <Compile Include="ConsumeTests\When_consume_is_cancelled_for_transient_consumer.cs" />
86 87 <Compile Include="EventBusTests.cs" />
87 88 <Compile Include="ExchangeQueueBindingTests.cs" />
88 89 <Compile Include="HandlerRunnerTests\When_a_user_handler_is_executed.cs" />
16 Source/EasyNetQ.Tests/Mocking/MockBuilder.cs
... ... @@ -1,5 +1,6 @@
1 1 using System;
2 2 using System.Collections.Generic;
  3 +using System.Linq;
3 4 using RabbitMQ.Client;
4 5 using RabbitMQ.Client.Framing.v0_9_1;
5 6 using Rhino.Mocks;
@@ -12,7 +13,7 @@ public class MockBuilder
12 13 readonly IConnection connection = MockRepository.GenerateStub<IConnection>();
13 14 readonly List<IModel> channels = new List<IModel>();
14 15 readonly Stack<IModel> channelPool = new Stack<IModel>();
15   - readonly List<IBasicConsumer> consumers = new List<IBasicConsumer>();
  16 + readonly Dictionary<string, IBasicConsumer> consumers = new Dictionary<string, IBasicConsumer>();
16 17 readonly IBasicProperties basicProperties = new BasicProperties();
17 18 private readonly IEasyNetQLogger logger = MockRepository.GenerateStub<IEasyNetQLogger>();
18 19 private readonly IBus bus;
@@ -67,8 +68,17 @@ public MockBuilder(string connectionString, Action<IServiceRegister> registerSer
67 68 var consumer = (IBasicConsumer)consumeInvokation.Arguments[3];
68 69
69 70 consumer.HandleBasicConsumeOk(consumerTag);
70   - consumers.Add(consumer);
  71 + consumers[consumerTag] = consumer;
71 72 }).Return("");
  73 + channel.Stub(x => x.BasicCancel(null))
  74 + .IgnoreArguments()
  75 + .WhenCalled(cancelInvokation =>
  76 + {
  77 + var consumerTag = (string)cancelInvokation.Arguments[0];
  78 + var consumer = consumers[consumerTag];
  79 +
  80 + consumer.HandleBasicCancel(consumerTag);
  81 + });
72 82 });
73 83
74 84 bus = RabbitHutch.CreateBus(connectionString, x =>
@@ -100,7 +110,7 @@ public List<IModel> Channels
100 110
101 111 public List<IBasicConsumer> Consumers
102 112 {
103   - get { return consumers; }
  113 + get { return consumers.Values.ToList(); }
104 114 }
105 115
106 116 public IBasicProperties BasicProperties
5 Source/EasyNetQ/Consumer/ConsumerFactory.cs
@@ -34,7 +34,10 @@ public ConsumerFactory(IInternalConsumerFactory internalConsumerFactory, IEventB
34 34 consumer.RemoveMeFromList += theConsumer =>
35 35 {
36 36 object value;
37   - consumers.TryRemove(theConsumer, out value);
  37 + if (consumers.TryRemove(theConsumer, out value))
  38 + {
  39 + theConsumer.Dispose();
  40 + }
38 41 };
39 42
40 43 consumers.TryAdd(consumer, null);
13 Source/EasyNetQ/Consumer/InternalConsumer.cs
@@ -164,8 +164,17 @@ public void HandleModelShutdown(IModel model, ShutdownEventArgs reason)
164 164
165 165 public void Dispose()
166 166 {
167   - Model.Dispose();
168   - Cancel();
  167 + var model = Model;
  168 + if (model != null)
  169 + {
  170 + // Queued because calling dipose directly will hang the connection.
  171 + consumerDispatcher.QueueAction(model.Dispose);
  172 + }
  173 +
  174 + if (IsRunning)
  175 + {
  176 + Cancel();
  177 + }
169 178 }
170 179 }
171 180 }

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.