Skip to content

Commit

Permalink
Fixed handling of the LastProcessedPosition handling in EntityFramewo…
Browse files Browse the repository at this point in the history
…rkProjection
  • Loading branch information
oskardudycz committed May 17, 2024
1 parent ebeca9f commit 9db711d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 12 deletions.
6 changes: 5 additions & 1 deletion Core/Projections/IProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ void IProjection.Apply(object @event)
}
}

public interface IVersionedProjection: IProjection
public interface ITrackLastProcessedPosition
{
public ulong LastProcessedPosition { get; set; }
}

public interface IVersionedProjection: IProjection, ITrackLastProcessedPosition
{
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using Core.Events;
using Core.Projections;
using ECommerce.Core.Queries;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.ChangeTracking;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Npgsql;

namespace ECommerce.Core.Projections;

Expand All @@ -12,8 +15,8 @@ public static class EntityFrameworkProjection
this IServiceCollection services,
Action<EntityFrameworkProjectionBuilder<TView, TDbContext>> setup
)
where TView: class
where TDbContext: DbContext
where TView : class
where TDbContext : DbContext
{
setup(new EntityFrameworkProjectionBuilder<TView, TDbContext>(services));
return services;
Expand All @@ -24,7 +27,8 @@ public class EntityFrameworkProjectionBuilder<TView, TDbContext>(IServiceCollect
where TView : class
where TDbContext : DbContext
{
public EntityFrameworkProjectionBuilder<TView, TDbContext> AddOn<TEvent>(Func<EventEnvelope<TEvent>, TView> handler) where TEvent : notnull
public EntityFrameworkProjectionBuilder<TView, TDbContext> AddOn<TEvent>(Func<EventEnvelope<TEvent>, TView> handler)
where TEvent : notnull
{
services.AddSingleton(handler);
services.AddTransient<IEventHandler<EventEnvelope<TEvent>>, AddProjection<TView, TEvent, TDbContext>>();
Expand Down Expand Up @@ -71,8 +75,9 @@ public class EntityFrameworkProjectionBuilder<TView, TDbContext>(IServiceCollect

public class AddProjection<TView, TEvent, TDbContext>(
TDbContext dbContext,
Func<EventEnvelope<TEvent>, TView> create)
: IEventHandler<EventEnvelope<TEvent>>
Func<EventEnvelope<TEvent>, TView> create,
ILogger<AddProjection<TView, TEvent, TDbContext>> logger
): IEventHandler<EventEnvelope<TEvent>>
where TView : class
where TDbContext : DbContext
where TEvent : notnull
Expand All @@ -81,17 +86,26 @@ public async Task Handle(EventEnvelope<TEvent> eventEnvelope, CancellationToken
{
var view = create(eventEnvelope);

await dbContext.AddAsync(view, ct);
await dbContext.SaveChangesAsync(ct);
try
{
await dbContext.AddAsync(view, ct);
await dbContext.SaveChangesAsync(ct);
}
catch (PostgresException postgresException)
when (postgresException is { SqlState: PostgresErrorCodes.UniqueViolation })
{
logger.LogWarning(postgresException, "{ViewType} already exists. Ignoring", typeof(TView).Name);
}
}
}

public class UpdateProjection<TView, TEvent, TDbContext>(
TDbContext dbContext,
ILogger<AddProjection<TView, TEvent, TDbContext>> logger,
Func<TEvent, object> getViewId,
Action<EventEnvelope<TEvent>, TView> update,
Func<EntityEntry<TView>, CancellationToken, Task>? prepare = null)
: IEventHandler<EventEnvelope<TEvent>>
Func<EntityEntry<TView>, CancellationToken, Task>? prepare = null
): IEventHandler<EventEnvelope<TEvent>>
where TView : class
where TDbContext : DbContext
where TEvent : notnull
Expand All @@ -101,8 +115,23 @@ public async Task Handle(EventEnvelope<TEvent> eventEnvelope, CancellationToken
var viewId = getViewId(eventEnvelope.Data);
var view = await dbContext.FindAsync<TView>([viewId], ct);

if (view == null)
throw new InvalidOperationException($"{typeof(TView).Name} with id {viewId} wasn't found");
switch (view)
{
case null:
throw new InvalidOperationException($"{typeof(TView).Name} with id {viewId} wasn't found");
case ITrackLastProcessedPosition tracked when tracked.LastProcessedPosition <= eventEnvelope.Metadata.LogPosition:
logger.LogWarning(
"{View} with id {ViewId} was already processe. LastProcessedPosition: {LastProcessedPosition}), event LogPosition: {LogPosition}",
typeof(TView).Name,
viewId,
tracked.LastProcessedPosition,
eventEnvelope.Metadata.LogPosition
);
return;
case ITrackLastProcessedPosition tracked:
tracked.LastProcessedPosition = eventEnvelope.Metadata.LogPosition;
break;
}

prepare?.Invoke(dbContext.Entry(view), ct);

Expand Down

0 comments on commit 9db711d

Please sign in to comment.