From 6485ea9680e0b5daa2dc9f74bd64c59a867aff20 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Tue, 17 May 2022 17:16:03 +0200 Subject: [PATCH 1/3] Fixed Kafka integration: - added the initializer to docker compose to set the Kafka topics upfront. - fixed EventEnvelope creation to use generic one. Changed the non-generic from record to interface to make sure that it won't be used. - made sure that EventBus takes only EventEnvelope. - created EventEnvelope in the external publishing. - used handler in KafkaConsumer to deserialise contracts with private constructors, - changed KafkaConsumer setting to manually commit offset. --- .../Events/EventEnvelopeExtensions.cs | 6 +-- Core.Kafka.Tests/KafkaProducerTests.cs | 2 +- Core.Kafka/Consumers/KafkaConsumer.cs | 5 +- Core.Kafka/Producers/KafkaProducer.cs | 2 +- .../Subscriptions/MartenEventPublisher.cs | 2 +- Core.Testing/DummyExternalEventPublisher.cs | 2 +- Core/Config.cs | 2 +- Core/Events/EventBus.cs | 12 ++--- Core/Events/EventEnvelope.cs | 26 ++++++++--- Core/Events/External/IExternaEventProducer.cs | 4 +- .../External/NulloExternalEventProducer.cs | 2 +- Core/Tracing/TracingScope.cs | 4 +- .../Carts/Carts/ShoppingCarts/Config.cs | 2 +- .../FinalizingCart/ShoppingCartFinalized.cs | 25 ++++++---- .../Orders/Orders.Api/appsettings.json | 3 +- Sample/ECommerce/PracticalEventSourcing.sln | 38 +++++++++++++++ Sample/ECommerce/docker/docker-compose.yml | 46 +++++++++++++------ docker-compose.yml | 24 +++++++++- 18 files changed, 154 insertions(+), 53 deletions(-) diff --git a/Core.EventStoreDB/Events/EventEnvelopeExtensions.cs b/Core.EventStoreDB/Events/EventEnvelopeExtensions.cs index 2db7944c8..89931e5a2 100644 --- a/Core.EventStoreDB/Events/EventEnvelopeExtensions.cs +++ b/Core.EventStoreDB/Events/EventEnvelopeExtensions.cs @@ -6,7 +6,7 @@ namespace Core.EventStoreDB.Events; public static class EventEnvelopeExtensions { - public static EventEnvelope? ToEventEnvelope(this ResolvedEvent resolvedEvent) + public static IEventEnvelope? ToEventEnvelope(this ResolvedEvent resolvedEvent) { var eventData = resolvedEvent.Deserialize(); var eventMetadata = resolvedEvent.DeserializeMetadata(); @@ -20,7 +20,7 @@ public static class EventEnvelopeExtensions resolvedEvent.Event.Position.CommitPosition, eventMetadata ); - var type = typeof(EventEnvelope<>).MakeGenericType(eventData.GetType()); - return (EventEnvelope)Activator.CreateInstance(type, eventData, metaData)!; + + return EventEnvelopeFactory.From(eventData, metaData); } } diff --git a/Core.Kafka.Tests/KafkaProducerTests.cs b/Core.Kafka.Tests/KafkaProducerTests.cs index 8a2e7cd97..e3eb8babd 100644 --- a/Core.Kafka.Tests/KafkaProducerTests.cs +++ b/Core.Kafka.Tests/KafkaProducerTests.cs @@ -21,7 +21,7 @@ public class KafkaProducerTests ) .Build(); - private readonly EventEnvelope eventEnvelope = + private readonly IEventEnvelope eventEnvelope = new EventEnvelope(new TestEvent("test", 123), new EventMetadata("123", 1, 1, null)); [Fact] diff --git a/Core.Kafka/Consumers/KafkaConsumer.cs b/Core.Kafka/Consumers/KafkaConsumer.cs index 6dab20c5c..034a1b830 100644 --- a/Core.Kafka/Consumers/KafkaConsumer.cs +++ b/Core.Kafka/Consumers/KafkaConsumer.cs @@ -2,6 +2,7 @@ using Core.Events; using Core.Events.External; using Core.Reflection; +using Core.Serialization.Newtonsoft; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Newtonsoft.Json; @@ -72,10 +73,12 @@ private async Task ConsumeNextEvent(IConsumer consumer, Cancella var eventEnvelopeType = typeof(EventEnvelope<>).MakeGenericType(eventType); // deserialize event - var @event = JsonConvert.DeserializeObject(message.Message.Value, eventEnvelopeType)!; + var @event = message.Message.Value.FromJson(eventEnvelopeType); // publish event to internal event bus await eventBus.Publish(@event, cancellationToken); + + consumer.Commit(); } catch (Exception e) { diff --git a/Core.Kafka/Producers/KafkaProducer.cs b/Core.Kafka/Producers/KafkaProducer.cs index 7e587c461..99ded1567 100644 --- a/Core.Kafka/Producers/KafkaProducer.cs +++ b/Core.Kafka/Producers/KafkaProducer.cs @@ -22,7 +22,7 @@ ILogger logger config = configuration.GetKafkaProducerConfig(); } - public async Task Publish(EventEnvelope @event, CancellationToken ct) + public async Task Publish(IEventEnvelope @event, CancellationToken ct) { try { diff --git a/Core.Marten/Subscriptions/MartenEventPublisher.cs b/Core.Marten/Subscriptions/MartenEventPublisher.cs index 09059778f..774f21f88 100644 --- a/Core.Marten/Subscriptions/MartenEventPublisher.cs +++ b/Core.Marten/Subscriptions/MartenEventPublisher.cs @@ -37,7 +37,7 @@ public async Task ConsumeAsync(IDocumentOperations documentOperations, IReadOnly ) ); - await eventBus.Publish(new EventEnvelope(@event.Data, eventMetadata), ct); + await eventBus.Publish(EventEnvelopeFactory.From(@event.Data, eventMetadata), ct); } } } diff --git a/Core.Testing/DummyExternalEventPublisher.cs b/Core.Testing/DummyExternalEventPublisher.cs index abafe3567..7bd2777c5 100644 --- a/Core.Testing/DummyExternalEventPublisher.cs +++ b/Core.Testing/DummyExternalEventPublisher.cs @@ -7,7 +7,7 @@ public class DummyExternalEventProducer: IExternalEventProducer { public IList PublishedEvents { get; } = new List(); - public Task Publish(EventEnvelope @event, CancellationToken ct) + public Task Publish(IEventEnvelope @event, CancellationToken ct) { PublishedEvents.Add(@event.Data); diff --git a/Core/Config.cs b/Core/Config.cs index 5f38eb4b0..23caf594c 100644 --- a/Core/Config.cs +++ b/Core/Config.cs @@ -43,7 +43,7 @@ public static IServiceCollection AddTracing(this IServiceCollection services) sp.GetRequiredService().CreateTraceScope(scopedServiceProvider, traceMetadata) ); - services.TryAddScoped>(sp => + services.TryAddScoped>(sp => (scopedServiceProvider, eventEnvelope) => sp.GetRequiredService() .CreateTraceScope(scopedServiceProvider, eventEnvelope) ); diff --git a/Core/Events/EventBus.cs b/Core/Events/EventBus.cs index 631089233..52e20a15e 100644 --- a/Core/Events/EventBus.cs +++ b/Core/Events/EventBus.cs @@ -15,13 +15,13 @@ public interface IEventBus public class EventBus: IEventBus { private readonly IServiceProvider serviceProvider; - private readonly Func createTracingScope; + private readonly Func createTracingScope; private readonly AsyncPolicy retryPolicy; private static readonly ConcurrentDictionary PublishMethods = new(); public EventBus( IServiceProvider serviceProvider, - Func createTracingScope, + Func createTracingScope, AsyncPolicy retryPolicy ) { @@ -32,7 +32,7 @@ AsyncPolicy retryPolicy private async Task Publish(TEvent @event, CancellationToken ct) { - var eventEnvelope = @event as EventEnvelope; + var eventEnvelope = @event as IEventEnvelope; using var scope = serviceProvider.CreateScope(); using var tracingScope = createTracingScope(scope.ServiceProvider, eventEnvelope); @@ -52,9 +52,9 @@ public async Task Publish(object @event, CancellationToken ct) { // if it's an event envelope, publish also just event data // thanks to that both handlers with envelope and without will be called - if (@event is EventEnvelope(var data, _)) - await (Task)GetGenericPublishFor(data) - .Invoke(this, new[] { data, ct })!; + if (@event is IEventEnvelope eventEnvelope) + await (Task)GetGenericPublishFor(eventEnvelope.Data) + .Invoke(this, new[] { eventEnvelope.Data, ct })!; await (Task)GetGenericPublishFor(@event) .Invoke(this, new[] { @event, ct })!; diff --git a/Core/Events/EventEnvelope.cs b/Core/Events/EventEnvelope.cs index 43e2f0be0..0693cf757 100644 --- a/Core/Events/EventEnvelope.cs +++ b/Core/Events/EventEnvelope.cs @@ -1,4 +1,5 @@ -using Core.Tracing; +using System.Reflection; +using Core.Tracing; namespace Core.Events; @@ -9,15 +10,26 @@ public record EventMetadata( TraceMetadata? Trace ); -public record EventEnvelope( - object Data, - EventMetadata Metadata -); +public interface IEventEnvelope +{ + object Data { get; } + EventMetadata Metadata { get; init; } +} public record EventEnvelope( T Data, EventMetadata Metadata -): EventEnvelope(Data, Metadata) where T : notnull +): IEventEnvelope where T : notnull +{ + object IEventEnvelope.Data => Data; +} + +public static class EventEnvelopeFactory { - public new T Data => (T)base.Data; + public static IEventEnvelope From(object data, EventMetadata metadata) + { + //TODO: Get rid of reflection! + var type = typeof(EventEnvelope<>).MakeGenericType(data.GetType()); + return (IEventEnvelope)Activator.CreateInstance(type, data, metadata)!; + } } diff --git a/Core/Events/External/IExternaEventProducer.cs b/Core/Events/External/IExternaEventProducer.cs index 4731a7479..c64b16c1a 100644 --- a/Core/Events/External/IExternaEventProducer.cs +++ b/Core/Events/External/IExternaEventProducer.cs @@ -2,7 +2,7 @@ namespace Core.Events.External; public interface IExternalEventProducer { - Task Publish(EventEnvelope @event, CancellationToken ct); + Task Publish(IEventEnvelope @event, CancellationToken ct); } @@ -24,7 +24,7 @@ public async Task Publish(object @event, CancellationToken ct) { await eventBus.Publish(@event, ct); - if (@event is EventEnvelope { Data: IExternalEvent } eventEnvelope) + if (@event is IEventEnvelope { Data: IExternalEvent } eventEnvelope) { await externalEventProducer.Publish(eventEnvelope, ct); } diff --git a/Core/Events/External/NulloExternalEventProducer.cs b/Core/Events/External/NulloExternalEventProducer.cs index 9cab89ad4..f554bae86 100644 --- a/Core/Events/External/NulloExternalEventProducer.cs +++ b/Core/Events/External/NulloExternalEventProducer.cs @@ -2,7 +2,7 @@ namespace Core.Events.External; public class NulloExternalEventProducer : IExternalEventProducer { - public Task Publish(EventEnvelope @event, CancellationToken ct) + public Task Publish(IEventEnvelope @event, CancellationToken ct) { return Task.CompletedTask; } diff --git a/Core/Tracing/TracingScope.cs b/Core/Tracing/TracingScope.cs index 87c09b2b1..c8c430162 100644 --- a/Core/Tracing/TracingScope.cs +++ b/Core/Tracing/TracingScope.cs @@ -70,12 +70,12 @@ public static class TraceScopeFactoryExtensions { public static TracingScope CreateTraceScope( this ITracingScopeFactory tracingScopeFactory, - IServiceProvider serviceProvider, EventEnvelope? eventEnvelope) + IServiceProvider serviceProvider, IEventEnvelope? eventEnvelope) { if (eventEnvelope == null) return tracingScopeFactory.CreateTraceScope(serviceProvider); - var (_, eventMetadata) = eventEnvelope; + var eventMetadata = eventEnvelope.Metadata; var newCausationId = new CausationId(eventMetadata.EventId); diff --git a/Sample/ECommerce/Carts/Carts/ShoppingCarts/Config.cs b/Sample/ECommerce/Carts/Carts/ShoppingCarts/Config.cs index 4a3dc8b69..13dd49c42 100644 --- a/Sample/ECommerce/Carts/Carts/ShoppingCarts/Config.cs +++ b/Sample/ECommerce/Carts/Carts/ShoppingCarts/Config.cs @@ -42,7 +42,7 @@ private static IServiceCollection AddQueryHandlers(this IServiceCollection servi .AddQueryHandler, HandleGetCartHistory>(); private static IServiceCollection AddEventHandlers(this IServiceCollection services) => - services.AddEventHandler(); + services.AddEventHandler, HandleCartFinalized>(); internal static void ConfigureCarts(this StoreOptions options) { diff --git a/Sample/ECommerce/Carts/Carts/ShoppingCarts/FinalizingCart/ShoppingCartFinalized.cs b/Sample/ECommerce/Carts/Carts/ShoppingCarts/FinalizingCart/ShoppingCartFinalized.cs index 223251659..9ffadbe7c 100644 --- a/Sample/ECommerce/Carts/Carts/ShoppingCarts/FinalizingCart/ShoppingCartFinalized.cs +++ b/Sample/ECommerce/Carts/Carts/ShoppingCarts/FinalizingCart/ShoppingCartFinalized.cs @@ -2,6 +2,7 @@ using Carts.ShoppingCarts.Products; using Core.Events; using Core.Exceptions; +using Core.Tracing; using Marten; namespace Carts.ShoppingCarts.FinalizingCart; @@ -25,7 +26,7 @@ public static CartFinalized Create( } } -internal class HandleCartFinalized: IEventHandler +internal class HandleCartFinalized: IEventHandler> { private readonly IQuerySession querySession; private readonly IEventBus eventBus; @@ -39,17 +40,21 @@ IEventBus eventBus this.eventBus = eventBus; } - public async Task Handle(ShoppingCartConfirmed @event, CancellationToken cancellationToken) + public async Task Handle(EventEnvelope @event, CancellationToken cancellationToken) { - var cart = await querySession.LoadAsync(@event.CartId, cancellationToken) - ?? throw AggregateNotFoundException.For(@event.CartId); + var cart = await querySession.LoadAsync(@event.Data.CartId, cancellationToken) + ?? throw AggregateNotFoundException.For(@event.Data.CartId); - var externalEvent = CartFinalized.Create( - @event.CartId, - cart.ClientId, - cart.ProductItems.ToList(), - cart.TotalPrice, - @event.ConfirmedAt + // TODO: This should be handled internally by event bus, or this event should be stored in the outbox stream + var externalEvent = new EventEnvelope( + CartFinalized.Create( + @event.Data.CartId, + cart.ClientId, + cart.ProductItems.ToList(), + cart.TotalPrice, + @event.Data.ConfirmedAt + ), + @event.Metadata ); await eventBus.Publish(externalEvent, cancellationToken); diff --git a/Sample/ECommerce/Orders/Orders.Api/appsettings.json b/Sample/ECommerce/Orders/Orders.Api/appsettings.json index 452ea8ea3..54fe70b1b 100644 --- a/Sample/ECommerce/Orders/Orders.Api/appsettings.json +++ b/Sample/ECommerce/Orders/Orders.Api/appsettings.json @@ -23,7 +23,8 @@ "ConsumerConfig": { "GroupId": "Orders", "BootstrapServers": "localhost:9092", - "AutoOffsetReset": "Earliest" + "AutoOffsetReset": "Earliest", + "EnableAutoCommit": false }, "Topics": [ "Carts", diff --git a/Sample/ECommerce/PracticalEventSourcing.sln b/Sample/ECommerce/PracticalEventSourcing.sln index 7418f87e7..4707393e6 100644 --- a/Sample/ECommerce/PracticalEventSourcing.sln +++ b/Sample/ECommerce/PracticalEventSourcing.sln @@ -50,6 +50,24 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shipments.Api.Tests", "Ship EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shipments.Tests", "Shipments\Shipments.Tests\Shipments.Tests.csproj", "{30E21C9B-5680-4F09-9FDA-6447A25EB1F7}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docker", "docker", "{97F227F3-E49F-4741-96D8-2D4033300EB2}" + ProjectSection(SolutionItems) = preProject + docker\docker-compose.yml = docker\docker-compose.yml + EndProjectSection +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{A98BE9E3-869A-4409-B6C7-5D275B297429}" + ProjectSection(SolutionItems) = preProject + README.md = README.md + EndProjectSection +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Core.Kafka", "..\..\Core.Kafka\Core.Kafka.csproj", "{1B9111A1-DCA6-42BF-A073-402024780D8D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Core.Testing", "..\..\Core.Testing\Core.Testing.csproj", "{714E605E-6EB2-41F4-A2BC-FACD7F781907}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Core.Api.Testing", "..\..\Core.Api.Testing\Core.Api.Testing.csproj", "{3EFCB989-6F16-4D69-8C3D-7F862243FAB9}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Core.Serialization", "..\..\Core.Serialization\Core.Serialization.csproj", "{F519CFB6-9B4D-4317-B442-32B5F4EC5A4C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -76,6 +94,10 @@ Global {5C59079B-5ED0-418D-A09E-E879B69B50ED} = {8C72B8D9-1542-4703-9530-90EABB6A6DF4} {494CAEE8-548A-481A-AAD2-3CADD2A198B4} = {8C72B8D9-1542-4703-9530-90EABB6A6DF4} {30E21C9B-5680-4F09-9FDA-6447A25EB1F7} = {8C72B8D9-1542-4703-9530-90EABB6A6DF4} + {1B9111A1-DCA6-42BF-A073-402024780D8D} = {CC9592F6-C639-4C1E-A089-E5A7F4B9BAEA} + {714E605E-6EB2-41F4-A2BC-FACD7F781907} = {CC9592F6-C639-4C1E-A089-E5A7F4B9BAEA} + {3EFCB989-6F16-4D69-8C3D-7F862243FAB9} = {CC9592F6-C639-4C1E-A089-E5A7F4B9BAEA} + {F519CFB6-9B4D-4317-B442-32B5F4EC5A4C} = {CC9592F6-C639-4C1E-A089-E5A7F4B9BAEA} EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {607CC0DE-712A-4FFB-9862-FEB4F10100D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU @@ -158,5 +180,21 @@ Global {30E21C9B-5680-4F09-9FDA-6447A25EB1F7}.Debug|Any CPU.Build.0 = Debug|Any CPU {30E21C9B-5680-4F09-9FDA-6447A25EB1F7}.Release|Any CPU.ActiveCfg = Release|Any CPU {30E21C9B-5680-4F09-9FDA-6447A25EB1F7}.Release|Any CPU.Build.0 = Release|Any CPU + {1B9111A1-DCA6-42BF-A073-402024780D8D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1B9111A1-DCA6-42BF-A073-402024780D8D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1B9111A1-DCA6-42BF-A073-402024780D8D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1B9111A1-DCA6-42BF-A073-402024780D8D}.Release|Any CPU.Build.0 = Release|Any CPU + {714E605E-6EB2-41F4-A2BC-FACD7F781907}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {714E605E-6EB2-41F4-A2BC-FACD7F781907}.Debug|Any CPU.Build.0 = Debug|Any CPU + {714E605E-6EB2-41F4-A2BC-FACD7F781907}.Release|Any CPU.ActiveCfg = Release|Any CPU + {714E605E-6EB2-41F4-A2BC-FACD7F781907}.Release|Any CPU.Build.0 = Release|Any CPU + {3EFCB989-6F16-4D69-8C3D-7F862243FAB9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3EFCB989-6F16-4D69-8C3D-7F862243FAB9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3EFCB989-6F16-4D69-8C3D-7F862243FAB9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3EFCB989-6F16-4D69-8C3D-7F862243FAB9}.Release|Any CPU.Build.0 = Release|Any CPU + {F519CFB6-9B4D-4317-B442-32B5F4EC5A4C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F519CFB6-9B4D-4317-B442-32B5F4EC5A4C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F519CFB6-9B4D-4317-B442-32B5F4EC5A4C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F519CFB6-9B4D-4317-B442-32B5F4EC5A4C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/Sample/ECommerce/docker/docker-compose.yml b/Sample/ECommerce/docker/docker-compose.yml index 1788c29d0..326ee89d7 100644 --- a/Sample/ECommerce/docker/docker-compose.yml +++ b/Sample/ECommerce/docker/docker-compose.yml @@ -34,12 +34,10 @@ services: networks: - kafka_network ports: - - "2181:2181" - - "2888:2888" - - "3888:3888" + - "2181:2181" environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 ####################################################### # Kafka @@ -63,6 +61,28 @@ services: ADVERTISED_HOST_NAME: kafka KAFKA_ADVERTISED_HOSTNAME: 127.0.0.1 + init-kafka: + image: confluentinc/cp-kafka:7.0.1 + depends_on: + - kafka + entrypoint: [ '/bin/sh', '-c' ] + networks: + - kafka_network + command: | + " + # blocks until kafka is reachable + kafka-topics --bootstrap-server kafka:29092 --list + + echo -e 'Creating kafka topics' + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic Carts --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic Orders --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic Payments --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic Shipments --replication-factor 1 --partitions 1 + + echo -e 'Successfully created the following topics:' + kafka-topics --bootstrap-server kafka:29092 --list + " + ####################################################### # Avro Schema Registry ####################################################### @@ -78,12 +98,12 @@ services: networks: - kafka_network environment: - SCHEMA_REGISTRY_HOST_NAME: schema_registry - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' - SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS' - SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*' - SCHEMA_LOG4J_ROOT_LOGLEVEL: 'ERROR' - SCHEMA_TOOLS_LOG4J_LOGLEVEL: 'ERROR' + SCHEMA_REGISTRY_HOST_NAME: schema_registry + SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' + SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS' + SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*' + SCHEMA_LOG4J_ROOT_LOGLEVEL: 'ERROR' + SCHEMA_TOOLS_LOG4J_LOGLEVEL: 'ERROR' kafka_rest: image: confluentinc/cp-kafka-rest:7.0.1 @@ -95,7 +115,7 @@ services: networks: - kafka_network environment: - KAFKA_REST_BOOTSTRAP_SERVERS: kafka:9092 + KAFKA_REST_BOOTSTRAP_SERVERS: kafka:29092 KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema_registry:8081 KAFKA_REST_HOST_NAME: kafka_rest @@ -108,7 +128,7 @@ services: - "8080:8080" environment: KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 networks: - kafka_network depends_on: diff --git a/docker-compose.yml b/docker-compose.yml index 34aafac74..6ba26220f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -123,6 +123,28 @@ services: ADVERTISED_HOST_NAME: kafka KAFKA_ADVERTISED_HOSTNAME: 127.0.0.1 + init-kafka: + image: confluentinc/cp-kafka:7.0.1 + depends_on: + - kafka + entrypoint: [ '/bin/sh', '-c' ] + networks: + - kafka_network + command: | + " + # blocks until kafka is reachable + kafka-topics --bootstrap-server kafka:29092 --list + + echo -e 'Creating kafka topics' + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic Carts --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic Orders --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic Payments --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic Shipments --replication-factor 1 --partitions 1 + + echo -e 'Successfully created the following topics:' + kafka-topics --bootstrap-server kafka:29092 --list + " + ####################################################### # Avro Schema Registry ####################################################### @@ -155,7 +177,7 @@ services: networks: - kafka_network environment: - KAFKA_REST_BOOTSTRAP_SERVERS: kafka:9092 + KAFKA_REST_BOOTSTRAP_SERVERS: kafka:29092 KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema_registry:8081 KAFKA_REST_HOST_NAME: kafka_rest From e5be4e2c066a58988a585795ab55e619a8bd69c9 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Tue, 17 May 2022 18:07:06 +0200 Subject: [PATCH 2/3] Enforced EventEnvelope as a parameter of EventBus.Publish method --- Core.Kafka/Consumers/KafkaConsumer.cs | 2 +- Core.Marten/Aggregates/AggregateExtensions.cs | 21 --------- Core.Testing/ApiFixture.cs | 7 ++- Core/Events/EventBus.cs | 15 +++---- Core/Events/External/IExternaEventProducer.cs | 9 ++-- .../Payments/Payments/Payments/Config.cs | 6 +-- .../Payments/FailingPayment/PaymentFailed.cs | 44 +++++++++++-------- .../FinalizingPayment/PaymentFinalized.cs | 20 +++++---- .../Shipments/Packages/PackageService.cs | 11 +++-- .../FinalizingCart/ShoppingCartFinalized.cs | 24 +++++----- 10 files changed, 78 insertions(+), 81 deletions(-) delete mode 100644 Core.Marten/Aggregates/AggregateExtensions.cs diff --git a/Core.Kafka/Consumers/KafkaConsumer.cs b/Core.Kafka/Consumers/KafkaConsumer.cs index 034a1b830..2a75f960f 100644 --- a/Core.Kafka/Consumers/KafkaConsumer.cs +++ b/Core.Kafka/Consumers/KafkaConsumer.cs @@ -73,7 +73,7 @@ private async Task ConsumeNextEvent(IConsumer consumer, Cancella var eventEnvelopeType = typeof(EventEnvelope<>).MakeGenericType(eventType); // deserialize event - var @event = message.Message.Value.FromJson(eventEnvelopeType); + var @event = (IEventEnvelope)message.Message.Value.FromJson(eventEnvelopeType); // publish event to internal event bus await eventBus.Publish(@event, cancellationToken); diff --git a/Core.Marten/Aggregates/AggregateExtensions.cs b/Core.Marten/Aggregates/AggregateExtensions.cs deleted file mode 100644 index 4c7e83121..000000000 --- a/Core.Marten/Aggregates/AggregateExtensions.cs +++ /dev/null @@ -1,21 +0,0 @@ -using Core.Aggregates; -using Core.Events; -using Marten; - -namespace Core.Marten.Aggregates; - -public static class AggregateExtensions -{ - public static async Task StoreAndPublishEvents( - this IAggregate aggregate, - IDocumentSession session, - IEventBus eventBus, - CancellationToken cancellationToken = default - ) - { - var uncommitedEvents = aggregate.DequeueUncommittedEvents(); - session.Events.Append(aggregate.Id, uncommitedEvents); - await session.SaveChangesAsync(cancellationToken); - await eventBus.Publish(uncommitedEvents, cancellationToken); - } -} diff --git a/Core.Testing/ApiFixture.cs b/Core.Testing/ApiFixture.cs index 8bbcef7ed..704932056 100644 --- a/Core.Testing/ApiFixture.cs +++ b/Core.Testing/ApiFixture.cs @@ -40,11 +40,14 @@ public IReadOnlyCollection PublishedExternalCommandOfType() return externalCommandBus.SentCommands.OfType().ToList(); } - public async Task PublishInternalEvent(object @event, CancellationToken ct = default) + public async Task PublishInternalEvent(TEvent @event, CancellationToken ct = default) where TEvent : notnull { using var scope = Server.Host.Services.CreateScope(); var eventBus = scope.ServiceProvider.GetRequiredService(); - await eventBus.Publish(@event, ct); + + //TODO: metadata should be taken by event bus internally + var eventEnvelope = new EventEnvelope(@event, new EventMetadata(Guid.NewGuid().ToString(), 0, 0, null)); + await eventBus.Publish(eventEnvelope, ct); } public IReadOnlyCollection PublishedInternalEventsOfType() => diff --git a/Core/Events/EventBus.cs b/Core/Events/EventBus.cs index 52e20a15e..c8be676ff 100644 --- a/Core/Events/EventBus.cs +++ b/Core/Events/EventBus.cs @@ -9,7 +9,7 @@ namespace Core.Events; public interface IEventBus { - Task Publish(object @event, CancellationToken ct); + Task Publish(IEventEnvelope @event, CancellationToken ct); } public class EventBus: IEventBus @@ -48,16 +48,15 @@ await retryPolicy.ExecuteAsync(async token => } } - public async Task Publish(object @event, CancellationToken ct) + public async Task Publish(IEventEnvelope eventEnvelope, CancellationToken ct) { - // if it's an event envelope, publish also just event data + // publish also just event data // thanks to that both handlers with envelope and without will be called - if (@event is IEventEnvelope eventEnvelope) - await (Task)GetGenericPublishFor(eventEnvelope.Data) - .Invoke(this, new[] { eventEnvelope.Data, ct })!; + await (Task)GetGenericPublishFor(eventEnvelope.Data) + .Invoke(this, new[] { eventEnvelope.Data, ct })!; - await (Task)GetGenericPublishFor(@event) - .Invoke(this, new[] { @event, ct })!; + await (Task)GetGenericPublishFor(eventEnvelope) + .Invoke(this, new object[] { eventEnvelope, ct })!; } private static MethodInfo GetGenericPublishFor(object @event) => diff --git a/Core/Events/External/IExternaEventProducer.cs b/Core/Events/External/IExternaEventProducer.cs index c64b16c1a..5f513dee5 100644 --- a/Core/Events/External/IExternaEventProducer.cs +++ b/Core/Events/External/IExternaEventProducer.cs @@ -20,13 +20,10 @@ IExternalEventProducer externalEventProducer this.externalEventProducer = externalEventProducer; } - public async Task Publish(object @event, CancellationToken ct) + public async Task Publish(IEventEnvelope eventEnvelope, CancellationToken ct) { - await eventBus.Publish(@event, ct); + await eventBus.Publish(eventEnvelope, ct); - if (@event is IEventEnvelope { Data: IExternalEvent } eventEnvelope) - { - await externalEventProducer.Publish(eventEnvelope, ct); - } + await externalEventProducer.Publish(eventEnvelope, ct); } } diff --git a/Sample/ECommerce/Payments/Payments/Payments/Config.cs b/Sample/ECommerce/Payments/Payments/Payments/Config.cs index e12db8b1d..ff4c118bb 100644 --- a/Sample/ECommerce/Payments/Payments/Payments/Config.cs +++ b/Sample/ECommerce/Payments/Payments/Payments/Config.cs @@ -28,9 +28,9 @@ private static IServiceCollection AddCommandHandlers(this IServiceCollection ser private static IServiceCollection AddEventHandlers(this IServiceCollection services) => services - .AddEventHandler() - .AddEventHandler() - .AddEventHandler(); + .AddEventHandler, TransformIntoPaymentFinalized>() + .AddEventHandler, TransformIntoPaymentFailed>() + .AddEventHandler, TransformIntoPaymentFailed>(); internal static void ConfigurePayments(this StoreOptions options) { diff --git a/Sample/ECommerce/Payments/Payments/Payments/FailingPayment/PaymentFailed.cs b/Sample/ECommerce/Payments/Payments/Payments/FailingPayment/PaymentFailed.cs index 6e6fcaa55..10a83e9a0 100644 --- a/Sample/ECommerce/Payments/Payments/Payments/FailingPayment/PaymentFailed.cs +++ b/Sample/ECommerce/Payments/Payments/Payments/FailingPayment/PaymentFailed.cs @@ -24,8 +24,8 @@ PaymentFailReason failReason public class TransformIntoPaymentFailed : - IEventHandler, - IEventHandler + IEventHandler>, + IEventHandler> { private readonly IEventBus eventBus; private readonly IQuerySession querySession; @@ -39,31 +39,39 @@ IQuerySession querySession this.querySession = querySession; } - public async Task Handle(PaymentDiscarded @event, CancellationToken cancellationToken) + public async Task Handle(EventEnvelope @event, CancellationToken cancellationToken) { - var payment = await querySession.LoadAsync(@event.PaymentId, cancellationToken); + var payment = await querySession.LoadAsync(@event.Data.PaymentId, cancellationToken); - var externalEvent = PaymentFailed.Create( - @event.PaymentId, - payment!.OrderId, - payment.Amount, - @event.DiscardedAt, - PaymentFailReason.Discarded + // TODO: This should be handled internally by event bus, or this event should be stored in the outbox stream + var externalEvent = new EventEnvelope( + PaymentFailed.Create( + @event.Data.PaymentId, + payment!.OrderId, + payment.Amount, + @event.Data.DiscardedAt, + PaymentFailReason.Discarded + ), + @event.Metadata ); await eventBus.Publish(externalEvent, cancellationToken); } - public async Task Handle(PaymentTimedOut @event, CancellationToken cancellationToken) + public async Task Handle(EventEnvelope @event, CancellationToken cancellationToken) { - var payment = await querySession.LoadAsync(@event.PaymentId, cancellationToken); + var payment = await querySession.LoadAsync(@event.Data.PaymentId, cancellationToken); - var externalEvent = PaymentFailed.Create( - @event.PaymentId, - payment!.OrderId, - payment.Amount, - @event.TimedOutAt, - PaymentFailReason.Discarded + // TODO: This should be handled internally by event bus, or this event should be stored in the outbox stream + var externalEvent = new EventEnvelope( + PaymentFailed.Create( + @event.Data.PaymentId, + payment!.OrderId, + payment.Amount, + @event.Data.TimedOutAt, + PaymentFailReason.Discarded + ), + @event.Metadata ); await eventBus.Publish(externalEvent, cancellationToken); diff --git a/Sample/ECommerce/Payments/Payments/Payments/FinalizingPayment/PaymentFinalized.cs b/Sample/ECommerce/Payments/Payments/Payments/FinalizingPayment/PaymentFinalized.cs index eeba3982c..1adf77240 100644 --- a/Sample/ECommerce/Payments/Payments/Payments/FinalizingPayment/PaymentFinalized.cs +++ b/Sample/ECommerce/Payments/Payments/Payments/FinalizingPayment/PaymentFinalized.cs @@ -27,7 +27,7 @@ public static PaymentFinalized Create(Guid paymentId, Guid orderId, decimal amou } public class TransformIntoPaymentFinalized: - IEventHandler + IEventHandler> { private readonly IEventBus eventBus; private readonly IQuerySession querySession; @@ -41,17 +41,21 @@ IQuerySession querySession this.querySession = querySession; } - public async Task Handle(PaymentCompleted @event, CancellationToken cancellationToken) + public async Task Handle(EventEnvelope @event, CancellationToken cancellationToken) { - var (paymentId, completedAt) = @event; + var (paymentId, completedAt) = @event.Data; var payment = await querySession.LoadAsync(paymentId, cancellationToken); - var externalEvent = PaymentFinalized.Create( - paymentId, - payment!.OrderId, - payment.Amount, - completedAt + // TODO: This should be handled internally by event bus, or this event should be stored in the outbox stream + var externalEvent = new EventEnvelope( + PaymentFinalized.Create( + paymentId, + payment!.OrderId, + payment.Amount, + completedAt + ), + @event.Metadata ); await eventBus.Publish(externalEvent, cancellationToken); diff --git a/Sample/ECommerce/Shipments/Shipments/Packages/PackageService.cs b/Sample/ECommerce/Shipments/Shipments/Packages/PackageService.cs index d1186d843..9d6c13f29 100644 --- a/Sample/ECommerce/Shipments/Shipments/Packages/PackageService.cs +++ b/Sample/ECommerce/Shipments/Shipments/Packages/PackageService.cs @@ -1,9 +1,9 @@ +using Core.Events; using Microsoft.EntityFrameworkCore; using Shipments.Packages.Events.External; using Shipments.Packages.Requests; using Shipments.Products; using Shipments.Storage; -using IEventBus = Core.Events.IEventBus; namespace Shipments.Packages; @@ -91,16 +91,19 @@ public async Task DeliverPackage(DeliverPackage request, CancellationToken cance await SaveChanges(cancellationToken); } - private async Task SaveChangesAndPublish(object @event, CancellationToken cancellationToken) + private async Task SaveChangesAndPublish(TEvent @event, CancellationToken cancellationToken) where TEvent : notnull { await SaveChanges(cancellationToken); await Publish(@event, cancellationToken); } - private async Task Publish(object @event, CancellationToken cancellationToken) + private async Task Publish(TEvent @event, CancellationToken cancellationToken) where TEvent : notnull { - await eventBus.Publish(@event, cancellationToken); + //TODO: metadata should be taken by event bus internally + var eventEnvelope = new EventEnvelope(@event, new EventMetadata(Guid.NewGuid().ToString(), 0, 0, null)); + + await eventBus.Publish(eventEnvelope, cancellationToken); } private Task SaveChanges(CancellationToken cancellationToken = default) diff --git a/Sample/EventStoreDB/ECommerce/Carts/Carts/ShoppingCarts/FinalizingCart/ShoppingCartFinalized.cs b/Sample/EventStoreDB/ECommerce/Carts/Carts/ShoppingCarts/FinalizingCart/ShoppingCartFinalized.cs index 6bc371b48..11d3a4096 100644 --- a/Sample/EventStoreDB/ECommerce/Carts/Carts/ShoppingCarts/FinalizingCart/ShoppingCartFinalized.cs +++ b/Sample/EventStoreDB/ECommerce/Carts/Carts/ShoppingCarts/FinalizingCart/ShoppingCartFinalized.cs @@ -25,7 +25,7 @@ public static ShoppingCartFinalized Create( } } -internal class HandleCartFinalized: IEventHandler +internal class HandleCartFinalized: IEventHandler> { private readonly IQuerySession querySession; private readonly IEventBus eventBus; @@ -39,17 +39,21 @@ IEventBus eventBus this.eventBus = eventBus; } - public async Task Handle(ShoppingCartConfirmed @event, CancellationToken cancellationToken) + public async Task Handle(EventEnvelope @event, CancellationToken cancellationToken) { - var cart = await querySession.LoadAsync(@event.CartId, cancellationToken) - ?? throw AggregateNotFoundException.For(@event.CartId); + var cart = await querySession.LoadAsync(@event.Data.CartId, cancellationToken) + ?? throw AggregateNotFoundException.For(@event.Data.CartId); - var externalEvent = ShoppingCartFinalized.Create( - @event.CartId, - cart.ClientId, - cart.ProductItems.ToList(), - cart.TotalPrice, - @event.ConfirmedAt + // TODO: This should be handled internally by event bus, or this event should be stored in the outbox stream + var externalEvent = new EventEnvelope( + ShoppingCartFinalized.Create( + @event.Data.CartId, + cart.ClientId, + cart.ProductItems.ToList(), + cart.TotalPrice, + @event.Data.ConfirmedAt + ), + @event.Metadata ); await eventBus.Publish(externalEvent, cancellationToken); From f26c430f02aa83fe410f4818343b3a5f95e5d153 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Tue, 17 May 2022 18:29:35 +0200 Subject: [PATCH 3/3] Added EventEnvelopeExtensions for Kafka message deserialisation and aligned with EventStoreDB subscription to all to ignore messages that's not able to deserialise --- .../EventStoreDBSubscriptionToAll.cs | 6 ++--- Core.Kafka/Consumers/KafkaConsumer.cs | 27 ++++++++++++------- Core.Kafka/Consumers/KafkaConsumerConfig.cs | 2 ++ Core.Kafka/Events/EventEnvelopeExtensions.cs | 22 +++++++++++++++ .../Newtonsoft/SerializationExtensions.cs | 4 +-- Core.Testing/ApiFixture.cs | 9 ++++--- 6 files changed, 53 insertions(+), 17 deletions(-) create mode 100644 Core.Kafka/Events/EventEnvelopeExtensions.cs diff --git a/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs b/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs index 38f7637c9..dea1dd590 100644 --- a/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs +++ b/Core.EventStoreDB/Subscriptions/EventStoreDBSubscriptionToAll.cs @@ -77,9 +77,9 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re { if (IsEventWithEmptyData(resolvedEvent) || IsCheckpointEvent(resolvedEvent)) return; - var streamEvent = resolvedEvent.ToEventEnvelope(); + var eventEnvelope = resolvedEvent.ToEventEnvelope(); - if (streamEvent == null) + if (eventEnvelope == null) { // That can happen if we're sharing database between modules. // If we're subscribing to all and not filtering out events from other modules, @@ -97,7 +97,7 @@ private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent re } // publish event to internal event bus - await eventBus.Publish(streamEvent, ct); + await eventBus.Publish(eventEnvelope, ct); await checkpointRepository.Store(SubscriptionId, resolvedEvent.Event.Position.CommitPosition, ct); } diff --git a/Core.Kafka/Consumers/KafkaConsumer.cs b/Core.Kafka/Consumers/KafkaConsumer.cs index 2a75f960f..440d39248 100644 --- a/Core.Kafka/Consumers/KafkaConsumer.cs +++ b/Core.Kafka/Consumers/KafkaConsumer.cs @@ -1,12 +1,9 @@ using Confluent.Kafka; using Core.Events; using Core.Events.External; -using Core.Reflection; -using Core.Serialization.Newtonsoft; +using Core.Kafka.Events; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; -using Newtonsoft.Json; -using IEventBus = Core.Events.IEventBus; namespace Core.Kafka.Consumers; @@ -68,15 +65,27 @@ private async Task ConsumeNextEvent(IConsumer consumer, Cancella var message = consumer.Consume(cancellationToken); // get event type from name stored in message.Key - var eventType = TypeProvider.GetTypeFromAnyReferencingAssembly(message.Message.Key)!; + var eventEnvelope = message.ToEventEnvelope(); - var eventEnvelopeType = typeof(EventEnvelope<>).MakeGenericType(eventType); + if (eventEnvelope == null) + { + // That can happen if we're sharing database between modules. + // If we're subscribing to all and not filtering out events from other modules, + // then we might get events that are from other module and we might not be able to deserialize them. + // In that case it's safe to ignore deserialization error. + // You may add more sophisticated logic checking if it should be ignored or not. + logger.LogWarning("Couldn't deserialize event of type: {EventType}", message.Message.Key); + + if (!config.IgnoreDeserializationErrors) + throw new InvalidOperationException( + $"Unable to deserialize event {message.Message.Key}" + ); - // deserialize event - var @event = (IEventEnvelope)message.Message.Value.FromJson(eventEnvelopeType); + return; + } // publish event to internal event bus - await eventBus.Publish(@event, cancellationToken); + await eventBus.Publish(eventEnvelope, cancellationToken); consumer.Commit(); } diff --git a/Core.Kafka/Consumers/KafkaConsumerConfig.cs b/Core.Kafka/Consumers/KafkaConsumerConfig.cs index f386efe9c..08fd92fee 100644 --- a/Core.Kafka/Consumers/KafkaConsumerConfig.cs +++ b/Core.Kafka/Consumers/KafkaConsumerConfig.cs @@ -7,6 +7,8 @@ public class KafkaConsumerConfig { public ConsumerConfig? ConsumerConfig { get; set; } public string[]? Topics { get; set; } + + public bool IgnoreDeserializationErrors { get; set; } = true; } public static class KafkaConsumerConfigExtensions diff --git a/Core.Kafka/Events/EventEnvelopeExtensions.cs b/Core.Kafka/Events/EventEnvelopeExtensions.cs new file mode 100644 index 000000000..b2f944008 --- /dev/null +++ b/Core.Kafka/Events/EventEnvelopeExtensions.cs @@ -0,0 +1,22 @@ +using Confluent.Kafka; +using Core.Events; +using Core.Reflection; +using Core.Serialization.Newtonsoft; + +namespace Core.Kafka.Events; + +public static class EventEnvelopeExtensions +{ + public static IEventEnvelope? ToEventEnvelope(this ConsumeResult message) + { + var eventType = TypeProvider.GetTypeFromAnyReferencingAssembly(message.Message.Key); + + if (eventType == null) + return null; + + var eventEnvelopeType = typeof(EventEnvelope<>).MakeGenericType(eventType); + + // deserialize event + return message.Message.Value.FromJson(eventEnvelopeType) as IEventEnvelope; + } +} diff --git a/Core.Serialization/Newtonsoft/SerializationExtensions.cs b/Core.Serialization/Newtonsoft/SerializationExtensions.cs index 19f41b5be..4e8154040 100644 --- a/Core.Serialization/Newtonsoft/SerializationExtensions.cs +++ b/Core.Serialization/Newtonsoft/SerializationExtensions.cs @@ -40,10 +40,10 @@ public static T FromJson(this string json) /// json string /// object type /// deserialized object - public static object FromJson(this string json, Type type) + public static object? FromJson(this string json, Type type) { return JsonConvert.DeserializeObject(json, type, - new JsonSerializerSettings().WithNonDefaultConstructorContractResolver())!; + new JsonSerializerSettings().WithNonDefaultConstructorContractResolver()); } /// diff --git a/Core.Testing/ApiFixture.cs b/Core.Testing/ApiFixture.cs index 704932056..134a55014 100644 --- a/Core.Testing/ApiFixture.cs +++ b/Core.Testing/ApiFixture.cs @@ -40,13 +40,16 @@ public IReadOnlyCollection PublishedExternalCommandOfType() return externalCommandBus.SentCommands.OfType().ToList(); } - public async Task PublishInternalEvent(TEvent @event, CancellationToken ct = default) where TEvent : notnull + public Task PublishInternalEvent(TEvent @event, CancellationToken ct = default) where TEvent : notnull => + PublishInternalEvent( + new EventEnvelope(@event, new EventMetadata(Guid.NewGuid().ToString(), 0, 0, null)), ct); + + public async Task PublishInternalEvent(EventEnvelope eventEnvelope, CancellationToken ct = default) + where TEvent : notnull { using var scope = Server.Host.Services.CreateScope(); var eventBus = scope.ServiceProvider.GetRequiredService(); - //TODO: metadata should be taken by event bus internally - var eventEnvelope = new EventEnvelope(@event, new EventMetadata(Guid.NewGuid().ToString(), 0, 0, null)); await eventBus.Publish(eventEnvelope, ct); }