-
Notifications
You must be signed in to change notification settings - Fork 45
/
forkhandler.go
63 lines (51 loc) · 1.65 KB
/
forkhandler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package pipeline
import (
"sync"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
)
type UndoHandler func(clock *pbsubstreams.Clock, moduleOutputs []*pbssinternal.ModuleOutput)
// TODO(abourget): The scope of this object and the Engine
//
// are not pretty similar, to keep track of certain pieces
// of info that are reversible, and handle the back and forth
// between undos and redos.
// Perhaps what we could have here, is have those undo handlers
// live on the Pipeline (where it makes sense)
// and have some nested structs handle
type ForkHandler struct {
reversibleOutputs map[string][]*pbssinternal.ModuleOutput
undoHandlers []UndoHandler
mu sync.RWMutex
}
func NewForkHandler() *ForkHandler {
return &ForkHandler{
reversibleOutputs: make(map[string][]*pbssinternal.ModuleOutput),
undoHandlers: []UndoHandler{},
}
}
func (f *ForkHandler) registerUndoHandler(handler UndoHandler) {
f.undoHandlers = append(f.undoHandlers, handler)
}
func (f *ForkHandler) handleUndo(
clock *pbsubstreams.Clock,
) error {
f.mu.RLock()
defer f.mu.RUnlock()
if moduleOutputs, found := f.reversibleOutputs[clock.Id]; found {
for _, h := range f.undoHandlers {
h(clock, moduleOutputs)
}
}
return nil
}
func (f *ForkHandler) removeReversibleOutput(blockID string) {
f.mu.Lock()
delete(f.reversibleOutputs, blockID)
f.mu.Unlock()
}
func (f *ForkHandler) addReversibleOutput(moduleOutput *pbssinternal.ModuleOutput, blockID string) {
f.mu.Lock()
defer f.mu.Unlock()
f.reversibleOutputs[blockID] = append(f.reversibleOutputs[blockID], moduleOutput)
}