Skip to content

Commit

Permalink
Message Bus Consumers + OutBox Event Publishers
Browse files Browse the repository at this point in the history
  • Loading branch information
phongnguyend committed Sep 9, 2023
1 parent 574549a commit 9dbc98c
Show file tree
Hide file tree
Showing 41 changed files with 1,130 additions and 388 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ Task SendAsync<T>(T message, MetaData metaData = null, CancellationToken cancell

Task ReceiveAsync<TConsumer, T>(Func<T, MetaData, Task> action, CancellationToken cancellationToken = default)
where T : IMessageBusMessage;

Task ReceiveAsync<TConsumer, T>(CancellationToken cancellationToken = default)
where T : IMessageBusMessage;

Task SendAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default);
}

public interface IMessageBusMessage
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Threading;
using System.Threading.Tasks;

namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers;

public interface IMessageBusConsumer<TConsumer, T>
{
Task HandleAsync(T data, MetaData metaData, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Threading;
using System.Threading.Tasks;

namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers;

public interface IOutBoxEventPublisher
{
static abstract string[] CanHandleEventTypes();

static abstract string CanHandleEventSource();

Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default);
}

public class PublishingOutBoxEvent
{
public string Id { get; set; }

public string EventType { get; set; }

public string EventSource { get; set; }

public string Payload { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -8,6 +12,51 @@ namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers;
public class MessageBus : IMessageBus
{
private readonly IServiceProvider _serviceProvider;
private static List<Type> _consumers = new List<Type>();
private static Dictionary<string, List<Type>> _outboxEventHandlers = new();

internal static void AddConsumers(Assembly assembly, IServiceCollection services)
{
var types = assembly.GetTypes()
.Where(x => x.GetInterfaces().Any(y => y.IsGenericType && y.GetGenericTypeDefinition() == typeof(IMessageBusConsumer<,>)))
.ToList();

foreach (var type in types)
{
services.AddTransient(type);
}

_consumers.AddRange(types);
}

internal static void AddOutboxEventPublishers(Assembly assembly, IServiceCollection services)
{
var types = assembly.GetTypes()
.Where(x => x.GetInterfaces().Any(y => y == typeof(IOutBoxEventPublisher)))
.ToList();

foreach (var type in types)
{
services.AddTransient(type);
}

foreach (var item in types)
{
var canHandlerEventTypes = (string[])item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventTypes), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture);
var eventSource = (string)item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventSource), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture);

foreach (var eventType in canHandlerEventTypes)
{
var key = eventSource + ":" + eventType;
if (!_outboxEventHandlers.ContainsKey(key))
{
_outboxEventHandlers[key] = new List<Type>();
}

_outboxEventHandlers[key].Add(item);
}
}
}

public MessageBus(IServiceProvider serviceProvider)
{
Expand All @@ -25,4 +74,64 @@ public async Task SendAsync<T>(T message, MetaData metaData = null, Cancellation
{
await _serviceProvider.GetRequiredService<IMessageReceiver<TConsumer, T>>().ReceiveAsync(action, cancellationToken);
}

public async Task ReceiveAsync<TConsumer, T>(CancellationToken cancellationToken = default)
where T : IMessageBusMessage
{
await _serviceProvider.GetRequiredService<IMessageReceiver<TConsumer, T>>().ReceiveAsync(async (data, metaData) =>
{
using var scope = _serviceProvider.CreateScope();
foreach (Type handlerType in _consumers)
{
bool canHandleEvent = handlerType.GetInterfaces()
.Any(x => x.IsGenericType
&& x.GetGenericTypeDefinition() == typeof(IMessageBusConsumer<,>)
&& x.GenericTypeArguments[0] == typeof(TConsumer) && x.GenericTypeArguments[1] == typeof(T));
if (canHandleEvent)
{
dynamic handler = scope.ServiceProvider.GetService(handlerType);
await handler.HandleAsync((dynamic)data, metaData, cancellationToken);
}
}
}, cancellationToken);
}

public async Task SendAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default)
{
var key = outbox.EventSource + ":" + outbox.EventType;
var handlerTypes = _outboxEventHandlers.ContainsKey(key) ? _outboxEventHandlers[key] : null;

if (handlerTypes == null)
{
// TODO: Take Note
return;
}

foreach (var type in handlerTypes)
{
dynamic handler = _serviceProvider.GetService(type);
await handler.HandleAsync(outbox, cancellationToken);
}
}
}

