Skip to content

Commit

Permalink
UPSTREAM: <carry>: delay queuing deletion for PV to allow nodes some …
Browse files Browse the repository at this point in the history
…time to unmount
  • Loading branch information
deads2k authored and damemi committed Dec 6, 2021
1 parent 1baac43 commit d823961
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/volume/persistentvolume/pv_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ type PersistentVolumeController struct {
// however overall speed of multi-worker controller would be lower than if
// it runs single thread only.
claimQueue *workqueue.Type
volumeQueue *workqueue.Type
volumeQueue workqueue.RateLimitingInterface

// Map of scheduled/running operations.
runningOperations goroutinemap.GoRoutineMap
Expand Down
23 changes: 20 additions & 3 deletions pkg/controller/volume/persistentvolume/pv_controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
createProvisionedPVRetryCount: createProvisionedPVRetryCount,
createProvisionedPVInterval: createProvisionedPVInterval,
claimQueue: workqueue.NewNamed("claims"),
volumeQueue: workqueue.NewNamed("volumes"),
volumeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volumes"),
resyncPeriod: p.SyncPeriod,
operationTimestamps: metrics.NewOperationStartTimeCache(),
}
Expand All @@ -113,7 +113,7 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
DeleteFunc: func(obj interface{}) { controller.enqueueWorkAfter(controller.volumeQueue, obj, 21*time.Second) },
},
)
controller.volumeLister = p.VolumeInformer.Lister()
Expand Down Expand Up @@ -196,6 +196,20 @@ func (ctrl *PersistentVolumeController) enqueueWork(queue workqueue.Interface, o
queue.Add(objName)
}

func (ctrl *PersistentVolumeController) enqueueWorkAfter(queue workqueue.DelayingInterface, obj interface{}, delay time.Duration) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
objName, err := controller.KeyFunc(obj)
if err != nil {
klog.Errorf("failed to get key from object: %v", err)
return
}
klog.V(5).Infof("enqueued %q for sync", objName)
queue.AddAfter(objName, delay)
}

func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume interface{}) (bool, error) {
return storeObjectUpdate(ctrl.volumes.store, volume, "volume")
}
Expand Down Expand Up @@ -297,8 +311,11 @@ func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeCl
// sync the volume when its claim is deleted. Explicitly sync'ing the
// volume here in response to claim deletion prevents the volume from
// waiting until the next sync period for its Release.
// delay queuing the volume to allow some time for nodes to detach the volume from the node. The time chosen here
// is to hopefully be short enough that e2e tests still pass and long enough that most PVs stop hitting the failure
// errors.
klog.V(5).Infof("deleteClaim[%q]: scheduling sync of volume %s", claimKey, volumeName)
ctrl.volumeQueue.Add(volumeName)
ctrl.volumeQueue.AddAfter(volumeName, 21*time.Second)
}

// Run starts all of this controller's control loops
Expand Down

0 comments on commit d823961

Please sign in to comment.