Skip to content

Commit

Permalink
Add RabbitMQ (transport without batching) to baseline outbox test
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <richardpringle@gmail.com>
  • Loading branch information
EtherZa committed Jun 19, 2024
1 parent 844378b commit c04cfd4
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 32 deletions.
4 changes: 2 additions & 2 deletions src/Infrastructure/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ services:

rabbitmq:
container_name: slim.rabbitmq
image: rabbitmq:3
image: rabbitmq:3-management
ports:
- 5672:5672
- 15672:15672
- 15672:15672 # user/pass: guest/guest
networks:
- slim

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ public enum BusType
{
AzureSB,
Kafka,
RabbitMQ
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
namespace SlimMessageBus.Host.Outbox.DbContext.Test;

using System.Net.Mime;

using Microsoft.EntityFrameworkCore.Migrations;

using SlimMessageBus.Host.RabbitMQ;

/// <summary>
/// This test should help to understand the runtime performance and overhead of the outbox feature.
/// It will generate the time measurements for a given transport (Azure DB + Azure SQL instance) as the baseline,
Expand All @@ -14,41 +18,96 @@
public class OutboxBenchmarkTests(ITestOutputHelper testOutputHelper) : BaseIntegrationTest<OutboxBenchmarkTests>(testOutputHelper)
{
private bool _useOutbox;
private BusType _testParamBusType;

protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration)
{
services.AddSlimMessageBus(mbb =>
{
mbb.AutoStartConsumersEnabled(false);
mbb.AddChildBus("ExternalBus", mbb =>
switch (_testParamBusType)
{
var topic = $"smb-tests/outbox-benchmark/{Guid.NewGuid():N}/customer-events";
mbb
.WithProviderServiceBus(cfg =>
case BusType.AzureSB:
mbb.AddChildBus("Azure", mbb =>
{
cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]);
cfg.PrefetchCount = 100; // fetch 100 messages at a time
cfg.TopologyProvisioning.CreateTopicOptions = o => o.AutoDeleteOnIdle = TimeSpan.FromMinutes(5);
})
.Produce<CustomerCreatedEvent>(x => x.DefaultTopic(topic))
.Consume<CustomerCreatedEvent>(x => x
.Topic(topic)
.WithConsumer<CustomerCreatedEventConsumer>()
.Instances(20) // process messages in parallel
.SubscriptionName(nameof(OutboxBenchmarkTests))); // for AzureSB
if (_useOutbox)
{
mbb
.UseOutbox(); // All outgoing messages from this bus will go out via an outbox
}
});
var topic = $"smb-tests/outbox-benchmark/{DateTimeOffset.UtcNow.Ticks}/customer-events";
mbb
.WithProviderServiceBus(cfg =>
{
cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]);
cfg.PrefetchCount = 100; // fetch 100 messages at a time
cfg.TopologyProvisioning.CreateTopicOptions = o => o.AutoDeleteOnIdle = TimeSpan.FromMinutes(5);
})
.Produce<CustomerCreatedEvent>(x => x.DefaultTopic(topic))
.Consume<CustomerCreatedEvent>(x => x
.Topic(topic)
.WithConsumer<CustomerCreatedEventConsumer>()
.Instances(20) // process messages in parallel
.SubscriptionName(nameof(OutboxBenchmarkTests))); // for AzureSB
if (_useOutbox)
{
mbb
.UseOutbox(); // All outgoing messages from this bus will go out via an outbox
}
});
break;
case BusType.RabbitMQ:
mbb.AddChildBus("Rabbit", mbb =>
{
var topic = $"{nameof(OutboxBenchmarkTests)}-{DateTimeOffset.UtcNow.Ticks}";
var queue = nameof(CustomerCreatedEvent);
mbb.WithProviderRabbitMQ(cfg =>
{
cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["RabbitMQ:ConnectionString"]);
cfg.ConnectionFactory.ClientProvidedName = $"{nameof(OutboxBenchmarkTests)}_{Environment.MachineName}";
cfg.UseMessagePropertiesModifier((m, p) =>
{
p.ContentType = MediaTypeNames.Application.Json;
});
cfg.UseExchangeDefaults(durable: false);
cfg.UseQueueDefaults(durable: false);
cfg.UseTopologyInitializer((channel, applyDefaultTopology) =>
{
// before test clean up
channel.QueueDelete(queue, ifUnused: true, ifEmpty: false);
channel.ExchangeDelete(topic, ifUnused: true);
// apply default SMB inferred topology
applyDefaultTopology();
// after
});
})
.Produce<CustomerCreatedEvent>(x => x
.Exchange(topic, exchangeType: ExchangeType.Fanout, autoDelete: true)
.RoutingKeyProvider((m, p) => Guid.NewGuid().ToString()))
.Consume<CustomerCreatedEvent>(x => x
.Queue(queue, autoDelete: true)
.ExchangeBinding(topic)
.AcknowledgementMode(RabbitMqMessageAcknowledgementMode.AckAutomaticByRabbit)
.WithConsumer<CustomerCreatedEventConsumer>());
if (_useOutbox)
{
mbb
.UseOutbox(); // All outgoing messages from this bus will go out via an outbox
}
});
break;
default:
throw new NotSupportedException($"Bus {_testParamBusType} is not configured");
}
mbb.AddServicesFromAssembly(Assembly.GetExecutingAssembly());
mbb.AddJsonSerializer();
mbb.AddOutboxUsingDbContext<CustomerContext>(opts =>
{
opts.PollBatchSize = 100;
opts.PollBatchSize = 500;
opts.LockExpiration = TimeSpan.FromMinutes(5);
opts.PollIdleSleep = TimeSpan.FromDays(1);
opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10);
Expand Down Expand Up @@ -82,10 +141,14 @@ private async Task PerformDbOperation(Func<CustomerContext, IOutboxMigrationServ
}

[Theory]
[InlineData([true, 1000])] // compare with outbox
[InlineData([false, 1000])] // vs. without outbox
public async Task Given_EventPublisherAndConsumerUsingOutbox_When_BurstOfEventsIsSent_Then_EventsAreConsumedProperly(bool useOutbox, int messageCount)
[InlineData([BusType.AzureSB, true, 1000])] // compare with outbox
[InlineData([BusType.RabbitMQ, true, 1000])]
[InlineData([BusType.AzureSB, false, 1000])] // vs. without outbox
[InlineData([BusType.RabbitMQ, false, 1000])]
public async Task Given_EventPublisherAndConsumerUsingOutbox_When_BurstOfEventsIsSent_Then_EventsAreConsumedProperly(BusType busType, bool useOutbox, int messageCount)
{
_testParamBusType = busType;

// arrange
_useOutbox = useOutbox;

Expand Down Expand Up @@ -117,7 +180,7 @@ await using (unitOfWorkScope as IAsyncDisposable)
var bus = unitOfWorkScope.ServiceProvider.GetRequiredService<IMessageBus>();
try
{
await bus.Publish(ev, headers: new Dictionary<string, object> { ["CustomerId"] = ev.Id });
await bus.Publish(ev, headers: new Dictionary<string, object> { ["CustomerId"] = ev.Id.ToString() });
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ protected override void SetupServices(ServiceCollection services, IConfiguration
{
mbb.UseSqlTransaction(); // Consumers/Handlers will be wrapped in a SqlTransaction
}
if (_testParamTransactionType == TransactionType.TarnsactionScope)
if (_testParamTransactionType == TransactionType.TransactionScope)
{
mbb.UseTransactionScope(); // Consumers/Handlers will be wrapped in a TransactionScope
}
Expand Down Expand Up @@ -127,7 +127,7 @@ private async Task PerformDbOperation(Func<CustomerContext, Task> action)

[Theory]
[InlineData([TransactionType.SqlTransaction, BusType.AzureSB, 100])]
[InlineData([TransactionType.TarnsactionScope, BusType.AzureSB, 100])]
[InlineData([TransactionType.TransactionScope, BusType.AzureSB, 100])]
[InlineData([TransactionType.SqlTransaction, BusType.Kafka, 100])]
public async Task Given_CommandHandlerInTransaction_When_ExceptionThrownDuringHandlingRaisedAtTheEnd_Then_TransactionIsRolledBack_And_NoDataSaved_And_NoEventRaised(TransactionType transactionType, BusType busType, int messageCount)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\SlimMessageBus.Host.RabbitMQ\SlimMessageBus.Host.RabbitMQ.csproj" />
<ProjectReference Include="..\SlimMessageBus.Host.Test.Common\SlimMessageBus.Host.Test.Common.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.AzureServiceBus\SlimMessageBus.Host.AzureServiceBus.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Kafka\SlimMessageBus.Host.Kafka.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
public enum TransactionType
{
SqlTransaction,
TarnsactionScope
TransactionScope
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"Brokers": "{{kafka_brokers}}",
"Username": "{{kafka_username}}",
"Password": "{{kafka_password}}",
"Secure": "{{kafka_secure}}"
"Secure": "{{kafka_secure}}"
},
"Azure": {
"EventHub": {
Expand All @@ -24,6 +24,9 @@
},
"ServiceBus": "{{azure_servicebus_connectionstring}}"
},
"RabbitMQ": {
"ConnectionString": "{{rabbitmq_connectionstring}}"
},
"ConnectionStrings": {
"DefaultConnection": "{{sqlserver_connectionstring}}"
}
Expand Down

0 comments on commit c04cfd4

Please sign in to comment.