forked from looplab/eventhorizon
/
eventmaintenance.go
133 lines (119 loc) · 3.72 KB
/
eventmaintenance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright (c) 2021 - The Event Horizon authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mongodb_v2
import (
	"context"
	"errors"
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"

	// Register uuid.UUID as BSON type.
	_ "github.com/looplab/eventhorizon/codec/bson"

	eh "github.com/looplab/eventhorizon"
)
// Replace implements the Replace method of the eventhorizon.EventStore interface.
//
// It overwrites the stored event that has the same aggregate ID and version as
// the given event, copying the stored event's position onto the replacement
// (both the Position field and the "position" metadata key). The existence
// check, the lookup and the write all run on the transaction's session
// context.
//
// It returns eh.ErrAggregateNotFound when no stored event carries the
// aggregate ID, eh.ErrInvalidEvent when the aggregate has no event with the
// given version, and an eh.EventStoreError (Err: eh.ErrCouldNotSaveEvents,
// BaseErr: the cause) for every other failure.
func (s *EventStore) Replace(ctx context.Context, event eh.Event) error {
	sess, err := s.client.StartSession(nil)
	if err != nil {
		return eh.EventStoreError{
			Err:     eh.ErrCouldNotSaveEvents,
			BaseErr: err,
		}
	}
	defer sess.EndSession(ctx)

	_, err = sess.WithTransaction(ctx, func(txCtx mongo.SessionContext) (interface{}, error) {
		// Every database call below must receive txCtx: operations issued with
		// the outer ctx are not bound to the session and would run outside the
		// transaction.

		// First check if the aggregate exists, the not found error in the update
		// query can mean both that the aggregate or the event is not found.
		n, err := s.events.CountDocuments(txCtx, bson.M{"aggregate_id": event.AggregateID()})
		if err != nil {
			// Checked before n so a failed count is not misreported as a
			// missing aggregate. Returned unwrapped so the driver still sees
			// the original error when deciding whether to retry.
			return nil, err
		}
		if n == 0 {
			return nil, eh.ErrAggregateNotFound
		}

		// Create the event record for the Database.
		e, err := newEvt(ctx, event)
		if err != nil {
			return nil, err
		}

		// Selects exactly the event that is being replaced.
		filter := bson.M{
			"aggregate_id": event.AggregateID(),
			"version":      event.Version(),
		}

		// Copy the event position from the old event (and set in metadata).
		res := s.events.FindOne(txCtx, filter)
		if err := res.Err(); err != nil {
			if errors.Is(err, mongo.ErrNoDocuments) {
				return nil, eh.ErrInvalidEvent
			}

			return nil, fmt.Errorf("could not find event to replace: %w", err)
		}

		var eventToReplace evt
		if err := res.Decode(&eventToReplace); err != nil {
			return nil, fmt.Errorf("could not decode event to replace: %w", err)
		}

		e.Position = eventToReplace.Position
		e.Metadata["position"] = eventToReplace.Position

		// Find and replace the event.
		r, err := s.events.ReplaceOne(txCtx, filter, e)
		if err != nil {
			return nil, err
		}
		if r.MatchedCount == 0 {
			return nil, eh.ErrInvalidEvent
		}

		return nil, nil
	})
	if err != nil {
		// Return some errors intact.
		if errors.Is(err, eh.ErrAggregateNotFound) || errors.Is(err, eh.ErrInvalidEvent) {
			return err
		}

		return eh.EventStoreError{
			Err:     eh.ErrCouldNotSaveEvents,
			BaseErr: err,
		}
	}

	return nil
}
// RenameEvent implements the RenameEvent method of the eventhorizon.EventStore interface.
//
// A single UpdateMany call sets "event_type" to the new type on every stored
// event whose "event_type" equals the old one. A failed update is reported as
// an eh.EventStoreError with Err eh.ErrCouldNotSaveEvents and the driver error
// as BaseErr.
func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) error {
	// TODO: Maybe use change info.
	selector := bson.M{"event_type": from.String()}
	rename := bson.M{"$set": bson.M{"event_type": to.String()}}

	_, err := s.events.UpdateMany(ctx, selector, rename)
	if err == nil {
		return nil
	}

	return eh.EventStoreError{
		Err:     eh.ErrCouldNotSaveEvents,
		BaseErr: err,
	}
}
// Clear clears the event storage.
//
// The events collection is dropped first, then the streams collection. The
// first drop that fails ends the call with an eh.EventStoreError naming the
// collection; the remaining collection is then left as it is.
func (s *EventStore) Clear(ctx context.Context) error {
	eventsErr := s.events.Drop(ctx)
	if eventsErr != nil {
		wrapped := fmt.Errorf("could not clear events collection: %w", eventsErr)

		return eh.EventStoreError{Err: wrapped}
	}

	streamsErr := s.streams.Drop(ctx)
	if streamsErr != nil {
		wrapped := fmt.Errorf("could not clear streams collection: %w", streamsErr)

		return eh.EventStoreError{Err: wrapped}
	}

	return nil
}