Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix variable shadowing in exponential backoff when deleting volumes #38855

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/controller/volume/persistentvolume/framework_test.go
Expand Up @@ -461,7 +461,7 @@ func (r *volumeReactor) getChangeCount() int {
// waitForIdle waits until all tests, controllers and other goroutines do their
// job and no new actions are registered for 10 milliseconds.
func (r *volumeReactor) waitForIdle() {
r.ctrl.runningOperations.Wait()
r.ctrl.runningOperations.WaitForCompletion()
// Check every 10ms if the controller does something and stop if it's
// idle.
oldChanges := -1
Expand Down Expand Up @@ -489,7 +489,7 @@ func (r *volumeReactor) waitTest(test controllerTest) error {
}
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// Finish all operations that are in progress
r.ctrl.runningOperations.Wait()
r.ctrl.runningOperations.WaitForCompletion()

// Return 'true' if the reactor reached the expected state
err1 := r.checkClaims(test.expectedClaims)
Expand Down Expand Up @@ -1038,6 +1038,8 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
break
}
}
// waiting here cools down exponential backoff
time.Sleep(600 * time.Millisecond)

// There were some changes, process them
switch obj.(type) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/volume/persistentvolume/pv_controller.go
Expand Up @@ -1079,7 +1079,7 @@ func (ctrl *PersistentVolumeController) deleteVolumeOperation(arg interface{}) e
} else {
// The plugin failed, mark the volume as Failed and send Warning
// event
if _, err = ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeFailedDelete", err.Error()); err != nil {
if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeFailedDelete", err.Error()); err != nil {
glog.V(4).Infof("deleteVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
// Save failed, retry on the next deletion attempt
return err
Expand Down
33 changes: 32 additions & 1 deletion pkg/util/goroutinemap/goroutinemap.go
Expand Up @@ -57,11 +57,16 @@ type GoRoutineMap interface {
// a new operation to be started with the same operation name without error.
Run(operationName string, operationFunc func() error) error

// Wait blocks until all operations are completed. This is typically
// Wait blocks until operations map is empty. This is typically
// necessary during tests - the test should wait until all operations finish
// and evaluate results after that.
Wait()

// WaitForCompletion blocks until either all operations have successfully completed
// or have failed but are not pending. The test should wait until operations are either
// complete or have failed.
WaitForCompletion()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit tests to goroutinemap_test.go

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test.


// IsOperationPending returns true if the operation is pending (currently
// running), otherwise returns false.
IsOperationPending(operationName string) bool
Expand Down Expand Up @@ -179,6 +184,32 @@ func (grm *goRoutineMap) Wait() {
}
}

func (grm *goRoutineMap) WaitForCompletion() {
grm.lock.Lock()
defer grm.lock.Unlock()

for {
if len(grm.operations) == 0 || grm.nothingPending() {
break
} else {
grm.cond.Wait()
}
}
}

// Check if any operation is pending. Already assumes caller has the
// necessary locks
func (grm *goRoutineMap) nothingPending() bool {
nothingIsPending := true
for _, operation := range grm.operations {
if operation.operationPending {
nothingIsPending = false
break
}
}
return nothingIsPending
}

// NewAlreadyExistsError returns a new instance of AlreadyExists error.
func NewAlreadyExistsError(operationName string) error {
return alreadyExistsError{operationName}
Expand Down
35 changes: 35 additions & 0 deletions pkg/util/goroutinemap/goroutinemap_test.go
Expand Up @@ -448,13 +448,48 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
}
}

func Test_NewGoRoutineMap_WaitForCompletionWithExpBackoff(t *testing.T) {
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
operationName := "operation-err"

operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
operation1 := generateErrorFunc(operation1DoneCh)
err := grm.Run(operationName, operation1)
if err != nil {
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
}

// Act
waitDoneCh := make(chan interface{}, 1)
go func() {
grm.WaitForCompletion()
waitDoneCh <- true
}()

// Finish the operation
operation1DoneCh <- true

// Assert that WaitForCompletion returns even if scheduled op had error
err = waitChannelWithTimeout(waitDoneCh, testTimeout)
if err != nil {
t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
}
}

func generateCallbackFunc(done chan<- interface{}) func() error {
return func() error {
done <- true
return nil
}
}

func generateErrorFunc(done <-chan interface{}) func() error {
return func() error {
<-done
return fmt.Errorf("Generic error")
}
}

func generateWaitFunc(done <-chan interface{}) func() error {
return func() error {
<-done
Expand Down