Commit
Use a filtered shared informer. (#28)
Even though the controller's logic only concerns the current node, the shared informer
implementation used so far caches all the pods of the cluster.
Here we filter out all the pods not scheduled on this node.

Signed-off-by: Federico Paolinelli <fpaoline@redhat.com>
fedepaol committed Sep 30, 2020
1 parent e9f624b commit c5b56ed
Showing 3 changed files with 73 additions and 31 deletions.
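For context: newer client-go releases can express the same node filter at the factory level, without hand-rolling a ListWatch. A minimal sketch, assuming a client-go version that ships NewSharedInformerFactoryWithOptions (newNodeScopedFactory is a hypothetical helper name; the commit below deliberately uses an explicit ListWatch instead):

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// newNodeScopedFactory builds a shared informer factory whose list/watch
// calls are restricted to pods scheduled on the given node. Illustrative
// alternative only; not the approach taken by this commit.
func newNodeScopedFactory(kubeClient kubernetes.Interface, nodeName string) kubeinformers.SharedInformerFactory {
	return kubeinformers.NewSharedInformerFactoryWithOptions(
		kubeClient,
		time.Second*30,
		kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeName)
		}),
	)
}

The explicit ListWatch used in this commit has the same filtering effect while keeping the informer independent of the factory machinery.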
32 changes: 28 additions & 4 deletions main.go
@@ -1,11 +1,17 @@
 package main
 
 import (
+	"context"
 	"flag"
+	"fmt"
 	"time"
 
-	kubeinformers "k8s.io/client-go/informers"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/klog"
 
@@ -52,10 +58,28 @@ func main() {
 		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
 	}
 
-	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
+	fieldSelector := fmt.Sprintf("spec.nodeName=%s", config.currentNode)
+	informer := cache.NewSharedIndexInformer(
+		&cache.ListWatch{
+			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+				return kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(context.Background(),
+					metav1.ListOptions{
+						FieldSelector: fieldSelector,
+					})
+			},
+			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				return kubeClient.CoreV1().Pods(metav1.NamespaceAll).Watch(context.Background(), metav1.ListOptions{
+					FieldSelector: fieldSelector,
+				})
+			},
+		},
+		&v1.Pod{},
+		time.Second*30,
+		cache.Indexers{},
+	)
 
-	ctrl := controller.New(kubeClient, kubeInformerFactory.Core().V1().Pods(), config.currentNode)
-	kubeInformerFactory.Start(stopCh)
+	ctrl := controller.New(kubeClient, informer, config.currentNode)
+	go informer.Run(stopCh)
 
 	podmetrics.Serve(config.metricsAddress, stopCh)
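A side note on go informer.Run(stopCh) above: a hand-rolled informer has no factory-level WaitForCacheSync helper, so a common pattern (not part of this commit) is to block until the initial List has populated the local store before handling items. A minimal sketch, where waitForSync is a hypothetical helper:

package main

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog"
)

// waitForSync starts the informer and blocks until its local store has
// been populated by the initial List, so handlers never see an empty cache.
func waitForSync(informer cache.SharedIndexInformer, stopCh <-chan struct{}) {
	go informer.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
		klog.Fatal("timed out waiting for the pod cache to sync")
	}
}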
33 changes: 19 additions & 14 deletions pkg/controller/controller.go
@@ -8,9 +8,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
-	coreinformers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes"
-	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
@@ -23,29 +21,27 @@ import (
 type Controller struct {
 	// kubeclientset is a standard kubernetes clientset
 	kubeclientset kubernetes.Interface
-	podsLister    corelisters.PodLister
 	podsSynced    cache.InformerSynced
 
-	workqueue workqueue.RateLimitingInterface
+	indexer   cache.Indexer
+	workqueue workqueue.RateLimitingInterface
 }
 
 // New returns a new controller listening to pods.
 func New(
 	kubeclientset kubernetes.Interface,
-	podsInformer coreinformers.PodInformer,
+	informer cache.SharedIndexInformer,
 	currentNode string) *Controller {
 
 	controller := &Controller{
 		kubeclientset: kubeclientset,
 
-		podsLister: podsInformer.Lister(),
-		podsSynced: podsInformer.Informer().HasSynced,
-		workqueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"),
+		indexer:    informer.GetIndexer(),
+		podsSynced: informer.HasSynced,
+		workqueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"),
 	}
 
 	klog.Info("Setting up event handlers")
 
-	podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			pod := obj.(*v1.Pod)
 			_, ok := pod.Annotations[podnetwork.Status]
@@ -172,18 +168,27 @@ func (c *Controller) podHandler(key string) error {
 		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
 		return nil
 	}
 
-	// Get the Pod resource with this namespace/name
-	pod, err := c.podsLister.Pods(namespace).Get(name)
+	obj, exists, err := c.indexer.GetByKey(key)
 	if err != nil {
 		if errors.IsNotFound(err) {
 			podmetrics.DeleteAllForPod(name, namespace)
 			return nil
 		}
 
 		return err
 	}
 
+	if !exists {
+		podmetrics.DeleteAllForPod(name, namespace)
+		return nil
+	}
+
+	pod, ok := obj.(*v1.Pod)
+	if !ok {
+		utilruntime.HandleError(fmt.Errorf("invalid object for key: %s", key))
+		return nil
+	}
+
 	klog.Infof("Received pod '%s'", pod.Name)
 	networks, err := podnetwork.Get(pod)
 	if err != nil {
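The rewritten podHandler depends on the store contract of GetByKey: keys are "namespace/name" strings, and a cache miss surfaces through the exists flag rather than an IsNotFound error, hence the new !exists branch alongside the retained error handling. A minimal, self-contained sketch of that contract (illustrative only, not code from this commit):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "podname", Namespace: "namespace"}}
	key, _ := cache.MetaNamespaceKeyFunc(pod) // "namespace/podname"
	_ = store.Add(pod)

	obj, exists, err := store.GetByKey(key)
	fmt.Println(obj != nil, exists, err) // true true <nil>

	// A key that was never added comes back with exists == false and a
	// nil error, unlike a lister's Get, which returns an IsNotFound error.
	_, exists, err = store.GetByKey("namespace/missing")
	fmt.Println(exists, err) // false <nil>
}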
39 changes: 26 additions & 13 deletions pkg/controller/controller_test.go
@@ -1,6 +1,7 @@
 package controller
 
 import (
+	"context"
 	"strings"
 	"testing"
 	"time"
@@ -10,7 +11,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
-	kubeinformers "k8s.io/client-go/informers"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/tools/cache"
 
 	"github.com/openshift/network-metrics-daemon/pkg/podmetrics"
@@ -58,31 +59,43 @@ func newPod(name, namespace string, networkAnnotation string) *v1.Pod {
 	}
 }
 
-func (f *fixture) newController() (*Controller, kubeinformers.SharedInformerFactory) {
+func (f *fixture) newController() (*Controller, cache.SharedInformer) {
 	f.kubeclient = k8sfake.NewSimpleClientset(f.kubeobjects...)
 
-	k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriodFunc())
-	c := New(f.kubeclient, k8sI.Core().V1().Pods(), "NodeName")
+	informer := cache.NewSharedIndexInformer(
+		&cache.ListWatch{
+			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+				return f.kubeclient.CoreV1().Pods(metav1.NamespaceAll).List(context.Background(), metav1.ListOptions{})
+			},
+			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				return f.kubeclient.CoreV1().Pods(metav1.NamespaceAll).Watch(context.Background(), metav1.ListOptions{})
+			},
+		},
+		&v1.Pod{},
+		0, // Skip resync
+		cache.Indexers{},
+	)
+	c := New(f.kubeclient, informer, "NodeName")
 
 	c.podsSynced = alwaysReady
 
 	for _, p := range f.podsLister {
-		k8sI.Core().V1().Pods().Informer().GetIndexer().Add(p)
+		informer.GetIndexer().Add(p)
 	}
 
-	return c, k8sI
+	return c, informer
 }
 
-type testBody func(c *Controller, k8si kubeinformers.SharedInformerFactory)
+type testBody func(c *Controller, informer cache.SharedInformer)
 
 func (f *fixture) run(t testBody) {
-	c, k8sI := f.newController()
+	c, informer := f.newController()
 
 	stopCh := make(chan struct{})
 	defer close(stopCh)
-	k8sI.Start(stopCh)
+	go informer.Run(stopCh)
 
-	t(c, k8sI)
+	t(c, informer)
 }
@@ -103,7 +116,7 @@ func TestPublishesMetric(t *testing.T) {
 pod_network_name_info{interface="eth0",namespace="namespace",network_name="kindnet",pod="podname"} 0
 `
 
-	f.run(func(c *Controller, k8si kubeinformers.SharedInformerFactory) {
+	f.run(func(c *Controller, informer cache.SharedInformer) {
 		c.podHandler(getKey(pod, t))
 	})
 
@@ -131,12 +144,12 @@ func TestDeletesMetric(t *testing.T) {
 	f.expectedMetrics = `
 `
 
-	f.run(func(c *Controller, k8si kubeinformers.SharedInformerFactory) {
+	f.run(func(c *Controller, informer cache.SharedInformer) {
 		// send pod, then make it disappear simulating a delete
 		c.podHandler(getKey(pod, t))
 		f.podsLister = []*v1.Pod{}
 		f.kubeobjects = []runtime.Object{}
-		indxr := k8si.Core().V1().Pods().Informer().GetIndexer()
+		indxr := informer.GetStore()
 		for _, p := range indxr.List() {
 			indxr.Delete(p)
 		}
