client-go: Add support for API streaming to the reflector #110772
/assign @wojtek-t
/triage accepted
if isErrorRetriableWithSideEffectsFn(err) {
    continue
}
klog.V(2).Infof("%s: watch-list of %v ended with: %v", r.name, r.expectedGVK, err)
remove - this will be logged here:
https://github.com/kubernetes/kubernetes/pull/110772/files#diff-9ccdf713e010f73dbebd01e936cb0077fc63e4f5ab941d865ded42da219d84ecR337
if bookmarkReceived {
    break // success we got initial data and the bookmark
}
// try one more time
nit: remove this comment - it doesn't bring any value.
    }
    // try one more time
}
r.setIsLastSyncResourceVersionUnavailable(false) // watch-list was successful
nit
// We successfully got initial state from watch-list confirmed by the
// "k8s.io/initial-events-end" bookmark.
initTrace.Step(...)
r.setIsLastSyncResource...
...
@@ -651,17 +801,35 @@ func (r *Reflector) relistResourceVersion() string {
    r.lastSyncResourceVersionMutex.RLock()
    defer r.lastSyncResourceVersionMutex.RUnlock()

    if r.lastSyncResourceVersion == "" {
This isn't correct - I mean, we're changing semantics here.
If isLastSyncResourceVersionUnavailable is true, then previously we would return "", and now we will return "0" if lastSyncResourceVersion is also empty.
Let's not change the semantics for the old path.
[FWIW, I'm not sure why we actually did that, but even if we want to change it, we should do that explicitly in a separate PR and not change it as part of a larger, complex PR.]
Doesn't setIsLastSyncResourceVersionUnavailable imply that r.lastSyncResourceVersion != ""? After all, we must have provided some RV to the server.
Anyway, I can bring it back to the original state just in case.
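For readers following this thread, here is a minimal sketch of the ordering the reviewer asks to preserve: the "unavailable" check comes first and returns "", and "0" is returned only when no resource version has been recorded yet. The reflectorState type and its mu field are hypothetical stand-ins for the real Reflector, not the code from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// reflectorState is a hypothetical stand-in for the Reflector fields
// discussed in this thread.
type reflectorState struct {
	mu                                   sync.RWMutex
	lastSyncResourceVersion              string
	isLastSyncResourceVersionUnavailable bool
}

// relistResourceVersion keeps the old-path semantics described in the review:
// "unavailable" always wins and yields "", even if no RV was ever recorded.
func (r *reflectorState) relistResourceVersion() string {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if r.isLastSyncResourceVersionUnavailable {
		return ""
	}
	if r.lastSyncResourceVersion == "" {
		return "0"
	}
	return r.lastSyncResourceVersion
}

func main() {
	states := []*reflectorState{
		{isLastSyncResourceVersionUnavailable: true},
		{},
		{lastSyncResourceVersion: "42"},
	}
	for _, s := range states {
		fmt.Printf("%q\n", s.relistResourceVersion())
	}
}
```

Checking lastSyncResourceVersion == "" first would turn the first case into "0", which is the semantic change flagged above.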
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items (syncWith) and reuses the current watch requests for getting further events.
nit: remove "(syncWith)" and split the line a bit :)
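The flow described in that doc comment can be sketched roughly as follows. This is an illustration only: the event type and collectInitialState are hypothetical simplifications, not the client-go watch API; only the "k8s.io/initial-events-end" annotation name is taken from the review above.

```go
package main

import "fmt"

type eventType string

const (
	added    eventType = "ADDED"
	bookmark eventType = "BOOKMARK"
)

// event is a hypothetical, simplified stand-in for a watch event.
type event struct {
	Type            eventType
	Name            string
	ResourceVersion string
	Annotations     map[string]string
}

// collectInitialState gathers synthetic "Added" events until it sees a
// bookmark carrying the initial-events-end annotation. It reports the
// collected names, the last seen resource version, and whether the
// terminating bookmark arrived before the stream ended.
func collectInitialState(events <-chan event) (items []string, rv string, synced bool) {
	for e := range events {
		switch e.Type {
		case added:
			items = append(items, e.Name)
			rv = e.ResourceVersion
		case bookmark:
			rv = e.ResourceVersion
			if _, ok := e.Annotations["k8s.io/initial-events-end"]; ok {
				return items, rv, true
			}
		}
	}
	return items, rv, false
}

func main() {
	ch := make(chan event, 3)
	ch <- event{Type: added, Name: "p1", ResourceVersion: "1"}
	ch <- event{Type: added, Name: "p2", ResourceVersion: "2"}
	ch <- event{Type: bookmark, ResourceVersion: "2",
		Annotations: map[string]string{"k8s.io/initial-events-end": "true"}}
	close(ch)

	items, rv, synced := collectInitialState(ch)
	fmt.Println(items, rv, synced)
}
```

In the sketch the caller would swap the collected items into its store only when synced is true; a stream that closes early leaves the decision to retry with the caller.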
var err error
var temporaryStore Store
var resourceVersion string
var bookmarkReceived *bool
nit: This one isn't used in the scope of function - let's define it where it is used only.
var temporaryStore Store
var resourceVersion string
var bookmarkReceived *bool
isErrorRetriableWithSideEffectsFn := func(err error) bool {
nit: I'm not a huge fan of those nested functions - I'm wondering if we can make it an actual reflector function...
Maybe in the future; for now it applies only to the watchList method.
I can add a TODO for myself: make it a method and find a way to unify error handling in the reflector.
That's not great, but I can probably live with this.
@@ -546,16 +682,13 @@ func watchHandler(start time.Time,
    name string,
    expectedTypeName string,
    setLastSyncResourceVersion func(string),
    exitOnInitEventBookmark *bool,
nit: I'm sorry - that was my mental shortcut, I think we should name it
"exitOnInitialEventsEndBookmark"
(for consistency with annotation name)
np
@@ -609,6 +742,11 @@ loop:
    }
    case watch.Bookmark:
        // A `Bookmark` means watch has synced here, just update the resourceVersion
        if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok {
            if exitOnInitEventBookmark != nil {
                exitOnInitEventBookmark = pointer.Bool(true)
*exitOn.. = true ?
right :) - thx.
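The difference the reviewer points at is easy to miss, so here is a small self-contained illustration. Assigning a fresh pointer to the parameter only rebinds the callee's local copy, while writing through the pointer is visible to the caller. The function names are made up for this example; a local variable stands in for pointer.Bool(true).

```go
package main

import "fmt"

// markReassign mirrors the line under review: it rebinds the local
// parameter, so the caller's bool is never touched.
func markReassign(flag *bool) {
	if flag != nil {
		t := true
		flag = &t // only the callee's copy of the pointer changes
	}
	_ = flag
}

// markThroughPointer is the suggested fix: write through the pointer.
func markThroughPointer(flag *bool) {
	if flag != nil {
		*flag = true
	}
}

// observed reports what the caller sees after invoking mark.
func observed(mark func(*bool)) bool {
	seen := false
	mark(&seen)
	return seen
}

func main() {
	fmt.Println("reassign pointer:     ", observed(markReassign))
	fmt.Println("write through pointer:", observed(markThroughPointer))
}
```

With the reassignment, the caller would never learn that the bookmark arrived.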
I like the test coverage - I added some comments, but there isn't anything big there.
if !apierrors.IsInvalid(err) {
    return err
}
klog.V(2).Info("the watch-list feature is not supported by the server, falling back to the previous LIST/WATCH semantic")
make this a warning
}
klog.V(2).Info("the watch-list feature is not supported by the server, falling back to the previous LIST/WATCH semantic")
fallbackToList = true
w = nil // just in case it was actually set
nit:
// Ensure that we won't accidentally pass some garbage down the watch.
w = nil
    }
    return nil, err
}
// if we are here, that means we either hit the case 1 or the case 2
remove this comment - this doesn't bring any value
    return nil, err
}
if *bookmarkReceived {
    break // success we got initial data and the bookmark
nit: remove this comment - this is obvious from variables naming.
lw.listCounter++
lw.requestOptions = append(lw.requestOptions, options)
if lw.listCounter == lw.closeAfterListRequests {
    defer lw.close()
Why is it deferred here? Can't we just close it immediately?
if lw.watchCounter == lw.closeAfterWatchRequests {
    defer lw.close()
}
if lw.watcherErrorPredicate != nil {
nit: let's call it maybe watchOptionsPredicate
{
    name: "the reflector can fall back to old LIST/WATCH semantics when a server doesn't support streaming",
    watchServerErrorPredicate: func(options metav1.ListOptions) error {
        if options.ResourceVersionMatch == metav1.ResourceVersionMatchNotOlderThan {
It would be more intuitive to check options.SendInitialEvents here...
returnedWatchEvents: []watch.Event{
    {Type: watch.Added, Object: makePod("p1", "1")},
    // second request
    {Type: watch.Added, Object: makePod("p2", "2")},
I would send the above p1 event again here - so that it looks more like a retry.
    }
}(),
stopAfterWatchEvents: 3,
closeAfterWatchEvents: 5,
I don't understand it - there aren't 5 events here...
I must have copy-pasted it from the previous test cases. Will remove.
It didn't affect the test - it was unused.
Last 3 minor nits - other than that LGTM.
go func() {
    for i, e := range scenario.watchEvents {
        listWatcher.fakeWatcher.Action(e.Type, e.Object)
        i++
I still don't understand it. I played with it:
https://go.dev/play/p/FEuLkFBGb_g
Having i++ there or not doesn't actually change the output - that's super misleading...
It would be much more intuitive to me if you instead:
- removed this i++ line
- switched the following ifs to if i+1 == ....
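To make the reviewer's point concrete without relying on the playground link, here is a small standalone sketch (not the test code from the PR). In a range loop the index variable is reassigned at the start of every iteration, so incrementing it at the end of the body has no effect on which elements are visited. The function names and the stopAfter parameter are illustrative assumptions.

```go
package main

import "fmt"

// visitedIndices records the index seen at the top of each iteration.
// When bump is true it also increments i at the end of the body.
func visitedIndices(events []string, bump bool) string {
	var seen []int
	for i := range events {
		seen = append(seen, i)
		if bump {
			i++ // changes only this iteration's value; range resets i next time
		}
	}
	return fmt.Sprint(seen)
}

// stopIndex shows the suggested form: compare i+1 against a 1-based count
// instead of mutating the loop variable.
func stopIndex(events []string, stopAfter int) int {
	for i := range events {
		if i+1 == stopAfter {
			return i
		}
	}
	return -1
}

func main() {
	events := []string{"a", "b", "c"}
	fmt.Println(visitedIndices(events, true))
	fmt.Println(visitedIndices(events, false))
	fmt.Println(stopIndex(events, 2))
}
```

Both calls to visitedIndices print the same indices, which is why the trailing i++ reads as misleading.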
    actualPods = append(actualPods, *p.(*v1.Pod))
}

for _, expectedPod := range expectedPods {
Instead of this, let's just sort both lists by name and compare the whole lists - it's more intuitive.
[This code in general would return true for the following lists:
- [a, a, b]
- [a, b, b]]
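A short sketch of the pitfall, using plain strings instead of pods. It assumes the original check was a per-element membership test guarded by a length comparison, which is a guess at the shape of the test helper rather than a copy of it; containsAll and equalSorted are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// containsAll checks only that every expected name occurs somewhere in
// actual (assumed shape of the original comparison).
func containsAll(expected, actual []string) bool {
	if len(expected) != len(actual) {
		return false
	}
	for _, e := range expected {
		found := false
		for _, a := range actual {
			if a == e {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// equalSorted sorts copies of both lists and compares them element by element.
func equalSorted(expected, actual []string) bool {
	e := append([]string(nil), expected...)
	a := append([]string(nil), actual...)
	sort.Strings(e)
	sort.Strings(a)
	if len(e) != len(a) {
		return false
	}
	for i := range e {
		if e[i] != a[i] {
			return false
		}
	}
	return true
}

func main() {
	expected := []string{"a", "a", "b"}
	actual := []string{"a", "b", "b"}
	fmt.Println("membership check:", containsAll(expected, actual))
	fmt.Println("sorted compare:  ", equalSorted(expected, actual))
}
```

The membership check accepts the two lists from the comment above, while the sorted comparison rejects them.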
if r.UseWatchList {
    w, err = r.watchList(stopCh)
    if w == nil && err == nil {
        return nil // stopCh requested
nit:
// stopCh was closed
return nil
/lgtm
LGTM label has been added. Git tree hash: 1e1503c42c1a45be2c251bab6cabcfed37b1e119
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: p0lyn0mial, wojtek-t
@p0lyn0mial: The following tests failed, say
/test pull-kubernetes-e2e-kind-ipv6
What type of PR is this?
/kind feature
What this PR does / why we need it:
This pull request introduces new alpha functionality to the reflector, allowing users to enable API streaming.
To activate this feature, users can set the ENABLE_CLIENT_GO_WATCH_LIST_ALPHA environment variable. Note that the server must support streaming for this feature to work.
If the server does not support streaming, the reflector falls back to the previous method of obtaining data through LIST/WATCH semantics.
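A minimal sketch of how such an opt-in could look. It assumes that any non-empty value of the variable enables the feature, which this description does not state; useWatchList and syncMode are hypothetical helpers, and the lookup function is injected only to keep the example testable.

```go
package main

import (
	"fmt"
	"os"
)

// Variable name taken from the PR description above.
const watchListEnvVar = "ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"

// useWatchList reports whether the opt-in is set.
// Assumption: any non-empty value counts as enabled.
func useWatchList(getenv func(string) string) bool {
	return len(getenv(watchListEnvVar)) > 0
}

// syncMode picks the data path: streaming only when the client opted in
// and the server supports it, otherwise the previous LIST/WATCH semantics.
func syncMode(enabled, serverSupportsStreaming bool) string {
	if enabled && serverSupportsStreaming {
		return "watch-list"
	}
	return "list/watch"
}

func main() {
	enabled := useWatchList(os.Getenv)
	fmt.Println("opt-in:", enabled)
	fmt.Println("mode with streaming server:   ", syncMode(enabled, true))
	fmt.Println("mode without streaming server:", syncMode(enabled, false))
}
```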
Which issue(s) this PR fixes:
xref: kubernetes/enhancements#3157
xref: #115402
xref: #110960
Special notes for your reviewer:
Does this PR introduce a user-facing change?
Additional documentation e.g., KEPs (Kubernetes Enhancement Proposals), usage docs, etc.: