Skip to content

Commit

Permalink
Fix econnaborted seen in integration test runs
Browse files Browse the repository at this point in the history
This error was introduced with commit 1fa0562

The main frame reading loop was cancelled too soon.
  • Loading branch information
lukebakken committed Mar 27, 2024
1 parent f664259 commit eaa7e33
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
8 changes: 6 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
Expand Up @@ -50,7 +50,7 @@ public Task UpdateSecretAsync(string newSecret, string reason)

internal void NotifyReceivedCloseOk()
{
TerminateMainloop();
MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true);
_closed = true;
}

Expand Down Expand Up @@ -112,7 +112,11 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
var serverVersion = new AmqpVersion(connectionStart.m_versionMajor, connectionStart.m_versionMinor);
if (!serverVersion.Equals(Protocol.Version))
{
TerminateMainloop();
/*
* Note:
* FinishCloseAsync will cancel the main loop
*/
MaybeTerminateMainloopAndStopHeartbeatTimers();
await FinishCloseAsync(cancellationToken);
throw new ProtocolVersionMismatchException(Protocol.MajorVersion, Protocol.MinorVersion, serverVersion.Major, serverVersion.Minor);
}
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs
Expand Up @@ -116,8 +116,13 @@ private async void HeartbeatReadTimerCallback(object? state)

if (shouldTerminate)
{
TerminateMainloop();
await FinishCloseAsync(_mainLoopCts.Token)
MaybeTerminateMainloopAndStopHeartbeatTimers();
/*
* Note: do NOT use the main loop cancellation token,
* because FininshCloseAsync immediately cancels it
*/
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout);
await FinishCloseAsync(cts.Token)
.ConfigureAwait(false);
}
else
Expand Down
7 changes: 5 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Expand Up @@ -165,9 +165,12 @@ private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cance
///<remarks>
/// May be called more than once. Should therefore be idempotent.
///</remarks>
private void TerminateMainloop()
private void MaybeTerminateMainloopAndStopHeartbeatTimers(bool cancelMainLoop = false)
{
_mainLoopCts.Cancel();
if (cancelMainLoop)
{
_mainLoopCts.Cancel();
}
MaybeStopHeartbeatTimers();
}

Expand Down
8 changes: 6 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Expand Up @@ -362,7 +362,11 @@ await _session0.TransmitAsync(method, cancellationToken)
}
finally
{
TerminateMainloop();
/*
* Note:
* NotifyReceivedCloseOk will cancel the main loop
*/
MaybeTerminateMainloopAndStopHeartbeatTimers();
}
}

Expand Down Expand Up @@ -402,7 +406,7 @@ internal void ClosedViaPeer(ShutdownEventArgs reason)

OnShutdown(reason);
_session0.SetSessionClosing(true);
TerminateMainloop();
MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true);
}

// Only call at the end of the Mainloop or HeartbeatLoop
Expand Down

0 comments on commit eaa7e33

Please sign in to comment.