From 41bf03b77f89513d9541344352456cbaf73580f9 Mon Sep 17 00:00:00 2001 From: Cheng Xing Date: Tue, 14 Jan 2020 10:56:58 -0800 Subject: [PATCH] Parallelize attach operations across different nodes for volumes that allow multi-attach --- .../volume/attachdetach/reconciler/BUILD | 2 +- .../attachdetach/reconciler/reconciler.go | 85 ++- .../volumemanager/reconciler/reconciler.go | 4 +- pkg/volume/util/nestedpendingoperations/BUILD | 2 + .../nestedpendingoperations.go | 207 ++++--- .../nestedpendingoperations_test.go | 527 +++++++++++++----- .../operationexecutor/operation_executor.go | 41 +- .../operation_executor_test.go | 179 +++++- pkg/volume/util/util.go | 41 ++ 9 files changed, 795 insertions(+), 293 deletions(-) diff --git a/pkg/controller/volume/attachdetach/reconciler/BUILD b/pkg/controller/volume/attachdetach/reconciler/BUILD index cb047fc9366b..b3a94b397e3b 100644 --- a/pkg/controller/volume/attachdetach/reconciler/BUILD +++ b/pkg/controller/volume/attachdetach/reconciler/BUILD @@ -16,7 +16,7 @@ go_library( "//pkg/controller/volume/attachdetach/statusupdater:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", - "//pkg/volume:go_default_library", + "//pkg/volume/util:go_default_library", "//pkg/volume/util/operationexecutor:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index a2ec9be2a937..c1e2edddc262 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -24,7 +24,7 @@ import ( "strings" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" @@ -34,7 +34,7 @@ import ( 
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" kevents "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" - "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) @@ -134,42 +134,6 @@ func (rc *reconciler) syncStates() { rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld) } -// isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible. -// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns -// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the -// attacher to fail fast in such cases. -// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047 -func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool { - if volumeSpec.Volume != nil { - // Check for volume types which are known to fail slow or cause trouble when trying to multi-attach - if volumeSpec.Volume.AzureDisk != nil || - volumeSpec.Volume.Cinder != nil { - return true - } - } - - // Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to - // multi-attach. We trust in the individual volume implementations to not allow unsupported access modes - if volumeSpec.PersistentVolume != nil { - // Check for persistent volume types which do not fail when trying to multi-attach - if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 { - // No access mode specified so we don't know for sure. 
Let the attacher fail if needed - return false - } - - // check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false - for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes { - if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany { - return false - } - } - return true - } - - // we don't know if it's supported or not and let the attacher fail later in cases it's not supported - return false -} - func (rc *reconciler) reconcile() { // Detaches are triggered before attaches so that volumes referenced by // pods that are rescheduled to a different node are detached first. @@ -178,13 +142,23 @@ func (rc *reconciler) reconcile() { for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() { if !rc.desiredStateOfWorld.VolumeExists( attachedVolume.VolumeName, attachedVolume.NodeName) { - // Don't even try to start an operation if there is already one running + // Check whether there already exist an operation pending, and don't even + // try to start an operation if there is already one running. // This check must be done before we do any other checks, as otherwise the other checks // may pass while at the same time the volume leaves the pending state, resulting in // double detach attempts - if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") { - klog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName) - continue + // The operation key format is different depending on whether the volume + // allows multi attach across different nodes. + if util.IsMultiAttachAllowed(attachedVolume.VolumeSpec) { + if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) { + klog.V(10).Infof("Operation for volume %q is already running for node %q. 
Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName) + continue + } + } else { + if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) { + klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName) + continue + } } // Set the detach request time @@ -260,15 +234,27 @@ func (rc *reconciler) attachDesiredVolumes() { rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName) continue } - // Don't even try to start an operation if there is already one running - if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") { - if klog.V(10) { - klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) + + if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) { + + // Don't even try to start an operation if there is already one running for the given volume and node. + if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) { + if klog.V(10) { + klog.Infof("Operation for volume %q is already running for node %q. Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName) + } + continue + } + + } else { + + // Don't even try to start an operation if there is already one running for the given volume + if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) { + if klog.V(10) { + klog.Infof("Operation for volume %q is already running. 
Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName) + } + continue } - continue - } - if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) { nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName) if len(nodes) > 0 { if !volumeToAttach.MultiAttachErrorReported { @@ -277,6 +263,7 @@ func (rc *reconciler) attachDesiredVolumes() { } continue } + } // Volume/Node doesn't exist, spawn a goroutine to attach it diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 3524446d5d40..011bdc8acfa3 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -279,7 +279,7 @@ func (rc *reconciler) reconcile() { for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting. if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && - !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) { + !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { if attachedVolume.GloballyMounted { // Volume is globally mounted to device, unmount it klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) @@ -407,7 +407,7 @@ func (rc *reconciler) syncStates() { continue } // There is no pod that uses the volume. 
- if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) { + if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) { klog.Warning("Volume is in pending operation, skip cleaning up mounts") } klog.V(2).Infof( diff --git a/pkg/volume/util/nestedpendingoperations/BUILD b/pkg/volume/util/nestedpendingoperations/BUILD index 5e940356c8d0..5e287a0aba9e 100644 --- a/pkg/volume/util/nestedpendingoperations/BUILD +++ b/pkg/volume/util/nestedpendingoperations/BUILD @@ -14,6 +14,7 @@ go_library( "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], @@ -27,6 +28,7 @@ go_test( "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", ], ) diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index 193e362cd0bd..a889a5b188f4 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -29,45 +29,83 @@ import ( "sync" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" k8sRuntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/klog" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" - "k8s.io/kubernetes/pkg/volume/util/types" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) 
const ( // EmptyUniquePodName is a UniquePodName for empty string. - EmptyUniquePodName types.UniquePodName = types.UniquePodName("") + EmptyUniquePodName volumetypes.UniquePodName = volumetypes.UniquePodName("") // EmptyUniqueVolumeName is a UniqueVolumeName for empty string EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("") + + // EmptyNodeName is a NodeName for empty string + EmptyNodeName types.NodeName = types.NodeName("") ) // NestedPendingOperations defines the supported set of operations. type NestedPendingOperations interface { - // Run adds the concatenation of volumeName and podName to the list of - // running operations and spawns a new go routine to execute operationFunc. - // If an operation with the same volumeName, same or empty podName - // and same operationName exits, an AlreadyExists or ExponentialBackoff - // error is returned. If an operation with same volumeName and podName - // has ExponentialBackoff error but operationName is different, exponential - // backoff is reset and operation is allowed to proceed. - // This enables multiple operations to execute in parallel for the same - // volumeName as long as they have different podName. - // Once the operation is complete, the go routine is terminated and the - // concatenation of volumeName and podName is removed from the list of - // executing operations allowing a new operation to be started with the - // volumeName without error. - Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error + + // Run adds the concatenation of volumeName, podName, and nodeName to the list + // of running operations and spawns a new go routine to run + // generatedOperations. + + // volumeName, podName, and nodeName collectively form the operation key. 
+ // The following forms of operation keys are supported (two keys are designed + // to be "matched" if we want to serialize their operations): + // - volumeName empty, podName and nodeName could be anything + // This key does not match with any keys. + // - volumeName exists, podName empty, nodeName empty + // This key matches all other keys with the same volumeName. + // - volumeName exists, podName exists, nodeName empty + // This key matches with: + // - the same volumeName and podName + // - the same volumeName, but empty podName + // - volumeName exists, podName empty, nodeName exists + // This key matches with: + // - the same volumeName and nodeName + // - the same volumeName but empty nodeName + + // If there is no operation with a matching key, the operation is allowed to + // proceed. + // If an operation with a matching key exists and the previous operation is + // running, an AlreadyExists error is returned. + // If an operation with a matching key exists and the previous operation + // failed: + // - If the previous operation has the same + // generatedOperations.operationName: + // - If the full exponential backoff period is satisfied, the operation is + // allowed to proceed. + // - Otherwise, an ExponentialBackoff error is returned. + // - Otherwise, exponential backoff is reset and operation is allowed to + // proceed. + + // Once the operation is complete, the go routine is terminated. If the + // operation succeeded, its corresponding key is removed from the list of + // executing operations, allowing a new operation to be started with the key + // without error. If it failed, the key remains and the exponential + // backoff status is updated. + Run( + volumeName v1.UniqueVolumeName, + podName volumetypes.UniquePodName, + nodeName types.NodeName, + generatedOperations volumetypes.GeneratedOperations) error // Wait blocks until all operations are completed. 
This is typically // necessary during tests - the test should wait until all operations finish // and evaluate results after that. Wait() - // IsOperationPending returns true if an operation for the given volumeName and podName is pending, - // otherwise it returns false - IsOperationPending(volumeName v1.UniqueVolumeName, podName types.UniquePodName) bool + // IsOperationPending returns true if an operation for the given volumeName + // and one of podName or nodeName is pending, otherwise it returns false + IsOperationPending( + volumeName v1.UniqueVolumeName, + podName volumetypes.UniquePodName, + nodeName types.NodeName) bool } // NewNestedPendingOperations returns a new instance of NestedPendingOperations. @@ -88,8 +126,7 @@ type nestedPendingOperations struct { } type operation struct { - volumeName v1.UniqueVolumeName - podName types.UniquePodName + key operationKey operationName string operationPending bool expBackoff exponentialbackoff.ExponentialBackoff @@ -97,22 +134,24 @@ type operation struct { func (grm *nestedPendingOperations) Run( volumeName v1.UniqueVolumeName, - podName types.UniquePodName, - generatedOperations types.GeneratedOperations) error { + podName volumetypes.UniquePodName, + nodeName types.NodeName, + generatedOperations volumetypes.GeneratedOperations) error { grm.lock.Lock() defer grm.lock.Unlock() - opExists, previousOpIndex := grm.isOperationExists(volumeName, podName) + + opKey := operationKey{volumeName, podName, nodeName} + + opExists, previousOpIndex := grm.isOperationExists(opKey) if opExists { previousOp := grm.operations[previousOpIndex] // Operation already exists if previousOp.operationPending { // Operation is pending - operationKey := getOperationKey(volumeName, podName) - return NewAlreadyExistsError(operationKey) + return NewAlreadyExistsError(opKey) } - operationKey := getOperationKey(volumeName, podName) - backOffErr := previousOp.expBackoff.SafeToRetry(operationKey) + backOffErr := 
previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey)) if backOffErr != nil { if previousOp.operationName == generatedOperations.OperationName { return backOffErr @@ -124,15 +163,13 @@ func (grm *nestedPendingOperations) Run( // Update existing operation to mark as pending. grm.operations[previousOpIndex].operationPending = true - grm.operations[previousOpIndex].volumeName = volumeName - grm.operations[previousOpIndex].podName = podName + grm.operations[previousOpIndex].key = opKey } else { // Create a new operation grm.operations = append(grm.operations, operation{ + key: opKey, operationPending: true, - volumeName: volumeName, - podName: podName, operationName: generatedOperations.OperationName, expBackoff: exponentialbackoff.ExponentialBackoff{}, }) @@ -142,7 +179,7 @@ func (grm *nestedPendingOperations) Run( // Handle unhandled panics (very unlikely) defer k8sRuntime.HandleCrash() // Handle completion of and error, if any, from operationFunc() - defer grm.operationComplete(volumeName, podName, &detailedErr) + defer grm.operationComplete(opKey, &detailedErr) return generatedOperations.Run() }() @@ -151,12 +188,14 @@ func (grm *nestedPendingOperations) Run( func (grm *nestedPendingOperations) IsOperationPending( volumeName v1.UniqueVolumeName, - podName types.UniquePodName) bool { + podName volumetypes.UniquePodName, + nodeName types.NodeName) bool { grm.lock.RLock() defer grm.lock.RUnlock() - exist, previousOpIndex := grm.isOperationExists(volumeName, podName) + opKey := operationKey{volumeName, podName, nodeName} + exist, previousOpIndex := grm.isOperationExists(opKey) if exist && grm.operations[previousOpIndex].operationPending { return true } @@ -164,71 +203,69 @@ func (grm *nestedPendingOperations) IsOperationPending( } // This is an internal function and caller should acquire and release the lock -func (grm *nestedPendingOperations) isOperationExists( - volumeName v1.UniqueVolumeName, - podName types.UniquePodName) (bool, int) { +func (grm 
*nestedPendingOperations) isOperationExists(key operationKey) (bool, int) { // If volumeName is empty, operation can be executed concurrently - if volumeName == EmptyUniqueVolumeName { + if key.volumeName == EmptyUniqueVolumeName { return false, -1 } for previousOpIndex, previousOp := range grm.operations { - if previousOp.volumeName != volumeName { - // No match, keep searching - continue - } + volumeNameMatch := previousOp.key.volumeName == key.volumeName - if previousOp.podName != EmptyUniquePodName && - podName != EmptyUniquePodName && - previousOp.podName != podName { - // No match, keep searching - continue - } + podNameMatch := previousOp.key.podName == EmptyUniquePodName || + key.podName == EmptyUniquePodName || + previousOp.key.podName == key.podName - // Match - return true, previousOpIndex + nodeNameMatch := previousOp.key.nodeName == EmptyNodeName || + key.nodeName == EmptyNodeName || + previousOp.key.nodeName == key.nodeName + + if volumeNameMatch && podNameMatch && nodeNameMatch { + return true, previousOpIndex + } } + return false, -1 } -func (grm *nestedPendingOperations) getOperation( - volumeName v1.UniqueVolumeName, - podName types.UniquePodName) (uint, error) { +func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) { // Assumes lock has been acquired by caller. for i, op := range grm.operations { - if op.volumeName == volumeName && - op.podName == podName { + if op.key.volumeName == key.volumeName && + op.key.podName == key.podName && + op.key.nodeName == key.nodeName { return uint(i), nil } } - logOperationKey := getOperationKey(volumeName, podName) - return 0, fmt.Errorf("Operation %q not found", logOperationKey) + return 0, fmt.Errorf("Operation %+v not found", key) } -func (grm *nestedPendingOperations) deleteOperation( +func (grm *nestedPendingOperations) deleteOperation(key operationKey) { // Assumes lock has been acquired by caller. 
- volumeName v1.UniqueVolumeName, - podName types.UniquePodName) { opIndex := -1 for i, op := range grm.operations { - if op.volumeName == volumeName && - op.podName == podName { + if op.key.volumeName == key.volumeName && + op.key.podName == key.podName && + op.key.nodeName == key.nodeName { opIndex = i break } } + if opIndex < 0 { + return + } + // Delete index without preserving order grm.operations[opIndex] = grm.operations[len(grm.operations)-1] grm.operations = grm.operations[:len(grm.operations)-1] } -func (grm *nestedPendingOperations) operationComplete( - volumeName v1.UniqueVolumeName, podName types.UniquePodName, err *error) { +func (grm *nestedPendingOperations) operationComplete(key operationKey, err *error) { // Defer operations are executed in Last-In is First-Out order. In this case // the lock is acquired first when operationCompletes begins, and is // released when the method finishes, after the lock is released cond is @@ -239,24 +276,20 @@ func (grm *nestedPendingOperations) operationComplete( if *err == nil || !grm.exponentialBackOffOnError { // Operation completed without error, or exponentialBackOffOnError disabled - grm.deleteOperation(volumeName, podName) + grm.deleteOperation(key) if *err != nil { // Log error - logOperationKey := getOperationKey(volumeName, podName) - klog.Errorf("operation %s failed with: %v", - logOperationKey, - *err) + klog.Errorf("operation %+v failed with: %v", key, *err) } return } // Operation completed with error and exponentialBackOffOnError Enabled - existingOpIndex, getOpErr := grm.getOperation(volumeName, podName) + existingOpIndex, getOpErr := grm.getOperation(key) if getOpErr != nil { // Failed to find existing operation - logOperationKey := getOperationKey(volumeName, podName) - klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.", - logOperationKey, + klog.Errorf("Operation %+v completed. error: %v. 
exponentialBackOffOnError is enabled, but failed to get operation to update.", + key, *err) return } @@ -265,10 +298,8 @@ func (grm *nestedPendingOperations) operationComplete( grm.operations[existingOpIndex].operationPending = false // Log error - operationKey := - getOperationKey(volumeName, podName) klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff. - GenerateNoRetriesPermittedMsg(operationKey)) + GenerateNoRetriesPermittedMsg(fmt.Sprintf("%+v", key))) } func (grm *nestedPendingOperations) Wait() { @@ -280,21 +311,15 @@ func (grm *nestedPendingOperations) Wait() { } } -func getOperationKey( - volumeName v1.UniqueVolumeName, podName types.UniquePodName) string { - podNameStr := "" - if podName != EmptyUniquePodName { - podNameStr = fmt.Sprintf(" (%q)", podName) - } - - return fmt.Sprintf("%q%s", - volumeName, - podNameStr) +type operationKey struct { + volumeName v1.UniqueVolumeName + podName volumetypes.UniquePodName + nodeName types.NodeName } // NewAlreadyExistsError returns a new instance of AlreadyExists error. -func NewAlreadyExistsError(operationKey string) error { - return alreadyExistsError{operationKey} +func NewAlreadyExistsError(key operationKey) error { + return alreadyExistsError{key} } // IsAlreadyExists returns true if an error returned from @@ -313,13 +338,13 @@ func IsAlreadyExists(err error) bool { // new operation can not be started because an operation with the same operation // name is already executing. type alreadyExistsError struct { - operationKey string + operationKey operationKey } var _ error = alreadyExistsError{} func (err alreadyExistsError) Error() string { return fmt.Sprintf( - "Failed to create operation with name %q. An operation with that name is already executing.", + "Failed to create operation with name %+v. 
An operation with that name is already executing.", err.operationKey) } diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go index fe52c8977980..b94ccbae6196 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go @@ -22,9 +22,10 @@ import ( "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" - "k8s.io/kubernetes/pkg/volume/util/types" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) const ( @@ -44,99 +45,95 @@ const ( initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond ) -func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) { +func Test_NestedPendingOperations_Positive_SingleOp(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation := func() (error, error) { return nil, nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) + err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) // Assert if err != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err) } } -func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) { +func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volume1Name := v1.UniqueVolumeName("volume1-name") volume2Name := v1.UniqueVolumeName("volume2-name") - operation := func() (error, error) { return nil, nil } // Act - err1 := grm.Run(volume1Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) - err2 := grm.Run(volume2Name, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) + err1 := grm.Run(volume1Name, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) + err2 := grm.Run(volume2Name, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) // Assert if err1 != nil { - t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", volume1Name, err1) + t.Fatalf("NestedPendingOperations %q failed. Expected: Actual: <%v>", volume1Name, err1) } if err2 != nil { - t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", volume2Name, err2) + t.Fatalf("NestedPendingOperations %q failed. 
Expected: Actual: <%v>", volume2Name, err2) } } -func Test_NewGoRoutineMap_Positive_TwoSubOps(t *testing.T) { +func Test_NestedPendingOperations_Positive_TwoSubOps(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation1PodName := types.UniquePodName("operation1-podname") - operation2PodName := types.UniquePodName("operation2-podname") - operation := func() (error, error) { return nil, nil } + operation1PodName := volumetypes.UniquePodName("operation1-podname") + operation2PodName := volumetypes.UniquePodName("operation2-podname") // Act - err1 := grm.Run(volumeName, operation1PodName, types.GeneratedOperations{OperationFunc: operation}) - err2 := grm.Run(volumeName, operation2PodName, types.GeneratedOperations{OperationFunc: operation}) + err1 := grm.Run(volumeName, operation1PodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) + err2 := grm.Run(volumeName, operation2PodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) // Assert if err1 != nil { - t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", operation1PodName, err1) + t.Fatalf("NestedPendingOperations %q failed. Expected: Actual: <%v>", operation1PodName, err1) } if err2 != nil { - t.Fatalf("NewGoRoutine %q failed. Expected: Actual: <%v>", operation2PodName, err2) + t.Fatalf("NestedPendingOperations %q failed. 
Expected: Actual: <%v>", operation2PodName, err2) } } -func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_SingleOpWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation := func() (error, error) { return nil, nil } // Act - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation}) + err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc}) // Assert if err != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err) } } -func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { +func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } - operation2 := generateNoopFunc() + operation2 := noopFunc <-operation1DoneCh // Force operation1 to complete // Act err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -145,30 +142,30 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) { // Assert if err2 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) } } -func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateCallbackFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } - operation2 := generateNoopFunc() + operation2 := noopFunc <-operation1DoneCh // Force operation1 to complete // Act err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -177,28 +174,28 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t * // Assert if err2 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) } } -func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { +func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + operation1 := panicFunc + err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } - operation2 := generateNoopFunc() + operation2 := noopFunc // Act err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -207,28 +204,28 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) { // Assert if err2 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) } } -func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operation1 := generatePanicFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + operation1 := panicFunc + err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } - operation2 := generateNoopFunc() + operation2 := noopFunc // Act err2 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -237,53 +234,54 @@ func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *tes // Assert if err2 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err2) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) } } -func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { +func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } - operation2 := generateNoopFunc() + operation2 := noopFunc // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } } -func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { +func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") op1Name := "mount_volume" - operation1 := generateErrorFunc() - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) + operation1 := errorFunc + err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) } // Shorter than exponential backoff period, so as to trigger exponential backoff error on second // operation. 
- operation2 := generateErrorFunc() + operation2 := errorFunc err2 := retryWithExponentialBackOff( initialOperationWaitTimeShort, func() (bool, error) { err := grm.Run(volumeName, - "", /* operationSubName */ - types.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) + EmptyUniquePodName, + EmptyNodeName, + volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name}) if exponentialbackoff.IsExponentialBackoff(err) { return true, nil @@ -294,114 +292,114 @@ func Test_NewGoRoutineMap_Negative_SecondThirdOpWithDifferentNames(t *testing.T) // Assert if err2 != nil { - t.Fatalf("Expected NewGoRoutine to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) + t.Fatalf("Expected NestedPendingOperations to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name) } - operation3 := generateNoopFunc() + operation3 := noopFunc op3Name := "unmount_volume" // Act - err3 := grm.Run(volumeName, "" /*pod name*/, types.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) + err3 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name}) if err3 != nil { - t.Fatalf("NewGoRoutine failed. Expected Actual: <%v>", err3) + t.Fatalf("NestedPendingOperations failed. 
Expected Actual: <%v>", err3) } } -func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { +func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operationPodName := types.UniquePodName("operation-podname") + operationPodName := volumetypes.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) } - operation2 := generateNoopFunc() + operation2 := noopFunc // Act - err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. 
Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } } -func Test_NewGoRoutineMap_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { +func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") - operationPodName := types.UniquePodName("operation-podname") + operationPodName := volumetypes.UniquePodName("operation-podname") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) } - operation2 := generateNoopFunc() + operation2 := noopFunc // Act - err2 := grm.Run(volumeName, operationPodName, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. 
Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } } -func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) } - operation2 := generateNoopFunc() + operation2 := noopFunc // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. 
Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } } -func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { +func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) } - operation2 := generateNoopFunc() - operation3 := generateNoopFunc() + operation2 := noopFunc + operation3 := noopFunc // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. 
Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } // Act @@ -409,9 +407,9 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) + err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -420,32 +418,32 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) { // Assert if err3 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err3) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err3) } } -func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) { // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err1 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err1 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err1) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err1) } - operation2 := generateNoopFunc() - operation3 := generateNoopFunc() + operation2 := noopFunc + operation3 := noopFunc // Act - err2 := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation2}) + err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2}) // Assert if err2 == nil { - t.Fatalf("NewGoRoutine did not fail. Expected: Actual: ", volumeName) + t.Fatalf("NestedPendingOperations did not fail. Expected: Actual: ", volumeName) } if !IsAlreadyExists(err2) { - t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2) + t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) } // Act @@ -453,9 +451,9 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t err3 := retryWithExponentialBackOff( time.Duration(initialOperationWaitTimeShort), func() (bool, error) { - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation3}) + err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3}) if err != nil { - t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err) + t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err) return false, nil } return true, nil @@ -464,11 +462,11 @@ func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *t // Assert if err3 != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err3) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err3) } } -func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) { +func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) { // Test than Wait() on empty GoRoutineMap always succeeds without blocking // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) @@ -487,7 +485,7 @@ func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) { } } -func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) { // Test than Wait() on empty GoRoutineMap always succeeds without blocking // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) @@ -506,16 +504,16 @@ func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) { } } -func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { +func Test_NestedPendingOperations_Positive_Wait(t *testing.T) { // Test that Wait() really blocks until the last operation succeeds // Arrange grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + t.Fatalf("NestedPendingOperations failed. 
Expected: Actual: <%v>", err) } // Act @@ -535,16 +533,16 @@ func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) { } } -func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { +func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) { // Test that Wait() really blocks until the last operation succeeds // Arrange grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) volumeName := v1.UniqueVolumeName("volume-name") operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) operation1 := generateWaitFunc(operation1DoneCh) - err := grm.Run(volumeName, "" /* operationSubName */, types.GeneratedOperations{OperationFunc: operation1}) + err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1}) if err != nil { - t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err) } // Act @@ -564,6 +562,268 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { } } +/* Concurrent operations tests */ + +// "None" means volume, pod, and node names are all empty +// "Volume" means volume name is set, but pod name and node name are empty +// "Volume Pod" means volume and pod names are set, but the node name is empty +// "Volume Node" means volume and node names are set, but the pod name is empty + +// The same volume, pod, and node names are used (where they are not empty). 
+ +// Covered cases: +// FIRST OP | SECOND OP | RESULT +// None | None | Positive +// None | Volume | Positive +// None | Volume Pod | Positive +// None | Volume Node | Positive +// Volume | None | Positive +// Volume | Volume | Negative (covered in Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes above) +// Volume | Volume Pod | Negative +// Volume | Volume Node | Negative +// Volume Pod | None | Positive +// Volume Pod | Volume | Negative +// Volume Pod | Volume Pod | Negative (covered in Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes above) +// Volume Node | None | Positive +// Volume Node | Volume | Negative +// Volume Node | Volume Node | Negative + +// These cases are not covered because they will never occur within the same +// binary, so either result works. +// Volume Pod | Volume Node +// Volume Node | Volume Pod + +func Test_NestedPendingOperations_SecondOpBeforeFirstCompletes(t *testing.T) { + const ( + keyNone = iota + keyVolume + keyVolumePod + keyVolumeNode + ) + + type testCase struct { + testID int + keyTypes []int // only 2 elements are supported + expectPass bool + } + + tests := []testCase{ + {testID: 1, keyTypes: []int{keyNone, keyNone}, expectPass: true}, + {testID: 2, keyTypes: []int{keyNone, keyVolume}, expectPass: true}, + {testID: 3, keyTypes: []int{keyNone, keyVolumePod}, expectPass: true}, + {testID: 4, keyTypes: []int{keyNone, keyVolumeNode}, expectPass: true}, + {testID: 5, keyTypes: []int{keyVolume, keyNone}, expectPass: true}, + {testID: 6, keyTypes: []int{keyVolume, keyVolumePod}, expectPass: false}, + {testID: 7, keyTypes: []int{keyVolume, keyVolumeNode}, expectPass: false}, + {testID: 8, keyTypes: []int{keyVolumePod, keyNone}, expectPass: true}, + {testID: 9, keyTypes: []int{keyVolumePod, keyVolume}, expectPass: false}, + {testID: 10, keyTypes: []int{keyVolumeNode, keyNone}, expectPass: true}, + {testID: 11, keyTypes: []int{keyVolumeNode, keyVolume}, expectPass: false}, + {testID: 12, 
keyTypes: []int{keyVolumeNode, keyVolumeNode}, expectPass: false}, + } + + for _, test := range tests { + var ( + volumeNames []v1.UniqueVolumeName + podNames []volumetypes.UniquePodName + nodeNames []types.NodeName + ) + for _, keyType := range test.keyTypes { + var ( + v v1.UniqueVolumeName + p volumetypes.UniquePodName + n types.NodeName + ) + switch keyType { + case keyNone: + v = EmptyUniqueVolumeName + p = EmptyUniquePodName + n = EmptyNodeName + case keyVolume: + v = v1.UniqueVolumeName("volume-name") + p = EmptyUniquePodName + n = EmptyNodeName + case keyVolumePod: + v = v1.UniqueVolumeName("volume-name") + p = volumetypes.UniquePodName("operation-podname") + n = EmptyNodeName + case keyVolumeNode: + v = v1.UniqueVolumeName("volume-name") + p = EmptyUniquePodName + n = types.NodeName("operation-nodename") + } + volumeNames = append(volumeNames, v) + podNames = append(podNames, p) + nodeNames = append(nodeNames, n) + } + + t.Run(fmt.Sprintf("Test %d", test.testID), func(t *testing.T) { + if test.expectPass { + testConcurrentOperationsPositive(t, + volumeNames[0], podNames[0], nodeNames[0], + volumeNames[1], podNames[1], nodeNames[1], + ) + } else { + testConcurrentOperationsNegative(t, + volumeNames[0], podNames[0], nodeNames[0], + volumeNames[1], podNames[1], nodeNames[1], + ) + } + }) + + } + +} + +func Test_NestedPendingOperations_Positive_Issue_88355(t *testing.T) { + // This test reproduces the scenario that is likely to have caused + // kubernetes/kubernetes issue #88355. + // Please refer to the issue for more context: + // https://github.com/kubernetes/kubernetes/issues/88355 + + // Below, vx is a volume name, and nx is a node name. 
+ + // Operation sequence: + // opZ(v0) starts (operates on a different volume from all other operations) + // op1(v1, n1) starts + // op2(v1, n2) starts + // opZ(v0) ends with success + // op2(v1, n2) ends with an error (exponential backoff should be triggered) + // op1(v1, n1) ends with success + // op3(v1, n2) starts (continuously retried on exponential backoff error) + // op3(v1, n2) ends with success + // op4(v1, n2) starts + // op4(v1, n2) ends with success + + const ( + mainVolumeName = "main-volume" + opZVolumeName = "other-volume" + node1 = "node1" + node2 = "node2" + + // delay after an operation is signaled to finish to ensure it actually + // finishes before running the next operation. + delay = 50 * time.Millisecond + + // Replicates the default AttachDetachController reconcile period + reconcilerPeriod = 100 * time.Millisecond + ) + + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + opZContinueCh := make(chan interface{}, 0 /* bufferSize */) + op1ContinueCh := make(chan interface{}, 0 /* bufferSize */) + op2ContinueCh := make(chan interface{}, 0 /* bufferSize */) + operationZ := generateWaitFunc(opZContinueCh) + operation1 := generateWaitFunc(op1ContinueCh) + operation2 := generateWaitWithErrorFunc(op2ContinueCh) + operation3 := noopFunc + operation4 := noopFunc + + errZ := grm.Run(opZVolumeName, "" /* podName */, "" /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operationZ}) + if errZ != nil { + t.Fatalf("NestedPendingOperations failed for operationZ. Expected: Actual: <%v>", errZ) + } + + err1 := grm.Run(mainVolumeName, "" /* podName */, node1, volumetypes.GeneratedOperations{OperationFunc: operation1}) + if err1 != nil { + t.Fatalf("NestedPendingOperations failed for operation1. Expected: Actual: <%v>", err1) + } + + err2 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation2}) + if err2 != nil { + t.Fatalf("NestedPendingOperations failed for operation2. 
Expected: Actual: <%v>", err2) + } + + opZContinueCh <- true + time.Sleep(delay) + op2ContinueCh <- true + time.Sleep(delay) + op1ContinueCh <- true + time.Sleep(delay) + + for { + err3 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation3}) + if err3 == nil { + break + } else if !exponentialbackoff.IsExponentialBackoff(err3) { + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err3) + } + time.Sleep(reconcilerPeriod) + } + + time.Sleep(delay) + + err4 := grm.Run(mainVolumeName, "" /* podName */, node2, volumetypes.GeneratedOperations{OperationFunc: operation4}) + if err4 != nil { + t.Fatalf("NestedPendingOperations failed. Expected: Actual: <%v>", err4) + } +} + +// testConcurrentOperationsPositive passes if the two operations keyed by the +// provided parameters are executed in parallel, and fails otherwise. +func testConcurrentOperationsPositive( + t *testing.T, + volumeName1 v1.UniqueVolumeName, + podName1 volumetypes.UniquePodName, + nodeName1 types.NodeName, + volumeName2 v1.UniqueVolumeName, + podName2 volumetypes.UniquePodName, + nodeName2 types.NodeName) { + + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + if err1 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + } + operation2 := noopFunc + + // Act + err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2}) + + // Assert + if err2 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) + } +} + +// testConcurrentOperationsNegative passes if the creation of the second +// operation returns an alreadyExists error, and fails otherwise. 
+func testConcurrentOperationsNegative( + t *testing.T, + volumeName1 v1.UniqueVolumeName, + podName1 volumetypes.UniquePodName, + nodeName1 types.NodeName, + volumeName2 v1.UniqueVolumeName, + podName2 volumetypes.UniquePodName, + nodeName2 types.NodeName) { + + // Arrange + grm := NewNestedPendingOperations(false /* exponentialBackOffOnError */) + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateWaitFunc(operation1DoneCh) + err1 := grm.Run(volumeName1, podName1, nodeName1 /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1}) + if err1 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + } + operation2 := noopFunc + + // Act + err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2}) + + // Assert + if err2 == nil { + t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist") + } + if !IsAlreadyExists(err2) { + t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2) + } +} + +/* END concurrent operations tests */ + func generateCallbackFunc(done chan<- interface{}) func() (error, error) { return func() (error, error) { done <- true @@ -578,21 +838,22 @@ func generateWaitFunc(done <-chan interface{}) func() (error, error) { } } -func generatePanicFunc() func() (error, error) { - return func() (error, error) { - panic("testing panic") - } +func panicFunc() (error, error) { + panic("testing panic") } -func generateErrorFunc() func() (error, error) { +func errorFunc() (error, error) { + return fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2") +} + +func generateWaitWithErrorFunc(done <-chan interface{}) func() (error, error) { return func() (error, error) { - return fmt.Errorf("placholder1"), fmt.Errorf("placeholder2") + <-done + return fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2") } } -func generateNoopFunc() func() (error, error) { - return func() 
(error, error) { return nil, nil } -} +func noopFunc() (error, error) { return nil, nil } func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error { backoff := wait.Backoff{ diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index c95029e6fd7f..d856d3360f19 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -138,9 +138,9 @@ type OperationExecutor interface { // back off on retries. VerifyControllerAttachedVolume(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error - // IsOperationPending returns true if an operation for the given volumeName and podName is pending, - // otherwise it returns false - IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool + // IsOperationPending returns true if an operation for the given volumeName + // and one of podName or nodeName is pending, otherwise it returns false + IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool // ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume. 
ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error // ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin @@ -596,8 +596,11 @@ type operationExecutor struct { operationGenerator OperationGenerator } -func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { - return oe.pendingOperations.IsOperationPending(volumeName, podName) +func (oe *operationExecutor) IsOperationPending( + volumeName v1.UniqueVolumeName, + podName volumetypes.UniquePodName, + nodeName types.NodeName) bool { + return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName) } func (oe *operationExecutor) AttachVolume( @@ -606,8 +609,13 @@ func (oe *operationExecutor) AttachVolume( generatedOperations := oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) + if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) { + return oe.pendingOperations.Run( + volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName, generatedOperations) + } + return oe.pendingOperations.Run( - volumeToAttach.VolumeName, "" /* podName */, generatedOperations) + volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) DetachVolume( @@ -620,8 +628,13 @@ func (oe *operationExecutor) DetachVolume( return err } + if util.IsMultiAttachAllowed(volumeToDetach.VolumeSpec) { + return oe.pendingOperations.Run( + volumeToDetach.VolumeName, "" /* podName */, volumeToDetach.NodeName, generatedOperations) + } return oe.pendingOperations.Run( - volumeToDetach.VolumeName, "" /* podName */, generatedOperations) + volumeToDetach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) + } func (oe *operationExecutor) VerifyVolumesAreAttached( @@ -707,7 +720,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( // Ugly hack to ensure - we don't do parallel 
bulk polling of same volume plugin uniquePluginName := v1.UniqueVolumeName(pluginName) - err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, generatedOperations) + err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, "" /* nodeName */, generatedOperations) if err != nil { klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err) } @@ -725,7 +738,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode( } // Give an empty UniqueVolumeName so that this operation could be executed concurrently. - return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, generatedOperations) + return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) MountVolume( @@ -766,7 +779,7 @@ func (oe *operationExecutor) MountVolume( // TODO mount_device return oe.pendingOperations.Run( - volumeToMount.VolumeName, podName, generatedOperations) + volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) UnmountVolume( @@ -797,7 +810,7 @@ func (oe *operationExecutor) UnmountVolume( podName := volumetypes.UniquePodName(volumeToUnmount.PodUID) return oe.pendingOperations.Run( - volumeToUnmount.VolumeName, podName, generatedOperations) + volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) UnmountDevice( @@ -828,7 +841,7 @@ func (oe *operationExecutor) UnmountDevice( podName := nestedpendingoperations.EmptyUniquePodName return oe.pendingOperations.Run( - deviceToDetach.VolumeName, podName, generatedOperations) + deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error { @@ -836,7 +849,7 @@ func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, 
actu if err != nil { return err } - return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations) + return oe.pendingOperations.Run(volumeToMount.VolumeName, "", "" /* nodeName */, generatedOperations) } func (oe *operationExecutor) VerifyControllerAttachedVolume( @@ -850,7 +863,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume( } return oe.pendingOperations.Run( - volumeToMount.VolumeName, "" /* podName */, generatedOperations) + volumeToMount.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations) } // ReconstructVolumeOperation return a func to create volumeSpec from mount path diff --git a/pkg/volume/util/operationexecutor/operation_executor_test.go b/pkg/volume/util/operationexecutor/operation_executor_test.go index adc199a004f8..c94249b6d30b 100644 --- a/pkg/volume/util/operationexecutor/operation_executor_test.go +++ b/pkg/volume/util/operationexecutor/operation_executor_test.go @@ -17,6 +17,7 @@ limitations under the License. package operationexecutor import ( + "fmt" "strconv" "testing" "time" @@ -180,7 +181,7 @@ func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) { } } -func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) { +func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testing.T) { // Arrange ch, quit, oe := setup() volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) @@ -191,6 +192,69 @@ func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) { volumesToAttach[i] = VolumeToAttach{ VolumeName: v1.UniqueVolumeName(pdName), NodeName: "node", + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, + } + oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("Attach volume operations should not start concurrently") 
+ } +} + +func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToSameNode(t *testing.T) { + // Arrange + ch, quit, oe := setup() + volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) + pdName := "pd-volume" + + // Act + for i := range volumesToAttach { + volumesToAttach[i] = VolumeToAttach{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: "node", + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, + }, + }, + }, + } + oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("Attach volume operations should not start concurrently") + } +} + +func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) { + // Arrange + ch, quit, oe := setup() + volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) + pdName := "pd-volume" + + // Act + for i := range volumesToAttach { + volumesToAttach[i] = VolumeToAttach{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: types.NodeName(fmt.Sprintf("node%d", i)), + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, } oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) } @@ -201,7 +265,63 @@ func TestOperationExecutor_AttachVolumeConcurrently(t *testing.T) { } } -func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) { +func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) { + // Arrange + ch, quit, oe := setup() + volumesToAttach := make([]VolumeToAttach, numVolumesToAttach) + pdName := "pd-volume" + + // Act + for i := range volumesToAttach { + volumesToAttach[i] = VolumeToAttach{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: 
types.NodeName(fmt.Sprintf("node%d", i)), + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, + }, + }, + }, + } + oe.AttachVolume(volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunConcurrently(ch, quit, numVolumesToAttach) { + t.Fatalf("Attach volume operations should not execute serially") + } +} + +func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *testing.T) { + // Arrange + ch, quit, oe := setup() + attachedVolumes := make([]AttachedVolume, numVolumesToDetach) + pdName := "pd-volume" + + // Act + for i := range attachedVolumes { + attachedVolumes[i] = AttachedVolume{ + VolumeName: v1.UniqueVolumeName(pdName), + NodeName: "node", + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, + } + oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) + } + + // Assert + if !isOperationRunSerially(ch, quit) { + t.Fatalf("DetachVolume operations should not run concurrently") + } +} + +func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromSameNode(t *testing.T) { // Arrange ch, quit, oe := setup() attachedVolumes := make([]AttachedVolume, numVolumesToDetach) @@ -212,6 +332,13 @@ func TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) { attachedVolumes[i] = AttachedVolume{ VolumeName: v1.UniqueVolumeName(pdName), NodeName: "node", + VolumeSpec: &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, + }, + }, + }, } oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */) } @@ -222,7 +349,35 @@ func 
TestOperationExecutor_DetachVolumeConcurrently(t *testing.T) {
 	}
 }
 
