forked from oskardudycz/EventSourcing.NetCore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
EventStore.cs
191 lines (165 loc) · 6.76 KB
/
EventStore.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
using System.Collections;
using System.Data;
using Dapper;
using Newtonsoft.Json;
using Npgsql;
using Tools.Tools;
namespace EventStoreBasics;
/// <summary>
/// PostgreSQL-backed event store. Events are written through the
/// <c>append_event</c> database function and read back from the <c>events</c> table;
/// per-stream metadata (type and current version) lives in the <c>streams</c> table.
/// </summary>
public class EventStore: IDisposable, IEventStore
{
    // Name of the method invoked on an aggregate for every event while it is being rebuilt.
    private const string Apply = "Apply";

    private readonly NpgsqlConnection databaseConnection;

    /// <summary>
    /// Creates the store on top of an existing connection.
    /// Note that <see cref="Dispose"/> disposes this connection.
    /// </summary>
    /// <param name="databaseConnection">Connection used for every statement issued by the store.</param>
    /// <exception cref="ArgumentNullException"><paramref name="databaseConnection"/> is null.</exception>
    public EventStore(NpgsqlConnection databaseConnection)
    {
        // Fail at construction time instead of with a NullReferenceException on first use.
        this.databaseConnection = databaseConnection
            ?? throw new ArgumentNullException(nameof(databaseConnection));
    }

    /// <summary>
    /// Creates the database objects the store needs: the <c>streams</c> table,
    /// the <c>events</c> table and the <c>append_event</c> function.
    /// </summary>
    public void Init()
    {
        // See more in Greg Young's "Building an Event Storage" article https://cqrs.wordpress.com/documents/building-event-storage/
        // Order matters: events has a foreign key to streams.
        CreateStreamsTable();
        CreateEventsTable();
        CreateAppendEventFunction();
    }

    /// <summary>
    /// Intended to persist the pending events of <paramref name="aggregate"/>.
    /// Not implemented yet: the method always throws.
    /// </summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public bool Store<TStream>(TStream aggregate) where TStream : IAggregate
    {
        // 1. get events from Aggregate
        // 2. Foreach event append it to store
        // 3. Return true if succeeded
        throw new NotImplementedException("Implement logic described above;");
    }

    /// <summary>
    /// Appends a single event to a stream by calling the <c>append_event</c> database function.
    /// </summary>
    /// <typeparam name="TStream">
    /// Stream type; its assembly-qualified name is passed as the stream type,
    /// which the function stores when it creates the stream row.
    /// </typeparam>
    /// <param name="streamId">Identifier of the stream to append to.</param>
    /// <param name="event">Event object; serialized to JSON and stored together with its assembly-qualified type name.</param>
    /// <param name="expectedVersion">
    /// When provided, the event is appended only if the stream is currently at exactly this version.
    /// </param>
    /// <returns>
    /// The value returned by <c>append_event</c>: <c>false</c> when <paramref name="expectedVersion"/>
    /// was given and did not match the stream version, <c>true</c> once the event was inserted.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    public bool AppendEvent<TStream>(Guid streamId, object @event, long? expectedVersion = null) where TStream: notnull
    {
        // Without this guard a null event would be serialized to "null" and then fail on GetType().
        if (@event is null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        var parameters = new
        {
            Id = Guid.NewGuid(),
            Data = JsonConvert.SerializeObject(@event),
            Type = @event.GetType().AssemblyQualifiedName,
            StreamId = streamId,
            StreamType = typeof(TStream).AssemblyQualifiedName,
            ExpectedVersion = expectedVersion
        };

        return databaseConnection.QuerySingle<bool>(
            "SELECT append_event(@Id, @Data::jsonb, @Type, @StreamId, @StreamType, @ExpectedVersion)",
            parameters,
            commandType: CommandType.Text
        );
    }

    /// <summary>
    /// Rebuilds an aggregate of type <typeparamref name="T"/> from the events of a stream.
    /// </summary>
    /// <param name="streamId">Identifier of the stream to read.</param>
    /// <param name="atStreamVersion">Optional upper bound (inclusive) on the event version.</param>
    /// <param name="atTimestamp">Optional upper bound (inclusive) on the event creation time.</param>
    /// <returns>
    /// A new <typeparamref name="T"/> instance after every selected event was passed to it;
    /// a freshly constructed instance when no event matched.
    /// </returns>
    public T AggregateStream<T>(Guid streamId, long? atStreamVersion = null, DateTime? atTimestamp = null) where T: notnull
    {
        // nonPublic: true lets aggregates keep their parameterless constructor non-public.
        var aggregate = (T)Activator.CreateInstance(typeof(T), true)!;

        var version = 0;

        foreach (var @event in GetEvents(streamId, atStreamVersion, atTimestamp))
        {
            // InvokeIfExists/SetIfExists come from Tools.Tools - presumably reflection helpers
            // that do nothing when the member is missing; verify against their implementation.
            aggregate.InvokeIfExists(Apply, @event);
            // The version written is the count of events applied so far (1 after the first event).
            aggregate.SetIfExists(nameof(IAggregate.Version), ++version);
        }

        return aggregate;
    }

    /// <summary>
    /// Reads the row describing a stream from the <c>streams</c> table.
    /// </summary>
    /// <param name="streamId">Identifier of the stream.</param>
    /// <returns>
    /// The stream state built from the matching row, or the <c>SingleOrDefault</c>
    /// default when no row has this id.
    /// </returns>
    public StreamState? GetStreamState(Guid streamId)
    {
        const string getStreamSql =
            @"SELECT id, type, version
FROM streams
WHERE id = @streamId";

        var rows = databaseConnection.Query<dynamic>(getStreamSql, new { streamId });

        return rows
            .Select(streamData =>
                new StreamState(
                    streamData.id,
                    // The stored value is an assembly-qualified type name (see AppendEvent).
                    Type.GetType(streamData.type),
                    streamData.version
                ))
            .SingleOrDefault();
    }

    /// <summary>
    /// Loads and deserializes the events of a stream, ordered by version.
    /// </summary>
    /// <param name="streamId">Identifier of the stream to read.</param>
    /// <param name="atStreamVersion">When set, only events with a version not greater than this value are returned.</param>
    /// <param name="atTimestamp">When set, only events created at or before this moment are returned.</param>
    /// <returns>A materialized list of event objects, each deserialized using the type name stored with it.</returns>
    public IEnumerable GetEvents(Guid streamId, long? atStreamVersion = null, DateTime? atTimestamp = null)
    {
        // The "@param IS NULL OR ..." pairs make both filters optional inside a single statement.
        const string getEventsSql =
            @"SELECT id, data, stream_id, type, version, created
FROM events
WHERE stream_id = @streamId
AND (@atStreamVersion IS NULL OR version <= @atStreamVersion)
AND (@atTimestamp IS NULL OR created <= @atTimestamp)
ORDER BY version";

        var rows = databaseConnection.Query<dynamic>(
            getEventsSql,
            new { streamId, atStreamVersion, atTimestamp });

        return rows
            .Select(@event =>
                JsonConvert.DeserializeObject(
                    @event.data,
                    Type.GetType(@event.type)
                ))
            .ToList();
    }

    // Creates the table holding one row per stream: id, type name and current version.
    private void CreateStreamsTable()
    {
        const string createStreamsTableSql =
            @"CREATE TABLE IF NOT EXISTS streams(
id UUID NOT NULL PRIMARY KEY,
type TEXT NOT NULL,
version BIGINT NOT NULL
);";

        databaseConnection.Execute(createStreamsTableSql);
    }

