Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch kubectl to use watch.Until #33942

Merged
merged 2 commits into from
Oct 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/client/restclient/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,9 @@ var fieldMappings = versionToResourceToFieldMapping{
nodeUnschedulable: nodeUnschedulable,
},
"pods": clientFieldNameToAPIVersionFieldName{
podHost: podHost,
podStatus: podStatus,
objectNameField: objectNameField,
podHost: podHost,
podStatus: podStatus,
},
"secrets": clientFieldNameToAPIVersionFieldName{
secretType: secretType,
Expand Down
24 changes: 15 additions & 9 deletions pkg/kubectl/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/interrupt"
"k8s.io/kubernetes/pkg/watch"
)

Expand Down Expand Up @@ -266,17 +267,22 @@ func RunGet(f *cmdutil.Factory, out io.Writer, errOut io.Writer, cmd *cobra.Comm

first := true
filteredResourceCount = 0
kubectl.WatchLoop(w, func(e watch.Event) error {
if !isList && first {
// drop the initial watch event in the single resource case
first = false
return nil
}
err := printer.PrintObj(e.Object, out)
if err == nil {
intr := interrupt.New(nil, w.Stop)
intr.Run(func() error {
Copy link
Member

@ncdc ncdc Oct 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to add a 2nd commit to return the result of this call instead of nil, as you suggested in the previous PR?

_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
if !isList && first {
// drop the initial watch event in the single resource case
first = false
return false, nil
}
err := printer.PrintObj(e.Object, out)
if err != nil {
return false, err
}
filteredResourceCount++
cmdutil.PrintFilterCount(filteredResourceCount, mapping.Resource, errOut, filterOpts)
}
return false, nil
})
return err
})
return nil
Expand Down
30 changes: 17 additions & 13 deletions pkg/kubectl/cmd/rollout/rollout_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/util/interrupt"
"k8s.io/kubernetes/pkg/watch"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -125,18 +126,21 @@ func RunStatus(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, args []str
}

// if the rollout isn't done yet, keep watching deployment status
kubectl.WatchLoop(w, func(e watch.Event) error {
// print deployment's status
status, done, err := statusViewer.Status(cmdNamespace, info.Name)
if err != nil {
return err
}
fmt.Fprintf(out, "%s", status)
// Quit waiting if the rollout is done
if done {
w.Stop()
}
return nil
intr := interrupt.New(nil, w.Stop)
return intr.Run(func() error {
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
// print deployment's status
status, done, err := statusViewer.Status(cmdNamespace, info.Name)
if err != nil {
return false, err
}
fmt.Fprintf(out, "%s", status)
// Quit waiting if the rollout is done
if done {
return true, nil
}
return false, nil
})
return err
})
return nil
}
111 changes: 57 additions & 54 deletions pkg/kubectl/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ import (
"github.com/docker/distribution/reference"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
conditions "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubectl"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime"
uexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/interrupt"
"k8s.io/kubernetes/pkg/watch"
)

Expand Down Expand Up @@ -372,90 +375,90 @@ func contains(resourcesList map[string]*unversioned.APIResourceList, resource un

// waitForPod watches the given pod until the exitCondition is true. Each two seconds
// the tick function is called e.g. for progress output.
func waitForPod(podClient coreclient.PodsGetter, ns, name string, exitCondition func(*api.Pod) bool, tick func(*api.Pod)) (*api.Pod, error) {
pod, err := podClient.Pods(ns).Get(name)
func waitForPod(podClient coreclient.PodsGetter, ns, name string, exitCondition watch.ConditionFunc, tick func(*api.Pod)) (*api.Pod, error) {
w, err := podClient.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: name}))
if err != nil {
return nil, err
}
if exitCondition(pod) {
return pod, nil
}

tick(pod)

w, err := podClient.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: pod.Name, ResourceVersion: pod.ResourceVersion}))
if err != nil {
return nil, err
}
pods := make(chan *api.Pod) // observed pods passed to the exitCondition
defer close(pods)

