Skip to content

Commit

Permalink
Merge pull request #110951 from 249043822/br-nestedPendingOperations
Browse files Browse the repository at this point in the history
fix nestedPendingOperations mount and umount parallel bug -- minimal change
  • Loading branch information
k8s-ci-robot committed Aug 26, 2022
2 parents a0bd6dd + 593f6c9 commit 4d0ad07
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 2 deletions.
17 changes: 15 additions & 2 deletions pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go
Expand Up @@ -244,23 +244,36 @@ func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, i
return false, -1
}

opIndex := -1
for previousOpIndex, previousOp := range grm.operations {
volumeNameMatch := previousOp.key.volumeName == key.volumeName

podNameMatch := previousOp.key.podName == EmptyUniquePodName ||
key.podName == EmptyUniquePodName ||
previousOp.key.podName == key.podName

podNameExactMatch := previousOp.key.podName == key.podName

nodeNameMatch := previousOp.key.nodeName == EmptyNodeName ||
key.nodeName == EmptyNodeName ||
previousOp.key.nodeName == key.nodeName

nodeNameExactMatch := previousOp.key.nodeName == key.nodeName

if volumeNameMatch && podNameMatch && nodeNameMatch {
return true, previousOpIndex
// nonExactMatch pending first
if previousOp.operationPending {
return true, previousOpIndex
}
// nonExactMatch with no pending, set opIndex to the first nonExactMatch
// exactMatch can override opIndex to expected
if opIndex == -1 || (podNameExactMatch && nodeNameExactMatch) {
opIndex = previousOpIndex
}
}
}
return opIndex != -1, opIndex

return false, -1
}

func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) {
Expand Down
Expand Up @@ -879,3 +879,140 @@ func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error
return fmt.Errorf("timeout after %v", timeout)
}
}

// Test_NestedPendingOperations_OperationExists_PendingFirst checks that a
// still-running operation wins over already-finished ones when a new,
// non-exactly-matching operation is submitted.
//
// Three "umount" operations are started on the same volume for pod1, pod2 and
// pod3. The first two are signalled to finish (they are built with
// generateWaitWithErrorFunc, so presumably they end with an error and stay
// recorded for backoff), while the third is left blocked. A "mount" submitted
// with EmptyUniquePodName must then be rejected with an already-exists error.
func Test_NestedPendingOperations_OperationExists_PendingFirst(t *testing.T) {
	// Arrange
	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
	volumeName := v1.UniqueVolumeName("test-volume")
	podName1 := volumetypes.UniquePodName("pod1")
	podName2 := volumetypes.UniquePodName("pod2")
	podName3 := volumetypes.UniquePodName("pod3")
	podName4 := EmptyUniquePodName
	nodeName := EmptyNodeName

	// delay after an operation is signaled to finish to ensure it actually
	// finishes before running the next operation.
	delay := 50 * time.Millisecond

	// Setup steps use Fatalf rather than Errorf: the done channels are
	// unbuffered, so if Run rejected an operation the sends further down would
	// presumably have no receiver and the test would hang instead of failing.

	// fake operation1 for pod1 fails
	operation1DoneCh := make(chan interface{})
	operation1 := generateWaitWithErrorFunc(operation1DoneCh)
	err1 := grm.Run(volumeName, podName1, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"})
	if err1 != nil {
		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
	}

	// fake operation2 for pod2 fails
	operation2DoneCh := make(chan interface{})
	operation2 := generateWaitWithErrorFunc(operation2DoneCh)
	err2 := grm.Run(volumeName, podName2, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"})
	if err2 != nil {
		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
	}

	// fake operation3 for pod3 stays pending until the test returns
	operation3DoneCh := make(chan interface{})
	operation3 := generateWaitFunc(operation3DoneCh)
	defer close(operation3DoneCh)
	err3 := grm.Run(volumeName, podName3, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"})
	if err3 != nil {
		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
	}

	// Let operation1 and operation2 finish; operation3 is still running.
	operation1DoneCh <- true
	operation2DoneCh <- true
	time.Sleep(delay)

	// Act
	// fake operation4 for EmptyUniquePodName should be rejected as operation3 is still pending
	operation4DoneCh := make(chan interface{})
	operation4 := generateWaitFunc(operation4DoneCh)
	defer close(operation4DoneCh)
	err4 := grm.Run(volumeName, podName4, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"})

	// Assert
	if err4 == nil {
		t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist")
	}
	if !IsAlreadyExists(err4) {
		t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err4)
	}
}

// Test_NestedPendingOperations_OperationExists_ExactMatchFirstNoPending checks
// that, when no matching operation is pending, a retry for a specific pod is
// matched against that pod's own recorded operation rather than another
// non-exact match.
//
// "umount" operations for pod1, pod2 and pod3 and then a "mount" with
// EmptyUniquePodName are each started and signalled to finish (all built with
// generateWaitWithErrorFunc, so presumably each ends with an error). After
// waiting out the backoff delay, a no-op "umount" for pod2 is run; once it
// completes, no entry for pod2 may remain in the operations list.
func Test_NestedPendingOperations_OperationExists_ExactMatchFirstNoPending(t *testing.T) {
	// Arrange
	grm := NewNestedPendingOperations(true /* exponentialBackOffOnError */)
	volumeName := v1.UniqueVolumeName("test-volume")
	podName1 := volumetypes.UniquePodName("pod1")
	podName2 := volumetypes.UniquePodName("pod2")
	podName3 := volumetypes.UniquePodName("pod3")
	podName4 := EmptyUniquePodName
	nodeName := EmptyNodeName

	// delay after an operation is signaled to finish to ensure it actually
	// finishes before running the next operation.
	delay := 50 * time.Millisecond
	backoffDelay := 500 * time.Millisecond

	// Setup steps use Fatalf rather than Errorf: the done channels are
	// unbuffered, so if Run rejected an operation the matching send below would
	// presumably have no receiver and the test would hang instead of failing.

	// fake operation1 for pod1 fails
	operation1DoneCh := make(chan interface{})
	operation1 := generateWaitWithErrorFunc(operation1DoneCh)
	err1 := grm.Run(volumeName, podName1, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"})
	if err1 != nil {
		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
	}

	// fake operation2 for pod2 fails
	operation2DoneCh := make(chan interface{})
	operation2 := generateWaitWithErrorFunc(operation2DoneCh)
	err2 := grm.Run(volumeName, podName2, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"})
	if err2 != nil {
		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
	}

	// fake operation3 for pod3 fails
	operation3DoneCh := make(chan interface{})
	operation3 := generateWaitWithErrorFunc(operation3DoneCh)
	err3 := grm.Run(volumeName, podName3, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"})
	if err3 != nil {
		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
	}

	// Let the three umount operations finish.
	operation1DoneCh <- true
	operation2DoneCh <- true
	operation3DoneCh <- true
	time.Sleep(delay)

	// fake operation4 with EmptyUniquePodName fails
	operation4DoneCh := make(chan interface{})
	operation4 := generateWaitWithErrorFunc(operation4DoneCh)
	err4 := grm.Run(volumeName, podName4, nodeName /* nodeName */, volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"})
	if err4 != nil {
		t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err4)
	}

	operation4DoneCh <- true

	// Act
	// operation for pod2 retry, after waiting out the backoff delay
	time.Sleep(backoffDelay)
	operation5 := noopFunc
	err5 := grm.Run(volumeName, podName2, nodeName, volumetypes.GeneratedOperations{OperationFunc: operation5, OperationName: "umount"})
	if err5 != nil {
		t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err5)
	}
	time.Sleep(delay)

	// Assert
	// Operation5 will override operation2, since we successfully finished unmount operation on pod2, it should be removed from operations array
	npo := grm.(*nestedPendingOperations)
	npo.lock.Lock()
	defer npo.lock.Unlock()
	for _, op := range npo.operations {
		if op.key.podName == podName2 {
			t.Errorf("NestedPendingOperations failed. Operation for pod2 should be removed")
		}
	}
}

0 comments on commit 4d0ad07

Please sign in to comment.