Skip to content

Commit

Permalink
Continue simplifying code
Browse files Browse the repository at this point in the history
Part of #1472

* Move some overrides to their base class

* Simplify method, add comments

* Try to figure out CI error here - https://github.com/rabbitmq/rabbitmq-dotnet-client/actions/runs/8072267202/job/22053764839

* Ensure `_closeReason` is not null

* Remove or address TODOs

* Revert to calling `AbortAsync` in `Dispose` for connections and channels

* Add test showing that `CloseAsync` is not required before `Dispose`
  • Loading branch information
lukebakken committed Feb 28, 2024
1 parent 57a5251 commit 7c75648
Show file tree
Hide file tree
Showing 17 changed files with 104 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void GlobalSetup()
_container = RabbitMQBroker.Start();

var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
// TODO / NOTE: https://github.com/dotnet/BenchmarkDotNet/issues/1738
// NOTE: https://github.com/dotnet/BenchmarkDotNet/issues/1738
_connection = EnsureCompleted(cf.CreateConnectionAsync());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ namespace RabbitMQ.Client.Exceptions
/// </summary>
[Serializable]
public class OperationInterruptedException
// TODO: inherit from OperationCanceledException
// The above is an old TODO that I don't think applies here.
: RabbitMQClientException
{
///<summary>Construct an OperationInterruptedException with
Expand Down
75 changes: 23 additions & 52 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,55 +42,45 @@ public Channel(ConnectionConfig config, ISession session) : base(config, session
{
}

public override Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
{
var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken)
{
var method = new ChannelCloseOk();
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override Task _Private_ChannelFlowOkAsync(bool active, CancellationToken cancellationToken)
{
var method = new ChannelFlowOk(active);
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override Task _Private_ConnectionCloseOkAsync(CancellationToken cancellationToken)
{
var method = new ConnectionCloseOk();
return ModelSendAsync(method, cancellationToken).AsTask();
}

public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple)
{
var method = new BasicAck(deliveryTag, multiple);
// TODO use cancellation token
// TODO cancellation token
return ModelSendAsync(method, CancellationToken.None);
}

public override ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue)
{
var method = new BasicNack(deliveryTag, multiple, requeue);
// TODO use cancellation token
// TODO cancellation token
return ModelSendAsync(method, CancellationToken.None);
}

public override Task BasicRejectAsync(ulong deliveryTag, bool requeue)
{
var method = new BasicReject(deliveryTag, requeue);
// TODO cancellation token?
// TODO cancellation token
return ModelSendAsync(method, CancellationToken.None).AsTask();
}

/// <summary>
/// Returning <c>true</c> from this method means that the command was server-originated,
/// and handled already.
/// Returning <c>false</c> (the default) means that the incoming command is the response to
/// a client-initiated RPC call, and must be handled.
/// </summary>
/// <param name="cmd">The incoming command from the AMQP server</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns></returns>
protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
switch (cmd.CommandId)
{
case ProtocolCommandId.BasicCancel:
{
HandleBasicCancel(in cmd);
return Task.FromResult(true);
}
case ProtocolCommandId.BasicDeliver:
{
HandleBasicDeliver(in cmd);
Expand All @@ -101,27 +91,6 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
HandleBasicAck(in cmd);
return Task.FromResult(true);
}
case ProtocolCommandId.BasicCancel:
{
HandleBasicCancel(in cmd);
return Task.FromResult(true);
}
case ProtocolCommandId.BasicCancelOk:
{
return Task.FromResult(false);
}
case ProtocolCommandId.BasicConsumeOk:
{
return Task.FromResult(false);
}
case ProtocolCommandId.BasicGetEmpty:
{
return Task.FromResult(false);
}
case ProtocolCommandId.BasicGetOk:
{
return Task.FromResult(false);
}
case ProtocolCommandId.BasicNack:
{
HandleBasicNack(in cmd);
Expand All @@ -134,6 +103,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ChannelClose:
{
// Note: always returns true
return HandleChannelCloseAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelCloseOk:
Expand All @@ -143,6 +113,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ChannelFlow:
{
// Note: always returns true
return HandleChannelFlowAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionBlocked:
Expand All @@ -152,6 +123,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ConnectionClose:
{
// Note: always returns true
return HandleConnectionCloseAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionSecure:
Expand All @@ -161,6 +133,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ConnectionStart:
{
// Note: always returns true
return HandleConnectionStartAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionTune:
Expand All @@ -173,12 +146,10 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
HandleConnectionUnblocked(in cmd);
return Task.FromResult(true);
}
case ProtocolCommandId.QueueDeclareOk:
default:
{
bool result = HandleQueueDeclareOk(in cmd);
return Task.FromResult(result);
return Task.FromResult(false);
}
default: return Task.FromResult(false);
}
}
}
Expand Down
13 changes: 5 additions & 8 deletions projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout)
var tcs = (TaskCompletionSource<T>)state;
if (tcs.TrySetCanceled())
{
// TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
// Cancellation was successful, does this mean we should set a TimeoutException
// in the same manner as BlockingCell?
// Cancellation was successful, does this mean we set a TimeoutException
// in the same manner as BlockingCell used to
tcs.SetException(new TimeoutException("TODO LRB rabbitmq/rabbitmq-dotnet-client#1347"));
}
}, _tcs);
Expand All @@ -77,9 +76,9 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout)
var tcs = (TaskCompletionSource<T>)state;
if (tcs.TrySetCanceled())
{
// TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
// Cancellation was successful, does this mean we should set a TimeoutException
// in the same manner as BlockingCell?
// Cancellation was successful, does this mean we set a TimeoutException
// in the same manner as BlockingCell used to
tcs.SetException(new TimeoutException("TODO LRB rabbitmq/rabbitmq-dotnet-client#1347"));
}
}, state: _tcs, useSynchronizationContext: false);
#endif
Expand Down Expand Up @@ -148,8 +147,6 @@ public override void HandleCommand(in IncomingCommand cmd)
else if (cmd.CommandId == ProtocolCommandId.ConnectionTune)
{
var tune = new ConnectionTune(cmd.MethodSpan);
// TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
// What to do if setting a result fails?
_tcs.TrySetResult(new ConnectionSecureOrTune
{
m_tuneDetails = new ConnectionTuneDetails { m_channelMax = tune._channelMax, m_frameMax = tune._frameMax, m_heartbeatInSeconds = tune._heartbeat }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,9 @@ public void Dispose()
return;
}

// TODO rabbitmq-dotnet-client-1472
// this.Abort();
if (IsOpen)
{
throw new InvalidOperationException("AutorecoveringChannel must be closed before calling Dispose!");
this.AbortAsync().GetAwaiter().GetResult();
}

_recordedConsumerTags.Clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private static void HandleTopologyRecoveryException(TopologyRecoveryException e)
ESLog.Info($"Will not retry recovery because of {e.InnerException?.GetType().FullName}: it's not a known problem with connectivity, ignoring it", e);
}

// TODO propagate token
// TODO propagate cancellation token
private async ValueTask<bool> TryPerformAutomaticRecoveryAsync(CancellationToken cancellationToken)
{
ESLog.Info("Performing automatic recovery");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,7 @@ public void Dispose()

try
{
// TODO rabbitmq-dotnet-client-1472
// this.Abort(InternalConstants.DefaultConnectionAbortTimeout);
if (IsOpen)
{
throw new InvalidOperationException("Connection must be closed before calling Dispose!");
}
_innerConnection.Dispose();
}
catch (Exception)
{
Expand Down
50 changes: 25 additions & 25 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,11 @@ private void OnSessionShutdown(object sender, ShutdownEventArgs reason)

internal bool SetCloseReason(ShutdownEventArgs reason)
{
if (reason is null)
{
throw new ArgumentNullException(nameof(reason));
}

// NB: this ensures that Close is only called once on a channel
return Interlocked.CompareExchange(ref _closeReason, reason, null) is null;
}
Expand All @@ -487,20 +492,21 @@ protected virtual void Dispose(bool disposing)
{
if (disposing)
{
// dispose managed resources
// TODO exception?
if (IsOpen)
{
throw new InvalidOperationException("Channel must be closed before calling Dispose!");
this.AbortAsync().GetAwaiter().GetResult();
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
}

// dispose unmanaged resources
}

public abstract Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken);
public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
{
var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
return ModelSendAsync(method, cancellationToken).AsTask();
}

protected void HandleBasicAck(in IncomingCommand cmd)
{
Expand Down Expand Up @@ -663,8 +669,10 @@ protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, Cancella

Session.Close(CloseReason, false);

await _Private_ChannelCloseOkAsync(cancellationToken)
var method = new ChannelCloseOk();
await ModelSendAsync(method, cancellationToken)
.ConfigureAwait(false);

return true;
}
finally
Expand Down Expand Up @@ -710,8 +718,9 @@ protected async Task<bool> HandleChannelFlowAsync(IncomingCommand cmd, Cancellat
_flowControlBlock.Reset();
}

await _Private_ChannelFlowOkAsync(active, cancellationToken)
.ConfigureAwait(false);
var method = new ChannelFlowOk(active);
await ModelSendAsync(method, cancellationToken).
ConfigureAwait(false);

if (!_flowControlWrapper.IsEmpty)
{
Expand Down Expand Up @@ -748,8 +757,11 @@ protected async Task<bool> HandleConnectionCloseAsync(IncomingCommand cmd, Cance
try
{
Session.Connection.ClosedViaPeer(reason);
await _Private_ConnectionCloseOkAsync(cancellationToken)

var replyMethod = new ConnectionCloseOk();
await ModelSendAsync(replyMethod, cancellationToken)
.ConfigureAwait(false);

SetCloseReason(Session.Connection.CloseReason);
}
catch (IOException)
Expand Down Expand Up @@ -832,19 +844,11 @@ protected void HandleConnectionUnblocked(in IncomingCommand cmd)
}
}

// TODO rabbitmq-dotnet-client-1472 remove this method
protected bool HandleQueueDeclareOk(in IncomingCommand cmd)
{
return false;
}

public abstract Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken);

public abstract Task _Private_ChannelFlowOkAsync(bool active, CancellationToken cancellationToken);
public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple);

public abstract Task _Private_ConnectionCloseOkAsync(CancellationToken cancellationToken);
public abstract ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue);

public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple);
public abstract Task BasicRejectAsync(ulong deliveryTag, bool requeue);

public async Task BasicCancelAsync(string consumerTag, bool noWait)
{
Expand Down Expand Up @@ -946,8 +950,6 @@ await ModelSendAsync(method, k.CancellationToken)
}
}

public abstract ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue);

public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
Expand Down Expand Up @@ -1162,8 +1164,6 @@ await ModelSendAsync(method, k.CancellationToken)
}
}

public abstract Task BasicRejectAsync(ulong deliveryTag, bool requeue);

public async Task ConfirmSelectAsync()
{
using var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout);
Expand Down
3 changes: 1 addition & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken)
HandleMainLoopException(ea);
}

// TODO is this the best way?
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionCloseTimeout);
await FinishCloseAsync(cts.Token);
}
Expand Down Expand Up @@ -175,7 +174,7 @@ private void TerminateMainloop()
private void HandleMainLoopException(ShutdownEventArgs reason)
{
string message = reason.GetLogMessage();
if (!SetCloseReason(reason))
if (false == SetCloseReason(reason))
{
LogCloseError($"Unexpected Main Loop Exception while closing: {message}", reason.Exception);
return;
Expand Down

0 comments on commit 7c75648

Please sign in to comment.