From 1667f692e4ff10c4ce38e86b3e03a953e387802f Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 23 Feb 2011 16:31:25 +0000 Subject: [PATCH 1/7] Added support for consumer_cancel_notify to .Net client --- docs/specs/amqp0-9-1.stripped.xml | 2 ++ docs/specs/amqp0-9-1.xml | 17 ++++++++++---- .../src/client/api/DefaultBasicConsumer.cs | 6 +++++ .../src/client/api/IBasicConsumer.cs | 8 +++++++ .../RabbitMQ.Client/src/client/api/IModel.cs | 2 ++ .../src/client/impl/ModelBase.cs | 23 +++++++++++++++++++ .../src/client/impl/v0_9_1/ProtocolBase.cs | 1 + 7 files changed, 55 insertions(+), 4 deletions(-) diff --git a/docs/specs/amqp0-9-1.stripped.xml b/docs/specs/amqp0-9-1.stripped.xml index fa643201a..09cc34a97 100644 --- a/docs/specs/amqp0-9-1.stripped.xml +++ b/docs/specs/amqp0-9-1.stripped.xml @@ -384,12 +384,14 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/docs/specs/amqp0-9-1.xml b/docs/specs/amqp0-9-1.xml index 0f4c8a9c3..af6eac829 100644 --- a/docs/specs/amqp0-9-1.xml +++ b/docs/specs/amqp0-9-1.xml @@ -7,7 +7,8 @@ exchange.bind and exchange.bind-ok, exchange.unbind and exchange.unbind-ok, basic.nack - and the ability for the Server to send basic.ack and basic.nack to the client. + and the ability for the Server to send basic.ack, basic.nack and + basic.cancel to the client. --> @@ -2533,6 +2533,13 @@ messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. + + It is also sent from the server to the client in the event of + the consumer being unexpectedly cancelled (i.e. cancelled for + any reason other than the server receiving the corresponding + basic.cancel from the client). This allows clients to be + notified of the loss of consumers due to events such as queue + deletion. @@ -2546,6 +2553,7 @@ + @@ -2557,6 +2565,7 @@ This method confirms that the cancellation was completed. + diff --git a/projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs b/projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs index f65fb7302..6fb20a34d 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs @@ -131,6 +131,12 @@ public virtual void HandleBasicCancelOk(string consumerTag) OnCancel(); } + ///Default implementation - calls OnCancel(). + public virtual void HandleBasicCancel(string consumerTag) + { + OnCancel(); + } + ///Default implementation - sets ShutdownReason and ///calls OnCancel(). public virtual void HandleModelShutdown(IModel model, ShutdownEventArgs reason) diff --git a/projects/client/RabbitMQ.Client/src/client/api/IBasicConsumer.cs b/projects/client/RabbitMQ.Client/src/client/api/IBasicConsumer.cs index 0731e7023..295553016 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IBasicConsumer.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IBasicConsumer.cs @@ -69,6 +69,14 @@ public interface IBasicConsumer ///Called upon successful deregistration of the ///consumer from the broker. void HandleBasicCancelOk(string consumerTag); + + /// + /// Called when the consumer is cancelled for reasons other than by a + /// basicCancel: e.g. the queue has been deleted (either by this channel or + /// by any other channel). See handleCancelOk for notification of consumer + /// cancellation due to basicCancel. + /// + void HandleBasicCancel(string consumerTag); ///Called when the model shuts down. void HandleModelShutdown(IModel model, ShutdownEventArgs reason); diff --git a/projects/client/RabbitMQ.Client/src/client/api/IModel.cs b/projects/client/RabbitMQ.Client/src/client/api/IModel.cs index 817a8f791..0784dc7f4 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IModel.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IModel.cs @@ -801,6 +801,8 @@ public interface IFullModel : RabbitMQ.Client.IModel [AmqpContentBodyMapping] byte[] body); + void HandleBasicCancel(string consumerTag, bool nowait); + ///Handle incoming Basic.Return methods. Signals a ///BasicReturnEvent. void HandleBasicReturn(ushort replyCode, diff --git a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs index 61f2a5c21..d119ebbc2 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs @@ -556,6 +556,29 @@ public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] b } } + public void HandleBasicCancel(string consumerTag, bool nowait) + { + IBasicConsumer consumer; + lock (m_consumers) + { + consumer = (IBasicConsumer)m_consumers[consumerTag]; + m_consumers.Remove(consumerTag); + } + if (consumer == null) + { + consumer = DefaultConsumer; + } + + try { + consumer.HandleBasicCancel(consumerTag); + } catch (Exception e) { + CallbackExceptionEventArgs args = new CallbackExceptionEventArgs(e); + args.Detail["consumer"] = consumer; + args.Detail["context"] = "HandleBasicCancel"; + OnCallbackException(args); + } + } + public void HandleBasicReturn(ushort replyCode, string replyText, string exchange, diff --git a/projects/client/RabbitMQ.Client/src/client/impl/v0_9_1/ProtocolBase.cs b/projects/client/RabbitMQ.Client/src/client/impl/v0_9_1/ProtocolBase.cs index 272e1e3cc..f1156c253 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/v0_9_1/ProtocolBase.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/v0_9_1/ProtocolBase.cs @@ -51,6 +51,7 @@ public abstract class ProtocolBase: AbstractProtocolBase { Capabilities["publisher_confirms"] = true; Capabilities["exchange_exchange_bindings"] = true; Capabilities["basic.nack"] = true; + Capabilities["consumer_cancel_notify"] = true; } public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) { From f7caa419878407c0566b3385ff5a5e5fa46df21f Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 23 Feb 2011 21:31:43 +0000 Subject: [PATCH 2/7] Added .Net unit test for consumer death notification --- .../Unit/src/unit/TestConsumerCancelNotify.cs | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 projects/client/Unit/src/unit/TestConsumerCancelNotify.cs diff --git a/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs b/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs new file mode 100644 index 000000000..c71204dad --- /dev/null +++ b/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs @@ -0,0 +1,104 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 1.1. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (C) 2007-2011 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v1.1: +// +//--------------------------------------------------------------------------- +// The contents of this file are subject to the Mozilla Public License +// Version 1.1 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at http://www.mozilla.org/MPL/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is VMware, Inc. +// Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using NUnit.Framework; + +using System; +using System.IO; +using System.Text; +using System.Collections; +using System.Threading; + +using RabbitMQ.Client; +using RabbitMQ.Client.Impl; +using RabbitMQ.Util; + +namespace RabbitMQ.Client.Unit +{ + [TestFixture] + public class TestConsumerCancelNotify + { + + Object lockObject = new Object(); + bool notified = false; + + [Test] + public void TestConsumerCancelNotification() + { + string queue = "queue_consumer_notify"; + ConnectionFactory connFactory = new ConnectionFactory(); + IConnection conn = connFactory.CreateConnection(); + IModel chan = conn.CreateModel(); + chan.QueueDeclare(queue, false, true, false, null); + IBasicConsumer consumer = new CancelNotificationConsumer(chan, this); + chan.BasicConsume(queue, false, consumer); + + chan.QueueDelete(queue); + lock (lockObject) + { + if (!notified) + { + Monitor.Wait(lockObject); + } + Assert.IsTrue(notified); + } + } + + public class CancelNotificationConsumer : QueueingBasicConsumer + { + TestConsumerCancelNotify testClass; + + public CancelNotificationConsumer(IModel model, TestConsumerCancelNotify tc) : base(model) + { + this.testClass = tc; + } + + public override void HandleBasicCancel(string consumerTag) + { + lock (testClass.lockObject) + { + testClass.notified = true; + Monitor.PulseAll(testClass.lockObject); + } + } + } + } +} + From 7e8aec7677220fa8c7b4cfcd4902474797505e9d Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 24 Feb 2011 11:53:51 +0000 Subject: [PATCH 3/7] cosmetics --- .../Unit/src/unit/TestConsumerCancelNotify.cs | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs b/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs index c71204dad..5c30dfa24 100644 --- a/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs +++ b/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs @@ -50,18 +50,15 @@ using RabbitMQ.Client.Impl; using RabbitMQ.Util; -namespace RabbitMQ.Client.Unit -{ +namespace RabbitMQ.Client.Unit { [TestFixture] - public class TestConsumerCancelNotify - { + public class TestConsumerCancelNotify { Object lockObject = new Object(); bool notified = false; [Test] - public void TestConsumerCancelNotification() - { + public void TestConsumerCancelNotification() { string queue = "queue_consumer_notify"; ConnectionFactory connFactory = new ConnectionFactory(); IConnection conn = connFactory.CreateConnection(); @@ -71,10 +68,8 @@ public void TestConsumerCancelNotification() chan.BasicConsume(queue, false, consumer); chan.QueueDelete(queue); - lock (lockObject) - { - if (!notified) - { + lock (lockObject) { + if (!notified) { Monitor.Wait(lockObject); } Assert.IsTrue(notified); @@ -85,15 +80,12 @@ public class CancelNotificationConsumer : QueueingBasicConsumer { TestConsumerCancelNotify testClass; - public CancelNotificationConsumer(IModel model, TestConsumerCancelNotify tc) : base(model) - { + public CancelNotificationConsumer(IModel model, TestConsumerCancelNotify tc) : base(model) { this.testClass = tc; } - public override void HandleBasicCancel(string consumerTag) - { - lock (testClass.lockObject) - { + public override void HandleBasicCancel(string consumerTag) { + lock (testClass.lockObject) { testClass.notified = true; Monitor.PulseAll(testClass.lockObject); } From b54fa2f5a801dc7738bafc2524ee21056b53101d Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Thu, 24 Feb 2011 14:53:57 +0000 Subject: [PATCH 4/7] ipv6 support for .net client Parse ipv6 hostnames Attempt to connect to hosts using ipv6 first --- .../src/client/api/AmqpTcpEndpoint.cs | 18 ++++++++++- .../src/client/impl/SocketFrameHandler_0_9.cs | 32 ++++++++++++++----- .../src/unit/TestAmqpTcpEndpointParsing.cs | 18 +++++++++++ 3 files changed, 59 insertions(+), 9 deletions(-) diff --git a/projects/client/RabbitMQ.Client/src/client/api/AmqpTcpEndpoint.cs b/projects/client/RabbitMQ.Client/src/client/api/AmqpTcpEndpoint.cs index 55fced59e..c99716f93 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/AmqpTcpEndpoint.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/AmqpTcpEndpoint.cs @@ -40,6 +40,7 @@ using System; using System.Collections; +using System.Text.RegularExpressions; using RabbitMQ.Client.Impl; namespace RabbitMQ.Client @@ -243,9 +244,24 @@ public override int GetHashCode() /// entire string is used as the hostname, and the port-number /// is set to -1 (meaning the default number for the protocol /// variant specified). + /// Hostnames provided as IPv6 must appear in square brackets ([]). /// public static AmqpTcpEndpoint Parse(IProtocol protocol, string address) { - int index = address.IndexOf(':'); + Match match = Regex.Match(address, @"^\s*\[([%:0-9A-Fa-f]+)\](:(.*))?\s*$"); + if (match.Success) + { + GroupCollection groups = match.Groups; + int portNum = -1; + if (groups[2].Success) + { + string portStr = groups[3].Value; + portNum = (portStr.Length == 0) ? -1 : int.Parse(portStr); + } + return new AmqpTcpEndpoint(protocol, + match.Groups[1].Value, + portNum); + } + int index = address.LastIndexOf(':'); if (index == -1) { return new AmqpTcpEndpoint(protocol, address, -1); } else { diff --git a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler_0_9.cs b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler_0_9.cs index 43463374a..b1d9290b0 100644 --- a/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler_0_9.cs +++ b/projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler_0_9.cs @@ -60,8 +60,24 @@ public class SocketFrameHandler_0_9 : IFrameHandler public SocketFrameHandler_0_9(AmqpTcpEndpoint endpoint) { m_endpoint = endpoint; - m_socket = new TcpClient(); - m_socket.Connect(endpoint.HostName, endpoint.Port); + m_socket = null; + if (Socket.OSSupportsIPv6) + { + try + { + m_socket = new TcpClient(AddressFamily.InterNetworkV6); + m_socket.Connect(endpoint.HostName, endpoint.Port); + } + catch(SocketException) + { + m_socket = null; + } + } + if (m_socket == null) + { + m_socket = new TcpClient(AddressFamily.InterNetwork); + m_socket.Connect(endpoint.HostName, endpoint.Port); + } // disable Nagle's algorithm, for more consistently low latency m_socket.NoDelay = true; @@ -88,12 +104,12 @@ public AmqpTcpEndpoint Endpoint public int Timeout { - set - { - if (m_socket.Connected) - { - m_socket.ReceiveTimeout = value; - } + set + { + if (m_socket.Connected) + { + m_socket.ReceiveTimeout = value; + } } } diff --git a/projects/client/Unit/src/unit/TestAmqpTcpEndpointParsing.cs b/projects/client/Unit/src/unit/TestAmqpTcpEndpointParsing.cs index 4e46e0d07..a7d8d861c 100644 --- a/projects/client/Unit/src/unit/TestAmqpTcpEndpointParsing.cs +++ b/projects/client/Unit/src/unit/TestAmqpTcpEndpointParsing.cs @@ -145,5 +145,23 @@ public void TestMultipleTwoMultipleCommas() Assert.AreEqual("other", es[1].HostName); Assert.AreEqual(2345, es[1].Port); } + + [Test] + public void TestIpv6WithPort() + { + AmqpTcpEndpoint e = AmqpTcpEndpoint.Parse(Protocols.DefaultProtocol, "[::1]:1234"); + Assert.AreEqual(Protocols.DefaultProtocol, e.Protocol); + Assert.AreEqual("::1", e.HostName); + Assert.AreEqual(1234, e.Port); + } + + [Test] + public void TestIpv6WithoutPort() + { + AmqpTcpEndpoint e = AmqpTcpEndpoint.Parse(Protocols.DefaultProtocol, "[::1]"); + Assert.AreEqual(Protocols.DefaultProtocol, e.Protocol); + Assert.AreEqual("::1", e.HostName); + Assert.AreEqual(Protocols.DefaultProtocol.DefaultPort, e.Port); + } } } From d276efd570f729ae16bbb33ad883b6c055baed2c Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 4 Mar 2011 14:56:49 +0000 Subject: [PATCH 5/7] Remove trailing whitespace --- docs/specs/amqp0-9-1.xml | 5 +++-- .../src/client/api/DefaultBasicConsumer.cs | 2 +- .../RabbitMQ.Client/src/client/api/IBasicConsumer.cs | 2 +- .../client/Unit/src/unit/TestConsumerCancelNotify.cs | 10 +++++----- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/docs/specs/amqp0-9-1.xml b/docs/specs/amqp0-9-1.xml index af6eac829..d079aaa21 100644 --- a/docs/specs/amqp0-9-1.xml +++ b/docs/specs/amqp0-9-1.xml @@ -155,8 +155,9 @@ diff --git a/projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs b/projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs index 6fb20a34d..3fea0a443 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/DefaultBasicConsumer.cs @@ -136,7 +136,7 @@ public virtual void HandleBasicCancel(string consumerTag) { OnCancel(); } - + ///Default implementation - sets ShutdownReason and ///calls OnCancel(). public virtual void HandleModelShutdown(IModel model, ShutdownEventArgs reason) diff --git a/projects/client/RabbitMQ.Client/src/client/api/IBasicConsumer.cs b/projects/client/RabbitMQ.Client/src/client/api/IBasicConsumer.cs index 295553016..a8afd6aee 100644 --- a/projects/client/RabbitMQ.Client/src/client/api/IBasicConsumer.cs +++ b/projects/client/RabbitMQ.Client/src/client/api/IBasicConsumer.cs @@ -69,7 +69,7 @@ public interface IBasicConsumer ///Called upon successful deregistration of the ///consumer from the broker. void HandleBasicCancelOk(string consumerTag); - + /// /// Called when the consumer is cancelled for reasons other than by a /// basicCancel: e.g. the queue has been deleted (either by this channel or diff --git a/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs b/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs index 5c30dfa24..3b8be1128 100644 --- a/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs +++ b/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs @@ -56,7 +56,7 @@ public class TestConsumerCancelNotify { Object lockObject = new Object(); bool notified = false; - + [Test] public void TestConsumerCancelNotification() { string queue = "queue_consumer_notify"; @@ -66,7 +66,7 @@ public class TestConsumerCancelNotify { chan.QueueDeclare(queue, false, true, false, null); IBasicConsumer consumer = new CancelNotificationConsumer(chan, this); chan.BasicConsume(queue, false, consumer); - + chan.QueueDelete(queue); lock (lockObject) { if (!notified) { @@ -75,15 +75,15 @@ public class TestConsumerCancelNotify { Assert.IsTrue(notified); } } - + public class CancelNotificationConsumer : QueueingBasicConsumer { TestConsumerCancelNotify testClass; - + public CancelNotificationConsumer(IModel model, TestConsumerCancelNotify tc) : base(model) { this.testClass = tc; } - + public override void HandleBasicCancel(string consumerTag) { lock (testClass.lockObject) { testClass.notified = true; From 937a4416769752b3ec96e40b336a9d2191419a45 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 4 Mar 2011 17:14:34 +0000 Subject: [PATCH 6/7] improve clarity of wording --- docs/specs/amqp0-9-1.xml | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/docs/specs/amqp0-9-1.xml b/docs/specs/amqp0-9-1.xml index d079aaa21..cd0c70f0f 100644 --- a/docs/specs/amqp0-9-1.xml +++ b/docs/specs/amqp0-9-1.xml @@ -2535,12 +2535,16 @@ that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. - It is also sent from the server to the client in the event of - the consumer being unexpectedly cancelled (i.e. cancelled for - any reason other than the server receiving the corresponding - basic.cancel from the client). This allows clients to be - notified of the loss of consumers due to events such as queue - deletion. + It may also be sent from the server to the client in the event + of the consumer being unexpectedly cancelled (i.e. cancelled + for any reason other than the server receiving the + corresponding basic.cancel from the client). This allows + clients to be notified of the loss of consumers due to events + such as queue deletion. Note that as it is not a MUST for + clients to accept this method from the client, it is advisable + for the broker to be able to identify those clients that are + capable of accepting the method, through some means of + capability negotiation. From 4474f3b312de24a52158b638f8a0db96d94cc152 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 11 Mar 2011 11:24:39 +0000 Subject: [PATCH 7/7] Merge bug21647 (allow consumers to be informed of queue death)