diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go index aded86aa21aa..1e5ce60fde1c 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go @@ -244,6 +244,7 @@ func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, i return false, -1 } + opIndex := -1 for previousOpIndex, previousOp := range grm.operations { volumeNameMatch := previousOp.key.volumeName == key.volumeName @@ -251,16 +252,28 @@ func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, i key.podName == EmptyUniquePodName || previousOp.key.podName == key.podName + podNameExactMatch := previousOp.key.podName == key.podName + nodeNameMatch := previousOp.key.nodeName == EmptyNodeName || key.nodeName == EmptyNodeName || previousOp.key.nodeName == key.nodeName + nodeNameExactMatch := previousOp.key.nodeName == key.nodeName + if volumeNameMatch && podNameMatch && nodeNameMatch { - return true, previousOpIndex + // nonExactMatch pending first + if previousOp.operationPending { + return true, previousOpIndex + } + // nonExactMatch with no pending, set opIndex to the first nonExactMatch + // exactMatch can override opIndex to expected + if opIndex == -1 || (podNameExactMatch && nodeNameExactMatch) { + opIndex = previousOpIndex + } } } + return opIndex != -1, opIndex - return false, -1 } func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) { diff --git a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go index 7d25b3a2288b..6300e4652efa 100644 --- a/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go +++ b/pkg/volume/util/nestedpendingoperations/nestedpendingoperations_test.go @@ -879,3 +879,140 @@ func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error return fmt.Errorf("timeout after %v", timeout) } } + +func Test_NestedPendingOperations_OperationExists_PendingFirst(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + volumeName := v1.UniqueVolumeName("test-volume") + podName1 := volumetypes.UniquePodName("pod1") + podName2 := volumetypes.UniquePodName("pod2") + podName3 := volumetypes.UniquePodName("pod3") + podName4 := EmptyUniquePodName + nodeName := EmptyNodeName + + // delay after an operation is signaled to finish to ensure it actually + // finishes before running the next operation. + delay := 50 * time.Millisecond + + // fake operation1 for pod1 failed + operation1DoneCh := make(chan interface{}) + operation1 := generateWaitWithErrorFunc(operation1DoneCh) + err1 := grm.Run(volumeName, podName1, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"}) + if err1 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + } + + // fake operation2 for pod2 fails + operation2DoneCh := make(chan interface{}) + operation2 := generateWaitWithErrorFunc(operation2DoneCh) + err2 := grm.Run(volumeName, podName2, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"}) + if err2 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) + } + + // fake operation3 for pod3 pending + operation3DoneCh := make(chan interface{}) + operation3 := generateWaitFunc(operation3DoneCh) + defer func() { + close(operation3DoneCh) + }() + err3 := grm.Run(volumeName, podName3, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"}) + if err3 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err3) + } + + operation1DoneCh <- true + operation2DoneCh <- true + time.Sleep(delay) + + // fake operation4 for EmptyUniquePodName should be rejected as operation3 is still pending + operation4DoneCh := make(chan interface{}) + operation4 := generateWaitFunc(operation4DoneCh) + defer func() { + close(operation4DoneCh) + }() + err4 := grm.Run(volumeName, podName4, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"}) + + // Assert + if err4 == nil { + t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist") + } + if !IsAlreadyExists(err4) { + t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err4) + } +} + +func Test_NestedPendingOperations_OperationExists_ExactMatchFirstNoPending(t *testing.T) { + // Arrange + grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */) + volumeName := v1.UniqueVolumeName("test-volume") + podName1 := volumetypes.UniquePodName("pod1") + podName2 := volumetypes.UniquePodName("pod2") + podName3 := volumetypes.UniquePodName("pod3") + podName4 := EmptyUniquePodName + nodeName := EmptyNodeName + + // delay after an operation is signaled to finish to ensure it actually + // finishes before running the next operation. + delay := 50 * time.Millisecond + backoffDelay := 500 * time.Millisecond + + // fake operation1 for pod1 fails + operation1DoneCh := make(chan interface{}) + operation1 := generateWaitWithErrorFunc(operation1DoneCh) + err1 := grm.Run(volumeName, podName1, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"}) + if err1 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err1) + } + + // fake operation2 for pod2 fails + operation2DoneCh := make(chan interface{}) + operation2 := generateWaitWithErrorFunc(operation2DoneCh) + err2 := grm.Run(volumeName, podName2, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"}) + if err2 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err2) + } + + // fake operation3 for pod3 fails + operation3DoneCh := make(chan interface{}) + operation3 := generateWaitWithErrorFunc(operation3DoneCh) + err3 := grm.Run(volumeName, podName3, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"}) + if err3 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err3) + } + + operation1DoneCh <- true + operation2DoneCh <- true + operation3DoneCh <- true + time.Sleep(delay) + + // fake operation4 with EmptyUniquePodName fails + operation4DoneCh := make(chan interface{}) + operation4 := generateWaitWithErrorFunc(operation4DoneCh) + err4 := grm.Run(volumeName, podName4, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"}) + if err4 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err4) + } + + operation4DoneCh <- true + + // operation for pod2 retry + time.Sleep(backoffDelay) + operation5 := noopFunc + err5 := grm.Run(volumeName, podName2, nodeName, volumetypes.GeneratedOperations{OperationFunc: operation5, OperationName: "umount"}) + if err5 != nil { + t.Errorf("NestedPendingOperations failed. Expected: Actual: <%v>", err5) + } + time.Sleep(delay) + + // Assert + // Operation5 will override operation2, since we successfully finished unmount operation on pod2, it should be removed from operations array + grm.(*nestedPendingOperations).lock.Lock() + defer grm.(*nestedPendingOperations).lock.Unlock() + for _, op := range grm.(*nestedPendingOperations).operations { + if op.key.podName == podName2 { + t.Errorf("NestedPendingOperations failed. Operation for pod2 should be removed") + } + } + +}