From aa391342ebc6c212cb07d9cd039d3e096bcd2227 Mon Sep 17 00:00:00 2001 From: Tomasz Maruszak Date: Sat, 15 Jun 2024 00:41:58 +0200 Subject: [PATCH] [Host.Outbox] Ensure SQL tables are provisioned without consumer start and with first publish. Also: - Add baseline performance test for Outbox. - Send Created bus lifecycle event to be able to hook into Master bus creation Signed-off-by: Tomasz Maruszak --- infrastructure.ps1 | 1 + .../ServiceBusMessageBus.cs | 2 +- .../Bus/MessageBusLifecycleEventType.cs | 7 +- .../Interceptors/OutboxSendingTask.cs | 40 +++++- src/SlimMessageBus.Host/MessageBusBase.cs | 38 ++--- .../DatabaseFacadeExtenstions.cs | 18 +++ .../OutboxBenchmarkTests.cs | 132 ++++++++++++++++++ .../OutboxTests.cs | 59 ++++---- .../IntegrationTest/BaseIntegrationTest.cs | 2 +- src/secrets.txt.sample | 2 +- 10 files changed, 248 insertions(+), 53 deletions(-) create mode 100644 infrastructure.ps1 create mode 100644 src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtenstions.cs create mode 100644 src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs diff --git a/infrastructure.ps1 b/infrastructure.ps1 new file mode 100644 index 00000000..545a7ede --- /dev/null +++ b/infrastructure.ps1 @@ -0,0 +1 @@ +docker compose -f src/Infrastructure/docker-compose.yml up --force-recreate -V \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs index 8ee49042..a9e76996 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs @@ -109,7 +109,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso responseConsumer: this, messagePayloadProvider: m => m.Body.ToArray()); - AddConsumerFrom(topicSubscription, messageProcessor, new[] { Settings.RequestResponse }); + AddConsumerFrom(topicSubscription, messageProcessor, [Settings.RequestResponse]); } } diff --git a/src/SlimMessageBus.Host.Interceptor/Bus/MessageBusLifecycleEventType.cs b/src/SlimMessageBus.Host.Interceptor/Bus/MessageBusLifecycleEventType.cs index 1b5b4baf..ac5d73c1 100644 --- a/src/SlimMessageBus.Host.Interceptor/Bus/MessageBusLifecycleEventType.cs +++ b/src/SlimMessageBus.Host.Interceptor/Bus/MessageBusLifecycleEventType.cs @@ -2,8 +2,13 @@ public enum MessageBusLifecycleEventType { + /// + /// Invoked when the master bus is created. + /// Can be used to initalize any resource before the messages are produced or consumed. + /// + Created, Starting, Started, Stopping, - Stopped, + Stopped } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs index be05119a..05591a52 100644 --- a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs +++ b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs @@ -14,6 +14,12 @@ public class OutboxSendingTask( private CancellationTokenSource _loopCts; private Task _loopTask; + + private Task _migrateSchemaTask; + private Task _startBusTask; + private Task _stopBusTask; + + private int _busCreateCount; private int _busStartCount; private DateTime? _cleanupNextRun; @@ -75,35 +81,61 @@ protected async Task Stop() public Task OnBusLifecycle(MessageBusLifecycleEventType eventType, IMessageBus bus) { + if (eventType == MessageBusLifecycleEventType.Created) + { + // Run schema migration only once for the first bus created + if (Interlocked.Increment(ref _busCreateCount) == 1) + { + _migrateSchemaTask = MigrateSchema(_serviceProvider, default); + } + return _migrateSchemaTask; + } if (eventType == MessageBusLifecycleEventType.Started) { // The first started bus starts this outbox task if (Interlocked.Increment(ref _busStartCount) == 1) { - return Start(); + _startBusTask = Start(); } + return _startBusTask; } if (eventType == MessageBusLifecycleEventType.Stopping) { // The last stopped bus stops this outbox task if (Interlocked.Decrement(ref _busStartCount) == 0) { - return Stop(); + _stopBusTask = Stop(); } + return _stopBusTask; } return Task.CompletedTask; } + private async Task MigrateSchema(IServiceProvider serviceProvider, CancellationToken cancellationToken) + { + try + { + var outboxMigrationService = serviceProvider.GetRequiredService(); + await outboxMigrationService.Migrate(cancellationToken); + } + catch (Exception e) + { + _logger.LogError(e, "Outbox schema migration failed"); + throw; + } + } + private async Task Run() { try { + _logger.LogInformation("Outbox loop started"); var scope = _serviceProvider.CreateScope(); + try { - var outboxMigrationService = scope.ServiceProvider.GetRequiredService(); - await outboxMigrationService.Migrate(_loopCts.Token); + await MigrateSchema(scope.ServiceProvider, _loopCts.Token); var outboxRepository = scope.ServiceProvider.GetRequiredService(); diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index 1b4c22e8..a447b176 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -144,23 +144,27 @@ protected void OnBuildProvider() { ValidationService.AssertSettings(); - Build(); - - if (Settings.AutoStartConsumers) - { - // Fire and forget start - _ = Task.Run(async () => - { - try - { - await Start().ConfigureAwait(false); - } - catch (Exception e) - { - _logger.LogError(e, "Could not auto start consumers"); - } + Build(); + + // Notify the bus has been created - before any message can be produced + AddInit(OnBusLifecycle(MessageBusLifecycleEventType.Created)); + + // Auto start consumers if enabled + if (Settings.AutoStartConsumers) + { + // Fire and forget start + _ = Task.Run(async () => + { + try + { + await Start().ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogError(e, "Could not auto start consumers"); + } }); - } + } } protected virtual void Build() @@ -196,7 +200,7 @@ protected virtual void BuildPendingRequestStore() private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType) { - _lifecycleInterceptors ??= Settings.ServiceProvider?.GetService>(); + _lifecycleInterceptors ??= Settings.ServiceProvider?.GetServices(); if (_lifecycleInterceptors != null) { foreach (var i in _lifecycleInterceptors) diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtenstions.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtenstions.cs new file mode 100644 index 00000000..1faac076 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtenstions.cs @@ -0,0 +1,18 @@ +namespace SlimMessageBus.Host.Outbox.DbContext.Test; + +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; + +public static class DatabaseFacadeExtenstions +{ + public static Task EraseTableIfExists(this DatabaseFacade db, string tableName) + { + return db.ExecuteSqlRawAsync( + $""" + IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = '{tableName}') + BEGIN + DELETE FROM dbo.{tableName}; + END + """); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs new file mode 100644 index 00000000..9440c3c4 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs @@ -0,0 +1,132 @@ +namespace SlimMessageBus.Host.Outbox.DbContext.Test; + +using System.Diagnostics; +using System.Reflection; + +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +using SecretStore; + +using SlimMessageBus.Host; +using SlimMessageBus.Host.AzureServiceBus; +using SlimMessageBus.Host.Outbox.DbContext.Test.DataAccess; +using SlimMessageBus.Host.Serialization.SystemTextJson; +using SlimMessageBus.Host.Test.Common.IntegrationTest; + +[Trait("Category", "Integration")] // for benchmarks +public class OutboxBenchmarkTests(ITestOutputHelper testOutputHelper) : BaseIntegrationTest(testOutputHelper) +{ + private static readonly string OutboxTableName = "IntTest_Benchmark_Outbox"; + private static readonly string MigrationsTableName = "IntTest_Benchmark_Migrations"; + + protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration) + { + services.AddSlimMessageBus(mbb => + { + mbb.AddChildBus("ExternalBus", mbb => + { + var topic = "tests.outbox-benchmark/customer-events"; + mbb + .WithProviderServiceBus(cfg => cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"])) + .Produce(x => x.DefaultTopic(topic)) + .Consume(x => x + .Topic(topic) + .WithConsumer() + .SubscriptionName(nameof(OutboxBenchmarkTests))) // for AzureSB + .UseOutbox(); // All outgoing messages from this bus will go out via an outbox + }); + mbb.AddServicesFromAssembly(Assembly.GetExecutingAssembly()); + mbb.AddJsonSerializer(); + mbb.AddOutboxUsingDbContext(opts => + { + opts.PollBatchSize = 100; + opts.PollIdleSleep = TimeSpan.FromSeconds(0.5); + opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10); + opts.MessageCleanup.Age = TimeSpan.FromMinutes(1); + opts.SqlSettings.DatabaseTableName = OutboxTableName; + opts.SqlSettings.DatabaseMigrationsTableName = MigrationsTableName; + }); + mbb.AutoStartConsumersEnabled(false); + }); + + services.AddSingleton>(); + + // Entity Framework setup - application specific EF DbContext + services.AddDbContext(options => options.UseSqlServer(Secrets.Service.PopulateSecrets(Configuration.GetConnectionString("DefaultConnection")))); + } + + private async Task PerformDbOperation(Func action) + { + using var scope = ServiceProvider!.CreateScope(); + var context = scope.ServiceProvider.GetRequiredService(); + await action(context); + } + + [Theory] + [InlineData([100])] + public async Task Given_EventPublisherAndConsumerUsingOutbox_When_BurstOfEventsIsSent_Then_EventsAreConsumedProperly(int messageCount) + { + // arrange + await PerformDbOperation(async context => + { + // migrate db + await context.Database.MigrateAsync(); + + // clean outbox from previous test run + await context.Database.EraseTableIfExists(OutboxTableName); + await context.Database.EraseTableIfExists(MigrationsTableName); + }); + + var surnames = new[] { "Doe", "Smith", "Kowalsky" }; + var events = Enumerable.Range(0, messageCount).Select(x => new CustomerCreatedEvent(Guid.NewGuid(), $"John {x:000}", surnames[x % surnames.Length])).ToList(); + var store = ServiceProvider!.GetRequiredService>(); + + // act + + // publish the events in one shot (consumers are not started yet) + var publishTimer = Stopwatch.StartNew(); + + var publishTasks = events + .Select(async ev => + { + var unitOfWorkScope = ServiceProvider!.CreateScope(); + await using (unitOfWorkScope as IAsyncDisposable) + { + var bus = unitOfWorkScope.ServiceProvider.GetRequiredService(); + try + { + await bus.Publish(ev, headers: new Dictionary { ["CustomerId"] = ev.Id }); + } + catch (Exception ex) + { + Logger.LogInformation("Exception occurred while publishing event {Event}: {Message}", ev, ex.Message); + } + } + }) + .ToArray(); + + await Task.WhenAll(publishTasks); + + var publishTimerElapsed = publishTimer.Elapsed; + + // start consumers + await EnsureConsumersStarted(); + + // consume the events from outbox + var consumptionTimer = Stopwatch.StartNew(); + await store.WaitUntilArriving(newMessagesTimeout: 5, expectedCount: events.Count); + + // assert + + var consumeTimerElapsed = consumptionTimer.Elapsed; + + Logger.LogInformation("Message Publish took: {Elapsed}", publishTimerElapsed); + Logger.LogInformation("Message Consume took: {Elapsed}", consumeTimerElapsed); + + // Ensure the expected number of events was actually published to ASB and delivered via that channel. + store.Count.Should().Be(events.Count); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs index da53ba1a..09a1a7be 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs @@ -38,6 +38,9 @@ public enum BusType Kafka, } + private static readonly string OutboxTableName = "IntTest_Outbox"; + private static readonly string MigrationsTableName = "IntTest_Migrations"; + protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration) { services.AddSlimMessageBus(mbb => @@ -119,7 +122,8 @@ protected override void SetupServices(ServiceCollection services, IConfiguration opts.PollIdleSleep = TimeSpan.FromSeconds(0.5); opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10); opts.MessageCleanup.Age = TimeSpan.FromMinutes(1); - opts.SqlSettings.DatabaseTableName = "IntTest_Outbox"; + opts.SqlSettings.DatabaseTableName = OutboxTableName; + opts.SqlSettings.DatabaseMigrationsTableName = MigrationsTableName; }); }); @@ -139,10 +143,10 @@ private async Task PerformDbOperation(Func action) public const string InvalidLastname = "Exception"; [Theory] - [InlineData([TransactionType.SqlTransaction, BusType.AzureSB])] - [InlineData([TransactionType.TarnsactionScope, BusType.AzureSB])] - [InlineData([TransactionType.SqlTransaction, BusType.Kafka])] - public async Task Given_CommandHandlerInTransaction_When_ExceptionThrownDuringHandlingRaisedAtTheEnd_Then_TransactionIsRolledBack_And_NoDataSaved_And_NoEventRaised(TransactionType transactionType, BusType busType) + [InlineData([TransactionType.SqlTransaction, BusType.AzureSB, 100])] + [InlineData([TransactionType.TarnsactionScope, BusType.AzureSB, 100])] + [InlineData([TransactionType.SqlTransaction, BusType.Kafka, 100])] + public async Task Given_CommandHandlerInTransaction_When_ExceptionThrownDuringHandlingRaisedAtTheEnd_Then_TransactionIsRolledBack_And_NoDataSaved_And_NoEventRaised(TransactionType transactionType, BusType busType, int messageCount) { // arrange _testParamTransactionType = transactionType; @@ -153,14 +157,9 @@ public async Task Given_CommandHandlerInTransaction_When_ExceptionThrownDuringHa // migrate db await context.Database.MigrateAsync(); - //// clean outbox from previous test run - await context.Database.ExecuteSqlRawAsync( - """ - IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = 'IntTest_Outbox') - BEGIN - DELETE FROM dbo.IntTest_Outbox; - END - """); + // clean outbox from previous test run + await context.Database.EraseTableIfExists(OutboxTableName); + await context.Database.EraseTableIfExists(MigrationsTableName); // clean the customers table var cust = await context.Customers.ToListAsync(); @@ -169,7 +168,7 @@ IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AN }); var surnames = new[] { "Doe", "Smith", InvalidLastname }; - var commands = Enumerable.Range(0, 100).Select(x => new CreateCustomerCommand($"John {x:000}", surnames[x % surnames.Length])); + var commands = Enumerable.Range(0, messageCount).Select(x => new CreateCustomerCommand($"John {x:000}", surnames[x % surnames.Length])); var validCommands = commands.Where(x => !string.Equals(x.Lastname, InvalidLastname, StringComparison.InvariantCulture)).ToList(); var store = ServiceProvider!.GetRequiredService>(); @@ -180,23 +179,27 @@ IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AN store.Clear(); // act - foreach (var cmd in commands) - { - var unitOfWorkScope = ServiceProvider!.CreateScope(); - await using (unitOfWorkScope as IAsyncDisposable) + var sendTasks = commands + .Select(async cmd => { - var bus = unitOfWorkScope.ServiceProvider.GetRequiredService(); - - try + var unitOfWorkScope = ServiceProvider!.CreateScope(); + await using (unitOfWorkScope as IAsyncDisposable) { - var res = await bus.Send(cmd); - } - catch (Exception ex) - { - Logger.LogInformation("Exception occurred while handling cmd {Command}: {Message}", cmd, ex.Message); + var bus = unitOfWorkScope.ServiceProvider.GetRequiredService(); + + try + { + var res = await bus.Send(cmd); + } + catch (Exception ex) + { + Logger.LogInformation("Exception occurred while handling cmd {Command}: {Message}", cmd, ex.Message); + } } - } - } + }) + .ToArray(); + + await Task.WhenAll(sendTasks); await store.WaitUntilArriving(newMessagesTimeout: 5, expectedCount: validCommands.Count); diff --git a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs index 00435556..70a335aa 100644 --- a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs @@ -83,7 +83,7 @@ protected async Task EnsureConsumersStarted() var consumerControl = ServiceProvider.GetRequiredService(); // ensure the consumers are warm - while (!consumerControl.IsStarted && timeout.ElapsedMilliseconds < 5000) await Task.Delay(200); + while (!consumerControl.IsStarted && timeout.ElapsedMilliseconds < 5000) await Task.Delay(100); } public Task InitializeAsync() diff --git a/src/secrets.txt.sample b/src/secrets.txt.sample index f335f86e..00b53da1 100644 --- a/src/secrets.txt.sample +++ b/src/secrets.txt.sample @@ -13,10 +13,10 @@ kafka_username=user kafka_password=password mqtt_server=localhost +mqtt_secure=false mqtt_port=1883 mqtt_username= mqtt_password= -mqtt_secure=false sqlserver_connectionstring=Server=localhost;Initial Catalog=SlimMessageBus_Outbox;User ID=sa;Password=SuperSecretP@55word;TrustServerCertificate=true;MultipleActiveResultSets=true;