Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout, fix potential startup hang #20433

Merged
merged 3 commits into from
Feb 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/client/cache/expiration_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestTTLPolicy(t *testing.T) {
exactlyOnTTL := fakeTime.Add(-ttl)
expiredTime := fakeTime.Add(-(ttl + 1))

policy := TTLPolicy{ttl, &util.FakeClock{Time: fakeTime}}
policy := TTLPolicy{ttl, util.NewFakeClock(fakeTime)}
fakeTimestampedEntry := &timestampedEntry{obj: struct{}{}, timestamp: exactlyOnTTL}
if policy.IsExpired(fakeTimestampedEntry) {
t.Errorf("TTL cache should not expire entries exactly on ttl")
Expand Down
6 changes: 3 additions & 3 deletions pkg/client/record/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func TestEventf(t *testing.T) {
eventBroadcaster := NewBroadcaster()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)

clock := &util.FakeClock{time.Now()}
clock := util.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for index, item := range table {
clock.Step(1 * time.Second)
Expand Down Expand Up @@ -559,7 +559,7 @@ func TestEventfNoNamespace(t *testing.T) {
eventBroadcaster := NewBroadcaster()
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)

clock := &util.FakeClock{time.Now()}
clock := util.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)

for index, item := range table {
Expand Down Expand Up @@ -846,7 +846,7 @@ func TestMultiSinkCache(t *testing.T) {
}

eventBroadcaster := NewBroadcaster()
clock := &util.FakeClock{time.Now()}
clock := util.NewFakeClock(time.Now())
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)

sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
// NewFakeControllerExpectationsLookup creates a fake store for PodExpectations.
func NewFakeControllerExpectationsLookup(ttl time.Duration) (*ControllerExpectations, *util.FakeClock) {
fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
fakeClock := &util.FakeClock{Time: fakeTime}
fakeClock := util.NewFakeClock(fakeTime)
ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
ttlStore := cache.NewFakeExpirationStore(
ExpKeyFunc, nil, ttlPolicy, fakeClock)
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestControllerExpectations(t *testing.T) {
}

// Expectations have expired because of ttl
fakeClock.Time = fakeClock.Time.Add(ttl + 1)
fakeClock.Step(ttl + 1)
if !e.SatisfiedExpectations(rcKey) {
t.Errorf("Expectations should have expired but didn't")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/container/image_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestPuller(t *testing.T) {
}

backOff := util.NewBackOff(time.Second, time.Minute)
fakeClock := &util.FakeClock{Time: time.Now()}
fakeClock := util.NewFakeClock(time.Now())
backOff.Clock = fakeClock

fakeRuntime := &FakeRuntime{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/container/serialized_image_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestSerializedPuller(t *testing.T) {
}

backOff := util.NewBackOff(time.Second, time.Minute)
fakeClock := &util.FakeClock{Time: time.Now()}
fakeClock := util.NewFakeClock(time.Now())
backOff.Clock = fakeClock

fakeRuntime := &FakeRuntime{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/dockertools/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ func TestGetAPIPodStatusWithLastTermination(t *testing.T) {
}

func TestSyncPodBackoff(t *testing.T) {
var fakeClock = &util.FakeClock{Time: time.Now()}
var fakeClock = util.NewFakeClock(time.Now())
startTime := fakeClock.Now()

dm, fakeDocker := newTestDockerManager()
Expand Down Expand Up @@ -1232,7 +1232,7 @@ func TestSyncPodBackoff(t *testing.T) {
backOff.Clock = fakeClock
for _, c := range tests {
fakeDocker.SetFakeContainers(dockerContainers)
fakeClock.Time = startTime.Add(time.Duration(c.tick) * time.Second)
fakeClock.SetTime(startTime.Add(time.Duration(c.tick) * time.Second))

runSyncPod(t, dm, fakeDocker, pod, backOff, c.expectErr)
verifyCalls(t, fakeDocker, c.result)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/image_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ func TestGarbageCollectImageNotOldEnough(t *testing.T) {
},
}

fakeClock := util.FakeClock{Time: time.Now()}
fmt.Println(fakeClock.Now())
fakeClock := util.NewFakeClock(time.Now())
t.Log(fakeClock.Now())
require.NoError(t, manager.detectImages(fakeClock.Now()))
require.Equal(t, manager.imageRecordsLen(), 2)
// no space freed since one image is in used, and another one is not old enough
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
LowThresholdPercent: 80,
}
kubelet.imageManager, err = newImageManager(fakeRuntime, mockCadvisor, fakeRecorder, fakeNodeRef, fakeImageGCPolicy)
fakeClock := &util.FakeClock{Time: time.Now()}
fakeClock := util.NewFakeClock(time.Now())
kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
kubelet.backOff.Clock = fakeClock
kubelet.podKillingCh = make(chan *kubecontainer.Pod, 20)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/util/queue/work_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func newTestBasicWorkQueue() (*basicWorkQueue, *util.FakeClock) {
fakeClock := &util.FakeClock{Time: time.Now()}
fakeClock := util.NewFakeClock(time.Now())
wq := &basicWorkQueue{
clock: fakeClock,
queue: make(map[types.UID]time.Time),
Expand Down
18 changes: 9 additions & 9 deletions pkg/master/tunneler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,32 @@ func TestSecondsSinceSync(t *testing.T) {
tunneler.lastSync = time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix()

// Nano Second. No difference.
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC)}
tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 2, time.UTC))
assert.Equal(int64(0), tunneler.SecondsSinceSync())

// Second
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 1, 2, 1, time.UTC)}
tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 2, 1, time.UTC))
assert.Equal(int64(1), tunneler.SecondsSinceSync())

// Minute
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 2, 1, 1, time.UTC)}
tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 2, 1, 1, time.UTC))
assert.Equal(int64(60), tunneler.SecondsSinceSync())

