-
Notifications
You must be signed in to change notification settings - Fork 264
/
nbdkit.go
398 lines (358 loc) · 9.53 KB
/
nbdkit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
package image
import (
"bufio"
"fmt"
"strings"
"io"
"os"
"os/exec"
"path/filepath"
"time"
"github.com/pkg/errors"
"k8s.io/klog/v2"
"kubevirt.io/containerized-data-importer/pkg/common"
)
const (
nbdVddkLibraryPath = "/opt/vmware-vix-disklib-distrib/lib64"
startupTimeoutSeconds = 15
)
type nbdkitOperations struct {
nbdkit *Nbdkit
}
// NbdkitPlugin represents a plugin for nbdkit
type NbdkitPlugin string
// NbdkitFilter represents s filter for nbdkit
type NbdkitFilter string
// NbdkitLogWatcher allows custom handling of nbdkit log messages
type NbdkitLogWatcher interface {
Start(*bufio.Reader)
Stop()
}
// Nbdkit plugins
const (
NbdkitCurlPlugin NbdkitPlugin = "curl"
NbdkitFilePlugin NbdkitPlugin = "file"
NbdkitVddkPlugin NbdkitPlugin = "vddk"
NbdkitVddkMockPlugin NbdkitPlugin = "/opt/testing/libvddk-test-plugin.so"
)
// Nbdkit filters
const (
NbdkitXzFilter NbdkitFilter = "xz"
NbdkitTarFilter NbdkitFilter = "tar"
NbdkitGzipFilter NbdkitFilter = "gzip"
NbdkitRetryFilter NbdkitFilter = "retry"
)
// Nbdkit represents struct for an nbdkit instance
type Nbdkit struct {
c *exec.Cmd
NbdPidFile string
nbdkitArgs []string
plugin NbdkitPlugin
pluginArgs []string
redactArgs []string
filters []NbdkitFilter
Socket string
Env []string
LogWatcher NbdkitLogWatcher
}
// NbdkitOperation defines the interface for executing nbdkit
type NbdkitOperation interface {
StartNbdkit(source string) error
KillNbdkit() error
AddEnvVariable(v string)
AddFilter(filter NbdkitFilter)
}
// NewNbdkit creates a new Nbdkit instance with an nbdkit plugin and pid file
func NewNbdkit(plugin NbdkitPlugin, nbdkitPidFile string) *Nbdkit {
return &Nbdkit{
NbdPidFile: nbdkitPidFile,
plugin: plugin,
}
}
// NewNbdkitCurl creates a new Nbdkit instance with the curl plugin
func NewNbdkitCurl(nbdkitPidFile, certDir, socket string, extraHeaders, secretExtraHeaders []string) NbdkitOperation {
var pluginArgs []string
var redactArgs []string
args := []string{"-r"}
if certDir != "" {
pluginArgs = append(pluginArgs, fmt.Sprintf("cainfo=%s/%s", certDir, "tls.crt"))
}
for _, header := range extraHeaders {
pluginArgs = append(pluginArgs, fmt.Sprintf("header=%s", header))
}
for _, header := range secretExtraHeaders {
redactArgs = append(redactArgs, fmt.Sprintf("header=%s", header))
}
return &Nbdkit{
NbdPidFile: nbdkitPidFile,
plugin: NbdkitCurlPlugin,
nbdkitArgs: args,
pluginArgs: pluginArgs,
redactArgs: redactArgs,
Socket: socket,
}
}
// NewNbdkitVddk creates a new Nbdkit instance with the vddk plugin
func NewNbdkitVddk(nbdkitPidFile, socket, server, username, password, thumbprint, moref string) (NbdkitOperation, error) {
pluginArgs := []string{
"libdir=" + nbdVddkLibraryPath,
}
if server != "" {
pluginArgs = append(pluginArgs, "server="+server)
}
if username != "" {
pluginArgs = append(pluginArgs, "user="+username)
}
if password != "" {
pluginArgs = append(pluginArgs, "password="+password)
}
if thumbprint != "" {
pluginArgs = append(pluginArgs, "thumbprint="+thumbprint)
}
if moref != "" {
pluginArgs = append(pluginArgs, "vm=moref="+moref)
}
pluginArgs = append(pluginArgs, "--verbose")
pluginArgs = append(pluginArgs, "-D", "nbdkit.backend.controlpath=0")
pluginArgs = append(pluginArgs, "-D", "nbdkit.backend.datapath=0")
p := getVddkPluginPath()
n := &Nbdkit{
NbdPidFile: nbdkitPidFile,
plugin: p,
pluginArgs: pluginArgs,
Socket: socket,
}
n.AddEnvVariable("LD_LIBRARY_PATH=" + nbdVddkLibraryPath)
n.AddFilter(NbdkitRetryFilter)
if err := n.validatePlugin(); err != nil {
return nil, err
}
return n, nil
}
// AddEnvVariable adds an environmental variable to the nbdkit command
func (n *Nbdkit) AddEnvVariable(v string) {
env := os.Environ()
env = append(env, v)
n.Env = env
}
// AddFilter adds a nbdkit filter if it doesn't already exist
func (n *Nbdkit) AddFilter(filter NbdkitFilter) {
for _, f := range n.filters {
if f == filter {
return
}
}
n.filters = append(n.filters, filter)
}
func getVddkPluginPath() NbdkitPlugin {
_, err := os.Stat(string(NbdkitVddkMockPlugin))
if !os.IsNotExist(err) {
return NbdkitVddkMockPlugin
}
return NbdkitVddkPlugin
}
func (n *Nbdkit) getSourceArg(s string) string {
var source string
switch n.plugin {
case NbdkitCurlPlugin:
source = fmt.Sprintf("url=%s", s)
case NbdkitVddkPlugin, NbdkitVddkMockPlugin:
source = fmt.Sprintf("file=%s", s)
default:
source = s
}
return source
}
// StartNbdkit starts nbdkit process
func (n *Nbdkit) StartNbdkit(source string) error {
var err error
argsNbdkit := []string{
"--foreground",
"--readonly",
"--exit-with-parent",
"-U", n.Socket,
"--pidfile", n.NbdPidFile,
}
// set filters
for _, f := range n.filters {
argsNbdkit = append(argsNbdkit, fmt.Sprintf("--filter=%s", f))
}
// set additional arguments
for _, a := range n.nbdkitArgs {
argsNbdkit = append(argsNbdkit, a)
}
// append nbdkit plugin arguments
argsNbdkit = append(argsNbdkit, string(n.plugin))
argsNbdkit = append(argsNbdkit, n.pluginArgs...)
argsNbdkit = append(argsNbdkit, n.redactArgs...)
argsNbdkit = append(argsNbdkit, n.getSourceArg(source))
isRedacted := func(arg string) bool {
for _, value := range n.redactArgs {
if value == arg {
return true
}
}
return false
}
quotedArgs := make([]string, len(argsNbdkit))
for index, value := range argsNbdkit {
if strings.HasPrefix(value, "password=") {
quotedArgs[index] = "'password=*****'"
} else if isRedacted(value) {
if strings.HasPrefix(value, "header=") {
quotedArgs[index] = "'header=/secret redacted/'"
} else {
quotedArgs[index] = "'/secret redacted/'"
}
} else {
quotedArgs[index] = "'" + value + "'"
}
}
klog.V(3).Infof("Start nbdkit with: %v", quotedArgs)
n.c = exec.Command("nbdkit", argsNbdkit...)
var stdout io.ReadCloser
stdout, err = n.c.StdoutPipe()
if err != nil {
klog.Errorf("Error constructing stdout pipe: %v", err)
return err
}
n.c.Stderr = n.c.Stdout
output := bufio.NewReader(stdout)
if n.LogWatcher != nil {
n.LogWatcher.Start(output)
} else {
go watchNbdLog(output)
}
err = n.c.Start()
if err != nil {
klog.Errorf("Unable to start nbdkit: %v", err)
return err
}
err = waitForNbd(n.NbdPidFile)
if err != nil {
klog.Errorf("Failed waiting for nbdkit to start up: %v", err)
return err
}
return nil
}
// Default nbdkit log watcher, logs lines as nbdkit prints them,
// and appends them to the nbdkit log file.
func watchNbdLog(output *bufio.Reader) {
f, err := os.Create(common.NbdkitLogPath)
if err != nil {
klog.Errorf("Error writing nbdkit log to file: %v", err)
}
defer f.Close()
scanner := bufio.NewScanner(output)
for scanner.Scan() {
line := scanner.Text()
logLine := fmt.Sprintf("Log line from nbdkit: %s", line)
klog.Info(logLine)
f.WriteString(logLine)
}
if err := scanner.Err(); err != nil {
klog.Errorf("Error watching nbdkit log: %v", err)
}
klog.Infof("Stopped watching nbdkit log.")
}
// waitForNbd waits for nbdkit to start by watching for the existence of the given PID file.
func waitForNbd(pidfile string) error {
nbdCheck := make(chan bool, 1)
go func() {
klog.Infoln("Waiting for nbdkit PID.")
for {
select {
case <-nbdCheck:
return
case <-time.After(500 * time.Millisecond):
_, err := os.Stat(pidfile)
if err != nil {
if !os.IsNotExist(err) {
klog.Warningf("Error checking for nbdkit PID: %v", err)
}
} else {
nbdCheck <- true
return
}
}
}
}()
select {
case <-nbdCheck:
klog.Infoln("nbdkit ready.")
return nil
case <-time.After(startupTimeoutSeconds * time.Second):
nbdCheck <- true
return errors.New("timed out waiting for nbdkit to be ready")
}
}
// KillNbdkit stops the nbdkit process
func (n *Nbdkit) KillNbdkit() error {
var err error
if n.c == nil {
return nil
}
if n.c.Process != nil {
err = n.c.Process.Signal(os.Interrupt)
if err != nil {
err = n.c.Process.Kill()
}
}
if n.LogWatcher != nil {
n.LogWatcher.Stop()
}
return err
}
// validatePlugins tests VDDK and any other plugins before starting nbdkit for real
func (n *Nbdkit) validatePlugin() error {
walker := func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
klog.Infof("%s: %d %s", path, info.Size(), info.Mode())
return nil
}
klog.Infof("Checking nbdkit plugin directory tree:")
err := filepath.Walk("/usr/lib64/nbdkit", walker)
if err != nil {
klog.Warningf("Unable to get nbdkit plugin directory tree: %v", err)
}
if n.plugin == NbdkitVddkPlugin {
klog.Infof("Checking VDDK library directory tree:")
err = filepath.Walk("/opt/vmware-vix-disklib-distrib", walker)
if err != nil {
klog.Warningf("Unable to get VDDK library directory tree: %v", err)
}
}
args := []string{
"--dump-plugin",
string(n.plugin),
"libdir=" + nbdVddkLibraryPath,
}
nbdkit := exec.Command("nbdkit", args...)
nbdkit.Env = n.Env
out, err := nbdkit.CombinedOutput()
if out != nil {
klog.Infof("Output from nbdkit --dump-plugin %s: %s", string(n.plugin), out)
}
if err != nil {
return err
}
return nil
}
type mockNbdkit struct{}
// NewMockNbdkitCurl creates a mock nbdkit curl plugin for testing
func NewMockNbdkitCurl(nbdkitPidFile, certDir, socket string, extraHeaders, secretExtraHeaders []string) NbdkitOperation {
return &mockNbdkit{}
}
func (m *mockNbdkit) StartNbdkit(source string) error {
return nil
}
func (m *mockNbdkit) KillNbdkit() error {
return nil
}
func (m *mockNbdkit) AddEnvVariable(v string) {}
func (m *mockNbdkit) AddFilter(filter NbdkitFilter) {}