-
Notifications
You must be signed in to change notification settings - Fork 1
/
probe.go
234 lines (198 loc) · 6.95 KB
/
probe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flexvolume
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/volume"
"os"
"fmt"
"path/filepath"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"k8s.io/apimachinery/pkg/util/errors"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
type flexVolumeProber struct {
mutex sync.Mutex
pluginDir string // Flexvolume driver directory
watcher utilfs.FSWatcher
probeNeeded bool // Must only read and write this through testAndSetProbeNeeded.
lastUpdated time.Time // Last time probeNeeded was updated.
watchEventCount int
factory PluginFactory
fs utilfs.Filesystem
}
const (
// TODO (cxing) Tune these params based on test results.
// watchEventLimit is the max allowable number of processed watches within watchEventInterval.
watchEventInterval = 5 * time.Second
watchEventLimit = 20
)
func GetDynamicPluginProber(pluginDir string) volume.DynamicPluginProber {
return &flexVolumeProber{
pluginDir: pluginDir,
watcher: utilfs.NewFsnotifyWatcher(),
factory: pluginFactory{},
fs: &utilfs.DefaultFs{},
}
}
func (prober *flexVolumeProber) Init() error {
prober.testAndSetProbeNeeded(true)
prober.lastUpdated = time.Now()
if err := prober.createPluginDir(); err != nil {
return err
}
if err := prober.initWatcher(); err != nil {
return err
}
return nil
}
// Probes for Flexvolume drivers.
// If a filesystem update has occurred since the last probe, updated = true
// and the list of probed plugins is returned.
// Otherwise, update = false and probedPlugins = nil.
//
// If an error occurs, updated and plugins are set arbitrarily.
func (prober *flexVolumeProber) Probe() (updated bool, plugins []volume.VolumePlugin, err error) {
probeNeeded := prober.testAndSetProbeNeeded(false)
if !probeNeeded {
return false, nil, nil
}
files, err := prober.fs.ReadDir(prober.pluginDir)
if err != nil {
return false, nil, fmt.Errorf("Error reading the Flexvolume directory: %s", err)
}
plugins = []volume.VolumePlugin{}
allErrs := []error{}
for _, f := range files {
// only directories with names that do not begin with '.' are counted as plugins
// and pluginDir/dirname/dirname should be an executable
// unless dirname contains '~' for escaping namespace
// e.g. dirname = vendor~cifs
// then, executable will be pluginDir/dirname/cifs
if f.IsDir() && filepath.Base(f.Name())[0] != '.' {
plugin, pluginErr := prober.factory.NewFlexVolumePlugin(prober.pluginDir, f.Name())
if pluginErr != nil {
pluginErr = fmt.Errorf(
"Error creating Flexvolume plugin from directory %s, skipping. Error: %s",
f.Name(), pluginErr)
allErrs = append(allErrs, pluginErr)
continue
}
plugins = append(plugins, plugin)
}
}
return true, plugins, errors.NewAggregate(allErrs)
}
func (prober *flexVolumeProber) handleWatchEvent(event fsnotify.Event) error {
// event.Name is the watched path.
if filepath.Base(event.Name)[0] == '.' {
// Ignore files beginning with '.'
return nil
}
eventPathAbs, err := filepath.Abs(event.Name)
if err != nil {
return err
}
pluginDirAbs, err := filepath.Abs(prober.pluginDir)
if err != nil {
return err
}
// If the Flexvolume plugin directory is removed, need to recreate it
// in order to keep it under watch.
if eventOpIs(event, fsnotify.Remove) && eventPathAbs == pluginDirAbs {
if err := prober.createPluginDir(); err != nil {
return err
}
if err := prober.addWatchRecursive(pluginDirAbs); err != nil {
return err
}
} else if eventOpIs(event, fsnotify.Create) {
if err := prober.addWatchRecursive(eventPathAbs); err != nil {
return err
}
}
prober.updateProbeNeeded()
return nil
}
func (prober *flexVolumeProber) updateProbeNeeded() {
// Within 'watchEventInterval' seconds, a max of 'watchEventLimit' watch events is processed.
// The watch event will not be registered if the limit is reached.
// This prevents increased disk usage from Probe() being triggered too frequently (either
// accidentally or maliciously).
if time.Since(prober.lastUpdated) > watchEventInterval {
// Update, then reset the timer and watch count.
prober.testAndSetProbeNeeded(true)
prober.lastUpdated = time.Now()
prober.watchEventCount = 1
} else if prober.watchEventCount < watchEventLimit {
prober.testAndSetProbeNeeded(true)
prober.watchEventCount++
}
}
// Recursively adds to watch all directories inside and including the file specified by the given filename.
// If the file is a symlink to a directory, it will watch the symlink but not any of the subdirectories.
//
// Each file or directory change triggers two events: one from the watch on itself, another from the watch
// on its parent directory.
func (prober *flexVolumeProber) addWatchRecursive(filename string) error {
addWatch := func(path string, info os.FileInfo, err error) error {
if err == nil && info.IsDir() {
if err := prober.watcher.AddWatch(path); err != nil {
glog.Errorf("Error recursively adding watch: %v", err)
}
}
return nil
}
return prober.fs.Walk(filename, addWatch)
}
// Creates a new filesystem watcher and adds watches for the plugin directory
// and all of its subdirectories.
func (prober *flexVolumeProber) initWatcher() error {
err := prober.watcher.Init(func(event fsnotify.Event) {
if err := prober.handleWatchEvent(event); err != nil {
glog.Errorf("Flexvolume prober watch: %s", err)
}
}, func(err error) {
glog.Errorf("Received an error from watcher: %s", err)
})
if err != nil {
return fmt.Errorf("Error initializing watcher: %s", err)
}
if err := prober.addWatchRecursive(prober.pluginDir); err != nil {
return fmt.Errorf("Error adding watch on Flexvolume directory: %s", err)
}
prober.watcher.Run()
return nil
}
// Creates the plugin directory, if it doesn't already exist.
func (prober *flexVolumeProber) createPluginDir() error {
if _, err := prober.fs.Stat(prober.pluginDir); os.IsNotExist(err) {
glog.Warningf("Flexvolume plugin directory at %s does not exist. Recreating.", prober.pluginDir)
err := prober.fs.MkdirAll(prober.pluginDir, 0755)
if err != nil {
return fmt.Errorf("Error (re-)creating driver directory: %s", err)
}
}
return nil
}
func (prober *flexVolumeProber) testAndSetProbeNeeded(newval bool) (oldval bool) {
prober.mutex.Lock()
defer prober.mutex.Unlock()
oldval, prober.probeNeeded = prober.probeNeeded, newval
return
}
func eventOpIs(event fsnotify.Event, op fsnotify.Op) bool {
return event.Op&op == op
}