Skip to content

Commit

Permalink
Ensure that initial events are sorted for WatchList
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed Feb 28, 2024
1 parent d2b4928 commit c52e76b
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,13 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
}

func TestCacherWatchSemantics(t *testing.T) {
func TestWatchSemantics(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
}

func TestCacherWatchSemanticInitialEventsExtended(t *testing.T) {
func TestWatchSemanticInitialEventsExtended(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cacher

import (
"fmt"
"sort"
"sync"

"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -114,9 +115,24 @@ func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValida
}
}

type sortableWatchCacheEvents []*watchCacheEvent

func (s sortableWatchCacheEvents) Len() int {
return len(s)
}

func (s sortableWatchCacheEvents) Less(i, j int) bool {
return s[i].Key < s[j].Key
}

func (s sortableWatchCacheEvents) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

// newCacheIntervalFromStore is meant to handle the case of rv=0, such that the events
// returned by Next() need to be events from a List() done on the underlying store of
// the watch cache.
// The items returned in the interval will be sorted by Key.
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) {
buffer := &watchCacheIntervalBuffer{}
allItems := store.List()
Expand All @@ -140,6 +156,7 @@ func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getA
}
buffer.endIndex++
}
sort.Sort(sortableWatchCacheEvents(buffer.buffer))
ci := &watchCacheInterval{
startIndex: 0,
// Simulate that we already have all the events we're looking for.
Expand Down
8 changes: 0 additions & 8 deletions staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,6 @@ func testCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEven
}
}

func testCheckResultsInRandomOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
for range expectedEvents {
testCheckResultFunc(t, w, func(actualEvent watch.Event) {
ExpectContains(t, "unexpected event", toInterfaceSlice(expectedEvents), actualEvent)
})
}
}

func testCheckNoMoreResults(t *testing.T, w watch.Interface) {
select {
case e := <-w.ResultChan():
Expand Down
102 changes: 48 additions & 54 deletions staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,16 +1244,16 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
}
return ret
}
initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) watch.Event {
return watch.Event{
initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) []watch.Event {
return []watch.Event{{
Type: watch.Bookmark,
Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: createdInitialPods[len(createdInitialPods)-1].ResourceVersion,
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
},
}
}}
}
scenarios := []struct {
name string
Expand All @@ -1264,22 +1264,22 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
// after adding the initial pods which is then used to establish a new watch request
useCurrentRV bool

// The test heavily relies on the fact that initialPods in created
// in the alphabetical order.
initialPods []*example.Pod
podsAfterEstablishingWatch []*example.Pod

expectedInitialEventsInRandomOrder func(createdInitialPods []*example.Pod) []watch.Event
expectedInitialEventsInStrictOrder func(createdInitialPods []*example.Pod) []watch.Event
expectedInitialEvents func(createdInitialPods []*example.Pod) []watch.Event
expectedInitialEventsBookmark func(createdInitialPods []*example.Pod) []watch.Event
expectedEventsAfterEstablishingWatch func(createdPodsAfterWatch []*example.Pod) []watch.Event
}{
{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1302,21 +1302,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset",
sendInitialEvents: &trueVal,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},

{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1342,21 +1340,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &trueVal,
resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},

{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=1",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=1",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1366,7 +1362,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &falseVal,
resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInStrictOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1375,7 +1371,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &falseVal,
resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInStrictOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1384,21 +1380,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &trueVal,
resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},

{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=useCurrentRV",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
useCurrentRV: true,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=useCurrentRV",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
useCurrentRV: true,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1424,7 +1418,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &trueVal,
useCurrentRV: true,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1433,14 +1427,14 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
name: "legacy, RV=0",
resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
{
name: "legacy, RV=unset",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1449,11 +1443,11 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
t.Run(scenario.name, func(t *testing.T) {
// set up env
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
if scenario.expectedInitialEventsInStrictOrder == nil {
scenario.expectedInitialEventsInStrictOrder = func(_ []*example.Pod) []watch.Event { return nil }
if scenario.expectedInitialEvents == nil {
scenario.expectedInitialEvents = func(_ []*example.Pod) []watch.Event { return nil }
}
if scenario.expectedInitialEventsInRandomOrder == nil {
scenario.expectedInitialEventsInRandomOrder = func(_ []*example.Pod) []watch.Event { return nil }
if scenario.expectedInitialEventsBookmark == nil {
scenario.expectedInitialEventsBookmark = func(_ []*example.Pod) []watch.Event { return nil }
}
if scenario.expectedEventsAfterEstablishingWatch == nil {
scenario.expectedEventsAfterEstablishingWatch = func(_ []*example.Pod) []watch.Event { return nil }
Expand Down Expand Up @@ -1487,8 +1481,8 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
defer w.Stop()

// make sure we only get initial events
testCheckResultsInRandomOrder(t, w, scenario.expectedInitialEventsInRandomOrder(createdPods))
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsInStrictOrder(createdPods))
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsBookmark(createdPods))
testCheckNoMoreResults(t, w)

createdPods = []*example.Pod{}
Expand Down

0 comments on commit c52e76b

Please sign in to comment.