Skip to content

Commit

Permalink
Fix a data race in watcher.go (#2952)
Browse files Browse the repository at this point in the history
Ran into a small data race while testing await logic.

This PR fixes the underlying race condition as well as a race-y test.
  • Loading branch information
blampe committed Apr 15, 2024
1 parent cd36ac9 commit cb9002f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
18 changes: 11 additions & 7 deletions provider/pkg/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ func (ow *ObjectWatcher) watch(
var obj *unstructured.Unstructured
results := make(chan result)
poll := func() {
var err error
obj, err = ow.pollFunc()
results <- result{Obj: obj, Err: err}
o, err := ow.pollFunc()
results <- result{Obj: o, Err: err}
}

wait := 500 * time.Millisecond
Expand All @@ -129,6 +128,7 @@ func (ow *ObjectWatcher) watch(
case <-ow.ctx.Done():
return cancellationErr(ow.objName, obj)
case res := <-results:
obj = res.Obj
if stop, err := until(res.Obj, res.Err); err != nil {
return err
} else if stop {
Expand Down Expand Up @@ -160,13 +160,17 @@ type result struct {
// --------------------------------------------------------------------------

func timeoutErr(name string, obj *unstructured.Unstructured) error {
return &watchError{object: obj,
message: fmt.Sprintf("Timeout occurred polling for '%s'", name)}
return &watchError{
object: obj,
message: fmt.Sprintf("Timeout occurred polling for '%s'", name),
}
}

func cancellationErr(name string, obj *unstructured.Unstructured) error {
return &watchError{object: obj,
message: fmt.Sprintf("Resource operation was cancelled for '%s'", name)}
return &watchError{
object: obj,
message: fmt.Sprintf("Resource operation was cancelled for '%s'", name),
}
}

type watchError struct {
Expand Down
17 changes: 8 additions & 9 deletions provider/pkg/watcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package watcher
import (
"context"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -72,27 +73,27 @@ func Test_WatchUntil_PollFuncTimeout(t *testing.T) {
testCompleted := make(chan struct{})
for _, test := range timeoutTests {
go func(test timeoutTest) {
pollFuncCalls, watchFuncCalls := 0, 0
pollFuncCalls, watchFuncCalls := atomic.Int32{}, atomic.Int32{}
err := testObjWatcher(
context.Background(),
func() (*unstructured.Unstructured, error) {
pollFuncCalls++
pollFuncCalls.Add(1)
return test.pollFunc()
}).
WatchUntil(
func(obj *unstructured.Unstructured) bool {
watchFuncCalls++
watchFuncCalls.Add(1)
return test.predicate(obj)
},
test.timeout)
if err == nil || !strings.HasPrefix(err.Error(), timeoutErrPrefix) {
t.Errorf("%s: Polling should have timed out", test.name)
}
if !test.targetPollFuncCalls(pollFuncCalls) {
t.Errorf("%s: Got %d poll function calls, which did not satisfy the test predicate", test.name, pollFuncCalls)
if !test.targetPollFuncCalls(int(pollFuncCalls.Load())) {
t.Errorf("%s: Got %d poll function calls, which did not satisfy the test predicate", test.name, pollFuncCalls.Load())
}
if !test.targetWatchFuncCalls(watchFuncCalls) {
t.Errorf("%s: Got %d watch function calls, which did not satisfy the test predicate", test.name, watchFuncCalls)
if !test.targetWatchFuncCalls(int(watchFuncCalls.Load())) {
t.Errorf("%s: Got %d watch function calls, which did not satisfy the test predicate", test.name, watchFuncCalls.Load())
}
testCompleted <- struct{}{}
}(test)
Expand All @@ -119,7 +120,6 @@ func Test_WatchUntil_Success(t *testing.T) {
return true // Always true.
},
1*time.Second)

if err != nil {
t.Error("Expected watch to terminate without error")
}
Expand All @@ -137,7 +137,6 @@ func Test_RetryUntil_Success(t *testing.T) {
return nil // Always succeeds.
},
1*time.Second)

if err != nil {
t.Error("Expected watch to terminate without error")
}
Expand Down

0 comments on commit cb9002f

Please sign in to comment.