Skip to content

Commit

Permalink
capacity: support both v1 and v1beta1
Browse files Browse the repository at this point in the history
The code itself uses the v1 API and directly uses the normal client-go API when
v1 is supported by the server. If the server doesn't support that API, wrappers
around the v1beta1 API convert CSIStorageCapacity objects back and forth as
needed.

The reason for this more complex solution is that it avoids a breaking change
in the external-provisioner and thus provides a smoother transition path: CSI
driver developers can update to the next external-provisioner release and use
it both with Kubernetes < 1.24 and >= 1.24.
  • Loading branch information
pohly committed Apr 7, 2022
1 parent f52d7d8 commit f62d56b
Show file tree
Hide file tree
Showing 6 changed files with 3,371 additions and 8 deletions.
36 changes: 32 additions & 4 deletions cmd/csi-provisioner/csi-provisioner.go
Expand Up @@ -34,6 +34,7 @@ import (
flag "github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -129,6 +130,8 @@ func main() {
flag.Set("logtostderr", "true")
flag.Parse()

ctx := context.Background()

if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {
klog.Fatal(err)
}
Expand Down Expand Up @@ -455,7 +458,7 @@ func main() {
klog.Infof("producing CSIStorageCapacity objects with fixed topology segment %s", segment)
topologyInformer = topology.NewFixedNodeTopology(&segment)
}
go topologyInformer.RunWorker(context.Background())
go topologyInformer.RunWorker(ctx)

managedByID := "external-provisioner"
if *enableNodeDeployment {
Expand All @@ -476,18 +479,43 @@ func main() {
}),
)

// We use the V1 CSIStorageCapacity API if available.
clientFactory := capacity.NewV1ClientFactory(clientset)
cInformer := factoryForNamespace.Storage().V1().CSIStorageCapacities()

invalidCapacity := &storagev1.CSIStorageCapacity{
ObjectMeta: metav1.ObjectMeta{
Name: "#%123-invalid-name",
},
}
createdCapacity, err := clientset.StorageV1().CSIStorageCapacities("default").Create(ctx, invalidCapacity, metav1.CreateOptions{})
switch {
case err == nil:
klog.Fatalf("creating an invalid v1.CSIStorageCapacity didn't fail as expected, got: %s", createdCapacity)
case apierrors.IsNotFound(err):
// We need to bridge between the v1beta1 API on the
// server and the v1 API expected by the capacity code.
klog.Info("using the CSIStorageCapacity v1beta1 API")
clientFactory = capacity.NewV1beta1ClientFactory(clientset)
cInformer = capacity.NewV1beta1InformerBridge(factoryForNamespace.Storage().V1beta1().CSIStorageCapacities())
case apierrors.IsInvalid(err):
klog.Info("using the CSIStorageCapacity v1 API")
default:
klog.Fatalf("unexpected error when checking for the V1 CSIStorageCapacity API: %v", err)
}

capacityController = capacity.NewCentralCapacityController(
csi.NewControllerClient(grpcClient),
provisionerName,
clientset.StorageV1().CSIStorageCapacities,
clientFactory,
// Metrics for the queue is available in the default registry.
workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),
controller,
managedByID,
namespace,
topologyInformer,
factory.Storage().V1().StorageClasses(),
factoryForNamespace.Storage().V1().CSIStorageCapacities(),
cInformer,
*capacityPollInterval,
*capacityImmediateBinding,
*operationTimeout,
Expand Down Expand Up @@ -572,7 +600,7 @@ func main() {
}

