Skip to content

Commit

Permalink
merging bug23887 into default
Browse files Browse the repository at this point in the history
  • Loading branch information
Vlad Alexandru Ionescu committed Mar 17, 2011
2 parents 3120445 + faed96f commit 161b7fc
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 10 deletions.
2 changes: 2 additions & 0 deletions docs/specs/amqp0-9-1.stripped.xml
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,14 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
</method>
<method name="cancel" synchronous="1" index="30">
<chassis name="server" implement="MUST"/>
<chassis name="client" implement="SHOULD"/>
<response name="cancel-ok"/>
<field name="consumer-tag" domain="consumer-tag"/>
<field name="no-wait" domain="no-wait"/>
</method>
<method name="cancel-ok" synchronous="1" index="31">
<chassis name="client" implement="MUST"/>
<chassis name="server" implement="MAY"/>
<field name="consumer-tag" domain="consumer-tag"/>
</method>
<method name="publish" content="1" index="40">
Expand Down
16 changes: 15 additions & 1 deletion docs/specs/amqp0-9-1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
exchange.bind and exchange.bind-ok,
exchange.unbind and exchange.unbind-ok,
basic.nack
and the ability for the Server to send basic.ack and basic.nack to the client.
and the ability for the Server to send basic.ack, basic.nack and
basic.cancel to the client.
-->

<!--
Expand Down Expand Up @@ -2533,6 +2534,17 @@
messages, but it does mean the server will not send any more messages for
that consumer. The client may receive an arbitrary number of messages in
between sending the cancel method and receiving the cancel-ok reply.

It may also be sent from the server to the client in the event
of the consumer being unexpectedly cancelled (i.e. cancelled
for any reason other than the server receiving the
corresponding basic.cancel from the client). This allows
clients to be notified of the loss of consumers due to events
such as queue deletion. Note that as it is not a MUST for
clients to accept this method from the client, it is advisable
for the broker to be able to identify those clients that are
capable of accepting the method, through some means of
capability negotiation.
</doc>

<rule name = "01">
Expand All @@ -2546,6 +2558,7 @@
</rule>

<chassis name = "server" implement = "MUST" />
<chassis name = "client" implement = "SHOULD" />
<response name = "cancel-ok" />

<field name = "consumer-tag" domain = "consumer-tag" />
Expand All @@ -2557,6 +2570,7 @@
This method confirms that the cancellation was completed.
</doc>
<chassis name = "client" implement = "MUST" />
<chassis name = "server" implement = "MAY" />
<field name = "consumer-tag" domain = "consumer-tag" />
</method>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

using System;
using System.Collections;
using System.Text.RegularExpressions;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -243,9 +244,24 @@ public override int GetHashCode()
/// entire string is used as the hostname, and the port-number
/// is set to -1 (meaning the default number for the protocol
/// variant specified).
/// Hostnames provided as IPv6 must appear in square brackets ([]).
///</remarks>
public static AmqpTcpEndpoint Parse(IProtocol protocol, string address) {
int index = address.IndexOf(':');
Match match = Regex.Match(address, @"^\s*\[([%:0-9A-Fa-f]+)\](:(.*))?\s*$");
if (match.Success)
{
GroupCollection groups = match.Groups;
int portNum = -1;
if (groups[2].Success)
{
string portStr = groups[3].Value;
portNum = (portStr.Length == 0) ? -1 : int.Parse(portStr);
}
return new AmqpTcpEndpoint(protocol,
match.Groups[1].Value,
portNum);
}
int index = address.LastIndexOf(':');
if (index == -1) {
return new AmqpTcpEndpoint(protocol, address, -1);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ public virtual void HandleBasicCancelOk(string consumerTag)
OnCancel();
}

///<summary>Default implementation - calls OnCancel().</summary>
public virtual void HandleBasicCancel(string consumerTag)
{
OnCancel();
}

///<summary>Default implementation - sets ShutdownReason and
///calls OnCancel().</summary>
public virtual void HandleModelShutdown(IModel model, ShutdownEventArgs reason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ public interface IBasicConsumer
///consumer from the broker.</summary>
void HandleBasicCancelOk(string consumerTag);

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a
/// basicCancel: e.g. the queue has been deleted (either by this channel or
/// by any other channel). See handleCancelOk for notification of consumer
/// cancellation due to basicCancel.
/// </summary>
void HandleBasicCancel(string consumerTag);

///<summary>Called when the model shuts down.</summary>
void HandleModelShutdown(IModel model, ShutdownEventArgs reason);

Expand Down
2 changes: 2 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,8 @@ public interface IFullModel : RabbitMQ.Client.IModel
[AmqpContentBodyMapping]
byte[] body);

void HandleBasicCancel(string consumerTag, bool nowait);

///<summary>Handle incoming Basic.Return methods. Signals a
///BasicReturnEvent.</summary>
void HandleBasicReturn(ushort replyCode,
Expand Down
23 changes: 23 additions & 0 deletions projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,29 @@ public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] b
}
}