// Hour
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 2, 1, 1, 1, time.UTC)}
tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 2, 1, 1, 1, time.UTC))
assert.Equal(int64(3600), tunneler.SecondsSinceSync())

// Day
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 2, 1, 1, 1, 1, time.UTC)}
tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 2, 1, 1, 1, 1, time.UTC))
assert.Equal(int64(86400), tunneler.SecondsSinceSync())

// Month
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC)}
tunneler.clock = util.NewFakeClock(time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC))
assert.Equal(int64(2678400), tunneler.SecondsSinceSync())

// Future Month. Should be -Month.
tunneler.lastSync = time.Date(2015, time.February, 1, 1, 1, 1, 1, time.UTC).Unix()
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC)}
tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC))
assert.Equal(int64(-2678400), tunneler.SecondsSinceSync())
}

Expand All @@ -89,12 +89,12 @@ func TestIsTunnelSyncHealthy(t *testing.T) {

// Pass case: 540 second lag
tunneler.lastSync = time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC).Unix()
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 9, 1, 1, time.UTC)}
tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 9, 1, 1, time.UTC))
err := master.IsTunnelSyncHealthy(nil)
assert.NoError(err, "IsTunnelSyncHealthy() should not have returned an error.")

// Fail case: 720 second lag
tunneler.clock = &util.FakeClock{Time: time.Date(2015, time.January, 1, 1, 12, 1, 1, time.UTC)}
tunneler.clock = util.NewFakeClock(time.Date(2015, time.January, 1, 1, 12, 1, 1, time.UTC))
err = master.IsTunnelSyncHealthy(nil)
assert.Error(err, "IsTunnelSyncHealthy() should have returned an error.")
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/registry/service/ipallocator/controller/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,12 @@ func (c *Repair) runOnce() error {
}

ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
options := &api.ListOptions{ResourceVersion: latest.ObjectMeta.ResourceVersion}
list, err := c.registry.ListServices(ctx, options)
// We explicitly send no resource version, since the resource version
// of 'latest' is from a different collection, it's not comparable to
// the service collection. The caching layer keeps per-collection RVs,
// and this is proper, since in theory the collections could be hosted
// in separate etcd (or even non-etcd) instances.
list, err := c.registry.ListServices(ctx, nil)
if err != nil {
return fmt.Errorf("unable to refresh the service IP block: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,10 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
}
filterFunc := filterFunction(key, c.keyFunc, filter)

objs, readResourceVersion := c.watchCache.WaitUntilFreshAndList(listRV)
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
if err != nil {
return fmt.Errorf("failed to wait for fresh list: %v", err)
}
for _, obj := range objs {
object, ok := obj.(runtime.Object)
if !ok {
Expand Down
32 changes: 30 additions & 2 deletions pkg/storage/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,22 @@ import (
"sort"
"strconv"
"sync"
"time"

"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
)

const (
	// MaximumListWait determines how long we're willing to wait for a
	// list if a client specified a resource version in the future
	// (i.e. one the cache has not yet observed); after this duration
	// the wait is abandoned and an error is returned to the caller.
	MaximumListWait = 60 * time.Second
)

// watchCacheEvent is a single "watch event" that is sent to users of
// watchCache. In addition to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the
Expand Down Expand Up @@ -85,6 +93,9 @@ type watchCache struct {
// This handler is run at the end of every Add/Update/Delete method
// and additionally gets the previous value of the object.
onEvent func(watchCacheEvent)

// for testing timeouts.
clock util.Clock
}

func newWatchCache(capacity int) *watchCache {
Expand All @@ -95,6 +106,7 @@ func newWatchCache(capacity int) *watchCache {
endIndex: 0,
store: cache.NewStore(cache.MetaNamespaceKeyFunc),
resourceVersion: 0,
clock: util.RealClock{},
}
wc.cond = sync.NewCond(wc.RLocker())
return wc
Expand Down Expand Up @@ -193,13 +205,29 @@ func (w *watchCache) List() []interface{} {
return w.store.List()
}

func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64) {
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
startTime := w.clock.Now()
go func() {
// Wake us up when the time limit has expired. The docs
// promise that time.After (well, NewTimer, which it calls)
// will wait *at least* the duration given. Since this go
// routine starts sometime after we record the start time, and
// it will wake up the loop below sometime after the broadcast,
// we don't need to worry about waking it up before the time
// has expired accidentally.
<-w.clock.After(MaximumListWait)
w.cond.Broadcast()
}()

w.RLock()
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= MaximumListWait {
return nil, 0, fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
}
w.cond.Wait()
}
defer w.RUnlock()
return w.store.List(), w.resourceVersion
return w.store.List(), w.resourceVersion, nil
}

func (w *watchCache) ListKeys() []string {
Expand Down
55 changes: 48 additions & 7 deletions pkg/storage/watch_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package storage
import (
"strconv"
"testing"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
Expand All @@ -40,8 +41,15 @@ func makeTestPod(name string, resourceVersion uint64) *api.Pod {
}
}

// newTestWatchCache builds a watchCache for tests, replacing its real
// clock with a fake one so tests can advance time deterministically.
func newTestWatchCache(capacity int) *watchCache {
	cache := newWatchCache(capacity)
	cache.clock = util.NewFakeClock(time.Now())
	return cache
}

func TestWatchCacheBasic(t *testing.T) {
store := newWatchCache(2)
store := newTestWatchCache(2)

// Test Add/Update/Delete.
pod1 := makeTestPod("pod", 1)
Expand Down Expand Up @@ -111,7 +119,7 @@ func TestWatchCacheBasic(t *testing.T) {
}

func TestEvents(t *testing.T) {
store := newWatchCache(5)
store := newTestWatchCache(5)

store.Add(makeTestPod("pod", 2))

Expand Down Expand Up @@ -231,15 +239,18 @@ func TestEvents(t *testing.T) {
}

func TestWaitUntilFreshAndList(t *testing.T) {
store := newWatchCache(3)
store := newTestWatchCache(3)

// In background, update the store.
go func() {
store.Add(makeTestPod("foo", 2))
store.Add(makeTestPod("bar", 5))
}()

list, resourceVersion := store.WaitUntilFreshAndList(4)
list, resourceVersion, err := store.WaitUntilFreshAndList(5)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
}
Expand All @@ -248,6 +259,30 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}
}

// TestWaitUntilFreshAndListTimeout verifies that WaitUntilFreshAndList
// returns a timeout error (rather than blocking forever) when the requested
// resource version never arrives within MaximumListWait.
func TestWaitUntilFreshAndListTimeout(t *testing.T) {
	store := newTestWatchCache(3)
	// The fake clock was installed by newTestWatchCache; the type
	// assertion recovers it so the test can drive time manually.
	fc := store.clock.(*util.FakeClock)

	// In background, step clock after the below call starts the timer.
	go func() {
		// Poll until WaitUntilFreshAndList has registered its timer on
		// the fake clock; stepping earlier would be a no-op.
		for !fc.HasWaiters() {
			time.Sleep(time.Millisecond)
		}
		// Advance fake time past the limit so the wait times out.
		fc.Step(MaximumListWait)

		// Add an object to make sure the test would
		// eventually fail instead of just waiting
		// forever.
		time.Sleep(30 * time.Second)
		store.Add(makeTestPod("bar", 5))
	}()

	// Resource version 5 is never reached before the (fake) deadline,
	// so this must return a timeout error.
	_, _, err := store.WaitUntilFreshAndList(5)
	if err == nil {
		t.Fatalf("unexpected lack of timeout error")
	}
}

type testLW struct {
ListFunc func(options api.ListOptions) (runtime.Object, error)
WatchFunc func(options api.ListOptions) (watch.Interface, error)
Expand All @@ -261,10 +296,13 @@ func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) {
}

func TestReflectorForWatchCache(t *testing.T) {
store := newWatchCache(5)
store := newTestWatchCache(5)

{
_, version := store.WaitUntilFreshAndList(0)
_, version, err := store.WaitUntilFreshAndList(0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if version != 0 {
t.Errorf("unexpected resource version: %d", version)
}
Expand All @@ -284,7 +322,10 @@ func TestReflectorForWatchCache(t *testing.T) {
r.ListAndWatch(util.NeverStop)

{
_, version := store.WaitUntilFreshAndList(10)
_, version, err := store.WaitUntilFreshAndList(10)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if version != 10 {
t.Errorf("unexpected resource version: %d", version)
}
Expand Down