if !*enableLeaderElection {
run(context.TODO())
run(ctx)
} else {
// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/controller
// to preserve backwards compatibility
Expand Down
177 changes: 177 additions & 0 deletions pkg/capacity/apibridge.go
@@ -0,0 +1,177 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package capacity contains the code which controls the CSIStorageCapacity
// objects owned by the external-provisioner.
package capacity

import (
"context"

storagev1 "k8s.io/api/storage/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1"
"k8s.io/client-go/kubernetes"
storagev1beta1typed "k8s.io/client-go/kubernetes/typed/storage/v1beta1"
storagelistersv1 "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

func NewV1ClientFactory(clientSet kubernetes.Interface) CSIStorageCapacityFactory {
return func(namespace string) CSIStorageCapacityInterface {
return clientSet.StorageV1().CSIStorageCapacities(namespace)
}
}

func NewV1beta1ClientFactory(clientSet kubernetes.Interface) CSIStorageCapacityFactory {
return func(namespace string) CSIStorageCapacityInterface {
return csiStorageCapacityBridge{CSIStorageCapacityInterface: clientSet.StorageV1beta1().CSIStorageCapacities(namespace)}
}
}

// csiStorageCapacityBridge converts between v1 and v1beta. Delete is the same
// in both API versions, we can inherit its implementation. Create and Update
// must be wrapped.
type csiStorageCapacityBridge struct {
storagev1beta1typed.CSIStorageCapacityInterface
}

var _ CSIStorageCapacityInterface = csiStorageCapacityBridge{}

func (c csiStorageCapacityBridge) Create(ctx context.Context, cscv1 *storagev1.CSIStorageCapacity, opts metav1.CreateOptions) (*storagev1.CSIStorageCapacity, error) {
cscv1beta1, err := c.CSIStorageCapacityInterface.Create(ctx, v1Tov1beta1(cscv1), opts)
if err != nil {
return nil, err
}
return v1beta1Tov1(cscv1beta1), nil
}

func (c csiStorageCapacityBridge) Update(ctx context.Context, cscv1 *storagev1.CSIStorageCapacity, opts metav1.UpdateOptions) (*storagev1.CSIStorageCapacity, error) {
cscv1beta1, err := c.CSIStorageCapacityInterface.Update(ctx, v1Tov1beta1(cscv1), opts)
if err != nil {
return nil, err
}
return v1beta1Tov1(cscv1beta1), nil
}

func NewV1beta1InformerBridge(informer storageinformersv1beta1.CSIStorageCapacityInformer) storageinformersv1.CSIStorageCapacityInformer {
return csiStorageCapacityInformerBridge{informer: informer}
}

type csiStorageCapacityInformerBridge struct {
informer storageinformersv1beta1.CSIStorageCapacityInformer
}

var _ storageinformersv1.CSIStorageCapacityInformer = csiStorageCapacityInformerBridge{}

func (ib csiStorageCapacityInformerBridge) Lister() storagelistersv1.CSIStorageCapacityLister {
return ib
}

func (ib csiStorageCapacityInformerBridge) List(selector labels.Selector) ([]*storagev1.CSIStorageCapacity, error) {
cscsv1beta1, err := ib.informer.Lister().List(selector)
if err != nil {
return nil, err
}
cscsv1 := make([]*storagev1.CSIStorageCapacity, 0, len(cscsv1beta1))
for _, cscv1beta1 := range cscsv1beta1 {
cscsv1 = append(cscsv1, v1beta1Tov1(cscv1beta1))
}
return cscsv1, nil
}

func (ib csiStorageCapacityInformerBridge) Informer() cache.SharedIndexInformer {
return csiStorageCapacityIndexInformerBridge{SharedIndexInformer: ib.informer.Informer()}
}

func (ib csiStorageCapacityInformerBridge) CSIStorageCapacities(namespace string) storagelistersv1.CSIStorageCapacityNamespaceLister {
// Not implemented, not needed.
return nil
}

// csiStorageCapacityIndexInformerBridge wraps a SharedIndexInformer for
// v1beta1 CSIStorageCapacity. It makes sure that handlers only ever see v1
// CSIStorageCapacity.
type csiStorageCapacityIndexInformerBridge struct {
cache.SharedIndexInformer
}

var _ cache.SharedIndexInformer = csiStorageCapacityIndexInformerBridge{}

func (iib csiStorageCapacityIndexInformerBridge) AddEventHandler(handlerv1 cache.ResourceEventHandler) {
handlerv1beta1 := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
csc, ok := obj.(*storagev1beta1.CSIStorageCapacity)
if !ok {
klog.Errorf("added object: expected v1beta1.CSIStorageCapacity, got %T -> ignoring it", obj)
return
}
handlerv1.OnAdd(v1beta1Tov1(csc))
},
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
oldCsc, ok := oldObj.(*storagev1beta1.CSIStorageCapacity)
if !ok {
klog.Errorf("updated object: expected v1beta1.CSIStorageCapacity, got %T -> ignoring it", oldObj)
return
}
newCsc, ok := newObj.(*storagev1beta1.CSIStorageCapacity)
if !ok {
klog.Errorf("updated object: expected v1beta1.CSIStorageCapacity, got %T -> ignoring it", newObj)
return
}
handlerv1.OnUpdate(v1beta1Tov1(oldCsc), v1beta1Tov1(newCsc))
},
DeleteFunc: func(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
csc, ok := obj.(*storagev1beta1.CSIStorageCapacity)
if !ok {
klog.Errorf("deleted object: expected v1beta1.CSIStorageCapacity, got %T -> ignoring it", obj)
return
}
handlerv1.OnDelete(v1beta1Tov1(csc))
},
}
iib.SharedIndexInformer.AddEventHandler(handlerv1beta1)
}

