Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed Kafka integration #150

Merged
merged 3 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions Core.EventStoreDB/Events/EventEnvelopeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Core.EventStoreDB.Events;

public static class EventEnvelopeExtensions
{
public static EventEnvelope? ToEventEnvelope(this ResolvedEvent resolvedEvent)
public static IEventEnvelope? ToEventEnvelope(this ResolvedEvent resolvedEvent)
{
var eventData = resolvedEvent.Deserialize();
var eventMetadata = resolvedEvent.DeserializeMetadata();
Expand All @@ -20,7 +20,7 @@ public static class EventEnvelopeExtensions
resolvedEvent.Event.Position.CommitPosition,
eventMetadata
);
var type = typeof(EventEnvelope<>).MakeGenericType(eventData.GetType());
return (EventEnvelope)Activator.CreateInstance(type, eventData, metaData)!;

return EventEnvelopeFactory.From(eventData, metaData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public async Task SubscribeToAll(EventStoreDBSubscriptionToAllOptions subscripti
{
if (IsEventWithEmptyData(resolvedEvent) || IsCheckpointEvent(resolvedEvent)) return;

var streamEvent = resolvedEvent.ToEventEnvelope();
var eventEnvelope = resolvedEvent.ToEventEnvelope();

if (streamEvent == null)
if (eventEnvelope == null)
{
// That can happen if we're sharing database between modules.
// If we're subscribing to all and not filtering out events from other modules,
Expand All @@ -97,7 +97,7 @@ public async Task SubscribeToAll(EventStoreDBSubscriptionToAllOptions subscripti
}

// publish event to internal event bus
await eventBus.Publish(streamEvent, ct);
await eventBus.Publish(eventEnvelope, ct);

await checkpointRepository.Store(SubscriptionId, resolvedEvent.Event.Position.CommitPosition, ct);
}
Expand Down
2 changes: 1 addition & 1 deletion Core.Kafka.Tests/KafkaProducerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class KafkaProducerTests
)
.Build();

private readonly EventEnvelope eventEnvelope =
private readonly IEventEnvelope eventEnvelope =
new EventEnvelope<TestEvent>(new TestEvent("test", 123), new EventMetadata("123", 1, 1, null));

[Fact]
Expand Down
28 changes: 20 additions & 8 deletions Core.Kafka/Consumers/KafkaConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
using Confluent.Kafka;
using Core.Events;
using Core.Events.External;
using Core.Reflection;
using Core.Kafka.Events;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using IEventBus = Core.Events.IEventBus;

namespace Core.Kafka.Consumers;

Expand Down Expand Up @@ -67,15 +65,29 @@ private async Task ConsumeNextEvent(IConsumer<string, string> consumer, Cancella
var message = consumer.Consume(cancellationToken);

// get event type from name stored in message.Key
var eventType = TypeProvider.GetTypeFromAnyReferencingAssembly(message.Message.Key)!;
var eventEnvelope = message.ToEventEnvelope();

var eventEnvelopeType = typeof(EventEnvelope<>).MakeGenericType(eventType);
if (eventEnvelope == null)
{
// That can happen if we're sharing database between modules.
// If we're subscribing to all and not filtering out events from other modules,
// then we might get events that are from other module and we might not be able to deserialize them.
// In that case it's safe to ignore deserialization error.
// You may add more sophisticated logic checking if it should be ignored or not.
logger.LogWarning("Couldn't deserialize event of type: {EventType}", message.Message.Key);

if (!config.IgnoreDeserializationErrors)
throw new InvalidOperationException(
$"Unable to deserialize event {message.Message.Key}"
);

// deserialize event
var @event = JsonConvert.DeserializeObject(message.Message.Value, eventEnvelopeType)!;
return;
}

// publish event to internal event bus
await eventBus.Publish(@event, cancellationToken);
await eventBus.Publish(eventEnvelope, cancellationToken);

consumer.Commit();
}
catch (Exception e)
{
Expand Down
2 changes: 2 additions & 0 deletions Core.Kafka/Consumers/KafkaConsumerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public class KafkaConsumerConfig
{
public ConsumerConfig? ConsumerConfig { get; set; }
public string[]? Topics { get; set; }

public bool IgnoreDeserializationErrors { get; set; } = true;
}

public static class KafkaConsumerConfigExtensions
Expand Down
22 changes: 22 additions & 0 deletions Core.Kafka/Events/EventEnvelopeExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Confluent.Kafka;
using Core.Events;
using Core.Reflection;
using Core.Serialization.Newtonsoft;

namespace Core.Kafka.Events;

public static class EventEnvelopeExtensions
{
public static IEventEnvelope? ToEventEnvelope(this ConsumeResult<string, string> message)
{
var eventType = TypeProvider.GetTypeFromAnyReferencingAssembly(message.Message.Key);

if (eventType == null)
return null;

var eventEnvelopeType = typeof(EventEnvelope<>).MakeGenericType(eventType);

// deserialize event
return message.Message.Value.FromJson(eventEnvelopeType) as IEventEnvelope;
}
}
2 changes: 1 addition & 1 deletion Core.Kafka/Producers/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ILogger<KafkaProducer> logger
config = configuration.GetKafkaProducerConfig();
}

public async Task Publish(EventEnvelope @event, CancellationToken ct)
public async Task Publish(IEventEnvelope @event, CancellationToken ct)
{
try
{
Expand Down
21 changes: 0 additions & 21 deletions Core.Marten/Aggregates/AggregateExtensions.cs

This file was deleted.

2 changes: 1 addition & 1 deletion Core.Marten/Subscriptions/MartenEventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ IServiceProvider serviceProvider
)
);

await eventBus.Publish(new EventEnvelope(@event.Data, eventMetadata), ct);
await eventBus.Publish(EventEnvelopeFactory.From(@event.Data, eventMetadata), ct);
}
}
}
4 changes: 2 additions & 2 deletions Core.Serialization/Newtonsoft/SerializationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public static T FromJson<T>(this string json)
/// <param name="json">json string</param>
/// <param name="type">object type</param>
/// <returns>deserialized object</returns>
public static object FromJson(this string json, Type type)
public static object? FromJson(this string json, Type type)
{
return JsonConvert.DeserializeObject(json, type,
new JsonSerializerSettings().WithNonDefaultConstructorContractResolver())!;
new JsonSerializerSettings().WithNonDefaultConstructorContractResolver());
}

