From f60fa9fbfecc4cf82ed7bf0dfa294d94511c9905 Mon Sep 17 00:00:00 2001 From: Jesse Ehrenzweig Date: Wed, 5 Feb 2025 12:24:50 -0500 Subject: [PATCH 1/2] Refactored PublisherConfirms.cs * Deduped PublishMessagesInBatchAsync() method's batch publishing code * Deleted outstandingMessageCount variable; use publishTasks.Count instead * Fixed potential bug by setting minimum batch size to 1 --- dotnet/PublisherConfirms/PublisherConfirms.cs | 32 ++++--------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/dotnet/PublisherConfirms/PublisherConfirms.cs b/dotnet/PublisherConfirms/PublisherConfirms.cs index a295376d..cd159e6d 100644 --- a/dotnet/PublisherConfirms/PublisherConfirms.cs +++ b/dotnet/PublisherConfirms/PublisherConfirms.cs @@ -83,20 +83,20 @@ async Task PublishMessagesInBatchAsync() QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName; - int batchSize = MAX_OUTSTANDING_CONFIRMS / 2; - int outstandingMessageCount = 0; + int batchSize = Math.Max(1, MAX_OUTSTANDING_CONFIRMS / 2); - var sw = new Stopwatch(); - sw.Start(); + var sw = Stopwatch.StartNew(); var publishTasks = new List(); for (int i = 0; i < MESSAGE_COUNT; i++) { byte[] body = Encoding.UTF8.GetBytes(i.ToString()); - publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props)); - outstandingMessageCount++; + ValueTask publishTask = channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props); + publishTasks.Add(publishTask); - if (outstandingMessageCount == batchSize) + // NOTE: [publishTasks] should be published after the final message has been added, + // even if the # of tasks it contains isn't equal to [batchSize]. + if (publishTasks.Count == batchSize || i+1 == MESSAGE_COUNT) { foreach (ValueTask pt in publishTasks) { @@ -110,25 +110,7 @@ async Task PublishMessagesInBatchAsync() } } publishTasks.Clear(); - outstandingMessageCount = 0; - } - } - - if (publishTasks.Count > 0) - { - foreach (ValueTask pt in publishTasks) - { - try - { - await pt; - } - catch (Exception ex) - { - Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); - } } - publishTasks.Clear(); - outstandingMessageCount = 0; } sw.Stop(); From f8095d86901c995c15e52a968c8b757f349ba91d Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 5 Feb 2025 11:07:15 -0800 Subject: [PATCH 2/2] Avoid somewhat complicated if logic. --- dotnet/PublisherConfirms/PublisherConfirms.cs | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/dotnet/PublisherConfirms/PublisherConfirms.cs b/dotnet/PublisherConfirms/PublisherConfirms.cs index cd159e6d..5fbb1a10 100644 --- a/dotnet/PublisherConfirms/PublisherConfirms.cs +++ b/dotnet/PublisherConfirms/PublisherConfirms.cs @@ -1,7 +1,7 @@ -using System.Buffers.Binary; +using RabbitMQ.Client; +using System.Buffers.Binary; using System.Diagnostics; using System.Text; -using RabbitMQ.Client; const ushort MAX_OUTSTANDING_CONFIRMS = 256; @@ -94,27 +94,34 @@ async Task PublishMessagesInBatchAsync() ValueTask publishTask = channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props); publishTasks.Add(publishTask); - // NOTE: [publishTasks] should be published after the final message has been added, - // even if the # of tasks it contains isn't equal to [batchSize]. - if (publishTasks.Count == batchSize || i+1 == MESSAGE_COUNT) + await MaybeAwaitPublishes(publishTasks, batchSize); + } + + // Await any remaining tasks in case message count was not + // evenly divisible by batch size. + await MaybeAwaitPublishes(publishTasks, 0); + + sw.Stop(); + Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms"); +} + +static async Task MaybeAwaitPublishes(List publishTasks, int batchSize) +{ + if (publishTasks.Count >= batchSize) + { + foreach (ValueTask pt in publishTasks) { - foreach (ValueTask pt in publishTasks) + try { - try - { - await pt; - } - catch (Exception ex) - { - Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); - } + await pt; + } + catch (Exception ex) + { + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); } - publishTasks.Clear(); } + publishTasks.Clear(); } - - sw.Stop(); - Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms"); } async Task HandlePublishConfirmsAsynchronously()