Skip to content

Commit

Permalink
#263 outbox benchmark tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <richardpringle@gmail.com>
  • Loading branch information
EtherZa authored and zarusz committed Jun 17, 2024
1 parent 683f2a7 commit 643f8b0
Show file tree
Hide file tree
Showing 20 changed files with 527 additions and 332 deletions.
4 changes: 4 additions & 0 deletions src/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,7 @@ dotnet_style_allow_multiple_blank_lines_experimental = true:silent
dotnet_style_allow_statement_immediately_after_block_experimental = true:silent
dotnet_style_prefer_collection_expression = when_types_loosely_match:suggestion
dotnet_diagnostic.CA1859.severity = silent

[*.csproj]
indent_style = space
indent_size = 2
2 changes: 2 additions & 0 deletions src/Infrastructure/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ version: '3.4'

services:
zookeeper:
container_name: slim.zookeeper
image: wurstmeister/zookeeper
ports:
- "2181:2181"
networks:
- slim

kafka:
container_name: slim.kafka
image: wurstmeister/kafka
ports:
- "9092:9092"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace SlimMessageBus.Host.Outbox;

using Microsoft.Extensions.DependencyInjection;

public static class MessageBusBuilderExtensions
{
public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action<OutboxSettings> configure = null)
Expand Down Expand Up @@ -30,7 +32,9 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action<Out
}
// Without optimization: services.TryAddEnumerable(ServiceDescriptor.Transient(typeof(IConsumerInterceptor<>), typeof(TransactionScopeConsumerInterceptor<>)));
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxSendingTask>());
services.AddSingleton<OutboxSendingTask>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxSendingTask>(sp => sp.GetRequiredService<OutboxSendingTask>()));
services.TryAddSingleton<IInstanceIdProvider, DefaultInstanceIdProvider>();
services.TryAddSingleton(svp =>
Expand Down
151 changes: 77 additions & 74 deletions src/SlimMessageBus.Host.Outbox/Interceptors/OutboxSendingTask.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace SlimMessageBus.Host.Outbox;

public class OutboxSendingTask(
internal class OutboxSendingTask(
ILoggerFactory loggerFactory,
OutboxSettings outboxSettings,
IServiceProvider serviceProvider,
Expand Down Expand Up @@ -141,39 +141,18 @@ private async Task Run()

var outboxRepository = scope.ServiceProvider.GetRequiredService<IOutboxRepository>();

var processedIds = new List<Guid>(_outboxSettings.PollBatchSize);

for (var ct = _loopCts.Token; !ct.IsCancellationRequested;)
{
var idleRun = true;
try
{
var lockExpiresOn = DateTime.UtcNow.Add(_outboxSettings.LockExpiration);
var lockedCount = await outboxRepository.TryToLock(_instanceIdProvider.GetInstanceId(), lockExpiresOn, ct).ConfigureAwait(false);
// Check if some messages where locked
if (lockedCount > 0)
{
idleRun = await SendMessages(scope.ServiceProvider, outboxRepository, processedIds, ct).ConfigureAwait(false);
}
}
catch (TaskCanceledException)
{
throw;
}
catch (Exception e)
{
_logger.LogError(e, "Error while processing outbox messages");
}

if (idleRun)
await SendMessages(scope.ServiceProvider, outboxRepository, ct).ConfigureAwait(false);
if (!ct.IsCancellationRequested)
{
if (ShouldRunCleanup())
{
_logger.LogTrace("Running cleanup of sent messages");
await outboxRepository.DeleteSent(DateTime.UtcNow.Add(-_outboxSettings.MessageCleanup.Age), ct).ConfigureAwait(false);
}

await Task.Delay(_outboxSettings.PollIdleSleep).ConfigureAwait(false);
await Task.Delay(_outboxSettings.PollIdleSleep, ct).ConfigureAwait(false);
}
}
}
Expand All @@ -200,73 +179,97 @@ private async Task Run()
}
}

