Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CSI Translation Library into a struct that is injected into various components to simplify unit testing #82683

Merged
merged 1 commit into from
Sep 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kube-controller-manager/app/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ go_library(
"//staging/src/k8s.io/component-base/cli/globalflag:go_default_library",
"//staging/src/k8s.io/component-base/version:go_default_library",
"//staging/src/k8s.io/component-base/version/verflag:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//staging/src/k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1:go_default_library",
"//staging/src/k8s.io/metrics/pkg/client/custom_metrics:go_default_library",
"//staging/src/k8s.io/metrics/pkg/client/external_metrics:go_default_library",
Expand Down
4 changes: 3 additions & 1 deletion cmd/kube-controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
restclient "k8s.io/client-go/rest"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/controller"
cloudcontroller "k8s.io/kubernetes/pkg/controller/cloud"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
Expand Down Expand Up @@ -309,7 +310,8 @@ func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, err
ctx.InformerFactory.Core().V1().PersistentVolumes(),
ctx.InformerFactory.Storage().V1().StorageClasses(),
ctx.Cloud,
ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration))
ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
csitrans.New())

if expandControllerErr != nil {
return nil, true, fmt.Errorf("failed to start volume expand controller : %v", expandControllerErr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/volume/expand/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ go_library(
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
Expand Down Expand Up @@ -75,6 +74,7 @@ go_test(
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
],
)
14 changes: 11 additions & 3 deletions pkg/controller/volume/expand/expand_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
csitranslation "k8s.io/csi-translation-lib"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/util/mount"
Expand All @@ -62,6 +61,11 @@ type ExpandController interface {
Run(stopCh <-chan struct{})
}

// CSINameTranslator can get the CSI Driver name based on the in-tree plugin name
type CSINameTranslator interface {
GetCSINameFromInTreeName(pluginName string) (string, error)
}

type expandController struct {
// kubeClient is the kube API client used by volumehost to communicate with
// the API server.
Expand Down Expand Up @@ -92,6 +96,8 @@ type expandController struct {
operationGenerator operationexecutor.OperationGenerator

queue workqueue.RateLimitingInterface

translator CSINameTranslator
}

// NewExpandController expands the pvs
Expand All @@ -101,7 +107,8 @@ func NewExpandController(
pvInformer coreinformers.PersistentVolumeInformer,
scInformer storageclassinformer.StorageClassInformer,
cloud cloudprovider.Interface,
plugins []volume.VolumePlugin) (ExpandController, error) {
plugins []volume.VolumePlugin,
translator CSINameTranslator) (ExpandController, error) {

expc := &expandController{
kubeClient: kubeClient,
Expand All @@ -113,6 +120,7 @@ func NewExpandController(
classLister: scInformer.Lister(),
classListerSynced: scInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
translator: translator,
}

if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil {
Expand Down Expand Up @@ -255,7 +263,7 @@ func (expc *expandController) syncHandler(key string) error {
if volumePlugin.IsMigratedToCSI() {
msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", volumeResizerName)
expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg)
csiResizerName, err := csitranslation.GetCSINameFromInTreeName(class.Provisioner)
csiResizerName, err := expc.translator.GetCSINameFromInTreeName(class.Provisioner)
if err != nil {
errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/volume/expand/expand_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/client-go/informers"
coretesting "k8s.io/client-go/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
csitrans "k8s.io/csi-translation-lib"
csitranslationplugins "k8s.io/csi-translation-lib/plugins"
"k8s.io/kubernetes/pkg/controller"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
Expand Down Expand Up @@ -123,7 +124,7 @@ func TestSyncHandler(t *testing.T) {
if tc.storageClass != nil {
informerFactory.Storage().V1().StorageClasses().Informer().GetIndexer().Add(tc.storageClass)
}
expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, allPlugins)
expc, err := NewExpandController(fakeKubeClient, pvcInformer, pvInformer, storageClassInformer, nil, allPlugins, csitrans.New())
if err != nil {
t.Fatalf("error creating expand controller : %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/volume/persistentvolume/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ go_test(
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
Expand Down
10 changes: 7 additions & 3 deletions pkg/controller/volume/persistentvolume/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,12 @@ func wrapTestWithProvisionCalls(expectedProvisionCalls []provisionCall, toWrap t
return wrapTestWithPluginCalls(nil, nil, expectedProvisionCalls, toWrap)
}

type fakeCSINameTranslator struct{}

func (t fakeCSINameTranslator) GetCSINameFromInTreeName(pluginName string) (string, error) {
return "vendor.com/MockCSIPlugin", nil
}

// wrapTestWithCSIMigrationProvisionCalls returns a testCall that:
// - configures controller with a volume plugin that emulates CSI migration
// - calls given testCall
Expand All @@ -530,9 +536,7 @@ func wrapTestWithCSIMigrationProvisionCalls(toWrap testCall) testCall {
isMigratedToCSI: true,
}
ctrl.volumePluginMgr.InitPlugins([]vol.VolumePlugin{plugin}, nil /* prober */, ctrl)
ctrl.csiNameFromIntreeNameHook = func(string) (string, error) {
return "vendor.com/MockCSIPlugin", nil
}
ctrl.translator = fakeCSINameTranslator{}
return toWrap(ctrl, reactor, test)
}
}
Expand Down
25 changes: 10 additions & 15 deletions pkg/controller/volume/persistentvolume/pv_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
volerr "k8s.io/cloud-provider/volume/errors"
csitranslation "k8s.io/csi-translation-lib"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
Expand Down Expand Up @@ -134,6 +133,11 @@ const createProvisionedPVRetryCount = 5
// Interval between retries when we create a PV object for a provisioned volume.
const createProvisionedPVInterval = 10 * time.Second

// CSINameTranslator can get the CSI Driver name based on the in-tree plugin name
type CSINameTranslator interface {
GetCSINameFromInTreeName(pluginName string) (string, error)
}

// PersistentVolumeController is a controller that synchronizes
// PersistentVolumeClaims and PersistentVolumes. It starts two
// cache.Controllers that watch PersistentVolume and PersistentVolumeClaim
Expand Down Expand Up @@ -200,10 +204,6 @@ type PersistentVolumeController struct {
createProvisionedPVRetryCount int
createProvisionedPVInterval time.Duration

// For testing only: hook to intercept CSI driver name <=> Intree plugin name mapping
// Not used when set to nil
csiNameFromIntreeNameHook func(pluginName string) (string, error)

// operationTimestamps caches start timestamp of operations
// (currently provision + binding/deletion) for metric recording.
// Detailed lifecyle/key for each operation
Expand All @@ -225,6 +225,8 @@ type PersistentVolumeController struct {
// the corresponding timestamp entry will be deleted from cache
// abort: N.A.
operationTimestamps metrics.OperationStartTimeCache

translator CSINameTranslator
}

// syncClaim is the main controller method to decide what to do with a claim.
Expand Down Expand Up @@ -1355,13 +1357,6 @@ func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolum
return nil
}

func (ctrl *PersistentVolumeController) getCSINameFromIntreeName(pluginName string) (string, error) {
if ctrl.csiNameFromIntreeNameHook != nil {
return ctrl.csiNameFromIntreeNameHook(pluginName)
}
return csitranslation.GetCSINameFromInTreeName(pluginName)
}

// provisionClaimOperation provisions a volume. This method is running in
// standalone goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) provisionClaimOperation(
Expand Down Expand Up @@ -1571,7 +1566,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperationExternal(
provisionerName := storageClass.Provisioner
if plugin != nil {
// update the provisioner name to use the CSI in-tree name
provisionerName, err = ctrl.getCSINameFromIntreeName(storageClass.Provisioner)
provisionerName, err = ctrl.translator.GetCSINameFromInTreeName(storageClass.Provisioner)
if err != nil {
strerr := fmt.Sprintf("error getting CSI name for In tree plugin %s: %v", storageClass.Provisioner, err)
klog.V(2).Infof("%s", strerr)
Expand Down Expand Up @@ -1732,7 +1727,7 @@ func (ctrl *PersistentVolumeController) getProvisionerNameFromVolume(volume *v1.
return "N/A"
}
if plugin != nil {
provisionerName, err := ctrl.getCSINameFromIntreeName(class.Provisioner)
provisionerName, err := ctrl.translator.GetCSINameFromInTreeName(class.Provisioner)
if err == nil {
return provisionerName
}
Expand All @@ -1747,7 +1742,7 @@ func (ctrl *PersistentVolumeController) getProvisionerName(plugin vol.Provisiona
return plugin.GetPluginName()
} else if plugin != nil {
// get the CSI in-tree name from storage class provisioner name
provisionerName, err := ctrl.getCSINameFromIntreeName(storageClass.Provisioner)
provisionerName, err := ctrl.translator.GetCSINameFromInTreeName(storageClass.Provisioner)
if err != nil {
return "N/A"
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/volume/persistentvolume/pv_controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
Expand Down Expand Up @@ -93,6 +94,7 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
volumeQueue: workqueue.NewNamed("volumes"),
resyncPeriod: p.SyncPeriod,
operationTimestamps: metrics.NewOperationStartTimeCache(),
translator: csitrans.New(),
}

// Prober is nil because PV is not aware of Flexvolume.
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/volume/persistentvolume/pv_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
storagelisters "k8s.io/client-go/listers/storage/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
Expand Down Expand Up @@ -438,6 +439,7 @@ func TestDelayBindingMode(t *testing.T) {
classInformer := informerFactory.Storage().V1().StorageClasses()
ctrl := &PersistentVolumeController{
classLister: classInformer.Lister(),
translator: csitrans.New(),
}

for _, class := range classes {
Expand Down
25 changes: 19 additions & 6 deletions pkg/scheduler/algorithm/predicates/csi_volume_predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,24 @@ import (
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/util/rand"
utilfeature "k8s.io/apiserver/pkg/util/feature"
csilib "k8s.io/csi-translation-lib"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)

// InTreeToCSITranslator contains methods required to check migratable status
// and perform translations from InTree PV's to CSI
type InTreeToCSITranslator interface {
IsPVMigratable(pv *v1.PersistentVolume) bool
IsMigratableIntreePluginByName(inTreePluginName string) bool
GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
GetCSINameFromInTreeName(pluginName string) (string, error)
TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
}

// CSIMaxVolumeLimitChecker defines predicate needed for counting CSI volumes
type CSIMaxVolumeLimitChecker struct {
csiNodeInfo CSINodeInfo
Expand All @@ -39,6 +49,8 @@ type CSIMaxVolumeLimitChecker struct {
scInfo StorageClassInfo

randomVolumeIDPrefix string

translator InTreeToCSITranslator
}

// NewCSIMaxVolumeLimitPredicate returns a predicate for counting CSI volumes
Expand All @@ -50,6 +62,7 @@ func NewCSIMaxVolumeLimitPredicate(
pvcInfo: pvcInfo,
scInfo: scInfo,
randomVolumeIDPrefix: rand.String(32),
translator: csitrans.New(),
}
return c.attachableLimitPredicate
}
Expand Down Expand Up @@ -201,11 +214,11 @@ func (c *CSIMaxVolumeLimitChecker) getCSIDriverInfo(csiNode *storagev1beta1.CSIN
csiSource := pv.Spec.PersistentVolumeSource.CSI
if csiSource == nil {
// We make a fast path for non-CSI volumes that aren't migratable
if !csilib.IsPVMigratable(pv) {
if !c.translator.IsPVMigratable(pv) {
return "", ""
}

pluginName, err := csilib.GetInTreePluginNameFromSpec(pv, nil)
pluginName, err := c.translator.GetInTreePluginNameFromSpec(pv, nil)
if err != nil {
klog.V(5).Infof("Unable to look up plugin name from PV spec: %v", err)
return "", ""
Expand All @@ -216,7 +229,7 @@ func (c *CSIMaxVolumeLimitChecker) getCSIDriverInfo(csiNode *storagev1beta1.CSIN
return "", ""
}

csiPV, err := csilib.TranslateInTreePVToCSI(pv)
csiPV, err := c.translator.TranslateInTreePVToCSI(pv)
if err != nil {
klog.V(5).Infof("Unable to translate in-tree volume to CSI: %v", err)
return "", ""
Expand Down Expand Up @@ -258,13 +271,13 @@ func (c *CSIMaxVolumeLimitChecker) getCSIDriverInfoFromSC(csiNode *storagev1beta
volumeHandle := fmt.Sprintf("%s-%s/%s", c.randomVolumeIDPrefix, namespace, pvcName)

provisioner := storageClass.Provisioner
if csilib.IsMigratableIntreePluginByName(provisioner) {
if c.translator.IsMigratableIntreePluginByName(provisioner) {
if !isCSIMigrationOn(csiNode, provisioner) {
klog.V(5).Infof("CSI Migration of plugin %s is not enabled", provisioner)
return "", ""
}

driverName, err := csilib.GetCSINameFromInTreeName(provisioner)
driverName, err := c.translator.GetCSINameFromInTreeName(provisioner)
if err != nil {
klog.V(5).Infof("Unable to look up driver name from plugin name: %v", err)
return "", ""
Expand Down
1 change: 1 addition & 0 deletions pkg/volume/util/operationexecutor/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ go_test(
"//staging/src/k8s.io/component-base/featuregate:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
"//vendor/github.com/prometheus/client_model/go:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
Expand Down
5 changes: 5 additions & 0 deletions pkg/volume/util/operationexecutor/fakegenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/hostutil"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
Expand Down Expand Up @@ -88,6 +89,10 @@ func (f *fakeOGCounter) GetVolumePluginMgr() *volume.VolumePluginMgr {
return nil
}

func (f *fakeOGCounter) GetCSITranslator() InTreeToCSITranslator {
return csitrans.New()
}

func (f *fakeOGCounter) GenerateBulkVolumeVerifyFunc(
map[types.NodeName][]*volume.Spec,
string,
Expand Down
6 changes: 3 additions & 3 deletions pkg/volume/util/operationexecutor/operation_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,14 +641,14 @@ func (oe *operationExecutor) VerifyVolumesAreAttached(
}

// Migration: Must also check the Node since Attach would have been done with in-tree if node is not using Migration
nu, err := nodeUsingCSIPlugin(oe.operationGenerator, volumeAttached.VolumeSpec, node)
nu, err := nodeUsingCSIPlugin(oe.operationGenerator.GetCSITranslator(), oe.operationGenerator.GetVolumePluginMgr(), volumeAttached.VolumeSpec, node)
if err != nil {
klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.NodeUsingCSIPlugin failed", err).Error())
continue
}

var volumePlugin volume.VolumePlugin
if useCSIPlugin(oe.operationGenerator.GetVolumePluginMgr(), volumeAttached.VolumeSpec) && nu {
if useCSIPlugin(oe.operationGenerator.GetCSITranslator(), oe.operationGenerator.GetVolumePluginMgr(), volumeAttached.VolumeSpec) && nu {
// The volume represented by this spec is CSI and thus should be migrated
volumePlugin, err = oe.operationGenerator.GetVolumePluginMgr().FindPluginByName(csi.CSIPluginName)
if err != nil || volumePlugin == nil {
Expand All @@ -661,7 +661,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached(
continue
}

csiSpec, err := translateSpec(volumeAttached.VolumeSpec)
csiSpec, err := translateSpec(oe.operationGenerator.GetCSITranslator(), volumeAttached.VolumeSpec)
if err != nil {
klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.TranslateSpec failed", err).Error())
continue
Expand Down