WaitFor returns immediately when done is closed #72364

Merged
merged 2 commits into from Jan 11, 2019
11 changes: 10 additions & 1 deletion pkg/controller/garbagecollector/garbagecollector_test.go
@@ -856,7 +856,16 @@ func TestGarbageCollectorSync(t *testing.T) {
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 	go gc.Run(1, stopCh)
-	go gc.Sync(fakeDiscoveryClient, 10*time.Millisecond, stopCh)
Member

Changing this to 1 second means the 1-second time.Sleep is now too short to reliably catch sync problems. The test should be able to wait significantly longer than the resync period to ensure progress is made when expected. Lengthening the resync period this much forces a correspondingly longer test wait, which makes the test take much longer to run than desired.

Member

IIUC, the original 100x difference was to ensure the gc sync was run ~100 times to catch any flakiness.

How about setting it to 200ms here and letting the test run for 10s (by setting the sleep time to 10s), so that the sync is exercised about 50 times?

Also note that the old test wasn't as reliable as it was intended to be. Although the sync period was set to 10ms, the old WaitFor function didn't handle the closed done channel, so WaitForCacheSync returned only 100ms later, after the first poll period. As a result, the old test only ran the sync behavior about 10 times in the 1s test time.

Member

> the original 100x difference was to ensure the gc sync was run ~100 times to catch any flakiness

It was actually to make sure the test waited way longer than the resync period, so that there was time to complete at least 2 iterations.

Contributor Author @kdada, Jan 7, 2019

The pseudo-code of GarbageCollector.Sync():

// In this case, `stopCh` won't be closed. Ignore it.
GarbageCollector.Sync():
    wait.Until() loops with `period`:
        wait.PollImmediateUntil() loops with 100ms (hardcoded):
            controller.WaitForCacheSync() loops with a channel that will be closed after the `period`.
            This loop never returns unless the sync is finished.

With a period of 1 second, controller.WaitForCacheSync() checks ~10 times whether the cache is synced.

With a period of 200ms, controller.WaitForCacheSync() checks ~2 times in every wait.PollImmediateUntil() loop. Overall, controller.WaitForCacheSync() still executes about 10 times. (time.Ticker may drop ticks for slow receivers, so 10 is the upper bound.)

200ms is better because it tests both wait.PollImmediateUntil() and controller.WaitForCacheSync().

Member

ok

Member

that's probably worth documenting in a comment in the test

Member

LGTM.

@kdada could you add the comment as liggitt suggested? The pseudo-code is great; it makes the reasoning much clearer. I added a few more details to it.

GarbageCollector.Sync():
    wait.Until() loops with `period` until the `stopCh` is closed:
        wait.PollImmediateUntil() loops with 100ms (hardcoded) until the `stopCh` is closed:
            controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches are synced.

The final goal is to make sure that the outermost wait.Until loop runs at least 2 times during the 1s sleep, which ensures that the changes made to the fakeDiscoveryClient are picked up by the Sync.

Contributor Author @kdada, Jan 9, 2019

Contributor

lgtm

+	// The pseudo-code of GarbageCollector.Sync():
+	// GarbageCollector.Sync(client, period, stopCh):
+	//    wait.Until() loops with `period` until the `stopCh` is closed:
+	//        wait.PollImmediateUntil() loops with 100ms (hardcoded) until the `stopCh` is closed:
+	//            GetDeletableResources()
+	//            gc.resyncMonitors()
+	//            controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced.
+	//
+	// Setting the period to 200ms allows the WaitForCacheSync() to check for cache sync ~2 times in every wait.PollImmediateUntil() loop.
+	go gc.Sync(fakeDiscoveryClient, 200*time.Millisecond, stopCh)

 	// Wait until the sync discovers the initial resources
 	fmt.Printf("Test output")
22 changes: 10 additions & 12 deletions staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go
@@ -351,22 +351,21 @@ type WaitFunc func(done <-chan struct{}) <-chan struct{}
 // WaitFor continually checks 'fn' as driven by 'wait'.
 //
 // WaitFor gets a channel from 'wait()', and then invokes 'fn' once for every value
-// placed on the channel and once more when the channel is closed.
+// placed on the channel and once more when the channel is closed. If the channel is closed
+// and 'fn' returns false without error, WaitFor returns ErrWaitTimeout.
 //
-// If 'fn' returns an error the loop ends and that error is returned, and if
+// If 'fn' returns an error the loop ends and that error is returned. If
 // 'fn' returns true the loop ends and nil is returned.
 //
-// ErrWaitTimeout will be returned if the channel is closed without fn ever
+// ErrWaitTimeout will be returned if the 'done' channel is closed without fn ever
 // returning true.
+//
+// When the done channel is closed, because the golang `select` statement is
+// "uniform pseudo-random", the `fn` might still run one or multiple times,
+// though eventually `WaitFor` will return.
 func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
 	stopCh := make(chan struct{})
-	once := sync.Once{}
-	closeCh := func() {
-		once.Do(func() {
-			close(stopCh)
-		})
-	}
-	defer closeCh()
+	defer close(stopCh)
 	c := wait(stopCh)
 	for {
 		select {
@@ -382,10 +381,9 @@ func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
 				return ErrWaitTimeout
 			}
 		case <-done:
-			closeCh()
+			return ErrWaitTimeout
 		}
 	}
-	return ErrWaitTimeout
 }

 // poller returns a WaitFunc that will send to the channel every interval until
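
The WaitFor doc comment notes that `select` is "uniform pseudo-random" when several cases are ready. A minimal sketch of that language behavior, independent of the wait package (`countSelections` is a hypothetical helper): both channels are closed up front, so both cases are always ready and either may be chosen on any iteration.

```go
package main

import "fmt"

// countSelections runs a two-case select n times with both channels already
// closed, so both cases are ready every time, and counts how often each case
// is chosen.
func countSelections(n int) (ticks, dones int) {
	tick := make(chan struct{})
	done := make(chan struct{})
	close(tick)
	close(done)
	for i := 0; i < n; i++ {
		select {
		case <-tick:
			ticks++
		case <-done:
			dones++
		}
	}
	return ticks, dones
}

func main() {
	ticks, dones := countSelections(1000)
	// The exact split varies from run to run; both counts are expected to be non-zero.
	fmt.Printf("tick case: %d, done case: %d\n", ticks, dones)
}
```

This is why a condition function may still run after the done channel is closed: if a tick is ready at the same moment, `select` may pick the tick case first.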
33 changes: 32 additions & 1 deletion staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go
@@ -456,11 +456,42 @@ func TestWaitFor(t *testing.T) {
 	}
 }

+// TestWaitForWithEarlyClosingWaitFunc tests WaitFor when the WaitFunc closes its channel. The WaitFor should
+// always return ErrWaitTimeout.
+func TestWaitForWithEarlyClosingWaitFunc(t *testing.T) {
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+
+	start := time.Now()
+	err := WaitFor(func(done <-chan struct{}) <-chan struct{} {
+		c := make(chan struct{})
+		close(c)
+		return c
+	}, func() (bool, error) {
+		return false, nil
+	}, stopCh)
+	duration := time.Now().Sub(start)
+
+	// The WaitFor should return immediately, so the duration is close to 0s.
+	if duration >= ForeverTestTimeout/2 {
Member

Why divide it by 2 instead of using ForeverTestTimeout directly? Is the purpose to reduce false negatives (i.e., to make the test less likely to miss a bug)?

Contributor Author

Yes.

+		t.Errorf("expected short timeout duration")
+	}
+	if err != ErrWaitTimeout {
+		t.Errorf("expected ErrWaitTimeout from WaitFunc")
+	}
+}
+
+// TestWaitForWithClosedChannel tests WaitFor when it receives a closed channel. The WaitFor should
+// always return ErrWaitTimeout.
 func TestWaitForWithClosedChannel(t *testing.T) {
 	stopCh := make(chan struct{})
 	close(stopCh)
+	c := make(chan struct{})
+	defer close(c)
 	start := time.Now()
-	err := WaitFor(poller(ForeverTestTimeout, ForeverTestTimeout), func() (bool, error) {
+	err := WaitFor(func(done <-chan struct{}) <-chan struct{} {
+		return c
+	}, func() (bool, error) {
 		return false, nil
 	}, stopCh)
 	duration := time.Now().Sub(start)