-func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
+func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) {
+	// Arrange
+	ch, quit, oe := setup()
+	attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
+	pdName := "pd-volume"
+
+	// Act
+	for i := range attachedVolumes {
+		attachedVolumes[i] = AttachedVolume{
+			VolumeName: v1.UniqueVolumeName(pdName),
+			NodeName:   types.NodeName(fmt.Sprintf("node%d", i)),
+			VolumeSpec: &volume.Spec{
+				PersistentVolume: &v1.PersistentVolume{
+					Spec: v1.PersistentVolumeSpec{
+						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
+					},
+				},
+			},
+		}
+		oe.DetachVolume(attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
+	}
+
+	// Assert
+	if !isOperationRunConcurrently(ch, quit, numVolumesToDetach) {
+		t.Fatalf("Detach volume operations should not execute serially")
+	}
+}
+
+func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *testing.T) {
 	// Arrange
 	ch, quit, oe := setup()
 
@@ -237,6 +392,24 @@ func TestOperationExecutor_VerifyVolumesAreAttachedConcurrently(t *testing.T) {
 	}
 }
 
+func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnDifferentNodes(t *testing.T) {
+	// Arrange
+	ch, quit, oe := setup()
+
+	// Act
+	for i := 0; i < numVolumesToVerifyAttached; i++ {
+		oe.VerifyVolumesAreAttachedPerNode(
+			nil, /* attachedVolumes */
+			types.NodeName(fmt.Sprintf("node-name-%d", i)),
+			nil /* actualStateOfWorldAttacherUpdater */)
+	}
+
+	// Assert
+	if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
+		t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
+	}
+}
+
 func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) {
 	// Arrange
 	ch, quit, oe := setup()
diff --git a/pkg/volume/util/util.go b/pkg/volume/util/util.go
index e3baaa7c3a61..b1d25782e98f 100644
--- a/pkg/volume/util/util.go
+++ b/pkg/volume/util/util.go
@@ -644,3 +644,44 @@ func WriteVolumeCache(deviceMountPath string, exec utilexec.Interface) error {
 	// For linux runtime, it skips because unmount will automatically flush disk data
 	return nil
 }
+
+// IsMultiAttachAllowed checks if attaching this volume to multiple nodes is allowed/possible.
+// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns
+// true, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the
+// attacher to fail fast in such cases.
+// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047
+func IsMultiAttachAllowed(volumeSpec *volume.Spec) bool {
+	if volumeSpec == nil {
+		// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
+		return true
+	}
+
+	if volumeSpec.Volume != nil {
+		// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach
+		if volumeSpec.Volume.AzureDisk != nil ||
+			volumeSpec.Volume.Cinder != nil {
+			return false
+		}
+	}
+
+	// Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to
+	// multi-attach. We trust in the individual volume implementations to not allow unsupported access modes
+	if volumeSpec.PersistentVolume != nil {
+		// Check for persistent volume types which do not fail when trying to multi-attach
+		if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 {
+			// No access mode specified so we don't know for sure. Let the attacher fail if needed
+			return true
+		}
+
+		// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return true
+		for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
+			if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
+				return true
+			}
+		}
+		return false
+	}
+
+	// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
+	return true
+}