/
event_store.go
147 lines (121 loc) · 3.7 KB
/
event_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package sql
import (
"context"
"database/sql"
"fmt"
"github.com/stackus/errors"
"github.com/startcodextech/goevents/esourcing"
"github.com/startcodextech/goevents/registry"
"github.com/startcodextech/goevents/store"
"strings"
"time"
)
type (
EventStore struct {
tableName string
db DB
registry registry.Registry
}
)
var _ esourcing.AggregateStore = (*EventStore)(nil)
func NewEventStore(tableName string, db DB, registry registry.Registry) EventStore {
return EventStore{
tableName: tableName,
db: db,
registry: registry,
}
}
func (s EventStore) Load(ctx context.Context, aggregate esourcing.EventSourcedAggregate) (err error) {
aggregateID := aggregate.ID()
aggregateName := aggregate.AggregateName()
var rows *sql.Rows
rows, err = s.db.QueryContext(ctx, s.table(s.queryLoad()), aggregateID, aggregateName, aggregate.Version())
if err != nil {
return err
}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
err = errors.Wrap(err, "closing event rows")
}
}(rows)
for rows.Next() {
var eventID, eventName string
var payloadData []byte
var aggregateVersion int
var occurredAt time.Time
err := rows.Scan(&aggregateVersion, &eventID, &eventName, &payloadData, &occurredAt)
if err != nil {
return err
}
var payload interface{}
payload, err = s.registry.Deserialize(eventName, payloadData)
if err != nil {
return err
}
event := store.NewAggregateEventBuilder().
WithID(eventID).
WithName(eventName).
WithPayload(payload).
WithAggregate(aggregate).
WithAggregateVersion(aggregateVersion).
WithOccurredAt(occurredAt).
Build()
if err = esourcing.LoadEvent(aggregate, event); err != nil {
return err
}
}
return nil
}
func (s EventStore) Save(ctx context.Context, aggregate esourcing.EventSourcedAggregate) (err error) {
const query = "INSERT INTO %s (stream_id, stream_name, stream_version, event_id, event_name, event_data, occurred_at) VALUES"
aggregateID := aggregate.ID()
aggregateName := aggregate.AggregateName()
placeholders := make([]string, len(aggregate.Events()))
values := make([]any, len(aggregate.Events())*7)
for i, event := range aggregate.Events() {
var payloadData []byte
payloadData, err := s.registry.Serialize(event.EventName(), event.Payload())
if err != nil {
return err
}
placeholders[i] = fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)",
i*7+1, i*7+2, i*7+3, i*7+4, i*7+5, i*7+6, i*7+7,
)
values[i*7] = aggregateID
values[i*7+1] = aggregateName
values[i*7+2] = event.AggregateVersion()
values[i*7+3] = event.ID()
values[i*7+4] = event.EventName()
values[i*7+5] = payloadData
values[i*7+6] = event.OccurredAt()
}
_, err = s.db.ExecContext(
ctx,
fmt.Sprintf("%s %s", s.table(query), strings.Join(placeholders, ",")),
values...,
)
if err != nil {
return err
}
return nil
}
func (s EventStore) queryLoad() string {
switch s.db.DBType() {
case DBTypeMySQL:
return "SELECT stream_version, event_id, event_name, event_data, occurred_a FROM %s WHERE stream_id = ? AND stream_name = ? AND stream_version > ? ORDER BY stream_version AS"
default:
return "SELECT stream_version, event_id, event_name, event_data, occurred_a FROM %s WHERE stream_id = $1 AND stream_name = $2 AND stream_version > $3 ORDER BY stream_version ASC"
}
}
func (s EventStore) querySave() string {
switch s.db.DBType() {
case DBTypeMySQL:
return "INSERT INTO %s (stream_id, stream_name, stream_version, event_id, event_name, event_data, occurred_at) VALUES"
default:
return "INSERT INTO %s (stream_id, stream_name, stream_version, event_id, event_name, event_data, occurred_at) VALUES"
}
}
func (s EventStore) table(query string) string {
return fmt.Sprintf(query, s.tableName)
}