-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.go
123 lines (99 loc) · 4.58 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
"context"
"fmt"
"os"
"github.com/streamingfast/cli"
"github.com/streamingfast/logging"
sink "github.com/streamingfast/substreams-sink"
pbchanges "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1"
"github.com/streamingfast/substreams/client"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
)
var expectedOutputModuleType = string(new(pbchanges.DatabaseChanges).ProtoReflect().Descriptor().FullName())
var zlog, tracer = logging.RootLogger("project", "github.com/change_to_org/change_to_project")
func main() {
logging.InstantiateLoggers()
cli.Ensure(len(os.Args) == 3 || len(os.Args) == 4, "usage: %s <endpoint> <manifest> [<output_module>]", os.Args[0])
endpoint := os.Args[1]
manifestPath := os.Args[2]
// Find the output module in the manifest sink.moduleName configuration. If you have no
// such configuration, you can change the value below and set the module name explicitly.
outputModuleName := sink.InferOutputModuleFromPackage
if len(os.Args) == 4 {
outputModuleName = os.Args[3]
}
// We do not use params in this example, but if you have some, you would define it here.
// Each param is of the form `module=value` or `imported:module=value`.
params := ([]string)(nil)
pkg, outputModule, outputHashModule, blockRange, err := sink.ReadManifestAndModuleAndBlockRange(
manifestPath,
// Equivalent to the "network" parameter of the CLI. Specifies the network to use
// for params and initialBlocks, overriding the "network" field in the Substreams package.
"mainnet",
params,
outputModuleName,
// Should be the Protobuf full name of the map's module output, we use
// `substreams-database-changes` imported type. Adjust to your needs.
//
// If your Protobuf is defined in your Substreams manifest, you can use `substream protogen`
// while being in the same folder that contain `buf.gen.yaml` file in the example folder.
expectedOutputModuleType,
false,
// This is the block range, in our case defined as Substreams module's start block and up forever
":",
zlog,
)
cli.NoError(err, "unable to read manifest: %s", err)
sinker, err := sink.New(
sink.SubstreamsModeProduction,
pkg,
outputModule,
outputHashModule,
// Deals with endpoint, API token and plaintext/TLS configuration
client.NewSubstreamsClientConfig(endpoint, os.Getenv("SUBSTREAMS_API_TOKEN"), client.JWT, false, false),
zlog,
tracer,
sink.WithBlockRange(blockRange),
)
cli.NoError(err, "unable to create sinker: %s", err)
sinker.OnTerminating(func(err error) {
cli.NoError(err, "unexpected sinker error")
zlog.Info("sink is terminating")
})
// You **must** save the cursor somewhere, saving it to memory while
// make it last until the process is killed, in which on re-start, the
// sinker will resume from start block again. You can simply read from
// a file the string value of the cursor and use `sink.NewCursor(value)`
// to load it.
sinker.Run(context.Background(), sink.NewBlankCursor(), sink.NewSinkerHandlers(handleBlockScopedData, handleBlockUndoSignal))
}
func handleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *sink.Cursor) error {
_ = ctx
changes := &pbchanges.DatabaseChanges{}
if err := data.Output.MapOutput.UnmarshalTo(changes); err != nil {
return fmt.Errorf("unable to unmarshal database changes: %w", err)
}
fmt.Printf("Block #%d (%s) data received with %d changes\n", data.Clock.Number, data.Clock.Id, len(changes.TableChanges))
// Once you have processed the block, you **must** persist the cursor, persistence
// can take differnet form. For example, you can save it to a file, or to a database.
// You can simply use `os.WriteFile("cursor.txt", []byte(cursor.String()), 0644)` to
// save it to a file.
_ = cursor
// The isLive boolean is set to true when the sinker is running in the live portion
// of the chain. If there is **no** liveness checker configured on the sinker, the isLive
// value is going to be nil.
_ = isLive
return nil
}
func handleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) error {
_ = ctx
// The chain forked for one or more blocks, you **must** rewind your changes back to the
// last valid block, which is provided in the undo signal. You **must** also persist the
// last valid cursor also provided in the undo signal. You can simply use
// `os.WriteFile("cursor.txt", []byte(cursor.String()), 0644)` to
// save it to a file.
_ = cursor
fmt.Printf("Rewinding changes back to block %s\n", undoSignal.LastValidBlock)
return nil
}