Skip to content

Commit

Permalink
Merge pull request #1345 from rabbitmq/lukebakken/async-queue-declare
Browse files Browse the repository at this point in the history
Implement QueueDeclareAsync
  • Loading branch information
lukebakken committed May 11, 2023
2 parents 26e5657 + 7eef2b3 commit c69bf00
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 44 deletions.
15 changes: 15 additions & 0 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public interface IChannel : IDisposable
void BasicNack(ulong deliveryTag, bool multiple, bool requeue);

#nullable enable

/// <summary>
/// Publishes a message.
/// </summary>
Expand All @@ -198,6 +199,7 @@ public interface IChannel : IDisposable
/// </remarks>
void BasicPublish<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
/// Publishes a message.
/// </summary>
Expand All @@ -208,6 +210,7 @@ void BasicPublish<TProperties>(string exchange, string routingKey, in TPropertie
/// </remarks>
void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
/// Asynchronously publishes a message.
/// </summary>
Expand All @@ -218,6 +221,7 @@ void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, i
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
/// Asynchronously publishes a message.
/// </summary>
Expand All @@ -228,6 +232,7 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

#nullable disable

/// <summary>
Expand Down Expand Up @@ -361,6 +366,16 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
/// <param name="arguments">Optional; additional queue arguments, e.g. "x-queue-type"</param>
QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);

/// <summary>
/// Asynchronously declares a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
/// </summary>
/// <param name="queue">The name of the queue. Pass an empty string to make the server generate a name.</param>
/// <param name="durable">Should this queue will survive a broker restart?</param>
/// <param name="exclusive">Should this queue use be limited to its declaring connection? Such a queue will be deleted when its declaring connection closes.</param>
/// <param name="autoDelete">Should this queue be auto-deleted when its last consumer (if any) unsubscribes?</param>
/// <param name="arguments">Optional; additional queue arguments, e.g. "x-queue-type"</param>
ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);

/// <summary>
/// Declares a queue. See the <a href="https://www.rabbitmq.com/queues.html">Queues guide</a> to learn more.
/// </summary>
Expand Down
16 changes: 6 additions & 10 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,12 @@ public override void _Private_QueueBind(string queue, string exchange, string ro

public override void _Private_QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, IDictionary<string, object> arguments)
{
/*
* Note:
* Even though nowait is a parameter, ChannelSend must be used
*/
var method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments);
if (nowait)
{
ChannelSend(method);
}
else
{
ChannelSend(method);
}
ChannelSend(method);
}

public override uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEmpty, bool nowait)
Expand Down Expand Up @@ -382,8 +379,7 @@ protected override bool DispatchAsynchronous(in IncomingCommand cmd)
}
case ProtocolCommandId.QueueDeclareOk:
{
HandleQueueDeclareOk(in cmd);
return true;
return HandleQueueDeclareOk(in cmd);
}
default: return false;
}
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,14 @@ public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool
_connection.RecordQueue(new RecordedQueue(queue, queue.Length == 0, durable, exclusive, autoDelete, arguments));
}

public async ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
ThrowIfDisposed();
QueueDeclareOk result = await _innerChannel.QueueDeclareAsync(queue, durable, exclusive, autoDelete, arguments);
_connection.RecordQueue(new RecordedQueue(result.QueueName, queue.Length == 0, durable, exclusive, autoDelete, arguments));
return result;
}

public QueueDeclareOk QueueDeclarePassive(string queue)
=> InnerChannel.QueueDeclarePassive(queue);

Expand Down
127 changes: 111 additions & 16 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Util;

namespace RabbitMQ.Client.Impl
{
Expand All @@ -57,7 +56,6 @@ internal abstract class ChannelBase : IChannel, IRecoverable
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);

private readonly object _rpcLock = new object();
private readonly object _confirmLock = new object();
private readonly LinkedList<ulong> _pendingDeliveryTags = new LinkedList<ulong>();

Expand Down Expand Up @@ -328,7 +326,8 @@ private void HandleCommand(in IncomingCommand cmd)
{
if (!DispatchAsynchronous(in cmd)) // Was asynchronous. Already processed. No need to process further.
{
_continuationQueue.Next().HandleCommand(in cmd);
IRpcContinuation c = _continuationQueue.Next();
c.HandleCommand(in cmd);
}
}

Expand All @@ -337,12 +336,17 @@ protected void ChannelRpc<TMethod>(in TMethod method, ProtocolCommandId returnCo
{
var k = new SimpleBlockingRpcContinuation();
IncomingCommand reply;
lock (_rpcLock)
_rpcSemaphore.Wait();
try
{
Enqueue(k);
Session.Transmit(in method);
k.GetReply(ContinuationTimeout, out reply);
}
finally
{
_rpcSemaphore.Release();
}

reply.ReturnMethodBuffer();

Expand All @@ -358,12 +362,17 @@ protected void ChannelRpc<TMethod>(in TMethod method, ProtocolCommandId returnCo
var k = new SimpleBlockingRpcContinuation();
IncomingCommand reply;

lock (_rpcLock)
_rpcSemaphore.Wait();
try
{
Enqueue(k);
Session.Transmit(in method);
k.GetReply(ContinuationTimeout, out reply);
}
finally
{
_rpcSemaphore.Release();
}

if (reply.CommandId != returnCommandId)
{
Expand Down Expand Up @@ -783,13 +792,21 @@ protected void HandleConnectionUnblocked()
Session.Connection.HandleConnectionUnblocked();
}

protected void HandleQueueDeclareOk(in IncomingCommand cmd)
protected bool HandleQueueDeclareOk(in IncomingCommand cmd)
{
var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span);
cmd.ReturnMethodBuffer();
var k = (QueueDeclareRpcContinuation)_continuationQueue.Next();
k.m_result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount);
k.HandleCommand(IncomingCommand.Empty); // release the continuation.
if (_continuationQueue.TryPeek<QueueDeclareRpcContinuation>(out var k))
{
_continuationQueue.Next();
var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span);
cmd.ReturnMethodBuffer();
k.m_result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount);
k.HandleCommand(IncomingCommand.Empty); // release the continuation.
return true;
}
else
{
return false;
}
}