public static class MessageBusExtentions
{
public static void AddMessageBusConsumers(this IServiceCollection services, Assembly assembly)
{
MessageBus.AddConsumers(assembly, services);
}

public static void AddOutboxEventPublishers(this IServiceCollection services, Assembly assembly)
{
MessageBus.AddOutboxEventPublishers(assembly, services);
}

public static void AddMessageBus(this IServiceCollection services, Assembly assembly)
{
services.AddTransient<IMessageBus, MessageBus>();
services.AddMessageBusConsumers(assembly);
services.AddOutboxEventPublishers(assembly);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

namespace ClassifiedAds.Infrastructure.HostedServices;

public sealed class MessageBusConsumerBackgroundService<TConsumer, T> : BackgroundService
where T : IMessageBusEvent
{
private readonly ILogger<MessageBusConsumerBackgroundService<TConsumer, T>> _logger;
private readonly IMessageBus _messageBus;

public MessageBusConsumerBackgroundService(ILogger<MessageBusConsumerBackgroundService<TConsumer, T>> logger,
IMessageBus messageBus)
{
_logger = logger;
_messageBus = messageBus;
}

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_messageBus.ReceiveAsync<TConsumer, T>(stoppingToken);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
using ClassifiedAds.Domain.Repositories;
using ClassifiedAds.Infrastructure.HostedServices;
using ClassifiedAds.Services.AuditLog.Authorization;
using ClassifiedAds.Services.AuditLog.ConfigurationOptions;
using ClassifiedAds.Services.AuditLog.DTOs;
using ClassifiedAds.Services.AuditLog.Entities;
using ClassifiedAds.Services.AuditLog.HostedServices;
using ClassifiedAds.Services.AuditLog.MessageBusConsumers;
using ClassifiedAds.Services.AuditLog.Repositories;
using Microsoft.AspNetCore.Builder;
using Microsoft.EntityFrameworkCore;
Expand Down Expand Up @@ -46,7 +47,10 @@ public static void MigrateAuditLogDb(this IApplicationBuilder app)

public static IServiceCollection AddHostedServicesAuditLogModule(this IServiceCollection services)
{
services.AddHostedService<MessageBusReceiver>();
services.AddMessageBusConsumers(Assembly.GetExecutingAssembly());
services.AddOutboxEventPublishers(Assembly.GetExecutingAssembly());

services.AddHostedService<MessageBusConsumerBackgroundService<AuditLogAggregationConsumer, AuditLogCreatedEvent>>();

return services;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,26 @@
using ClassifiedAds.Services.AuditLog.Entities;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace ClassifiedAds.Services.AuditLog.HostedServices;
namespace ClassifiedAds.Services.AuditLog.MessageBusConsumers;

internal class MessageBusReceiver : BackgroundService
public sealed class AuditLogAggregationConsumer : IMessageBusConsumer<AuditLogAggregationConsumer, AuditLogCreatedEvent>
{
private readonly ILogger<MessageBusReceiver> _logger;
private readonly ILogger<AuditLogAggregationConsumer> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IMessageBus _messageBus;

public MessageBusReceiver(ILogger<MessageBusReceiver> logger,
IServiceProvider serviceProvider,
IMessageBus messageBus)
public AuditLogAggregationConsumer(ILogger<AuditLogAggregationConsumer> logger,
IServiceProvider serviceProvider)
{
_logger = logger;
_serviceProvider = serviceProvider;
_messageBus = messageBus;
}

protected override Task ExecuteAsync(CancellationToken stoppingToken)
public async Task HandleAsync(AuditLogCreatedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
{
_messageBus.ReceiveAsync<AuditLogAggregationConsumer, AuditLogCreatedEvent>(async (data, metaData) =>
{
using var scope = _serviceProvider.CreateScope();
await ProcessMessage(scope, data, metaData);
}, stoppingToken);

return Task.CompletedTask;
}

private async Task ProcessMessage(IServiceScope scope, AuditLogCreatedEvent data, MetaData metaData)
{
var dispatcher = scope.ServiceProvider.GetRequiredService<Dispatcher>();
var idempotentRequestRepository = scope.ServiceProvider.GetRequiredService<IRepository<IdempotentRequest, Guid>>();
var dispatcher = _serviceProvider.GetRequiredService<Dispatcher>();
var idempotentRequestRepository = _serviceProvider.GetRequiredService<IRepository<IdempotentRequest, Guid>>();

var requestType = "ADD_AUDIT_LOG_ENTRY";

Expand Down Expand Up @@ -70,7 +55,3 @@ private async Task ProcessMessage(IServiceScope scope, AuditLogCreatedEvent data
await uow.CommitTransactionAsync();
}
}

public sealed class AuditLogAggregationConsumer
{
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
using ClassifiedAds.CrossCuttingConcerns.DateTimes;
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
using ClassifiedAds.Domain.Repositories;
using ClassifiedAds.Services.Product.Constants;
using ClassifiedAds.Services.Product.DTOs;
using ClassifiedAds.Services.Product.Entities;
using Dapr.Client;
using MediatR;
using Microsoft.Extensions.Logging;
using System;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -26,19 +22,16 @@ public class PublishEventsCommandHandler : IRequestHandler<PublishEventsCommand>
private readonly IDateTimeProvider _dateTimeProvider;
private readonly IRepository<OutboxEvent, Guid> _outboxEventRepository;
private readonly IMessageBus _messageBus;
private readonly DaprClient _daprClient;

public PublishEventsCommandHandler(ILogger<PublishEventsCommandHandler> logger,
IDateTimeProvider dateTimeProvider,
IRepository<OutboxEvent, Guid> outboxEventRepository,
IMessageBus messageBus,
DaprClient daprClient)
IMessageBus messageBus)
{
_logger = logger;
_dateTimeProvider = dateTimeProvider;
_outboxEventRepository = outboxEventRepository;
_messageBus = messageBus;
_daprClient = daprClient;
}

public async Task Handle(PublishEventsCommand command, CancellationToken cancellationToken = default)
Expand All @@ -51,24 +44,15 @@ public async Task Handle(PublishEventsCommand command, CancellationToken cancell

foreach (var eventLog in events)
{
if (eventLog.EventType == EventTypeConstants.AuditLogEntryCreated)
var outbox = new PublishingOutBoxEvent
{
var logEntry = JsonSerializer.Deserialize<AuditLogEntry>(eventLog.Message);
await _messageBus.SendAsync(new AuditLogCreatedEvent { AuditLog = logEntry },
new MetaData
{
MessageId = eventLog.Id.ToString(),
});
Id = eventLog.Id.ToString(),
EventType = eventLog.EventType,
EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name,
Payload = eventLog.Message,
};

if (!string.IsNullOrWhiteSpace(Environment.GetEnvironmentVariable("DAPR_HTTP_PORT")))
{
await _daprClient.PublishEventAsync("pubsub", "AuditLogCreatedEvent", new AuditLogCreatedEvent { AuditLog = logEntry });
}
}
else
{
// TODO: Take Note
}
await _messageBus.SendAsync(outbox, cancellationToken);

eventLog.Published = true;
eventLog.UpdatedDateTime = _dateTimeProvider.OffsetNow;
Expand Down

0 comments on commit 9dbc98c

Please sign in to comment.