-
Notifications
You must be signed in to change notification settings - Fork 7
/
aggregate_root_repository.go
108 lines (83 loc) · 3.31 KB
/
aggregate_root_repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package es
import (
"context"
"errors"
"github.com/stackus/edat/core"
"github.com/stackus/edat/log"
)
// AggregateRepository describes the operations for loading, creating and
// updating aggregate roots.
//
// AggregateRootRepository in this file provides methods with these signatures.
type AggregateRepository interface {
// Load returns the aggregate root identified by aggregateID, or an error.
// The implementation in this file returns ErrAggregateNotFound when the
// loaded root has never been committed.
Load(ctx context.Context, aggregateID string) (*AggregateRoot, error)
// Save applies command to a newly constructed aggregate root, configured by
// options, and persists the result.
Save(ctx context.Context, command core.Command, options ...AggregateRootOption) (*AggregateRoot, error)
// Update loads the aggregate root identified by aggregateID, applies command
// to it and persists the result. The implementation in this file returns
// ErrAggregateNotFound when the root has never been committed.
Update(ctx context.Context, aggregateID string, command core.Command, options ...AggregateRootOption) (*AggregateRoot, error)
}
// AggregateRootRepository uses stores to load and save the changes to aggregates as events.
//
// It builds AggregateRoot values around aggregates produced by a caller-supplied
// constructor and delegates loading and saving of those roots to an
// AggregateRootStore.
type AggregateRootRepository struct {
// constructor is called to create the Aggregate wrapped by each new AggregateRoot.
constructor func() Aggregate
// store is the AggregateRootStore that roots are loaded from and saved to.
store AggregateRootStore
// logger receives the repository's trace and error messages;
// NewAggregateRootRepository sets it to log.DefaultLogger.
logger log.Logger
}
// AggregateRootStoreMiddleware is a function type used for embedding stores:
// it receives an AggregateRootStore and returns an AggregateRootStore, so
// stores can be layered around one another.
type AggregateRootStoreMiddleware func(store AggregateRootStore) AggregateRootStore
// Sentinel errors reported by aggregate repositories and stores.
var (
	// ErrAggregateNotFound is returned when no root was found for a given
	// aggregate id.
	ErrAggregateNotFound = errors.New("aggregate not found")

	// ErrAggregateVersionMismatch should be returned by stores when new events
	// cannot be appended due to version conflicts.
	ErrAggregateVersionMismatch = errors.New("aggregate version mismatch")
)
// NewAggregateRootRepository builds an AggregateRootRepository that creates
// aggregates with constructor and loads/saves their roots through store.
//
// The repository logs through log.DefaultLogger and emits a trace message once
// it has been assembled.
func NewAggregateRootRepository(constructor func() Aggregate, store AggregateRootStore) *AggregateRootRepository {
	repo := new(AggregateRootRepository)
	repo.constructor = constructor
	repo.store = store
	repo.logger = log.DefaultLogger

	repo.logger.Trace("es.AggregateRootRepository constructed")

	return repo
}
// Load finds the aggregate identified by aggregateID in the provided store.
//
// A new root carrying aggregateID is created and handed to the store to be
// populated. Any error from the store is returned unchanged with a nil root.
// If the root is still at the never-committed version after loading,
// ErrAggregateNotFound is returned. On success the populated root is returned
// with a nil error.
func (r *AggregateRootRepository) Load(ctx context.Context, aggregateID string) (*AggregateRoot, error) {
	root := r.root(WithAggregateRootID(aggregateID))

	if err := r.store.Load(ctx, root); err != nil {
		return nil, err
	}

	// A root that was never committed has nothing stored for it.
	if root.version == aggregateNeverCommitted {
		return nil, ErrAggregateNotFound
	}

	// The root has already been loaded above; loading it a second time would
	// only repeat the store round trip and could hand back a non-nil root
	// together with an error.
	return root, nil
}
// Save creates a new aggregate root configured by options, applies command to
// it and persists the outcome into the store.
//
// The root is returned alongside whatever error the save step reports, so it
// is non-nil even when err is non-nil.
func (r *AggregateRootRepository) Save(ctx context.Context, command core.Command, options ...AggregateRootOption) (*AggregateRoot, error) {
	root := r.root(options...)

	err := r.save(ctx, command, root)

	return root, err
}
// Update locates an existing aggregate, applies the command and persists the
// result into the store.
//
// The root is built from options followed by WithAggregateRootID(aggregateID),
// so the ID option is always applied last. Any error from loading is returned
// unchanged with a nil root, and ErrAggregateNotFound is returned when the
// loaded root has never been committed. Otherwise the root is returned together
// with the error (if any) from processing and saving the command.
func (r *AggregateRootRepository) Update(ctx context.Context, aggregateID string, command core.Command, options ...AggregateRootOption) (*AggregateRoot, error) {
	// Build a private option list rather than appending to the caller's slice:
	// when the caller passes opts... with spare capacity, append would write
	// into the caller's backing array.
	rootOptions := make([]AggregateRootOption, 0, len(options)+1)
	rootOptions = append(rootOptions, options...)
	rootOptions = append(rootOptions, WithAggregateRootID(aggregateID))

	root := r.root(rootOptions...)

	if err := r.store.Load(ctx, root); err != nil {
		return nil, err
	}

	// require aggregates to exist to be "Updated"
	if root.version == aggregateNeverCommitted {
		return nil, ErrAggregateNotFound
	}

	return root, r.save(ctx, command, root)
}
// root creates a new AggregateRoot around an aggregate produced by the
// repository's constructor, configured with the given options.
func (r *AggregateRootRepository) root(options ...AggregateRootOption) *AggregateRoot {
	aggregate := r.constructor()

	return NewAggregateRoot(aggregate, options...)
}
// save has root process command and then persists root through the store.
//
// An error from processing the command is returned as-is. When the root's
// pending version equals its current version the store is not called and nil
// is returned (presumably the command produced no changes to persist). An
// error from the store is logged and then returned.
func (r *AggregateRootRepository) save(ctx context.Context, command core.Command, root *AggregateRoot) error {
	if processErr := root.ProcessCommand(command); processErr != nil {
		return processErr
	}

	// Skip the store when the versions match.
	if pending, current := root.PendingVersion(), root.Version(); pending == current {
		return nil
	}

	saveErr := r.store.Save(ctx, root)
	if saveErr == nil {
		return nil
	}

	r.logger.Error("error saving aggregate root", log.Error(saveErr))

	return saveErr
}