Skip to content

Commit

Permalink
Add test code for issue #1464
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Jan 9, 2024
1 parent f8a3028 commit b2fe2c4
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 0 deletions.
6 changes: 6 additions & 0 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AsyncIntegration", "projects\Test\AsyncIntegration\AsyncIntegration.csproj", "{D98F96C5-F7FB-45FC-92A0-9133850FB432}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "gh-1464", "projects\gh-1464\gh-1464.csproj", "{4EC44119-0975-429D-BB0E-F90803B8861D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -89,6 +91,10 @@ Global
{D98F96C5-F7FB-45FC-92A0-9133850FB432}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D98F96C5-F7FB-45FC-92A0-9133850FB432}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D98F96C5-F7FB-45FC-92A0-9133850FB432}.Release|Any CPU.Build.0 = Release|Any CPU
{4EC44119-0975-429D-BB0E-F90803B8861D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4EC44119-0975-429D-BB0E-F90803B8861D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4EC44119-0975-429D-BB0E-F90803B8861D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4EC44119-0975-429D-BB0E-F90803B8861D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
159 changes: 159 additions & 0 deletions projects/gh-1464/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
using System.Diagnostics;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

const string queueName = "gh-1464";
const int messageCount = 1024;
int messagesReceived = 0;

var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var consumeConnectionShutdownSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

using var cts = new CancellationTokenSource();
cts.Token.Register(() =>
{
Console.WriteLine("[INFO] CANCELLING PUBLISH SYNC SOURCE");
publishSyncSource.SetCanceled();
});

Console.CancelKeyPress += delegate (object? sender, ConsoleCancelEventArgs e)
{
e.Cancel = true;
cts.Cancel();
};

var cf = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
TopologyRecoveryEnabled = true,
Port = 55672,
DispatchConsumersAsync = true
};

using IConnection consumeConnection = await cf.CreateConnectionAsync();
using IChannel consumeChannel = await consumeConnection.CreateChannelAsync();

consumeConnection.ConnectionShutdown += (o, ea) =>
{
Console.WriteLine("[INFO] SAW CONSUME CONNECTION SHUTDOWN");
if (cts.IsCancellationRequested)
{
Console.WriteLine("[INFO] CANCELING SHUTDOWN SYNC SOURCE");
consumeConnectionShutdownSource.SetCanceled();
}
};

consumeChannel.ChannelShutdown += (o, ea) =>
{
Console.WriteLine("[INFO] SAW CONSUME CHANNEL SHUTDOWN");
};

var consumer = new AsyncEventingBasicConsumer(consumeChannel);

consumer.ConsumerCancelled += async (object sender, ConsumerEventArgs args) =>
{
Debug.Assert(Object.ReferenceEquals(consumer, sender));
Console.WriteLine("[INFO] SAW CONSUMER CANCELLED");
await Task.Yield();
};

consumer.Registered += async (object sender, ConsumerEventArgs args) =>
{
Debug.Assert(Object.ReferenceEquals(consumer, sender));
Console.WriteLine("[INFO] SAW CONSUMER REGISTERED");
await Task.Yield();
};

consumer.Received += async (object sender, BasicDeliverEventArgs args) =>
{
Debug.Assert(Object.ReferenceEquals(consumer, sender));
var c = sender as AsyncEventingBasicConsumer;
Console.WriteLine($"[INFO] CONSUMER SAW TAG: {args.DeliveryTag}");
await consumeChannel.BasicAckAsync(args.DeliveryTag, false);
messagesReceived++;
if (messagesReceived == messageCount)
{
publishSyncSource.SetResult(true);
}
};

QueueDeclareOk q = await consumeChannel.QueueDeclareAsync(queue: queueName,
passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null);
Debug.Assert(queueName == q.QueueName);

await consumeChannel.BasicQosAsync(0, 1, false);
await consumeChannel.BasicConsumeAsync(queue: queueName, autoAck: false,
consumerTag: string.Empty, noLocal: false, exclusive: false,
arguments: null, consumer);

var publishTask = Task.Run(async () =>
{
var publishConnectionFactory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
TopologyRecoveryEnabled = true
};
using (IConnection publishConnection = await publishConnectionFactory.CreateConnectionAsync())
{
using (IChannel publishChannel = await publishConnection.CreateChannelAsync())
{
publishChannel.BasicAcks += (object? sender, BasicAckEventArgs e) =>
{
Console.WriteLine($"[INFO] PUBLISHER SAW ACK: {e.DeliveryTag}");
};
publishChannel.BasicNacks += (object? sender, BasicNackEventArgs e) =>
{
Console.WriteLine($"[INFO] PUBLISHER SAW NACK: {e.DeliveryTag}");
};
await publishChannel.ConfirmSelectAsync();
Console.WriteLine($"[INFO] PUBLISHING MESSAGES: {messageCount}");
for (int i = 0; i < messageCount; i++)
{
cts.Token.ThrowIfCancellationRequested();
byte[] _body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
await publishChannel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, mandatory: true, body: _body);
await publishChannel.WaitForConfirmsOrDieAsync();
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine($"[INFO] SENT MESSAGE: {i}");
}
}
}
});

try
{
await publishSyncSource.Task;
Debug.Assert(messageCount == messagesReceived);
}
catch (OperationCanceledException ex)
{
Console.WriteLine($"[INFO] CANCELLATION REQUESTED: {ex}");
await consumeConnection.CloseAsync();

try
{
await consumeConnectionShutdownSource.Task;
}
catch (OperationCanceledException ccssex)
{
Console.WriteLine($"[INFO] CONSUME CONNECTION SYNC SOURCE CANCELED: {ccssex}");
}
}

try
{
await publishTask;
}
catch (OperationCanceledException pubex)
{
Console.WriteLine($"[INFO] PUBLISH TASK CANCELED: {pubex}");
}

Console.WriteLine($"[INFO] PUBLISH TASK COMPLETED");

Console.WriteLine($"[INFO] ALL TASKS COMPLETED");
15 changes: 15 additions & 0 deletions projects/gh-1464/gh-1464.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>gh_1464</RootNamespace>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\RabbitMQ.Client\RabbitMQ.Client.csproj" />
</ItemGroup>

</Project>

0 comments on commit b2fe2c4

Please sign in to comment.