-
Notifications
You must be signed in to change notification settings - Fork 43
/
indexexec.go
72 lines (57 loc) · 2 KB
/
indexexec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package exec
import (
"context"
"fmt"
"github.com/streamingfast/substreams/reqctx"
pbindex "github.com/streamingfast/substreams/pb/sf/substreams/index/v1"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
"github.com/streamingfast/substreams/storage/execout"
"github.com/streamingfast/substreams/wasm"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
type IndexModuleExecutor struct {
BaseExecutor
}
func NewIndexModuleExecutor(baseExecutor *BaseExecutor) *IndexModuleExecutor {
return &IndexModuleExecutor{BaseExecutor: *baseExecutor}
}
func (i *IndexModuleExecutor) Name() string { return i.moduleName }
func (i *IndexModuleExecutor) String() string { return i.Name() }
func (i *IndexModuleExecutor) applyCachedOutput([]byte) error {
return nil
}
func (i *IndexModuleExecutor) run(ctx context.Context, reader execout.ExecutionOutputGetter) (out []byte, outForFiles []byte, moduleOutputData *pbssinternal.ModuleOutput, err error) {
_, span := reqctx.WithModuleExecutionSpan(ctx, "exec_index")
defer span.EndWithErr(&err)
var call *wasm.Call
if call, err = i.wasmCall(reader); err != nil {
return nil, nil, nil, fmt.Errorf("maps wasm call: %w", err)
}
if call != nil {
out = call.Output()
}
modOut, err := i.toModuleOutput(out)
if err != nil {
return nil, nil, nil, fmt.Errorf("converting back to module output: %w", err)
}
return out, out, modOut, nil
}
func (i *IndexModuleExecutor) toModuleOutput(data []byte) (*pbssinternal.ModuleOutput, error) {
var indexKeys pbindex.Keys
err := proto.Unmarshal(data, &indexKeys)
if err != nil {
return nil, fmt.Errorf("unmarshalling index keys: %w", err)
}
return &pbssinternal.ModuleOutput{
Data: &pbssinternal.ModuleOutput_MapOutput{
MapOutput: &anypb.Any{TypeUrl: "type.googleapis.com/sf.substreams.index.v1.Keys", Value: data},
},
}, nil
}
func (i *IndexModuleExecutor) HasValidOutput() bool {
return true
}
func (i *IndexModuleExecutor) HasOutputForFiles() bool {
return true
}