Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reintroduce CSI 0.3.x support in CSI Volume Plugin #71314

Merged
merged 5 commits into from
Nov 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions hack/.golint_failures
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ pkg/version/verflag
pkg/volume
pkg/volume/azure_dd
pkg/volume/azure_file
pkg/volume/csi/csiv0
pkg/volume/csi/fake
pkg/volume/git_repo
pkg/volume/host_path
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/devicemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (m *ManagerImpl) GetWatcherHandler() watcher.PluginHandler {
}

// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
klog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions)

if !m.isVersionCompatibleWithPlugin(versions) {
Expand All @@ -263,7 +263,7 @@ func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, version
// RegisterPlugin starts the endpoint and registers it
// TODO: Start the endpoint and wait for the First ListAndWatch call
// before registering the plugin
func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string) error {
func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
klog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint)

e, err := newEndpointImpl(endpoint, pluginName, m.callback)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/devicemanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName
}

func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher {
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName))
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName), "" /* deprecatedSockDir */)
w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
w.Start()

Expand Down
5 changes: 4 additions & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return nil, err
}
if klet.enablePluginsWatcher {
klet.pluginWatcher = pluginwatcher.NewWatcher(klet.getPluginsRegistrationDir())
klet.pluginWatcher = pluginwatcher.NewWatcher(
klet.getPluginsRegistrationDir(), /* sockDir */
klet.getPluginsDir(), /* deprecatedSockDir */
)
}

// If the experimentalMounterPathFlag is set, we do not want to
Expand Down
15 changes: 11 additions & 4 deletions pkg/kubelet/util/pluginwatcher/example_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type exampleHandler struct {

m sync.Mutex
count int

permitDeprecatedDir bool
}

type examplePluginEvent int
Expand All @@ -50,16 +52,21 @@ const (
)

// NewExampleHandler provide a example handler
func NewExampleHandler(supportedVersions []string) *exampleHandler {
func NewExampleHandler(supportedVersions []string, permitDeprecatedDir bool) *exampleHandler {
return &exampleHandler{
SupportedVersions: supportedVersions,
ExpectedNames: make(map[string]int),

eventChans: make(map[string]chan examplePluginEvent),
eventChans: make(map[string]chan examplePluginEvent),
permitDeprecatedDir: permitDeprecatedDir,
}
}

func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
if foundInDeprecatedDir && !p.permitDeprecatedDir {
return fmt.Errorf("device plugin socket was found in a directory that is no longer supported and this test does not permit plugins from deprecated dir")
}

p.SendEvent(pluginName, exampleEventValidate)

n, ok := p.DecreasePluginCount(pluginName)
Expand All @@ -79,7 +86,7 @@ func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, vers
return nil
}

func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string) error {
func (p *exampleHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
p.SendEvent(pluginName, exampleEventRegister)

// Verifies the grpcServer is ready to serve services.
Expand Down
69 changes: 58 additions & 11 deletions pkg/kubelet/util/pluginwatcher/plugin_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"
Expand All @@ -36,11 +37,12 @@ import (

// Watcher is the plugin watcher
type Watcher struct {
path string
stopCh chan interface{}
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
wg sync.WaitGroup
path string
deprecatedPath string
stopCh chan interface{}
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
wg sync.WaitGroup

mutex sync.Mutex
handlers map[string]PluginHandler
Expand All @@ -54,10 +56,13 @@ type pathInfo struct {
}

// NewWatcher provides a new watcher
func NewWatcher(sockDir string) *Watcher {
// deprecatedSockDir refers to a pre-GA directory that was used by older plugins
// for socket registration. New plugins should not use this directory.
func NewWatcher(sockDir string, deprecatedSockDir string) *Watcher {
return &Watcher{
path: sockDir,
fs: &utilfs.DefaultFs{},
path: sockDir,
deprecatedPath: deprecatedSockDir,
fs: &utilfs.DefaultFs{},

handlers: make(map[string]PluginHandler),
plugins: make(map[string]pathInfo),
Expand Down Expand Up @@ -137,7 +142,15 @@ func (w *Watcher) Start() error {
// Traverse plugin dir after starting the plugin processing goroutine
if err := w.traversePluginDir(w.path); err != nil {
w.Stop()
return fmt.Errorf("failed to traverse plugin socket path, err: %v", err)
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
}

// Traverse deprecated plugin dir, if specified.
if len(w.deprecatedPath) != 0 {
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
w.Stop()
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
}
}

return nil
Expand Down Expand Up @@ -190,6 +203,10 @@ func (w *Watcher) traversePluginDir(dir string) error {

switch mode := info.Mode(); {
case mode.IsDir():
if w.containsBlacklistedDir(path) {
return filepath.SkipDir
}

if err := w.fsWatcher.Add(path); err != nil {
return fmt.Errorf("failed to watch %s, err: %v", path, err)
}
Expand All @@ -216,6 +233,10 @@ func (w *Watcher) traversePluginDir(dir string) error {
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
klog.V(6).Infof("Handling create event: %v", event)

if w.containsBlacklistedDir(event.Name) {
return nil
}

fi, err := os.Stat(event.Name)
if err != nil {
return fmt.Errorf("stat file %s failed: %v", event.Name, err)
Expand Down Expand Up @@ -271,16 +292,18 @@ func (w *Watcher) handlePluginRegistration(socketPath string) error {
infoResp.Endpoint = socketPath
}

foundInDeprecatedDir := w.foundInDeprecatedDir(socketPath)

// calls handler callback to verify registration request
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, foundInDeprecatedDir); err != nil {
return w.notifyPlugin(client, false, fmt.Sprintf("plugin validation failed with err: %v", err))
}

// We add the plugin to the pluginwatcher's map before calling a plugin consumer's Register handle
// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
w.registerPlugin(socketPath, infoResp.Type, infoResp.Name)

if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint); err != nil {
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return w.notifyPlugin(client, false, fmt.Sprintf("plugin registration failed with err: %v", err))
}

Expand Down Expand Up @@ -417,3 +440,27 @@ func dial(unixSocketPath string, timeout time.Duration) (registerapi.Registratio

return registerapi.NewRegistrationClient(c), c, nil
}

// While deprecated dir is supported, to add extra protection around #69015
// we will explicitly blacklist kubernetes.io directory.
func (w *Watcher) containsBlacklistedDir(path string) bool {
return strings.HasPrefix(path, w.deprecatedPath+"/kubernetes.io/") ||
path == w.deprecatedPath+"/kubernetes.io"
}

func (w *Watcher) foundInDeprecatedDir(socketPath string) bool {
if len(w.deprecatedPath) != 0 {
if socketPath == w.deprecatedPath {
return true
}

deprecatedPath := w.deprecatedPath
if !strings.HasSuffix(deprecatedPath, "/") {
deprecatedPath = deprecatedPath + "/"
}
if strings.HasPrefix(socketPath, deprecatedPath) {
return true
}
}
return false
}