New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
client-go: Add support for API streaming to the reflector #110772
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ import ( | |
"k8s.io/client-go/tools/pager" | ||
"k8s.io/klog/v2" | ||
"k8s.io/utils/clock" | ||
"k8s.io/utils/pointer" | ||
"k8s.io/utils/trace" | ||
) | ||
|
||
|
@@ -99,6 +100,15 @@ type Reflector struct { | |
ShouldResync func() bool | ||
// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch. | ||
MaxInternalErrorRetryDuration time.Duration | ||
// UseWatchList, if turned on, instructs the reflector to open a stream to bring data from the API server. | ||
// Streaming has the primary advantage of using fewer server resources to fetch data. | ||
// | ||
// The old behaviour establishes a LIST request which gets data in chunks. | ||
// A paginated list is less efficient and, depending on the actual size of the objects, | ||
// might result in increased memory consumption on the API server. | ||
// | ||
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details | ||
UseWatchList bool | ||
} | ||
|
||
// ResourceVersionUpdater is an interface that allows store implementation to | ||
|
@@ -311,17 +321,39 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { | |
// It returns error if ListAndWatch didn't even try to initialize watch. | ||
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name) | ||
var err error | ||
var w watch.Interface | ||
fallbackToList := !r.UseWatchList | ||
|
||
err := r.list(stopCh) | ||
if err != nil { | ||
return err | ||
if r.UseWatchList { | ||
w, err = r.watchList(stopCh) | ||
if w == nil && err == nil { | ||
// stopCh was closed | ||
return nil | ||
} | ||
if err != nil { | ||
if !apierrors.IsInvalid(err) { | ||
return err | ||
} | ||
klog.Warning("the watch-list feature is not supported by the server, falling back to the previous LIST/WATCH semantic") | ||
fallbackToList = true | ||
// Ensure that we won't accidentally pass some garbage down the watch. | ||
w = nil | ||
} | ||
} | ||
|
||
if fallbackToList { | ||
err = r.list(stopCh) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
resyncerrc := make(chan error, 1) | ||
cancelCh := make(chan struct{}) | ||
defer close(cancelCh) | ||
go r.startResync(stopCh, cancelCh, resyncerrc) | ||
return r.watch(nil, stopCh, resyncerrc) | ||
return r.watch(w, stopCh, resyncerrc) | ||
} | ||
|
||
// startResync periodically calls r.store.Resync() method. | ||
|
@@ -392,8 +424,9 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc | |
} | ||
} | ||
|
||
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) | ||
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh) | ||
// Ensure that watch will not be reused across iterations. | ||
w.Stop() | ||
wojtek-t marked this conversation as resolved.
Show resolved
Hide resolved
|
||
w = nil | ||
retry.After(err) | ||
if err != nil { | ||
|
@@ -528,6 +561,114 @@ func (r *Reflector) list(stopCh <-chan struct{}) error { | |
return nil | ||
} | ||
|
||
// watchList establishes a stream to get a consistent snapshot of data | ||
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal | ||
// | ||
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan) | ||
// Establishes a consistent stream with the server. | ||
// That means the returned data is consistent, as if, served directly from etcd via a quorum read. | ||
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion. | ||
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion. | ||
// After receiving a "Bookmark" event the reflector is considered to be synchronized. | ||
// It replaces its internal store with the collected items and | ||
// reuses the current watch requests for getting further events. | ||
// | ||
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan) | ||
// Establishes a stream with the server at the provided resource version. | ||
// To establish the initial state the server begins with synthetic "Added" events. | ||
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version. | ||
// After receiving a "Bookmark" event the reflector is considered to be synchronized. | ||
// It replaces its internal store with the collected items and | ||
// reuses the current watch requests for getting further events. | ||
func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) { | ||
var w watch.Interface | ||
var err error | ||
var temporaryStore Store | ||
var resourceVersion string | ||
// TODO(#115478): see if this function could be turned | ||
// into a method and see if error handling | ||
// could be unified with the r.watch method | ||
isErrorRetriableWithSideEffectsFn := func(err error) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I'm not a huge fan of those nested functions - I'm wondering if we can make it an actual reflector function... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe in the future, for now it applies only to the watchList method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's not great, but I can probably live with this. |
||
if canRetry := isWatchErrorRetriable(err); canRetry { | ||
klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err) | ||
<-r.initConnBackoffManager.Backoff().C() | ||
return true | ||
} | ||
if isExpiredError(err) || isTooLargeResourceVersionError(err) { | ||
// we tried to re-establish a watch request but the provided RV | ||
// has either expired or it is greater than the server knows about. | ||
// In that case we reset the RV and | ||
// try to get a consistent snapshot from the watch cache (case 1) | ||
r.setIsLastSyncResourceVersionUnavailable(true) | ||
return true | ||
} | ||
return false | ||
} | ||
|
||
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name}) | ||
defer initTrace.LogIfLong(10 * time.Second) | ||
for { | ||
select { | ||
case <-stopCh: | ||
return nil, nil | ||
default: | ||
} | ||
|
||
resourceVersion = "" | ||
lastKnownRV := r.rewatchResourceVersion() | ||
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc) | ||
// TODO(#115478): large "list", slow clients, slow network, p&f | ||
// might slow down streaming and eventually fail. | ||
// maybe in such a case we should retry with an increased timeout? | ||
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) | ||
options := metav1.ListOptions{ | ||
ResourceVersion: lastKnownRV, | ||
AllowWatchBookmarks: true, | ||
SendInitialEvents: pointer.Bool(true), | ||
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, | ||
TimeoutSeconds: &timeoutSeconds, | ||
} | ||
start := r.clock.Now() | ||
|
||
w, err = r.listerWatcher.Watch(options) | ||
if err != nil { | ||
if isErrorRetriableWithSideEffectsFn(err) { | ||
continue | ||
} | ||
return nil, err | ||
} | ||
bookmarkReceived := pointer.Bool(false) | ||
err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, | ||
func(rv string) { resourceVersion = rv }, | ||
bookmarkReceived, | ||
r.clock, make(chan error), stopCh) | ||
if err != nil { | ||
w.Stop() // stop and retry with clean state | ||
if err == errorStopRequested { | ||
return nil, nil | ||
} | ||
if isErrorRetriableWithSideEffectsFn(err) { | ||
continue | ||
} | ||
return nil, err | ||
} | ||
if *bookmarkReceived { | ||
break | ||
} | ||
} | ||
wojtek-t marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// We successfully got initial state from watch-list confirmed by the | ||
// "k8s.io/initial-events-end" bookmark. | ||
initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())}) | ||
r.setIsLastSyncResourceVersionUnavailable(false) | ||
if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil { | ||
return nil, fmt.Errorf("unable to sync watch-list result: %v", err) | ||
} | ||
initTrace.Step("SyncWith done") | ||
r.setLastSyncResourceVersion(resourceVersion) | ||
|
||
return w, nil | ||
} | ||
|
||
// syncWith replaces the store's items with the given list. | ||
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { | ||
found := make([]interface{}, 0, len(items)) | ||
|
@@ -546,15 +687,17 @@ func watchHandler(start time.Time, | |
name string, | ||
expectedTypeName string, | ||
setLastSyncResourceVersion func(string), | ||
exitOnInitialEventsEndBookmark *bool, | ||
clock clock.Clock, | ||
errc chan error, | ||
stopCh <-chan struct{}, | ||
) error { | ||
eventCount := 0 | ||
|
||
// Stopping the watcher should be idempotent and if we return from this function there's no way | ||
// we're coming back in with the same watch interface. | ||
defer w.Stop() | ||
if exitOnInitialEventsEndBookmark != nil { | ||
// reset it to false in case the caller | ||
// passed it in already set to true | ||
*exitOnInitialEventsEndBookmark = false | ||
} | ||
|
||
wojtek-t marked this conversation as resolved.
Show resolved
Hide resolved
|
||
loop: | ||
for { | ||
|
@@ -609,6 +752,11 @@ loop: | |
} | ||
case watch.Bookmark: | ||
// A `Bookmark` means watch has synced here, just update the resourceVersion | ||
if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok { | ||
if exitOnInitialEventsEndBookmark != nil { | ||
*exitOnInitialEventsEndBookmark = true | ||
} | ||
} | ||
default: | ||
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event)) | ||
} | ||
|
@@ -617,6 +765,11 @@ loop: | |
rvu.UpdateResourceVersion(resourceVersion) | ||
} | ||
eventCount++ | ||
if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark { | ||
watchDuration := clock.Since(start) | ||
klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration) | ||
return nil | ||
} | ||
} | ||
} | ||
|
||
|
@@ -665,6 +818,18 @@ func (r *Reflector) relistResourceVersion() string { | |
return r.lastSyncResourceVersion | ||
} | ||
|
||
// rewatchResourceVersion determines the resource version the reflector should start streaming from. | ||
func (r *Reflector) rewatchResourceVersion() string { | ||
r.lastSyncResourceVersionMutex.RLock() | ||
defer r.lastSyncResourceVersionMutex.RUnlock() | ||
if r.isLastSyncResourceVersionUnavailable { | ||
// initial stream should return data at the most recent resource version. | ||
// the returned data must be consistent i.e. as if served from etcd via a quorum read | ||
return "" | ||
} | ||
return r.lastSyncResourceVersion | ||
} | ||
|
||
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned | ||
// "expired" or "too large resource version" error. | ||
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@deads2k do you have a preference on how to allow clients to enable/disable the streaming feature?
I'd like to avoid updating code generation at least for Alpha.