Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #75129: Move CSIDriver Lister to the controller #78878

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kube-controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.InformerFactory.Storage().V1beta1().CSINodes(),
ctx.InformerFactory.Storage().V1beta1().CSIDrivers(),
ctx.Cloud,
ProbeAttachableVolumePlugins(),
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
Expand Down
23 changes: 23 additions & 0 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func NewAttachDetachController(
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
csiNodeInformer storageinformers.CSINodeInformer,
csiDriverInformer storageinformers.CSIDriverInformer,
cloud cloudprovider.Interface,
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber,
Expand Down Expand Up @@ -147,6 +148,11 @@ func NewAttachDetachController(
adc.csiNodeSynced = csiNodeInformer.Informer().HasSynced
}

if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
adc.csiDriverLister = csiDriverInformer.Lister()
adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced
}

if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
}
Expand Down Expand Up @@ -271,6 +277,12 @@ type attachDetachController struct {
csiNodeLister storagelisters.CSINodeLister
csiNodeSynced kcache.InformerSynced

// csiDriverLister is the shared CSIDriver lister used to fetch and store
// CSIDriver objects from the API server. It is shared with other controllers
// and therefore the CSIDriver objects in its store should be treated as immutable.
csiDriverLister storagelisters.CSIDriverLister
csiDriversSynced kcache.InformerSynced

// cloud provider used by volume host
cloud cloudprovider.Interface

Expand Down Expand Up @@ -327,6 +339,9 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
if adc.csiNodeSynced != nil {
synced = append(synced, adc.csiNodeSynced)
}
if adc.csiDriversSynced != nil {
synced = append(synced, adc.csiDriversSynced)
}

if !controller.WaitForCacheSync("attach detach", stopCh, synced...) {
return
Expand Down Expand Up @@ -669,6 +684,10 @@ func (adc *attachDetachController) CSINodeLister() storagelisters.CSINodeLister
return adc.csiNodeLister
}

func (adc *attachDetachController) CSIDriverLister() storagelisters.CSIDriverLister {
return adc.csiDriverLister
}

func (adc *attachDetachController) IsAttachDetachController() bool {
return true
}
Expand Down Expand Up @@ -793,3 +812,7 @@ func (adc *attachDetachController) GetSubpather() subpath.Interface {
// Subpaths not needed in attachdetach controller
return nil
}

func (adc *attachDetachController) GetCSIDriverLister() storagelisters.CSIDriverLister {
return adc.csiDriverLister
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1beta1().CSINodes(),
informerFactory.Storage().V1beta1().CSIDrivers(),
nil, /* cloud */
nil, /* plugins */
nil, /* prober */
Expand Down Expand Up @@ -220,6 +221,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Storage().V1beta1().CSINodes(),
informerFactory.Storage().V1beta1().CSIDrivers(),
nil, /* cloud */
plugins,
prober,
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubelet/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,11 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/certificate:go_default_library",
Expand Down
56 changes: 56 additions & 0 deletions pkg/kubelet/volume_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ import (
authenticationv1 "k8s.io/api/authentication/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/features"
Expand Down Expand Up @@ -56,6 +60,24 @@ func NewInitializedVolumePluginMgr(
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {

// Initialize csiDriverLister before calling InitPlugins
var informerFactory informers.SharedInformerFactory
var csiDriverLister storagelisters.CSIDriverLister
var csiDriversSynced cache.InformerSynced
const resyncPeriod = 0
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Don't initialize if kubeClient is nil
if kubelet.kubeClient != nil {
informerFactory = informers.NewSharedInformerFactory(kubelet.kubeClient, resyncPeriod)
csiDriverInformer := informerFactory.Storage().V1beta1().CSIDrivers()
csiDriverLister = csiDriverInformer.Lister()
csiDriversSynced = csiDriverInformer.Informer().HasSynced

} else {
klog.Warning("kubeClient is nil. Skip initialization of CSIDriverLister")
}
}

mountPodManager, err := mountpod.NewManager(kubelet.getRootDir(), kubelet.podManager)
if err != nil {
return nil, err
Expand All @@ -67,6 +89,9 @@ func NewInitializedVolumePluginMgr(
configMapManager: configMapManager,
tokenManager: tokenManager,
mountPodManager: mountPodManager,
informerFactory: informerFactory,
csiDriverLister: csiDriverLister,
csiDriversSynced: csiDriversSynced,
}

if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
Expand All @@ -93,6 +118,9 @@ type kubeletVolumeHost struct {
tokenManager *token.Manager
configMapManager configmap.Manager
mountPodManager mountpod.Manager
informerFactory informers.SharedInformerFactory
csiDriverLister storagelisters.CSIDriverLister
csiDriversSynced cache.InformerSynced
}

func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
Expand Down Expand Up @@ -131,6 +159,34 @@ func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface {
return kvh.kubelet.subpather
}

func (kvh *kubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory {
return kvh.informerFactory
}

func (kvh *kubeletVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
return kvh.csiDriverLister
}

func (kvh *kubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
return kvh.csiDriversSynced
}

// WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister
func (kvh *kubeletVolumeHost) WaitForCacheSync() error {
if kvh.csiDriversSynced == nil {
klog.Error("csiDriversSynced not found on KubeletVolumeHost")
return fmt.Errorf("csiDriversSynced not found on KubeletVolumeHost")
}

synced := []cache.InformerSynced{kvh.csiDriversSynced}
if !cache.WaitForCacheSync(wait.NeverStop, synced...) {
klog.Warning("failed to wait for cache sync for CSIDriverLister")
return fmt.Errorf("failed to wait for cache sync for CSIDriverLister")
}

return nil
}

func (kvh *kubeletVolumeHost) NewWrapperMounter(
volName string,
spec volume.Spec,
Expand Down
5 changes: 5 additions & 0 deletions pkg/kubelet/volumemanager/volume_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan str

metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

if vm.kubeClient != nil {
// start informer for CSIDriver
vm.volumePluginMgr.Run(stopCh)
}

<-stopCh
klog.Infof("Shutting down Kubelet Volume Manager")
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/volume",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume/util/fs:go_default_library",
"//pkg/volume/util/recyclerclient:go_default_library",
Expand All @@ -29,8 +30,11 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
Expand Down
3 changes: 1 addition & 2 deletions pkg/volume/csi/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
Expand Down Expand Up @@ -73,6 +71,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
Expand Down
13 changes: 11 additions & 2 deletions pkg/volume/csi/csi_attacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclient "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
Expand Down Expand Up @@ -1435,12 +1436,20 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
fakeClient = fakeclient.NewSimpleClientset()
}
fakeWatcher := watch.NewRaceFreeFake()
fakeClient.Fake.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil))
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))

// Start informer for CSIDrivers.
factory := informers.NewSharedInformerFactory(fakeClient, csiResyncPeriod)
csiDriverInformer := factory.Storage().V1beta1().CSIDrivers()
csiDriverLister := csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)

host := volumetest.NewFakeVolumeHostWithCSINodeName(
tmpDir,
fakeClient,
nil,
"node",
csiDriverLister,
)
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)
Expand All @@ -1458,7 +1467,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Wait until the informer in CSI volume plugin has all CSIDrivers.
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
return csiDriverInformer.Informer().HasSynced(), nil
})
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/volume/csi/csi_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func TestBlockMapperSetupDevice(t *testing.T) {
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host

Expand Down Expand Up @@ -282,6 +283,7 @@ func TestBlockMapperMapDevice(t *testing.T) {
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host

Expand Down Expand Up @@ -364,6 +366,7 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
fakeClient,
nil,
"fakeNode",
nil,
)
plug.host = host

Expand Down
9 changes: 7 additions & 2 deletions pkg/volume/csi/csi_mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package csi
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"os"
"path"
Expand Down Expand Up @@ -292,8 +291,14 @@ func (c *csiMountMgr) podAttributes() (map[string]string, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
return nil, nil
}

kletHost, ok := c.plugin.host.(volume.KubeletVolumeHost)
if ok {
kletHost.WaitForCacheSync()
}

if c.plugin.csiDriverLister == nil {
return nil, errors.New("CSIDriver lister does not exist")
return nil, fmt.Errorf("CSIDriverLister not found")
}

csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName))
Expand Down
10 changes: 2 additions & 8 deletions pkg/volume/csi/csi_mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
storagev1beta1 "k8s.io/api/storage/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
fakeclient "k8s.io/client-go/kubernetes/fake"
Expand Down Expand Up @@ -155,13 +154,6 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
plug, tmpDir := newTestPlugin(t, fakeClient)
defer os.RemoveAll(tmpDir)

if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
// Wait until the informer in CSI volume plugin has all CSIDrivers.
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
return plug.csiDriverInformer.Informer().HasSynced(), nil
})
}

registerFakePlugin(test.driver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV("test-pv", 10, test.driver, testVol)
pv.Spec.CSI.VolumeAttributes = test.volumeContext
Expand Down Expand Up @@ -391,6 +383,7 @@ func TestMounterSetUpSimple(t *testing.T) {
})
}
}

func TestMounterSetUpWithInline(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)()

Expand Down Expand Up @@ -527,6 +520,7 @@ func TestMounterSetUpWithInline(t *testing.T) {
})
}
}

func TestMounterSetUpWithFSGroup(t *testing.T) {
fakeClient := fakeclient.NewSimpleClientset()
plug, tmpDir := newTestPlugin(t, fakeClient)
Expand Down