Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce timeout for waiting for resource version #38705

Merged
merged 1 commit into from
Dec 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/api/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,12 @@ func IsForbidden(err error) bool {
return reasonForError(err) == metav1.StatusReasonForbidden
}

// IsTimeout determines if err is an error which indicates that request times out due to long
// processing.
func IsTimeout(err error) bool {
return reasonForError(err) == metav1.StatusReasonTimeout
}

// IsServerTimeout determines if err is an error which indicates that the request needs to be retried
// by the client.
func IsServerTimeout(err error) bool {
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob

obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
if err != nil {
return fmt.Errorf("failed to wait for fresh get: %v", err)
return err
}

if exists {
Expand Down Expand Up @@ -429,7 +429,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri

obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
if err != nil {
return fmt.Errorf("failed to wait for fresh get: %v", err)
return err
}
trace.Step("Got from cache")

Expand Down Expand Up @@ -485,7 +485,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p

objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
if err != nil {
return fmt.Errorf("failed to wait for fresh list: %v", err)
return err
}
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
Expand Down
23 changes: 23 additions & 0 deletions pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,29 @@ func TestList(t *testing.T) {
}
}

func TestInfiniteList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()

podFoo := makeTestPod("foo")
fooCreated := updatePod(t, etcdStorage, podFoo, nil)

// Set up List at fooCreated.ResourceVersion + 10
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
listRV := strconv.Itoa(int(rv + 10))

result := &api.PodList{}
err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result)
if !errors.IsTimeout(err) {
t.Errorf("Unexpected error: %v", err)
}
}

func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
_, _, line, _ := goruntime.Caller(1)
select {
Expand Down
18 changes: 11 additions & 7 deletions pkg/storage/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ import (
)

const (
// MaximumListWait determines how long we're willing to wait for a
// list if a client specified a resource version in the future.
MaximumListWait = 60 * time.Second
// blockTimeout determines how long we're willing to block the request
// to wait for a given resource version to be propagated to cache,
// before terminating request and returning Timeout error with retry
// after suggestion.
blockTimeout = 3 * time.Second
)

// watchCacheEvent is a single "watch event" that is send to users of
Expand Down Expand Up @@ -206,7 +208,8 @@ func parseResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" {
return 0, nil
}
return strconv.ParseUint(resourceVersion, 10, 64)
// Use bitsize being the size of int on the machine.
return strconv.ParseUint(resourceVersion, 10, 0)
}

func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
Expand Down Expand Up @@ -288,7 +291,7 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util.
// it will wake up the loop below sometime after the broadcast,
// we don't need to worry about waking it up before the time
// has expired accidentally.
<-w.clock.After(MaximumListWait)
<-w.clock.After(blockTimeout)
w.cond.Broadcast()
}()

Expand All @@ -297,8 +300,9 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util.
trace.Step("watchCache locked acquired")
}
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= MaximumListWait {
return fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
if w.clock.Since(startTime) >= blockTimeout {
// Timeout with retry after 1 second.
return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
}
w.cond.Wait()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/watch_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
for !fc.HasWaiters() {
time.Sleep(time.Millisecond)
}
fc.Step(MaximumListWait)
fc.Step(blockTimeout)

// Add an object to make sure the test would
// eventually fail instead of just waiting
Expand Down