public void HandleBasicCancel(string consumerTag, bool nowait)
{
IBasicConsumer consumer;
lock (m_consumers)
{
consumer = (IBasicConsumer)m_consumers[consumerTag];
m_consumers.Remove(consumerTag);
}
if (consumer == null)
{
consumer = DefaultConsumer;
}

try {
consumer.HandleBasicCancel(consumerTag);
} catch (Exception e) {
CallbackExceptionEventArgs args = new CallbackExceptionEventArgs(e);
args.Detail["consumer"] = consumer;
args.Detail["context"] = "HandleBasicCancel";
OnCallbackException(args);
}
}

public void HandleBasicReturn(ushort replyCode,
string replyText,
string exchange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,24 @@ public class SocketFrameHandler_0_9 : IFrameHandler
public SocketFrameHandler_0_9(AmqpTcpEndpoint endpoint)
{
m_endpoint = endpoint;
m_socket = new TcpClient();
m_socket.Connect(endpoint.HostName, endpoint.Port);
m_socket = null;
if (Socket.OSSupportsIPv6)
{
try
{
m_socket = new TcpClient(AddressFamily.InterNetworkV6);
m_socket.Connect(endpoint.HostName, endpoint.Port);
}
catch(SocketException)
{
m_socket = null;
}
}
if (m_socket == null)
{
m_socket = new TcpClient(AddressFamily.InterNetwork);
m_socket.Connect(endpoint.HostName, endpoint.Port);
}
// disable Nagle's algorithm, for more consistently low latency
m_socket.NoDelay = true;

Expand All @@ -88,12 +104,12 @@ public AmqpTcpEndpoint Endpoint

public int Timeout
{
set
{
if (m_socket.Connected)
{
m_socket.ReceiveTimeout = value;
}
set
{
if (m_socket.Connected)
{
m_socket.ReceiveTimeout = value;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public abstract class ProtocolBase: AbstractProtocolBase {
Capabilities["publisher_confirms"] = true;
Capabilities["exchange_exchange_bindings"] = true;
Capabilities["basic.nack"] = true;
Capabilities["consumer_cancel_notify"] = true;
}

public override IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint) {
Expand Down
18 changes: 18 additions & 0 deletions projects/client/Unit/src/unit/TestAmqpTcpEndpointParsing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,23 @@ public void TestMultipleTwoMultipleCommas()
Assert.AreEqual("other", es[1].HostName);
Assert.AreEqual(2345, es[1].Port);
}

[Test]
public void TestIpv6WithPort()
{
AmqpTcpEndpoint e = AmqpTcpEndpoint.Parse(Protocols.DefaultProtocol, "[::1]:1234");
Assert.AreEqual(Protocols.DefaultProtocol, e.Protocol);
Assert.AreEqual("::1", e.HostName);
Assert.AreEqual(1234, e.Port);
}

[Test]
public void TestIpv6WithoutPort()
{
AmqpTcpEndpoint e = AmqpTcpEndpoint.Parse(Protocols.DefaultProtocol, "[::1]");
Assert.AreEqual(Protocols.DefaultProtocol, e.Protocol);
Assert.AreEqual("::1", e.HostName);
Assert.AreEqual(Protocols.DefaultProtocol.DefaultPort, e.Port);
}
}
}
96 changes: 96 additions & 0 deletions projects/client/Unit/src/unit/TestConsumerCancelNotify.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 1.1.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (C) 2007-2011 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v1.1:
//
//---------------------------------------------------------------------------
// The contents of this file are subject to the Mozilla Public License
// Version 1.1 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at http://www.mozilla.org/MPL/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is VMware, Inc.
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using NUnit.Framework;

using System;
using System.IO;
using System.Text;
using System.Collections;
using System.Threading;

using RabbitMQ.Client;
using RabbitMQ.Client.Impl;
using RabbitMQ.Util;

namespace RabbitMQ.Client.Unit {
[TestFixture]
public class TestConsumerCancelNotify {

Object lockObject = new Object();
bool notified = false;

[Test]
public void TestConsumerCancelNotification() {
string queue = "queue_consumer_notify";
ConnectionFactory connFactory = new ConnectionFactory();
IConnection conn = connFactory.CreateConnection();
IModel chan = conn.CreateModel();
chan.QueueDeclare(queue, false, true, false, null);
IBasicConsumer consumer = new CancelNotificationConsumer(chan, this);
chan.BasicConsume(queue, false, consumer);

chan.QueueDelete(queue);
lock (lockObject) {
if (!notified) {
Monitor.Wait(lockObject);
}
Assert.IsTrue(notified);
}
}

public class CancelNotificationConsumer : QueueingBasicConsumer
{
TestConsumerCancelNotify testClass;

public CancelNotificationConsumer(IModel model, TestConsumerCancelNotify tc) : base(model) {
this.testClass = tc;
}

public override void HandleBasicCancel(string consumerTag) {
lock (testClass.lockObject) {
testClass.notified = true;
Monitor.PulseAll(testClass.lockObject);
}
}
}
}
}

0 comments on commit 161b7fc

Please sign in to comment.