-
Notifications
You must be signed in to change notification settings - Fork 44
/
snapshot.go
60 lines (48 loc) · 1.26 KB
/
snapshot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package pipeline
import (
"fmt"
"github.com/streamingfast/substreams/storage/store"
"github.com/streamingfast/substreams"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
)
func (p *Pipeline) sendSnapshots(storeMap store.Map, snapshotModules []string) error {
if len(snapshotModules) == 0 {
return nil
}
for _, modName := range snapshotModules {
store, found := storeMap.Get(modName)
if !found {
return fmt.Errorf("store %q not found", modName)
}
send := func(count uint64, total uint64, deltas []*pbsubstreamsrpc.StoreDelta) {
data := &pbsubstreamsrpc.InitialSnapshotData{
ModuleName: modName,
Deltas: deltas,
SentKeys: count,
TotalKeys: total,
}
p.respFunc(substreams.NewSnapshotData(data))
}
var count uint64
total := store.Length()
var accum []*pbsubstreamsrpc.StoreDelta
store.Iter(func(k string, v []byte) error {
count++
accum = append(accum, &pbsubstreamsrpc.StoreDelta{
Operation: pbsubstreamsrpc.StoreDelta_CREATE,
Key: k,
NewValue: v,
})
if count%100 == 0 {
send(count, total, accum)
accum = nil
}
return nil
})
if len(accum) != 0 {
send(count, total, accum)
}
}
p.respFunc(substreams.NewSnapshotComplete())
return nil
}