private async Task<bool> SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, List<Guid> processedIds, CancellationToken ct)
public async Task<int> SendMessages(IServiceProvider serviceProvider, IOutboxRepository outboxRepository, CancellationToken ct)
{
var messageBus = serviceProvider.GetRequiredService<IMessageBus>();
var compositeMessageBus = messageBus as ICompositeMessageBus;
var messageBusTarget = messageBus as IMessageBusTarget;

var idleRun = true;

for (var hasMore = true; hasMore && !ct.IsCancellationRequested;)
var processedIds = new List<Guid>(_outboxSettings.PollBatchSize);
bool idleRun;
var count = 0;
do
{
var outboxMessages = await outboxRepository.FindNextToSend(_instanceIdProvider.GetInstanceId(), ct);
if (outboxMessages.Count == 0)
{
break;
}

idleRun = true;
try
{
for (var i = 0; i < outboxMessages.Count && !ct.IsCancellationRequested; i++)
var lockExpiresOn = DateTime.UtcNow.Add(_outboxSettings.LockExpiration);
var lockedCount = await outboxRepository.TryToLock(_instanceIdProvider.GetInstanceId(), lockExpiresOn, ct).ConfigureAwait(false);
// Check if some messages where locked
if (lockedCount > 0)
{
var outboxMessage = outboxMessages[i];

var now = DateTime.UtcNow;
if (now.Add(_outboxSettings.LockExpirationBuffer) > outboxMessage.LockExpiresOn)
for (var hasMore = true; hasMore && !ct.IsCancellationRequested;)
{
_logger.LogDebug("Stopping the outbox message processing after {MessageCount} (out of {BatchCount}) because the message lock was close to expiration {LockBuffer}", i, _outboxSettings.PollBatchSize, _outboxSettings.LockExpirationBuffer);
hasMore = false;
break;
}

var bus = GetBus(compositeMessageBus, messageBusTarget, outboxMessage.BusName);
if (bus == null)
{
_logger.LogWarning("Not able to find matching bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName);
continue;
}

_logger.LogDebug("Sending outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName);
var message = bus.Serializer.Deserialize(outboxMessage.MessageType, outboxMessage.MessagePayload);
var outboxMessages = await outboxRepository.FindNextToSend(_instanceIdProvider.GetInstanceId(), ct);
if (outboxMessages.Count == 0)
{
break;
}

// Add special header to supress from forwarding the message againt to outbox
var headers = outboxMessage.Headers ?? new Dictionary<string, object>();
headers.Add(OutboxForwardingPublishInterceptor<object>.SkipOutboxHeader, string.Empty);
try
{
for (var i = 0; i < outboxMessages.Count && !ct.IsCancellationRequested; i++)
{
var outboxMessage = outboxMessages[i];

var now = DateTime.UtcNow;
if (now.Add(_outboxSettings.LockExpirationBuffer) > outboxMessage.LockExpiresOn)
{
_logger.LogDebug("Stopping the outbox message processing after {MessageCount} (out of {BatchCount}) because the message lock was close to expiration {LockBuffer}", i, _outboxSettings.PollBatchSize, _outboxSettings.LockExpirationBuffer);
hasMore = false;
break;
}

var bus = GetBus(compositeMessageBus, messageBusTarget, outboxMessage.BusName);
if (bus == null)
{
_logger.LogWarning("Not able to find matching bus provider for the outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus. The message will be skipped.", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName);
continue;
}

_logger.LogDebug("Sending outbox message with Id {MessageId} of type {MessageType} to path {Path} using {BusName} bus", outboxMessage.Id, outboxMessage.MessageType.Name, outboxMessage.Path, outboxMessage.BusName);
var message = bus.Serializer.Deserialize(outboxMessage.MessageType, outboxMessage.MessagePayload);

// Add special header to supress from forwarding the message againt to outbox
var headers = outboxMessage.Headers ?? new Dictionary<string, object>();
headers.Add(OutboxForwardingPublishInterceptor<object>.SkipOutboxHeader, string.Empty);

if (!ct.IsCancellationRequested)
{
await bus.ProducePublish(message, path: outboxMessage.Path, headers: headers, messageBusTarget, cancellationToken: ct);

processedIds.Add(outboxMessage.Id);
}
}
}
finally
{
// confirm what messages were processed
if (processedIds.Count > 0)
{
_logger.LogDebug("Updating {MessageCount} outbox messages as sent", processedIds.Count);
await outboxRepository.UpdateToSent(processedIds, ct);

if (!ct.IsCancellationRequested)
{
await bus.ProducePublish(message, path: outboxMessage.Path, headers: headers, messageBusTarget, cancellationToken: ct);
idleRun = false;

processedIds.Add(outboxMessage.Id);
count += processedIds.Count;
processedIds.Clear();
}
}
}
}
}
finally
catch (TaskCanceledException)
{
// confirm what messages were processed
if (processedIds.Count > 0)
{
_logger.LogDebug("Updating {MessageCount} outbox messages as sent", processedIds.Count);
await outboxRepository.UpdateToSent(processedIds, ct);

idleRun = false;

processedIds.Clear();
}
throw;
}
}
return idleRun;
catch (Exception e)
{
_logger.LogError(e, "Error while processing outbox messages");
}
} while (!idleRun && !ct.IsCancellationRequested);

return count;
}

private static IMasterMessageBus GetBus(ICompositeMessageBus compositeMessageBus, IMessageBusTarget messageBusTarget, string name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
</ItemGroup>

<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>SlimMessageBus.Host.Outbox.DbContext.Test</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

</Project>
6 changes: 5 additions & 1 deletion src/SlimMessageBus.Host/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType)
{
foreach (var i in _lifecycleInterceptors)
{
await i.OnBusLifecycle(eventType, MessageBusTarget);
var task = i.OnBusLifecycle(eventType, MessageBusTarget);
if (task != null)
{
await task;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

public class CustomerContext : DbContext
{
public const string Schema = "Outbox";

public DbSet<Customer> Customers { get; set; }

#region EF migrations
Expand Down Expand Up @@ -35,6 +37,7 @@ public CustomerContext(DbContextOptions<CustomerContext> options) : base(options

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Customer>(x => x.ToTable("IntTest_Customer"));
modelBuilder.HasDefaultSchema(Schema);
modelBuilder.Entity<Customer>();
}
}
Loading

0 comments on commit 643f8b0

Please sign in to comment.