From 9bf0e5cd6d4e382dd9f86049c5c488e0c62cae03 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 20 Feb 2024 09:37:45 -0800 Subject: [PATCH] Add test code for issue #1464 --- projects/Test/Common/IntegrationFixture.cs | 16 +++++ .../Test/Common/TestConnectionRecoveryBase.cs | 22 ------- projects/Test/Integration/TestToxiproxy.cs | 60 +++++++++++++++++++ projects/toxiproxy-netcore | 2 +- 4 files changed, 77 insertions(+), 23 deletions(-) diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 60028da94..74e4f9007 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -551,6 +551,22 @@ protected static byte[] GetRandomBody(ushort size = 1024) return body; } + protected static Task WaitForRecoveryAsync(IConnection conn) + { + TaskCompletionSource tcs = PrepareForRecovery((AutorecoveringConnection)conn); + return WaitAsync(tcs, "recovery succeded"); + } + + protected static TaskCompletionSource PrepareForRecovery(IConnection conn) + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + AutorecoveringConnection aconn = conn as AutorecoveringConnection; + aconn.RecoverySucceeded += (source, ea) => tcs.SetResult(true); + + return tcs; + } + public static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture); } } diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index ee50887c2..439da74f5 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -231,16 +231,6 @@ protected static TaskCompletionSource PrepareForShutdown(IConnection conn) return tcs; } - protected static TaskCompletionSource PrepareForRecovery(IConnection conn) - { - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - - AutorecoveringConnection aconn = conn as AutorecoveringConnection; - aconn.RecoverySucceeded += (source, ea) => tcs.SetResult(true); - - return tcs; - } - protected static Task WaitForConfirmsWithCancellationAsync(IChannel m) { using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4))) @@ -249,18 +239,6 @@ protected static Task WaitForConfirmsWithCancellationAsync(IChannel m) } } - protected Task WaitForRecoveryAsync() - { - TaskCompletionSource tcs = PrepareForRecovery((AutorecoveringConnection)_conn); - return WaitAsync(tcs, "recovery succeded"); - } - - internal Task WaitForRecoveryAsync(AutorecoveringConnection conn) - { - TaskCompletionSource tcs = PrepareForRecovery(conn); - return WaitAsync(tcs, "recovery succeeded"); - } - protected Task WaitForShutdownAsync() { TaskCompletionSource tcs = PrepareForShutdown(_conn); diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index da2b7e8a6..82c5b5a86 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -169,6 +169,66 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout() _output.WriteLine($"[INFO] heartbeat timeout took {sw.Elapsed}"); } + [SkippableFact] + [Trait("Category", "Toxiproxy")] + public async Task TestTcpReset_GH1464() + { + Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + + ConnectionFactory cf = CreateConnectionFactory(); + cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), ProxyPort); + cf.Port = ProxyPort; + cf.RequestedHeartbeat = TimeSpan.FromSeconds(5); + cf.AutomaticRecoveryEnabled = true; + + var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var connectionShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + Task recoveryTask = Task.Run(async () => + { + using (IConnection conn = await cf.CreateConnectionAsync()) + { + conn.ConnectionShutdown += (o, ea) => + { + connectionShutdownTcs.SetResult(true); + }; + + using (IChannel ch = await conn.CreateChannelAsync()) + { + channelCreatedTcs.SetResult(true); + await WaitForRecoveryAsync(conn); + await ch.CloseAsync(); + } + + await conn.CloseAsync(); + } + }); + + Assert.True(await channelCreatedTcs.Task); + + const string toxicName = "rmq-localhost-reset_peer"; + var resetPeerToxic = new ResetPeerToxic(); + resetPeerToxic.Name = toxicName; + resetPeerToxic.Attributes.Timeout = 500; + resetPeerToxic.Toxicity = 1.0; + + var sw = new Stopwatch(); + sw.Start(); + + await _rmqProxy.AddAsync(resetPeerToxic); + Task updateProxyTask = _rmqProxy.UpdateAsync(); + + await Task.WhenAll(updateProxyTask, connectionShutdownTcs.Task); + + await _rmqProxy.RemoveToxicAsync(toxicName); + + await recoveryTask; + + sw.Stop(); + + _output.WriteLine($"[INFO] reset peer took {sw.Elapsed}"); + } + private bool AreToxiproxyTestsEnabled { get diff --git a/projects/toxiproxy-netcore b/projects/toxiproxy-netcore index 13dfaee21..fe846157e 160000 --- a/projects/toxiproxy-netcore +++ b/projects/toxiproxy-netcore @@ -1 +1 @@ -Subproject commit 13dfaee21150e753bf948cd0141c83ea3f7f9372 +Subproject commit fe846157ed525e84e33e272e8116efffc499520d