-
-
Notifications
You must be signed in to change notification settings - Fork 16
/
storage.go
339 lines (313 loc) · 12.7 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
/*
Copyright 2023 Avi Zimmerman <avi.zimmerman@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"os"
"github.com/spf13/pflag"
v1 "github.com/webmeshproj/api/v1"
"google.golang.org/protobuf/types/known/structpb"
"github.com/webmeshproj/webmesh/pkg/context"
"github.com/webmeshproj/webmesh/pkg/meshnode"
"github.com/webmeshproj/webmesh/pkg/storage"
extstorage "github.com/webmeshproj/webmesh/pkg/storage/providers/external"
passthroughstorage "github.com/webmeshproj/webmesh/pkg/storage/providers/passthrough"
raftstorage "github.com/webmeshproj/webmesh/pkg/storage/providers/raftstorage"
"github.com/webmeshproj/webmesh/pkg/storage/types"
)
// StorageProvider names one of the supported storage backends.
type StorageProvider string

// Recognized storage providers.
const (
	// StorageProviderRaft selects the builtin raft storage provider.
	StorageProviderRaft StorageProvider = "raft"
	// StorageProviderPassThrough selects the passthrough storage provider.
	StorageProviderPassThrough StorageProvider = "passthrough"
	// StorageProviderExternal selects an external storage provider.
	StorageProviderExternal StorageProvider = "external"
)

// IsValid reports whether s is a recognized storage provider. The empty
// string is accepted because an unset provider defaults to raft.
func (s StorageProvider) IsValid() bool {
	if s == "" {
		// Unset: defaults to raft.
		return true
	}
	return s == StorageProviderRaft ||
		s == StorageProviderPassThrough ||
		s == StorageProviderExternal
}
// StorageOptions are the storage options. They select a storage provider and
// carry the settings copied into that provider's options when it is created.
type StorageOptions struct {
// InMemory is a flag to use in-memory storage. It is forwarded to the raft
// provider options and to raft validation.
InMemory bool `koanf:"in-memory,omitempty"`
// Path is the path to the storage directory. It is used as the raft data
// directory.
Path string `koanf:"path,omitempty"`
// Provider is the storage provider. If empty, the default is used; the
// empty value is handled as raft by ListenPort and NewStorageProvider.
Provider string `koanf:"provider,omitempty"`
// Raft are the raft storage options.
Raft RaftOptions `koanf:"raft,omitempty"`
// External are the external storage options.
External ExternalStorageOptions `koanf:"external,omitempty"`
// LogLevel is the log level for the storage provider. It is copied into the
// raft, passthrough and external provider options.
LogLevel string `koanf:"log-level,omitempty"`
// LogFormat is the log format for the storage provider. It is copied into
// the raft, passthrough and external provider options.
LogFormat string `koanf:"log-format,omitempty"`
}
// NewStorageOptions returns storage options populated with defaults: the raft
// provider, the default raft data directory, default raft and external
// sub-options, and the "info" log level.
func NewStorageOptions() StorageOptions {
	var opts StorageOptions
	opts.Path = raftstorage.DefaultDataDir
	opts.Provider = string(StorageProviderRaft)
	opts.Raft = NewRaftOptions()
	opts.External = NewExternalStorageOptions()
	opts.LogLevel = "info"
	return opts
}
// BindFlags registers the storage options on fs. Every flag name is prefixed
// with prefix, and the current field values are used as the flag defaults.
// The raft and external sub-options are registered under "raft." and
// "external." respectively.
func (o *StorageOptions) BindFlags(prefix string, fs *pflag.FlagSet) {
	// flagName qualifies a flag (or sub-prefix) with the caller's prefix.
	flagName := func(suffix string) string { return prefix + suffix }

	fs.BoolVar(&o.InMemory, flagName("in-memory"), o.InMemory, "Use in-memory storage")
	fs.StringVar(&o.Path, flagName("path"), o.Path, "Path to the storage directory")
	fs.StringVar(&o.Provider, flagName("provider"), o.Provider, "Storage provider (defaults to raftstorage or passthrough depending on other options)")
	fs.StringVar(&o.LogLevel, flagName("log-level"), o.LogLevel, "Log level for the storage provider")
	fs.StringVar(&o.LogFormat, flagName("log-format"), o.LogFormat, "Log format for the storage provider")

	o.Raft.BindFlags(flagName("raft."), fs)
	o.External.BindFlags(flagName("external."), fs)
}
// Validate validates the storage options.
//
// It rejects unknown providers, then validates the options that belong to the
// selected provider:
//   - raft (or an empty provider, which defaults to raft): the raft options
//     are validated, but only when isMember is true.
//   - external: the external storage options are always validated.
//   - passthrough: nothing further is checked.
//
// isMember indicates whether this node is a storage-providing member.
func (o StorageOptions) Validate(isMember bool) error {
	provider := StorageProvider(o.Provider)
	if !provider.IsValid() {
		return fmt.Errorf("invalid storage provider: %s", o.Provider)
	}
	switch provider {
	case StorageProviderRaft, "":
		// The empty provider is treated as raft everywhere else (IsValid,
		// ListenPort, NewStorageProvider), so it must get the same validation
		// instead of silently skipping it.
		if isMember {
			if err := o.Raft.Validate(o.Path, o.InMemory); err != nil {
				return err
			}
		}
	case StorageProviderExternal:
		if err := o.External.Validate(); err != nil {
			return err
		}
	}
	return nil
}
// ListenPort reports the port the storage provider listens on. For the raft
// provider (including an unset provider) this is the raft listen port; for
// every other provider it is currently 0.
func (o StorageOptions) ListenPort() int {
	switch StorageProvider(o.Provider) {
	case StorageProviderRaft, "":
		return o.Raft.ListenPort()
	default:
		// TODO: Get the port from the external storage provider.
		return 0
	}
}
// NewStorageProvider builds the storage provider described by the
// configuration. Nodes that are not storage members always get the
// passthrough provider, which uses node as its dialer. Storage members get
// the provider named by Storage.Provider, with an empty name meaning raft.
// force is forwarded to the raft provider. An unrecognized provider name
// yields an error.
func (o *Config) NewStorageProvider(ctx context.Context, node meshnode.Node, force bool) (storage.Provider, error) {
	kind := StorageProvider(o.Storage.Provider)

	// Non-members, and members that explicitly ask for it, pass storage
	// operations through to other nodes.
	if !o.IsStorageMember() || kind == StorageProviderPassThrough {
		return passthroughstorage.NewProvider(o.Storage.NewPassthroughOptions(ctx, node)), nil
	}
	if kind == StorageProviderRaft || kind == "" {
		return o.Storage.NewRaftStorageProvider(ctx, node, force)
	}
	if kind == StorageProviderExternal {
		return o.Storage.NewExternalStorageProvider(ctx, node.ID())
	}
	return nil, fmt.Errorf("invalid storage provider: %s", o.Storage.Provider)
}
// NewRaftStorageProvider builds the raft options for node from the current
// configuration and wraps them in a raftstorage provider. Any error from
// building the options is returned unchanged.
func (o StorageOptions) NewRaftStorageProvider(ctx context.Context, node meshnode.Node, force bool) (storage.Provider, error) {
	raftOpts, err := o.NewRaftOptions(ctx, node, force)
	if err != nil {
		return nil, err
	}
	provider := raftstorage.NewProvider(raftOpts)
	return provider, nil
}
// NewExternalStorageProvider builds the external storage options for nodeID
// from the current configuration and wraps them in an external storage
// provider. Any error from building the options is returned unchanged.
func (o StorageOptions) NewExternalStorageProvider(ctx context.Context, nodeID types.NodeID) (storage.Provider, error) {
	extOpts, err := o.NewExternalStorageOptions(ctx, nodeID)
	if err != nil {
		return nil, err
	}
	provider := extstorage.NewProvider(extOpts)
	return provider, nil
}
// NewRaftOptions translates the current configuration into raftstorage
// options for node. It creates the raft transport first and fails if that
// cannot be done. force controls whether the data directory is cleared.
func (o StorageOptions) NewRaftOptions(ctx context.Context, node meshnode.Node, force bool) (raftstorage.Options, error) {
	transport, err := o.Raft.NewTransport(node)
	if err != nil {
		return raftstorage.Options{}, fmt.Errorf("create raft transport: %w", err)
	}
	opts := raftstorage.NewOptions(node.ID(), transport)

	// Storage location and logging come from the top-level storage options.
	opts.DataDir = o.Path
	opts.InMemory = o.InMemory
	opts.ClearDataDir = force
	opts.LogLevel = o.LogLevel
	opts.LogFormat = o.LogFormat

	// Everything else is copied straight from the raft-specific options.
	raft := o.Raft
	opts.ConnectionPoolCount = raft.ConnectionPoolCount
	opts.ConnectionTimeout = raft.ConnectionTimeout
	opts.HeartbeatTimeout = raft.HeartbeatTimeout
	opts.ElectionTimeout = raft.ElectionTimeout
	opts.LeaderLeaseTimeout = raft.LeaderLeaseTimeout
	opts.ApplyTimeout = raft.ApplyTimeout
	opts.CommitTimeout = raft.CommitTimeout
	opts.MaxAppendEntries = raft.MaxAppendEntries
	opts.SnapshotInterval = raft.SnapshotInterval
	opts.SnapshotThreshold = raft.SnapshotThreshold
	opts.SnapshotRetention = raft.SnapshotRetention
	opts.ObserverChanBuffer = raft.ObserverChanBuffer
	return opts, nil
}
// NewPassthroughOptions builds passthrough storage options that use node as
// the dialer and inherit the configured log level and format.
func (o StorageOptions) NewPassthroughOptions(ctx context.Context, node meshnode.Node) passthroughstorage.Options {
	var opts passthroughstorage.Options
	opts.Dialer = node
	opts.LogLevel = o.LogLevel
	opts.LogFormat = o.LogFormat
	return opts
}
// NewExternalStorageOptions translates the current configuration into
// external storage provider options for nodeID. A non-empty plugin
// configuration is converted to a protobuf Struct. Unless the connection is
// marked insecure, a TLS configuration is built from the external options.
// On error the options built so far are returned alongside the error.
func (o StorageOptions) NewExternalStorageOptions(ctx context.Context, nodeID types.NodeID) (extstorage.Options, error) {
	ext := o.External

	var opts extstorage.Options
	opts.NodeID = nodeID
	opts.Server = ext.Server
	opts.LogLevel = o.LogLevel
	opts.LogFormat = o.LogFormat

	if len(ext.Config) != 0 {
		pluginConf, err := structpb.NewStruct(ext.Config)
		if err != nil {
			return opts, err
		}
		opts.Config = &v1.PluginConfiguration{Config: pluginConf}
	}

	if !ext.Insecure {
		tlsConf, err := ext.NewTLSConfig(ctx)
		opts.TLSConfig = tlsConf
		return opts, err
	}

	// Insecure connections carry no TLS configuration at all.
	context.LoggerFrom(ctx).Warn("Using insecure connection to external storage provider")
	return opts, nil
}
// ExternalStorageOptions are the external storage options. They describe how
// to reach the external storage plugin server and how to secure the
// connection to it.
type ExternalStorageOptions struct {
// Server is the address of a server for the plugin. Validate rejects an
// empty value.
Server string `koanf:"server,omitempty"`
// Config is the configuration to pass to the plugin. When non-empty it is
// converted to a protobuf Struct and attached to the provider options as a
// PluginConfiguration.
Config PluginMapConfig `koanf:"config,omitempty"`
// Insecure is whether to use an insecure connection to the plugin server.
// When set, no TLS configuration is built and the TLS fields below are not
// consulted.
Insecure bool `koanf:"insecure,omitempty"`
// TLSCAData is the base64 PEM-encoded CA data for verifying certificates.
// It is decoded with the standard base64 encoding and added to the root
// pool alongside the system certificates.
TLSCAData string `koanf:"tls-ca-data,omitempty"`
// TLSCAFile is the path to a CA for verifying certificates. Its PEM
// contents are added to the root pool alongside the system certificates.
TLSCAFile string `koanf:"tls-ca-file,omitempty"`
// TLSCertData is the base64 PEM-encoded certificate data for authenticating to the plugin server.
// It must be set together with TLSKeyData.
TLSCertData string `koanf:"tls-cert-data,omitempty"`
// TLSCertFile is the path to a certificate for authenticating to the plugin server.
// It must be set together with TLSKeyFile.
TLSCertFile string `koanf:"tls-cert-file,omitempty"`
// TLSKeyData is the base64 PEM-encoded key data for authenticating to the plugin server.
// It must be set together with TLSCertData.
TLSKeyData string `koanf:"tls-key-data,omitempty"`
// TLSKeyFile is the path to a key for authenticating to the plugin server.
// It must be set together with TLSCertFile.
TLSKeyFile string `koanf:"tls-key-file,omitempty"`
// TLSSkipVerify is whether to skip verifying the plugin server's certificate.
TLSSkipVerify bool `koanf:"tls-skip-verify,omitempty"`
}
// NewExternalStorageOptions returns external storage options whose plugin
// configuration map is allocated and empty; every other field is left at its
// zero value.
func NewExternalStorageOptions() ExternalStorageOptions {
	var opts ExternalStorageOptions
	// Allocate the map up front; presumably so that flag parsing can insert
	// into it without a nil-map write (verify against PluginMapConfig.Set).
	opts.Config = make(PluginMapConfig)
	return opts
}
// BindFlags registers the external storage options on fs. Every flag name is
// prefixed with prefix, and the current field values are used as the flag
// defaults. Only the file-based TLS settings are exposed as flags; the
// base64 data variants are not registered here.
func (o *ExternalStorageOptions) BindFlags(prefix string, fs *pflag.FlagSet) {
	// flagName qualifies a flag with the caller's prefix.
	flagName := func(suffix string) string { return prefix + suffix }

	// Plugin server and its configuration.
	fs.StringVar(&o.Server, flagName("server"), o.Server, "Address of a server for the plugin")
	fs.Var(&o.Config, flagName("config"), "Configuration to pass to the plugin as key value pairs")

	// Transport security.
	fs.BoolVar(&o.Insecure, flagName("insecure"), o.Insecure, "Use an insecure connection to the plugin server")
	fs.StringVar(&o.TLSCAFile, flagName("tls-ca-file"), o.TLSCAFile, "Path to a CA for verifying certificates")
	fs.StringVar(&o.TLSCertFile, flagName("tls-cert-file"), o.TLSCertFile, "Path to a certificate for authenticating to the plugin server")
	fs.StringVar(&o.TLSKeyFile, flagName("tls-key-file"), o.TLSKeyFile, "Path to a key for authenticating to the plugin server")
	fs.BoolVar(&o.TLSSkipVerify, flagName("tls-skip-verify"), o.TLSSkipVerify, "Skip verifying the plugin server's certificate")
}
// Validate checks the external storage options. The only requirement is a
// non-empty server address.
func (o ExternalStorageOptions) Validate() error {
	if len(o.Server) == 0 {
		return fmt.Errorf("external storage server is required")
	}
	return nil
}
// NewTLSConfig creates a new TLS config from the options.
//
// The root pool starts from the system certificate pool (or an empty pool if
// that cannot be loaded) and is extended with TLSCAData and TLSCAFile when
// they are set. A client certificate is added for each complete cert/key pair
// supplied, either as files or as base64-encoded PEM data. TLSSkipVerify
// disables verification of the server certificate.
//
// An error is returned when a certificate is given without its key (or the
// reverse), when any data field is not valid base64, when a file cannot be
// read, or when the PEM material cannot be parsed.
func (o ExternalStorageOptions) NewTLSConfig(ctx context.Context) (*tls.Config, error) {
	// A half-specified key pair used to be ignored silently, which meant the
	// connection was attempted without the client certificate the operator
	// intended to present. Surface the misconfiguration instead.
	if (o.TLSCertFile == "") != (o.TLSKeyFile == "") {
		return nil, fmt.Errorf("tls-cert-file and tls-key-file must be specified together")
	}
	if (o.TLSCertData == "") != (o.TLSKeyData == "") {
		return nil, fmt.Errorf("tls-cert-data and tls-key-data must be specified together")
	}

	roots, err := x509.SystemCertPool()
	if err != nil {
		context.LoggerFrom(ctx).Warn("Failed to load system certificate pool, starting with empty pool")
		roots = x509.NewCertPool()
	}
	if o.TLSCAData != "" {
		data, err := base64.StdEncoding.DecodeString(o.TLSCAData)
		if err != nil {
			return nil, fmt.Errorf("failed to decode tls-ca-data: %w", err)
		}
		if !roots.AppendCertsFromPEM(data) {
			return nil, fmt.Errorf("failed to append tls-ca-data to certificate pool")
		}
	}
	if o.TLSCAFile != "" {
		data, err := os.ReadFile(o.TLSCAFile)
		if err != nil {
			return nil, fmt.Errorf("failed to load tls-ca-file: %w", err)
		}
		if !roots.AppendCertsFromPEM(data) {
			return nil, fmt.Errorf("failed to append tls-ca-file to certificate pool")
		}
	}

	conf := &tls.Config{RootCAs: roots}
	if o.TLSCertFile != "" && o.TLSKeyFile != "" {
		cert, err := tls.LoadX509KeyPair(o.TLSCertFile, o.TLSKeyFile)
		if err != nil {
			return nil, fmt.Errorf("failed to load tls-cert-file and tls-key-file: %w", err)
		}
		conf.Certificates = append(conf.Certificates, cert)
	}
	if o.TLSCertData != "" && o.TLSKeyData != "" {
		certData, err := base64.StdEncoding.DecodeString(o.TLSCertData)
		if err != nil {
			return nil, fmt.Errorf("failed to decode tls-cert-data: %w", err)
		}
		keyData, err := base64.StdEncoding.DecodeString(o.TLSKeyData)
		if err != nil {
			return nil, fmt.Errorf("failed to decode tls-key-data: %w", err)
		}
		cert, err := tls.X509KeyPair(certData, keyData)
		if err != nil {
			return nil, fmt.Errorf("failed to load tls-cert-data and tls-key-data: %w", err)
		}
		conf.Certificates = append(conf.Certificates, cert)
	}
	if o.TLSSkipVerify {
		context.LoggerFrom(ctx).Warn("Skipping verification of external storage server certificate")
		conf.InsecureSkipVerify = true
	}
	return conf, nil
}