Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding CSI driver registration with plugin watcher #64560

Merged
merged 2 commits into from Jun 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions hack/.golint_failures
Expand Up @@ -382,7 +382,9 @@ pkg/volume/azure_dd
pkg/volume/azure_file
pkg/volume/cephfs
pkg/volume/configmap
pkg/volume/csi
pkg/volume/csi/fake
pkg/volume/csi/labelmanager
pkg/volume/empty_dir
pkg/volume/fc
pkg/volume/flexvolume
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/BUILD
Expand Up @@ -102,6 +102,7 @@ go_library(
"//pkg/util/removeall:go_default_library",
"//pkg/version:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//pkg/volume/util/volumepathhandler:go_default_library",
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubelet/kubelet.go
Expand Up @@ -106,6 +106,7 @@ import (
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
utilexec "k8s.io/utils/exec"
)

Expand Down Expand Up @@ -1290,6 +1291,9 @@ func (kl *Kubelet) initializeModules() error {
}
}
if kl.enablePluginsWatcher {
// Adding Registration Callback function for CSI Driver
kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, will do in the follow up PR.


// Start the plugin watcher
if err := kl.pluginWatcher.Start(); err != nil {
return fmt.Errorf("failed to start Plugin Watcher. err: %v", err)
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/csi/BUILD
Expand Up @@ -12,9 +12,11 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/volume/csi",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi/labelmanager:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
Expand All @@ -25,6 +27,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
Expand Down Expand Up @@ -70,6 +73,7 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/volume/csi/fake:all-srcs",
"//pkg/volume/csi/labelmanager:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
Expand Down
13 changes: 11 additions & 2 deletions pkg/volume/csi/csi_client.go
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/golang/glog"
"google.golang.org/grpc"
api "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
)

type csiClient interface {
Expand Down Expand Up @@ -255,9 +257,16 @@ func newGrpcConn(driverName string) (*grpc.ClientConn, error) {
if driverName == "" {
return nil, fmt.Errorf("driver name is empty")
}

network := "unix"
addr := fmt.Sprintf(csiAddrTemplate, driverName)
// TODO once KubeletPluginsWatcher graduates to beta, remove FeatureGate check
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPluginsWatcher) {
driver, ok := csiDrivers.driversMap[driverName]
if !ok {
return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
}
addr = driver.driverEndpoint
}
network := "unix"
glog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr))

return grpc.Dial(
Expand Down
48 changes: 48 additions & 0 deletions pkg/volume/csi/csi_plugin.go
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"os"
"path"
"strings"
"sync"
"time"

"github.com/golang/glog"
Expand All @@ -29,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/labelmanager"
)

const (
Expand Down Expand Up @@ -59,9 +62,54 @@ func ProbeVolumePlugins() []volume.VolumePlugin {
// volume.VolumePlugin methods
var _ volume.VolumePlugin = &csiPlugin{}

type csiDriver struct {
driverName string
driverEndpoint string
}

type csiDriversStore struct {
driversMap map[string]csiDriver
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why you are no longer using the sync.Map type any more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a comment about sync.Map suggesting that it is not strong typed. So I switched back to a simple map with mutex.

sync.RWMutex
}

// csiDrivers map keep track of all registered CSI drivers on the node and their
// corresponding sockets
var csiDrivers csiDriversStore

var lm labelmanager.Interface

// RegistrationCallback is called by kubelet's plugin watcher upon detection
// of a new registration socket opened by CSI Driver registrar side car.
func RegistrationCallback(pluginName string, endpoint string, versions []string, socketPath string) (error, chan bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Golint arg-order: error should be the last type when returning multiple items.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the kubelet's function returning variable in this order.


glog.Infof(log("Callback from kubelet with plugin name: %s endpoint: %s versions: %s socket path: %s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be we should use verbose level like V(4)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change it once the feature becomes more mature/beta. For now we need to see this but without trigger 10 million of other messages.

pluginName, endpoint, strings.Join(versions, ","), socketPath))

if endpoint == "" {
endpoint = socketPath
}
// Calling nodeLabelManager to update label for newly registered CSI driver
err := lm.AddLabels(pluginName)
if err != nil {
return err, nil
}
// Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
// all other CSI components will be able to get the actual socket of CSI drivers by its name.
csiDrivers.Lock()
defer csiDrivers.Unlock()
csiDrivers.driversMap[pluginName] = csiDriver{driverName: pluginName, driverEndpoint: endpoint}

return nil, nil
}

func (p *csiPlugin) Init(host volume.VolumeHost) error {
glog.Info(log("plugin initializing..."))
p.host = host

// Initializing csiDrivers map and label management channels
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
lm = labelmanager.NewLabelManager(host.GetNodeName(), host.GetKubeClient())

return nil
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/volume/csi/labelmanager/BUILD
@@ -0,0 +1,30 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["labelmanager.go"],
importpath = "k8s.io/kubernetes/pkg/volume/csi/labelmanager",
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)