From 643f8b0c287cb1c03870ccb2641db57a661b1a92 Mon Sep 17 00:00:00 2001 From: Richard Pringle Date: Mon, 17 Jun 2024 00:21:13 +0800 Subject: [PATCH] #263 outbox benchmark tweaks Signed-off-by: Richard Pringle --- src/.editorconfig | 4 + src/Infrastructure/docker-compose.yml | 2 + .../MessageBusBuilderExtensions.cs | 6 +- .../Interceptors/OutboxSendingTask.cs | 151 ++++++----- .../SlimMessageBus.Host.Outbox.csproj | 6 + src/SlimMessageBus.Host/MessageBusBase.cs | 6 +- .../DataAccess/CustomerContext.cs | 5 +- .../DatabaseFacadeExtensions.cs | 252 ++++++++++++++++++ .../DatabaseFacadeExtenstions.cs | 19 -- .../20230121072718_InitialCreate.Designer.cs | 49 ---- .../20230121072718_InitialCreate.cs | 33 --- .../20240401094307_AddUniqueIdToCustomer.cs | 62 ----- ...20240617163727_AssemblySchema.Designer.cs} | 101 +++---- .../20240617163727_AssemblySchema.cs | 41 +++ .../CustomerContextModelSnapshot.cs | 5 +- .../OutboxBenchmarkTests.cs | 69 +++-- .../OutboxTests.cs | 42 +-- ...ssageBus.Host.Outbox.DbContext.Test.csproj | 4 + .../Usings.cs | 1 + .../IntegrationTest/BaseIntegrationTest.cs | 1 + 20 files changed, 527 insertions(+), 332 deletions(-) create mode 100644 src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtensions.cs delete mode 100644 src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtenstions.cs delete mode 100644 src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20230121072718_InitialCreate.Designer.cs delete mode 100644 src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20230121072718_InitialCreate.cs delete mode 100644 src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240401094307_AddUniqueIdToCustomer.cs rename src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/{20240401094307_AddUniqueIdToCustomer.Designer.cs => 20240617163727_AssemblySchema.Designer.cs} (84%) create mode 100644 src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240617163727_AssemblySchema.cs diff --git a/src/.editorconfig b/src/.editorconfig index aa4a4b1a..1e259e83 100644 --- a/src/.editorconfig +++ b/src/.editorconfig @@ -178,3 +178,7 @@ dotnet_style_allow_multiple_blank_lines_experimental = true:silent dotnet_style_allow_statement_immediately_after_block_experimental = true:silent dotnet_style_prefer_collection_expression = when_types_loosely_match:suggestion dotnet_diagnostic.CA1859.severity = silent + +[*.csproj] +indent_style = space +indent_size = 2 diff --git a/src/Infrastructure/docker-compose.yml b/src/Infrastructure/docker-compose.yml index 53ff46b6..18c7eaf7 100644 --- a/src/Infrastructure/docker-compose.yml +++ b/src/Infrastructure/docker-compose.yml @@ -2,6 +2,7 @@ version: '3.4' services: zookeeper: + container_name: slim.zookeeper image: wurstmeister/zookeeper ports: - "2181:2181" @@ -9,6 +10,7 @@ services: - slim kafka: + container_name: slim.kafka image: wurstmeister/kafka ports: - "9092:9092" diff --git a/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs b/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs index f91d5f7b..051b6016 100644 --- a/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.Outbox/Configuration/MessageBusBuilderExtensions.cs @@ -1,5 +1,7 @@ namespace SlimMessageBus.Host.Outbox; +using Microsoft.Extensions.DependencyInjection; + public static class MessageBusBuilderExtensions { public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action configure = null) @@ -30,7 +32,9 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action), typeof(TransactionScopeConsumerInterceptor<>))); - services.TryAddEnumerable(ServiceDescriptor.Singleton()); + services.AddSingleton(); + services.TryAddEnumerable(ServiceDescriptor.Singleton(sp => sp.GetRequiredService())); + services.TryAddSingleton(); services.TryAddSingleton(svp => diff --git a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs index 7a5b4537..9bfb77be 100644 --- a/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs +++ b/src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs @@ -1,6 +1,6 @@ namespace SlimMessageBus.Host.Outbox; -public class OutboxSendingTask( +internal class OutboxSendingTask( ILoggerFactory loggerFactory, OutboxSettings outboxSettings, IServiceProvider serviceProvider, @@ -141,31 +141,10 @@ private async Task Run() var outboxRepository = scope.ServiceProvider.GetRequiredService(); - var processedIds = new List(_outboxSettings.PollBatchSize); - for (var ct = _loopCts.Token; !ct.IsCancellationRequested;) { - var idleRun = true; - try - { - var lockExpiresOn = DateTime.UtcNow.Add(_outboxSettings.LockExpiration); - var lockedCount = await outboxRepository.TryToLock(_instanceIdProvider.GetInstanceId(), lockExpiresOn, ct).ConfigureAwait(false); - // Check if some messages where locked - if (lockedCount > 0) - { - idleRun = await SendMessages(scope.ServiceProvider, outboxRepository, processedIds, ct).ConfigureAwait(false); - } - } - catch (TaskCanceledException) - { - throw; - } - catch (Exception e) - { - _logger.LogError(e, "Error while processing outbox messages"); - } - - if (idleRun) + await SendMessages(scope.ServiceProvider, outboxRepository, ct).ConfigureAwait(false); + if (!ct.IsCancellationRequested) { if (ShouldRunCleanup()) { @@ -173,7 +152,7 @@ private async Task Run() await outboxRepository.DeleteSent(DateTime.UtcNow.Add(-_outboxSettings.MessageCleanup.Age), ct).ConfigureAwait(false); } - await Task.Delay(_outboxSettings.PollIdleSleep).ConfigureAwait(false); + await Task.Delay(_outboxSettings.PollIdleSleep, ct).ConfigureAwait(false); } } } @@ -200,73 +179,97 @@ private async Task Run() } } - private async Task SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, List processedIds, CancellationToken ct) + public async Task SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, CancellationToken ct) { var messageBus = serviceProvider.GetRequiredService(); var compositeMessageBus = messageBus as ICompositeMessageBus; var messageBusTarget = messageBus as IMessageBusTarget; - var idleRun = true; - - for (var hasMore = true; hasMore && !ct.IsCancellationRequested;) + var processedIds = new List(_outboxSettings.PollBatchSize); + bool idleRun; + var count = 0; + do { - var outboxMessages = await outboxRepository.FindNextToSend(_instanceIdProvider.GetInstanceId(), ct); - if (outboxMessages.Count == 0) - { - break; - } - + idleRun = true; try { - for (var i = 0; i < outboxMessages.Count && !ct.IsCancellationRequested; i++) + var lockExpiresOn = DateTime.UtcNow.Add(_outboxSettings.LockExpiration); + var lockedCount = await outboxRepository.TryToLock(_instanceIdProvider.GetInstanceId(), lockExpiresOn, ct).ConfigureAwait(false); + // Check if some messages where locked + if (lockedCount > 0) { - var outboxMessage = outboxMessages[i]; - - var now = DateTime.UtcNow; - if (now.Add(_outboxSettings.LockExpirationBuffer) > outboxMessage.LockExpiresOn) + for (var hasMore = true; hasMore && !ct.IsCancellationRequested;) { - _logger.LogDebug("Stopping the outbox message processing after {MessageCount} (out of {BatchCount}) because the message lock was close to expiration {LockBuffer}", i, _outboxSettings.PollBatchSize, _outboxSettings.LockExpirationBuffer); - hasMore = false; - break; - } - - var bus = GetBus(compositeMessageBus, messageBusTarget, outboxMessage.BusName); - if (bus == null) - { - _logger.LogWarning("Not able to find matching bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName); - continue; - } - - _logger.LogDebug("Sending outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName); - var message = bus.Serializer.Deserialize(outboxMessage.MessageType, outboxMessage.MessagePayload); + var outboxMessages = await outboxRepository.FindNextToSend(_instanceIdProvider.GetInstanceId(), ct); + if (outboxMessages.Count == 0) + { + break; + } - // Add special header to supress from forwarding the message againt to outbox - var headers = outboxMessage.Headers ?? new Dictionary(); - headers.Add(OutboxForwardingPublishInterceptor.SkipOutboxHeader, string.Empty); + try + { + for (var i = 0; i < outboxMessages.Count && !ct.IsCancellationRequested; i++) + { + var outboxMessage = outboxMessages[i]; + + var now = DateTime.UtcNow; + if (now.Add(_outboxSettings.LockExpirationBuffer) > outboxMessage.LockExpiresOn) + { + _logger.LogDebug("Stopping the outbox message processing after {MessageCount} (out of {BatchCount}) because the message lock was close to expiration {LockBuffer}", i, _outboxSettings.PollBatchSize, _outboxSettings.LockExpirationBuffer); + hasMore = false; + break; + } + + var bus = GetBus(compositeMessageBus, messageBusTarget, outboxMessage.BusName); + if (bus == null) + { + _logger.LogWarning("Not able to find matching bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName); + continue; + } + + _logger.LogDebug("Sending outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName); + var message = bus.Serializer.Deserialize(outboxMessage.MessageType, outboxMessage.MessagePayload); + + // Add special header to supress from forwarding the message againt to outbox + var headers = outboxMessage.Headers ?? new Dictionary(); + headers.Add(OutboxForwardingPublishInterceptor.SkipOutboxHeader, string.Empty); + + if (!ct.IsCancellationRequested) + { + await bus.ProducePublish(message, path: outboxMessage.Path, headers: headers, messageBusTarget, cancellationToken: ct); + + processedIds.Add(outboxMessage.Id); + } + } + } + finally + { + // confirm what messages were processed + if (processedIds.Count > 0) + { + _logger.LogDebug("Updating {MessageCount} outbox messages as sent", processedIds.Count); + await outboxRepository.UpdateToSent(processedIds, ct); - if (!ct.IsCancellationRequested) - { - await bus.ProducePublish(message, path: outboxMessage.Path, headers: headers, messageBusTarget, cancellationToken: ct); + idleRun = false; - processedIds.Add(outboxMessage.Id); + count += processedIds.Count; + processedIds.Clear(); + } + } } } } - finally + catch (TaskCanceledException) { - // confirm what messages were processed - if (processedIds.Count > 0) - { - _logger.LogDebug("Updating {MessageCount} outbox messages as sent", processedIds.Count); - await outboxRepository.UpdateToSent(processedIds, ct); - - idleRun = false; - - processedIds.Clear(); - } + throw; } - } - return idleRun; + catch (Exception e) + { + _logger.LogError(e, "Error while processing outbox messages"); + } + } while (!idleRun && !ct.IsCancellationRequested); + + return count; } private static IMasterMessageBus GetBus(ICompositeMessageBus compositeMessageBus, IMessageBusTarget messageBusTarget, string name) diff --git a/src/SlimMessageBus.Host.Outbox/SlimMessageBus.Host.Outbox.csproj b/src/SlimMessageBus.Host.Outbox/SlimMessageBus.Host.Outbox.csproj index 125da858..7edb88e9 100644 --- a/src/SlimMessageBus.Host.Outbox/SlimMessageBus.Host.Outbox.csproj +++ b/src/SlimMessageBus.Host.Outbox/SlimMessageBus.Host.Outbox.csproj @@ -17,4 +17,10 @@ + + + <_Parameter1>SlimMessageBus.Host.Outbox.DbContext.Test + + + diff --git a/src/SlimMessageBus.Host/MessageBusBase.cs b/src/SlimMessageBus.Host/MessageBusBase.cs index a447b176..bec6672c 100644 --- a/src/SlimMessageBus.Host/MessageBusBase.cs +++ b/src/SlimMessageBus.Host/MessageBusBase.cs @@ -205,7 +205,11 @@ private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType) { foreach (var i in _lifecycleInterceptors) { - await i.OnBusLifecycle(eventType, MessageBusTarget); + var task = i.OnBusLifecycle(eventType, MessageBusTarget); + if (task != null) + { + await task; + } } } } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DataAccess/CustomerContext.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DataAccess/CustomerContext.cs index 02c40ddb..68bd8f28 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DataAccess/CustomerContext.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DataAccess/CustomerContext.cs @@ -4,6 +4,8 @@ public class CustomerContext : DbContext { + public const string Schema = "Outbox"; + public DbSet Customers { get; set; } #region EF migrations @@ -35,6 +37,7 @@ public CustomerContext(DbContextOptions options) : base(options protected override void OnModelCreating(ModelBuilder modelBuilder) { - modelBuilder.Entity(x => x.ToTable("IntTest_Customer")); + modelBuilder.HasDefaultSchema(Schema); + modelBuilder.Entity(); } } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtensions.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtensions.cs new file mode 100644 index 00000000..0f2f091e --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtensions.cs @@ -0,0 +1,252 @@ +namespace SlimMessageBus.Host.Outbox.DbContext.Test; + +using Microsoft.Data.SqlClient; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; + +public static class DatabaseFacadeExtensions +{ + public static async Task DropSchemaIfExistsAsync(this DatabaseFacade database, string schema, CancellationToken cancellationToken = default) + { + const string sql = $""" + IF EXISTS( + SELECT schema_name + FROM information_schema.schemata + WHERE schema_name = @p0 + ) + BEGIN + DECLARE @sql NVARCHAR(MAX); + SELECT @sql = CONCAT('DROP SCHEMA ', @p0); + EXEC(@sql) + PRINT 'Schema dropped.' + END + """; + + var connection = database.GetDbConnection(); + if (!connection.GetType().Name.Equals("SqlConnection")) + { + throw new InvalidOperationException($"'{nameof(EnsureSchemaIsEmptyAsync)}' supports SQL server only."); + } + + try + { + await database.EnsureSchemaIsEmptyAsync(schema, cancellationToken); + await database.ExecuteSqlRawAsync(sql, [schema], cancellationToken); + } + catch (SqlException ex) when (ex.Number == 4060) + { + // db does not exist + } + } + + public static async Task EnsureSchemaIsEmptyAsync(this DatabaseFacade database, string schema, CancellationToken cancellationToken = default) + { + Debug.Assert(!schema.Equals("dbo", StringComparison.OrdinalIgnoreCase)); + + FormattableString sql = $""" + IF EXISTS ( + SELECT schema_name + FROM information_schema.schemata + WHERE schema_name = {schema} + ) + BEGIN + -- Disable foreign key constraints + PRINT 'Disabling foreign key constraints...'; + + DECLARE @sql NVARCHAR(MAX); + DECLARE @constraint NVARCHAR(MAX); + + DECLARE constraint_cursor CURSOR FOR + SELECT 'ALTER TABLE ' + QUOTENAME(OBJECT_SCHEMA_NAME(parent_object_id)) + '.' + QUOTENAME(OBJECT_NAME(parent_object_id)) + ' NOCHECK CONSTRAINT ' + QUOTENAME(name) + FROM sys.foreign_keys + WHERE OBJECT_SCHEMA_NAME(parent_object_id) = {schema}; + + OPEN constraint_cursor; + FETCH NEXT FROM constraint_cursor INTO @constraint; + + WHILE @@FETCH_STATUS = 0 + BEGIN + EXEC sp_executesql @constraint; + FETCH NEXT FROM constraint_cursor INTO @constraint; + END; + + CLOSE constraint_cursor; + DEALLOCATE constraint_cursor; + + PRINT 'Foreign key constraints disabled.'; + + -- Drop all foreign key constraints + PRINT 'Dropping foreign key constraints...'; + + SET @sql = N''; + SELECT @sql += 'ALTER TABLE ' + QUOTENAME(OBJECT_SCHEMA_NAME(parent_object_id)) + '.' + QUOTENAME(OBJECT_NAME(parent_object_id)) + ' DROP CONSTRAINT ' + QUOTENAME(name) + ';' + FROM sys.foreign_keys + WHERE OBJECT_SCHEMA_NAME(parent_object_id) = {schema}; + + EXEC sp_executesql @sql; + + PRINT 'Foreign key constraints dropped.'; + + -- Drop all tables + PRINT 'Dropping tables...'; + + DECLARE @table NVARCHAR(MAX); + + DECLARE table_cursor CURSOR FOR + SELECT QUOTENAME(TABLE_SCHEMA) + '.' + QUOTENAME(TABLE_NAME) + FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_SCHEMA = {schema}; + + OPEN table_cursor; + FETCH NEXT FROM table_cursor INTO @table; + + WHILE @@FETCH_STATUS = 0 + BEGIN + SET @sql = 'DROP TABLE ' + @table; + EXEC sp_executesql @sql; + FETCH NEXT FROM table_cursor INTO @table; + END; + + CLOSE table_cursor; + DEALLOCATE table_cursor; + + PRINT 'Tables dropped.'; + + -- Drop all views + PRINT 'Dropping views...'; + + DECLARE @view NVARCHAR(MAX); + + DECLARE view_cursor CURSOR FOR + SELECT QUOTENAME(TABLE_SCHEMA) + '.' + QUOTENAME(TABLE_NAME) + FROM INFORMATION_SCHEMA.VIEWS + WHERE TABLE_SCHEMA = {schema}; + + OPEN view_cursor; + FETCH NEXT FROM view_cursor INTO @view; + + WHILE @@FETCH_STATUS = 0 + BEGIN + SET @sql = 'DROP VIEW ' + @view; + EXEC sp_executesql @sql; + FETCH NEXT FROM view_cursor INTO @view; + END; + + CLOSE view_cursor; + DEALLOCATE view_cursor; + + PRINT 'Views dropped.'; + + -- Drop all stored procedures + PRINT 'Dropping stored procedures...'; + + DECLARE @procedure NVARCHAR(MAX); + + DECLARE procedure_cursor CURSOR FOR + SELECT QUOTENAME(SCHEMA_NAME(schema_id)) + '.' + QUOTENAME(name) + FROM sys.procedures + WHERE SCHEMA_NAME(schema_id) = {schema}; + + OPEN procedure_cursor; + FETCH NEXT FROM procedure_cursor INTO @procedure; + + WHILE @@FETCH_STATUS = 0 + BEGIN + SET @sql = 'DROP PROCEDURE ' + @procedure; + EXEC sp_executesql @sql; + FETCH NEXT FROM procedure_cursor INTO @procedure; + END; + + CLOSE procedure_cursor; + DEALLOCATE procedure_cursor; + + PRINT 'Stored procedures dropped.'; + + -- Drop all functions + PRINT 'Dropping functions...'; + + DECLARE @function NVARCHAR(MAX); + + DECLARE function_cursor CURSOR FOR + SELECT QUOTENAME(SCHEMA_NAME(schema_id)) + '.' + QUOTENAME(name) + FROM sys.objects + WHERE type IN ('FN', 'IF', 'TF') -- Scalar, inline table-valued, and table-valued functions + AND SCHEMA_NAME(schema_id) = {schema}; + + OPEN function_cursor; + FETCH NEXT FROM function_cursor INTO @function; + + WHILE @@FETCH_STATUS = 0 + BEGIN + SET @sql = 'DROP FUNCTION ' + @function; + EXEC sp_executesql @sql; + FETCH NEXT FROM function_cursor INTO @function; + END; + + CLOSE function_cursor; + DEALLOCATE function_cursor; + + PRINT 'Functions dropped.'; + + -- Drop all user-defined types (UDTs) + PRINT 'Dropping user-defined types...'; + + DECLARE @udt NVARCHAR(MAX); + + DECLARE udt_cursor CURSOR FOR + SELECT QUOTENAME(SCHEMA_NAME(schema_id)) + '.' + QUOTENAME(name) + FROM sys.types + WHERE is_user_defined = 1 + AND SCHEMA_NAME(schema_id) = {schema}; + + OPEN udt_cursor; + FETCH NEXT FROM udt_cursor INTO @udt; + + WHILE @@FETCH_STATUS = 0 + BEGIN + SET @sql = 'DROP TYPE ' + @udt; + EXEC sp_executesql @sql; + FETCH NEXT FROM udt_cursor INTO @udt; + END; + + CLOSE udt_cursor; + DEALLOCATE udt_cursor; + + PRINT 'User-defined types dropped.'; + + -- Drop all sequences + PRINT 'Dropping sequences...'; + + DECLARE @sequence NVARCHAR(MAX); + + DECLARE sequence_cursor CURSOR FOR + SELECT QUOTENAME(SCHEMA_NAME(schema_id)) + '.' + QUOTENAME(name) + FROM sys.sequences + WHERE SCHEMA_NAME(schema_id) = {schema}; + + OPEN sequence_cursor; + FETCH NEXT FROM sequence_cursor INTO @sequence; + + WHILE @@FETCH_STATUS = 0 + BEGIN + SET @sql = 'DROP SEQUENCE ' + @sequence; + EXEC sp_executesql @sql; + FETCH NEXT FROM sequence_cursor INTO @sequence; + END; + + CLOSE sequence_cursor; + DEALLOCATE sequence_cursor; + + PRINT 'Sequences dropped.'; + END + """; + + var connection = database.GetDbConnection(); + if (!connection.GetType().Name.Equals("SqlConnection")) + { + throw new InvalidOperationException($"'{nameof(EnsureSchemaIsEmptyAsync)}' supports SQL server only."); + }; + + await database.ExecuteSqlInterpolatedAsync(sql, cancellationToken); + } +} \ No newline at end of file diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtenstions.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtenstions.cs deleted file mode 100644 index 92d2f921..00000000 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/DatabaseFacadeExtenstions.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace SlimMessageBus.Host.Outbox.DbContext.Test; - -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Infrastructure; - -public static class DatabaseFacadeExtenstions -{ - public static Task EraseTableIfExists(this DatabaseFacade db, string tableName) - { -#pragma warning disable EF1002 // Risk of vulnerability to SQL injection. - return db.ExecuteSqlRawAsync($""" - IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = '{tableName}') - BEGIN - DELETE FROM dbo.{tableName}; - END - """); -#pragma warning restore EF1002 // Risk of vulnerability to SQL injection. - } -} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20230121072718_InitialCreate.Designer.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20230121072718_InitialCreate.Designer.cs deleted file mode 100644 index bdb58b9a..00000000 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20230121072718_InitialCreate.Designer.cs +++ /dev/null @@ -1,49 +0,0 @@ -// -using System; -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Infrastructure; -using Microsoft.EntityFrameworkCore.Metadata; -using Microsoft.EntityFrameworkCore.Migrations; -using Microsoft.EntityFrameworkCore.Storage.ValueConversion; -using SlimMessageBus.Host.Outbox.DbContext.Test.DataAccess; - -#nullable disable - -namespace SlimMessageBus.Host.Outbox.DbContext.Test.Migrations -{ - [DbContext(typeof(CustomerContext))] - [Migration("20230121072718_InitialCreate")] - partial class InitialCreate - { - /// - protected override void BuildTargetModel(ModelBuilder modelBuilder) - { -#pragma warning disable 612, 618 - modelBuilder - .HasAnnotation("ProductVersion", "7.0.1") - .HasAnnotation("Relational:MaxIdentifierLength", 128); - - SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); - - modelBuilder.Entity("SlimMessageBus.Host.Outbox.DbContext.Test.DataAccess.Customer", b => - { - b.Property("Id") - .ValueGeneratedOnAdd() - .HasColumnType("uniqueidentifier"); - - b.Property("Firstname") - .IsRequired() - .HasColumnType("nvarchar(max)"); - - b.Property("Lastname") - .IsRequired() - .HasColumnType("nvarchar(max)"); - - b.HasKey("Id"); - - b.ToTable("IntTest_Customer", (string)null); - }); -#pragma warning restore 612, 618 - } - } -} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20230121072718_InitialCreate.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20230121072718_InitialCreate.cs deleted file mode 100644 index 640b5e42..00000000 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20230121072718_InitialCreate.cs +++ /dev/null @@ -1,33 +0,0 @@ -#nullable disable - -namespace SlimMessageBus.Host.Outbox.DbContext.Test.Migrations; - -using Microsoft.EntityFrameworkCore.Migrations; - -/// -public partial class InitialCreate : Migration -{ - /// - protected override void Up(MigrationBuilder migrationBuilder) - { - migrationBuilder.CreateTable( - name: "IntTest_Customer", - columns: table => new - { - Id = table.Column(type: "uniqueidentifier", nullable: false), - Firstname = table.Column(type: "nvarchar(max)", nullable: false), - Lastname = table.Column(type: "nvarchar(max)", nullable: false) - }, - constraints: table => - { - table.PrimaryKey("PK_IntTest_Customer", x => x.Id); - }); - } - - /// - protected override void Down(MigrationBuilder migrationBuilder) - { - migrationBuilder.DropTable( - name: "IntTest_Customer"); - } -} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240401094307_AddUniqueIdToCustomer.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240401094307_AddUniqueIdToCustomer.cs deleted file mode 100644 index 96c5eb12..00000000 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240401094307_AddUniqueIdToCustomer.cs +++ /dev/null @@ -1,62 +0,0 @@ -#nullable disable - -namespace SlimMessageBus.Host.Outbox.DbContext.Test.Migrations; - -using Microsoft.EntityFrameworkCore.Migrations; -/// -public partial class AddUniqueIdToCustomer : Migration -{ - /// - protected override void Up(MigrationBuilder migrationBuilder) - { - migrationBuilder.AlterColumn( - name: "Lastname", - table: "IntTest_Customer", - type: "nvarchar(max)", - nullable: true, - oldClrType: typeof(string), - oldType: "nvarchar(max)"); - - migrationBuilder.AlterColumn( - name: "Firstname", - table: "IntTest_Customer", - type: "nvarchar(max)", - nullable: true, - oldClrType: typeof(string), - oldType: "nvarchar(max)"); - - migrationBuilder.AddColumn( - name: "UniqueId", - table: "IntTest_Customer", - type: "nvarchar(max)", - nullable: true); - } - - /// - protected override void Down(MigrationBuilder migrationBuilder) - { - migrationBuilder.DropColumn( - name: "UniqueId", - table: "IntTest_Customer"); - - migrationBuilder.AlterColumn( - name: "Lastname", - table: "IntTest_Customer", - type: "nvarchar(max)", - nullable: false, - defaultValue: "", - oldClrType: typeof(string), - oldType: "nvarchar(max)", - oldNullable: true); - - migrationBuilder.AlterColumn( - name: "Firstname", - table: "IntTest_Customer", - type: "nvarchar(max)", - nullable: false, - defaultValue: "", - oldClrType: typeof(string), - oldType: "nvarchar(max)", - oldNullable: true); - } -} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240401094307_AddUniqueIdToCustomer.Designer.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240617163727_AssemblySchema.Designer.cs similarity index 84% rename from src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240401094307_AddUniqueIdToCustomer.Designer.cs rename to src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240617163727_AssemblySchema.Designer.cs index 8ea59d70..69b24a4c 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240401094307_AddUniqueIdToCustomer.Designer.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240617163727_AssemblySchema.Designer.cs @@ -1,50 +1,51 @@ -// -using System; -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Infrastructure; -using Microsoft.EntityFrameworkCore.Metadata; -using Microsoft.EntityFrameworkCore.Migrations; -using Microsoft.EntityFrameworkCore.Storage.ValueConversion; -using SlimMessageBus.Host.Outbox.DbContext.Test.DataAccess; - -#nullable disable - -namespace SlimMessageBus.Host.Outbox.DbContext.Test.Migrations -{ - [DbContext(typeof(CustomerContext))] - [Migration("20240401094307_AddUniqueIdToCustomer")] - partial class AddUniqueIdToCustomer - { - /// - protected override void BuildTargetModel(ModelBuilder modelBuilder) - { -#pragma warning disable 612, 618 - modelBuilder - .HasAnnotation("ProductVersion", "8.0.1") - .HasAnnotation("Relational:MaxIdentifierLength", 128); - - SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); - - modelBuilder.Entity("SlimMessageBus.Host.Outbox.DbContext.Test.DataAccess.Customer", b => - { - b.Property("Id") - .ValueGeneratedOnAdd() - .HasColumnType("uniqueidentifier"); - - b.Property("Firstname") - .HasColumnType("nvarchar(max)"); - - b.Property("Lastname") - .HasColumnType("nvarchar(max)"); - - b.Property("UniqueId") - .HasColumnType("nvarchar(max)"); - - b.HasKey("Id"); - - b.ToTable("IntTest_Customer", (string)null); - }); -#pragma warning restore 612, 618 - } - } -} +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using SlimMessageBus.Host.Outbox.DbContext.Test.DataAccess; + +#nullable disable + +namespace SlimMessageBus.Host.Outbox.DbContext.Test.Migrations +{ + [DbContext(typeof(CustomerContext))] + [Migration("20240617163727_AssemblySchema")] + partial class AssemblySchema + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasDefaultSchema("Outbox") + .HasAnnotation("ProductVersion", "8.0.3") + .HasAnnotation("Relational:MaxIdentifierLength", 128); + + SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); + + modelBuilder.Entity("SlimMessageBus.Host.Outbox.DbContext.Test.DataAccess.Customer", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uniqueidentifier"); + + b.Property("Firstname") + .HasColumnType("nvarchar(max)"); + + b.Property("Lastname") + .HasColumnType("nvarchar(max)"); + + b.Property("UniqueId") + .HasColumnType("nvarchar(max)"); + + b.HasKey("Id"); + + b.ToTable("Customers", "Outbox"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240617163727_AssemblySchema.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240617163727_AssemblySchema.cs new file mode 100644 index 00000000..340a543c --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/20240617163727_AssemblySchema.cs @@ -0,0 +1,41 @@ +using System; +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace SlimMessageBus.Host.Outbox.DbContext.Test.Migrations +{ + /// + public partial class AssemblySchema : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.EnsureSchema( + name: "Outbox"); + + migrationBuilder.CreateTable( + name: "Customers", + schema: "Outbox", + columns: table => new + { + Id = table.Column(type: "uniqueidentifier", nullable: false), + Firstname = table.Column(type: "nvarchar(max)", nullable: true), + Lastname = table.Column(type: "nvarchar(max)", nullable: true), + UniqueId = table.Column(type: "nvarchar(max)", nullable: true) + }, + constraints: table => + { + table.PrimaryKey("PK_Customers", x => x.Id); + }); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "Customers", + schema: "Outbox"); + } + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/CustomerContextModelSnapshot.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/CustomerContextModelSnapshot.cs index 40968f5f..b8096700 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/CustomerContextModelSnapshot.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Migrations/CustomerContextModelSnapshot.cs @@ -17,7 +17,8 @@ protected override void BuildModel(ModelBuilder modelBuilder) { #pragma warning disable 612, 618 modelBuilder - .HasAnnotation("ProductVersion", "8.0.1") + .HasDefaultSchema("Outbox") + .HasAnnotation("ProductVersion", "8.0.3") .HasAnnotation("Relational:MaxIdentifierLength", 128); SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); @@ -39,7 +40,7 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("Id"); - b.ToTable("IntTest_Customer", (string)null); + b.ToTable("Customers", "Outbox"); }); #pragma warning restore 612, 618 } diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs index 97fb771f..a445f4a5 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxBenchmarkTests.cs @@ -1,5 +1,7 @@ namespace SlimMessageBus.Host.Outbox.DbContext.Test; +using Microsoft.EntityFrameworkCore.Migrations; + /// /// This test should help to understand the runtime performance and overhead of the outbox feature. /// It will generate the time measurements for a given transport (Azure DB + Azure SQL instance) as the baseline, @@ -8,25 +10,26 @@ /// /// [Trait("Category", "Integration")] // for benchmarks +[Collection(CustomerContext.Schema)] public class OutboxBenchmarkTests(ITestOutputHelper testOutputHelper) : BaseIntegrationTest(testOutputHelper) { - private static readonly string OutboxTableName = "IntTest_Benchmark_Outbox"; - private static readonly string MigrationsTableName = "IntTest_Benchmark_Migrations"; - private bool _useOutbox; protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration) { services.AddSlimMessageBus(mbb => { + mbb.AutoStartConsumersEnabled(false); + mbb.AddChildBus("ExternalBus", mbb => { - var topic = "tests.outbox-benchmark/customer-events"; + var topic = $"smb-tests/outbox-benchmark/{Guid.NewGuid():N}/customer-events"; mbb .WithProviderServiceBus(cfg => { cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]); cfg.PrefetchCount = 100; // fetch 100 messages at a time + cfg.TopologyProvisioning.CreateTopicOptions = o => o.AutoDeleteOnIdle = TimeSpan.FromMinutes(5); }) .Produce(x => x.DefaultTopic(topic)) .Consume(x => x @@ -46,26 +49,36 @@ protected override void SetupServices(ServiceCollection services, IConfiguration mbb.AddOutboxUsingDbContext(opts => { opts.PollBatchSize = 100; - opts.PollIdleSleep = TimeSpan.FromSeconds(0.5); + opts.LockExpiration = TimeSpan.FromMinutes(5); + opts.PollIdleSleep = TimeSpan.FromDays(1); opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10); opts.MessageCleanup.Age = TimeSpan.FromMinutes(1); - opts.SqlSettings.DatabaseTableName = OutboxTableName; - opts.SqlSettings.DatabaseMigrationsTableName = MigrationsTableName; + opts.SqlSettings.DatabaseSchemaName = CustomerContext.Schema; }); - mbb.AutoStartConsumersEnabled(false); }); services.AddSingleton>(); // Entity Framework setup - application specific EF DbContext - services.AddDbContext(options => options.UseSqlServer(Secrets.Service.PopulateSecrets(Configuration.GetConnectionString("DefaultConnection")))); + services.AddDbContext( + options => options.UseSqlServer( + Secrets.Service.PopulateSecrets(Configuration.GetConnectionString("DefaultConnection")), + x => x.MigrationsHistoryTable(HistoryRepository.DefaultTableName, CustomerContext.Schema))); } - private async Task PerformDbOperation(Func action) + private async Task PerformDbOperation(Func action) { - using var scope = ServiceProvider!.CreateScope(); - var context = scope.ServiceProvider.GetRequiredService(); - await action(context); + var scope = ServiceProvider!.CreateScope(); + try + { + var context = scope.ServiceProvider.GetRequiredService(); + var outboxMigrationService = scope.ServiceProvider.GetRequiredService(); + await action(context, outboxMigrationService); + } + finally + { + await ((IAsyncDisposable)scope).DisposeAsync(); + } } [Theory] @@ -76,14 +89,14 @@ public async Task Given_EventPublisherAndConsumerUsingOutbox_When_BurstOfEventsI // arrange _useOutbox = useOutbox; - await PerformDbOperation(async context => + await PerformDbOperation(async (context, outboxMigrationService) => { // migrate db + await context.Database.DropSchemaIfExistsAsync(context.Model.GetDefaultSchema()); await context.Database.MigrateAsync(); - // clean outbox from previous test run - await context.Database.EraseTableIfExists(OutboxTableName); - await context.Database.EraseTableIfExists(MigrationsTableName); + // migrate outbox sql + await outboxMigrationService.Migrate(CancellationToken.None); }); var surnames = new[] { "Doe", "Smith", "Kowalsky" }; @@ -118,6 +131,22 @@ await using (unitOfWorkScope as IAsyncDisposable) var publishTimerElapsed = publishTimer.Elapsed; + // publish messages + var outboxPublishTimerElapsed = TimeSpan.Zero; + if (_useOutbox) + { + var outboxSendingTask = ServiceProvider.GetRequiredService(); + var outputRepository = ServiceProvider.GetRequiredService(); + + var outboxTimer = Stopwatch.StartNew(); + var publishCount = await outboxSendingTask.SendMessages(ServiceProvider, outputRepository, CancellationToken.None); + outboxPublishTimerElapsed = outboxTimer.Elapsed; + + publishCount.Should().Be(messageCount); + } + + store.Clear(); + // start consumers await EnsureConsumersStarted(); @@ -126,12 +155,12 @@ await using (unitOfWorkScope as IAsyncDisposable) await store.WaitUntilArriving(newMessagesTimeout: 5, expectedCount: events.Count); // assert - var consumeTimerElapsed = consumptionTimer.Elapsed; // Log the measured times - Logger.LogInformation("Message Publish took: {Elapsed}", publishTimerElapsed); - Logger.LogInformation("Message Consume took: {Elapsed}", consumeTimerElapsed); + Logger.LogInformation("Message Publish took : {Elapsed}", publishTimerElapsed); + Logger.LogInformation("Outbox publish took : {Elapsed}", outboxPublishTimerElapsed); + Logger.LogInformation("Message Consume took : {Elapsed}", consumeTimerElapsed); // Ensure the expected number of events was actually published to ASB and delivered via that channel. store.Count.Should().Be(events.Count); diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs index 6164536c..eac1a574 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/OutboxTests.cs @@ -1,14 +1,14 @@ namespace SlimMessageBus.Host.Outbox.DbContext.Test; +using Microsoft.EntityFrameworkCore.Migrations; + [Trait("Category", "Integration")] +[Collection(CustomerContext.Schema)] public class OutboxTests(ITestOutputHelper testOutputHelper) : BaseIntegrationTest(testOutputHelper) { private TransactionType _testParamTransactionType; private BusType _testParamBusType; - private static readonly string OutboxTableName = "IntTest_Outbox"; - private static readonly string MigrationsTableName = "IntTest_Migrations"; - protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration) { services.AddSlimMessageBus(mbb => @@ -73,8 +73,9 @@ protected override void SetupServices(ServiceCollection services, IConfiguration { cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]); cfg.PrefetchCount = 100; + cfg.TopologyProvisioning.CreateTopicOptions = o => o.AutoDeleteOnIdle = TimeSpan.FromMinutes(5); }); - topic = "tests.outbox/customer-events"; + topic = $"smb-tests/outbox/{Guid.NewGuid():N}/customer-events"; } mbb @@ -95,22 +96,31 @@ protected override void SetupServices(ServiceCollection services, IConfiguration opts.PollIdleSleep = TimeSpan.FromSeconds(0.5); opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10); opts.MessageCleanup.Age = TimeSpan.FromMinutes(1); - opts.SqlSettings.DatabaseTableName = OutboxTableName; - opts.SqlSettings.DatabaseMigrationsTableName = MigrationsTableName; + opts.SqlSettings.DatabaseSchemaName = CustomerContext.Schema; }); }); services.AddSingleton>(); // Entity Framework setup - application specific EF DbContext - services.AddDbContext(options => options.UseSqlServer(Secrets.Service.PopulateSecrets(Configuration.GetConnectionString("DefaultConnection")))); + services.AddDbContext( + options => options.UseSqlServer( + Secrets.Service.PopulateSecrets(Configuration.GetConnectionString("DefaultConnection")), + x => x.MigrationsHistoryTable(HistoryRepository.DefaultTableName, CustomerContext.Schema))); } private async Task PerformDbOperation(Func action) { - using var scope = ServiceProvider!.CreateScope(); - var context = scope.ServiceProvider.GetRequiredService(); - await action(context); + var scope = ServiceProvider!.CreateScope(); + try + { + var context = scope.ServiceProvider.GetRequiredService(); + await action(context); + } + finally + { + await ((IAsyncDisposable)scope).DisposeAsync(); + } } public const string InvalidLastname = "Exception"; @@ -125,19 +135,11 @@ public async Task Given_CommandHandlerInTransaction_When_ExceptionThrownDuringHa _testParamTransactionType = transactionType; _testParamBusType = busType; - await PerformDbOperation(async context => + await PerformDbOperation(async (context) => { // migrate db + await context.Database.DropSchemaIfExistsAsync(context.Model.GetDefaultSchema()); await context.Database.MigrateAsync(); - - // clean outbox from previous test run - await context.Database.EraseTableIfExists(OutboxTableName); - await context.Database.EraseTableIfExists(MigrationsTableName); - - // clean the customers table - var cust = await context.Customers.ToListAsync(); - context.Customers.RemoveRange(cust); - await context.SaveChangesAsync(); }); var surnames = new[] { "Doe", "Smith", InvalidLastname }; diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/SlimMessageBus.Host.Outbox.DbContext.Test.csproj b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/SlimMessageBus.Host.Outbox.DbContext.Test.csproj index e2d802a7..d4784177 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/SlimMessageBus.Host.Outbox.DbContext.Test.csproj +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/SlimMessageBus.Host.Outbox.DbContext.Test.csproj @@ -30,4 +30,8 @@ + + + + diff --git a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Usings.cs b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Usings.cs index 4728b0e1..0aedb8b2 100644 --- a/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Usings.cs +++ b/src/Tests/SlimMessageBus.Host.Outbox.DbContext.Test/Usings.cs @@ -1,3 +1,4 @@ +global using System.ComponentModel; global using System.Diagnostics; global using System.Reflection; diff --git a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs index 70a335aa..b024b5a3 100644 --- a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs @@ -81,6 +81,7 @@ protected async Task EnsureConsumersStarted() { var timeout = Stopwatch.StartNew(); var consumerControl = ServiceProvider.GetRequiredService(); + await consumerControl.Start(); // ensure the consumers are warm while (!consumerControl.IsStarted && timeout.ElapsedMilliseconds < 5000) await Task.Delay(100);