/
event_store.go
116 lines (95 loc) · 2.12 KB
/
event_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package edatpgx
import (
"context"
"errors"
"fmt"
"github.com/jackc/pgx/v4"
"github.com/rezaAmiri123/edat/core"
"github.com/rezaAmiri123/edat/es"
"github.com/rezaAmiri123/edat/log"
)
// EventStore loads and saves the events of an es.AggregateRoot through a
// Client, using a single events table whose name is configurable.
type EventStore struct {
// tableName is substituted into the loadEventsSQL and writeEventSQL
// templates by Load and Save.
tableName string
// client runs the load query and opens the transaction used by Save.
client Client
// logger is set to edatlog.DefaultLogger by NewEventStore; it is not
// referenced by Load or Save.
logger edatlog.Logger
}
// Compile-time assertion that *EventStore satisfies es.AggregateRootStore.
var _ es.AggregateRootStore = (*EventStore)(nil)
// NewEventStore returns an EventStore that accesses the database through
// client.
//
// The store starts out with DefaultEventTableName and edatlog.DefaultLogger;
// the given options are then applied in order, so a later option can override
// what an earlier one set.
func NewEventStore(client Client, options ...EventStoreOption) *EventStore {
	store := &EventStore{
		tableName: DefaultEventTableName,
		client:    client,
		logger:    edatlog.DefaultLogger,
	}

	for _, option := range options {
		option(store)
	}

	return store
}
// Load replays the stored events of root onto root.
//
// It runs loadEventsSQL against the store's table with the aggregate's name,
// its ID and root.PendingVersion() as arguments. Each returned row is scanned
// into an event name and a payload, turned into an event with
// core.DeserializeEvent and applied with root.LoadEvent, in the order the rows
// are returned.
//
// A query failing with pgx.ErrNoRows is treated as "nothing to load" and
// yields nil. Any other query, scan, deserialization, apply or row-iteration
// error is returned unchanged, in which case root may already have had some
// events applied.
func (s *EventStore) Load(ctx context.Context, root *es.AggregateRoot) error {
	name := root.AggregateName()
	id := root.AggregateID()
	version := root.PendingVersion()

	rows, err := s.client.Query(ctx, fmt.Sprintf(loadEventsSQL, s.tableName), name, id, version)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return nil
		}
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			eventName string
			data      []byte
		)

		// A failed scan must be reported: returning nil here would present a
		// partially loaded aggregate as a successful load.
		if err := rows.Scan(&eventName, &data); err != nil {
			return err
		}

		event, err := core.DeserializeEvent(eventName, data)
		if err != nil {
			return err
		}

		if err := root.LoadEvent(event); err != nil {
			return err
		}
	}

	// Next reports false both at the end of the result set and when reading
	// fails; Err tells the two apart.
	return rows.Err()
}
// Save writes the events returned by root.Events() to the store's table
// inside a single transaction.
//
// The i-th event (counting from zero) is stored with version
// root.Version()+i+1, together with the aggregate's name and ID, the
// correlation ID and request ID taken from ctx, the event's name and its
// serialized payload.
//
// The transaction is committed only if every event was serialized and written
// successfully; the commit error, if any, becomes the return value. On any
// earlier error the transaction is rolled back and that error is returned. If
// a panic occurs the transaction is rolled back and the panic is re-raised.
func (s *EventStore) Save(ctx context.Context, root *es.AggregateRoot) (err error) {
	var tx pgx.Tx

	name := root.AggregateName()
	id := root.AggregateID()
	version := root.Version()

	tx, err = s.client.Begin(ctx)
	if err != nil {
		return err
	}

	// The deferred function reads the named result err, so every failure path
	// below must leave the error in err (an explicit `return err` does that).
	defer func() {
		p := recover()
		switch {
		case p != nil:
			// Rollback is best effort; the panic is what the caller must see.
			_ = tx.Rollback(ctx)
			panic(p)
		case err != nil:
			// Rollback is best effort; the original error is the one returned.
			_ = tx.Rollback(ctx)
		default:
			err = tx.Commit(ctx)
		}
	}()

	correlationID := core.GetCorrelationID(ctx)
	// TODO: the request ID is used as a stand-in for the causation ID; this
	// was marked as incorrect by the original author.
	causationID := core.GetRequestID(ctx)

	// The statement text depends only on the table name, so build it once
	// rather than once per event.
	query := fmt.Sprintf(writeEventSQL, s.tableName)

	for i, event := range root.Events() {
		var data []byte

		data, err = core.SerializeEvent(event)
		if err != nil {
			return err
		}

		_, err = tx.Exec(ctx, query, name, id, correlationID, causationID, version+i+1, event.EventName(), data)
		if err != nil {
			return err
		}
	}

	return err
}