Skip to content

Commit

Permalink
Merge pull request #73 from ibuildthecloud/master
Browse files Browse the repository at this point in the history
Add Summary informer
  • Loading branch information
ibuildthecloud committed Mar 21, 2020
2 parents 371ff25 + c48dca6 commit 54b5e5f
Show file tree
Hide file tree
Showing 9 changed files with 686 additions and 1 deletion.
22 changes: 22 additions & 0 deletions pkg/summary/client/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package client

import (
"github.com/rancher/wrangler/pkg/summary"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
)

type Interface interface {
Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface
}

type ResourceInterface interface {
List(opts metav1.ListOptions) (*summary.SummarizedObjectList, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
}

type NamespaceableResourceInterface interface {
Namespace(string) ResourceInterface
ResourceInterface
}
109 changes: 109 additions & 0 deletions pkg/summary/client/simple.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package client

import (
"github.com/rancher/wrangler/pkg/summary"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
)

type summaryClient struct {
client dynamic.Interface
}

var _ Interface = &summaryClient{}

func NewForDynamicClient(client dynamic.Interface) Interface {
return &summaryClient{client: client}
}

type summaryResourceClient struct {
client dynamic.Interface
namespace string
resource schema.GroupVersionResource
}

func (c *summaryClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
return &summaryResourceClient{client: c.client, resource: resource}
}

func (c *summaryResourceClient) Namespace(ns string) ResourceInterface {
ret := *c
ret.namespace = ns
return &ret
}

func (c *summaryResourceClient) List(opts metav1.ListOptions) (*summary.SummarizedObjectList, error) {
var (
u *unstructured.UnstructuredList
err error
)

if c.namespace == "" {
u, err = c.client.Resource(c.resource).List(opts)
} else {
u, err = c.client.Resource(c.resource).Namespace(c.namespace).List(opts)
}
if err != nil {
return nil, err
}

list := &summary.SummarizedObjectList{
TypeMeta: metav1.TypeMeta{
Kind: u.GetKind(),
APIVersion: u.GetAPIVersion(),
},
ListMeta: metav1.ListMeta{
ResourceVersion: u.GetResourceVersion(),
Continue: u.GetContinue(),
RemainingItemCount: u.GetRemainingItemCount(),
},
}

for _, obj := range u.Items {
list.Items = append(list.Items, *summary.Summarized(&obj))
}

return list, nil
}

func (c *summaryResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
var (
resp watch.Interface
err error
)

eventChan := make(chan watch.Event)

if c.namespace == "" {
resp, err = c.client.Resource(c.resource).Watch(opts)
} else {
resp, err = c.client.Resource(c.resource).Namespace(c.namespace).Watch(opts)
}
if err != nil {
return nil, err
}

go func() {
for event := range resp.ResultChan() {
event.Object = summary.Summarized(event.Object)
eventChan <- event
}
}()

return &watcher{
Interface: resp,
eventChan: eventChan,
}, nil
}

type watcher struct {
watch.Interface
eventChan chan watch.Event
}

func (w watcher) ResultChan() <-chan watch.Event {
return w.eventChan
}
157 changes: 157 additions & 0 deletions pkg/summary/informer/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informer

import (
"sync"
"time"

"github.com/rancher/wrangler/pkg/summary"
"github.com/rancher/wrangler/pkg/summary/client"
"github.com/rancher/wrangler/pkg/summary/lister"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

// NewSummarySharedInformerFactory constructs a new instance of summarySharedInformerFactory for all namespaces.
func NewSummarySharedInformerFactory(client client.Interface, defaultResync time.Duration) SummarySharedInformerFactory {
return NewFilteredSummarySharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil)
}

// NewFilteredSummarySharedInformerFactory constructs a new instance of summarySharedInformerFactory.
// Listers obtained via this factory will be subject to the same filters as specified here.
func NewFilteredSummarySharedInformerFactory(client client.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) SummarySharedInformerFactory {
return &summarySharedInformerFactory{
client: client,
defaultResync: defaultResync,
namespace: namespace,
informers: map[schema.GroupVersionResource]informers.GenericInformer{},
startedInformers: make(map[schema.GroupVersionResource]bool),
tweakListOptions: tweakListOptions,
}
}

type summarySharedInformerFactory struct {
client client.Interface
defaultResync time.Duration
namespace string

lock sync.Mutex
informers map[schema.GroupVersionResource]informers.GenericInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[schema.GroupVersionResource]bool
tweakListOptions TweakListOptionsFunc
}

var _ SummarySharedInformerFactory = &summarySharedInformerFactory{}

func (f *summarySharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer {
f.lock.Lock()
defer f.lock.Unlock()

key := gvr
informer, exists := f.informers[key]
if exists {
return informer
}

informer = NewFilteredSummaryInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
f.informers[key] = informer

return informer
}

// Start initializes all requested informers.
func (f *summarySharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()

for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Informer().Run(stopCh)
f.startedInformers[informerType] = true
}
}
}

// WaitForCacheSync waits for all started informers' cache were synced.
func (f *summarySharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer.Informer()
}
}
return informers
}()

res := map[schema.GroupVersionResource]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}

// NewFilteredSummaryInformer constructs a new informer for a summary type.
func NewFilteredSummaryInformer(client client.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
return &summaryInformer{
gvr: gvr,
informer: cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.Resource(gvr).Namespace(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.Resource(gvr).Namespace(namespace).Watch(options)
},
},
&summary.SummarizedObject{},
resyncPeriod,
indexers,
),
}
}

type summaryInformer struct {
informer cache.SharedIndexInformer
gvr schema.GroupVersionResource
}

var _ informers.GenericInformer = &summaryInformer{}

func (d *summaryInformer) Informer() cache.SharedIndexInformer {
return d.informer
}

func (d *summaryInformer) Lister() cache.GenericLister {
return lister.NewRuntimeObjectShim(lister.New(d.informer.GetIndexer(), d.gvr))
}
34 changes: 34 additions & 0 deletions pkg/summary/informer/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informer

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
)

// SummarySharedInformerFactory provides access to a shared informer and lister for dynamic client
type SummarySharedInformerFactory interface {
Start(stopCh <-chan struct{})
ForResource(gvr schema.GroupVersionResource) informers.GenericInformer
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
}

// TweakListOptionsFunc defines the signature of a helper function
// that wants to provide more listing options to API
type TweakListOptionsFunc func(*metav1.ListOptions)
40 changes: 40 additions & 0 deletions pkg/summary/lister/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lister

import (
"github.com/rancher/wrangler/pkg/summary"
"k8s.io/apimachinery/pkg/labels"
)

// Lister helps list resources.
type Lister interface {
// List lists all resources in the indexer.
List(selector labels.Selector) (ret []*summary.SummarizedObject, err error)
// Get retrieves a resource from the indexer with the given name
Get(name string) (*summary.SummarizedObject, error)
// Namespace returns an object that can list and get resources in a given namespace.
Namespace(namespace string) NamespaceLister
}

// NamespaceLister helps list and get resources.
type NamespaceLister interface {
// List lists all resources in the indexer for a given namespace.
List(selector labels.Selector) (ret []*summary.SummarizedObject, err error)
// Get retrieves a resource from the indexer for a given namespace and name.
Get(name string) (*summary.SummarizedObject, error)
}
Loading

0 comments on commit 54b5e5f

Please sign in to comment.