diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index b94ad55e78..1591d01adc 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -272,331 +272,331 @@ }, { "ImportPath": "k8s.io/api/admissionregistration/v1alpha1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/admissionregistration/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/apps/v1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/apps/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/apps/v1beta2", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/authentication/v1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/authentication/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/authorization/v1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/authorization/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/autoscaling/v1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/autoscaling/v2beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/batch/v1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/batch/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/batch/v2alpha1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/certificates/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/coordination/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/core/v1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/events/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/extensions/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/imagepolicy/v1alpha1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/networking/v1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/policy/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/rbac/v1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/rbac/v1alpha1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/rbac/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/scheduling/v1alpha1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/scheduling/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/settings/v1alpha1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/storage/v1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/storage/v1alpha1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/api/storage/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/apitesting", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/apitesting/fuzzer", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/apitesting/roundtrip", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/equality", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/errors", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/meta", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/api/resource", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/apis/meta/fuzzer", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/apis/meta/internalversion", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/conversion", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/conversion/queryparams", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/fields", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/labels", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/schema", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/json", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/protobuf", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/recognizer", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/streaming", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/runtime/serializer/versioning", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/selection", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/types", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/cache", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/clock", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/diff", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/errors", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/framer", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/httpstream", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/httpstream/spdy", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/intstr", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/json", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/mergepatch", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/naming", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/net", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/remotecommand", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/runtime", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/sets", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/strategicpatch", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/validation", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/validation/field", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/wait", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/util/yaml", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/version", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/pkg/watch", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/third_party/forked/golang/json", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/third_party/forked/golang/netutil", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/apimachinery/third_party/forked/golang/reflect", - "Rev": "cd66986a58813b5cd7035cdacae20d4266f26f2d" + "Rev": "11dac8436abbe3fa21053bd0b31e8956ef1a29cc" }, { "ImportPath": "k8s.io/kube-openapi/pkg/util/proto", diff --git a/tools/cache/listwatch.go b/tools/cache/listwatch.go index 30463aea7d..3db9639637 100644 --- a/tools/cache/listwatch.go +++ b/tools/cache/listwatch.go @@ -20,15 +20,12 @@ import ( "context" "time" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/pager" - watchtools "k8s.io/client-go/tools/watch" ) // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. @@ -113,78 +110,3 @@ func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) { return lw.WatchFunc(options) } - -// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout -// if timeout is exceeded without all conditions returning true, or an error if an error occurs. -// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. -func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { - if len(conditions) == 0 { - return nil, nil - } - - list, err := lw.List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - initialItems, err := meta.ExtractList(list) - if err != nil { - return nil, err - } - - // use the initial items as simulated "adds" - var lastEvent *watch.Event - currIndex := 0 - passedConditions := 0 - for _, condition := range conditions { - // check the next condition against the previous event and short circuit waiting for the next watch - if lastEvent != nil { - done, err := condition(*lastEvent) - if err != nil { - return lastEvent, err - } - if done { - passedConditions = passedConditions + 1 - continue - } - } - - ConditionSucceeded: - for currIndex < len(initialItems) { - lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]} - currIndex++ - - done, err := condition(*lastEvent) - if err != nil { - return lastEvent, err - } - if done { - passedConditions = passedConditions + 1 - break ConditionSucceeded - } - } - } - if passedConditions == len(conditions) { - return lastEvent, nil - } - remainingConditions := conditions[passedConditions:] - - metaObj, err := meta.ListAccessor(list) - if err != nil { - return nil, err - } - currResourceVersion := metaObj.GetResourceVersion() - - watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion}) - if err != nil { - return nil, err - } - - ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) - defer cancel() - evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...) - if err == watchtools.ErrWatchClosed { - // present a consistent error interface to callers - err = wait.ErrWaitTimeout - } - return evt, err -} diff --git a/tools/watch/informerwatcher.go b/tools/watch/informerwatcher.go new file mode 100644 index 0000000000..35a3469493 --- /dev/null +++ b/tools/watch/informerwatcher.go @@ -0,0 +1,114 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" + "sync/atomic" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +func newTicketer() *ticketer { + return &ticketer{ + cond: sync.NewCond(&sync.Mutex{}), + } +} + +type ticketer struct { + counter uint64 + + cond *sync.Cond + current uint64 +} + +func (t *ticketer) GetTicket() uint64 { + // -1 to start from 0 + return atomic.AddUint64(&t.counter, 1) - 1 +} + +func (t *ticketer) WaitForTicket(ticket uint64, f func()) { + t.cond.L.Lock() + defer t.cond.L.Unlock() + for ticket != t.current { + t.cond.Wait() + } + + f() + + t.current++ + t.cond.Broadcast() +} + +// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface +// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. +func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) { + ch := make(chan watch.Event) + w := watch.NewProxyWatcher(ch) + t := newTicketer() + + indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + go t.WaitForTicket(t.GetTicket(), func() { + select { + case ch <- watch.Event{ + Type: watch.Added, + Object: obj.(runtime.Object), + }: + case <-w.StopChan(): + } + }) + }, + UpdateFunc: func(old, new interface{}) { + go t.WaitForTicket(t.GetTicket(), func() { + select { + case ch <- watch.Event{ + Type: watch.Modified, + Object: new.(runtime.Object), + }: + case <-w.StopChan(): + } + }) + }, + DeleteFunc: func(obj interface{}) { + go t.WaitForTicket(t.GetTicket(), func() { + staleObj, stale := obj.(cache.DeletedFinalStateUnknown) + if stale { + // We have no means of passing the additional information down using watch API based on watch.Event + // but the caller can filter such objects by checking if metadata.deletionTimestamp is set + obj = staleObj + } + + select { + case ch <- watch.Event{ + Type: watch.Deleted, + Object: obj.(runtime.Object), + }: + case <-w.StopChan(): + } + }) + }, + }, cache.Indexers{}) + + go func() { + informer.Run(w.StopChan()) + }() + + return indexer, informer, w +} diff --git a/tools/watch/informerwatcher_test.go b/tools/watch/informerwatcher_test.go new file mode 100644 index 0000000000..e94b4d2563 --- /dev/null +++ b/tools/watch/informerwatcher_test.go @@ -0,0 +1,236 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "math/rand" + "reflect" + "sort" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/diff" + "k8s.io/apimachinery/pkg/watch" + fakeclientset "k8s.io/client-go/kubernetes/fake" + testcore "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" +) + +type byEventTypeAndName []watch.Event + +func (a byEventTypeAndName) Len() int { return len(a) } +func (a byEventTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byEventTypeAndName) Less(i, j int) bool { + if a[i].Type < a[j].Type { + return true + } + + if a[i].Type > a[j].Type { + return false + } + + return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name +} + +func TestTicketer(t *testing.T) { + tg := newTicketer() + + const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines + var tickets []uint64 + for i := 0; i < numTickets; i++ { + ticket := tg.GetTicket() + tickets = append(tickets, ticket) + + exp, got := uint64(i), ticket + if got != exp { + t.Fatalf("expected ticket %d, got %d", exp, got) + } + } + + // shuffle tickets + rand.Shuffle(len(tickets), func(i, j int) { + tickets[i], tickets[j] = tickets[j], tickets[i] + }) + + res := make(chan uint64, len(tickets)) + for _, ticket := range tickets { + go func(ticket uint64) { + time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond) + tg.WaitForTicket(ticket, func() { + res <- ticket + }) + }(ticket) + } + + for i := 0; i < numTickets; i++ { + exp, got := uint64(i), <-res + if got != exp { + t.Fatalf("expected ticket %d, got %d", exp, got) + } + } +} + +func TestNewInformerWatcher(t *testing.T) { + // Make sure there are no 2 same types of events on a secret with the same name or that might be flaky. + tt := []struct { + name string + objects []runtime.Object + events []watch.Event + }{ + { + name: "basic test", + objects: []runtime.Object{ + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + }, + StringData: map[string]string{ + "foo-1": "initial", + }, + }, + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + }, + StringData: map[string]string{ + "foo-2": "initial", + }, + }, + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + }, + StringData: map[string]string{ + "foo-3": "initial", + }, + }, + }, + events: []watch.Event{ + { + Type: watch.Added, + Object: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-4", + }, + StringData: map[string]string{ + "foo-4": "initial", + }, + }, + }, + { + Type: watch.Modified, + Object: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + }, + StringData: map[string]string{ + "foo-2": "new", + }, + }, + }, + { + Type: watch.Deleted, + Object: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + }, + }, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + var expected []watch.Event + for _, o := range tc.objects { + expected = append(expected, watch.Event{ + Type: watch.Added, + Object: o.DeepCopyObject(), + }) + } + for _, e := range tc.events { + expected = append(expected, *e.DeepCopy()) + } + + fake := fakeclientset.NewSimpleClientset(tc.objects...) + fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false) + fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil)) + + for _, e := range tc.events { + fakeWatch.Action(e.Type, e.Object) + } + + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return fake.Core().Secrets("").List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return fake.Core().Secrets("").Watch(options) + }, + } + _, _, w := NewIndexerInformerWatcher(lw, &corev1.Secret{}) + + var result []watch.Event + loop: + for { + var event watch.Event + var ok bool + select { + case event, ok = <-w.ResultChan(): + if !ok { + t.Errorf("Failed to read event: channel is already closed!") + return + } + + result = append(result, *event.DeepCopy()) + case <-time.After(time.Second * 1): + // All the events are buffered -> this means we are done + // Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event + break loop + } + } + + // Informers don't guarantee event order so we need to sort these arrays to compare them + sort.Sort(byEventTypeAndName(expected)) + sort.Sort(byEventTypeAndName(result)) + + if !reflect.DeepEqual(expected, result) { + t.Error(spew.Errorf("\nexpected: %#v,\ngot: %#v,\ndiff: %s", expected, result, diff.ObjectReflectDiff(expected, result))) + return + } + + // Fill in some data to test watch closing while there are some events to be read + for _, e := range tc.events { + fakeWatch.Action(e.Type, e.Object) + } + + // Stop before reading all the data to make sure the informer can deal with closed channel + w.Stop() + + // Wait a bit to see if the informer won't panic + // TODO: Try to figure out a more reliable mechanism than time.Sleep (https://github.com/kubernetes/kubernetes/pull/50102/files#r184716591) + time.Sleep(1 * time.Second) + }) + } + +} diff --git a/tools/watch/until.go b/tools/watch/until.go index 4a891b2351..9335788439 100644 --- a/tools/watch/until.go +++ b/tools/watch/until.go @@ -19,13 +19,22 @@ package watch import ( "context" "errors" + "fmt" "time" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" ) +// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet, +// or an error if the condition failed or detected an error state. +type PreconditionFunc func(store cache.Store) (bool, error) + // ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, // or an error if the condition cannot be checked and should terminate. In general, it is better to define // level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed @@ -86,6 +95,42 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions return lastEvent, nil } +// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced, +// and watches the output until each provided condition succeeds, in a way that is identical +// to function UntilWithoutRetry. (See above.) +// UntilWithSync can deal with all errors like API timeout, lost connections and 'Resource version too old'. +// It is the only function that can recover from 'Resource version too old', Until and UntilWithoutRetry will +// just fail in that case. On the other hand it can't provide you with guarantees as strong as using simple +// Watch method with Until. It can skip some intermediate events in case of watch function failing but it will +// re-list to recover and you always get an event, if there has been a change, after recovery. +// Also with the current implementation based on DeltaFIFO, order of the events you receive is guaranteed only for +// particular object, not between more of them even it's the same resource. +// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: +// waiting for object reaching a state, "small" controllers, ... +func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { + indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType) + // Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and + // let UntilWithoutRetry to stop it + defer watcher.Stop() + + if precondition != nil { + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %v", ctx.Err()) + } + + done, err := precondition(indexer) + if err != nil { + return nil, err + } + + if done { + return nil, nil + } + } + + return UntilWithoutRetry(ctx, watcher, conditions...) +} + // ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration. func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { if timeout < 0 { @@ -100,3 +145,81 @@ func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) ( return context.WithTimeout(parent, timeout) } + +// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout +// if timeout is exceeded without all conditions returning true, or an error if an error occurs. +// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. +// TODO: remove when no longer used +// +// Deprecated: Use UntilWithSync instead. +func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) { + if len(conditions) == 0 { + return nil, nil + } + + list, err := lw.List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + initialItems, err := meta.ExtractList(list) + if err != nil { + return nil, err + } + + // use the initial items as simulated "adds" + var lastEvent *watch.Event + currIndex := 0 + passedConditions := 0 + for _, condition := range conditions { + // check the next condition against the previous event and short circuit waiting for the next watch + if lastEvent != nil { + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + passedConditions = passedConditions + 1 + continue + } + } + + ConditionSucceeded: + for currIndex < len(initialItems) { + lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]} + currIndex++ + + done, err := condition(*lastEvent) + if err != nil { + return lastEvent, err + } + if done { + passedConditions = passedConditions + 1 + break ConditionSucceeded + } + } + } + if passedConditions == len(conditions) { + return lastEvent, nil + } + remainingConditions := conditions[passedConditions:] + + metaObj, err := meta.ListAccessor(list) + if err != nil { + return nil, err + } + currResourceVersion := metaObj.GetResourceVersion() + + watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion}) + if err != nil { + return nil, err + } + + ctx, cancel := ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + evt, err := UntilWithoutRetry(ctx, watchInterface, remainingConditions...) + if err == ErrWatchClosed { + // present a consistent error interface to callers + err = wait.ErrWaitTimeout + } + return evt, err +} diff --git a/tools/watch/until_test.go b/tools/watch/until_test.go index e766acd736..dd0559461e 100644 --- a/tools/watch/until_test.go +++ b/tools/watch/until_test.go @@ -19,14 +19,19 @@ package watch import ( "context" "errors" + "reflect" "strings" "testing" "time" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + fakeclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" ) type fakePod struct { @@ -172,3 +177,127 @@ func TestUntilErrorCondition(t *testing.T) { t.Fatalf("expected %q in error string, got %q", expected, err.Error()) } } + +func TestUntilWithSync(t *testing.T) { + // FIXME: test preconditions + tt := []struct { + name string + lw *cache.ListWatch + preconditionFunc PreconditionFunc + conditionFunc ConditionFunc + expectedErr error + expectedEvent *watch.Event + }{ + { + name: "doesn't wait for sync with no precondition", + lw: &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + select {} + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + select {} + }, + }, + preconditionFunc: nil, + conditionFunc: func(e watch.Event) (bool, error) { + return true, nil + }, + expectedErr: errors.New("timed out waiting for the condition"), + expectedEvent: nil, + }, + { + name: "waits indefinitely with precondition if it can't sync", + lw: &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + select {} + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + select {} + }, + }, + preconditionFunc: func(store cache.Store) (bool, error) { + return true, nil + }, + conditionFunc: func(e watch.Event) (bool, error) { + return true, nil + }, + expectedErr: errors.New("UntilWithSync: unable to sync caches: context deadline exceeded"), + expectedEvent: nil, + }, + { + name: "precondition can stop the loop", + lw: func() *cache.ListWatch { + fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}) + + return &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return fakeclient.CoreV1().Secrets("").List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return fakeclient.CoreV1().Secrets("").Watch(options) + }, + } + }(), + preconditionFunc: func(store cache.Store) (bool, error) { + _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: "", Name: "first"}) + if err != nil { + return true, err + } + if exists { + return true, nil + } + return false, nil + }, + conditionFunc: func(e watch.Event) (bool, error) { + return true, errors.New("should never reach this") + }, + expectedErr: nil, + expectedEvent: nil, + }, + { + name: "precondition lets it proceed to regular condition", + lw: func() *cache.ListWatch { + fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}) + + return &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return fakeclient.CoreV1().Secrets("").List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return fakeclient.CoreV1().Secrets("").Watch(options) + }, + } + }(), + preconditionFunc: func(store cache.Store) (bool, error) { + return false, nil + }, + conditionFunc: func(e watch.Event) (bool, error) { + if e.Type == watch.Added { + return true, nil + } + panic("no other events are expected") + }, + expectedErr: nil, + expectedEvent: &watch.Event{Type: watch.Added, Object: &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}}, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + // Informer waits for caches to sync by polling in 100ms intervals, + // timeout needs to be reasonably higher + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + event, err := UntilWithSync(ctx, tc.lw, &corev1.Secret{}, tc.preconditionFunc, tc.conditionFunc) + + if !reflect.DeepEqual(err, tc.expectedErr) { + t.Errorf("expected error %#v, got %#v", tc.expectedErr, err) + } + + if !reflect.DeepEqual(event, tc.expectedEvent) { + t.Errorf("expected event %#v, got %#v", tc.expectedEvent, event) + } + }) + } +} diff --git a/util/certificate/csr/csr.go b/util/certificate/csr/csr.go index 22112a5b5b..4a53352fee 100644 --- a/util/certificate/csr/csr.go +++ b/util/certificate/csr/csr.go @@ -24,10 +24,11 @@ import ( "encoding/base64" "encoding/pem" "fmt" - "github.com/golang/glog" "reflect" "time" + "github.com/golang/glog" + certificates "k8s.io/api/certificates/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +39,7 @@ import ( "k8s.io/apimachinery/pkg/watch" certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1" "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" certutil "k8s.io/client-go/util/cert" ) @@ -121,7 +123,7 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) { fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String() - event, err := cache.ListWatchUntil( + event, err := watchtools.ListWatchUntil( timeout, &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {