/
EntityFrameworkProjection.cs
76 lines (69 loc) · 2.71 KB
/
EntityFrameworkProjection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
using System.Linq.Expressions;
using Core.EntityFramework;
using Core.Events;
using Core.Projections;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.ChangeTracking;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Npgsql;
namespace ECommerce.Core.Projections;
/// <summary>
/// Event handler that builds a new <typeparamref name="TView"/> from an incoming event
/// envelope and inserts it through <typeparamref name="TDbContext"/>.
/// </summary>
/// <typeparam name="TView">Entity type of the view that gets inserted.</typeparam>
/// <typeparam name="TEvent">Type of the event payload carried by the envelope.</typeparam>
/// <typeparam name="TDbContext">EF Core context used to add and save the view.</typeparam>
/// <param name="dbContext">Context the new view is added to and saved with.</param>
/// <param name="create">Factory producing the view instance from the event envelope.</param>
/// <param name="logger">Logger used to report inserts that were skipped as duplicates.</param>
public class AddProjection<TView, TEvent, TDbContext>(
    TDbContext dbContext,
    Func<EventEnvelope<TEvent>, TView> create,
    ILogger<AddProjection<TView, TEvent, TDbContext>> logger
): IEventHandler<EventEnvelope<TEvent>>
    where TView : class
    where TDbContext : DbContext
    where TEvent : notnull
{
    /// <summary>
    /// Creates the view for <paramref name="eventEnvelope"/> and saves it.
    /// A PostgreSQL unique-violation error is logged as a warning and swallowed;
    /// any other exception propagates to the caller.
    /// </summary>
    /// <param name="eventEnvelope">Envelope with the event the view is created from.</param>
    /// <param name="ct">Token passed to the asynchronous EF Core calls.</param>
    public async Task Handle(EventEnvelope<TEvent> eventEnvelope, CancellationToken ct)
    {
        var view = create(eventEnvelope);

        try
        {
            await dbContext.AddAsync(view, ct);
            await dbContext.SaveChangesAsync(ct);
        }
        catch (Exception updateException)
            when (updateException.GetBaseException() is PostgresException { SqlState: PostgresErrorCodes.UniqueViolation })
        {
            // A failed SaveChanges leaves the entity tracked as Added, so any later
            // SaveChanges on this same context instance would retry the insert and hit
            // the same unique violation. Stop tracking the rejected view.
            // Only the root view is detached here; related entities added with it are not.
            dbContext.Entry(view).State = EntityState.Detached;

            logger.LogWarning(updateException, "{ViewType} already exists. Ignoring", typeof(TView).Name);
        }
    }
}
/// <summary>
/// Event handler that loads an existing <typeparamref name="TView"/> by id, applies an
/// update derived from the incoming event envelope and saves the change through
/// <typeparamref name="TDbContext"/>.
/// </summary>
/// <typeparam name="TView">Entity type of the view that gets updated.</typeparam>
/// <typeparam name="TEvent">Type of the event payload carried by the envelope.</typeparam>
/// <typeparam name="TDbContext">EF Core context used to find and save the view.</typeparam>
/// <param name="dbContext">Context the view is looked up in and saved with.</param>
/// <param name="logger">Logger used to report events that were skipped as already processed.</param>
/// <param name="getViewId">Extracts the view's key value from the event payload.</param>
/// <param name="update">Applies the event envelope to the loaded view.</param>
// NOTE(review): the logger is typed for AddProjection rather than UpdateProjection, which
// looks like a copy-paste slip. It is kept as-is so the constructor signature stays unchanged.
public class UpdateProjection<TView, TEvent, TDbContext>(
    TDbContext dbContext,
    ILogger<AddProjection<TView, TEvent, TDbContext>> logger,
    Func<TEvent, object> getViewId,
    Action<EventEnvelope<TEvent>, TView> update
): IEventHandler<EventEnvelope<TEvent>>
    where TView : class
    where TDbContext : DbContext
    where TEvent : notnull
{
    /// <summary>
    /// Finds the view addressed by <paramref name="eventEnvelope"/>, applies the update and saves it.
    /// When the view implements <see cref="ITrackLastProcessedPosition"/>, events whose log position
    /// is not greater than the view's last processed position are skipped with a warning.
    /// </summary>
    /// <param name="eventEnvelope">Envelope with the event and its metadata (including the log position).</param>
    /// <param name="ct">Token passed to the asynchronous EF Core calls.</param>
    /// <exception cref="InvalidOperationException">No view with the resolved id was found.</exception>
    public async Task Handle(EventEnvelope<TEvent> eventEnvelope, CancellationToken ct)
    {
        var viewId = getViewId(eventEnvelope.Data);
        var view = await dbContext.FindAsync<TView>([viewId], ct);

        switch (view)
        {
            case null:
                throw new InvalidOperationException($"{typeof(TView).Name} with id {viewId} wasn't found for event {typeof(TEvent).Name}");

            // The event was already applied when the view's recorded position is at or past
            // the event's position. The previous `<=` check was inverted: it skipped every
            // newer event and let older (replayed) ones through.
            case ITrackLastProcessedPosition tracked when tracked.LastProcessedPosition >= eventEnvelope.Metadata.LogPosition:
                logger.LogWarning(
                    "{View} with id {ViewId} was already processed. LastProcessedPosition: {LastProcessedPosition}), event LogPosition: {LogPosition}",
                    typeof(TView).Name,
                    viewId,
                    tracked.LastProcessedPosition,
                    eventEnvelope.Metadata.LogPosition
                );
                return;

            // Newer event: record its position so a redelivery is detected next time.
            case ITrackLastProcessedPosition tracked:
                tracked.LastProcessedPosition = eventEnvelope.Metadata.LogPosition;
                break;
        }

        update(eventEnvelope, view);

        await dbContext.SaveChangesAsync(ct);
    }
}