forked from looplab/eventhorizon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventstore.go
211 lines (177 loc) · 5.57 KB
/
eventstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Copyright (c) 2014 - Max Persson <max@looplab.se>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package eventhorizon
import (
"errors"
"time"
)
// Sentinel errors returned by the event stores in this file.
var (
	// ErrNoEventsToAppend is returned when a save is attempted without any
	// events to append.
	ErrNoEventsToAppend = errors.New("no events to append")

	// ErrNoEventStoreDefined is returned when an event store is needed but
	// none has been defined.
	ErrNoEventStoreDefined = errors.New("no event store defined")
)
// EventStore is the interface implemented by event stores used for event
// sourcing.
type EventStore interface {
	// Save appends all events in the given event stream to the store.
	Save(events []Event) error

	// Load loads all events for the aggregate with the given id from the
	// store.
	Load(id UUID) ([]Event, error)
}
// EventRecordStore is the interface implemented by event stores that can
// return the stored records for events, not only the events themselves.
type EventRecordStore interface {
	// Records returns the event records for the aggregate with the given id.
	// A record carries version and timestamp information for its event.
	Records(id UUID) ([]EventRecord, error)
}
// AggregateRecord is a stored record of an aggregate, expressed as the
// records of its events.
type AggregateRecord interface {
	// AggregateID returns the aggregate id of the record.
	AggregateID() UUID

	// Version returns the version of the record.
	Version() int

	// EventRecords returns the event records held by the aggregate record.
	EventRecords() []EventRecord
}
// EventRecord is a single stored event record, including its timestamp.
type EventRecord interface {
	// Type returns the type of the record as a string.
	Type() string

	// Version returns the version of the record.
	Version() int

	// Events returns the events held by the record.
	Events() []Event

	// Timestamp returns the timestamp of the record.
	Timestamp() time.Time
}
// MemoryEventStore is an EventStore that keeps everything in memory.
// It provides the methods of both EventStore and EventRecordStore.
type MemoryEventStore struct {
	// eventBus, when non-nil, receives every event that is saved.
	eventBus EventBus

	// aggregateRecords holds the stored records, keyed by aggregate id.
	aggregateRecords map[UUID]*memoryAggregateRecord
}
// NewMemoryEventStore creates a new, empty MemoryEventStore that uses the
// given event bus. The bus may be nil.
func NewMemoryEventStore(eventBus EventBus) *MemoryEventStore {
	return &MemoryEventStore{
		eventBus:         eventBus,
		aggregateRecords: map[UUID]*memoryAggregateRecord{},
	}
}
// Save appends all events in the event stream to the memory store.
//
// Every event is wrapped in a record carrying its event type and the time at
// which it was saved. Record versions are tracked per aggregate: the first
// event stored for an aggregate gets version 0 and each later one gets the
// previous version plus one. After an event has been stored it is published
// on the event bus, if the store has one.
//
// Save returns ErrNoEventsToAppend when events is empty. A zero-value
// MemoryEventStore may be used; its record map is created on first use.
//
// NOTE(review): no locking is done here, so concurrent callers presumably
// need to synchronize externally — confirm against the callers.
func (s *MemoryEventStore) Save(events []Event) error {
	if len(events) == 0 {
		return ErrNoEventsToAppend
	}

	// Writing to a nil map panics, so create it for stores that were not
	// built with NewMemoryEventStore.
	if s.aggregateRecords == nil {
		s.aggregateRecords = make(map[UUID]*memoryAggregateRecord)
	}

	for _, event := range events {
		r := &memoryEventRecord{
			eventType: event.EventType(),
			timestamp: time.Now(),
			event:     event,
		}

		// Look the id up once and reuse it for the lookup and the insert.
		id := event.AggregateID()

		if a, ok := s.aggregateRecords[id]; ok {
			a.version++
			r.version = a.version
			a.events = append(a.events, r)
		} else {
			// First event for this aggregate: the aggregate and the record
			// (via its zero value) both start at version 0.
			s.aggregateRecords[id] = &memoryAggregateRecord{
				aggregateID: id,
				version:     0,
				events:      []*memoryEventRecord{r},
			}
		}

		// Publish event on the bus.
		if s.eventBus != nil {
			s.eventBus.PublishEvent(event)
		}
	}

	return nil
}
// Load returns the events stored for the aggregate with the given id, in the
// order they are held in the store. It returns a nil slice when the store has
// no entry for the id. The returned error is always nil.
func (s *MemoryEventStore) Load(id UUID) ([]Event, error) {
	aggregate, found := s.aggregateRecords[id]
	if !found {
		return nil, nil
	}

	// Unwrap each stored record back into its event.
	loaded := make([]Event, 0, len(aggregate.events))
	for _, record := range aggregate.events {
		loaded = append(loaded, record.event)
	}
	return loaded, nil
}
// Records returns the complete event records, containing timestamp and
// version, for the aggregate with the given id. It returns a nil slice when
// the store has no entry for the id. The returned error is always nil.
func (s *MemoryEventStore) Records(id UUID) ([]EventRecord, error) {
	aggregate, found := s.aggregateRecords[id]
	if !found {
		return nil, nil
	}

	// Expose the stored records through the EventRecord interface.
	records := make([]EventRecord, 0, len(aggregate.events))
	for _, record := range aggregate.events {
		records = append(records, record)
	}
	return records, nil
}
// memoryAggregateRecord is the in-memory record kept for one aggregate.
type memoryAggregateRecord struct {
	// aggregateID is the id of the aggregate the record belongs to.
	aggregateID UUID

	// version is the version given to the most recently appended event
	// record; it is 0 while the aggregate holds a single event.
	version int

	// events holds the event records in the order they were appended.
	events []*memoryEventRecord
}
// memoryEventRecord is the in-memory record kept for one stored event.
type memoryEventRecord struct {
	// eventType is the type of the recorded event.
	eventType string

	// version is the record's version within its aggregate.
	version int

	// timestamp is the time recorded for the event.
	timestamp time.Time

	// event is the recorded event itself.
	event Event
}

