/
export.go
executable file
·146 lines (127 loc) · 3.21 KB
/
export.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package memiavl
import (
"context"
"errors"
"fmt"
"path/filepath"
)
// ErrorExportDone is returned by Exporter.Next() and MultiTreeExporter.Next() when all items
// have been exported.
var ErrorExportDone = errors.New("export is complete")
// exportBufferSize is the number of nodes to buffer in the exporter. It improves throughput by
// processing multiple nodes per context switch, but take care to avoid excessive memory usage,
// especially since callers may export several IAVL stores in parallel (e.g. the Cosmos SDK).
// It is used as the capacity of the channel created by newExporter.
const exportBufferSize = 32
// MultiTreeExporter walks the trees of a multi-tree store one after another. For each tree,
// Next first yields the tree's name and then the nodes produced by that tree's Exporter.
type MultiTreeExporter struct {
// Exactly one of db and mtree is non-nil: NewMultiTreeExporter sets db when the store
// is opened through Load, and mtree when a snapshot directory is opened directly.
db *DB
mtree *MultiTree
// iTree is the index, within trees(), of the next tree whose export has not started yet.
iTree int
// exporter is the exporter of the tree currently being walked; it is nil before the
// first tree is started and after the current tree has been exhausted or closed.
exporter *Exporter
}
// NewMultiTreeExporter builds a MultiTreeExporter for the store located in dir at the
// requested version.
//
// When supportExportNonSnapshotVersion is true the store is opened through Load with
// TargetVersion set to version (zero-copy, read-only). Otherwise the current version of
// dir is read first, a version greater than it is rejected, and the snapshot directory
// named by snapshotName(version) is opened with LoadMultiTree.
//
// On any failure the returned exporter is nil and the error describes the failing step.
func NewMultiTreeExporter(dir string, version uint32, supportExportNonSnapshotVersion bool) (exporter *MultiTreeExporter, err error) {
	if !supportExportNonSnapshotVersion {
		latest, verErr := currentVersion(dir)
		if verErr != nil {
			return nil, fmt.Errorf("failed to load current version: %w", verErr)
		}
		if int64(version) > latest {
			return nil, fmt.Errorf("snapshot is not created yet: height: %d", version)
		}

		snapshotDir := filepath.Join(dir, snapshotName(int64(version)))
		snapshot, loadErr := LoadMultiTree(snapshotDir, true, 0)
		if loadErr != nil {
			return nil, fmt.Errorf("snapshot don't exists: height: %d, %w", version, loadErr)
		}
		return &MultiTreeExporter{mtree: snapshot}, nil
	}

	opts := Options{
		TargetVersion:       version,
		ZeroCopy:            true,
		ReadOnly:            true,
		SnapshotWriterLimit: DefaultSnapshotWriterLimit,
	}
	store, loadErr := Load(dir, opts)
	if loadErr != nil {
		return nil, fmt.Errorf("invalid height: %d, %w", version, loadErr)
	}
	return &MultiTreeExporter{db: store}, nil
}
// trees returns the named trees of whichever backing store this exporter holds:
// the DB's trees when db is set, otherwise the MultiTree's trees.
func (mte *MultiTreeExporter) trees() []NamedTree {
	if mte.db == nil {
		return mte.mtree.trees
	}
	return mte.db.trees
}
// Next returns the next exported item.
//
// For every tree it first returns the tree's name, then each node produced by that
// tree's Exporter (as *ExportNode). Once the last tree has been exhausted it returns
// (nil, ErrorExportDone). Any other error from the underlying Exporter is returned
// unchanged.
func (mte *MultiTreeExporter) Next() (interface{}, error) {
	if mte.exporter != nil {
		node, err := mte.exporter.Next()
		if err == nil {
			return node, nil
		}
		// errors.Is also recognises the sentinel if it ever arrives wrapped.
		if !errors.Is(err, ErrorExportDone) {
			return nil, err
		}
		// The current tree is exhausted: release its exporter and fall through
		// to start the next tree (or report completion).
		mte.exporter.Close()
		mte.exporter = nil
	}

	trees := mte.trees()
	if mte.iTree >= len(trees) {
		return nil, ErrorExportDone
	}

	tree := trees[mte.iTree]
	mte.exporter = tree.Export()
	mte.iTree++
	// The tree's name is emitted ahead of its nodes.
	return tree.Name, nil
}
// Close releases the exporter of the tree currently being walked, if any, and then
// closes the backing store (db when set, otherwise mtree). It returns the error from
// closing the backing store, or nil when neither store is set.
func (mte *MultiTreeExporter) Close() error {
	if current := mte.exporter; current != nil {
		current.Close()
		mte.exporter = nil
	}

	switch {
	case mte.db != nil:
		return mte.db.Close()
	case mte.mtree != nil:
		return mte.mtree.Close()
	default:
		return nil
	}
}
// exportWorker is a function that produces export nodes by invoking callback once per node.
// newExporter runs the worker in its own goroutine and passes a callback that returns false
// after the node has been queued and true once the exporter has been cancelled.
// NOTE(review): presumably a worker should stop traversing when callback returns true;
// confirm against the worker implementations.
type exportWorker func(callback func(*ExportNode) bool)
// Exporter hands out the nodes produced by an exportWorker through a buffered channel.
// Instances are created by newExporter.
type Exporter struct {
// ch delivers the exported nodes; Next receives from it and Close drains it.
ch <-chan *ExportNode
// cancel cancels the context that the callback built in newExporter selects on; Close calls it.
cancel context.CancelFunc
}
// newExporter starts worker in a new goroutine and returns an Exporter fed by it.
//
// The worker is given a callback that queues each node on a channel buffered to
// exportBufferSize. The callback returns false once the node is queued and true when
// the exporter's context has been cancelled instead. The channel is closed when the
// worker returns.
func newExporter(worker exportWorker) *Exporter {
	nodes := make(chan *ExportNode, exportBufferSize)
	ctx, stop := context.WithCancel(context.Background())

	// emit blocks until the node is queued or the exporter is cancelled.
	emit := func(node *ExportNode) bool {
		select {
		case <-ctx.Done():
			return true
		case nodes <- node:
			return false
		}
	}

	go func() {
		defer close(nodes)
		worker(emit)
	}()

	return &Exporter{ch: nodes, cancel: stop}
}
// Next returns the next exported node, blocking until one is available. Once the
// channel has been closed and emptied it returns (nil, ErrorExportDone).
func (e *Exporter) Next() (*ExportNode, error) {
	node, open := <-e.ch
	if !open {
		return nil, ErrorExportDone
	}
	return node, nil
}
// Close cancels the exporter's context and then discards every node still arriving
// on the channel, returning once the channel has been closed. It is safe to call
// multiple times.
func (e *Exporter) Close() {
	e.cancel()
	for {
		// Keep receiving until the channel reports it is closed.
		if _, open := <-e.ch; !open {
			return
		}
	}
}