Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make handling of server-originated methods async #1508

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 27 additions & 20 deletions projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using RabbitMQ.Client;
using RabbitMQ.Client.ConsumerDispatching;
Expand All @@ -8,7 +9,7 @@ namespace RabbitMQ.Benchmarks
{
[Config(typeof(Config))]
[BenchmarkCategory("ConsumerDispatcher")]
internal class ConsumerDispatcherBase
public class ConsumerDispatcherBase
{
protected static readonly ManualResetEventSlim _autoResetEvent = new ManualResetEventSlim(false);

Expand All @@ -19,18 +20,16 @@ internal class ConsumerDispatcherBase
protected readonly string _exchange = "Exchange";
protected readonly string _routingKey = "RoutingKey";
protected readonly ReadOnlyBasicProperties _properties = new ReadOnlyBasicProperties();
protected readonly RentedMemory _body;
protected readonly byte[] _body = new byte[512];

public ConsumerDispatcherBase()
{
var r = new Random();
byte[] body = new byte[512];
r.NextBytes(body);
_body = new RentedMemory(body);
r.NextBytes(_body);
}
}

internal class BasicDeliverConsumerDispatching : ConsumerDispatcherBase
public class BasicDeliverConsumerDispatching : ConsumerDispatcherBase
{
[Params(1, 30)]
public int Count { get; set; }
Expand All @@ -39,41 +38,49 @@ internal class BasicDeliverConsumerDispatching : ConsumerDispatcherBase
public int Concurrency { get; set; }

[GlobalSetup(Target = nameof(AsyncConsumerDispatcher))]
public void SetUpAsyncConsumer()
public async Task SetUpAsyncConsumer()
{
_consumer.Count = Count;
_dispatcher = new AsyncConsumerDispatcher(null, Concurrency);
_dispatcher.HandleBasicConsumeOk(_consumer, _consumerTag);
await _dispatcher.HandleBasicConsumeOkAsync(_consumer, _consumerTag, CancellationToken.None);
}

[Benchmark]
public void AsyncConsumerDispatcher()
public async Task AsyncConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
using (RentedMemory body = new RentedMemory(_body))
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body);
for (int i = 0; i < Count; i++)
{
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body,
CancellationToken.None);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}

[GlobalSetup(Target = nameof(ConsumerDispatcher))]
public void SetUpConsumer()
public async Task SetUpConsumer()
{
_consumer.Count = Count;
_dispatcher = new ConsumerDispatcher(null, Concurrency);
_dispatcher.HandleBasicConsumeOk(_consumer, _consumerTag);
await _dispatcher.HandleBasicConsumeOkAsync(_consumer, _consumerTag, CancellationToken.None);
}

[Benchmark]
public void ConsumerDispatcher()
public async Task ConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
using (RentedMemory body = new RentedMemory(_body))
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body);
for (int i = 0; i < Count; i++)
{
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body,
CancellationToken.None);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}
}
}
20 changes: 10 additions & 10 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
{
case ProtocolCommandId.BasicCancel:
{
HandleBasicCancel(in cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleBasicCancelAsync(cmd, cancellationToken);
}
case ProtocolCommandId.BasicDeliver:
{
HandleBasicDeliver(in cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleBasicDeliverAsync(cmd, cancellationToken);
}
case ProtocolCommandId.BasicAck:
{
Expand All @@ -108,8 +108,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ChannelCloseOk:
{
HandleChannelCloseOk(in cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleChannelCloseOkAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelFlow:
{
Expand All @@ -128,8 +128,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ConnectionSecure:
{
HandleConnectionSecure(in cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleConnectionSecureAsync(cmd);
}
case ProtocolCommandId.ConnectionStart:
{
Expand All @@ -138,8 +138,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ConnectionTune:
{
HandleConnectionTune(in cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleConnectionTuneAsync(cmd);
}
case ProtocolCommandId.ConnectionUnblocked:
{
Expand Down
38 changes: 26 additions & 12 deletions projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

namespace RabbitMQ.Client.Impl
{
internal abstract class AsyncRpcContinuation<T> : IRpcContinuation, IDisposable
internal abstract class AsyncRpcContinuation<T> : IRpcContinuation
{
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly CancellationTokenRegistration _cancellationTokenRegistration;
Expand Down Expand Up @@ -101,7 +101,7 @@ public ConfiguredTaskAwaitable<T>.ConfiguredTaskAwaiter GetAwaiter()
return _tcsConfiguredTaskAwaitable.GetAwaiter();
}

public abstract void HandleCommand(in IncomingCommand cmd);
public abstract Task HandleCommandAsync(IncomingCommand cmd);

public virtual void HandleChannelShutdown(ShutdownEventArgs reason)
{
Expand Down Expand Up @@ -135,7 +135,7 @@ public ConnectionSecureOrTuneAsyncRpcContinuation(TimeSpan continuationTimeout)
{
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -156,6 +156,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand All @@ -173,7 +175,7 @@ public SimpleAsyncRpcContinuation(ProtocolCommandId expectedCommandId, TimeSpan
_expectedCommandId = expectedCommandId;
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -185,6 +187,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand All @@ -205,7 +209,7 @@ public BasicCancelAsyncRpcContinuation(string consumerTag, IConsumerDispatcher c
_consumerDispatcher = consumerDispatcher;
}

public override void HandleCommand(in IncomingCommand cmd)
public override async Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -214,7 +218,8 @@ public override void HandleCommand(in IncomingCommand cmd)
var method = new Client.Framing.Impl.BasicCancelOk(cmd.MethodSpan);
_tcs.TrySetResult(true);
Debug.Assert(_consumerTag == method._consumerTag);
_consumerDispatcher.HandleBasicCancelOk(_consumerTag);
await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken)
.ConfigureAwait(false);
}
else
{
Expand All @@ -240,15 +245,16 @@ public BasicConsumeAsyncRpcContinuation(IBasicConsumer consumer, IConsumerDispat
_consumerDispatcher = consumerDispatcher;
}

public override void HandleCommand(in IncomingCommand cmd)
public override async Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
if (cmd.CommandId == ProtocolCommandId.BasicConsumeOk)
{
var method = new Client.Framing.Impl.BasicConsumeOk(cmd.MethodSpan);
_tcs.TrySetResult(method._consumerTag);
_consumerDispatcher.HandleBasicConsumeOk(_consumer, method._consumerTag);
await _consumerDispatcher.HandleBasicConsumeOkAsync(_consumer, method._consumerTag, CancellationToken)
.ConfigureAwait(false);
}
else
{
Expand All @@ -272,7 +278,7 @@ public BasicGetAsyncRpcContinuation(Func<ulong, ulong> adjustDeliveryTag, TimeSp
_adjustDeliveryTag = adjustDeliveryTag;
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand Down Expand Up @@ -300,6 +306,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand Down Expand Up @@ -389,7 +397,7 @@ public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(con
{
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -403,6 +411,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand Down Expand Up @@ -433,7 +443,7 @@ public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(cont
{
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -446,6 +456,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand All @@ -460,7 +472,7 @@ public QueuePurgeAsyncRpcContinuation(TimeSpan continuationTimeout) : base(conti
{
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -473,6 +485,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand Down