tier2.go
package service
import (
"context"
"errors"
"fmt"
"io"
"os"
"sync"
"connectrpc.com/connect"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/streamingfast/bstream/stream"
"github.com/streamingfast/dauth"
"github.com/streamingfast/dgrpc"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/dstore"
"github.com/streamingfast/logging"
tracing "github.com/streamingfast/sf-tracing"
"github.com/streamingfast/substreams"
"github.com/streamingfast/substreams/block"
"github.com/streamingfast/substreams/metrics"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/streamingfast/substreams/pipeline"
"github.com/streamingfast/substreams/pipeline/cache"
"github.com/streamingfast/substreams/pipeline/exec"
"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/substreams/storage/execout"
"github.com/streamingfast/substreams/storage/index"
"github.com/streamingfast/substreams/storage/store"
"github.com/streamingfast/substreams/wasm"
"go.opentelemetry.io/otel/attribute"
ttrace "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
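// ModuleExecutionConfig groups the per-module settings used when executing a module on tier2:
// the module name and hash, its backing object store, whether execution can be skipped (and the
// cached outputs to use in that case), an optional block filter, the module kind, its initial
// block and a logger.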
type ModuleExecutionConfig struct {
name string
moduleHash string
objStore dstore.Store
skipExecution bool
cachedOutputs map[string][]byte // ??
blockFilter *BlockFilter
modKind pbsubstreams.ModuleKind
moduleInitialBlock uint64
logger *zap.Logger
}
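// BlockFilter holds the set of block numbers for which exec outputs already exist.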
type BlockFilter struct {
preexistingExecOuts map[uint64]struct{}
}
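// Tier2Service serves the internal ProcessRange RPC used by tier1 to delegate the execution of a
// single segment/stage. It tracks the number of in-flight requests so it can report itself as not
// ready when overloaded.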
type Tier2Service struct {
wasmExtensions func(map[string]string) (map[string]map[string]wasm.WASMExtension, error) //todo: rename
tracer ttrace.Tracer
logger *zap.Logger
streamFactoryFuncOverride StreamFactoryFunc
setReadyFunc func(bool)
currentConcurrentRequests int64
maxConcurrentRequests uint64
moduleExecutionTracing bool
connectionCountMutex sync.RWMutex
tier2RequestParameters *reqctx.Tier2RequestParameters
}
const protoPkfPrefix = "type.googleapis.com/"
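// NewTier2 creates a Tier2Service with the default tracer, registers the tier2 metric set on the
// given logger and applies the provided options.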
func NewTier2(
logger *zap.Logger,
opts ...Option,
) (*Tier2Service, error) {
s := &Tier2Service{
tracer: tracing.GetTracer(),
logger: logger,
}
metrics.RegisterMetricSet(logger)
for _, opt := range opts {
opt(s)
}
return s, nil
}
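// isOverloaded reports whether the number of in-flight requests has reached
// maxConcurrentRequests (a value of 0 disables the limit).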
func (s *Tier2Service) isOverloaded() bool {
s.connectionCountMutex.RLock()
defer s.connectionCountMutex.RUnlock()
isOverloaded := s.maxConcurrentRequests > 0 && uint64(s.currentConcurrentRequests) >= s.maxConcurrentRequests
return isOverloaded
}
func (s *Tier2Service) incrementConcurrentRequests() {
s.connectionCountMutex.Lock()
defer s.connectionCountMutex.Unlock()
s.currentConcurrentRequests++
s.setOverloaded()
}
func (s *Tier2Service) decrementConcurrentRequests() {
s.connectionCountMutex.Lock()
defer s.connectionCountMutex.Unlock()
s.currentConcurrentRequests--
s.setOverloaded()
}
func (s *Tier2Service) setOverloaded() {
overloaded := s.maxConcurrentRequests != 0 && uint64(s.currentConcurrentRequests) >= s.maxConcurrentRequests
s.setReadyFunc(!overloaded)
}
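// ProcessRange handles a single tier2 work unit: it rejects the request when the service is
// overloaded, sets up logging, metering and tracing on the context, validates the request, then
// runs processRange and maps the resulting error to a gRPC status code.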
func (s *Tier2Service) ProcessRange(request *pbssinternal.ProcessRangeRequest, streamSrv pbssinternal.Substreams_ProcessRangeServer) error {
metrics.Tier2ActiveRequests.Inc()
metrics.Tier2RequestCounter.Inc()
defer metrics.Tier2ActiveRequests.Dec()
// We keep `err` here as the unaltered error from the `blocks` call; it is used in EndSpan to record the full error,
// not only the `grpcError`, which is a subset view of the full `err`.
var err error
ctx := streamSrv.Context()
if s.isOverloaded() {
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("service currently overloaded"))
}
s.incrementConcurrentRequests()
defer func() {
s.decrementConcurrentRequests()
}()
stage := request.OutputModule
logger := reqctx.Logger(ctx).Named("tier2").With(zap.String("stage", stage), zap.Uint64("segment_number", request.SegmentNumber))
ctx = logging.WithLogger(ctx, logger)
ctx = dmetering.WithBytesMeter(ctx)
ctx = dmetering.WithCounter(ctx, "wasm_input_bytes")
ctx = reqctx.WithTracer(ctx, s.tracer)
ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/request")
defer span.EndWithErr(&err)
span.SetAttributes(attribute.Int64("substreams.tier", 2))
hostname := updateStreamHeadersHostname(streamSrv.SetHeader, logger)
span.SetAttributes(attribute.String("hostname", hostname))
if request.Modules == nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
}
moduleNames := make([]string, len(request.Modules.Modules))
for i := 0; i < len(moduleNames); i++ {
moduleNames[i] = request.Modules.Modules[i].Name
}
fields := []zap.Field{
zap.Uint64("segment_size", request.SegmentSize),
zap.Uint32("stage", request.Stage),
zap.Strings("modules", moduleNames),
zap.String("output_module", request.OutputModule),
}
if auth := dauth.FromContext(ctx); auth != nil {
fields = append(fields,
zap.String("user_id", auth.UserID()),
zap.String("key_id", auth.APIKeyID()),
zap.String("ip_address", auth.RealIP()),
)
if cacheTag := auth.Get("X-Sf-Substreams-Cache-Tag"); cacheTag != "" {
fields = append(fields,
zap.String("cache_tag", cacheTag),
)
}
}
logger.Info("incoming substreams ProcessRange request", fields...)
if err := ValidateTier2Request(request); err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}
emitter, err := dmetering.New(request.MeteringConfig, logger)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("unable to initialize dmetering: %w", err))
}
defer func() {
emitter.Shutdown(nil)
}()
ctx = reqctx.WithEmitter(ctx, dmetering.GetDefaultEmitter())
respFunc := tier2ResponseHandler(ctx, logger, streamSrv)
err = s.processRange(ctx, request, respFunc)
grpcError := toGRPCError(ctx, err)
switch status.Code(grpcError) {
case codes.Unknown, codes.Internal, codes.Unavailable:
logger.Info("unexpected termination of stream of blocks", zap.Error(err))
}
return grpcError
}
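// getWASMRegistry builds the WASM registry, loading chain-specific extensions from the given
// extension configs when a wasmExtensions factory was configured.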
func (s *Tier2Service) getWASMRegistry(wasmExtensionConfigs map[string]string) (*wasm.Registry, error) {
var exts map[string]map[string]wasm.WASMExtension
if s.wasmExtensions != nil {
x, err := s.wasmExtensions(wasmExtensionConfigs) // registers chain-specific extensions (e.g. eth_call on Ethereum) with the WASM machine
if err != nil {
return nil, fmt.Errorf("loading wasm extensions: %w", err)
}
exts = x
}
return wasm.NewRegistry(exts), nil
}
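// processRange does the actual work for a request: it resolves the stores, builds the output
// module graph and execution plan, wires the exec-output caching engine and the pipeline, then
// either replays clocks from existing cached outputs (when no module needs the raw block source)
// or runs a block stream over the segment.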
func (s *Tier2Service) processRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc) error {
logger := reqctx.Logger(ctx)
mergedBlocksStore, cacheStore, unmeteredCacheStore, err := s.getStores(ctx, request)
if err != nil {
return err
}
execGraph, err := exec.NewOutputModuleGraph(request.OutputModule, true, request.Modules)
if err != nil {
return stream.NewErrInvalidArg(err.Error())
}
requestDetails := pipeline.BuildRequestDetailsFromSubrequest(request)
ctx = reqctx.WithRequest(ctx, requestDetails)
if s.moduleExecutionTracing {
ctx = reqctx.WithModuleExecutionTracing(ctx)
}
var requestStats *metrics.Stats
ctx, requestStats = setupRequestStats(ctx, requestDetails, execGraph.ModuleHashes().Get(requestDetails.OutputModule), true)
defer requestStats.LogAndClose()
wasmRegistry, err := s.getWASMRegistry(request.WasmExtensionConfigs)
if err != nil {
return err
}
startBlock := request.StartBlock()
stopBlock := request.StopBlock()
execOutputConfigs, err := execout.NewConfigs(
cacheStore,
execGraph.UsedModulesUpToStage(int(request.Stage)),
execGraph.ModuleHashes(),
request.SegmentSize,
logger)
if err != nil {
return fmt.Errorf("new config map: %w", err)
}
storeConfigs, err := store.NewConfigMap(cacheStore, execGraph.Stores(), execGraph.ModuleHashes())
if err != nil {
return fmt.Errorf("configuring stores: %w", err)
}
// indexes are not metered: we want users to use them as much as possible
indexConfigs, err := index.NewConfigs(unmeteredCacheStore, execGraph.UsedIndexesModulesUpToStage(int(request.Stage)), execGraph.ModuleHashes(), logger)
if err != nil {
return fmt.Errorf("configuring indexes: %w", err)
}
executionPlan, err := GetExecutionPlan(ctx, logger, execGraph, request.Stage, startBlock, stopBlock, request.OutputModule, execOutputConfigs, indexConfigs, storeConfigs)
if err != nil {
return fmt.Errorf("creating execution plan: %w", err)
}
if executionPlan == nil || len(executionPlan.RequiredModules) == 0 {
logger.Info("no modules required to run, skipping")
return nil
}
stores := pipeline.NewStores(ctx, storeConfigs, request.SegmentSize, requestDetails.ResolvedStartBlockNum, stopBlock, true, executionPlan.StoresToWrite)
// this engine will keep the ExistingExecOuts to optimize the execution (for inputs from modules that skip execution)
execOutputCacheEngine, err := cache.NewEngine(ctx, executionPlan.ExecoutWriters, request.BlockType, executionPlan.ExistingExecOuts, executionPlan.IndexWriters)
if err != nil {
return fmt.Errorf("error building caching engine: %w", err)
}
//opts := s.buildPipelineOptions(ctx, request)
var opts []pipeline.Option
opts = append(opts, pipeline.WithFinalBlocksOnly())
opts = append(opts, pipeline.WithHighestStage(request.Stage))
pipe := pipeline.New(
ctx,
execGraph,
stores,
executionPlan.ExistingIndices,
execOutputConfigs,
wasmRegistry,
execOutputCacheEngine,
request.SegmentSize,
nil,
respFunc,
// This must always be the parent/global trace id, the one that comes from tier1
opts...,
)
logger.Debug("initializing tier2 pipeline",
zap.Uint64("request_start_block", requestDetails.ResolvedStartBlockNum),
zap.String("output_module", request.OutputModule),
zap.Uint32("stage", request.Stage),
)
if err := pipe.Init(ctx); err != nil {
return fmt.Errorf("error during pipeline init: %w", err)
}
if err := pipe.InitTier2Stores(ctx); err != nil {
return fmt.Errorf("error building pipeline: %w", err)
}
if err := pipe.BuildModuleExecutors(ctx); err != nil {
return fmt.Errorf("error building module executors: %w", err)
}
allExecutorsExcludedByBlockIndex := true
excludable:
for _, stage := range pipe.ModuleExecutors {
for _, executor := range stage {
if executionPlan.ExistingExecOuts[executor.Name()] != nil {
continue
}
if !executor.BlockIndex().ExcludesAllBlocks() {
allExecutorsExcludedByBlockIndex = false
break excludable
}
}
}
if allExecutorsExcludedByBlockIndex {
logger.Info("all executors are excluded by block index. Skipping execution of segment")
return pipe.OnStreamTerminated(ctx, io.EOF)
}
var streamErr error
if canSkipBlockSource(executionPlan.ExistingExecOuts, executionPlan.RequiredModules, request.BlockType) {
maxDistributorLength := int(stopBlock - requestDetails.ResolvedStartBlockNum)
clocksDistributor := make(map[uint64]*pbsubstreams.Clock)
for _, execOutput := range executionPlan.ExistingExecOuts {
execOutput.ExtractClocks(clocksDistributor)
if len(clocksDistributor) >= maxDistributorLength {
break
}
}
sortedClocksDistributor := sortClocksDistributor(clocksDistributor)
ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/pipeline/mapper_stream")
for _, clock := range sortedClocksDistributor {
if clock.Number < startBlock || clock.Number >= stopBlock {
panic("reading from mapper, block was out of range") // we don't want to have this case undetected
}
cursor := irreversibleCursorFromClock(clock)
if err := pipe.ProcessFromExecOutput(ctx, clock, cursor); err != nil {
span.EndWithErr(&err)
return err
}
}
streamErr = io.EOF
span.EndWithErr(&streamErr)
return pipe.OnStreamTerminated(ctx, streamErr)
}
sf := &StreamFactory{
mergedBlocksStore: mergedBlocksStore,
}
streamFactoryFunc := sf.New
if s.streamFactoryFuncOverride != nil { //this is only for testing purposes.
streamFactoryFunc = s.streamFactoryFuncOverride
}
blockStream, err := streamFactoryFunc(
ctx,
pipe,
int64(requestDetails.ResolvedStartBlockNum),
stopBlock,
"",
true,
false,
logger.Named("stream"),
)
if err != nil {
return fmt.Errorf("error getting stream: %w", err)
}
ctx, span := reqctx.WithSpan(ctx, "substreams/tier2/pipeline/blocks_stream")
streamErr = blockStream.Run(ctx)
span.EndWithErr(&streamErr)
return pipe.OnStreamTerminated(ctx, streamErr)
}
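// getStores opens the merged-blocks store and the state store from the URLs carried by the
// request. cacheStore is a metered clone of the state sub-store (scoped by cache tag), while
// unmeteredCacheStore is the unmetered variant used for data we do not want to bill, such as
// block indexes.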
func (s *Tier2Service) getStores(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (mergedBlocksStore, cacheStore, unmeteredCacheStore dstore.Store, err error) {
mergedBlocksStore, err = dstore.NewDBinStore(request.MergedBlocksStore)
if err != nil {
return nil, nil, nil, fmt.Errorf("setting up block store from url %q: %w", request.MergedBlocksStore, err)
}
if cloned, ok := mergedBlocksStore.(dstore.Clonable); ok {
mergedBlocksStore, err = cloned.Clone(ctx)
if err != nil {
return nil, nil, nil, fmt.Errorf("cloning store: %w", err)
}
mergedBlocksStore.SetMeter(dmetering.GetBytesMeter(ctx))
}
stateStore, err := dstore.NewStore(request.StateStore, "zst", "zstd", false)
if err != nil {
return nil, nil, nil, fmt.Errorf("getting store: %w", err)
}
cacheTag := request.StateStoreDefaultTag
if auth := dauth.FromContext(ctx); auth != nil {
if ct := auth.Get("X-Sf-Substreams-Cache-Tag"); ct != "" {
if IsValidCacheTag(ct) {
cacheTag = ct
} else {
return nil, nil, nil, fmt.Errorf("invalid value for X-Sf-Substreams-Cache-Tag %s, should only contain letters, numbers, hyphens and undescores", ct)
}
}
}
unmeteredCacheStore, err = stateStore.SubStore(cacheTag)
if err != nil {
return nil, nil, nil, fmt.Errorf("internal error setting store: %w", err)
}
if clonableStore, ok := unmeteredCacheStore.(dstore.Clonable); ok {
cloned, err := clonableStore.Clone(ctx)
if err != nil {
return nil, nil, nil, fmt.Errorf("cloning store: %w", err)
}
cloned.SetMeter(dmetering.GetBytesMeter(ctx))
cacheStore = cloned
}
return
}
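// canSkipBlockSource returns true when every required module either already has cached exec
// outputs or takes no input of the raw block type, in which case the segment can be processed
// from cached outputs without opening a block stream.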
func canSkipBlockSource(existingExecOuts map[string]*execout.File, requiredModules map[string]*pbsubstreams.Module, blockType string) bool {
if len(existingExecOuts) == 0 {
return false
}
for name, module := range requiredModules {
if existingExecOuts[name] != nil {
continue
}
for _, input := range module.Inputs {
if src := input.GetSource(); src != nil && src.Type == blockType {
return false
}
}
}
return true
}
//func (s *Tier2Service) buildPipelineOptions(ctx context.Context, request *pbssinternal.ProcessRangeRequest) (opts []pipeline.Option) {
// requestDetails := reqctx.Details(ctx)
// for _, pipeOpts := range s.pipelineOptions {
// opts = append(opts, pipeOpts.PipelineOptions(ctx, request.StartBlockNum, request.StopBlockNum, requestDetails.UniqueIDString())...)
// }
// return
//}
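// tier2ResponseHandler returns the ResponseFunc used by the pipeline: it forwards each
// ProcessRangeResponse to the gRPC stream and emits the corresponding metering event for the
// authenticated caller.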
func tier2ResponseHandler(ctx context.Context, logger *zap.Logger, streamSrv pbssinternal.Substreams_ProcessRangeServer) substreams.ResponseFunc {
meter := dmetering.GetBytesMeter(ctx)
auth := dauth.FromContext(ctx)
userID := auth.UserID()
apiKeyID := auth.APIKeyID()
userMeta := auth.Meta()
ip := auth.RealIP()
return func(respAny substreams.ResponseFromAnyTier) error {
resp := respAny.(*pbssinternal.ProcessRangeResponse)
if err := streamSrv.Send(resp); err != nil {
logger.Info("unable to send block probably due to client disconnecting", zap.Error(err))
return connect.NewError(connect.CodeUnavailable, err)
}
sendMetering(ctx, meter, userID, apiKeyID, ip, userMeta, "sf.substreams.internal.v2/ProcessRange", resp)
return nil
}
}
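// updateStreamHeadersHostname resolves the local hostname and, when SUBSTREAMS_SEND_HOSTNAME is
// set to "true", sends it back to the caller as a `host` header. The hostname is also returned so
// it can be recorded on the request span.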
func updateStreamHeadersHostname(setHeader func(metadata.MD) error, logger *zap.Logger) string {
hostname, err := os.Hostname()
if err != nil {
logger.Warn("cannot find hostname, using 'unknown'", zap.Error(err))
hostname = "unknown host"
}
if os.Getenv("SUBSTREAMS_SEND_HOSTNAME") == "true" {
md := metadata.New(map[string]string{"host": hostname})
err = setHeader(md)
if err != nil {
logger.Warn("cannot send header metadata", zap.Error(err))
}
}
return hostname
}
// toGRPCError turns `err` into a gRPC error if it's non-nil; in the `nil` case,
// `nil` is returned right away.
//
// If the `err` has in its chain of error either `context.Canceled`, `context.DeadlineExceeded`
// or `stream.ErrInvalidArg`, error is turned into a proper gRPC error respectively of code
// `Canceled`, `DeadlineExceeded` or `InvalidArgument`.
//
// If the `err` has in its chain any error constructed through `connect.NewError` (and its variants), then
// we return the first found error of such type directly, because it's already a gRPC error.
//
// Otherwise, the error is assumed to be an internal error and turned back into a proper
// `connect.NewError(connect.CodeInternal, err)`.
func toGRPCError(ctx context.Context, err error) error {
if err == nil {
return nil
}
// already GRPC error
if grpcError := dgrpc.AsGRPCError(err); grpcError != nil {
return grpcError.Err()
}
// GRPC to connect error
connectError := &connect.Error{}
if errors.As(err, &connectError) {
switch connectError.Code() {
case connect.CodeCanceled:
return status.Error(codes.Canceled, err.Error())
case connect.CodeUnavailable:
return status.Error(codes.Canceled, err.Error())
case connect.CodeInvalidArgument:
return status.Error(codes.InvalidArgument, err.Error())
case connect.CodeUnknown:
return status.Error(codes.Unknown, err.Error())
}
}
if errors.Is(err, context.Canceled) {
if context.Cause(ctx) != nil {
err = context.Cause(ctx)
if err == errShuttingDown {
return status.Error(codes.Unavailable, err.Error())
}
}
return status.Error(codes.Canceled, err.Error())
}
if errors.Is(err, context.DeadlineExceeded) {
return status.Error(codes.DeadlineExceeded, err.Error())
}
if store.StoreAboveMaxSizeRegexp.MatchString(err.Error()) {
return status.Error(codes.InvalidArgument, err.Error())
}
if errors.Is(err, exec.ErrWasmDeterministicExec) {
return status.Error(codes.InvalidArgument, err.Error())
}
var errInvalidArg *stream.ErrInvalidArg
if errors.As(err, &errInvalidArg) {
return status.Error(codes.InvalidArgument, err.Error())
}
return status.Error(codes.Internal, err.Error())
}
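// ExecutionPlan describes what remains to be done for a segment: which modules already have
// cached exec outputs or indices, which modules still need to run, and the writers to create for
// exec outputs, indexes and stores.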
type ExecutionPlan struct {
ExistingExecOuts map[string]*execout.File
ExecoutWriters map[string]*execout.Writer
ExistingIndices map[string]map[string]*roaring64.Bitmap
IndexWriters map[string]*index.Writer
RequiredModules map[string]*pbsubstreams.Module
StoresToWrite map[string]struct{}
}
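// GetExecutionPlan inspects the caches for every module used up to the requested stage:
// block-index, map and store modules whose files already exist for the segment are recorded as
// existing outputs/indices, the others are marked as required and get the appropriate writers.
// A nil plan is returned when the output module of the last stage already has its exec output.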
func GetExecutionPlan(
ctx context.Context,
logger *zap.Logger,
execGraph *exec.Graph,
stage uint32,
startBlock uint64,
stopBlock uint64,
outputModule string,
execoutConfigs *execout.Configs,
indexConfigs *index.Configs,
storeConfigs store.ConfigMap,
) (*ExecutionPlan, error) {
storesToWrite := make(map[string]struct{})
existingExecOuts := make(map[string]*execout.File)
existingIndices := make(map[string]map[string]*roaring64.Bitmap)
requiredModules := make(map[string]*pbsubstreams.Module)
execoutWriters := make(map[string]*execout.Writer) // this affects stores and mappers, per-block data
indexWriters := make(map[string]*index.Writer) // write the full index file
// storeWriters := .... // write the snapshots
usedModules := make(map[string]*pbsubstreams.Module)
for _, module := range execGraph.UsedModulesUpToStage(int(stage)) {
usedModules[module.Name] = module
}
stageUsedModules := execGraph.StagedUsedModules()[stage]
runningLastStage := stageUsedModules.IsLastStage()
stageUsedModulesName := make(map[string]bool)
for _, layer := range stageUsedModules {
for _, mod := range layer {
stageUsedModulesName[mod.Name] = true
}
}
for _, mod := range usedModules {
if mod.InitialBlock >= stopBlock {
continue
}
name := mod.Name
c := execoutConfigs.ConfigMap[name]
moduleStartBlock := startBlock
if mod.InitialBlock > startBlock {
moduleStartBlock = mod.InitialBlock
}
switch mod.ModuleKind() {
case pbsubstreams.ModuleKindBlockIndex:
indexFile := indexConfigs.ConfigMap[name].NewFile(&block.Range{StartBlock: moduleStartBlock, ExclusiveEndBlock: stopBlock})
err := indexFile.Load(ctx)
if err != nil {
requiredModules[name] = usedModules[name]
indexWriters[name] = index.NewWriter(indexFile)
break
}
existingIndices[name] = indexFile.Indices
case pbsubstreams.ModuleKindMap:
file, readErr := c.ReadFile(ctx, &block.Range{StartBlock: moduleStartBlock, ExclusiveEndBlock: stopBlock})
if readErr != nil {
requiredModules[name] = usedModules[name]
break
}
existingExecOuts[name] = file
if runningLastStage && name == outputModule {
logger.Info("found existing exec output for output_module, skipping run", zap.String("output_module", name))
return nil, nil
}
case pbsubstreams.ModuleKindStore:
file, readErr := c.ReadFile(ctx, &block.Range{StartBlock: moduleStartBlock, ExclusiveEndBlock: stopBlock})
if readErr != nil {
requiredModules[name] = usedModules[name]
} else {
existingExecOuts[name] = file
}
// if either full or partial kv exists, we can skip the module
// some stores may already exist completely on this stage, but others do not, so we keep going but ignore those
storeExists, err := storeConfigs[name].ExistsFullKV(ctx, stopBlock)
if err != nil {
return nil, fmt.Errorf("checking fullkv file existence: %w", err)
}
if !storeExists {
partialStoreExists, err := storeConfigs[name].ExistsPartialKV(ctx, moduleStartBlock, stopBlock)
if err != nil {
return nil, fmt.Errorf("checking partial file existence: %w", err)
}
if !partialStoreExists {
storesToWrite[name] = struct{}{}
requiredModules[name] = usedModules[name]
}
}
}
}
for name, module := range requiredModules {
if _, exists := existingExecOuts[name]; exists {
continue // for stores that need to be run for the partials, but already have cached execution outputs
}
writerStartBlock := startBlock
if module.InitialBlock > startBlock {
writerStartBlock = module.InitialBlock
}
var isIndexWriter bool
if module.ModuleKind() == pbsubstreams.ModuleKindBlockIndex {
isIndexWriter = true
}
execoutWriters[name] = execout.NewWriter(
writerStartBlock,
stopBlock,
name,
execoutConfigs,
isIndexWriter,
)
}
return &ExecutionPlan{
ExistingExecOuts: existingExecOuts,
ExecoutWriters: execoutWriters,
ExistingIndices: existingIndices,
IndexWriters: indexWriters,
RequiredModules: requiredModules,
StoresToWrite: storesToWrite,
}, nil
}