From b2fe2c4f1e4b2ab879c9dda0990eef4ed11339f6 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 8 Jan 2024 16:25:42 -0800 Subject: [PATCH] Add test code for issue #1464 --- RabbitMQDotNetClient.sln | 6 ++ projects/gh-1464/Program.cs | 159 ++++++++++++++++++++++++++++++++ projects/gh-1464/gh-1464.csproj | 15 +++ 3 files changed, 180 insertions(+) create mode 100644 projects/gh-1464/Program.cs create mode 100644 projects/gh-1464/gh-1464.csproj diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index efdd0fcf9..6a232a7e0 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -39,6 +39,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AsyncIntegration", "projects\Test\AsyncIntegration\AsyncIntegration.csproj", "{D98F96C5-F7FB-45FC-92A0-9133850FB432}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "gh-1464", "projects\gh-1464\gh-1464.csproj", "{4EC44119-0975-429D-BB0E-F90803B8861D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -89,6 +91,10 @@ Global {D98F96C5-F7FB-45FC-92A0-9133850FB432}.Debug|Any CPU.Build.0 = Debug|Any CPU {D98F96C5-F7FB-45FC-92A0-9133850FB432}.Release|Any CPU.ActiveCfg = Release|Any CPU {D98F96C5-F7FB-45FC-92A0-9133850FB432}.Release|Any CPU.Build.0 = Release|Any CPU + {4EC44119-0975-429D-BB0E-F90803B8861D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4EC44119-0975-429D-BB0E-F90803B8861D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4EC44119-0975-429D-BB0E-F90803B8861D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4EC44119-0975-429D-BB0E-F90803B8861D}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/projects/gh-1464/Program.cs b/projects/gh-1464/Program.cs new file mode 100644 index 000000000..6423b7c0c --- /dev/null +++ b/projects/gh-1464/Program.cs @@ -0,0 +1,159 @@ +using System.Diagnostics; +using System.Text; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +const string queueName = "gh-1464"; +const int messageCount = 1024; +int messagesReceived = 0; + +var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); +var consumeConnectionShutdownSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + +using var cts = new CancellationTokenSource(); +cts.Token.Register(() => +{ + Console.WriteLine("[INFO] CANCELLING PUBLISH SYNC SOURCE"); + publishSyncSource.SetCanceled(); +}); + +Console.CancelKeyPress += delegate (object? sender, ConsoleCancelEventArgs e) +{ + e.Cancel = true; + cts.Cancel(); +}; + +var cf = new ConnectionFactory +{ + AutomaticRecoveryEnabled = true, + TopologyRecoveryEnabled = true, + Port = 55672, + DispatchConsumersAsync = true +}; + +using IConnection consumeConnection = await cf.CreateConnectionAsync(); +using IChannel consumeChannel = await consumeConnection.CreateChannelAsync(); + +consumeConnection.ConnectionShutdown += (o, ea) => +{ + Console.WriteLine("[INFO] SAW CONSUME CONNECTION SHUTDOWN"); + if (cts.IsCancellationRequested) + { + Console.WriteLine("[INFO] CANCELING SHUTDOWN SYNC SOURCE"); + consumeConnectionShutdownSource.SetCanceled(); + } +}; + +consumeChannel.ChannelShutdown += (o, ea) => +{ + Console.WriteLine("[INFO] SAW CONSUME CHANNEL SHUTDOWN"); +}; + +var consumer = new AsyncEventingBasicConsumer(consumeChannel); + +consumer.ConsumerCancelled += async (object sender, ConsumerEventArgs args) => +{ + Debug.Assert(Object.ReferenceEquals(consumer, sender)); + Console.WriteLine("[INFO] SAW CONSUMER CANCELLED"); + await Task.Yield(); +}; + +consumer.Registered += async (object sender, ConsumerEventArgs args) => +{ + Debug.Assert(Object.ReferenceEquals(consumer, sender)); + Console.WriteLine("[INFO] SAW CONSUMER REGISTERED"); + await Task.Yield(); +}; + +consumer.Received += async (object sender, BasicDeliverEventArgs args) => +{ + Debug.Assert(Object.ReferenceEquals(consumer, sender)); + var c = sender as AsyncEventingBasicConsumer; + Console.WriteLine($"[INFO] CONSUMER SAW TAG: {args.DeliveryTag}"); + await consumeChannel.BasicAckAsync(args.DeliveryTag, false); + messagesReceived++; + if (messagesReceived == messageCount) + { + publishSyncSource.SetResult(true); + } +}; + +QueueDeclareOk q = await consumeChannel.QueueDeclareAsync(queue: queueName, + passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null); +Debug.Assert(queueName == q.QueueName); + +await consumeChannel.BasicQosAsync(0, 1, false); +await consumeChannel.BasicConsumeAsync(queue: queueName, autoAck: false, + consumerTag: string.Empty, noLocal: false, exclusive: false, + arguments: null, consumer); + +var publishTask = Task.Run(async () => +{ + var publishConnectionFactory = new ConnectionFactory + { + AutomaticRecoveryEnabled = true, + TopologyRecoveryEnabled = true + }; + + using (IConnection publishConnection = await publishConnectionFactory.CreateConnectionAsync()) + { + using (IChannel publishChannel = await publishConnection.CreateChannelAsync()) + { + publishChannel.BasicAcks += (object? sender, BasicAckEventArgs e) => + { + Console.WriteLine($"[INFO] PUBLISHER SAW ACK: {e.DeliveryTag}"); + }; + + publishChannel.BasicNacks += (object? sender, BasicNackEventArgs e) => + { + Console.WriteLine($"[INFO] PUBLISHER SAW NACK: {e.DeliveryTag}"); + }; + + await publishChannel.ConfirmSelectAsync(); + + Console.WriteLine($"[INFO] PUBLISHING MESSAGES: {messageCount}"); + for (int i = 0; i < messageCount; i++) + { + cts.Token.ThrowIfCancellationRequested(); + byte[] _body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()); + await publishChannel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, mandatory: true, body: _body); + await publishChannel.WaitForConfirmsOrDieAsync(); + await Task.Delay(TimeSpan.FromSeconds(1)); + Console.WriteLine($"[INFO] SENT MESSAGE: {i}"); + } + } + } +}); + +try +{ + await publishSyncSource.Task; + Debug.Assert(messageCount == messagesReceived); +} +catch (OperationCanceledException ex) +{ + Console.WriteLine($"[INFO] CANCELLATION REQUESTED: {ex}"); + await consumeConnection.CloseAsync(); + + try + { + await consumeConnectionShutdownSource.Task; + } + catch (OperationCanceledException ccssex) + { + Console.WriteLine($"[INFO] CONSUME CONNECTION SYNC SOURCE CANCELED: {ccssex}"); + } +} + +try +{ + await publishTask; +} +catch (OperationCanceledException pubex) +{ + Console.WriteLine($"[INFO] PUBLISH TASK CANCELED: {pubex}"); +} + +Console.WriteLine($"[INFO] PUBLISH TASK COMPLETED"); + +Console.WriteLine($"[INFO] ALL TASKS COMPLETED"); diff --git a/projects/gh-1464/gh-1464.csproj b/projects/gh-1464/gh-1464.csproj new file mode 100644 index 000000000..6a8430ae8 --- /dev/null +++ b/projects/gh-1464/gh-1464.csproj @@ -0,0 +1,15 @@ + + + + Exe + net6.0 + gh_1464 + enable + enable + + + + + + +