/// <summary>
Expand Down
10 changes: 8 additions & 2 deletions Core.Testing/ApiFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ public abstract class ApiWithEventsFixture<TStartup>: ApiFixture<TStartup> where
return externalCommandBus.SentCommands.OfType<TCommand>().ToList();
}

public async Task PublishInternalEvent(object @event, CancellationToken ct = default)
public Task PublishInternalEvent<TEvent>(TEvent @event, CancellationToken ct = default) where TEvent : notnull =>
PublishInternalEvent(
new EventEnvelope<TEvent>(@event, new EventMetadata(Guid.NewGuid().ToString(), 0, 0, null)), ct);

public async Task PublishInternalEvent<TEvent>(EventEnvelope<TEvent> eventEnvelope, CancellationToken ct = default)
where TEvent : notnull
{
using var scope = Server.Host.Services.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
await eventBus.Publish(@event, ct);

await eventBus.Publish(eventEnvelope, ct);
}

public IReadOnlyCollection<TEvent> PublishedInternalEventsOfType<TEvent>() =>
Expand Down
2 changes: 1 addition & 1 deletion Core.Testing/DummyExternalEventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class DummyExternalEventProducer: IExternalEventProducer
{
public IList<object> PublishedEvents { get; } = new List<object>();

public Task Publish(EventEnvelope @event, CancellationToken ct)
public Task Publish(IEventEnvelope @event, CancellationToken ct)
{
PublishedEvents.Add(@event.Data);

Expand Down
2 changes: 1 addition & 1 deletion Core/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static IServiceCollection AddTracing(this IServiceCollection services)
sp.GetRequiredService<ITracingScopeFactory>().CreateTraceScope(scopedServiceProvider, traceMetadata)
);

services.TryAddScoped<Func<IServiceProvider, EventEnvelope?, TracingScope>>(sp =>
services.TryAddScoped<Func<IServiceProvider, IEventEnvelope?, TracingScope>>(sp =>
(scopedServiceProvider, eventEnvelope) => sp.GetRequiredService<ITracingScopeFactory>()
.CreateTraceScope(scopedServiceProvider, eventEnvelope)
);
Expand Down
21 changes: 10 additions & 11 deletions Core/Events/EventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ namespace Core.Events;

public interface IEventBus
{
Task Publish(object @event, CancellationToken ct);
Task Publish(IEventEnvelope @event, CancellationToken ct);
}

public class EventBus: IEventBus
{
private readonly IServiceProvider serviceProvider;
private readonly Func<IServiceProvider, EventEnvelope?, TracingScope> createTracingScope;
private readonly Func<IServiceProvider, IEventEnvelope?, TracingScope> createTracingScope;
private readonly AsyncPolicy retryPolicy;
private static readonly ConcurrentDictionary<Type, MethodInfo> PublishMethods = new();

public EventBus(
IServiceProvider serviceProvider,
Func<IServiceProvider, EventEnvelope?, TracingScope> createTracingScope,
Func<IServiceProvider, IEventEnvelope?, TracingScope> createTracingScope,
AsyncPolicy retryPolicy
)
{
Expand All @@ -32,7 +32,7 @@ AsyncPolicy retryPolicy

private async Task Publish<TEvent>(TEvent @event, CancellationToken ct)
{
var eventEnvelope = @event as EventEnvelope;
var eventEnvelope = @event as IEventEnvelope;
using var scope = serviceProvider.CreateScope();
using var tracingScope = createTracingScope(scope.ServiceProvider, eventEnvelope);

Expand All @@ -48,16 +48,15 @@ private async Task Publish<TEvent>(TEvent @event, CancellationToken ct)
}
}

public async Task Publish(object @event, CancellationToken ct)
public async Task Publish(IEventEnvelope eventEnvelope, CancellationToken ct)
{
// if it's an event envelope, publish also just event data
// publish also just event data
// thanks to that both handlers with envelope and without will be called
if (@event is EventEnvelope(var data, _))
await (Task)GetGenericPublishFor(data)
.Invoke(this, new[] { data, ct })!;
await (Task)GetGenericPublishFor(eventEnvelope.Data)
.Invoke(this, new[] { eventEnvelope.Data, ct })!;

await (Task)GetGenericPublishFor(@event)
.Invoke(this, new[] { @event, ct })!;
await (Task)GetGenericPublishFor(eventEnvelope)
.Invoke(this, new object[] { eventEnvelope, ct })!;
}

private static MethodInfo GetGenericPublishFor(object @event) =>
Expand Down
26 changes: 19 additions & 7 deletions Core/Events/EventEnvelope.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Core.Tracing;
using System.Reflection;
using Core.Tracing;

namespace Core.Events;

Expand All @@ -9,15 +10,26 @@ public record EventMetadata(
TraceMetadata? Trace
);

public record EventEnvelope(
object Data,
EventMetadata Metadata
);
public interface IEventEnvelope
{
object Data { get; }
EventMetadata Metadata { get; init; }
}

public record EventEnvelope<T>(
T Data,
EventMetadata Metadata
): EventEnvelope(Data, Metadata) where T : notnull
): IEventEnvelope where T : notnull
{
object IEventEnvelope.Data => Data;
}

public static class EventEnvelopeFactory
{
public new T Data => (T)base.Data;
public static IEventEnvelope From(object data, EventMetadata metadata)
{
//TODO: Get rid of reflection!
var type = typeof(EventEnvelope<>).MakeGenericType(data.GetType());
return (IEventEnvelope)Activator.CreateInstance(type, data, metadata)!;
}
}
11 changes: 4 additions & 7 deletions Core/Events/External/IExternaEventProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace Core.Events.External;

public interface IExternalEventProducer
{
Task Publish(EventEnvelope @event, CancellationToken ct);
Task Publish(IEventEnvelope @event, CancellationToken ct);
}


Expand All @@ -20,13 +20,10 @@ IExternalEventProducer externalEventProducer
this.externalEventProducer = externalEventProducer;
}

public async Task Publish(object @event, CancellationToken ct)
public async Task Publish(IEventEnvelope eventEnvelope, CancellationToken ct)
{
await eventBus.Publish(@event, ct);
await eventBus.Publish(eventEnvelope, ct);

if (@event is EventEnvelope { Data: IExternalEvent } eventEnvelope)
{
await externalEventProducer.Publish(eventEnvelope, ct);
}
await externalEventProducer.Publish(eventEnvelope, ct);
}
}
2 changes: 1 addition & 1 deletion Core/Events/External/NulloExternalEventProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace Core.Events.External;

