Skip to content

Commit

Permalink
Merge pull request #327 from mucahitkurt/use-node-and-csinodeinfo-inf…
Browse files Browse the repository at this point in the history
…ormers

Use csiNode informer to prevent hitting the API server for all time
  • Loading branch information
k8s-ci-robot committed Oct 26, 2019
2 parents 94262e8 + 17fa2a8 commit 4160887
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 116 deletions.
40 changes: 37 additions & 3 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
"k8s.io/klog"

utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
utilflag "k8s.io/component-base/cli/flag"
csitrans "k8s.io/csi-translation-lib"
)
Expand Down Expand Up @@ -181,11 +183,32 @@ func main() {
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
}

var csiNodeLister storagelisters.CSINodeLister
var factory informers.SharedInformerFactory
if ctrl.SupportsTopology(pluginCapabilities) {
// Create informer to prevent hit the API server for all resource request
factory = informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
}

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix,
*volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities,
controllerCapabilities, supportsMigrationFromInTreePluginName, *strictTopology, translator)
csiProvisioner := ctrl.NewCSIProvisioner(
clientset,
*operationTimeout,
identity,
*volumeNamePrefix,
*volumeNameUUIDLength,
grpcClient,
snapClient,
provisionerName,
pluginCapabilities,
controllerCapabilities,
supportsMigrationFromInTreePluginName,
*strictTopology,
translator,
csiNodeLister)

provisionController = controller.NewProvisionController(
clientset,
provisionerName,
Expand All @@ -195,6 +218,17 @@ func main() {
)

run := func(context.Context) {
if factory != nil {
stopCh := context.Background().Done()
factory.Start(stopCh)
cacheSyncResult := factory.WaitForCacheSync(stopCh)
for _, v := range cacheSyncResult {
if !v {
klog.Fatalf("Failed to sync CsiNodeInformer!")
}
}
}

provisionController.Run(wait.NeverStop)
}

Expand Down
18 changes: 11 additions & 7 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,24 @@ import (

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/external-provisioner/pkg/features"
snapapi "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
snapclientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
"sigs.k8s.io/sig-storage-lib-external-provisioner/util"

v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
_ "k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog"

"google.golang.org/grpc"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
)

//secretParamsMap provides a mapping of current as well as deprecated secret keys
Expand Down Expand Up @@ -115,6 +114,8 @@ const (
tokenPVCNameKey = "pvc.name"
tokenPVCNameSpaceKey = "pvc.namespace"

ResyncPeriodOfCsiNodeInformer = 1 * time.Hour

deleteVolumeRetryCount = 5

annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"
Expand Down Expand Up @@ -195,6 +196,7 @@ type csiProvisioner struct {
supportsMigrationFromInTreePluginName string
strictTopology bool
translator ProvisionerCSITranslator
csiNodeLister storagelisters.CSINodeLister
}

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -254,7 +256,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
controllerCapabilities connection.ControllerCapabilitySet,
supportsMigrationFromInTreePluginName string,
strictTopology bool,
translator ProvisionerCSITranslator) controller.Provisioner {
translator ProvisionerCSITranslator,
csiNodeLister storagelisters.CSINodeLister) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
Expand All @@ -272,6 +275,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
strictTopology: strictTopology,
translator: translator,
csiNodeLister: csiNodeLister,
}
return provisioner
}
Expand Down Expand Up @@ -499,7 +503,8 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.
options.PVC.Name,
options.StorageClass.AllowedTopologies,
options.SelectedNode,
p.strictTopology)
p.strictTopology,
p.csiNodeLister)
if err != nil {
return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err)
}
Expand Down Expand Up @@ -664,8 +669,7 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.
}

func (p *csiProvisioner) supportsTopology() bool {
return p.pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] &&
utilfeature.DefaultFeatureGate.Enabled(features.Topology)
return SupportsTopology(p.pluginCapabilities)
}

func removePrefixedParameters(param map[string]string) (map[string]string, error) {
Expand Down
28 changes: 16 additions & 12 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
"github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/fake"
"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -401,7 +401,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test",
5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)

// Requested PVC with requestedBytes storage
deletePolicy := v1.PersistentVolumeReclaimDelete
Expand Down Expand Up @@ -1464,7 +1464,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New())
nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New(), nil)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -2031,7 +2031,7 @@ func TestProvisionFromSnapshot(t *testing.T) {

pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -2186,7 +2186,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
}

nodes := buildNodes(tc.nodeLabels, k8sTopologyBetaVersion.String())
nodeInfos := buildNodeInfos(tc.topologyKeys)
csiNodes := buildCSINodes(tc.topologyKeys)

var (
pluginCaps connection.PluginCapabilitySet
Expand All @@ -2199,9 +2199,13 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
pluginCaps, controllerCaps = provisionCapabilities()
}

clientSet := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
clientSet := fakeclientset.NewSimpleClientset(nodes, csiNodes)

csiNodeLister, stopChan := csiNodeLister(clientSet, t)
defer close(stopChan)

csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), csiNodeLister)

pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
StorageClass: &storagev1.StorageClass{},
Expand Down Expand Up @@ -2256,7 +2260,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {
clientSet := fakeclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -2436,7 +2440,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)

err = csiProvisioner.Delete(tc.persistentVolume)
if tc.expectErr && err == nil {
Expand Down Expand Up @@ -3092,7 +3096,7 @@ func TestProvisionFromPVC(t *testing.T) {
}

csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New())
nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)

pv, err := csiProvisioner.Provision(tc.volOpts)
if tc.expectErr && err == nil {
Expand Down Expand Up @@ -3171,7 +3175,7 @@ func TestProvisionWithMigration(t *testing.T) {
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner",
"test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps,
inTreePluginName, false, mockTranslator)
inTreePluginName, false, mockTranslator, nil)

