Skip to content

Commit

Permalink
Find current resourceVersion for waiting for deletion/conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
liggitt committed Jan 11, 2019
1 parent 0614f52 commit d3299c0
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 40 deletions.
1 change: 1 addition & 0 deletions pkg/kubectl/cmd/wait/BUILD
Expand Up @@ -11,6 +11,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
Expand Down
38 changes: 30 additions & 8 deletions pkg/kubectl/cmd/wait/wait.go
Expand Up @@ -28,6 +28,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -242,14 +243,25 @@ func (o *WaitOptions) RunWait() error {
func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
endTime := time.Now().Add(o.Timeout)
for {
gottenObj, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(info.Name, metav1.GetOptions{})
if len(info.Name) == 0 {
return info.Object, false, fmt.Errorf("resource name must be provided")
}

nameSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()

// List with a name field selector to get the current resourceVersion to watch from (not the object's resourceVersion)
gottenObjList, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(metav1.ListOptions{FieldSelector: nameSelector})
if apierrors.IsNotFound(err) {
return info.Object, true, nil
}
if err != nil {
// TODO this could do something slightly fancier if we wish
return info.Object, false, err
}
if len(gottenObjList.Items) != 1 {
return info.Object, true, nil
}
gottenObj := &gottenObjList.Items[0]
resourceLocation := ResourceLocation{
GroupResource: info.Mapping.Resource.GroupResource(),
Namespace: gottenObj.GetNamespace(),
Expand All @@ -262,8 +274,8 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error
}

watchOptions := metav1.ListOptions{}
watchOptions.FieldSelector = "metadata.name=" + info.Name
watchOptions.ResourceVersion = gottenObj.GetResourceVersion()
watchOptions.FieldSelector = nameSelector
watchOptions.ResourceVersion = gottenObjList.GetResourceVersion()
objWatch, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(watchOptions)
if err != nil {
return gottenObj, false, err
Expand Down Expand Up @@ -308,26 +320,36 @@ type ConditionalWait struct {
func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
endTime := time.Now().Add(o.Timeout)
for {
if len(info.Name) == 0 {
return info.Object, false, fmt.Errorf("resource name must be provided")
}

nameSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()

var gottenObj *unstructured.Unstructured
// List with a name field selector to get the current resourceVersion to watch from (not the object's resourceVersion)
gottenObjList, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(metav1.ListOptions{FieldSelector: nameSelector})

resourceVersion := ""
gottenObj, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(info.Name, metav1.GetOptions{})
switch {
case apierrors.IsNotFound(err):
resourceVersion = "0"
case err != nil:
return info.Object, false, err
case len(gottenObjList.Items) != 1:
resourceVersion = gottenObjList.GetResourceVersion()
default:
gottenObj = &gottenObjList.Items[0]
conditionMet, err := w.checkCondition(gottenObj)
if conditionMet {
return gottenObj, true, nil
}
if err != nil {
return gottenObj, false, err
}
resourceVersion = gottenObj.GetResourceVersion()
resourceVersion = gottenObjList.GetResourceVersion()
}

watchOptions := metav1.ListOptions{}
watchOptions.FieldSelector = "metadata.name=" + info.Name
watchOptions.FieldSelector = nameSelector
watchOptions.ResourceVersion = resourceVersion
objWatch, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(watchOptions)
if err != nil {
Expand Down
80 changes: 48 additions & 32 deletions pkg/kubectl/cmd/wait/wait_test.go
Expand Up @@ -39,6 +39,14 @@ import (
clienttesting "k8s.io/client-go/testing"
)

func newUnstructuredList(items ...*unstructured.Unstructured) *unstructured.UnstructuredList {
list := &unstructured.UnstructuredList{}
for i := range items {
list.Items = append(list.Items, *items[i])
}
return list
}

func newUnstructured(apiVersion, kind, namespace, name string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down Expand Up @@ -96,7 +104,7 @@ func TestWaitForDeletion(t *testing.T) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
}
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[0].Matches("list", "theresource") || actions[0].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
},
Expand Down Expand Up @@ -129,8 +137,8 @@ func TestWaitForDeletion(t *testing.T) {
},
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil
fakeClient.PrependReactor("list", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructuredList(newUnstructured("group/version", "TheKind", "ns-foo", "name-foo")), nil
})
count := 0
fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
Expand Down Expand Up @@ -158,7 +166,7 @@ func TestWaitForDeletion(t *testing.T) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
}
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[0].Matches("list", "theresource") || actions[0].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
},
Expand All @@ -176,8 +184,8 @@ func TestWaitForDeletion(t *testing.T) {
},
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil
fakeClient.PrependReactor("list", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructuredList(newUnstructured("group/version", "TheKind", "ns-foo", "name-foo")), nil
})
return fakeClient
},
Expand All @@ -188,7 +196,7 @@ func TestWaitForDeletion(t *testing.T) {
if len(actions) != 2 {
t.Fatal(spew.Sdump(actions))
}
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[0].Matches("list", "theresource") || actions[0].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
if !actions[1].Matches("watch", "theresource") {
Expand All @@ -209,8 +217,12 @@ func TestWaitForDeletion(t *testing.T) {
},
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil
fakeClient.PrependReactor("list", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
unstructuredObj := newUnstructured("group/version", "TheKind", "ns-foo", "name-foo")
unstructuredObj.SetResourceVersion("123")
unstructuredList := newUnstructuredList(unstructuredObj)
unstructuredList.SetResourceVersion("234")
return true, unstructuredList, nil
})
count := 0
fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
Expand All @@ -235,16 +247,16 @@ func TestWaitForDeletion(t *testing.T) {
if len(actions) != 4 {
t.Fatal(spew.Sdump(actions))
}
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[0].Matches("list", "theresource") || actions[0].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
if !actions[1].Matches("watch", "theresource") {
if !actions[1].Matches("watch", "theresource") || actions[1].(clienttesting.WatchAction).GetWatchRestrictions().ResourceVersion != "234" {
t.Error(spew.Sdump(actions))
}
if !actions[2].Matches("get", "theresource") || actions[2].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[2].Matches("list", "theresource") || actions[2].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
if !actions[3].Matches("watch", "theresource") {
if !actions[3].Matches("watch", "theresource") || actions[3].(clienttesting.WatchAction).GetWatchRestrictions().ResourceVersion != "234" {
t.Error(spew.Sdump(actions))
}
},
Expand All @@ -262,8 +274,8 @@ func TestWaitForDeletion(t *testing.T) {
},
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil
fakeClient.PrependReactor("list", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructuredList(newUnstructured("group/version", "TheKind", "ns-foo", "name-foo")), nil
})
fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
fakeWatch := watch.NewRaceFreeFake()
Expand All @@ -278,7 +290,7 @@ func TestWaitForDeletion(t *testing.T) {
if len(actions) != 2 {
t.Fatal(spew.Sdump(actions))
}
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[0].Matches("list", "theresource") || actions[0].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
if !actions[1].Matches("watch", "theresource") {
Expand Down Expand Up @@ -344,11 +356,11 @@ func TestWaitForCondition(t *testing.T) {
},
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, addCondition(
fakeClient.PrependReactor("list", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructuredList(addCondition(
newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"),
"the-condition", "status-value",
), nil
)), nil
})
return fakeClient
},
Expand All @@ -358,7 +370,7 @@ func TestWaitForCondition(t *testing.T) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
}
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[0].Matches("list", "theresource") || actions[0].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
},
Expand Down Expand Up @@ -391,7 +403,7 @@ func TestWaitForCondition(t *testing.T) {
},
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
fakeClient.PrependReactor("list", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, addCondition(
newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"),
"some-other-condition", "status-value",
Expand All @@ -406,7 +418,7 @@ func TestWaitForCondition(t *testing.T) {
if len(actions) != 2 {
t.Fatal(spew.Sdump(actions))
}
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[0].Matches("list", "theresource") || actions[0].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
if !actions[1].Matches("watch", "theresource") {
Expand All @@ -427,8 +439,12 @@ func TestWaitForCondition(t *testing.T) {
},
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil
fakeClient.PrependReactor("list", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
unstructuredObj := newUnstructured("group/version", "TheKind", "ns-foo", "name-foo")
unstructuredObj.SetResourceVersion("123")
unstructuredList := newUnstructuredList(unstructuredObj)
unstructuredList.SetResourceVersion("234")
return true, unstructuredList, nil
})
count := 0
fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
Expand All @@ -453,16 +469,16 @@ func TestWaitForCondition(t *testing.T) {
if len(actions) != 4 {
t.Fatal(spew.Sdump(actions))
}
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[0].Matches("list", "theresource") || actions[0].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
if !actions[1].Matches("watch", "theresource") {
if !actions[1].Matches("watch", "theresource") || actions[1].(clienttesting.WatchAction).GetWatchRestrictions().ResourceVersion != "234" {
t.Error(spew.Sdump(actions))
}
if !actions[2].Matches("get", "theresource") || actions[2].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[2].Matches("list", "theresource") || actions[2].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
if !actions[3].Matches("watch", "theresource") {
if !actions[3].Matches("watch", "theresource") || actions[3].(clienttesting.WatchAction).GetWatchRestrictions().ResourceVersion != "234" {
t.Error(spew.Sdump(actions))
}
},
Expand All @@ -480,8 +496,8 @@ func TestWaitForCondition(t *testing.T) {
},
fakeClient: func() *dynamicfakeclient.FakeDynamicClient {
fakeClient := dynamicfakeclient.NewSimpleDynamicClient(scheme)
fakeClient.PrependReactor("get", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"), nil
fakeClient.PrependReactor("list", "theresource", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, newUnstructuredList(newUnstructured("group/version", "TheKind", "ns-foo", "name-foo")), nil
})
fakeClient.PrependWatchReactor("theresource", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
fakeWatch := watch.NewRaceFreeFake()
Expand All @@ -499,7 +515,7 @@ func TestWaitForCondition(t *testing.T) {
if len(actions) != 2 {
t.Fatal(spew.Sdump(actions))
}
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[0].Matches("list", "theresource") || actions[0].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
if !actions[1].Matches("watch", "theresource") {
Expand Down Expand Up @@ -536,7 +552,7 @@ func TestWaitForCondition(t *testing.T) {
if len(actions) != 2 {
t.Fatal(spew.Sdump(actions))
}
if !actions[0].Matches("get", "theresource") || actions[0].(clienttesting.GetAction).GetName() != "name-foo" {
if !actions[0].Matches("list", "theresource") || actions[0].(clienttesting.ListAction).GetListRestrictions().Fields.String() != "metadata.name=name-foo" {
t.Error(spew.Sdump(actions))
}
if !actions[1].Matches("watch", "theresource") {
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/client-go/dynamic/fake/simple.go
Expand Up @@ -303,6 +303,7 @@ func (c *dynamicResourceClient) List(opts metav1.ListOptions) (*unstructured.Uns
}

list := &unstructured.UnstructuredList{}
list.SetResourceVersion(entireList.GetResourceVersion())
for i := range entireList.Items {
item := &entireList.Items[i]
metadata, err := meta.Accessor(item)
Expand Down

0 comments on commit d3299c0

Please sign in to comment.