/
module_executor.go
91 lines (69 loc) · 2.79 KB
/
module_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package exec
import (
	"context"
	"errors"
	"fmt"

	pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
	"github.com/streamingfast/substreams/reqctx"
	"github.com/streamingfast/substreams/storage/execout"
	"go.opentelemetry.io/otel/attribute"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"
)
// RunModule produces the output of the module wrapped by executor, either by
// replaying a value already present in execOutput or by running the executor.
//
// The call is wrapped in a "module_execution" span tagged with the module
// name and with whether the output came from the cache.
//
// Cached path: the cached bytes are applied to the executor, converted to a
// ModuleOutput, and the result is flagged as Cached. When the conversion
// yields a nil ModuleOutput, (nil, nil, nil) is returned. When the output
// carries store deltas, the returned bytes are the marshalled deltas rather
// than the raw cached bytes.
//
// Uncached path: the executor is run, and its output is returned after the
// execution logs and module name have been attached.
//
// Returns the module output, its serialized bytes, and an error wrapped with
// the step that failed.
func RunModule(ctx context.Context, executor ModuleExecutor, execOutput execout.ExecutionOutputGetter) (*pbssinternal.ModuleOutput, []byte, error) {
	logger := reqctx.Logger(ctx)
	modName := executor.Name()

	// err is handed by pointer to the deferred span end. Every fallible call
	// below must therefore assign to THIS variable with `=`; re-declaring it
	// with `:=` in a nested scope would hide the failure from the span.
	var err error
	ctx, span := reqctx.WithModuleExecutionSpan(ctx, "module_execution")
	defer span.EndWithErr(&err)

	logger = logger.With(zap.String("module_name", modName))
	span.SetAttributes(attribute.String("substreams.module.name", modName))
	logger.Debug("running module")

	// Declared up front so that no later assignment needs `:=`.
	var (
		cached       bool
		outputBytes  []byte
		moduleOutput *pbssinternal.ModuleOutput
	)

	cached, outputBytes, err = getCachedOutput(execOutput, executor)
	if err != nil {
		return nil, nil, fmt.Errorf("check cache output exists: %w", err)
	}
	span.SetAttributes(attribute.Bool("substreams.module.cached", cached))

	if cached {
		if err = executor.applyCachedOutput(outputBytes); err != nil {
			return nil, nil, fmt.Errorf("apply cached output: %w", err)
		}

		moduleOutput, err = executor.toModuleOutput(outputBytes)
		if err != nil {
			// The partially-built values are returned alongside the error, as
			// before, in case a caller inspects them.
			return moduleOutput, outputBytes, fmt.Errorf("converting output to module output: %w", err)
		}
		if moduleOutput == nil {
			return nil, nil, nil // output from PartialKV is always nil, we cannot use it
		}

		// For store modules, the output in cache is in "operations", but we get
		// the proper store deltas in "toModuleOutput", so we need to transform
		// back those deltas into outputBytes.
		if storeDeltas := moduleOutput.GetStoreDeltas(); storeDeltas != nil {
			outputBytes, err = proto.Marshal(storeDeltas)
			if err != nil {
				return nil, nil, fmt.Errorf("marshalling store deltas: %w", err)
			}
		}

		fillModuleOutputMetadata(executor, moduleOutput)
		moduleOutput.Cached = true
		return moduleOutput, outputBytes, nil
	}

	uid := reqctx.ReqStats(ctx).RecordModuleWasmBlockBegin(modName)
	outputBytes, moduleOutput, err = executor.run(ctx, execOutput)
	if err != nil {
		// NOTE(review): the matching RecordModuleWasmBlockEnd is not called on
		// this path; confirm that is intended for failed executions.
		return nil, nil, fmt.Errorf("execute: %w", err)
	}
	reqctx.ReqStats(ctx).RecordModuleWasmBlockEnd(modName, uid)

	fillModuleOutputMetadata(executor, moduleOutput)
	return moduleOutput, outputBytes, nil
}
// getCachedOutput looks up the value stored in execOutput under the
// executor's module name.
//
// It returns the "cached" flag reported by the getter together with the raw
// bytes. A lookup failing with execout.NotFound, whether returned directly or
// wrapped by an intermediate layer, is not treated as an error: the getter's
// flag and value are passed through with a nil error. Any other failure is
// wrapped and returned with (false, nil).
func getCachedOutput(execOutput execout.ExecutionOutputGetter, executor ModuleExecutor) (bool, []byte, error) {
	output, cached, err := execOutput.Get(executor.Name())
	// errors.Is rather than != so a wrapped NotFound is still recognised.
	if err != nil && !errors.Is(err, execout.NotFound) {
		return false, nil, fmt.Errorf("get cached output: %w", err)
	}
	return cached, output, nil
}
// fillModuleOutputMetadata copies executor-side metadata onto in: the logs of
// the executor's last execution, whether those logs were truncated, and the
// module name. in is modified in place and must be non-nil.
func fillModuleOutputMetadata(executor ModuleExecutor, in *pbssinternal.ModuleOutput) {
	execLogs, wasTruncated := executor.lastExecutionLogs()
	name := executor.Name()

	in.ModuleName = name
	in.Logs = execLogs
	in.DebugLogsTruncated = wasTruncated
}