Skip to content

Commit

Permalink
Added Core.EntityFramework project with EntityFrameworkProjection
Browse files Browse the repository at this point in the history
Implementation is made accordingly to MartenElasticSearchProjection
  • Loading branch information
oskardudycz committed May 19, 2024
1 parent 7333c8f commit 5f39e5c
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 5 deletions.
20 changes: 20 additions & 0 deletions Core.EntityFramework/Core.EntityFramework.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.2" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Core\Core.csproj" />
</ItemGroup>

</Project>
98 changes: 98 additions & 0 deletions Core.EntityFramework/EntityFrameworkProjection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using Core.Events;
using Core.Reflection;
using Microsoft.EntityFrameworkCore;
using Polly;

namespace Core.EntityFramework;

public class EntityFrameworkProjection<TDbContext>: IEventBatchHandler
where TDbContext : DbContext
{
protected readonly TDbContext DBContext;
protected IAsyncPolicy RetryPolicy { get; }
private readonly HashSet<Type> handledEventTypes = [];

public EntityFrameworkProjection(TDbContext dbContext, IAsyncPolicy? retryPolicy = null)
{
DBContext = dbContext;
RetryPolicy = retryPolicy ?? Policy.NoOpAsync();
}

protected void Projects<TEvent>() =>
handledEventTypes.Add(typeof(TEvent));

public async Task Handle(IEventEnvelope[] eventInEnvelopes, CancellationToken ct)
{
var events = eventInEnvelopes
.Where(@event => handledEventTypes.Contains(@event.Data.GetType()))
.ToArray();

await ApplyAsync(events, ct);
}

protected virtual Task ApplyAsync(IEventEnvelope[] events, CancellationToken ct) =>
ApplyAsync(events.Select(@event => @event.Data).ToArray(), ct);

protected virtual Task ApplyAsync(object[] events, CancellationToken ct) =>
Task.CompletedTask;
}

public abstract class EntityFrameworkProjection<TDocument, TDbContext>(
TDbContext context,
IAsyncPolicy retryPolicy
): EntityFrameworkProjection<TDbContext>(context, retryPolicy)
where TDocument : class
where TDbContext : DbContext
{
private record ProjectEvent(
Func<object, string> GetId,
Func<TDocument, object, TDocument> Apply
);

private readonly Dictionary<Type, ProjectEvent> projectors = new();
private Func<TDocument, object> getDocumentId = default!;

protected void Projects<TEvent>(
Func<TEvent, string> getId,
Func<TDocument, TEvent, TDocument> apply
)
{
projectors.Add(
typeof(TEvent),
new ProjectEvent(
@event => getId((TEvent)@event),
(document, @event) => apply(document, (TEvent)@event)
)
);
Projects<TEvent>();
}

protected void DocumentId(Func<TDocument, object> documentId) =>
getDocumentId = documentId;

protected override Task ApplyAsync(object[] events, CancellationToken token) =>
RetryPolicy.ExecuteAsync(async ct =>
{
var ids = events.Select(GetDocumentId).ToList();
var entities = await DBContext.Set<TDocument>()
.Where(x => ids.Contains(getDocumentId(x)))
.ToListAsync(cancellationToken: ct);
var existingDocuments = entities.ToDictionary(ks => getDocumentId(ks), vs => vs);
for(var i = 0; i < events.Length; i++)
{
Apply(existingDocuments.GetValueOrDefault(ids[i], GetDefault(events[i])), events[i]);
}
}, token);

protected virtual TDocument GetDefault(object @event) =>
ObjectFactory<TDocument>.GetDefaultOrUninitialized();

private TDocument Apply(TDocument document, object @event) =>
projectors[@event.GetType()].Apply(document, @event);

private object GetDocumentId(object @event) =>
projectors[@event.GetType()].GetId(@event);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System.Diagnostics;
using Core.Events;
using Core.EventStoreDB.Events;
using Core.EventStoreDB.Subscriptions.Checkpoints;
using Core.OpenTelemetry;
using EventStore.Client;
using Microsoft.Extensions.Logging;

Expand All @@ -23,6 +21,7 @@ CancellationToken ct
var events = TryDeserializeEvents(resolvedEvents, options.IgnoreDeserializationErrors);
ulong? lastPosition = null;

// TODO: How would you implement Dead-Letter Queue here?
await batchHandler.Handle(events, ct).ConfigureAwait(false);

return lastPosition;
Expand Down
4 changes: 2 additions & 2 deletions Core/Events/EventBusBatchHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ public class EventBusBatchHandler(
ILogger<EventBusBatchHandler> logger
): IEventBatchHandler
{
public async Task Handle(IEventEnvelope[] events, CancellationToken ct)
public async Task Handle(IEventEnvelope[] eventInEnvelopes, CancellationToken ct)
{
foreach (var @event in events)
foreach (var @event in eventInEnvelopes)
{
await HandleEvent(@event, ct).ConfigureAwait(false);
}
Expand Down
2 changes: 1 addition & 1 deletion Core/Events/IEventBatchHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ namespace Core.Events;

public interface IEventBatchHandler
{
Task Handle(IEventEnvelope[] events, CancellationToken ct);
Task Handle(IEventEnvelope[] eventInEnvelopes, CancellationToken ct);
}
7 changes: 7 additions & 0 deletions EventSourcing.NetCore.sln
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "ECommerce.FeedConsumer", "S
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ECommerce.Equinox", "ECommerce.Equinox", "{006643C6-E0B6-48E6-ABC6-9BE3DCB293D8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Core.EntityFramework", "Core.EntityFramework\Core.EntityFramework.csproj", "{840B5027-91B5-42F2-A431-17DBD05F0BDC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1096,6 +1098,10 @@ Global
{C3D0553F-5786-4417-97FD-B65440620274}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16B7A55B-E2E6-4CE2-896A-EF0F02259482}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{16B7A55B-E2E6-4CE2-896A-EF0F02259482}.Debug|Any CPU.Build.0 = Debug|Any CPU
{840B5027-91B5-42F2-A431-17DBD05F0BDC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{840B5027-91B5-42F2-A431-17DBD05F0BDC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{840B5027-91B5-42F2-A431-17DBD05F0BDC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{840B5027-91B5-42F2-A431-17DBD05F0BDC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1299,6 +1305,7 @@ Global
{1738AF23-FD36-4457-B9F9-2593207CDEB5} = {0570E45A-2EB6-4C4C-84E4-2C80E1FECEB5}
{C6383AC1-2D24-4E7A-810F-642920C857EA} = {0570E45A-2EB6-4C4C-84E4-2C80E1FECEB5}
{E1B97A7B-97C3-4C14-9BE6-ACE0AF45CE61} = {0570E45A-2EB6-4C4C-84E4-2C80E1FECEB5}
{840B5027-91B5-42F2-A431-17DBD05F0BDC} = {0570E45A-2EB6-4C4C-84E4-2C80E1FECEB5}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A5F55604-2FF3-43B7-B657-4F18E6E95D3B}
Expand Down

0 comments on commit 5f39e5c

Please sign in to comment.