// Shallow copies are good enough for our purpose.

func v1beta1Tov1(csc *storagev1beta1.CSIStorageCapacity) *storagev1.CSIStorageCapacity {
return &storagev1.CSIStorageCapacity{
ObjectMeta: csc.ObjectMeta,
NodeTopology: csc.NodeTopology,
StorageClassName: csc.StorageClassName,
Capacity: csc.Capacity,
MaximumVolumeSize: csc.MaximumVolumeSize,
}
}

func v1Tov1beta1(csc *storagev1.CSIStorageCapacity) *storagev1beta1.CSIStorageCapacity {
return &storagev1beta1.CSIStorageCapacity{
ObjectMeta: csc.ObjectMeta,
NodeTopology: csc.NodeTopology,
StorageClassName: csc.StorageClassName,
Capacity: csc.Capacity,
MaximumVolumeSize: csc.MaximumVolumeSize,
}
}
17 changes: 14 additions & 3 deletions pkg/capacity/capacity.go
Expand Up @@ -38,7 +38,6 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
storageinformersv1 "k8s.io/client-go/informers/storage/v1"
storagev1typed "k8s.io/client-go/kubernetes/typed/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics"
Expand Down Expand Up @@ -79,7 +78,7 @@ type Controller struct {

csiController CSICapacityClient
driverName string
clientFactory func(namespace string) storagev1typed.CSIStorageCapacityInterface
clientFactory CSIStorageCapacityFactory
queue workqueue.RateLimitingInterface
owner *metav1.OwnerReference
managedByID string
Expand Down Expand Up @@ -147,13 +146,25 @@ type CSICapacityClient interface {
GetCapacity(ctx context.Context, in *csi.GetCapacityRequest, opts ...grpc.CallOption) (*csi.GetCapacityResponse, error)
}

// CSIStorageCapacityInterface is a subset of the client-go interface for
// v1.CSIStorageCapacity.
type CSIStorageCapacityInterface interface {
Create(ctx context.Context, cSIStorageCapacity *storagev1.CSIStorageCapacity, opts metav1.CreateOptions) (*storagev1.CSIStorageCapacity, error)
Update(ctx context.Context, cSIStorageCapacity *storagev1.CSIStorageCapacity, opts metav1.UpdateOptions) (*storagev1.CSIStorageCapacity, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
}

// CSIStorageCapacityFactory corresponds to StorageV1().CSIStorageCapacities but returns
// just what we need.
type CSIStorageCapacityFactory func(namespace string) CSIStorageCapacityInterface

// NewController creates a new controller for CSIStorageCapacity objects.
// It implements metrics.StableCollector and thus can be registered in
// a registry.
func NewCentralCapacityController(
csiController CSICapacityClient,
driverName string,
clientFactory func(namespace string) storagev1typed.CSIStorageCapacityInterface,
clientFactory CSIStorageCapacityFactory,
queue workqueue.RateLimitingInterface,
owner *metav1.OwnerReference,
managedByID string,
Expand Down
2 changes: 1 addition & 1 deletion pkg/capacity/capacity_test.go
Expand Up @@ -1352,7 +1352,7 @@ func fakeController(ctx context.Context, client *fakeclientset.Clientset, owner
c := NewCentralCapacityController(
storage,
driverName,
client.StorageV1().CSIStorageCapacities,
NewV1ClientFactory(client),
queue,
owner,
managedByID,
Expand Down

0 comments on commit f62d56b

Please sign in to comment.