Skip to content

Commit

Permalink
Rename Until to UntilWithoutRetry and move to using context so it's
Browse files Browse the repository at this point in the history
cancelable
  • Loading branch information
tnozicka committed Aug 10, 2018
1 parent ccb92f6 commit 3d4a02a
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 88 deletions.
3 changes: 2 additions & 1 deletion pkg/client/tests/listwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
. "k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/api/testapi"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
Expand Down Expand Up @@ -201,7 +202,7 @@ func TestListWatchUntil(t *testing.T) {
watch: fw,
}

conditions := []watch.ConditionFunc{
conditions := []watchtools.ConditionFunc{
func(event watch.Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == watch.Added, nil
Expand Down
8 changes: 6 additions & 2 deletions pkg/kubectl/cmd/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package get

import (
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -36,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubectl"
Expand Down Expand Up @@ -564,9 +566,11 @@ func (o *GetOptions) watch(f cmdutil.Factory, cmd *cobra.Command, args []string)
}

first := true
intr := interrupt.New(nil, w.Stop)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
intr := interrupt.New(nil, cancel)
intr.Run(func() error {
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
if !isList && first {
// drop the initial watch event in the single resource case
first = false
Expand Down
11 changes: 9 additions & 2 deletions pkg/kubectl/cmd/rollout/rollout_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ limitations under the License.
package rollout

import (
"context"
"fmt"
"time"

"github.com/spf13/cobra"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/watch"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
Expand Down Expand Up @@ -191,9 +194,13 @@ func (o *RolloutStatusOptions) Run() error {
}

// if the rollout isn't done yet, keep watching deployment status
intr := interrupt.New(nil, w.Stop)
// TODO: expose timeout
timeout := 0 * time.Second
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
intr := interrupt.New(nil, cancel)
return intr.Run(func() error {
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
// print deployment's status
status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision)
if err != nil {
Expand Down
15 changes: 10 additions & 5 deletions pkg/kubectl/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ limitations under the License.
package cmd

import (
"context"
"fmt"
"time"

"github.com/docker/distribution/reference"
"github.com/spf13/cobra"

"github.com/golang/glog"
"github.com/spf13/cobra"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -34,6 +35,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
Expand Down Expand Up @@ -467,16 +469,19 @@ func (o *RunOptions) removeCreatedObjects(f cmdutil.Factory, createdObjects []*R
}

// waitForPod watches the given pod until the exitCondition is true
func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watch.ConditionFunc) (*corev1.Pod, error) {
func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watchtools.ConditionFunc) (*corev1.Pod, error) {
w, err := podClient.Pods(ns).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: name}))
if err != nil {
return nil, err
}

intr := interrupt.New(nil, w.Stop)
// TODO: expose the timeout
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 0*time.Second)
defer cancel()
intr := interrupt.New(nil, cancel)
var result *corev1.Pod
err = intr.Run(func() error {
ev, err := watch.Until(0, w, func(ev watch.Event) (bool, error) {
ev, err := watchtools.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) {
return exitCondition(ev)
})
if ev != nil {
Expand Down
16 changes: 12 additions & 4 deletions pkg/kubectl/cmd/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package wait

import (
"context"
"errors"
"fmt"
"strings"
Expand All @@ -33,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/genericclioptions"
Expand Down Expand Up @@ -272,11 +274,14 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error
// we're out of time
return gottenObj, false, wait.ErrWaitTimeout
}
watchEvent, err := watch.Until(o.Timeout, objWatch, isDeleted)

ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, isDeleted)
cancel()
switch {
case err == nil:
return watchEvent.Object, true, nil
case err == watch.ErrWatchClosed:
case err == watchtools.ErrWatchClosed:
continue
case err == wait.ErrWaitTimeout:
if watchEvent != nil {
Expand Down Expand Up @@ -334,11 +339,14 @@ func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (ru
// we're out of time
return gottenObj, false, wait.ErrWaitTimeout
}
watchEvent, err := watch.Until(o.Timeout, objWatch, w.isConditionMet)

ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, w.isConditionMet)
cancel()
switch {
case err == nil:
return watchEvent.Object, true, nil
case err == watch.ErrWatchClosed:
case err == watchtools.ErrWatchClosed:
continue
case err == wait.ErrWaitTimeout:
if watchEvent != nil {
Expand Down
7 changes: 6 additions & 1 deletion pkg/kubectl/polymorphichelpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package polymorphichelpers

import (
"context"
"fmt"
"sort"
"time"
Expand All @@ -33,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
coreclient "k8s.io/client-go/kubernetes/typed/core/v1"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
Expand Down Expand Up @@ -69,7 +71,10 @@ func GetFirstPod(client coreclient.PodsGetter, namespace string, selector string
condition := func(event watch.Event) (bool, error) {
return event.Type == watch.Added || event.Type == watch.Modified, nil
}
event, err := watch.Until(timeout, w, condition)

ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
event, err := watchtools.UntilWithoutRetry(ctx, w, condition)
if err != nil {
return nil, 0, err
}
Expand Down
9 changes: 6 additions & 3 deletions staging/src/k8s.io/client-go/tools/cache/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/pager"
watchtools "k8s.io/client-go/tools/watch"
)

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
Expand Down Expand Up @@ -116,7 +117,7 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error)
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -178,8 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch
return nil, err
}

evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
if err == watch.ErrWatchClosed {
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
if err == watchtools.ErrWatchClosed {
// present a consistent error interface to callers
err = wait.ErrWaitTimeout
}
Expand Down
49 changes: 32 additions & 17 deletions staging/src/k8s.io/client-go/tools/watch/until.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,39 @@ limitations under the License.
package watch

import (
"context"
"errors"
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
)

// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
// from false to true).
type ConditionFunc func(event Event) (bool, error)
type ConditionFunc func(event watch.Event) (bool, error)

// ErrWatchClosed is returned when the watch channel is closed before timeout in Until.
var ErrWatchClosed = errors.New("watch closed before Until timeout")
// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")

// Until reads items from the watch until each provided condition succeeds, and then returns the last watch
// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
// A zero timeout means to wait forever.
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
// Waits until context deadline or until context is canceled.
//
// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
// Warning: solving such issues.
// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
ch := watcher.ResultChan()
defer watcher.Stop()
var after <-chan time.Time
if timeout > 0 {
after = time.After(timeout)
} else {
ch := make(chan time.Time)
defer close(ch)
after = ch
}
var lastEvent *Event
var lastEvent *watch.Event
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
Expand All @@ -69,7 +70,6 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
}
lastEvent = &event

// TODO: check for watch expired error and retry watch from latest point?
done, err := condition(event)
if err != nil {
return lastEvent, err
Expand All @@ -78,10 +78,25 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
break ConditionSucceeded
}

case <-after:
case <-ctx.Done():
return lastEvent, wait.ErrWaitTimeout
}
}
}
return lastEvent, nil
}

// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout < 0 {
// This should be handled in validation
glog.Errorf("Timeout for context shall not be negative!")
timeout = 0
}

if timeout == 0 {
return context.WithCancel(parent)
}

return context.WithTimeout(parent, timeout)
}

0 comments on commit 3d4a02a

Please sign in to comment.