// Set up return values (AnyTimes to avoid overfitting on implementation)

Expand Down Expand Up @@ -3331,7 +3335,7 @@ func TestDeleteMigration(t *testing.T) {
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner",
"test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "",
false, mockTranslator)
false, mockTranslator, nil)

// Set mock return values (AnyTimes to avoid overfitting on implementation details)
mockTranslator.EXPECT().IsPVMigratable(gomock.Any()).Return(tc.expectTranslation).AnyTimes()
Expand Down
49 changes: 29 additions & 20 deletions pkg/controller/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ import (
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
v1 "k8s.io/api/core/v1"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/external-provisioner/pkg/features"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/version"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/klog"
)

Expand Down Expand Up @@ -69,12 +74,18 @@ func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNo
}
}

// SupportsTopology returns whether topology is supported both for plugin and external provisioner
func SupportsTopology(pluginCapabilities connection.PluginCapabilitySet) bool {
return pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] &&
utilfeature.DefaultFeatureGate.Enabled(features.Topology)
}

// GenerateAccessibilityRequirements returns the CSI TopologyRequirement
// to pass into the CSI CreateVolume request.
//
// This function is only called if the topology feature is enabled
// in the external-provisioner and the CSI driver implements the
// CSI accessbility capability. It is disabled by default.
// CSI accessibility capability. It is disabled by default.
//
// If enabled, we require that the K8s API server is on at least
// K8s 1.14, and that the K8s CSINode feature gate is enabled. In
Expand Down Expand Up @@ -140,7 +151,8 @@ func GenerateAccessibilityRequirements(
pvcName string,
allowedTopologies []v1.TopologySelectorTerm,
selectedNode *v1.Node,
strictTopology bool) (*csi.TopologyRequirement, error) {
strictTopology bool,
csiNodeLister storagelisters.CSINodeLister) (*csi.TopologyRequirement, error) {
requirement := &csi.TopologyRequirement{}

var (
Expand All @@ -152,7 +164,7 @@ func GenerateAccessibilityRequirements(

// 1. Get CSINode for the selected node
if selectedNode != nil {
selectedCSINode, err = getSelectedCSINode(kubeClient, selectedNode)
selectedCSINode, err = getSelectedCSINode(csiNodeLister, selectedNode)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -207,7 +219,7 @@ func GenerateAccessibilityRequirements(
requisiteTerms = flatten(allowedTopologies)
} else {
// Aggregate existing topologies in nodes across the entire cluster.
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -263,11 +275,10 @@ func GenerateAccessibilityRequirements(

// getSelectedCSINode returns the CSINode object for the given selectedNode.
func getSelectedCSINode(
kubeClient kubernetes.Interface,
csiNodeLister storagelisters.CSINodeLister,
selectedNode *v1.Node) (*storage.CSINode, error) {

// TODO (#144): use informers
selectedNodeInfo, err := kubeClient.StorageV1beta1().CSINodes().Get(selectedNode.Name, metav1.GetOptions{})
selectedCSINode, err := csiNodeLister.Get(selectedNode.Name)
if err != nil {
// If the Node is before 1.14, then we fallback to "topology disabled" behavior
// to retain backwards compatibility in a single-topology environment with
Expand All @@ -289,37 +300,35 @@ func getSelectedCSINode(
// error with the API server.
return nil, fmt.Errorf("error getting CSINode for selected node %q: %v", selectedNode.Name, err)
}
return selectedNodeInfo, nil
return selectedCSINode, nil
}

// aggregateTopologies returns all the supported topology values in the cluster that
// match the driver's topology keys.
func aggregateTopologies(
kubeClient kubernetes.Interface,
driverName string,
selectedCSINode *storage.CSINode) ([]topologyTerm, error) {
selectedCSINode *storage.CSINode,
csiNodeLister storagelisters.CSINodeLister) ([]topologyTerm, error) {

// 1. Determine topologyKeys to use for aggregation
var topologyKeys []string
if selectedCSINode == nil {
// Immediate binding

// TODO (#144): use informers
nodeInfos, err := kubeClient.StorageV1beta1().CSINodes().List(metav1.ListOptions{})
csiNodes, err := csiNodeLister.List(labels.Everything())
if err != nil {
// Require CSINode beta feature on K8s apiserver to be enabled.
// We don't want to fallback and provision in the wrong topology if there's some temporary
// error with the API server.
return nil, fmt.Errorf("error listing CSINodes: %v", err)
}

rand.Shuffle(len(nodeInfos.Items), func(i, j int) {
nodeInfos.Items[i], nodeInfos.Items[j] = nodeInfos.Items[j], nodeInfos.Items[i]
rand.Shuffle(len(csiNodes), func(i, j int) {
csiNodes[i], csiNodes[j] = csiNodes[j], csiNodes[i]
})

// Pick the first node with topology keys
for _, nodeInfo := range nodeInfos.Items {
topologyKeys = getTopologyKeys(&nodeInfo, driverName)
for _, csiNode := range csiNodes {
topologyKeys = getTopologyKeys(csiNode, driverName)
if topologyKeys != nil {
break
}
Expand Down Expand Up @@ -478,8 +487,8 @@ func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32)
return preferredTerms
}

func getTopologyKeys(nodeInfo *storage.CSINode, driverName string) []string {
for _, driver := range nodeInfo.Spec.Drivers {
func getTopologyKeys(csiNode *storage.CSINode, driverName string) []string {
for _, driver := range csiNode.Spec.Drivers {
if driver.Name == driverName {
return driver.TopologyKeys
}
Expand Down
Loading

0 comments on commit 4160887

Please sign in to comment.