/
EventStoreDBRepository.cs
65 lines (54 loc) · 2.34 KB
/
EventStoreDBRepository.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
using Core.Aggregates;
using Core.Events;
using Core.EventStoreDB.Events;
using Core.EventStoreDB.Serialization;
using Core.OpenTelemetry;
using EventStore.Client;
namespace Core.EventStoreDB.Repository;
public interface IEventStoreDBRepository<T> where T : class, IAggregate
{
Task<T?> Find(Guid id, CancellationToken cancellationToken);
Task<ulong> Add(Guid id, T aggregate, CancellationToken ct = default);
Task<ulong> Update(Guid id, T aggregate, ulong? expectedRevision = null, CancellationToken ct = default);
Task<ulong> Delete(Guid id, T aggregate, ulong? expectedRevision = null, CancellationToken ct = default);
}
public class EventStoreDBRepository<T>(EventStoreClient eventStore): IEventStoreDBRepository<T>
where T : class, IAggregate
{
public Task<T?> Find(Guid id, CancellationToken cancellationToken) =>
eventStore.AggregateStream<T>(
id,
cancellationToken
);
public async Task<ulong> Add(Guid id, T aggregate, CancellationToken ct = default)
{
var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(id),
StreamState.NoStream,
GetEventsToStore(aggregate),
cancellationToken: ct
).ConfigureAwait(false);
return result.NextExpectedStreamRevision.ToUInt64();
}
public async Task<ulong> Update(Guid id, T aggregate, ulong? expectedRevision = null, CancellationToken ct = default)
{
var eventsToAppend = GetEventsToStore(aggregate);
var nextVersion = expectedRevision ?? (ulong)(aggregate.Version - eventsToAppend.Count);
var result = await eventStore.AppendToStreamAsync(
StreamNameMapper.ToStreamId<T>(id),
nextVersion,
eventsToAppend,
cancellationToken: ct
).ConfigureAwait(false);
return result.NextExpectedStreamRevision.ToUInt64();
}
public Task<ulong> Delete(Guid id, T aggregate, ulong? expectedRevision = null, CancellationToken ct = default) =>
Update(id, aggregate, expectedRevision, ct);
private static List<EventData> GetEventsToStore(T aggregate)
{
var events = aggregate.DequeueUncommittedEvents();
return events
.Select(@event => @event.ToJsonEventData(TelemetryPropagator.GetPropagationContext()))
.ToList();
}
}