public class NulloExternalEventProducer : IExternalEventProducer
{
public Task Publish(EventEnvelope @event, CancellationToken ct)
public Task Publish(IEventEnvelope @event, CancellationToken ct)
{
return Task.CompletedTask;
}
Expand Down
4 changes: 2 additions & 2 deletions Core/Tracing/TracingScope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ public static class TraceScopeFactoryExtensions
{
public static TracingScope CreateTraceScope(
this ITracingScopeFactory tracingScopeFactory,
IServiceProvider serviceProvider, EventEnvelope? eventEnvelope)
IServiceProvider serviceProvider, IEventEnvelope? eventEnvelope)
{
if (eventEnvelope == null)
return tracingScopeFactory.CreateTraceScope(serviceProvider);

var (_, eventMetadata) = eventEnvelope;
var eventMetadata = eventEnvelope.Metadata;

var newCausationId = new CausationId(eventMetadata.EventId);

Expand Down
2 changes: 1 addition & 1 deletion Sample/ECommerce/Carts/Carts/ShoppingCarts/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ internal static class CartsConfig
.AddQueryHandler<GetCartHistory, IPagedList<ShoppingCartHistory>, HandleGetCartHistory>();

private static IServiceCollection AddEventHandlers(this IServiceCollection services) =>
services.AddEventHandler<ShoppingCartConfirmed, HandleCartFinalized>();
services.AddEventHandler<EventEnvelope<ShoppingCartConfirmed>, HandleCartFinalized>();

internal static void ConfigureCarts(this StoreOptions options)
{
Expand Down