t := time.NewTicker(2 * time.Second)
defer t.Stop()
// wait for the first event, then start the 2 sec ticker and loop
go func() {
for range t.C {
tick(pod)
pod := <-pods
if pod == nil {
return
}
}()
tick(pod)

err = nil
result := pod
kubectl.WatchLoop(w, func(ev watch.Event) error {
switch ev.Type {
case watch.Added, watch.Modified:
pod = ev.Object.(*api.Pod)
if exitCondition(pod) {
result = pod
w.Stop()
t := time.NewTicker(2 * time.Second)
defer t.Stop()

for {
select {
case pod = <-pods:
if pod == nil {
return
}
case _, ok := <-t.C:
if !ok {
return
}
tick(pod)
}
case watch.Deleted:
w.Stop()
case watch.Error:
result = nil
err = fmt.Errorf("failed to watch pod %s/%s", ns, name)
w.Stop()
}
return nil
})
}()

intr := interrupt.New(nil, w.Stop)
var result *api.Pod
err = intr.Run(func() error {
ev, err := watch.Until(0, w, func(ev watch.Event) (bool, error) {
c, err := exitCondition(ev)
if c == false && err == nil {
pods <- ev.Object.(*api.Pod) // send to ticker
}
return c, err
})
result = ev.Object.(*api.Pod)
return err
})
return result, err
}

func waitForPodRunning(podClient coreclient.PodsGetter, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
exitCondition := func(pod *api.Pod) bool {
switch pod.Status.Phase {
case api.PodRunning:
for _, status := range pod.Status.ContainerStatuses {
if !status.Ready {
return false
}
}
return true
case api.PodSucceeded, api.PodFailed:
return true
default:
return false
}
}
return waitForPod(podClient, ns, name, exitCondition, func(pod *api.Pod) {
pod, err := waitForPod(podClient, ns, name, conditions.PodRunningAndReady, func(pod *api.Pod) {
if !quiet {
fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: false\n", pod.Namespace, pod.Name, pod.Status.Phase)
}
})

// fix generic not found error with empty name in PodRunningAndReady
if err != nil && errors.IsNotFound(err) {
return nil, errors.NewNotFound(api.Resource("pods"), name)
}

return pod, err
}

func waitForPodTerminated(podClient coreclient.PodsGetter, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
exitCondition := func(pod *api.Pod) bool {
return pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed
}
return waitForPod(podClient, ns, name, exitCondition, func(pod *api.Pod) {
pod, err := waitForPod(podClient, ns, name, conditions.PodCompleted, func(pod *api.Pod) {
if !quiet {
fmt.Fprintf(out, "Waiting for pod %s/%s to terminate, status is %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
}
})

// fix generic not found error with empty name in PodCompleted
if err != nil && errors.IsNotFound(err) {
return nil, errors.NewNotFound(api.Resource("pods"), name)
}

return pod, err
}

func handleAttachPod(f *cmdutil.Factory, podClient coreclient.PodsGetter, ns, name string, opts *AttachOptions, quiet bool) error {
pod, err := waitForPodRunning(podClient, ns, name, opts.Out, quiet)
if err != nil {
if err != nil && err != conditions.ErrPodCompleted {
return err
}
ctrName, err := opts.GetContainerName(pod)
Expand Down
45 changes: 0 additions & 45 deletions pkg/kubectl/watchloop.go

This file was deleted.

3 changes: 2 additions & 1 deletion pkg/watch/until.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ConditionFunc func(event Event) (bool, error)
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
// A zero timeout means to wait forever.
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
ch := watcher.ResultChan()
defer watcher.Stop()
Expand All @@ -40,7 +41,7 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
after = time.After(timeout)
} else {
ch := make(chan time.Time)
close(ch)
defer close(ch)
after = ch
}
var lastEvent *Event
Expand Down
27 changes: 21 additions & 6 deletions pkg/watch/until_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/wait"
)

func TestUntil(t *testing.T) {
Expand Down Expand Up @@ -83,17 +82,33 @@ func TestUntilMultipleConditions(t *testing.T) {

func TestUntilTimeout(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Add(obj)
fw.Modify(obj)
}()
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) {
return event.Type == Added, nil
},
func(event Event) (bool, error) {
return event.Type == Modified, nil
},
}

timeout := time.Duration(0)
lastEvent, err := Until(timeout, fw, conditions...)
if err != wait.ErrWaitTimeout {
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent != nil {
t.Fatalf("expected nil event, got %#v", lastEvent)
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}

Expand Down