public abstract void _Private_BasicCancel(string consumerTag, bool nowait);
Expand Down Expand Up @@ -844,12 +861,17 @@ public void BasicCancel(string consumerTag)
{
var k = new BasicConsumerRpcContinuation { m_consumerTag = consumerTag };

lock (_rpcLock)
_rpcSemaphore.Wait();
try
{
Enqueue(k);
_Private_BasicCancel(consumerTag, false);
k.GetReply(ContinuationTimeout);
}
finally
{
_rpcSemaphore.Release();
}
}

public void BasicCancelNoWait(string consumerTag)
Expand All @@ -872,7 +894,8 @@ public string BasicConsume(string queue, bool autoAck, string consumerTag, bool

var k = new BasicConsumerRpcContinuation { m_consumer = consumer };

lock (_rpcLock)
_rpcSemaphore.Wait();
try
{
Enqueue(k);
// Non-nowait. We have an unconventional means of getting
Expand All @@ -881,6 +904,11 @@ public string BasicConsume(string queue, bool autoAck, string consumerTag, bool
/*nowait:*/ false, arguments);
k.GetReply(ContinuationTimeout);
}
finally
{
_rpcSemaphore.Release();
}

string actualConsumerTag = k.m_consumerTag;

return actualConsumerTag;
Expand All @@ -889,12 +917,18 @@ public string BasicConsume(string queue, bool autoAck, string consumerTag, bool
public BasicGetResult BasicGet(string queue, bool autoAck)
{
var k = new BasicGetRpcContinuation();
lock (_rpcLock)

_rpcSemaphore.Wait();
try
{
Enqueue(k);
_Private_BasicGet(queue, autoAck);
k.GetReply(ContinuationTimeout);
}
finally
{
_rpcSemaphore.Release();
}

return k.m_result;
}
Expand Down Expand Up @@ -982,12 +1016,17 @@ public void BasicRecover(bool requeue)
{
var k = new SimpleBlockingRpcContinuation();

lock (_rpcLock)
_rpcSemaphore.Wait();
try
{
Enqueue(k);
_Private_BasicRecover(requeue);
k.GetReply(ContinuationTimeout);
}
finally
{
_rpcSemaphore.Release();
}
}

public abstract void BasicRecoverAsync(bool requeue);
Expand Down Expand Up @@ -1065,6 +1104,11 @@ public QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, b
return QueueDeclare(queue, false, durable, exclusive, autoDelete, arguments);
}

public ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
return QueueDeclareAsync(queue, false, durable, exclusive, autoDelete, arguments);
}

public void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
_Private_QueueDeclare(queue, false, durable, exclusive, autoDelete, true, arguments);
Expand Down Expand Up @@ -1196,17 +1240,44 @@ public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
var k = new QueueDeclareRpcContinuation();
lock (_rpcLock)

_rpcSemaphore.Wait();
try
{
Enqueue(k);
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
k.GetReply(ContinuationTimeout);
}
finally
{
_rpcSemaphore.Release();
}

QueueDeclareOk result = k.m_result;
CurrentQueue = result.QueueName;
return result;
}

private async ValueTask<QueueDeclareOk> QueueDeclareAsync(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
{
var k = new QueueDeclareAsyncRpcContinuation();
await _rpcSemaphore.WaitAsync().ConfigureAwait(false);
try
{
Enqueue(k);

var method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
await ModelSendAsync(method).ConfigureAwait(false);

QueueDeclareOk result = await k;
CurrentQueue = result.QueueName;
return result;
}
finally
{
_rpcSemaphore.Release();
}
}

public class BasicConsumerRpcContinuation : SimpleBlockingRpcContinuation
{
Expand All @@ -1228,5 +1299,29 @@ public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation
{
public QueueDeclareOk m_result;
}

public class QueueDeclareAsyncRpcContinuation : AsyncRpcContinuation<QueueDeclareOk>
{
public override void HandleCommand(in IncomingCommand cmd)
{
try
{
var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodBytes.Span);
var result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount);
if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk)
{
_tcs.TrySetResult(result);
}
else
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}
}
finally
{
cmd.ReturnMethodBuffer();
}
}
}
}
}
19 changes: 19 additions & 0 deletions projects/RabbitMQ.Client/client/impl/RpcContinuationQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,24 @@ public IRpcContinuation Next()
{
return Interlocked.Exchange(ref _outstandingRpc, s_tmp);
}

///<summary>Peek at the next waiting continuation.</summary>
///<remarks>
///<para>
/// It is an error to call this method when there are no
/// waiting continuations.
///</para>
///</remarks>
public bool TryPeek<T>(out T continuation) where T : IRpcContinuation
{
if (_outstandingRpc is T result)
{
continuation = result;
return true;
}

continuation = default;
return false;
}
}
}
Loading

0 comments on commit c69bf00

Please sign in to comment.