Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay CSI client initialization #74652

Merged
merged 1 commit into from
Mar 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 17 additions & 5 deletions pkg/volume/csi/csi_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import (
)

type csiBlockMapper struct {
csiClientGetter
k8s kubernetes.Interface
csiClient csiClient
plugin *csiPlugin
driverName csiDriverName
specName string
Expand Down Expand Up @@ -247,14 +247,20 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()

csiClient, err := m.csiClientGetter.Get()
if err != nil {
klog.Error(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
return "", err
}

// Call NodeStageVolume
stagingPath, err := m.stageVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment)
stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
if err != nil {
return "", err
}

// Call NodePublishVolume
publishPath, err := m.publishVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment, stagingPath)
publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment, stagingPath)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -326,6 +332,12 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()

csiClient, err := m.csiClientGetter.Get()
if err != nil {
klog.Error(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
return err
}

// Call NodeUnpublishVolume
publishPath := m.getPublishPath()
if _, err := os.Stat(publishPath); err != nil {
Expand All @@ -335,7 +347,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return err
}
} else {
err := m.unpublishVolumeForBlock(ctx, m.csiClient, publishPath)
err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
if err != nil {
return err
}
Expand All @@ -350,7 +362,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return err
}
} else {
err := m.unstageVolumeForBlock(ctx, m.csiClient, stagingPath)
err := m.unstageVolumeForBlock(ctx, csiClient, stagingPath)
if err != nil {
return err
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/volume/csi/csi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"net"
"strings"
"sync"
"time"

csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -807,3 +808,36 @@ func versionRequiresV0Client(version *utilversion.Version) bool {

return false
}

// CSI client getter with cache.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to add some detailed comments on this struct, why it is needed and how it is used. The fix uses a quite subtle approach so I think adding comments will help others understand and might use it later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will be better. I've added some comments.

// This provides a method to initialize CSI client with driver name and caches
// it for later use. When CSI clients have not been discovered yet (e.g.
// on kubelet restart), client initialization will fail. Users of CSI client (e.g.
// mounter manager and block mapper) can use this to delay CSI client
// initialization until needed.
type csiClientGetter struct {
sync.RWMutex
csiClient csiClient
driverName csiDriverName
}

func (c *csiClientGetter) Get() (csiClient, error) {
c.RLock()
if c.csiClient != nil {
c.RUnlock()
return c.csiClient, nil
}
c.RUnlock()
c.Lock()
defer c.Unlock()
// Double-checking locking criterion.
if c.csiClient != nil {
return c.csiClient, nil
}
csi, err := newCsiDriverClient(c.driverName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can c.csiClient be assigned directly (we have the write lock) ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean c.csiClient, err := newCsiDriverClient(c.driverName)?
Yes, it is equal, no much difference.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is one difference, with the second style newCsiDriverClient(c.driverName) must return nil client on error, with the first style we do not need make such assumption.

if err != nil {
return nil, err
}
c.csiClient = csi
return c.csiClient, nil
}
14 changes: 11 additions & 3 deletions pkg/volume/csi/csi_mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var (
)

type csiMountMgr struct {
csiClient csiClient
csiClientGetter
k8s kubernetes.Interface
plugin *csiPlugin
driverName csiDriverName
Expand Down Expand Up @@ -111,7 +111,11 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
return nil
}

csi := c.csiClient
csi, err := c.csiClientGetter.Get()
if err != nil {
klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
return err
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()

Expand Down Expand Up @@ -343,7 +347,11 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
klog.V(4).Infof(log("Unmounter.TearDown(%s)", dir))

volID := c.volumeID
csi := c.csiClient
csi, err := c.csiClientGetter.Get()
if err != nil {
klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
return err
}

ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
Expand Down
20 changes: 4 additions & 16 deletions pkg/volume/csi/csi_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,6 @@ func (p *csiPlugin) NewMounter(
return nil, errors.New("failed to get a Kubernetes client")
}

csi, err := newCsiDriverClient(csiDriverName(driverName))
if err != nil {
return nil, err
}

mounter := &csiMountMgr{
plugin: p,
k8s: k8s,
Expand All @@ -398,9 +393,9 @@ func (p *csiPlugin) NewMounter(
driverMode: driverMode,
volumeID: volumeHandle,
specVolumeID: spec.Name(),
csiClient: csi,
readOnly: readOnly,
}
mounter.csiClientGetter.driverName = csiDriverName(driverName)

// Save volume info in pod dir
dir := mounter.GetPath()
Expand Down Expand Up @@ -458,10 +453,7 @@ func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmo
}
unmounter.driverName = csiDriverName(data[volDataKey.driverName])
unmounter.volumeID = data[volDataKey.volHandle]
unmounter.csiClient, err = newCsiDriverClient(unmounter.driverName)
if err != nil {
return nil, err
}
unmounter.csiClientGetter.driverName = unmounter.driverName

return unmounter, nil
}
Expand Down Expand Up @@ -638,10 +630,6 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
}

klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
client, err := newCsiDriverClient(csiDriverName(pvSource.Driver))
if err != nil {
return nil, err
}

k8s := p.host.GetKubeClient()
if k8s == nil {
Expand All @@ -650,7 +638,6 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
}

mapper := &csiBlockMapper{
csiClient: client,
k8s: k8s,
plugin: p,
volumeID: pvSource.VolumeHandle,
Expand All @@ -660,6 +647,7 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
specName: spec.Name(),
podUID: podRef.UID,
}
mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)

// Save volume info in pod dir
dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
Expand Down Expand Up @@ -714,7 +702,7 @@ func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (vo
}
unmapper.driverName = csiDriverName(data[volDataKey.driverName])
unmapper.volumeID = data[volDataKey.volHandle]
unmapper.csiClient, err = newCsiDriverClient(unmapper.driverName)
unmapper.csiClientGetter.driverName = unmapper.driverName
if err != nil {
return nil, err
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/volume/csi/csi_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,8 @@ func TestPluginNewMounter(t *testing.T) {
if string(csiMounter.podUID) != string(test.podUID) {
t.Error("mounter podUID not set")
}
if csiMounter.csiClient == nil {
csiClient, err := csiMounter.csiClientGetter.Get()
if csiClient == nil {
t.Error("mounter csiClient is nil")
}
if csiMounter.driverMode != test.driverMode {
Expand Down Expand Up @@ -732,7 +733,8 @@ func TestPluginNewMounterWithInline(t *testing.T) {
if string(csiMounter.podUID) != string(test.podUID) {
t.Error("mounter podUID not set")
}
if csiMounter.csiClient == nil {
csiClient, err := csiMounter.csiClientGetter.Get()
if csiClient == nil {
t.Error("mounter csiClient is nil")
}
if csiMounter.driverMode != test.driverMode {
Expand Down Expand Up @@ -815,8 +817,9 @@ func TestPluginNewUnmounter(t *testing.T) {
t.Error("podUID not set")
}

if csiUnmounter.csiClient == nil {
t.Error("unmounter csiClient is nil")
csiClient, err := csiUnmounter.csiClientGetter.Get()
if csiClient == nil {
t.Error("mounter csiClient is nil")
}
}

Expand Down Expand Up @@ -932,7 +935,8 @@ func TestPluginNewBlockMapper(t *testing.T) {
if csiMapper.podUID == types.UID("") {
t.Error("CSI block mapper missing pod.UID")
}
if csiMapper.csiClient == nil {
csiClient, err := csiMapper.csiClientGetter.Get()
if csiClient == nil {
t.Error("mapper csiClient is nil")
}

Expand Down Expand Up @@ -994,7 +998,8 @@ func TestPluginNewUnmapper(t *testing.T) {
t.Error("specName not set")
}

if csiUnmapper.csiClient == nil {
csiClient, err := csiUnmapper.csiClientGetter.Get()
if csiClient == nil {
t.Error("unmapper csiClient is nil")
}

Expand Down