Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that initial events are sorted for WatchList #120897

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,13 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
}

func TestCacherWatchSemantics(t *testing.T) {
func TestWatchSemantics(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store)
}

func TestCacherWatchSemanticInitialEventsExtended(t *testing.T) {
func TestWatchSemanticInitialEventsExtended(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate)
storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cacher

import (
"fmt"
"sort"
"sync"

"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -114,9 +115,24 @@ func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValida
}
}

type sortableWatchCacheEvents []*watchCacheEvent

func (s sortableWatchCacheEvents) Len() int {
return len(s)
}

func (s sortableWatchCacheEvents) Less(i, j int) bool {
return s[i].Key < s[j].Key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is key = namespace/name ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's the etcd key [that etcd is using to sort, as well as we're using elsewhere:
https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go#L479-L491
]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK - I think I found it -

keyFunc := func(obj runtime.Object) (string, error) {

}

func (s sortableWatchCacheEvents) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

// newCacheIntervalFromStore is meant to handle the case of rv=0, such that the events
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: update the doc saying the output will be sorted.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// returned by Next() need to be events from a List() done on the underlying store of
// the watch cache.
// The items returned in the interval will be sorted by Key.
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) {
buffer := &watchCacheIntervalBuffer{}
allItems := store.List()
Expand All @@ -140,6 +156,7 @@ func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getA
}
buffer.endIndex++
}
sort.Sort(sortableWatchCacheEvents(buffer.buffer))
wojtek-t marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider using the newer slices.Sort function, which runs faster.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wojtek-t mind checking if slices.Sort is faster ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that it's not used anywhere in the codebase yet - we should do better analysis and migrate more if it appears better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, thanks.

ci := &watchCacheInterval{
startIndex: 0,
// Simulate that we already have all the events we're looking for.
Expand Down
8 changes: 0 additions & 8 deletions staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,6 @@ func testCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEven
}
}

func testCheckResultsInRandomOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
for range expectedEvents {
testCheckResultFunc(t, w, func(actualEvent watch.Event) {
ExpectContains(t, "unexpected event", toInterfaceSlice(expectedEvents), actualEvent)
})
}
}

func testCheckNoMoreResults(t *testing.T, w watch.Interface) {
select {
case e := <-w.ResultChan():
Expand Down
100 changes: 46 additions & 54 deletions staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,16 +1244,16 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
}
return ret
}
initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) watch.Event {
return watch.Event{
initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) []watch.Event {
return []watch.Event{{
Type: watch.Bookmark,
Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: createdInitialPods[len(createdInitialPods)-1].ResourceVersion,
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
},
}
}}
}
scenarios := []struct {
name string
Expand All @@ -1267,19 +1267,17 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
initialPods []*example.Pod
podsAfterEstablishingWatch []*example.Pod

expectedInitialEventsInRandomOrder func(createdInitialPods []*example.Pod) []watch.Event
expectedInitialEventsInStrictOrder func(createdInitialPods []*example.Pod) []watch.Event
expectedInitialEvents func(createdInitialPods []*example.Pod) []watch.Event
expectedInitialEventsBookmark func(createdInitialPods []*example.Pod) []watch.Event
expectedEventsAfterEstablishingWatch func(createdPodsAfterWatch []*example.Pod) []watch.Event
}{
{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1302,21 +1300,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset",
sendInitialEvents: &trueVal,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},

{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1342,21 +1338,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &trueVal,
resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},

{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=1",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=1",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1366,7 +1360,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &falseVal,
resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInStrictOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1375,7 +1369,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &falseVal,
resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInStrictOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1384,21 +1378,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &trueVal,
resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},

{
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=useCurrentRV",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
useCurrentRV: true,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event {
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=useCurrentRV",
allowWatchBookmarks: true,
sendInitialEvents: &trueVal,
useCurrentRV: true,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1424,7 +1416,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &trueVal,
useCurrentRV: true,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1433,14 +1425,14 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
name: "legacy, RV=0",
resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
{
name: "legacy, RV=unset",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods,
expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
},
Expand All @@ -1449,11 +1441,11 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
t.Run(scenario.name, func(t *testing.T) {
// set up env
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
if scenario.expectedInitialEventsInStrictOrder == nil {
scenario.expectedInitialEventsInStrictOrder = func(_ []*example.Pod) []watch.Event { return nil }
if scenario.expectedInitialEvents == nil {
scenario.expectedInitialEvents = func(_ []*example.Pod) []watch.Event { return nil }
}
if scenario.expectedInitialEventsInRandomOrder == nil {
scenario.expectedInitialEventsInRandomOrder = func(_ []*example.Pod) []watch.Event { return nil }
if scenario.expectedInitialEventsBookmark == nil {
scenario.expectedInitialEventsBookmark = func(_ []*example.Pod) []watch.Event { return nil }
}
if scenario.expectedEventsAfterEstablishingWatch == nil {
scenario.expectedEventsAfterEstablishingWatch = func(_ []*example.Pod) []watch.Event { return nil }
Expand Down Expand Up @@ -1487,8 +1479,8 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
defer w.Stop()

// make sure we only get initial events
testCheckResultsInRandomOrder(t, w, scenario.expectedInitialEventsInRandomOrder(createdPods))
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsInStrictOrder(createdPods))
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsBookmark(createdPods))
testCheckNoMoreResults(t, w)

createdPods = []*example.Pod{}
Expand Down