-
Notifications
You must be signed in to change notification settings - Fork 45
/
config.go
163 lines (137 loc) · 4.24 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package store
import (
"context"
"fmt"
"github.com/streamingfast/derr"
"github.com/streamingfast/dstore"
"github.com/streamingfast/logging"
"go.uber.org/zap"
"github.com/streamingfast/substreams/block"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/streamingfast/substreams/storage/store/marshaller"
)
// Config holds the static settings of a single store module: its identity,
// the object stores it works against, and the size limits assigned to it by
// NewConfig.
type Config struct {
// name is the store name; it is returned by Name() and attached to the
// logger built in newBaseStore.
name string
// moduleHash identifies the module; NewConfig uses it as the path prefix
// of both sub stores.
moduleHash string
// objStore is the "<moduleHash>/states" sub store. Snapshot files are
// checked, sized and listed against it.
objStore dstore.Store
// outputsStore is the "<moduleHash>/outputs" sub store. None of the methods
// visible in this part of the file read it.
outputsStore dstore.Store
// moduleInitialBlock is the start block used when building full-state
// file names (see ExistsFullKV).
moduleInitialBlock uint64
updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy
valueType string
// Size limits, set by NewConfig to 8 MiB, 1 GiB and 10 MiB respectively.
// NOTE(review): presumably byte counts enforced by the store
// implementations; the enforcement is not visible here — confirm.
appendLimit uint64
totalSizeLimit uint64
itemSizeLimit uint64
}
// NewConfig builds the Config for the store module called name.
//
// Two sub stores are derived from store, both rooted at moduleHash:
// "<moduleHash>/states", kept as the object store for state files, and
// "<moduleHash>/outputs". The append, total-size and item-size limits are
// set to fixed defaults of 8 MiB, 1 GiB and 10 MiB. An error is returned
// when either sub store cannot be created.
func NewConfig(
	name string,
	moduleInitialBlock uint64,
	moduleHash string,
	updatePolicy pbsubstreams.Module_KindStore_UpdatePolicy,
	valueType string,
	store dstore.Store,
) (*Config, error) {
	const (
		kib = 1024
		mib = 1024 * kib
		gib = 1024 * mib
	)

	statesStore, err := store.SubStore(moduleHash + "/states")
	if err != nil {
		return nil, fmt.Errorf("creating sub store: %w", err)
	}

	outputs, err := store.SubStore(moduleHash + "/outputs")
	if err != nil {
		return nil, fmt.Errorf("creating sub store: %w", err)
	}

	cfg := &Config{
		name:               name,
		moduleHash:         moduleHash,
		moduleInitialBlock: moduleInitialBlock,
		updatePolicy:       updatePolicy,
		valueType:          valueType,
		objStore:           statesStore,
		outputsStore:       outputs,
		appendLimit:        8 * mib,
		totalSizeLimit:     1 * gib,
		itemSizeLimit:      10 * mib,
	}
	return cfg, nil
}
// newBaseStore returns a baseStore bound to this Config with an empty
// key/value map and the default marshaller. The supplied logger is scoped
// under the name "store" and tagged with the store name and module hash.
func (c *Config) newBaseStore(logger *zap.Logger) *baseStore {
	scoped := logger.Named("store").With(
		zap.String("store_name", c.name),
		zap.String("module_hash", c.moduleHash),
	)

	base := &baseStore{
		Config:     c,
		kv:         map[string][]byte{},
		logger:     scoped,
		marshaller: marshaller.Default(),
	}
	return base
}
// Name returns the configured store name.
func (c *Config) Name() string {
return c.name
}
// ModuleHash returns the hash of the module this store belongs to.
func (c *Config) ModuleHash() string {
return c.moduleHash
}
// ValueType returns the configured value type string of the store.
func (c *Config) ValueType() string {
return c.valueType
}
// UpdatePolicy returns the update policy configured for the store.
func (c *Config) UpdatePolicy() pbsubstreams.Module_KindStore_UpdatePolicy {
return c.updatePolicy
}
// ModuleInitialBlock returns the initial block configured for the module.
func (c *Config) ModuleInitialBlock() uint64 {
return c.moduleInitialBlock
}
// NewFullKV returns a FullKV built on a fresh, empty base store for this
// Config. The second positional field of FullKV is initialised to "N/A".
func (c *Config) NewFullKV(logger *zap.Logger) *FullKV {
	base := c.newBaseStore(logger)
	return &FullKV{base, "N/A"}
}
// ExistsFullKV reports whether the states object store contains the
// full-state file for the range built from the module's initial block and
// upTo.
func (c *Config) ExistsFullKV(ctx context.Context, upTo uint64) (bool, error) {
	fullRange := block.NewRange(c.moduleInitialBlock, upTo)
	return c.objStore.FileExists(ctx, FullStateFileName(fullRange))
}
// ExistsPartialKV reports whether the states object store contains the
// partial file for the range built from from and to.
func (c *Config) ExistsPartialKV(ctx context.Context, from, to uint64) (bool, error) {
	partialRange := block.NewRange(from, to)
	return c.objStore.FileExists(ctx, PartialFileName(partialRange))
}
// NewPartialKV returns a PartialKV that starts at initialBlock. It is backed
// by a fresh, empty base store for this Config, an empty operations list and
// an empty "seen" set.
func (c *Config) NewPartialKV(initialBlock uint64, logger *zap.Logger) *PartialKV {
	partial := new(PartialKV)
	partial.baseStore = c.newBaseStore(logger)
	partial.operations = &pbssinternal.Operations{}
	partial.initialBlock = initialBlock
	partial.seen = map[string]bool{}
	return partial
}
// FileSize returns the size of the object named fileInfo.Filename in the
// states object store, as reported by the store's object attributes.
//
// The lookup runs through derr.RetryContext with a retry count of 3. If it
// still fails, the size is reported as 0 along with the error.
func (c *Config) FileSize(ctx context.Context, fileInfo *FileInfo) (int64, error) {
	var byteCount int64

	lookup := func(ctx context.Context) error {
		attrs, err := c.objStore.ObjectAttributes(ctx, fileInfo.Filename)
		if err != nil {
			return fmt.Errorf("getting object attributes: %w", err)
		}

		byteCount = attrs.Size
		return nil
	}

	if err := derr.RetryContext(ctx, 3, lookup); err != nil {
		return 0, err
	}
	return byteCount, nil
}
// ListSnapshotFiles walks the states object store and returns the parsed
// snapshot files whose range starts strictly below the block number below.
//
// File names that parseFileName cannot interpret are logged as warnings and
// skipped. The walk is stopped at the first file whose start block is at or
// above below.
// NOTE(review): stopping early assumes the store walks files in ascending
// start-block order — confirm against the dstore implementation.
//
// The walk runs through derr.RetryContext with a retry count of 3. On
// failure the returned error wraps the underlying cause, so callers can
// inspect it with errors.Is / errors.As, and no files are returned.
func (c *Config) ListSnapshotFiles(ctx context.Context, below uint64) (files []*FileInfo, err error) {
	if below == 0 {
		// No start block can be lower than 0, so nothing could match.
		return nil, nil
	}

	logger := logging.Logger(ctx, zlog)

	err = derr.RetryContext(ctx, 3, func(ctx context.Context) error {
		// We need to clear each time we start because a previous retry could have accumulated a partial state
		files = nil

		return c.objStore.Walk(ctx, "", func(filename string) error {
			fileInfo, ok := parseFileName(c.Name(), filename)
			if !ok {
				logger.Warn("seen snapshot file that we don't know how to parse", zap.String("filename", filename))
				return nil
			}

			if fileInfo.Range.StartBlock >= below {
				return dstore.StopIteration
			}

			files = append(files, fileInfo)
			return nil
		})
	})
	if err != nil {
		// %w (not %s) keeps the error chain intact for errors.Is / errors.As.
		return nil, fmt.Errorf("walking files: %w", err)
	}

	return files, nil
}