-
Notifications
You must be signed in to change notification settings - Fork 142
/
snapshot_manager_runner.go
150 lines (133 loc) · 4.56 KB
/
snapshot_manager_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package sm_snapshots
import (
"context"
"sync"
"github.com/samber/lo"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/wasp/packages/shutdown"
"github.com/iotaledger/wasp/packages/state"
"github.com/iotaledger/wasp/packages/util/pipe"
)
// To avoid code duplication, a common parts of regular and mocked snapshot managers
// are extracted to `snapshotManagerRunner`.
type snapshotManagerRunner struct {
log *logger.Logger
ctx context.Context
shutdownCoordinator *shutdown.Coordinator
blockCommittedPipe pipe.Pipe[SnapshotInfo]
lastIndexSnapshotted uint32
lastIndexSnapshottedMutex sync.Mutex
loadedSnapshotStateIndex uint32
createPeriod uint32
delayPeriod uint32
queue []SnapshotInfo
core snapshotManagerCore
}
func newSnapshotManagerRunner(
ctx context.Context,
store state.Store,
shutdownCoordinator *shutdown.Coordinator,
createPeriod uint32,
delayPeriod uint32,
core snapshotManagerCore,
log *logger.Logger,
) *snapshotManagerRunner {
result := &snapshotManagerRunner{
log: log,
ctx: ctx,
shutdownCoordinator: shutdownCoordinator,
blockCommittedPipe: pipe.NewInfinitePipe[SnapshotInfo](),
lastIndexSnapshotted: 0,
lastIndexSnapshottedMutex: sync.Mutex{},
loadedSnapshotStateIndex: 0,
createPeriod: createPeriod,
delayPeriod: delayPeriod,
queue: make([]SnapshotInfo, 0),
core: core,
}
if store.IsEmpty() {
loadedSnapshotInfo := result.core.loadSnapshot()
if loadedSnapshotInfo != nil {
result.loadedSnapshotStateIndex = loadedSnapshotInfo.StateIndex()
}
}
go result.run()
return result
}
// -------------------------------------
// Implementations of SnapshotManager interface
// -------------------------------------
func (smrT *snapshotManagerRunner) GetLoadedSnapshotStateIndex() uint32 {
return smrT.loadedSnapshotStateIndex
}
func (smrT *snapshotManagerRunner) BlockCommittedAsync(snapshotInfo SnapshotInfo) {
if smrT.createSnapshotsNeeded() {
smrT.blockCommittedPipe.In() <- snapshotInfo
}
}
// -------------------------------------
// Api for snapshotManagerCore implementations
// -------------------------------------
func (smrT *snapshotManagerRunner) snapshotCreated(snapshotInfo SnapshotInfo) {
stateIndex := snapshotInfo.StateIndex()
smrT.lastIndexSnapshottedMutex.Lock()
defer smrT.lastIndexSnapshottedMutex.Unlock()
if stateIndex > smrT.lastIndexSnapshotted {
smrT.lastIndexSnapshotted = stateIndex
smrT.queue = lo.Filter(smrT.queue, func(si SnapshotInfo, index int) bool { return si.StateIndex() > smrT.lastIndexSnapshotted })
}
}
// -------------------------------------
// Internal functions
// -------------------------------------
func (smrT *snapshotManagerRunner) run() {
defer smrT.blockCommittedPipe.Close()
blockCommittedPipeCh := smrT.blockCommittedPipe.Out()
for {
if smrT.ctx.Err() != nil {
if smrT.shutdownCoordinator == nil {
return
}
if smrT.shutdownCoordinator.CheckNestedDone() {
smrT.log.Debugf("Stopping snapshot manager, because context was closed")
smrT.shutdownCoordinator.Done()
return
}
}
select {
case snapshotInfo, ok := <-blockCommittedPipeCh:
if ok {
smrT.handleBlockCommitted(snapshotInfo)
} else {
blockCommittedPipeCh = nil
}
case <-smrT.ctx.Done():
continue
}
}
}
func (smrT *snapshotManagerRunner) createSnapshotsNeeded() bool {
return smrT.createPeriod > 0
}
func (smrT *snapshotManagerRunner) handleBlockCommitted(snapshotInfo SnapshotInfo) {
sisToCreate := func() []SnapshotInfo { // Function to unlock the mutex quicker
stateIndex := snapshotInfo.StateIndex()
var lastIndexSnapshotted uint32
smrT.lastIndexSnapshottedMutex.Lock()
defer smrT.lastIndexSnapshottedMutex.Unlock()
lastIndexSnapshotted = smrT.lastIndexSnapshotted
if (stateIndex > lastIndexSnapshotted) && (stateIndex%smrT.createPeriod == 0) { // TODO: what if snapshotted state has been reverted?
smrT.queue = append(smrT.queue, snapshotInfo)
}
stateIndexToCommit := stateIndex - smrT.delayPeriod
if (stateIndexToCommit > lastIndexSnapshotted) && (stateIndexToCommit%smrT.createPeriod == 0) {
return lo.Filter(smrT.queue, func(si SnapshotInfo, index int) bool { return si.StateIndex() == stateIndexToCommit })
}
return []SnapshotInfo{}
}()
for i, siToCreate := range sisToCreate {
if !(lo.ContainsBy(sisToCreate[:i], func(si SnapshotInfo) bool { return si.Equals(siToCreate) })) {
smrT.core.createSnapshot(siToCreate)
}
}
}