-
Notifications
You must be signed in to change notification settings - Fork 44
/
stores.go
129 lines (112 loc) · 4.35 KB
/
stores.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package pipeline
import (
"context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
"github.com/streamingfast/substreams/block"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
"github.com/streamingfast/substreams/pipeline/exec"
"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/substreams/storage/store"
)
// Stores bundles the store-related state built by NewStores: the store
// configurations, the live store map and the boundary tracker used to decide
// at which blocks store snapshots are flushed.
type Stores struct {
// logger is taken from the request context in NewStores.
logger *zap.Logger
isTier2Request bool // means we're processing a tier2 request
// bounder computes the block boundaries at which stores get flushed.
bounder *storeBoundary
// configs is looked up by module name to check whether a full KV file
// already exists at a given boundary.
configs store.ConfigMap
// StoreMap holds the live stores. NewStores leaves it unset; it is provided
// afterwards through SetStoreMap.
StoreMap store.Map
// Deprecated: we don't need to report back, these file names are now implicitly conveyed from
// tier1 to tier2.
partialsWritten block.Ranges // when backprocessing, to report back to orchestrator
// tier is "tier1" or "tier2" and is used to build tracing span names.
tier string
// storesToWrite is the set of module names whose stores are snapshotted at
// flush boundaries.
storesToWrite map[string]struct{}
}
// NewStores builds a Stores from the given store configurations and request
// bounds.
//
// The logger is taken from ctx, and the tier label is "tier2" when
// isTier2Request is true, "tier1" otherwise. The returned value has no store
// map yet; callers provide it through SetStoreMap.
func NewStores(ctx context.Context, storeConfigs store.ConfigMap, storeSnapshotSaveInterval, requestStartBlockNum, stopBlockNum uint64, isTier2Request bool, storesToWrite map[string]struct{}) *Stores {
	// FIXME(abourget): a StoreBoundary should exist for EACH Store
	// because the module's Initial Block could change the range of each
	// store.
	stores := &Stores{
		configs:        storeConfigs,
		isTier2Request: isTier2Request,
		bounder:        NewStoreBoundary(storeSnapshotSaveInterval, requestStartBlockNum, stopBlockNum),
		tier:           "tier1",
		logger:         reqctx.Logger(ctx),
		storesToWrite:  storesToWrite,
	}
	if isTier2Request {
		stores.tier = "tier2"
	}
	return stores
}
// SetStoreMap replaces the store map held by s with storeMap. NewStores does
// not populate the map, so this is how the live stores are handed to s.
func (s *Stores) SetStoreMap(storeMap store.Map) {
s.StoreMap = storeMap
}
// resetStores calls Reset on every store of the store map that implements
// store.Resettable; stores that do not implement it are left untouched.
func (s *Stores) resetStores() {
	for _, current := range s.StoreMap.All() {
		resettable, ok := current.(store.Resettable)
		if !ok {
			continue
		}
		resettable.Reset()
	}
}
// flushStores saves a snapshot of the stores at every flush boundary the
// bounder reports for blockNum, stopping at the first failure.
//
// It is meant to be called only for Tier2 requests, as to not save reversible
// stores. The last index of executionStages is passed along as the stage
// number. It returns nil without doing anything when no store map is set.
func (s *Stores) flushStores(ctx context.Context, executionStages exec.ExecutionStages, blockNum uint64) (err error) {
	// Fast exit for cases without stores or no linear processing.
	if s.StoreMap == nil {
		return nil
	}

	lastStage := len(executionStages) - 1
	flushBoundaries := s.bounder.GetStoreFlushRanges(s.isTier2Request, s.bounder.requestStopBlock, blockNum)
	for _, boundary := range flushBoundaries {
		if saveErr := s.saveStoresSnapshots(ctx, lastStage, boundary); saveErr != nil {
			return fmt.Errorf("saving stores snapshot at bound %d: %w", boundary, saveErr)
		}
	}
	return nil
}
// saveStoresSnapshots saves a snapshot at boundaryBlock for every module listed
// in storesToWrite, skipping modules whose full KV file already exists at that
// boundary.
//
// stage is only used for logging. An error is returned when a module has no
// configuration, when a module that needs saving has no store in the store
// map, or when saving a snapshot fails. A failure of the existence check
// itself is logged and the store is saved anyway.
func (s *Stores) saveStoresSnapshots(ctx context.Context, stage int, boundaryBlock uint64) (err error) {
	for mod := range s.storesToWrite {
		s.logger.Info("flushing store at boundary", zap.Uint64("boundary", boundaryBlock), zap.String("store", mod), zap.Int("stage", stage))

		// Fail with a clear error instead of calling a method on the zero
		// value of a missing configuration.
		cfg, found := s.configs[mod]
		if !found {
			return fmt.Errorf("save store snapshot %q: no store config found", mod)
		}

		// TODO when partials are generic again, we can also check if PartialKV exists and skip if it does.
		existsFullKV, existsErr := cfg.ExistsFullKV(ctx, boundaryBlock)
		if existsErr != nil {
			// Best effort: the check only avoids redundant work, so a failure
			// is reported but does not prevent the save attempt below.
			s.logger.Warn("checking for existing full kv failed, saving store anyway",
				zap.String("store", mod),
				zap.Uint64("boundary", boundaryBlock),
				zap.Error(existsErr),
			)
		}
		if existsFullKV {
			continue
		}

		// Looked up only when a save is actually needed, so modules that are
		// skipped above keep working even if they are absent from the map.
		moduleStore, found := s.StoreMap[mod]
		if !found || moduleStore == nil {
			return fmt.Errorf("save store snapshot %q: store not found in store map", mod)
		}

		if err := s.saveStoreSnapshot(ctx, moduleStore, boundaryBlock); err != nil {
			return fmt.Errorf("save store snapshot %q: %w", mod, err)
		}
	}
	return nil
}
// storesHandleUndo reverts the store deltas carried by moduleOutput on the
// store registered under moduleOutput.ModuleName.
//
// It does nothing when no such store is registered, when the store does not
// implement store.DeltaAccessor, or when moduleOutput carries no store deltas.
func (s *Stores) storesHandleUndo(moduleOutput *pbssinternal.ModuleOutput) {
	moduleStore, found := s.StoreMap.Get(moduleOutput.ModuleName)
	if !found {
		return
	}

	deltaStore, ok := moduleStore.(store.DeltaAccessor)
	if !ok {
		return
	}

	// NOTE(review): GetStoreDeltas is presumably a generated protobuf getter
	// that returns nil when the output holds no store deltas; guard against
	// dereferencing it in that case. Confirm against the generated code.
	deltas := moduleOutput.GetStoreDeltas()
	if deltas == nil {
		return
	}
	deltaStore.ApplyDeltasReverse(deltas.StoreDeltas)
}
// saveStoreSnapshot saves saveStore at boundaryBlock and writes the result out,
// all within a tracing span named after the current tier.
//
// When the request details ask for written partials to be returned for this
// store, a debug line is logged and, if saveStore is a store.PartialStore, it
// is rolled to boundaryBlock. Errors from saving or writing are returned
// wrapped.
func (s *Stores) saveStoreSnapshot(ctx context.Context, saveStore store.Store, boundaryBlock uint64) (err error) {
	spanName := fmt.Sprintf("substreams/%s/stores/save_store_snapshot", s.tier)
	ctx, span := reqctx.WithSpan(ctx, spanName)
	// NOTE(review): the attribute key is misspelled ("subtreams"); it is kept
	// as is because changing it alters the emitted telemetry.
	span.SetAttributes(attribute.String("subtreams.store", saveStore.Name()))
	// The named result is handed to the span so it can observe the value
	// this function ends up returning.
	defer span.EndWithErr(&err)

	file, writer, saveErr := saveStore.Save(boundaryBlock)
	if saveErr != nil {
		return fmt.Errorf("saving store %q at boundary %d: %w", saveStore.Name(), boundaryBlock, saveErr)
	}

	if writeErr := writer.Write(ctx); writeErr != nil {
		return fmt.Errorf("failed to write store: %w", writeErr)
	}

	if !reqctx.Details(ctx).ShouldReturnWrittenPartials(saveStore.Name()) {
		return nil
	}

	// The range is no longer appended to partialsWritten (deprecated field):
	//s.partialsWritten = append(s.partialsWritten, file.Range)
	s.logger.Debug("adding partials written",
		zap.Stringer("range", file.Range),
		zap.Stringer("ranges", s.partialsWritten),
		zap.Uint64("boundary_block", boundaryBlock),
	)

	partial, isPartial := saveStore.(store.PartialStore)
	if !isPartial {
		return nil
	}
	reqctx.Span(ctx).AddEvent("store_roll_trigger")
	partial.Roll(boundaryBlock)
	return nil
}