    // Creates the table holding the events; (stream_id, version) is unique per the constraint below.
    private void CreateEventsTable()
    {
        const string createEventsTableSql =
            @"CREATE TABLE IF NOT EXISTS events(
id UUID NOT NULL PRIMARY KEY,
data JSONB NOT NULL,
stream_id UUID NOT NULL,
type TEXT NOT NULL,
version BIGINT NOT NULL,
created timestamp with time zone NOT NULL default (now()),
FOREIGN KEY(stream_id) REFERENCES streams(id),
CONSTRAINT events_stream_and_version UNIQUE(stream_id, version)
);";

        databaseConnection.Execute(createEventsTableSql);
    }

    // Creates (or replaces) the append_event function used by AppendEvent.
    // stream_version is declared bigint so it matches streams.version, events.version and
    // expected_stream_version; an int local would overflow for versions above 2^31 - 1.
    private void CreateAppendEventFunction()
    {
        const string appendEventFunctionSql =
            @"CREATE OR REPLACE FUNCTION append_event(id uuid, data jsonb, type text, stream_id uuid, stream_type text, expected_stream_version bigint default null) RETURNS boolean
LANGUAGE plpgsql
AS $$
DECLARE
stream_version bigint;
BEGIN
-- get stream version
SELECT
version INTO stream_version
FROM streams as s
WHERE
s.id = stream_id FOR UPDATE;
-- if stream doesn't exist - create new one with version 0
IF stream_version IS NULL THEN
stream_version := 0;
INSERT INTO streams
(id, type, version)
VALUES
(stream_id, stream_type, stream_version);
END IF;
-- check optimistic concurrency
IF expected_stream_version IS NOT NULL AND stream_version != expected_stream_version THEN
RETURN FALSE;
END IF;
-- increment event_version
stream_version := stream_version + 1;
-- append event
INSERT INTO events
(id, data, stream_id, type, version)
VALUES
(id, data, stream_id, type, stream_version);
-- update stream version
UPDATE streams as s
SET version = stream_version
WHERE
s.id = stream_id;
RETURN TRUE;
END;
$$;";

        databaseConnection.Execute(appendEventFunctionSql);
    }

    /// <summary>
    /// Disposes the database connection that was passed to the constructor.
    /// </summary>
    public void Dispose()
    {
        databaseConnection.Dispose();
    }
}