Added controller for PVC deletion #422

Merged: 10 commits, Mar 16, 2020
45 changes: 33 additions & 12 deletions cmd/csi-provisioner/csi-provisioner.go
@@ -172,13 +172,36 @@ func main() {
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName

factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)

// -------------------------------
// Listers
// Create informers to avoid hitting the API server for every resource request
scLister := factory.Storage().V1().StorageClasses().Lister()
claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

var csiNodeLister storagelistersv1beta1.CSINodeLister
var nodeLister v1.NodeLister
if ctrl.SupportsTopology(pluginCapabilities) {
csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
nodeLister = factory.Core().V1().Nodes().Lister()
}

// -------------------------------
// PersistentVolumeClaims informer
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()

// Setup options
provisionerOptions := []func(*controller.ProvisionController) error{
controller.LeaderElection(false), // Always disable leader election in the provisioner lib. Leader election should be done here at the CSI provisioner level instead.
controller.FailedProvisionThreshold(0),
controller.FailedDeleteThreshold(0),
controller.RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)),
controller.RateLimiter(rateLimiter),
controller.Threadiness(int(*workerThreads)),
controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
controller.ClaimsInformer(claimInformer),
}
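The LeaderElection(false) option above works because the binary performs its own leader election before invoking run; that wiring sits further down in this file, outside this hunk. Purely as a hedged illustration of the pattern, using client-go's Lease-based election with placeholder lock names rather than whatever this binary actually wires up:

package example

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

// runWithLeaderElection wraps a controller loop in client-go leader election so that
// only one provisioner replica drives provisioning at a time.
func runWithLeaderElection(clientset kubernetes.Interface, identity string, run func(context.Context)) {
	lock := &resourcelock.LeaseLock{
		// Placeholder lock name and namespace, for illustration only.
		LeaseMeta:  metav1.ObjectMeta{Name: "example-provisioner-leader", Namespace: "kube-system"},
		Client:     clientset.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: identity},
	}
	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() { klog.Fatal("lost leader election, exiting") },
		},
	})
}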

translator := csitrans.New()
@@ -193,17 +216,6 @@ func main() {
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
}

// Create informer to prevent hit the API server for all resource request
factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
scLister := factory.Storage().V1().StorageClasses().Lister()

var csiNodeLister storagelistersv1beta1.CSINodeLister
var nodeLister v1.NodeLister
if ctrl.SupportsTopology(pluginCapabilities) {
csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
nodeLister = factory.Core().V1().Nodes().Lister()
}

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(
@@ -223,6 +235,7 @@ func main() {
scLister,
csiNodeLister,
nodeLister,
claimLister,
*extraCreateMetadata,
)

@@ -234,6 +247,13 @@ func main() {
provisionerOptions...,
)

csiClaimController := ctrl.NewCloningProtectionController(
clientset,
claimLister,
claimInformer,
claimQueue,
)

run := func(context.Context) {
stopCh := context.Background().Done()
Contributor: Why did you remove stopCh?

Contributor Author: Accidentally left this change after seeing issues with a similar approach in tests, will undo this.
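For context on the exchange above: deriving the stop channel from the context passed into run, rather than from context.Background() (which is never cancelled), keeps informer shutdown tied to the caller's lifecycle. A minimal sketch of that pattern, assumed rather than taken from the final merged code:

package example

import (
	"context"

	"k8s.io/client-go/informers"
)

// startFactory ties informer shutdown to cancellation of the caller's context
// instead of a channel derived from context.Background(), which never closes.
func startFactory(ctx context.Context, factory informers.SharedInformerFactory) {
	stopCh := ctx.Done()
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
}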

factory.Start(stopCh)
@@ -244,6 +264,7 @@ func main() {
}
}

go csiClaimController.Run(int(*workerThreads), stopCh)
provisionController.Run(wait.NeverStop)
}

206 changes: 206 additions & 0 deletions pkg/controller/clone_controller.go
@@ -0,0 +1,206 @@
package controller

import (
"fmt"
"time"

v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
)

//
// This package introduces a way to handle finalizers related to in-progress PVC cloning. It is a two-step approach:
//
// 1) A PVC referenced as a data source is updated with the finalizer `provisioner.storage.kubernetes.io/cloning-protection` during a ProvisionExt method call.
//    Detection of in-progress cloning relies on the assumption that a PVC whose `spec.DataSource` points to another PVC stays in the `Pending` state.
//    The downside is that any other reason keeping such a PVC in the `Pending` state also blocks the data-source PVC from deletion.
//
// 2) Once cloning finishes, every PVC referencing this one as a data source moves from `Pending` to `Bound`, which allows the finalizer to be removed.
//
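Step 1 above (attaching the finalizer during ProvisionExt) lives in a different file of this PR. As a rough, self-contained sketch of what that step amounts to, with a hypothetical helper name and a finalizer string that mirrors the pvcCloneFinalizer constant used below:

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// cloneFinalizer mirrors the pvcCloneFinalizer constant defined elsewhere in this PR.
const cloneFinalizer = "provisioner.storage.kubernetes.io/cloning-protection"

// addCloneProtectionFinalizer marks a data-source PVC as in use by an in-progress clone,
// preventing its deletion until the CloningProtectionController removes the finalizer.
func addCloneProtectionFinalizer(client kubernetes.Interface, claim *v1.PersistentVolumeClaim) error {
	for _, f := range claim.ObjectMeta.Finalizers {
		if f == cloneFinalizer {
			return nil // already protected
		}
	}
	claim.ObjectMeta.Finalizers = append(claim.ObjectMeta.Finalizers, cloneFinalizer)
	_, err := client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim)
	return err
}

The controller in this file is the second half of the pair: it removes that finalizer once no clone of the claim is still Pending.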

// CloningProtectionController stores the interfaces needed to remove the
// cloning protection finalizer once CSI cloning is finished
type CloningProtectionController struct {
client kubernetes.Interface
claimLister corelisters.PersistentVolumeClaimLister
claimInformer cache.SharedInformer
claimQueue workqueue.RateLimitingInterface
}

// NewCloningProtectionController creates a new controller providing additional CSI claim protection capabilities
func NewCloningProtectionController(
client kubernetes.Interface,
claimLister corelisters.PersistentVolumeClaimLister,
claimInformer cache.SharedInformer,
claimQueue workqueue.RateLimitingInterface,
) *CloningProtectionController {
controller := &CloningProtectionController{
client: client,
claimLister: claimLister,
claimInformer: claimInformer,
claimQueue: claimQueue,
}
return controller
}

// Run starts the CloningProtectionController workers and blocks until stopCh is closed
func (p *CloningProtectionController) Run(threadiness int, stopCh <-chan struct{}) {
klog.Info("Starting CloningProtection controller")
defer utilruntime.HandleCrash()
defer p.claimQueue.ShutDown()

claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: p.enqueueClaimUpdate,
UpdateFunc: func(_ interface{}, newObj interface{}) { p.enqueueClaimUpdate(newObj) },
}
p.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.DefaultResyncPeriod)

for i := 0; i < threadiness; i++ {
Collaborator: This is going to double the number of goroutines, and the default value of worker-threads is 100 (so 200 total). Do we really need that many threads to process this finalizer? @jsafrane

Contributor: hm, maybe not...

Contributor Author: I'll address this in a follow up PR. Sorry for the issue. Could 5 be sufficient @msau42?

go wait.Until(p.runClaimWorker, time.Second, stopCh)
}

go p.claimInformer.Run(stopCh)

klog.Infof("Started CloningProtection controller")
<-stopCh
klog.Info("Shutting down CloningProtection controller")
}

func (p *CloningProtectionController) runClaimWorker() {
for p.processNextClaimWorkItem() {
}
}

// processNextClaimWorkItem processes items from claimQueue
func (p *CloningProtectionController) processNextClaimWorkItem() bool {
obj, shutdown := p.claimQueue.Get()
if shutdown {
return false
}

err := func(obj interface{}) error {
defer p.claimQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
p.claimQueue.Forget(obj)
return fmt.Errorf("expected string in workqueue but got %#v", obj)
}

if err := p.syncClaimHandler(key); err != nil {
klog.Warningf("Retrying syncing claim %q after %v failures", key, p.claimQueue.NumRequeues(obj))
p.claimQueue.AddRateLimited(obj)
} else {
p.claimQueue.Forget(obj)
}

return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}

return true
}

// enqueueClaimUpdate takes a PVC object and stores its key into the claim work queue.
func (p *CloningProtectionController) enqueueClaimUpdate(obj interface{}) {
claim, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
utilruntime.HandleError(fmt.Errorf("expected a *v1.PersistentVolumeClaim but got %+v", obj))
return
}

// Nothing to do until the PVC carries a deletion timestamp
if claim.DeletionTimestamp == nil {
return
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(err)
return
}

p.claimQueue.Add(key)
}

// syncClaimHandler gets the claim from informer's cache then calls syncClaim
func (p *CloningProtectionController) syncClaimHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}

claim, err := p.claimLister.PersistentVolumeClaims(namespace).Get(name)
if err != nil {
if apierrs.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("Item '%s' in work queue no longer exists", key))
return nil
}

return err
}

return p.syncClaim(claim)
}

// syncClaim removes the cloning protection finalizer from a PVC once cloning is finished
func (p *CloningProtectionController) syncClaim(claim *v1.PersistentVolumeClaim) error {
if !checkFinalizer(claim, pvcCloneFinalizer) {
return nil
}

// List PVCs in the same namespace: any clone of this claim that is still Pending means cloning is in progress
pvcList, err := p.claimLister.PersistentVolumeClaims(claim.Namespace).List(labels.Everything())
if err != nil {
return err
}

// Check the phase of every PVC whose DataSource points to this claim
for _, pvc := range pvcList {
if pvc.Spec.DataSource == nil {
continue
}

// Requeue while at least one PVC is still being cloned from this claim
if pvc.Spec.DataSource.Kind == pvcKind &&
pvc.Spec.DataSource.Name == claim.Name &&
pvc.Status.Phase == v1.ClaimPending {
return fmt.Errorf("PVC '%s' is in 'Pending' state, cloning in progress", pvc.Name)
}
}

// Remove clone finalizer
finalizers := make([]string, 0)
for _, finalizer := range claim.ObjectMeta.Finalizers {
if finalizer != pvcCloneFinalizer {
finalizers = append(finalizers, finalizer)
}
}
claim.ObjectMeta.Finalizers = finalizers

if _, err = p.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
if !apierrs.IsNotFound(err) {
// Couldn't remove the finalizer and the object still exists; the controller
// will try to remove the finalizer again on the next update
klog.Infof("failed to remove clone finalizer from PVC %v", claim.Name)
return err
}
}

return nil
}
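A hypothetical test sketch (not part of this PR) of the behaviour above, using a fake clientset and a hand-populated informer cache: while a clone of source-pvc is still Pending, syncClaim keeps the finalizer and returns an error so the claim gets requeued.

package controller

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/util/workqueue"
)

func TestSyncClaimWaitsForPendingClone(t *testing.T) {
	// Data-source PVC already carrying the cloning protection finalizer.
	source := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:       "source-pvc",
			Namespace:  "default",
			Finalizers: []string{pvcCloneFinalizer},
		},
	}
	// Clone of the source PVC that is still Pending.
	clone := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "clone-pvc", Namespace: "default"},
		Spec: v1.PersistentVolumeClaimSpec{
			DataSource: &v1.TypedLocalObjectReference{Kind: pvcKind, Name: "source-pvc"},
		},
		Status: v1.PersistentVolumeClaimStatus{Phase: v1.ClaimPending},
	}

	client := fake.NewSimpleClientset(source, clone)
	factory := informers.NewSharedInformerFactory(client, 0)
	claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()
	claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()
	claimQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "claims")

	// Feed the informer cache directly instead of starting the informer.
	claimInformer.GetStore().Add(source)
	claimInformer.GetStore().Add(clone)

	p := NewCloningProtectionController(client, claimLister, claimInformer, claimQueue)
	if err := p.syncClaim(source); err == nil {
		t.Fatal("expected an error while the clone is still Pending, got nil")
	}
}

Once every clone has left the Pending phase, the same syncClaim call would drop the finalizer via an Update against the (fake) clientset and return nil.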