// Type returns the event type of the record.
func (m *memoryEventRecord) Type() string {
	return m.eventType
}

// Version returns the version of the record.
func (m *memoryEventRecord) Version() int {
	return m.version
}

// Timestamp returns the timestamp of the record.
func (m *memoryEventRecord) Timestamp() time.Time {
	return m.timestamp
}

// Events returns the recorded event as a newly allocated one-element slice.
func (m *memoryEventRecord) Events() []Event {
	return []Event{m.event}
}
// TraceEventStore wraps an EventStore and adds debug tracing of saved events.
type TraceEventStore struct {
	// eventStore is the wrapped base store; it may be nil.
	eventStore EventStore

	// tracing reports whether saved events are currently being recorded.
	tracing bool

	// trace holds the events recorded while tracing was enabled.
	trace []Event
}
// NewTraceEventStore creates a new TraceEventStore wrapping the given base
// store. The store starts with tracing disabled and an empty trace.
func NewTraceEventStore(eventStore EventStore) *TraceEventStore {
	return &TraceEventStore{
		eventStore: eventStore,
		trace:      []Event{},
	}
}
// Save records the events in the trace when tracing is enabled and then
// passes them to the base store, returning the base store's result.
//
// The events are recorded before the base store is called, so they stay in
// the trace even when the base store returns an error. Without a base store
// Save returns nil, unlike Load, which returns ErrNoEventStoreDefined.
func (s *TraceEventStore) Save(events []Event) error {
	if s.tracing {
		s.trace = append(s.trace, events...)
	}

	if s.eventStore == nil {
		return nil
	}
	return s.eventStore.Save(events)
}
// Load loads all events for the aggregate id from the base store and returns
// its result unchanged. It returns ErrNoEventStoreDefined when there is no
// base store.
func (s *TraceEventStore) Load(id UUID) ([]Event, error) {
	if s.eventStore == nil {
		return nil, ErrNoEventStoreDefined
	}
	return s.eventStore.Load(id)
}
// StartTracing enables tracing, so that events passed to Save from now on
// are added to the trace. Events already in the trace are kept.
func (s *TraceEventStore) StartTracing() {
	s.tracing = true
}
// StopTracing disables tracing, so that events passed to Save from now on
// are no longer added to the trace. Events already in the trace are kept.
func (s *TraceEventStore) StopTracing() {
	s.tracing = false
}
// GetTrace returns the events that happened during the tracing, in the order
// they were recorded.
//
// The result is a copy: changing or appending to the returned slice does not
// affect the trace held by the store, and later saves do not show up in a
// slice that was returned earlier. A nil trace is returned as nil.
func (s *TraceEventStore) GetTrace() []Event {
	if s.trace == nil {
		return nil
	}

	// Copy so callers cannot reach the store's backing array.
	events := make([]Event, len(s.trace))
	copy(events, s.trace)
	return events
}
// ResetTrace discards the recorded events by replacing the trace with a new
// empty slice. Whether tracing is enabled is left unchanged.
func (s *TraceEventStore) ResetTrace() {
	s.trace = []Event{}
}