diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go index 4b3866b2c2..bdad85809f 100644 --- a/cmd/csi-provisioner/csi-provisioner.go +++ b/cmd/csi-provisioner/csi-provisioner.go @@ -43,6 +43,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" + "k8s.io/client-go/listers/core/v1" storagelisters "k8s.io/client-go/listers/storage/v1beta1" utilflag "k8s.io/component-base/cli/flag" csitrans "k8s.io/csi-translation-lib" @@ -184,11 +185,13 @@ func main() { } var csiNodeLister storagelisters.CSINodeLister + var nodeLister v1.NodeLister var factory informers.SharedInformerFactory if ctrl.SupportsTopology(pluginCapabilities) { // Create informer to prevent hit the API server for all resource request factory = informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer) csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister() + nodeLister = factory.Core().V1().Nodes().Lister() } // Create the provisioner: it implements the Provisioner interface expected by @@ -207,7 +210,8 @@ func main() { supportsMigrationFromInTreePluginName, *strictTopology, translator, - csiNodeLister) + csiNodeLister, + nodeLister) provisionController = controller.NewProvisionController( clientset, @@ -224,7 +228,7 @@ func main() { cacheSyncResult := factory.WaitForCacheSync(stopCh) for _, v := range cacheSyncResult { if !v { - klog.Fatalf("Failed to sync CsiNodeInformer!") + klog.Fatalf("Failed to sync Informers!") } } } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 50a142e110..6158c09886 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -47,6 +47,7 @@ import ( "k8s.io/klog" "google.golang.org/grpc" + corelisters "k8s.io/client-go/listers/core/v1" storagelisters "k8s.io/client-go/listers/storage/v1beta1" ) @@ -197,6 +198,7 @@ type csiProvisioner struct { strictTopology bool translator ProvisionerCSITranslator csiNodeLister storagelisters.CSINodeLister + nodeLister corelisters.NodeLister } var _ controller.Provisioner = &csiProvisioner{} @@ -257,7 +259,8 @@ func NewCSIProvisioner(client kubernetes.Interface, supportsMigrationFromInTreePluginName string, strictTopology bool, translator ProvisionerCSITranslator, - csiNodeLister storagelisters.CSINodeLister) controller.Provisioner { + csiNodeLister storagelisters.CSINodeLister, + nodeLister corelisters.NodeLister) controller.Provisioner { csiClient := csi.NewControllerClient(grpcClient) provisioner := &csiProvisioner{ @@ -276,6 +279,7 @@ func NewCSIProvisioner(client kubernetes.Interface, strictTopology: strictTopology, translator: translator, csiNodeLister: csiNodeLister, + nodeLister: nodeLister, } return provisioner } @@ -504,7 +508,8 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1. options.StorageClass.AllowedTopologies, options.SelectedNode, p.strictTopology, - p.csiNodeLister) + p.csiNodeLister, + p.nodeLister) if err != nil { return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err) } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 94f46af1b1..5e17b06089 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -401,7 +401,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) { pluginCaps, controllerCaps := provisionCapabilities() csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", - 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil) + 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil) // Requested PVC with requestedBytes storage deletePolicy := v1.PersistentVolumeReclaimDelete @@ -1464,7 +1464,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested pluginCaps, controllerCaps := provisionCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New(), nil) + nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New(), nil, nil) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -2031,7 +2031,7 @@ func TestProvisionFromSnapshot(t *testing.T) { pluginCaps, controllerCaps := provisionFromSnapshotCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil) + client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -2201,11 +2201,11 @@ func TestProvisionWithTopologyEnabled(t *testing.T) { clientSet := fakeclientset.NewSimpleClientset(nodes, csiNodes) - csiNodeLister, stopChan := csiNodeLister(clientSet, t) + csiNodeLister, nodeLister, stopChan := listers(clientSet) defer close(stopChan) csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), csiNodeLister) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), csiNodeLister, nodeLister) pv, err := csiProvisioner.Provision(controller.ProvisionOptions{ StorageClass: &storagev1.StorageClass{}, @@ -2260,7 +2260,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) { clientSet := fakeclientset.NewSimpleClientset() pluginCaps, controllerCaps := provisionWithTopologyCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -2440,7 +2440,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) { pluginCaps, controllerCaps := provisionCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil) err = csiProvisioner.Delete(tc.persistentVolume) if tc.expectErr && err == nil { @@ -3096,7 +3096,7 @@ func TestProvisionFromPVC(t *testing.T) { } csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil) + nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil) pv, err := csiProvisioner.Provision(tc.volOpts) if tc.expectErr && err == nil { @@ -3175,7 +3175,7 @@ func TestProvisionWithMigration(t *testing.T) { pluginCaps, controllerCaps := provisionCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, - inTreePluginName, false, mockTranslator, nil) + inTreePluginName, false, mockTranslator, nil, nil) // Set up return values (AnyTimes to avoid overfitting on implementation) @@ -3335,7 +3335,7 @@ func TestDeleteMigration(t *testing.T) { pluginCaps, controllerCaps := provisionCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", - false, mockTranslator, nil) + false, mockTranslator, nil, nil) // Set mock return values (AnyTimes to avoid overfitting on implementation details) mockTranslator.EXPECT().IsPVMigratable(gomock.Any()).Return(tc.expectTranslation).AnyTimes() diff --git a/pkg/controller/topology.go b/pkg/controller/topology.go index 2ba6d6eeca..de0880a7be 100644 --- a/pkg/controller/topology.go +++ b/pkg/controller/topology.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/version" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" storagelisters "k8s.io/client-go/listers/storage/v1beta1" "k8s.io/klog" ) @@ -152,7 +153,8 @@ func GenerateAccessibilityRequirements( allowedTopologies []v1.TopologySelectorTerm, selectedNode *v1.Node, strictTopology bool, - csiNodeLister storagelisters.CSINodeLister) (*csi.TopologyRequirement, error) { + csiNodeLister storagelisters.CSINodeLister, + nodeLister corelisters.NodeLister) (*csi.TopologyRequirement, error) { requirement := &csi.TopologyRequirement{} var ( @@ -219,7 +221,7 @@ func GenerateAccessibilityRequirements( requisiteTerms = flatten(allowedTopologies) } else { // Aggregate existing topologies in nodes across the entire cluster. - requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister) + requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister, nodeLister) if err != nil { return nil, err } @@ -309,7 +311,8 @@ func aggregateTopologies( kubeClient kubernetes.Interface, driverName string, selectedCSINode *storage.CSINode, - csiNodeLister storagelisters.CSINodeLister) ([]topologyTerm, error) { + csiNodeLister storagelisters.CSINodeLister, + nodeLister corelisters.NodeLister) ([]topologyTerm, error) { // 1. Determine topologyKeys to use for aggregation var topologyKeys []string @@ -371,15 +374,14 @@ func aggregateTopologies( if err != nil { return nil, err } - // TODO (#144): use informers - nodes, err := kubeClient.CoreV1().Nodes().List(metav1.ListOptions{LabelSelector: selector}) + nodes, err := nodeLister.List(selector) if err != nil { return nil, fmt.Errorf("error listing nodes: %v", err) } var terms []topologyTerm - for _, node := range nodes.Items { - term, _ := getTopologyFromNode(&node, topologyKeys) + for _, node := range nodes { + term, _ := getTopologyFromNode(node, topologyKeys) terms = append(terms, term) } if len(terms) == 0 { @@ -508,7 +510,7 @@ func getTopologyFromNode(node *v1.Node, topologyKeys []string) (term topologyTer return term, false } -func buildTopologyKeySelector(topologyKeys []string) (string, error) { +func buildTopologyKeySelector(topologyKeys []string) (labels.Selector, error) { var expr []metav1.LabelSelectorRequirement for _, key := range topologyKeys { expr = append(expr, metav1.LabelSelectorRequirement{ @@ -523,10 +525,10 @@ func buildTopologyKeySelector(topologyKeys []string) (string, error) { selector, err := metav1.LabelSelectorAsSelector(&labelSelector) if err != nil { - return "", fmt.Errorf("error parsing topology keys selector: %v", err) + return nil, fmt.Errorf("error parsing topology keys selector: %v", err) } - return selector.String(), nil + return selector, nil } func (t topologyTerm) clone() topologyTerm { diff --git a/pkg/controller/topology_test.go b/pkg/controller/topology_test.go index 052a06b8ba..a2f1b839a5 100644 --- a/pkg/controller/topology_test.go +++ b/pkg/controller/topology_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" fakeclientset "k8s.io/client-go/kubernetes/fake" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/listers/storage/v1beta1" "k8s.io/kubernetes/pkg/apis/core/helper" ) @@ -392,7 +393,7 @@ func TestStatefulSetSpreading(t *testing.T) { kubeClient := fakeclientset.NewSimpleClientset(nodes, csiNodes) - csiNodeLister, stopChan := csiNodeLister(kubeClient, t) + csiNodeLister, nodeLister, stopChan := listers(kubeClient) defer close(stopChan) for name, tc := range testcases { @@ -410,6 +411,7 @@ func TestStatefulSetSpreading(t *testing.T) { nil, strictTopology, csiNodeLister, + nodeLister, ) if err != nil { @@ -804,6 +806,7 @@ func TestAllowedTopologies(t *testing.T) { nil, /* selectedNode */ strictTopology, nil, + nil, ) if err != nil { @@ -1080,7 +1083,7 @@ func TestTopologyAggregation(t *testing.T) { kubeClient := fakeclientset.NewSimpleClientset(nodes, csiNodes) - csiNodeLister, stopChan := csiNodeLister(kubeClient, t) + csiNodeLister, nodeLister, stopChan := listers(kubeClient) defer close(stopChan) var selectedNode *v1.Node @@ -1095,6 +1098,7 @@ func TestTopologyAggregation(t *testing.T) { selectedNode, strictTopology, csiNodeLister, + nodeLister, ) if tc.expectError { @@ -1332,7 +1336,7 @@ func TestPreferredTopologies(t *testing.T) { kubeClient := fakeclientset.NewSimpleClientset(nodes, csiNodes) selectedNode := &nodes.Items[0] - csiNodeLister, stopChan := csiNodeLister(kubeClient, t) + csiNodeLister, nodeLister, stopChan := listers(kubeClient) defer close(stopChan) requirements, err := GenerateAccessibilityRequirements( @@ -1343,6 +1347,7 @@ func TestPreferredTopologies(t *testing.T) { selectedNode, strictTopology, csiNodeLister, + nodeLister, ) if tc.expectError { @@ -1545,11 +1550,12 @@ func requisiteEqual(t1, t2 []*csi.Topology) bool { return unchecked.Len() == 0 } -func csiNodeLister(kubeClient *fakeclientset.Clientset, t *testing.T) (v1beta1.CSINodeLister, chan struct{}) { +func listers(kubeClient *fakeclientset.Clientset) (v1beta1.CSINodeLister, corelisters.NodeLister, chan struct{}) { factory := informers.NewSharedInformerFactory(kubeClient, ResyncPeriodOfCsiNodeInformer) stopChan := make(chan struct{}) csiNodeLister := factory.Storage().V1beta1().CSINodes().Lister() + nodeLister := factory.Core().V1().Nodes().Lister() factory.Start(stopChan) factory.WaitForCacheSync(stopChan) - return csiNodeLister, stopChan + return csiNodeLister, nodeLister, stopChan } diff --git a/vendor/modules.txt b/vendor/modules.txt index dd0c5ca90b..06f6017cb2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -246,6 +246,7 @@ k8s.io/apiserver/pkg/util/feature # k8s.io/client-go v0.0.0-20190918200256-06eb1244587a => k8s.io/client-go v0.0.0-20190918200256-06eb1244587a k8s.io/client-go/informers k8s.io/client-go/kubernetes +k8s.io/client-go/listers/core/v1 k8s.io/client-go/listers/storage/v1beta1 k8s.io/client-go/rest k8s.io/client-go/tools/clientcmd @@ -416,7 +417,6 @@ k8s.io/client-go/listers/batch/v2alpha1 k8s.io/client-go/listers/certificates/v1beta1 k8s.io/client-go/listers/coordination/v1 k8s.io/client-go/listers/coordination/v1beta1 -k8s.io/client-go/listers/core/v1 k8s.io/client-go/listers/events/v1beta1 k8s.io/client-go/listers/extensions/v1beta1 k8s.io/client-go/listers/networking/v1