Skip to content

Commit

Permalink
[Host.Outbox] Ensure SQL tables are provisioned without consumer star…
Browse files Browse the repository at this point in the history
…t and with first publish.

Also:
- Add baseline performance test for Outbox.
- Send Created bus lifecycle event to be able to hook into Master bus creation

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Jun 14, 2024
1 parent 959bf28 commit aa39134
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 53 deletions.
1 change: 1 addition & 0 deletions infrastructure.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker compose -f src/Infrastructure/docker-compose.yml up --force-recreate -V
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
responseConsumer: this,
messagePayloadProvider: m => m.Body.ToArray());

AddConsumerFrom(topicSubscription, messageProcessor, new[] { Settings.RequestResponse });
AddConsumerFrom(topicSubscription, messageProcessor, [Settings.RequestResponse]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

public enum MessageBusLifecycleEventType
{
/// <summary>
/// Invoked when the master bus is created.
/// Can be used to initalize any resource before the messages are produced or consumed.
/// </summary>
Created,
Starting,
Started,
Stopping,
Stopped,
Stopped
}
40 changes: 36 additions & 4 deletions src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ public class OutboxSendingTask(

private CancellationTokenSource _loopCts;
private Task _loopTask;

private Task _migrateSchemaTask;
private Task _startBusTask;
private Task _stopBusTask;

private int _busCreateCount;
private int _busStartCount;

private DateTime? _cleanupNextRun;
Expand Down Expand Up @@ -75,35 +81,61 @@ protected async Task Stop()

public Task OnBusLifecycle(MessageBusLifecycleEventType eventType, IMessageBus bus)
{
if (eventType == MessageBusLifecycleEventType.Created)
{
// Run schema migration only once for the first bus created
if (Interlocked.Increment(ref _busCreateCount) == 1)
{
_migrateSchemaTask = MigrateSchema(_serviceProvider, default);
}
return _migrateSchemaTask;
}
if (eventType == MessageBusLifecycleEventType.Started)
{
// The first started bus starts this outbox task
if (Interlocked.Increment(ref _busStartCount) == 1)
{
return Start();
_startBusTask = Start();
}
return _startBusTask;
}
if (eventType == MessageBusLifecycleEventType.Stopping)
{
// The last stopped bus stops this outbox task
if (Interlocked.Decrement(ref _busStartCount) == 0)
{
return Stop();
_stopBusTask = Stop();
}
return _stopBusTask;
}
return Task.CompletedTask;
}

private async Task MigrateSchema(IServiceProvider serviceProvider, CancellationToken cancellationToken)
{
try
{
var outboxMigrationService = serviceProvider.GetRequiredService<IOutboxMigrationService>();
await outboxMigrationService.Migrate(cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, "Outbox schema migration failed");
throw;
}
}

private async Task Run()
{
try
{

_logger.LogInformation("Outbox loop started");
var scope = _serviceProvider.CreateScope();

try
{
var outboxMigrationService = scope.ServiceProvider.GetRequiredService<IOutboxMigrationService>();
await outboxMigrationService.Migrate(_loopCts.Token);
await MigrateSchema(scope.ServiceProvider, _loopCts.Token);

var outboxRepository = scope.ServiceProvider.GetRequiredService<IOutboxRepository>();

Expand Down
38 changes: 21 additions & 17 deletions src/SlimMessageBus.Host/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,23 +144,27 @@ protected void OnBuildProvider()
{
ValidationService.AssertSettings();

Build();

if (Settings.AutoStartConsumers)
{
// Fire and forget start
_ = Task.Run(async () =>
{
try
{
await Start().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Could not auto start consumers");
}
Build();

// Notify the bus has been created - before any message can be produced
AddInit(OnBusLifecycle(MessageBusLifecycleEventType.Created));

// Auto start consumers if enabled
if (Settings.AutoStartConsumers)
{
// Fire and forget start
_ = Task.Run(async () =>
{
try
{
await Start().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Could not auto start consumers");
}
});
}
}
}

protected virtual void Build()
Expand Down Expand Up @@ -196,7 +200,7 @@ protected virtual void BuildPendingRequestStore()

private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType)
{
_lifecycleInterceptors ??= Settings.ServiceProvider?.GetService<IEnumerable<IMessageBusLifecycleInterceptor>>();
_lifecycleInterceptors ??= Settings.ServiceProvider?.GetServices<IMessageBusLifecycleInterceptor>();
if (_lifecycleInterceptors != null)
{
foreach (var i in _lifecycleInterceptors)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace SlimMessageBus.Host.Outbox.DbContext.Test;

using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;

public static class DatabaseFacadeExtenstions
{
public static Task EraseTableIfExists(this DatabaseFacade db, string tableName)
{
return db.ExecuteSqlRawAsync(
$"""
IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'dbo' AND TABLE_NAME = '{tableName}')
BEGIN
DELETE FROM dbo.{tableName};
END
""");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
namespace SlimMessageBus.Host.Outbox.DbContext.Test;

using System.Diagnostics;
using System.Reflection;

using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

using SecretStore;

using SlimMessageBus.Host;
using SlimMessageBus.Host.AzureServiceBus;
using SlimMessageBus.Host.Outbox.DbContext.Test.DataAccess;
using SlimMessageBus.Host.Serialization.SystemTextJson;
using SlimMessageBus.Host.Test.Common.IntegrationTest;

[Trait("Category", "Integration")] // for benchmarks
public class OutboxBenchmarkTests(ITestOutputHelper testOutputHelper) : BaseIntegrationTest<OutboxBenchmarkTests>(testOutputHelper)
{
private static readonly string OutboxTableName = "IntTest_Benchmark_Outbox";
private static readonly string MigrationsTableName = "IntTest_Benchmark_Migrations";

protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration)
{
services.AddSlimMessageBus(mbb =>
{
mbb.AddChildBus("ExternalBus", mbb =>
{
var topic = "tests.outbox-benchmark/customer-events";
mbb
.WithProviderServiceBus(cfg => cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]))
.Produce<CustomerCreatedEvent>(x => x.DefaultTopic(topic))
.Consume<CustomerCreatedEvent>(x => x
.Topic(topic)
.WithConsumer<CustomerCreatedEventConsumer>()
.SubscriptionName(nameof(OutboxBenchmarkTests))) // for AzureSB
.UseOutbox(); // All outgoing messages from this bus will go out via an outbox
});
mbb.AddServicesFromAssembly(Assembly.GetExecutingAssembly());
mbb.AddJsonSerializer();
mbb.AddOutboxUsingDbContext<CustomerContext>(opts =>
{
opts.PollBatchSize = 100;
opts.PollIdleSleep = TimeSpan.FromSeconds(0.5);
opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10);
opts.MessageCleanup.Age = TimeSpan.FromMinutes(1);
opts.SqlSettings.DatabaseTableName = OutboxTableName;
opts.SqlSettings.DatabaseMigrationsTableName = MigrationsTableName;
});
mbb.AutoStartConsumersEnabled(false);
});

services.AddSingleton<TestEventCollector<CustomerCreatedEvent>>();

// Entity Framework setup - application specific EF DbContext
services.AddDbContext<CustomerContext>(options => options.UseSqlServer(Secrets.Service.PopulateSecrets(Configuration.GetConnectionString("DefaultConnection"))));
}

private async Task PerformDbOperation(Func<CustomerContext, Task> action)
{
using var scope = ServiceProvider!.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<CustomerContext>();
await action(context);
}

[Theory]
[InlineData([100])]
public async Task Given_EventPublisherAndConsumerUsingOutbox_When_BurstOfEventsIsSent_Then_EventsAreConsumedProperly(int messageCount)
{
// arrange
await PerformDbOperation(async context =>
{
// migrate db
await context.Database.MigrateAsync();
// clean outbox from previous test run
await context.Database.EraseTableIfExists(OutboxTableName);
await context.Database.EraseTableIfExists(MigrationsTableName);
});

var surnames = new[] { "Doe", "Smith", "Kowalsky" };
var events = Enumerable.Range(0, messageCount).Select(x => new CustomerCreatedEvent(Guid.NewGuid(), $"John {x:000}", surnames[x % surnames.Length])).ToList();
var store = ServiceProvider!.GetRequiredService<TestEventCollector<CustomerCreatedEvent>>();

// act

// publish the events in one shot (consumers are not started yet)
var publishTimer = Stopwatch.StartNew();

var publishTasks = events
.Select(async ev =>
{
var unitOfWorkScope = ServiceProvider!.CreateScope();
await using (unitOfWorkScope as IAsyncDisposable)
{
var bus = unitOfWorkScope.ServiceProvider.GetRequiredService<IMessageBus>();
try
{
await bus.Publish(ev, headers: new Dictionary<string, object> { ["CustomerId"] = ev.Id });
}
catch (Exception ex)
{
Logger.LogInformation("Exception occurred while publishing event {Event}: {Message}", ev, ex.Message);
}
}
})
.ToArray();

await Task.WhenAll(publishTasks);

var publishTimerElapsed = publishTimer.Elapsed;

// start consumers
await EnsureConsumersStarted();

// consume the events from outbox
var consumptionTimer = Stopwatch.StartNew();
await store.WaitUntilArriving(newMessagesTimeout: 5, expectedCount: events.Count);

// assert

var consumeTimerElapsed = consumptionTimer.Elapsed;

Logger.LogInformation("Message Publish took: {Elapsed}", publishTimerElapsed);
Logger.LogInformation("Message Consume took: {Elapsed}", consumeTimerElapsed);

// Ensure the expected number of events was actually published to ASB and delivered via that channel.
store.Count.Should().Be(events.Count);
}
}
Loading

0 comments on commit aa39134

Please sign in to comment.