From 9dbc98c85df94a1ba864f9977bc19d56c15bcf74 Mon Sep 17 00:00:00 2001 From: Phong Nguyen Date: Fri, 8 Sep 2023 22:10:05 +0700 Subject: [PATCH] Message Bus Consumers + OutBox Event Publishers --- .../MessageBrokers/IMessageBus.cs | 5 + .../MessageBrokers/IMessageBusConsumer.cs | 9 ++ .../MessageBrokers/IOutBoxEventPublisher.cs | 24 ++++ .../MessageBrokers/MessageBus.cs | 109 +++++++++++++++++ .../MessageBusConsumerBackgroundService.cs | 27 +++++ ...ditLogModuleServiceCollectionExtensions.cs | 8 +- .../AuditLogAggregationConsumer.cs} | 35 ++---- .../Commands/PublishEventsCommand.cs | 32 ++--- .../AuditLogEntryOutBoxEventPublisher.cs | 53 ++++++++ ...roductModuleServiceCollectionExtensions.cs | 3 + .../Commands/PublishEventsCommand.cs | 31 ++--- .../HostedServices/MessageBusReceiver.cs | 50 -------- .../MessageBusConsumers/WebhookConsumer.cs | 39 ++++++ .../AuditLogEntryOutBoxEventPublisher.cs | 43 +++++++ .../FileEntryOutBoxEventPublisher.cs | 42 +++++++ ...torageModuleServiceCollectionExtensions.cs | 8 +- .../ClassifiedAds.BackgroundServer/Program.cs | 91 +++++++------- .../MessageBrokers/IMessageBus.cs | 8 +- .../MessageBrokers/IMessageBusConsumer.cs | 9 ++ .../MessageBrokers/IOutBoxEventPublisher.cs | 24 ++++ .../MessageBrokers/MessageBus.cs | 109 +++++++++++++++++ .../MessageBusConsumerBackgroundService.cs | 27 +++++ .../Commands/PublishEventsCommand.cs | 26 ++-- .../AuditLogEntryOutBoxEventPublisher.cs | 41 +++++++ ...roductModuleServiceCollectionExtensions.cs | 4 + .../Commands/PublishEventsCommand.cs | 33 ++--- .../HostedServices/MessageBusReceiver.cs | 50 -------- .../MessageBusConsumers/WebhookConsumer.cs | 39 ++++++ .../AuditLogEntryOutBoxEventPublisher.cs | 41 +++++++ .../FileEntryOutBoxEventPublisher.cs | 42 +++++++ ...torageModuleServiceCollectionExtensions.cs | 12 +- .../Commands/PublishEventsCommand.cs | 23 ++-- .../HostedServices/MessageBusReceiver.cs | 50 -------- .../MessageBusConsumers/WebhookConsumer.cs | 39 ++++++ .../FileEntryOutBoxEventPublisher.cs | 42 +++++++ .../ClassifiedAds.BackgroundServer/Program.cs | 113 +++++++++--------- .../MessageBrokers/IMessageBus.cs | 8 +- .../MessageBrokers/IMessageBusConsumer.cs | 9 ++ .../MessageBrokers/IOutBoxEventPublisher.cs | 24 ++++ .../MessageBrokers/MessageBus.cs | 109 +++++++++++++++++ .../MessageBusConsumerBackgroundService.cs | 27 +++++ 41 files changed, 1130 insertions(+), 388 deletions(-) create mode 100644 src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs create mode 100644 src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs create mode 100644 src/Microservices/Common/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs rename src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/{HostedServices/MessageBusReceiver.cs => MessageBusConsumers/AuditLogAggregationConsumer.cs} (53%) create mode 100644 src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs delete mode 100644 src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/HostedServices/MessageBusReceiver.cs create mode 100644 src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/MessageBusConsumers/WebhookConsumer.cs create mode 100644 src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs create mode 100644 src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs create mode 100644 src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs create mode 100644 src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs create mode 100644 src/ModularMonolith/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs create mode 100644 src/ModularMonolith/ClassifiedAds.Modules.Product/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs delete mode 100644 src/ModularMonolith/ClassifiedAds.Modules.Storage/HostedServices/MessageBusReceiver.cs create mode 100644 src/ModularMonolith/ClassifiedAds.Modules.Storage/MessageBusConsumers/WebhookConsumer.cs create mode 100644 src/ModularMonolith/ClassifiedAds.Modules.Storage/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs create mode 100644 src/ModularMonolith/ClassifiedAds.Modules.Storage/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs delete mode 100644 src/Monolith/ClassifiedAds.BackgroundServer/HostedServices/MessageBusReceiver.cs create mode 100644 src/Monolith/ClassifiedAds.BackgroundServer/MessageBusConsumers/WebhookConsumer.cs create mode 100644 src/Monolith/ClassifiedAds.BackgroundServer/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs create mode 100644 src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs create mode 100644 src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs create mode 100644 src/Monolith/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs diff --git a/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs b/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs index 1649a77e5..f3628d1c9 100644 --- a/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs +++ b/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs @@ -11,6 +11,11 @@ Task SendAsync(T message, MetaData metaData = null, CancellationToken cancell Task ReceiveAsync(Func action, CancellationToken cancellationToken = default) where T : IMessageBusMessage; + + Task ReceiveAsync(CancellationToken cancellationToken = default) + where T : IMessageBusMessage; + + Task SendAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default); } public interface IMessageBusMessage diff --git a/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs b/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs new file mode 100644 index 000000000..f7e224669 --- /dev/null +++ b/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs @@ -0,0 +1,9 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers; + +public interface IMessageBusConsumer +{ + Task HandleAsync(T data, MetaData metaData, CancellationToken cancellationToken = default); +} diff --git a/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs b/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs new file mode 100644 index 000000000..1c9ef5f0f --- /dev/null +++ b/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs @@ -0,0 +1,24 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers; + +public interface IOutBoxEventPublisher +{ + static abstract string[] CanHandleEventTypes(); + + static abstract string CanHandleEventSource(); + + Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default); +} + +public class PublishingOutBoxEvent +{ + public string Id { get; set; } + + public string EventType { get; set; } + + public string EventSource { get; set; } + + public string Payload { get; set; } +} \ No newline at end of file diff --git a/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs b/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs index 0f9cd046a..8a4478651 100644 --- a/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs +++ b/src/Microservices/Common/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs @@ -1,5 +1,9 @@ using Microsoft.Extensions.DependencyInjection; using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Reflection; using System.Threading; using System.Threading.Tasks; @@ -8,6 +12,51 @@ namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers; public class MessageBus : IMessageBus { private readonly IServiceProvider _serviceProvider; + private static List _consumers = new List(); + private static Dictionary> _outboxEventHandlers = new(); + + internal static void AddConsumers(Assembly assembly, IServiceCollection services) + { + var types = assembly.GetTypes() + .Where(x => x.GetInterfaces().Any(y => y.IsGenericType && y.GetGenericTypeDefinition() == typeof(IMessageBusConsumer<,>))) + .ToList(); + + foreach (var type in types) + { + services.AddTransient(type); + } + + _consumers.AddRange(types); + } + + internal static void AddOutboxEventPublishers(Assembly assembly, IServiceCollection services) + { + var types = assembly.GetTypes() + .Where(x => x.GetInterfaces().Any(y => y == typeof(IOutBoxEventPublisher))) + .ToList(); + + foreach (var type in types) + { + services.AddTransient(type); + } + + foreach (var item in types) + { + var canHandlerEventTypes = (string[])item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventTypes), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture); + var eventSource = (string)item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventSource), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture); + + foreach (var eventType in canHandlerEventTypes) + { + var key = eventSource + ":" + eventType; + if (!_outboxEventHandlers.ContainsKey(key)) + { + _outboxEventHandlers[key] = new List(); + } + + _outboxEventHandlers[key].Add(item); + } + } + } public MessageBus(IServiceProvider serviceProvider) { @@ -25,4 +74,64 @@ public async Task SendAsync(T message, MetaData metaData = null, Cancellation { await _serviceProvider.GetRequiredService>().ReceiveAsync(action, cancellationToken); } + + public async Task ReceiveAsync(CancellationToken cancellationToken = default) + where T : IMessageBusMessage + { + await _serviceProvider.GetRequiredService>().ReceiveAsync(async (data, metaData) => + { + using var scope = _serviceProvider.CreateScope(); + foreach (Type handlerType in _consumers) + { + bool canHandleEvent = handlerType.GetInterfaces() + .Any(x => x.IsGenericType + && x.GetGenericTypeDefinition() == typeof(IMessageBusConsumer<,>) + && x.GenericTypeArguments[0] == typeof(TConsumer) && x.GenericTypeArguments[1] == typeof(T)); + + if (canHandleEvent) + { + dynamic handler = scope.ServiceProvider.GetService(handlerType); + await handler.HandleAsync((dynamic)data, metaData, cancellationToken); + } + } + }, cancellationToken); + } + + public async Task SendAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) + { + var key = outbox.EventSource + ":" + outbox.EventType; + var handlerTypes = _outboxEventHandlers.ContainsKey(key) ? _outboxEventHandlers[key] : null; + + if (handlerTypes == null) + { + // TODO: Take Note + return; + } + + foreach (var type in handlerTypes) + { + dynamic handler = _serviceProvider.GetService(type); + await handler.HandleAsync(outbox, cancellationToken); + } + } } + +public static class MessageBusExtentions +{ + public static void AddMessageBusConsumers(this IServiceCollection services, Assembly assembly) + { + MessageBus.AddConsumers(assembly, services); + } + + public static void AddOutboxEventPublishers(this IServiceCollection services, Assembly assembly) + { + MessageBus.AddOutboxEventPublishers(assembly, services); + } + + public static void AddMessageBus(this IServiceCollection services, Assembly assembly) + { + services.AddTransient(); + services.AddMessageBusConsumers(assembly); + services.AddOutboxEventPublishers(assembly); + } +} \ No newline at end of file diff --git a/src/Microservices/Common/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs b/src/Microservices/Common/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs new file mode 100644 index 000000000..98afe37f2 --- /dev/null +++ b/src/Microservices/Common/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs @@ -0,0 +1,27 @@ +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Infrastructure.HostedServices; + +public sealed class MessageBusConsumerBackgroundService : BackgroundService + where T : IMessageBusEvent +{ + private readonly ILogger> _logger; + private readonly IMessageBus _messageBus; + + public MessageBusConsumerBackgroundService(ILogger> logger, + IMessageBus messageBus) + { + _logger = logger; + _messageBus = messageBus; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + _messageBus.ReceiveAsync(stoppingToken); + return Task.CompletedTask; + } +} diff --git a/src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/AuditLogModuleServiceCollectionExtensions.cs b/src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/AuditLogModuleServiceCollectionExtensions.cs index f2ab00cb7..9a166c5cc 100644 --- a/src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/AuditLogModuleServiceCollectionExtensions.cs +++ b/src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/AuditLogModuleServiceCollectionExtensions.cs @@ -1,10 +1,11 @@ using ClassifiedAds.Domain.Infrastructure.MessageBrokers; using ClassifiedAds.Domain.Repositories; +using ClassifiedAds.Infrastructure.HostedServices; using ClassifiedAds.Services.AuditLog.Authorization; using ClassifiedAds.Services.AuditLog.ConfigurationOptions; using ClassifiedAds.Services.AuditLog.DTOs; using ClassifiedAds.Services.AuditLog.Entities; -using ClassifiedAds.Services.AuditLog.HostedServices; +using ClassifiedAds.Services.AuditLog.MessageBusConsumers; using ClassifiedAds.Services.AuditLog.Repositories; using Microsoft.AspNetCore.Builder; using Microsoft.EntityFrameworkCore; @@ -46,7 +47,10 @@ public static void MigrateAuditLogDb(this IApplicationBuilder app) public static IServiceCollection AddHostedServicesAuditLogModule(this IServiceCollection services) { - services.AddHostedService(); + services.AddMessageBusConsumers(Assembly.GetExecutingAssembly()); + services.AddOutboxEventPublishers(Assembly.GetExecutingAssembly()); + + services.AddHostedService>(); return services; } diff --git a/src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/HostedServices/MessageBusReceiver.cs b/src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/MessageBusConsumers/AuditLogAggregationConsumer.cs similarity index 53% rename from src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/HostedServices/MessageBusReceiver.cs rename to src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/MessageBusConsumers/AuditLogAggregationConsumer.cs index 2230f872a..cf3521e61 100644 --- a/src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/HostedServices/MessageBusReceiver.cs +++ b/src/Microservices/Services.AuditLog/ClassifiedAds.Services.AuditLog/MessageBusConsumers/AuditLogAggregationConsumer.cs @@ -5,41 +5,26 @@ using ClassifiedAds.Services.AuditLog.Entities; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -namespace ClassifiedAds.Services.AuditLog.HostedServices; +namespace ClassifiedAds.Services.AuditLog.MessageBusConsumers; -internal class MessageBusReceiver : BackgroundService +public sealed class AuditLogAggregationConsumer : IMessageBusConsumer { - private readonly ILogger _logger; + private readonly ILogger _logger; private readonly IServiceProvider _serviceProvider; - private readonly IMessageBus _messageBus; - public MessageBusReceiver(ILogger logger, - IServiceProvider serviceProvider, - IMessageBus messageBus) + public AuditLogAggregationConsumer(ILogger logger, + IServiceProvider serviceProvider) { _logger = logger; _serviceProvider = serviceProvider; - _messageBus = messageBus; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + public async Task HandleAsync(AuditLogCreatedEvent data, MetaData metaData, CancellationToken cancellationToken = default) { - _messageBus.ReceiveAsync(async (data, metaData) => - { - using var scope = _serviceProvider.CreateScope(); - await ProcessMessage(scope, data, metaData); - }, stoppingToken); - - return Task.CompletedTask; - } - - private async Task ProcessMessage(IServiceScope scope, AuditLogCreatedEvent data, MetaData metaData) - { - var dispatcher = scope.ServiceProvider.GetRequiredService(); - var idempotentRequestRepository = scope.ServiceProvider.GetRequiredService>(); + var dispatcher = _serviceProvider.GetRequiredService(); + var idempotentRequestRepository = _serviceProvider.GetRequiredService>(); var requestType = "ADD_AUDIT_LOG_ENTRY"; @@ -70,7 +55,3 @@ private async Task ProcessMessage(IServiceScope scope, AuditLogCreatedEvent data await uow.CommitTransactionAsync(); } } - -public sealed class AuditLogAggregationConsumer -{ -} diff --git a/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/Commands/PublishEventsCommand.cs b/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/Commands/PublishEventsCommand.cs index 21b186671..08e019851 100644 --- a/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/Commands/PublishEventsCommand.cs +++ b/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/Commands/PublishEventsCommand.cs @@ -1,15 +1,11 @@ using ClassifiedAds.CrossCuttingConcerns.DateTimes; using ClassifiedAds.Domain.Infrastructure.MessageBrokers; using ClassifiedAds.Domain.Repositories; -using ClassifiedAds.Services.Product.Constants; -using ClassifiedAds.Services.Product.DTOs; using ClassifiedAds.Services.Product.Entities; -using Dapr.Client; using MediatR; using Microsoft.Extensions.Logging; using System; using System.Linq; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -26,19 +22,16 @@ public class PublishEventsCommandHandler : IRequestHandler private readonly IDateTimeProvider _dateTimeProvider; private readonly IRepository _outboxEventRepository; private readonly IMessageBus _messageBus; - private readonly DaprClient _daprClient; public PublishEventsCommandHandler(ILogger logger, IDateTimeProvider dateTimeProvider, IRepository outboxEventRepository, - IMessageBus messageBus, - DaprClient daprClient) + IMessageBus messageBus) { _logger = logger; _dateTimeProvider = dateTimeProvider; _outboxEventRepository = outboxEventRepository; _messageBus = messageBus; - _daprClient = daprClient; } public async Task Handle(PublishEventsCommand command, CancellationToken cancellationToken = default) @@ -51,24 +44,15 @@ public async Task Handle(PublishEventsCommand command, CancellationToken cancell foreach (var eventLog in events) { - if (eventLog.EventType == EventTypeConstants.AuditLogEntryCreated) + var outbox = new PublishingOutBoxEvent { - var logEntry = JsonSerializer.Deserialize(eventLog.Message); - await _messageBus.SendAsync(new AuditLogCreatedEvent { AuditLog = logEntry }, - new MetaData - { - MessageId = eventLog.Id.ToString(), - }); + Id = eventLog.Id.ToString(), + EventType = eventLog.EventType, + EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name, + Payload = eventLog.Message, + }; - if (!string.IsNullOrWhiteSpace(Environment.GetEnvironmentVariable("DAPR_HTTP_PORT"))) - { - await _daprClient.PublishEventAsync("pubsub", "AuditLogCreatedEvent", new AuditLogCreatedEvent { AuditLog = logEntry }); - } - } - else - { - // TODO: Take Note - } + await _messageBus.SendAsync(outbox, cancellationToken); eventLog.Published = true; eventLog.UpdatedDateTime = _dateTimeProvider.OffsetNow; diff --git a/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs b/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs new file mode 100644 index 000000000..4013f0557 --- /dev/null +++ b/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs @@ -0,0 +1,53 @@ +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using ClassifiedAds.Services.Product.Commands; +using ClassifiedAds.Services.Product.Constants; +using ClassifiedAds.Services.Product.DTOs; +using ClassifiedAds.Services.Product.Entities; +using Dapr.Client; +using System; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Modules.Product.OutBoxEventPublishers; + +public class AuditLogEntryOutBoxEventPublisher : IOutBoxEventPublisher +{ + private readonly IMessageBus _messageBus; + private readonly DaprClient _daprClient; + + public static string[] CanHandleEventTypes() + { + return new string[] { EventTypeConstants.AuditLogEntryCreated }; + } + + public static string CanHandleEventSource() + { + return typeof(PublishEventsCommand).Assembly.GetName().Name; + } + + public AuditLogEntryOutBoxEventPublisher(IMessageBus messageBus, + DaprClient daprClient) + { + _messageBus = messageBus; + _daprClient = daprClient; + } + + public async Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) + { + if (outbox.EventType == EventTypeConstants.AuditLogEntryCreated) + { + var logEntry = JsonSerializer.Deserialize(outbox.Payload); + await _messageBus.SendAsync(new AuditLogCreatedEvent { AuditLog = logEntry }, + new MetaData + { + MessageId = outbox.Id, + }); + + if (!string.IsNullOrWhiteSpace(Environment.GetEnvironmentVariable("DAPR_HTTP_PORT"))) + { + await _daprClient.PublishEventAsync("pubsub", "AuditLogCreatedEvent", new AuditLogCreatedEvent { AuditLog = logEntry }); + } + } + } +} \ No newline at end of file diff --git a/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/ProductModuleServiceCollectionExtensions.cs b/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/ProductModuleServiceCollectionExtensions.cs index 25cf7b04a..f8ea203d6 100644 --- a/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/ProductModuleServiceCollectionExtensions.cs +++ b/src/Microservices/Services.Product/ClassifiedAds.Services.Product.Api/ProductModuleServiceCollectionExtensions.cs @@ -63,6 +63,9 @@ public static void MigrateProductDb(this IApplicationBuilder app) public static IServiceCollection AddHostedServicesProductModule(this IServiceCollection services) { + services.AddMessageBusConsumers(Assembly.GetExecutingAssembly()); + services.AddOutboxEventPublishers(Assembly.GetExecutingAssembly()); + services.AddHostedService(); return services; diff --git a/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/Commands/PublishEventsCommand.cs b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/Commands/PublishEventsCommand.cs index 73a3b3bbe..6c108bbf2 100644 --- a/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/Commands/PublishEventsCommand.cs +++ b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/Commands/PublishEventsCommand.cs @@ -2,13 +2,10 @@ using ClassifiedAds.CrossCuttingConcerns.DateTimes; using ClassifiedAds.Domain.Infrastructure.MessageBrokers; using ClassifiedAds.Domain.Repositories; -using ClassifiedAds.Services.Storage.Constants; -using ClassifiedAds.Services.Storage.DTOs; using ClassifiedAds.Services.Storage.Entities; using Microsoft.Extensions.Logging; using System; using System.Linq; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -47,27 +44,15 @@ public async Task HandleAsync(PublishEventsCommand command, CancellationToken ca foreach (var eventLog in events) { - if (eventLog.EventType == EventTypeConstants.FileEntryCreated) + var outbox = new PublishingOutBoxEvent { - await _messageBus.SendAsync(new FileUploadedEvent { FileEntry = JsonSerializer.Deserialize(eventLog.Message) }); - } - else if (eventLog.EventType == EventTypeConstants.FileEntryDeleted) - { - await _messageBus.SendAsync(new FileDeletedEvent { FileEntry = JsonSerializer.Deserialize(eventLog.Message) }); - } - else if (eventLog.EventType == EventTypeConstants.AuditLogEntryCreated) - { - var logEntry = JsonSerializer.Deserialize(eventLog.Message); - await _messageBus.SendAsync(new AuditLogCreatedEvent { AuditLog = logEntry }, - new MetaData - { - MessageId = eventLog.Id.ToString(), - }); - } - else - { - // TODO: Take Note - } + Id = eventLog.Id.ToString(), + EventType = eventLog.EventType, + EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name, + Payload = eventLog.Message, + }; + + await _messageBus.SendAsync(outbox, cancellationToken); eventLog.Published = true; eventLog.UpdatedDateTime = _dateTimeProvider.OffsetNow; diff --git a/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/HostedServices/MessageBusReceiver.cs b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/HostedServices/MessageBusReceiver.cs deleted file mode 100644 index 120311822..000000000 --- a/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/HostedServices/MessageBusReceiver.cs +++ /dev/null @@ -1,50 +0,0 @@ -using ClassifiedAds.Domain.Infrastructure.MessageBrokers; -using ClassifiedAds.Services.Storage.DTOs; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using System.Net.Http; -using System.Net.Http.Json; -using System.Threading; -using System.Threading.Tasks; - -namespace ClassifiedAds.Services.Storage.HostedServices; - -internal class MessageBusReceiver : BackgroundService -{ - private static readonly HttpClient _httpClient = new HttpClient(); - - private readonly ILogger _logger; - private readonly IConfiguration _configuration; - private readonly IMessageBus _messageBus; - - public MessageBusReceiver(ILogger logger, - IConfiguration configuration, - IMessageBus messageBus) - { - _logger = logger; - _configuration = configuration; - _messageBus = messageBus; - } - - protected override Task ExecuteAsync(CancellationToken stoppingToken) - { - _messageBus.ReceiveAsync(async (data, metaData) => - { - var url = _configuration["Webhooks:FileUploadedEvent:PayloadUrl"]; - await _httpClient.PostAsJsonAsync(url, data.FileEntry); - }, stoppingToken); - - _messageBus.ReceiveAsync(async (data, metaData) => - { - var url = _configuration["Webhooks:FileDeletedEvent:PayloadUrl"]; - await _httpClient.PostAsJsonAsync(url, data.FileEntry); - }, stoppingToken); - - return Task.CompletedTask; - } -} - -public sealed class WebhookConsumer -{ -} \ No newline at end of file diff --git a/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/MessageBusConsumers/WebhookConsumer.cs b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/MessageBusConsumers/WebhookConsumer.cs new file mode 100644 index 000000000..50e2d97a2 --- /dev/null +++ b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/MessageBusConsumers/WebhookConsumer.cs @@ -0,0 +1,39 @@ +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using ClassifiedAds.Services.Storage.DTOs; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using System.Net.Http; +using System.Net.Http.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Services.Storage.MessageBusConsumers; + +public sealed class WebhookConsumer : + IMessageBusConsumer, + IMessageBusConsumer +{ + private static readonly HttpClient _httpClient = new HttpClient(); + + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + + public WebhookConsumer(ILogger logger, + IConfiguration configuration) + { + _logger = logger; + _configuration = configuration; + } + + public async Task HandleAsync(FileUploadedEvent data, MetaData metaData, CancellationToken cancellationToken = default) + { + var url = _configuration["Webhooks:FileUploadedEvent:PayloadUrl"]; + await _httpClient.PostAsJsonAsync(url, data.FileEntry, cancellationToken: cancellationToken); + } + + public async Task HandleAsync(FileDeletedEvent data, MetaData metaData, CancellationToken cancellationToken = default) + { + var url = _configuration["Webhooks:FileDeletedEvent:PayloadUrl"]; + await _httpClient.PostAsJsonAsync(url, data.FileEntry, cancellationToken: cancellationToken); + } +} diff --git a/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs new file mode 100644 index 000000000..20946b6b0 --- /dev/null +++ b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs @@ -0,0 +1,43 @@ +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using ClassifiedAds.Services.Storage.Commands; +using ClassifiedAds.Services.Storage.Constants; +using ClassifiedAds.Services.Storage.DTOs; +using ClassifiedAds.Services.Storage.Entities; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Services.Storage.OutBoxEventPublishers; + +public class AuditLogEntryOutBoxEventPublisher : IOutBoxEventPublisher +{ + private readonly IMessageBus _messageBus; + + public static string[] CanHandleEventTypes() + { + return new string[] { EventTypeConstants.AuditLogEntryCreated }; + } + + public static string CanHandleEventSource() + { + return typeof(PublishEventsCommand).Assembly.GetName().Name; + } + + public AuditLogEntryOutBoxEventPublisher(IMessageBus messageBus) + { + _messageBus = messageBus; + } + + public async Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) + { + if (outbox.EventType == EventTypeConstants.AuditLogEntryCreated) + { + var logEntry = JsonSerializer.Deserialize(outbox.Payload); + await _messageBus.SendAsync(new AuditLogCreatedEvent { AuditLog = logEntry }, + new MetaData + { + MessageId = outbox.Id, + }); + } + } +} \ No newline at end of file diff --git a/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs new file mode 100644 index 000000000..8c9e42c74 --- /dev/null +++ b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs @@ -0,0 +1,42 @@ +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using ClassifiedAds.Services.Storage.Commands; +using ClassifiedAds.Services.Storage.Constants; +using ClassifiedAds.Services.Storage.DTOs; +using ClassifiedAds.Services.Storage.Entities; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Services.Storage.OutBoxEventPublishers; + +public class FileEntryOutBoxEventPublisher : IOutBoxEventPublisher +{ + private readonly IMessageBus _messageBus; + + public static string[] CanHandleEventTypes() + { + return new string[] { EventTypeConstants.FileEntryCreated, EventTypeConstants.FileEntryDeleted }; + } + + public static string CanHandleEventSource() + { + return typeof(PublishEventsCommand).Assembly.GetName().Name; + } + + public FileEntryOutBoxEventPublisher(IMessageBus messageBus) + { + _messageBus = messageBus; + } + + public async Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) + { + if (outbox.EventType == EventTypeConstants.FileEntryCreated) + { + await _messageBus.SendAsync(new FileUploadedEvent { FileEntry = JsonSerializer.Deserialize(outbox.Payload) }, cancellationToken: cancellationToken); + } + else if (outbox.EventType == EventTypeConstants.FileEntryDeleted) + { + await _messageBus.SendAsync(new FileDeletedEvent { FileEntry = JsonSerializer.Deserialize(outbox.Payload) }, cancellationToken: cancellationToken); + } + } +} \ No newline at end of file diff --git a/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/StorageModuleServiceCollectionExtensions.cs b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/StorageModuleServiceCollectionExtensions.cs index d2ab6f820..823571507 100644 --- a/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/StorageModuleServiceCollectionExtensions.cs +++ b/src/Microservices/Services.Storage/ClassifiedAds.Services.Storage.Api/StorageModuleServiceCollectionExtensions.cs @@ -1,11 +1,13 @@ using ClassifiedAds.Domain.Infrastructure.MessageBrokers; using ClassifiedAds.Domain.Repositories; +using ClassifiedAds.Infrastructure.HostedServices; using ClassifiedAds.Infrastructure.Identity; using ClassifiedAds.Services.Storage.Authorization; using ClassifiedAds.Services.Storage.ConfigurationOptions; using ClassifiedAds.Services.Storage.DTOs; using ClassifiedAds.Services.Storage.Entities; using ClassifiedAds.Services.Storage.HostedServices; +using ClassifiedAds.Services.Storage.MessageBusConsumers; using ClassifiedAds.Services.Storage.Repositories; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; @@ -59,7 +61,11 @@ public static void MigrateStorageDb(this IApplicationBuilder app) public static IServiceCollection AddHostedServicesStorageModule(this IServiceCollection services) { - services.AddHostedService(); + services.AddMessageBusConsumers(Assembly.GetExecutingAssembly()); + services.AddOutboxEventPublishers(Assembly.GetExecutingAssembly()); + + services.AddHostedService>(); + services.AddHostedService>(); services.AddHostedService(); return services; diff --git a/src/ModularMonolith/ClassifiedAds.BackgroundServer/Program.cs b/src/ModularMonolith/ClassifiedAds.BackgroundServer/Program.cs index d0706e2ff..302f768b9 100644 --- a/src/ModularMonolith/ClassifiedAds.BackgroundServer/Program.cs +++ b/src/ModularMonolith/ClassifiedAds.BackgroundServer/Program.cs @@ -5,64 +5,63 @@ using ClassifiedAds.Infrastructure.Logging; using ClassifiedAds.Modules.Identity.Repositories; using ClassifiedAds.Modules.Storage.DTOs; -using ClassifiedAds.Modules.Storage.HostedServices; +using ClassifiedAds.Modules.Storage.MessageBusConsumers; using Microsoft.AspNetCore.DataProtection; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using System; -CreateHostBuilder(args).Build().Run(); +Host.CreateDefaultBuilder(args) +.UseWindowsService() +.UseClassifiedAdsLogger(configuration => +{ + var appSettings = new AppSettings(); + configuration.Bind(appSettings); + return appSettings.Logging; +}) +.ConfigureServices((hostContext, services) => +{ + var serviceProvider = services.BuildServiceProvider(); + var configuration = serviceProvider.GetService(); -static IHostBuilder CreateHostBuilder(string[] args) => - Host.CreateDefaultBuilder(args) - .UseWindowsService() - .UseClassifiedAdsLogger(configuration => - { - var appSettings = new AppSettings(); - configuration.Bind(appSettings); - return appSettings.Logging; - }) - .ConfigureServices((hostContext, services) => - { - var serviceProvider = services.BuildServiceProvider(); - var configuration = serviceProvider.GetService(); + var appSettings = new AppSettings(); + configuration.Bind(appSettings); - var appSettings = new AppSettings(); - configuration.Bind(appSettings); - - var validationResult = appSettings.Validate(); - if (validationResult.Failed) - { - throw new Exception(validationResult.FailureMessage); - } + var validationResult = appSettings.Validate(); + if (validationResult.Failed) + { + throw new Exception(validationResult.FailureMessage); + } - services.Configure(configuration); + services.Configure(configuration); - services.AddScoped(); + services.AddScoped(); - services.AddDateTimeProvider(); + services.AddDateTimeProvider(); - services - .AddAuditLogModule(opt => configuration.GetSection("Modules:AuditLog").Bind(opt)) - .AddIdentityModuleCore(opt => configuration.GetSection("Modules:Identity").Bind(opt)) - .AddNotificationModule(opt => configuration.GetSection("Modules:Notification").Bind(opt)) - .AddProductModule(opt => configuration.GetSection("Modules:Product").Bind(opt)) - .AddStorageModule(opt => configuration.GetSection("Modules:Storage").Bind(opt)) - .AddApplicationServices(); + services + .AddAuditLogModule(opt => configuration.GetSection("Modules:AuditLog").Bind(opt)) + .AddIdentityModuleCore(opt => configuration.GetSection("Modules:Identity").Bind(opt)) + .AddNotificationModule(opt => configuration.GetSection("Modules:Notification").Bind(opt)) + .AddProductModule(opt => configuration.GetSection("Modules:Product").Bind(opt)) + .AddStorageModule(opt => configuration.GetSection("Modules:Storage").Bind(opt)) + .AddApplicationServices(); - services.AddDataProtection() - .PersistKeysToDbContext() - .SetApplicationName("ClassifiedAds"); + services.AddDataProtection() + .PersistKeysToDbContext() + .SetApplicationName("ClassifiedAds"); - services.AddTransient(); - services.AddMessageBusSender(appSettings.MessageBroker); - services.AddMessageBusSender(appSettings.MessageBroker); - services.AddMessageBusReceiver(appSettings.MessageBroker); - services.AddMessageBusReceiver(appSettings.MessageBroker); + services.AddTransient(); + services.AddMessageBusSender(appSettings.MessageBroker); + services.AddMessageBusSender(appSettings.MessageBroker); + services.AddMessageBusReceiver(appSettings.MessageBroker); + services.AddMessageBusReceiver(appSettings.MessageBroker); - services.AddHostedServicesIdentityModule(); - services.AddHostedServicesNotificationModule(); - services.AddHostedServicesProductModule(); - services.AddHostedServicesStorageModule(); - }); \ No newline at end of file + services.AddHostedServicesIdentityModule(); + services.AddHostedServicesNotificationModule(); + services.AddHostedServicesProductModule(); + services.AddHostedServicesStorageModule(); +}) +.Build() +.Run(); \ No newline at end of file diff --git a/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs b/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs index 1649a77e5..d8d9239bd 100644 --- a/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs +++ b/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs @@ -1,4 +1,5 @@ -using System; +using ClassifiedAds.Domain.Entities; +using System; using System.Threading; using System.Threading.Tasks; @@ -11,6 +12,11 @@ Task SendAsync(T message, MetaData metaData = null, CancellationToken cancell Task ReceiveAsync(Func action, CancellationToken cancellationToken = default) where T : IMessageBusMessage; + + Task ReceiveAsync(CancellationToken cancellationToken = default) + where T : IMessageBusMessage; + + Task SendAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default); } public interface IMessageBusMessage diff --git a/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs b/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs new file mode 100644 index 000000000..f7e224669 --- /dev/null +++ b/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs @@ -0,0 +1,9 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers; + +public interface IMessageBusConsumer +{ + Task HandleAsync(T data, MetaData metaData, CancellationToken cancellationToken = default); +} diff --git a/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs b/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs new file mode 100644 index 000000000..1c9ef5f0f --- /dev/null +++ b/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs @@ -0,0 +1,24 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers; + +public interface IOutBoxEventPublisher +{ + static abstract string[] CanHandleEventTypes(); + + static abstract string CanHandleEventSource(); + + Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default); +} + +public class PublishingOutBoxEvent +{ + public string Id { get; set; } + + public string EventType { get; set; } + + public string EventSource { get; set; } + + public string Payload { get; set; } +} \ No newline at end of file diff --git a/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs b/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs index 0f9cd046a..8a4478651 100644 --- a/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs +++ b/src/ModularMonolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs @@ -1,5 +1,9 @@ using Microsoft.Extensions.DependencyInjection; using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Reflection; using System.Threading; using System.Threading.Tasks; @@ -8,6 +12,51 @@ namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers; public class MessageBus : IMessageBus { private readonly IServiceProvider _serviceProvider; + private static List _consumers = new List(); + private static Dictionary> _outboxEventHandlers = new(); + + internal static void AddConsumers(Assembly assembly, IServiceCollection services) + { + var types = assembly.GetTypes() + .Where(x => x.GetInterfaces().Any(y => y.IsGenericType && y.GetGenericTypeDefinition() == typeof(IMessageBusConsumer<,>))) + .ToList(); + + foreach (var type in types) + { + services.AddTransient(type); + } + + _consumers.AddRange(types); + } + + internal static void AddOutboxEventPublishers(Assembly assembly, IServiceCollection services) + { + var types = assembly.GetTypes() + .Where(x => x.GetInterfaces().Any(y => y == typeof(IOutBoxEventPublisher))) + .ToList(); + + foreach (var type in types) + { + services.AddTransient(type); + } + + foreach (var item in types) + { + var canHandlerEventTypes = (string[])item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventTypes), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture); + var eventSource = (string)item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventSource), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture); + + foreach (var eventType in canHandlerEventTypes) + { + var key = eventSource + ":" + eventType; + if (!_outboxEventHandlers.ContainsKey(key)) + { + _outboxEventHandlers[key] = new List(); + } + + _outboxEventHandlers[key].Add(item); + } + } + } public MessageBus(IServiceProvider serviceProvider) { @@ -25,4 +74,64 @@ public async Task SendAsync(T message, MetaData metaData = null, Cancellation { await _serviceProvider.GetRequiredService>().ReceiveAsync(action, cancellationToken); } + + public async Task ReceiveAsync(CancellationToken cancellationToken = default) + where T : IMessageBusMessage + { + await _serviceProvider.GetRequiredService>().ReceiveAsync(async (data, metaData) => + { + using var scope = _serviceProvider.CreateScope(); + foreach (Type handlerType in _consumers) + { + bool canHandleEvent = handlerType.GetInterfaces() + .Any(x => x.IsGenericType + && x.GetGenericTypeDefinition() == typeof(IMessageBusConsumer<,>) + && x.GenericTypeArguments[0] == typeof(TConsumer) && x.GenericTypeArguments[1] == typeof(T)); + + if (canHandleEvent) + { + dynamic handler = scope.ServiceProvider.GetService(handlerType); + await handler.HandleAsync((dynamic)data, metaData, cancellationToken); + } + } + }, cancellationToken); + } + + public async Task SendAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) + { + var key = outbox.EventSource + ":" + outbox.EventType; + var handlerTypes = _outboxEventHandlers.ContainsKey(key) ? _outboxEventHandlers[key] : null; + + if (handlerTypes == null) + { + // TODO: Take Note + return; + } + + foreach (var type in handlerTypes) + { + dynamic handler = _serviceProvider.GetService(type); + await handler.HandleAsync(outbox, cancellationToken); + } + } } + +public static class MessageBusExtentions +{ + public static void AddMessageBusConsumers(this IServiceCollection services, Assembly assembly) + { + MessageBus.AddConsumers(assembly, services); + } + + public static void AddOutboxEventPublishers(this IServiceCollection services, Assembly assembly) + { + MessageBus.AddOutboxEventPublishers(assembly, services); + } + + public static void AddMessageBus(this IServiceCollection services, Assembly assembly) + { + services.AddTransient(); + services.AddMessageBusConsumers(assembly); + services.AddOutboxEventPublishers(assembly); + } +} \ No newline at end of file diff --git a/src/ModularMonolith/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs b/src/ModularMonolith/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs new file mode 100644 index 000000000..98afe37f2 --- /dev/null +++ b/src/ModularMonolith/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs @@ -0,0 +1,27 @@ +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Infrastructure.HostedServices; + +public sealed class MessageBusConsumerBackgroundService : BackgroundService + where T : IMessageBusEvent +{ + private readonly ILogger> _logger; + private readonly IMessageBus _messageBus; + + public MessageBusConsumerBackgroundService(ILogger> logger, + IMessageBus messageBus) + { + _logger = logger; + _messageBus = messageBus; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + _messageBus.ReceiveAsync(stoppingToken); + return Task.CompletedTask; + } +} diff --git a/src/ModularMonolith/ClassifiedAds.Modules.Product/Commands/PublishEventsCommand.cs b/src/ModularMonolith/ClassifiedAds.Modules.Product/Commands/PublishEventsCommand.cs index 780c6a452..7ce4377b2 100644 --- a/src/ModularMonolith/ClassifiedAds.Modules.Product/Commands/PublishEventsCommand.cs +++ b/src/ModularMonolith/ClassifiedAds.Modules.Product/Commands/PublishEventsCommand.cs @@ -1,13 +1,11 @@ using ClassifiedAds.Application; -using ClassifiedAds.Contracts.AuditLog.Services; using ClassifiedAds.CrossCuttingConcerns.DateTimes; +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; using ClassifiedAds.Domain.Repositories; -using ClassifiedAds.Modules.Product.Constants; using ClassifiedAds.Modules.Product.Entities; using Microsoft.Extensions.Logging; using System; using System.Linq; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -23,17 +21,17 @@ public class PublishEventsCommandHandler : ICommandHandler private readonly ILogger _logger; private readonly IDateTimeProvider _dateTimeProvider; private readonly IRepository _outboxEventRepository; - private readonly IAuditLogService _externalAuditLogService; + private readonly IMessageBus _messageBus; public PublishEventsCommandHandler(ILogger logger, IDateTimeProvider dateTimeProvider, IRepository outboxEventRepository, - IAuditLogService externalAuditLogService) + IMessageBus messageBus) { _logger = logger; _dateTimeProvider = dateTimeProvider; _outboxEventRepository = outboxEventRepository; - _externalAuditLogService = externalAuditLogService; + _messageBus = messageBus; } public async Task HandleAsync(PublishEventsCommand command, CancellationToken cancellationToken = default) @@ -46,15 +44,15 @@ public async Task HandleAsync(PublishEventsCommand command, CancellationToken ca foreach (var eventLog in events) { - if (eventLog.EventType == EventTypeConstants.AuditLogEntryCreated) + var outbox = new PublishingOutBoxEvent { - var logEntry = JsonSerializer.Deserialize(eventLog.Message); - await _externalAuditLogService.AddAsync(logEntry, eventLog.Id.ToString()); - } - else - { - // TODO: Take Note - } + Id = eventLog.Id.ToString(), + EventType = eventLog.EventType, + EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name, + Payload = eventLog.Message, + }; + + await _messageBus.SendAsync(outbox, cancellationToken); eventLog.Published = true; eventLog.UpdatedDateTime = _dateTimeProvider.OffsetNow; diff --git a/src/ModularMonolith/ClassifiedAds.Modules.Product/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs b/src/ModularMonolith/ClassifiedAds.Modules.Product/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs new file mode 100644 index 000000000..463ca058e --- /dev/null +++ b/src/ModularMonolith/ClassifiedAds.Modules.Product/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs @@ -0,0 +1,41 @@ +using ClassifiedAds.Contracts.AuditLog.Services; +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using ClassifiedAds.Modules.Product.Commands; +using ClassifiedAds.Modules.Product.Constants; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Modules.Product.OutBoxEventPublishers; + +public class AuditLogEntryOutBoxEventPublisher : IOutBoxEventPublisher +{ + private readonly IMessageBus _messageBus; + private readonly IAuditLogService _externalAuditLogService; + + public static string[] CanHandleEventTypes() + { + return new string[] { EventTypeConstants.AuditLogEntryCreated }; + } + + public static string CanHandleEventSource() + { + return typeof(PublishEventsCommand).Assembly.GetName().Name; + } + + public AuditLogEntryOutBoxEventPublisher(IMessageBus messageBus, + IAuditLogService externalAuditLogService) + { + _messageBus = messageBus; + _externalAuditLogService = externalAuditLogService; + } + + public async Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) + { + if (outbox.EventType == EventTypeConstants.AuditLogEntryCreated) + { + var logEntry = JsonSerializer.Deserialize(outbox.Payload); + await _externalAuditLogService.AddAsync(logEntry, outbox.Id); + } + } +} \ No newline at end of file diff --git a/src/ModularMonolith/ClassifiedAds.Modules.Product/ProductModuleServiceCollectionExtensions.cs b/src/ModularMonolith/ClassifiedAds.Modules.Product/ProductModuleServiceCollectionExtensions.cs index 6fe5dffb3..6dfc9e306 100644 --- a/src/ModularMonolith/ClassifiedAds.Modules.Product/ProductModuleServiceCollectionExtensions.cs +++ b/src/ModularMonolith/ClassifiedAds.Modules.Product/ProductModuleServiceCollectionExtensions.cs @@ -1,4 +1,5 @@ using ClassifiedAds.CrossCuttingConcerns.Csv; +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; using ClassifiedAds.Domain.Repositories; using ClassifiedAds.Infrastructure.Csv; using ClassifiedAds.Modules.Product.Authorization; @@ -67,6 +68,9 @@ public static void MigrateProductDb(this IApplicationBuilder app) public static IServiceCollection AddHostedServicesProductModule(this IServiceCollection services) { + services.AddMessageBusConsumers(Assembly.GetExecutingAssembly()); + services.AddOutboxEventPublishers(Assembly.GetExecutingAssembly()); + services.AddHostedService(); return services; diff --git a/src/ModularMonolith/ClassifiedAds.Modules.Storage/Commands/PublishEventsCommand.cs b/src/ModularMonolith/ClassifiedAds.Modules.Storage/Commands/PublishEventsCommand.cs index 668dfe4cf..42b5e0cb7 100644 --- a/src/ModularMonolith/ClassifiedAds.Modules.Storage/Commands/PublishEventsCommand.cs +++ b/src/ModularMonolith/ClassifiedAds.Modules.Storage/Commands/PublishEventsCommand.cs @@ -1,15 +1,11 @@ using ClassifiedAds.Application; -using ClassifiedAds.Contracts.AuditLog.Services; using ClassifiedAds.CrossCuttingConcerns.DateTimes; using ClassifiedAds.Domain.Infrastructure.MessageBrokers; using ClassifiedAds.Domain.Repositories; -using ClassifiedAds.Modules.Storage.Constants; -using ClassifiedAds.Modules.Storage.DTOs; using ClassifiedAds.Modules.Storage.Entities; using Microsoft.Extensions.Logging; using System; using System.Linq; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -26,19 +22,16 @@ public class PublishEventsCommandHandler : ICommandHandler private readonly IDateTimeProvider _dateTimeProvider; private readonly IRepository _outboxEventRepository; private readonly IMessageBus _messageBus; - private readonly IAuditLogService _externalAuditLogService; public PublishEventsCommandHandler(ILogger logger, IDateTimeProvider dateTimeProvider, IRepository outboxEventRepository, - IMessageBus messageBus, - IAuditLogService externalAuditLogService) + IMessageBus messageBus) { _logger = logger; _dateTimeProvider = dateTimeProvider; _outboxEventRepository = outboxEventRepository; _messageBus = messageBus; - _externalAuditLogService = externalAuditLogService; } public async Task HandleAsync(PublishEventsCommand command, CancellationToken cancellationToken = default) @@ -51,23 +44,15 @@ public async Task HandleAsync(PublishEventsCommand command, CancellationToken ca foreach (var eventLog in events) { - if (eventLog.EventType == EventTypeConstants.FileEntryCreated) + var outbox = new PublishingOutBoxEvent { - await _messageBus.SendAsync(new FileUploadedEvent { FileEntry = JsonSerializer.Deserialize(eventLog.Message) }); - } - else if (eventLog.EventType == EventTypeConstants.FileEntryDeleted) - { - await _messageBus.SendAsync(new FileDeletedEvent { FileEntry = JsonSerializer.Deserialize(eventLog.Message) }); - } - else if (eventLog.EventType == EventTypeConstants.AuditLogEntryCreated) - { - var logEntry = JsonSerializer.Deserialize(eventLog.Message); - await _externalAuditLogService.AddAsync(logEntry, eventLog.Id.ToString()); - } - else - { - // TODO: Take Note - } + Id = eventLog.Id.ToString(), + EventType = eventLog.EventType, + EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name, + Payload = eventLog.Message, + }; + + await _messageBus.SendAsync(outbox, cancellationToken); eventLog.Published = true; eventLog.UpdatedDateTime = _dateTimeProvider.OffsetNow; diff --git a/src/ModularMonolith/ClassifiedAds.Modules.Storage/HostedServices/MessageBusReceiver.cs b/src/ModularMonolith/ClassifiedAds.Modules.Storage/HostedServices/MessageBusReceiver.cs deleted file mode 100644 index 3d26cfecd..000000000 --- a/src/ModularMonolith/ClassifiedAds.Modules.Storage/HostedServices/MessageBusReceiver.cs +++ /dev/null @@ -1,50 +0,0 @@ -using ClassifiedAds.Domain.Infrastructure.MessageBrokers; -using ClassifiedAds.Modules.Storage.DTOs; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using System.Net.Http; -using System.Net.Http.Json; -using System.Threading; -using System.Threading.Tasks; - -namespace ClassifiedAds.Modules.Storage.HostedServices; - -internal class MessageBusReceiver : BackgroundService -{ - private static readonly HttpClient _httpClient = new HttpClient(); - - private readonly ILogger _logger; - private readonly IConfiguration _configuration; - private readonly IMessageBus _messageBus; - - public MessageBusReceiver(ILogger logger, - IConfiguration configuration, - IMessageBus messageBus) - { - _logger = logger; - _configuration = configuration; - _messageBus = messageBus; - } - - protected override Task ExecuteAsync(CancellationToken stoppingToken) - { - _messageBus.ReceiveAsync(async (data, metaData) => - { - var url = _configuration["Modules:Storage:Webhooks:FileUploadedEvent:PayloadUrl"]; - await _httpClient.PostAsJsonAsync(url, data.FileEntry); - }, stoppingToken); - - _messageBus.ReceiveAsync(async (data, metaData) => - { - var url = _configuration["Modules:Storage:Webhooks:FileDeletedEvent:PayloadUrl"]; - await _httpClient.PostAsJsonAsync(url, data.FileEntry); - }, stoppingToken); - - return Task.CompletedTask; - } -} - -public sealed class WebhookConsumer -{ -} diff --git a/src/ModularMonolith/ClassifiedAds.Modules.Storage/MessageBusConsumers/WebhookConsumer.cs b/src/ModularMonolith/ClassifiedAds.Modules.Storage/MessageBusConsumers/WebhookConsumer.cs new file mode 100644 index 000000000..f3ef5a8a8 --- /dev/null +++ b/src/ModularMonolith/ClassifiedAds.Modules.Storage/MessageBusConsumers/WebhookConsumer.cs @@ -0,0 +1,39 @@ +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using ClassifiedAds.Modules.Storage.DTOs; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using System.Net.Http; +using System.Net.Http.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Modules.Storage.MessageBusConsumers; + +public sealed class WebhookConsumer : + IMessageBusConsumer, + IMessageBusConsumer +{ + private static readonly HttpClient _httpClient = new HttpClient(); + + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + + public WebhookConsumer(ILogger logger, + IConfiguration configuration) + { + _logger = logger; + _configuration = configuration; + } + + public async Task HandleAsync(FileUploadedEvent data, MetaData metaData, CancellationToken cancellationToken = default) + { + var url = _configuration["Modules:Storage:Webhooks:FileUploadedEvent:PayloadUrl"]; + await _httpClient.PostAsJsonAsync(url, data.FileEntry, cancellationToken: cancellationToken); + } + + public async Task HandleAsync(FileDeletedEvent data, MetaData metaData, CancellationToken cancellationToken = default) + { + var url = _configuration["Modules:Storage:Webhooks:FileDeletedEvent:PayloadUrl"]; + await _httpClient.PostAsJsonAsync(url, data.FileEntry, cancellationToken: cancellationToken); + } +} diff --git a/src/ModularMonolith/ClassifiedAds.Modules.Storage/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs b/src/ModularMonolith/ClassifiedAds.Modules.Storage/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs new file mode 100644 index 000000000..7f7b2cce2 --- /dev/null +++ b/src/ModularMonolith/ClassifiedAds.Modules.Storage/OutBoxEventPublishers/AuditLogEntryOutBoxEventPublisher.cs @@ -0,0 +1,41 @@ +using ClassifiedAds.Contracts.AuditLog.Services; +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using ClassifiedAds.Modules.Storage.Commands; +using ClassifiedAds.Modules.Storage.Constants; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Modules.Storage.OutBoxEventPublishers; + +public class AuditLogEntryOutBoxEventPublisher : IOutBoxEventPublisher +{ + private readonly IMessageBus _messageBus; + private readonly IAuditLogService _externalAuditLogService; + + public static string[] CanHandleEventTypes() + { + return new string[] { EventTypeConstants.AuditLogEntryCreated }; + } + + public static string CanHandleEventSource() + { + return typeof(PublishEventsCommand).Assembly.GetName().Name; + } + + public AuditLogEntryOutBoxEventPublisher(IMessageBus messageBus, + IAuditLogService externalAuditLogService) + { + _messageBus = messageBus; + _externalAuditLogService = externalAuditLogService; + } + + public async Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) + { + if (outbox.EventType == EventTypeConstants.AuditLogEntryCreated) + { + var logEntry = JsonSerializer.Deserialize(outbox.Payload); + await _externalAuditLogService.AddAsync(logEntry, outbox.Id); + } + } +} \ No newline at end of file diff --git a/src/ModularMonolith/ClassifiedAds.Modules.Storage/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs b/src/ModularMonolith/ClassifiedAds.Modules.Storage/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs new file mode 100644 index 000000000..7c4a4d0aa --- /dev/null +++ b/src/ModularMonolith/ClassifiedAds.Modules.Storage/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs @@ -0,0 +1,42 @@ +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using ClassifiedAds.Modules.Storage.Commands; +using ClassifiedAds.Modules.Storage.Constants; +using ClassifiedAds.Modules.Storage.DTOs; +using ClassifiedAds.Modules.Storage.Entities; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Modules.Storage.OutBoxEventPublishers; + +public class FileEntryOutBoxEventPublisher : IOutBoxEventPublisher +{ + private readonly IMessageBus _messageBus; + + public static string[] CanHandleEventTypes() + { + return new string[] { EventTypeConstants.FileEntryCreated, EventTypeConstants.FileEntryDeleted }; + } + + public static string CanHandleEventSource() + { + return typeof(PublishEventsCommand).Assembly.GetName().Name; + } + + public FileEntryOutBoxEventPublisher(IMessageBus messageBus) + { + _messageBus = messageBus; + } + + public async Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) + { + if (outbox.EventType == EventTypeConstants.FileEntryCreated) + { + await _messageBus.SendAsync(new FileUploadedEvent { FileEntry = JsonSerializer.Deserialize(outbox.Payload) }, cancellationToken: cancellationToken); + } + else if (outbox.EventType == EventTypeConstants.FileEntryDeleted) + { + await _messageBus.SendAsync(new FileDeletedEvent { FileEntry = JsonSerializer.Deserialize(outbox.Payload) }, cancellationToken: cancellationToken); + } + } +} \ No newline at end of file diff --git a/src/ModularMonolith/ClassifiedAds.Modules.Storage/StorageModuleServiceCollectionExtensions.cs b/src/ModularMonolith/ClassifiedAds.Modules.Storage/StorageModuleServiceCollectionExtensions.cs index bd9194646..1b2364e03 100644 --- a/src/ModularMonolith/ClassifiedAds.Modules.Storage/StorageModuleServiceCollectionExtensions.cs +++ b/src/ModularMonolith/ClassifiedAds.Modules.Storage/StorageModuleServiceCollectionExtensions.cs @@ -1,8 +1,12 @@ -using ClassifiedAds.Domain.Repositories; +using ClassifiedAds.Infrastructure.HostedServices; +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using ClassifiedAds.Domain.Repositories; using ClassifiedAds.Modules.Storage.Authorization; using ClassifiedAds.Modules.Storage.ConfigurationOptions; +using ClassifiedAds.Modules.Storage.DTOs; using ClassifiedAds.Modules.Storage.Entities; using ClassifiedAds.Modules.Storage.HostedServices; +using ClassifiedAds.Modules.Storage.MessageBusConsumers; using ClassifiedAds.Modules.Storage.Repositories; using Microsoft.AspNetCore.Builder; using Microsoft.EntityFrameworkCore; @@ -55,7 +59,11 @@ public static void MigrateStorageDb(this IApplicationBuilder app) public static IServiceCollection AddHostedServicesStorageModule(this IServiceCollection services) { - services.AddHostedService(); + services.AddMessageBusConsumers(Assembly.GetExecutingAssembly()); + services.AddOutboxEventPublishers(Assembly.GetExecutingAssembly()); + + services.AddHostedService>(); + services.AddHostedService>(); services.AddHostedService(); return services; diff --git a/src/Monolith/ClassifiedAds.Application/EventLogs/Commands/PublishEventsCommand.cs b/src/Monolith/ClassifiedAds.Application/EventLogs/Commands/PublishEventsCommand.cs index c37782498..44d5de537 100644 --- a/src/Monolith/ClassifiedAds.Application/EventLogs/Commands/PublishEventsCommand.cs +++ b/src/Monolith/ClassifiedAds.Application/EventLogs/Commands/PublishEventsCommand.cs @@ -1,13 +1,10 @@ -using ClassifiedAds.Application.FileEntries.DTOs; -using ClassifiedAds.CrossCuttingConcerns.DateTimes; -using ClassifiedAds.Domain.Constants; +using ClassifiedAds.CrossCuttingConcerns.DateTimes; using ClassifiedAds.Domain.Entities; using ClassifiedAds.Domain.Infrastructure.MessageBrokers; using ClassifiedAds.Domain.Repositories; using Microsoft.Extensions.Logging; using System; using System.Linq; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -46,19 +43,15 @@ public async Task HandleAsync(PublishEventsCommand command, CancellationToken ca foreach (var eventLog in events) { - if (eventLog.EventType == EventTypeConstants.FileEntryCreated) + var outbox = new PublishingOutBoxEvent { - await _messageBus.SendAsync(new FileUploadedEvent { FileEntry = JsonSerializer.Deserialize(eventLog.Message) }, cancellationToken: cancellationToken); - } - else if (eventLog.EventType == EventTypeConstants.FileEntryDeleted) - { - await _messageBus.SendAsync(new FileDeletedEvent { FileEntry = JsonSerializer.Deserialize(eventLog.Message) }, cancellationToken: cancellationToken); - } - else - { - // TODO: Take Note - } + Id = eventLog.Id.ToString(), + EventType = eventLog.EventType, + EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name, + Payload = eventLog.Message, + }; + await _messageBus.SendAsync(outbox, cancellationToken); eventLog.Published = true; eventLog.UpdatedDateTime = _dateTimeProvider.OffsetNow; await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken); diff --git a/src/Monolith/ClassifiedAds.BackgroundServer/HostedServices/MessageBusReceiver.cs b/src/Monolith/ClassifiedAds.BackgroundServer/HostedServices/MessageBusReceiver.cs deleted file mode 100644 index d9e6e1e1b..000000000 --- a/src/Monolith/ClassifiedAds.BackgroundServer/HostedServices/MessageBusReceiver.cs +++ /dev/null @@ -1,50 +0,0 @@ -using ClassifiedAds.Application.FileEntries.DTOs; -using ClassifiedAds.Domain.Infrastructure.MessageBrokers; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using System.Net.Http; -using System.Net.Http.Json; -using System.Threading; -using System.Threading.Tasks; - -namespace ClassifiedAds.BackgroundServer.HostedServices; - -internal sealed class MessageBusReceiver : BackgroundService -{ - private static readonly HttpClient _httpClient = new HttpClient(); - - private readonly ILogger _logger; - private readonly IConfiguration _configuration; - private readonly IMessageBus _messageBus; - - public MessageBusReceiver(ILogger logger, - IConfiguration configuration, - IMessageBus messageBus) - { - _logger = logger; - _configuration = configuration; - _messageBus = messageBus; - } - - protected override Task ExecuteAsync(CancellationToken stoppingToken) - { - _messageBus.ReceiveAsync(async (data, metaData) => - { - var url = _configuration["Webhooks:FileUploadedEvent:PayloadUrl"]; - await _httpClient.PostAsJsonAsync(url, data.FileEntry); - }, stoppingToken); - - _messageBus.ReceiveAsync(async (data, metaData) => - { - var url = _configuration["Webhooks:FileDeletedEvent:PayloadUrl"]; - await _httpClient.PostAsJsonAsync(url, data.FileEntry); - }, stoppingToken); - - return Task.CompletedTask; - } -} - -public sealed class WebhookConsumer -{ -} diff --git a/src/Monolith/ClassifiedAds.BackgroundServer/MessageBusConsumers/WebhookConsumer.cs b/src/Monolith/ClassifiedAds.BackgroundServer/MessageBusConsumers/WebhookConsumer.cs new file mode 100644 index 000000000..0334fa45f --- /dev/null +++ b/src/Monolith/ClassifiedAds.BackgroundServer/MessageBusConsumers/WebhookConsumer.cs @@ -0,0 +1,39 @@ +using ClassifiedAds.Application.FileEntries.DTOs; +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using System.Net.Http; +using System.Net.Http.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.BackgroundServer.MessageBusConsumers; + +public sealed class WebhookConsumer : + IMessageBusConsumer, + IMessageBusConsumer +{ + private static readonly HttpClient _httpClient = new HttpClient(); + + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + + public WebhookConsumer(ILogger logger, + IConfiguration configuration) + { + _logger = logger; + _configuration = configuration; + } + + public async Task HandleAsync(FileUploadedEvent data, MetaData metaData, CancellationToken cancellationToken = default) + { + var url = _configuration["Webhooks:FileUploadedEvent:PayloadUrl"]; + await _httpClient.PostAsJsonAsync(url, data.FileEntry, cancellationToken: cancellationToken); + } + + public async Task HandleAsync(FileDeletedEvent data, MetaData metaData, CancellationToken cancellationToken = default) + { + var url = _configuration["Webhooks:FileDeletedEvent:PayloadUrl"]; + await _httpClient.PostAsJsonAsync(url, data.FileEntry, cancellationToken: cancellationToken); + } +} \ No newline at end of file diff --git a/src/Monolith/ClassifiedAds.BackgroundServer/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs b/src/Monolith/ClassifiedAds.BackgroundServer/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs new file mode 100644 index 000000000..d0abde48d --- /dev/null +++ b/src/Monolith/ClassifiedAds.BackgroundServer/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs @@ -0,0 +1,42 @@ +using ClassifiedAds.Application.EventLogs.Commands; +using ClassifiedAds.Application.FileEntries.DTOs; +using ClassifiedAds.Domain.Constants; +using ClassifiedAds.Domain.Entities; +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.BackgroundServer.OutBoxEventPublishers; + +public class FileEntryOutBoxEventPublisher : IOutBoxEventPublisher +{ + private readonly IMessageBus _messageBus; + + public static string[] CanHandleEventTypes() + { + return new string[] { EventTypeConstants.FileEntryCreated, EventTypeConstants.FileEntryDeleted }; + } + + public static string CanHandleEventSource() + { + return typeof(PublishEventsCommand).Assembly.GetName().Name; + } + + public FileEntryOutBoxEventPublisher(IMessageBus messageBus) + { + _messageBus = messageBus; + } + + public async Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) + { + if (outbox.EventType == EventTypeConstants.FileEntryCreated) + { + await _messageBus.SendAsync(new FileUploadedEvent { FileEntry = JsonSerializer.Deserialize(outbox.Payload) }, cancellationToken: cancellationToken); + } + else if (outbox.EventType == EventTypeConstants.FileEntryDeleted) + { + await _messageBus.SendAsync(new FileDeletedEvent { FileEntry = JsonSerializer.Deserialize(outbox.Payload) }, cancellationToken: cancellationToken); + } + } +} diff --git a/src/Monolith/ClassifiedAds.BackgroundServer/Program.cs b/src/Monolith/ClassifiedAds.BackgroundServer/Program.cs index 2bc02716e..61b56d330 100644 --- a/src/Monolith/ClassifiedAds.BackgroundServer/Program.cs +++ b/src/Monolith/ClassifiedAds.BackgroundServer/Program.cs @@ -2,84 +2,81 @@ using ClassifiedAds.BackgroundServer.ConfigurationOptions; using ClassifiedAds.BackgroundServer.HostedServices; using ClassifiedAds.BackgroundServer.Identity; +using ClassifiedAds.BackgroundServer.MessageBusConsumers; using ClassifiedAds.CrossCuttingConcerns.Exceptions; using ClassifiedAds.Domain.Identity; using ClassifiedAds.Domain.IdentityProviders; using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using ClassifiedAds.Infrastructure.HostedServices; using ClassifiedAds.Infrastructure.IdentityProviders.Auth0; using ClassifiedAds.Infrastructure.IdentityProviders.Azure; using ClassifiedAds.Infrastructure.Logging; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using System.Reflection; -namespace ClassifiedAds.BackgroundServer; - -public class Program +Host.CreateDefaultBuilder(args) +.UseWindowsService() +.UseClassifiedAdsLogger(configuration => { - public static void Main(string[] args) - { - CreateHostBuilder(args).Build().Run(); - } - - public static IHostBuilder CreateHostBuilder(string[] args) => - Host.CreateDefaultBuilder(args) - .UseWindowsService() - .UseClassifiedAdsLogger(configuration => - { - var appSettings = new AppSettings(); - configuration.Bind(appSettings); - return appSettings.Logging; - }) - .ConfigureServices((hostContext, services) => - { - var serviceProvider = services.BuildServiceProvider(); - var configuration = serviceProvider.GetService(); + var appSettings = new AppSettings(); + configuration.Bind(appSettings); + return appSettings.Logging; +}) +.ConfigureServices((hostContext, services) => +{ + var serviceProvider = services.BuildServiceProvider(); + var configuration = serviceProvider.GetService(); - var appSettings = new AppSettings(); - configuration.Bind(appSettings); + var appSettings = new AppSettings(); + configuration.Bind(appSettings); - var validationResult = appSettings.Validate(); - if (validationResult.Failed) - { - throw new ValidationException(validationResult.FailureMessage); - } + var validationResult = appSettings.Validate(); + if (validationResult.Failed) + { + throw new ValidationException(validationResult.FailureMessage); + } - services.Configure(configuration); + services.Configure(configuration); - services.AddScoped(); + services.AddScoped(); - services.AddDateTimeProvider(); - services.AddPersistence(appSettings.ConnectionStrings.ClassifiedAds) - .AddDomainServices() - .AddApplicationServices() - .AddMessageHandlers(); + services.AddDateTimeProvider(); + services.AddPersistence(appSettings.ConnectionStrings.ClassifiedAds) + .AddDomainServices() + .AddApplicationServices() + .AddMessageHandlers(); - services.AddTransient(); - services.AddMessageBusSender(appSettings.MessageBroker); - services.AddMessageBusSender(appSettings.MessageBroker); - services.AddMessageBusReceiver(appSettings.MessageBroker); - services.AddMessageBusReceiver(appSettings.MessageBroker); + services.AddTransient(); + services.AddMessageBusSender(appSettings.MessageBroker); + services.AddMessageBusSender(appSettings.MessageBroker); + services.AddMessageBusReceiver(appSettings.MessageBroker); + services.AddMessageBusReceiver(appSettings.MessageBroker); + services.AddMessageBusConsumers(Assembly.GetExecutingAssembly()); + services.AddOutboxEventPublishers(Assembly.GetExecutingAssembly()); - services.AddNotificationServices(appSettings.Notification); + services.AddNotificationServices(appSettings.Notification); - services.AddWebNotification(appSettings.Notification.Web); + services.AddWebNotification(appSettings.Notification.Web); - if (appSettings.IdentityProviders?.Auth0?.Enabled ?? false) - { - services.AddSingleton(new Auth0IdentityProvider(appSettings.IdentityProviders.Auth0)); - } + if (appSettings.IdentityProviders?.Auth0?.Enabled ?? false) + { + services.AddSingleton(new Auth0IdentityProvider(appSettings.IdentityProviders.Auth0)); + } - if (appSettings.IdentityProviders?.AzureActiveDirectoryB2C?.Enabled ?? false) - { - services.AddSingleton(new AzureActiveDirectoryB2CIdentityProvider(appSettings.IdentityProviders.AzureActiveDirectoryB2C)); - } + if (appSettings.IdentityProviders?.AzureActiveDirectoryB2C?.Enabled ?? false) + { + services.AddSingleton(new AzureActiveDirectoryB2CIdentityProvider(appSettings.IdentityProviders.AzureActiveDirectoryB2C)); + } - services.AddHostedService(); - services.AddHostedService(); - services.AddHostedService(); - services.AddHostedService(); - services.AddHostedService(); - services.AddHostedService(); - }); -} + services.AddHostedService>(); + services.AddHostedService>(); + services.AddHostedService(); + services.AddHostedService(); + services.AddHostedService(); + services.AddHostedService(); + services.AddHostedService(); +}) +.Build() +.Run(); diff --git a/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs index 1649a77e5..d8d9239bd 100644 --- a/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs +++ b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBus.cs @@ -1,4 +1,5 @@ -using System; +using ClassifiedAds.Domain.Entities; +using System; using System.Threading; using System.Threading.Tasks; @@ -11,6 +12,11 @@ Task SendAsync(T message, MetaData metaData = null, CancellationToken cancell Task ReceiveAsync(Func action, CancellationToken cancellationToken = default) where T : IMessageBusMessage; + + Task ReceiveAsync(CancellationToken cancellationToken = default) + where T : IMessageBusMessage; + + Task SendAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default); } public interface IMessageBusMessage diff --git a/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs new file mode 100644 index 000000000..f7e224669 --- /dev/null +++ b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IMessageBusConsumer.cs @@ -0,0 +1,9 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers; + +public interface IMessageBusConsumer +{ + Task HandleAsync(T data, MetaData metaData, CancellationToken cancellationToken = default); +} diff --git a/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs new file mode 100644 index 000000000..1c9ef5f0f --- /dev/null +++ b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs @@ -0,0 +1,24 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers; + +public interface IOutBoxEventPublisher +{ + static abstract string[] CanHandleEventTypes(); + + static abstract string CanHandleEventSource(); + + Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default); +} + +public class PublishingOutBoxEvent +{ + public string Id { get; set; } + + public string EventType { get; set; } + + public string EventSource { get; set; } + + public string Payload { get; set; } +} \ No newline at end of file diff --git a/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs index 0f9cd046a..8a4478651 100644 --- a/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs +++ b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs @@ -1,5 +1,9 @@ using Microsoft.Extensions.DependencyInjection; using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Reflection; using System.Threading; using System.Threading.Tasks; @@ -8,6 +12,51 @@ namespace ClassifiedAds.Domain.Infrastructure.MessageBrokers; public class MessageBus : IMessageBus { private readonly IServiceProvider _serviceProvider; + private static List _consumers = new List(); + private static Dictionary> _outboxEventHandlers = new(); + + internal static void AddConsumers(Assembly assembly, IServiceCollection services) + { + var types = assembly.GetTypes() + .Where(x => x.GetInterfaces().Any(y => y.IsGenericType && y.GetGenericTypeDefinition() == typeof(IMessageBusConsumer<,>))) + .ToList(); + + foreach (var type in types) + { + services.AddTransient(type); + } + + _consumers.AddRange(types); + } + + internal static void AddOutboxEventPublishers(Assembly assembly, IServiceCollection services) + { + var types = assembly.GetTypes() + .Where(x => x.GetInterfaces().Any(y => y == typeof(IOutBoxEventPublisher))) + .ToList(); + + foreach (var type in types) + { + services.AddTransient(type); + } + + foreach (var item in types) + { + var canHandlerEventTypes = (string[])item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventTypes), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture); + var eventSource = (string)item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventSource), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture); + + foreach (var eventType in canHandlerEventTypes) + { + var key = eventSource + ":" + eventType; + if (!_outboxEventHandlers.ContainsKey(key)) + { + _outboxEventHandlers[key] = new List(); + } + + _outboxEventHandlers[key].Add(item); + } + } + } public MessageBus(IServiceProvider serviceProvider) { @@ -25,4 +74,64 @@ public async Task SendAsync(T message, MetaData metaData = null, Cancellation { await _serviceProvider.GetRequiredService>().ReceiveAsync(action, cancellationToken); } + + public async Task ReceiveAsync(CancellationToken cancellationToken = default) + where T : IMessageBusMessage + { + await _serviceProvider.GetRequiredService>().ReceiveAsync(async (data, metaData) => + { + using var scope = _serviceProvider.CreateScope(); + foreach (Type handlerType in _consumers) + { + bool canHandleEvent = handlerType.GetInterfaces() + .Any(x => x.IsGenericType + && x.GetGenericTypeDefinition() == typeof(IMessageBusConsumer<,>) + && x.GenericTypeArguments[0] == typeof(TConsumer) && x.GenericTypeArguments[1] == typeof(T)); + + if (canHandleEvent) + { + dynamic handler = scope.ServiceProvider.GetService(handlerType); + await handler.HandleAsync((dynamic)data, metaData, cancellationToken); + } + } + }, cancellationToken); + } + + public async Task SendAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) + { + var key = outbox.EventSource + ":" + outbox.EventType; + var handlerTypes = _outboxEventHandlers.ContainsKey(key) ? _outboxEventHandlers[key] : null; + + if (handlerTypes == null) + { + // TODO: Take Note + return; + } + + foreach (var type in handlerTypes) + { + dynamic handler = _serviceProvider.GetService(type); + await handler.HandleAsync(outbox, cancellationToken); + } + } } + +public static class MessageBusExtentions +{ + public static void AddMessageBusConsumers(this IServiceCollection services, Assembly assembly) + { + MessageBus.AddConsumers(assembly, services); + } + + public static void AddOutboxEventPublishers(this IServiceCollection services, Assembly assembly) + { + MessageBus.AddOutboxEventPublishers(assembly, services); + } + + public static void AddMessageBus(this IServiceCollection services, Assembly assembly) + { + services.AddTransient(); + services.AddMessageBusConsumers(assembly); + services.AddOutboxEventPublishers(assembly); + } +} \ No newline at end of file diff --git a/src/Monolith/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs b/src/Monolith/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs new file mode 100644 index 000000000..98afe37f2 --- /dev/null +++ b/src/Monolith/ClassifiedAds.Infrastructure/HostedServices/MessageBusConsumerBackgroundService.cs @@ -0,0 +1,27 @@ +using ClassifiedAds.Domain.Infrastructure.MessageBrokers; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System.Threading; +using System.Threading.Tasks; + +namespace ClassifiedAds.Infrastructure.HostedServices; + +public sealed class MessageBusConsumerBackgroundService : BackgroundService + where T : IMessageBusEvent +{ + private readonly ILogger> _logger; + private readonly IMessageBus _messageBus; + + public MessageBusConsumerBackgroundService(ILogger> logger, + IMessageBus messageBus) + { + _logger = logger; + _messageBus = messageBus; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + _messageBus.ReceiveAsync(stoppingToken); + return Task.CompletedTask; + } +}