Skip to content
Permalink
Browse files

Move CSIDriver Lister to the controller

  • Loading branch information...
xing-yang committed Mar 4, 2019
1 parent 517922f commit ffcd78d8bbc48e6de2429375444faad92029d26a
@@ -224,6 +224,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.InformerFactory.Storage().V1beta1().CSINodes(),
ctx.InformerFactory.Storage().V1beta1().CSIDrivers(),
ctx.Cloud,
ProbeAttachableVolumePlugins(),
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
@@ -106,6 +106,7 @@ func NewAttachDetachController(
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
csiNodeInformer storageinformers.CSINodeInformer,
csiDriverInformer storageinformers.CSIDriverInformer,
cloud cloudprovider.Interface,
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber,
@@ -147,6 +148,11 @@ func NewAttachDetachController(
adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced
}

if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
adc.csiDriverLister = csiDriverInformer.Lister()
adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced
}

if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
}
@@ -271,6 +277,12 @@ type attachDetachController struct {
csiNodeLister storagelisters.CSINodeLister
csiNodeSynced kcache.InformerSynced

// csiDriverLister is the shared CSIDriver lister used to fetch and store
// CSIDriver objects from the API server. It is shared with other controllers
// and therefore the CSIDriver objects in its store should be treated as immutable.
csiDriverLister storagelisters.CSIDriverLister
csiDriversSynced kcache.InformerSynced

// cloud provider used by volume host
cloud cloudprovider.Interface

@@ -327,6 +339,9 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
if adc.csiNodeSynced != nil {
synced = append(synced, adc.csiNodeSynced)
}
if adc.csiDriversSynced != nil {
synced = append(synced, adc.csiDriversSynced)
}

if !controller.WaitForCacheSync("attach detach", stopCh, synced...) {
return
@@ -669,6 +684,10 @@ func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister
return adc.csiNodeLister
}

func (adc *attachDetachController) CSIDriverLister() storagelisters.CSIDriverLister {
return adc.csiDriverLister
}

func (adc *attachDetachController) IsAttachDetachController() bool {
return true
}
@@ -793,3 +812,7 @@ func (adc *attachDetachController) GetSubpather() subpath.Interface {
// Subpaths not needed in attachdetach controller
return nil
}

func (adc *attachDetachController) GetCSIDriverLister() storagelisters.CSIDriverLister {
return adc.csiDriverLister
}
@@ -45,6 +45,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1beta1().CSINodes(),
informerFactory.Storage().V1beta1().CSIDrivers(),
nil, /* cloud */
nil, /* plugins */
nil, /* prober */
@@ -220,6 +221,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1beta1().CSINodes(),
informerFactory.Storage().V1beta1().CSIDrivers(),
nil, /* cloud */
plugins,
prober,
@@ -132,9 +132,11 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/certificate:go_default_library",
@@ -43,6 +43,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
@@ -812,7 +813,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.getPodsDir(),
kubeDeps.Recorder,
experimentalCheckNodeCapabilitiesBeforeMount,
keepTerminatedPodVolumes)
keepTerminatedPodVolumes,
klet.informerFactory)

klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
@@ -1216,6 +1218,9 @@ type Kubelet struct {

// Handles RuntimeClass objects for the Kubelet.
runtimeClassManager *runtimeclass.Manager

// This is for building the informer factory for CSIDriverLister
informerFactory informers.SharedInformerFactory
}

// setupDataDirs creates:
@@ -1409,6 +1414,11 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
go kl.nodeLeaseController.Run(wait.NeverStop)
}

// start informer for CSIDriver
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
go kl.informerFactory.Start(wait.NeverStop)
}
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

@@ -340,7 +340,8 @@ func newTestKubeletWithImageList(
kubelet.getPodsDir(),
kubelet.recorder,
false, /* experimentalCheckNodeCapabilitiesBeforeMount*/
false /* keepTerminatedPodVolumes */)
false, /* keepTerminatedPodVolumes */
nil /* informers.SharedInformerFactory */)

kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()

@@ -106,7 +106,8 @@ func TestRunOnce(t *testing.T) {
kb.getPodsDir(),
kb.recorder,
false, /* experimentalCheckNodeCapabilitiesBeforeMount */
false /* keepTerminatedPodVolumes */)
false, /* keepTerminatedPodVolumes */
nil /* informers.SharedInformerFactory */)

// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
volumeStatsAggPeriod := time.Second * 10
@@ -27,7 +27,9 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/features"
@@ -56,6 +58,18 @@ func NewInitializedVolumePluginMgr(
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {

// Initialize csiDriverLister before calling NewInitializedVolumePluginMgr
var csiDriverLister storagelisters.CSIDriverLister
const resyncPeriod = 0
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Don't initialize if kubeClient is nil
if kubelet.kubeClient != nil {
kubelet.informerFactory = informers.NewSharedInformerFactory(kubelet.kubeClient, resyncPeriod)
csiDriverInformer := kubelet.informerFactory.Storage().V1beta1().CSIDrivers()
csiDriverLister = csiDriverInformer.Lister()
}
}

mountPodManager, err := mountpod.NewManager(kubelet.getRootDir(), kubelet.podManager)
if err != nil {
return nil, err
@@ -67,6 +81,7 @@ func NewInitializedVolumePluginMgr(
configMapManager: configMapManager,
tokenManager: tokenManager,
mountPodManager: mountPodManager,
csiDriverLister: csiDriverLister,
}

if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
@@ -93,6 +108,7 @@ type kubeletVolumeHost struct {
tokenManager *token.Manager
configMapManager configmap.Manager
mountPodManager mountpod.Manager
csiDriverLister storagelisters.CSIDriverLister
}

func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
@@ -131,6 +147,10 @@ func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface {
return kvh.kubelet.subpather
}

func (kvh *kubeletVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
return kvh.csiDriverLister
}

func (kvh *kubeletVolumeHost) NewWrapperMounter(
volName string,
spec volume.Spec,
@@ -34,6 +34,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
@@ -158,13 +159,15 @@ func NewVolumeManager(
kubeletPodsDir string,
recorder record.EventRecorder,
checkNodeCapabilitiesBeforeMount bool,
keepTerminatedPodVolumes bool) VolumeManager {
keepTerminatedPodVolumes bool,
informerFactory informers.SharedInformerFactory) VolumeManager {

vm := &volumeManager{
kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr,
desiredStateOfWorld: cache.NewDesiredStateOfWorld(volumePluginMgr),
actualStateOfWorld: cache.NewActualStateOfWorld(nodeName, volumePluginMgr),
informerFactory: informerFactory,
operationExecutor: operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
@@ -237,11 +240,19 @@ type volumeManager struct {
// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
// populate the desiredStateOfWorld using the kubelet PodManager.
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator

// the informer factory for CSIDriverLister
informerFactory informers.SharedInformerFactory
}

func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
defer runtime.HandleCrash()

// wait for cache sync of CSIDriver
if vm.informerFactory != nil {
vm.informerFactory.WaitForCacheSync(wait.NeverStop)
}

go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
klog.V(2).Infof("The desired_state_of_world populator starts")

@@ -235,7 +235,8 @@ func newTestVolumeManager(tmpDir string, podManager kubepod.Manager, kubeClient
"",
fakeRecorder,
false, /* experimentalCheckNodeCapabilitiesBeforeMount */
false /* keepTerminatedPodVolumes */)
false, /* keepTerminatedPodVolumes */
nil /* informers.SharedInformerFactory */)

return vm
}
@@ -31,8 +31,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
@@ -73,6 +71,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
@@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclient "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
@@ -1436,11 +1437,19 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
}
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil))

// Start informer for CSIDrivers.
factory := informers.NewSharedInformerFactory(fakeClient, csiResyncPeriod)
csiDriverInformer := factory.Storage().V1beta1().CSIDrivers()
csiDriverLister := csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)

host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
"node",
csiDriverLister,
)
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
@@ -1458,7 +1467,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Wait until the informer in CSI volume plugin has all CSIDrivers.
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
return csiDriverInformer.Informer().HasSynced(), nil
})
}

@@ -216,6 +216,7 @@ func TestBlockMapperSetupDevice(t *testing.T) {
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host

@@ -282,6 +283,7 @@ func TestBlockMapperMapDevice(t *testing.T) {
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host

@@ -364,6 +366,7 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host

@@ -19,7 +19,6 @@ package csi
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"os"
"path"
@@ -292,8 +291,9 @@ func (c *csiMountMgr) podAttributes() (map[string]string, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
return nil, nil
}

if c.plugin.csiDriverLister == nil {
return nil, errors.New("CSIDriver lister does not exist")
return nil, fmt.Errorf("CSIDriverLister not found")
}

csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName))
Oops, something went wrong.

0 comments on commit ffcd78d

Please sign in to comment.
You can’t perform that action at this time.