From aff10e31d6a5179b4f2247886e26fed14e030a99 Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Fri, 16 Dec 2022 16:53:49 -0800 Subject: [PATCH] Remove delegating map impl, and simplify InformersMap Signed-off-by: Vince Prignano --- pkg/cache/internal/deleg_map.go | 124 -------- pkg/cache/internal/informers_map.go | 432 +++++++++++++++------------- 2 files changed, 234 insertions(+), 322 deletions(-) delete mode 100644 pkg/cache/internal/deleg_map.go diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go deleted file mode 100644 index 289932a627..0000000000 --- a/pkg/cache/internal/deleg_map.go +++ /dev/null @@ -1,124 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package internal - -import ( - "context" - "time" - - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" -) - -// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. -// It uses a standard parameter codec constructed based on the given generated Scheme. -type InformersMap struct { - // we abstract over the details of structured/unstructured/metadata with the specificInformerMaps - // TODO(directxman12): genericize this over different projections now that we have 3 different maps - // TODO(vincepri): A different structure of the specific informer map is possible, although it requires - // a large refactoring that takes into account that there may be different kind of informers, in this case - // 3 of those: structured, unstructured, and metadata. - - structured *specificInformersMap - unstructured *specificInformersMap - metadata *specificInformersMap -} - -// InformersMapOptions configures an InformerMap. -type InformersMapOptions struct { - Scheme *runtime.Scheme - Mapper meta.RESTMapper - ResyncPeriod time.Duration - Namespace string - ByGVK InformersMapOptionsByGVK -} - -// InformersMapOptionsByGVK configured additional by group version kind (or object) -// in an InformerMap. -type InformersMapOptionsByGVK struct { - Selectors SelectorsByGVK - Transformers TransformFuncByObject // TODO(vincepri): Why is this by object and not GVK? - DisableDeepCopy DisableDeepCopyByGVK -} - -// NewInformersMap creates a new InformersMap that can create informers for -// both structured and unstructured objects. -func NewInformersMap(config *rest.Config, options *InformersMapOptions) *InformersMap { - return &InformersMap{ - structured: newSpecificInformersMap(config, &specificInformerMapOptions{ - InformersMapOptions: options, - ListWatcherFunc: structuredListWatch, - }), - unstructured: newSpecificInformersMap(config, &specificInformerMapOptions{ - InformersMapOptions: options, - ListWatcherFunc: unstructuredListWatch, - }), - metadata: newSpecificInformersMap(config, &specificInformerMapOptions{ - InformersMapOptions: options, - ListWatcherFunc: metadataListWatch, - }), - } -} - -// Start calls Run on each of the informers and sets started to true. Blocks on the context. -func (m *InformersMap) Start(ctx context.Context) error { - go m.structured.Start(ctx) - go m.unstructured.Start(ctx) - go m.metadata.Start(ctx) - <-ctx.Done() - return nil -} - -// WaitForCacheSync waits until all the caches have been started and synced. -func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool { - syncedFuncs := append([]cache.InformerSynced(nil), m.structured.HasSyncedFuncs()...) - syncedFuncs = append(syncedFuncs, m.unstructured.HasSyncedFuncs()...) - syncedFuncs = append(syncedFuncs, m.metadata.HasSyncedFuncs()...) - - if !m.structured.waitForStarted(ctx) { - return false - } - if !m.unstructured.waitForStarted(ctx) { - return false - } - if !m.metadata.waitForStarted(ctx) { - return false - } - return cache.WaitForCacheSync(ctx.Done(), syncedFuncs...) -} - -// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns -// the Informer from the map. -func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { - switch obj.(type) { - case *unstructured.Unstructured: - return m.unstructured.Get(ctx, gvk, obj) - case *unstructured.UnstructuredList: - return m.unstructured.Get(ctx, gvk, obj) - case *metav1.PartialObjectMetadata: - return m.metadata.Get(ctx, gvk, obj) - case *metav1.PartialObjectMetadataList: - return m.metadata.Get(ctx, gvk, obj) - default: - return m.structured.Get(ctx, gvk, obj) - } -} diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 94fa3b8719..f1d86780fa 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -26,6 +26,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -34,7 +35,6 @@ import ( "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) @@ -42,32 +42,42 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// clientListWatcherFunc knows how to create a ListWatcher. -type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) +// InformersMapOptions configures an InformerMap. +type InformersMapOptions struct { + Scheme *runtime.Scheme + Mapper meta.RESTMapper + ResyncPeriod time.Duration + Namespace string + ByGVK InformersMapOptionsByGVK +} -// specificInformerMapOptions contains options to create a new specific informer map. -type specificInformerMapOptions struct { - *InformersMapOptions - ListWatcherFunc createListWatcherFunc +// InformersMapOptionsByGVK configured additional by group version kind (or object) +// in an InformerMap. +type InformersMapOptionsByGVK struct { + Selectors SelectorsByGVK + Transformers TransformFuncByObject + DisableDeepCopy DisableDeepCopyByGVK } -// newSpecificInformersMap returns a new specificInformersMap (like -// the generical InformersMap, except that it doesn't implement WaitForCacheSync). -func newSpecificInformersMap(config *rest.Config, options *specificInformerMapOptions) *specificInformersMap { - return &specificInformersMap{ - config: config, - scheme: options.Scheme, - mapper: options.Mapper, - informersByGVK: make(map[schema.GroupVersionKind]*MapEntry), - codecs: serializer.NewCodecFactory(options.Scheme), - paramCodec: runtime.NewParameterCodec(options.Scheme), - resync: options.ResyncPeriod, - startWait: make(chan struct{}), - createListWatcher: options.ListWatcherFunc, - namespace: options.Namespace, - selectors: options.ByGVK.Selectors.forGVK, - disableDeepCopy: options.ByGVK.DisableDeepCopy, - transformers: options.ByGVK.Transformers, +// NewInformersMap creates a new InformersMap that can create informers under the hood. +func NewInformersMap(config *rest.Config, options *InformersMapOptions) *InformersMap { + return &InformersMap{ + config: config, + scheme: options.Scheme, + mapper: options.Mapper, + informers: informers{ + Structured: make(map[schema.GroupVersionKind]*MapEntry), + Unstructured: make(map[schema.GroupVersionKind]*MapEntry), + Metadata: make(map[schema.GroupVersionKind]*MapEntry), + }, + codecs: serializer.NewCodecFactory(options.Scheme), + paramCodec: runtime.NewParameterCodec(options.Scheme), + resync: options.ResyncPeriod, + startWait: make(chan struct{}), + namespace: options.Namespace, + selectors: options.ByGVK.Selectors.forGVK, + disableDeepCopy: options.ByGVK.DisableDeepCopy, + transformers: options.ByGVK.Transformers, } } @@ -80,9 +90,15 @@ type MapEntry struct { Reader CacheReader } -// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. +type informers struct { + Structured map[schema.GroupVersionKind]*MapEntry + Unstructured map[schema.GroupVersionKind]*MapEntry + Metadata map[schema.GroupVersionKind]*MapEntry +} + +// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. // It uses a standard parameter codec constructed based on the given generated Scheme. -type specificInformersMap struct { +type InformersMap struct { // scheme maps runtime.Objects to GroupVersionKinds scheme *runtime.Scheme @@ -92,8 +108,8 @@ type specificInformersMap struct { // mapper maps GroupVersionKinds to Resources mapper meta.RESTMapper - // informersByGVK is the cache of informers keyed by groupVersionKind - informersByGVK map[schema.GroupVersionKind]*MapEntry + // informers is the cache of informers keyed by their type and groupVersionKind + informers informers // codecs is used to create a new REST client codecs serializer.CodecFactory @@ -119,11 +135,6 @@ type specificInformersMap struct { // informer has been started. startWait chan struct{} - // createClient knows how to create a client and a list object, - // and allows for abstracting over the particulars of structured vs - // unstructured objects. - createListWatcher createListWatcherFunc - // namespace is the namespace that all ListWatches are restricted to // default or empty string means all namespaces namespace string @@ -141,7 +152,7 @@ type specificInformersMap struct { // Start calls Run on each of the informers and sets started to true. Blocks on the context. // It doesn't return start because it can't return an error, and it's not a runnable directly. -func (ip *specificInformersMap) Start(ctx context.Context) { +func (ip *InformersMap) Start(ctx context.Context) error { func() { ip.mu.Lock() defer ip.mu.Unlock() @@ -150,8 +161,14 @@ func (ip *specificInformersMap) Start(ctx context.Context) { ip.stop = ctx.Done() // Start each informer - for _, informer := range ip.informersByGVK { - go informer.Informer.Run(ctx.Done()) + for _, i := range ip.informers.Structured { + go i.Informer.Run(ctx.Done()) + } + for _, i := range ip.informers.Unstructured { + go i.Informer.Run(ctx.Done()) + } + for _, i := range ip.informers.Metadata { + go i.Informer.Run(ctx.Done()) } // Set started to true so we immediately start any informers added later. @@ -159,9 +176,10 @@ func (ip *specificInformersMap) Start(ctx context.Context) { close(ip.startWait) }() <-ctx.Done() + return nil } -func (ip *specificInformersMap) waitForStarted(ctx context.Context) bool { +func (ip *InformersMap) waitForStarted(ctx context.Context) bool { select { case <-ip.startWait: return true @@ -171,27 +189,45 @@ func (ip *specificInformersMap) waitForStarted(ctx context.Context) bool { } // HasSyncedFuncs returns all the HasSynced functions for the informers in this map. -func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced { +func (ip *InformersMap) HasSyncedFuncs() []cache.InformerSynced { ip.mu.RLock() defer ip.mu.RUnlock() - syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK)) - for _, informer := range ip.informersByGVK { - syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced) + + res := make([]cache.InformerSynced, 0, + len(ip.informers.Structured)+len(ip.informers.Unstructured)+len(ip.informers.Metadata), + ) + for _, i := range ip.informers.Structured { + res = append(res, i.Informer.HasSynced) + } + for _, i := range ip.informers.Unstructured { + res = append(res, i.Informer.HasSynced) + } + for _, i := range ip.informers.Metadata { + res = append(res, i.Informer.HasSynced) } - return syncedFuncs + return res +} + +// WaitForCacheSync waits until all the caches have been started and synced. +func (ip *InformersMap) WaitForCacheSync(ctx context.Context) bool { + if !ip.waitForStarted(ctx) { + return false + } + return cache.WaitForCacheSync(ctx.Done(), ip.HasSyncedFuncs()...) +} + +func (ip *InformersMap) get(gvk schema.GroupVersionKind, obj runtime.Object) (entry *MapEntry, started bool, ok bool) { + ip.mu.RLock() + defer ip.mu.RUnlock() + i, ok := ip.informersByType(obj)[gvk] + return i, ip.started, ok } // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. -func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { +func (ip *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) { // Return the informer if it is found - i, started, ok := func() (*MapEntry, bool, bool) { - ip.mu.RLock() - defer ip.mu.RUnlock() - i, ok := ip.informersByGVK[gvk] - return i, ip.started, ok - }() - + i, started, ok := ip.get(gvk, obj) if !ok { var err error if i, started, err = ip.addInformerToMap(gvk, obj); err != nil { @@ -209,24 +245,44 @@ func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersion return started, i, nil } -func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { +func (ip *InformersMap) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*MapEntry { + switch obj.(type) { + case *unstructured.Unstructured, *unstructured.UnstructuredList: + return ip.informers.Unstructured + case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList: + return ip.informers.Metadata + default: + return ip.informers.Structured + } +} + +func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) { ip.mu.Lock() defer ip.mu.Unlock() // Check the cache to see if we already have an Informer. If we do, return the Informer. // This is for the case where 2 routines tried to get the informer when it wasn't in the map // so neither returned early, but the first one created it. - if i, ok := ip.informersByGVK[gvk]; ok { + if i, ok := ip.informersByType(obj)[gvk]; ok { return i, ip.started, nil } // Create a NewSharedIndexInformer and add it to the map. - var lw *cache.ListWatch - lw, err := ip.createListWatcher(gvk, ip) + lw, err := ip.makeListWatcher(gvk, obj) if err != nil { return nil, false, err } - ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{ + ni := cache.NewSharedIndexInformer(&cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ip.selectors(gvk).ApplyToList(&opts) + return lw.ListFunc(opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + ip.selectors(gvk).ApplyToList(&opts) + opts.Watch = true // Watch needs to be set to true separately + return lw.WatchFunc(opts) + }, + }, obj, resyncPeriod(ip.resync)(), cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) @@ -249,7 +305,7 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob disableDeepCopy: ip.disableDeepCopy.IsDisabled(gvk), }, } - ip.informersByGVK[gvk] = i + ip.informersByType(obj)[gvk] = i // Start the Informer if need by // TODO(seans): write thorough tests and document what happens here - can you add indexers? @@ -260,51 +316,11 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob return i, ip.started, nil } -// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer. -func structuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { - // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the - // groupVersionKind to the Resource API we will use. - mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - return nil, err - } - - client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs) - if err != nil { - return nil, err - } - listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List") - listObj, err := ip.scheme.New(listGVK) - if err != nil { - return nil, err - } - - // TODO: the functions that make use of this ListWatch should be adapted to - // pass in their own contexts instead of relying on this fixed one here. +func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) { + // TODO(vincepri): Wire the context in here and don't use TODO(). + // Can we use the context from the Get call? ctx := context.TODO() - // Create a new ListWatch for the obj - return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - ip.selectors(gvk).ApplyToList(&opts) - res := listObj.DeepCopyObject() - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot - err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res) - return res, err - }, - // Setup the watch function - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - ip.selectors(gvk).ApplyToList(&opts) - // Watch needs to be set to true separately - opts.Watch = true - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot - return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx) - }, - }, nil -} -func unstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the // groupVersionKind to the Resource API we will use. mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) @@ -312,109 +328,129 @@ func unstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap return nil, err } - // If the rest configuration has a negotiated serializer passed in, - // we should remove it and use the one that the dynamic client sets for us. - cfg := rest.CopyConfig(ip.config) - cfg.NegotiatedSerializer = nil - dynamicClient, err := dynamic.NewForConfig(cfg) - if err != nil { - return nil, err - } - - // TODO: the functions that make use of this ListWatch should be adapted to - // pass in their own contexts instead of relying on this fixed one here. - ctx := context.TODO() - // Create a new ListWatch for the obj - return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - ip.selectors(gvk).ApplyToList(&opts) - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { - return dynamicClient.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts) - } - return dynamicClient.Resource(mapping.Resource).List(ctx, opts) - }, - // Setup the watch function - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - ip.selectors(gvk).ApplyToList(&opts) - // Watch needs to be set to true separately - opts.Watch = true - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { - return dynamicClient.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts) - } - return dynamicClient.Resource(mapping.Resource).Watch(ctx, opts) - }, - }, nil -} - -func metadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { - // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the - // groupVersionKind to the Resource API we will use. - mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - return nil, err + // Figure out if the GVK we're dealing with is global, or namespace scoped. + var namespace string + if mapping.Scope.Name() == meta.RESTScopeNameNamespace { + namespace = restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) } - // Always clear the negotiated serializer and use the one - // set from the metadata client. - cfg := rest.CopyConfig(ip.config) - cfg.NegotiatedSerializer = nil - - // grab the metadata client - client, err := metadata.NewForConfig(cfg) - if err != nil { - return nil, err - } - - // TODO: the functions that make use of this ListWatch should be adapted to - // pass in their own contexts instead of relying on this fixed one here. - ctx := context.TODO() - - // create the relevant listwatch - return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - ip.selectors(gvk).ApplyToList(&opts) + switch obj.(type) { + // + // Unstructured + // + case *unstructured.Unstructured, *unstructured.UnstructuredList: + // If the rest configuration has a negotiated serializer passed in, + // we should remove it and use the one that the dynamic client sets for us. + cfg := rest.CopyConfig(ip.config) + cfg.NegotiatedSerializer = nil + dynamicClient, err := dynamic.NewForConfig(cfg) + if err != nil { + return nil, err + } + resources := dynamicClient.Resource(mapping.Resource) + return &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + if namespace != "" { + return resources.Namespace(namespace).List(ctx, opts) + } + return resources.List(ctx, opts) + }, + // Setup the watch function + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + if namespace != "" { + return resources.Namespace(namespace).Watch(ctx, opts) + } + return resources.Watch(ctx, opts) + }, + }, nil + // + // Metadata + // + case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList: + // Always clear the negotiated serializer and use the one + // set from the metadata client. + cfg := rest.CopyConfig(ip.config) + cfg.NegotiatedSerializer = nil + + // Grab the metadata metadataClient. + metadataClient, err := metadata.NewForConfig(cfg) + if err != nil { + return nil, err + } + resources := metadataClient.Resource(mapping.Resource) + + return &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + var ( + list *metav1.PartialObjectMetadataList + err error + ) + if namespace != "" { + list, err = resources.Namespace(namespace).List(ctx, opts) + } else { + list, err = resources.List(ctx, opts) + } + if list != nil { + for i := range list.Items { + list.Items[i].SetGroupVersionKind(gvk) + } + } + return list, err + }, + // Setup the watch function + WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) { + defer func() { + if watcher != nil { + watcher = newGVKFixupWatcher(gvk, watcher) + } + }() + + if namespace != "" { + return resources.Namespace(namespace).Watch(ctx, opts) + } + return resources.Watch(ctx, opts) + }, + }, nil + // + // Structured. + // + default: + client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs) + if err != nil { + return nil, err + } + listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List") + listObj, err := ip.scheme.New(listGVK) + if err != nil { + return nil, err + } + return &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + // Build the request. + req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec) + if namespace != "" { + req.Namespace(namespace) + } - var ( - list *metav1.PartialObjectMetadataList - err error - ) - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { - list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts) - } else { - list, err = client.Resource(mapping.Resource).List(ctx, opts) - } - if list != nil { - for i := range list.Items { - list.Items[i].SetGroupVersionKind(gvk) + // Create the resulting object, and execute the request. + res := listObj.DeepCopyObject() + if err := req.Do(ctx).Into(res); err != nil { + return nil, err } - } - return list, err - }, - // Setup the watch function - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - ip.selectors(gvk).ApplyToList(&opts) - // Watch needs to be set to true separately - opts.Watch = true - - var ( - watcher watch.Interface - err error - ) - namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk)) - if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { - watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts) - } else { - watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts) - } - if watcher != nil { - watcher = newGVKFixupWatcher(gvk, watcher) - } - return watcher, err - }, - }, nil + return res, nil + }, + // Setup the watch function + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + // Build the request. + req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec) + if namespace != "" { + req.Namespace(namespace) + } + // Call the watch. + return req.Watch(ctx) + }, + }, nil + } } // newGVKFixupWatcher adds a wrapper that preserves the GVK information when