From 7b423085faa2edd4cdcab44ac512f7b904e35383 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Thu, 15 Dec 2016 23:03:58 -0500 Subject: [PATCH] Fix variable shadowing in exponential backoff when deleting volumes Also fix pv_controller unit tests to behave more accurately in light of exponential backoffs --- .../volume/persistentvolume/framework_test.go | 6 ++-- .../volume/persistentvolume/pv_controller.go | 2 +- pkg/util/goroutinemap/goroutinemap.go | 33 ++++++++++++++++- pkg/util/goroutinemap/goroutinemap_test.go | 35 +++++++++++++++++++ 4 files changed, 72 insertions(+), 4 deletions(-) diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index c75230dd12de..f93d0cc3f12e 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -461,7 +461,7 @@ func (r *volumeReactor) getChangeCount() int { // waitForIdle waits until all tests, controllers and other goroutines do their // job and no new actions are registered for 10 milliseconds. func (r *volumeReactor) waitForIdle() { - r.ctrl.runningOperations.Wait() + r.ctrl.runningOperations.WaitForCompletion() // Check every 10ms if the controller does something and stop if it's // idle. oldChanges := -1 @@ -489,7 +489,7 @@ func (r *volumeReactor) waitTest(test controllerTest) error { } err := wait.ExponentialBackoff(backoff, func() (done bool, err error) { // Finish all operations that are in progress - r.ctrl.runningOperations.Wait() + r.ctrl.runningOperations.WaitForCompletion() // Return 'true' if the reactor reached the expected state err1 := r.checkClaims(test.expectedClaims) @@ -1038,6 +1038,8 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s break } } + // waiting here cools down exponential backoff + time.Sleep(600 * time.Millisecond) // There were some changes, process them switch obj.(type) { diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index e9b56108b4f8..649567af9b5a 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -1079,7 +1079,7 @@ func (ctrl *PersistentVolumeController) deleteVolumeOperation(arg interface{}) e } else { // The plugin failed, mark the volume as Failed and send Warning // event - if _, err = ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeFailedDelete", err.Error()); err != nil { + if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeFailedDelete", err.Error()); err != nil { glog.V(4).Infof("deleteVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err) // Save failed, retry on the next deletion attempt return err diff --git a/pkg/util/goroutinemap/goroutinemap.go b/pkg/util/goroutinemap/goroutinemap.go index 0d7065741028..aa7e3ce3b131 100644 --- a/pkg/util/goroutinemap/goroutinemap.go +++ b/pkg/util/goroutinemap/goroutinemap.go @@ -57,11 +57,16 @@ type GoRoutineMap interface { // a new operation to be started with the same operation name without error. Run(operationName string, operationFunc func() error) error - // Wait blocks until all operations are completed. This is typically + // Wait blocks until operations map is empty. This is typically // necessary during tests - the test should wait until all operations finish // and evaluate results after that. Wait() + // WaitForCompletion blocks until either all operations have successfully completed + // or have failed but are not pending. The test should wait until operations are either + // complete or have failed. + WaitForCompletion() + // IsOperationPending returns true if the operation is pending (currently // running), otherwise returns false. IsOperationPending(operationName string) bool @@ -179,6 +184,32 @@ func (grm *goRoutineMap) Wait() { } } +func (grm *goRoutineMap) WaitForCompletion() { + grm.lock.Lock() + defer grm.lock.Unlock() + + for { + if len(grm.operations) == 0 || grm.nothingPending() { + break + } else { + grm.cond.Wait() + } + } +} + +// Check if any operation is pending. Already assumes caller has the +// necessary locks +func (grm *goRoutineMap) nothingPending() bool { + nothingIsPending := true + for _, operation := range grm.operations { + if operation.operationPending { + nothingIsPending = false + break + } + } + return nothingIsPending +} + // NewAlreadyExistsError returns a new instance of AlreadyExists error. func NewAlreadyExistsError(operationName string) error { return alreadyExistsError{operationName} diff --git a/pkg/util/goroutinemap/goroutinemap_test.go b/pkg/util/goroutinemap/goroutinemap_test.go index 73a7411d763b..d516e4859c29 100644 --- a/pkg/util/goroutinemap/goroutinemap_test.go +++ b/pkg/util/goroutinemap/goroutinemap_test.go @@ -448,6 +448,34 @@ func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) { } } +func Test_NewGoRoutineMap_WaitForCompletionWithExpBackoff(t *testing.T) { + grm := NewGoRoutineMap(true /* exponentialBackOffOnError */) + operationName := "operation-err" + + operation1DoneCh := make(chan interface{}, 0 /* bufferSize */) + operation1 := generateErrorFunc(operation1DoneCh) + err := grm.Run(operationName, operation1) + if err != nil { + t.Fatalf("NewGoRoutine failed. Expected: Actual: <%v>", err) + } + + // Act + waitDoneCh := make(chan interface{}, 1) + go func() { + grm.WaitForCompletion() + waitDoneCh <- true + }() + + // Finish the operation + operation1DoneCh <- true + + // Assert that WaitForCompletion returns even if scheduled op had error + err = waitChannelWithTimeout(waitDoneCh, testTimeout) + if err != nil { + t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err) + } +} + func generateCallbackFunc(done chan<- interface{}) func() error { return func() error { done <- true @@ -455,6 +483,13 @@ func generateCallbackFunc(done chan<- interface{}) func() error { } } +func generateErrorFunc(done <-chan interface{}) func() error { + return func() error { + <-done + return fmt.Errorf("Generic error") + } +} + func generateWaitFunc(done <-chan interface{}) func() error { return func() error { <-done