Skip to content

Commit

Permalink
Cache watch returns incorrect object on DELETED events
Browse files Browse the repository at this point in the history
The underlying storage has always returned the old object on watch
delete events when filtering. The cache watcher does not, which means a
downsteam caller gets different behavior.

This fixes the cache watcher to be consistent with our long term
behavior for watch. It may result in a behavior change (the filter
becomes more precise) but this was a regression in behavior.
  • Loading branch information
smarterclayton committed May 22, 2017
1 parent 8cd95c7 commit e9e6935
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 5 deletions.
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
Expand Down
20 changes: 15 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,18 +850,28 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
return
}

object, err := c.copier.Copy(event.Object)
if err != nil {
glog.Errorf("unexpected copy error: %v", err)
return
}
var watchEvent watch.Event
switch {
case curObjPasses && !oldObjPasses:
object, err := c.copier.Copy(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err))
return
}
watchEvent = watch.Event{Type: watch.Added, Object: object}
case curObjPasses && oldObjPasses:
object, err := c.copier.Copy(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err))
return
}
watchEvent = watch.Event{Type: watch.Modified, Object: object}
case !curObjPasses && oldObjPasses:
object, err := c.copier.Copy(event.PrevObject)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unexpected copy error: %v", err))
return
}
watchEvent = watch.Event{Type: watch.Deleted, Object: object}
}

Expand Down
121 changes: 121 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ limitations under the License.
package storage

import (
"reflect"
"sync"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/pkg/api/v1"
)
Expand Down Expand Up @@ -55,3 +59,120 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err)
}
}

func TestCacheWatcherHandlesFiltering(t *testing.T) {
filter := func(_ string, _ labels.Set, field fields.Set) bool {
return field["spec.nodeName"] == "host"
}
forget := func(bool) {}

testCases := []struct {
events []*watchCacheEvent
expected []watch.Event
}{
// properly handle starting with the filter, then being deleted, then re-added
{
events: []*watchCacheEvent{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
},
},
expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
},
},
// properly handle ignoring changes prior to the filter, then getting added, then deleted
{
events: []*watchCacheEvent{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": ""},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
ObjFields: fields.Set{"spec.nodeName": ""},
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
ObjFields: fields.Set{"spec.nodeName": ""},
},
},
expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
{Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
},
},
}

TestCase:
for i, testCase := range testCases {
// set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked.
for j := range testCase.events {
testCase.events[j].ResourceVersion = uint64(j) + 1
}
w := newCacheWatcher(scheme.Scheme, 0, 0, testCase.events, filter, forget)
ch := w.ResultChan()
for j, event := range testCase.expected {
e := <-ch
if !reflect.DeepEqual(event, e) {
t.Errorf("%d: unexpected event %d: %s", i, j, diff.ObjectReflectDiff(event, e))
break TestCase
}
}
select {
case obj, ok := <-ch:
t.Errorf("%d: unexpected excess event: %#v %t", i, obj, ok)
break TestCase
default:
}
w.Stop()
}
}

0 comments on commit e9e6935

Please sign in to comment.