Skip to content

Commit

Permalink
Add controller for node-ca daemonset
Browse files Browse the repository at this point in the history
  • Loading branch information
dmage committed Mar 11, 2020
1 parent 4babf52 commit dce92a3
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 9 deletions.
7 changes: 7 additions & 0 deletions pkg/operator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,12 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
c.kubeconfig, c.clients, c.listers,
)

nodeCADaemonController := NewNodeCADaemonController(
c.clients.Apps,
kubeInformerFactory.Apps().V1().DaemonSets(),
kubeInformerFactory.Core().V1().Services(),
)

configInformerFactory.Start(stopCh)
kubeInformerFactory.Start(stopCh)
openshiftConfigKubeInformerFactory.Start(stopCh)
Expand All @@ -493,6 +499,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {

// TODO(dmage):This controller should be started from main.
go clusterOperatorStatusController.Run(stopCh)
go nodeCADaemonController.Run(stopCh)

klog.Info("waiting for informer caches to sync")
for _, informer := range informers {
Expand Down
102 changes: 102 additions & 0 deletions pkg/operator/nodecadaemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package operator

import (
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsv1informers "k8s.io/client-go/informers/apps/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
appsv1client "k8s.io/client-go/kubernetes/typed/apps/v1"
appsv1listers "k8s.io/client-go/listers/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

"github.com/openshift/cluster-image-registry-operator/defaults"
"github.com/openshift/cluster-image-registry-operator/pkg/resource"
)

type NodeCADaemonController struct {
appsClient appsv1client.AppsV1Interface
daemonSetLister appsv1listers.DaemonSetNamespaceLister
serviceLister corev1listers.ServiceNamespaceLister

cachesToSync []cache.InformerSynced
queue workqueue.RateLimitingInterface
}

func NewNodeCADaemonController(
appsClient appsv1client.AppsV1Interface,
daemonSetInformer appsv1informers.DaemonSetInformer,
serviceInformer corev1informers.ServiceInformer,
) *NodeCADaemonController {
c := &NodeCADaemonController{
appsClient: appsClient,
daemonSetLister: daemonSetInformer.Lister().DaemonSets(defaults.ImageRegistryOperatorNamespace),
serviceLister: serviceInformer.Lister().Services(defaults.ImageRegistryOperatorNamespace),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "NodeCADaemonController"),
}

daemonSetInformer.Informer().AddEventHandler(c.eventHandler())
serviceInformer.Informer().AddEventHandler(c.eventHandler())

c.cachesToSync = append(c.cachesToSync, daemonSetInformer.Informer().HasSynced)
c.cachesToSync = append(c.cachesToSync, serviceInformer.Informer().HasSynced)

return c
}

func (c *NodeCADaemonController) eventHandler() cache.ResourceEventHandler {
const workQueueKey = "instance"
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
UpdateFunc: func(old, new interface{}) { c.queue.Add(workQueueKey) },
DeleteFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
}
}

func (c *NodeCADaemonController) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *NodeCADaemonController) processNextWorkItem() bool {
obj, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(obj)

klog.V(1).Infof("get event from workqueue")
if err := c.sync(); err != nil {
c.queue.AddRateLimited(workqueueKey)
klog.Errorf("NodeCADaemonController: unable to sync: %s, requeuing", err)
} else {
c.queue.Forget(obj)
klog.Infof("NodeCADaemonController: event from workqueue successfully processed")
}
return true
}

func (c *NodeCADaemonController) sync() error {
gen := resource.NewGeneratorNodeCADaemonSet(c.daemonSetLister, c.serviceLister, c.appsClient, Parameters(defaults.ImageRegistryOperatorNamespace))
return resource.ApplyMutator(gen)
}

func (c *NodeCADaemonController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting NodeCADaemonController")
if !cache.WaitForCacheSync(stopCh, c.cachesToSync...) {
return
}

go wait.Until(c.runWorker, time.Second, stopCh)

klog.Infof("Started NodeCADaemonController")
<-stopCh
klog.Infof("Shutting down NodeCADaemonController")
}
8 changes: 8 additions & 0 deletions pkg/resource/clusteroperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ func (gco *generatorClusterOperator) syncRelatedObjects(op *configapi.ClusterOpe
Name: defaults.ImageRegistryOperatorNamespace,
})

// The controller for the node-ca daemonset is always enabled
relatedObjects = append(relatedObjects, configapi.ObjectReference{
Group: "apps",
Resource: "daemonsets",
Namespace: defaults.ImageRegistryOperatorNamespace,
Name: "node-ca",
})

for _, gen := range gco.mutators {
relatedObjects = append(relatedObjects, configapi.ObjectReference{
Group: gen.GetGroup(),
Expand Down
1 change: 0 additions & 1 deletion pkg/resource/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func (g *Generator) List(cr *imageregistryv1.Config) ([]Mutator, error) {
mutators = append(mutators, newGeneratorPullSecret(g.clients.Core, g.params))
mutators = append(mutators, newGeneratorSecret(g.listers.Secrets, g.clients.Core, driver, g.params))
mutators = append(mutators, newGeneratorImageConfig(g.listers.ImageConfigs, g.listers.Routes, g.listers.Services, g.clients.Config, g.params))
mutators = append(mutators, newGeneratorNodeCADaemonSet(g.listers.DaemonSets, g.listers.Services, g.clients.Apps, g.params))
mutators = append(mutators, newGeneratorService(g.listers.Services, g.clients.Core, g.params))
mutators = append(mutators, newGeneratorDeployment(g.listers.Deployments, g.listers.ConfigMaps, g.listers.Secrets, g.listers.ProxyConfigs, g.clients.Core, g.clients.Apps, driver, g.params, cr))
mutators = append(mutators, g.listRoutes(cr)...)
Expand Down
14 changes: 7 additions & 7 deletions pkg/resource/nodecadaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
appsclientv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
kcorelisters "k8s.io/client-go/listers/core/v1"
appsv1client "k8s.io/client-go/kubernetes/typed/apps/v1"
appsv1listers "k8s.io/client-go/listers/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"

"github.com/openshift/library-go/pkg/operator/resource/resourceread"

Expand All @@ -19,13 +19,13 @@ import (
var _ Mutator = &generatorNodeCADaemonSet{}

type generatorNodeCADaemonSet struct {
daemonSetLister appslisters.DaemonSetNamespaceLister
serviceLister kcorelisters.ServiceNamespaceLister
client appsclientv1.AppsV1Interface
daemonSetLister appsv1listers.DaemonSetNamespaceLister
serviceLister corev1listers.ServiceNamespaceLister
client appsv1client.AppsV1Interface
params *parameters.Globals
}

func newGeneratorNodeCADaemonSet(daemonSetLister appslisters.DaemonSetNamespaceLister, serviceLister kcorelisters.ServiceNamespaceLister, client appsclientv1.AppsV1Interface, params *parameters.Globals) *generatorNodeCADaemonSet {
func NewGeneratorNodeCADaemonSet(daemonSetLister appsv1listers.DaemonSetNamespaceLister, serviceLister corev1listers.ServiceNamespaceLister, client appsv1client.AppsV1Interface, params *parameters.Globals) Mutator {
return &generatorNodeCADaemonSet{
daemonSetLister: daemonSetLister,
serviceLister: serviceLister,
Expand Down
2 changes: 1 addition & 1 deletion pkg/resource/nodecadaemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestNodeCADaemon(t *testing.T) {

clientset := kfake.NewSimpleClientset()

g := newGeneratorNodeCADaemonSet(nil, nil, clientset.AppsV1(), params)
g := NewGeneratorNodeCADaemonSet(nil, nil, clientset.AppsV1(), params)
obj, err := g.Create()
if err != nil {
t.Fatal(err)
Expand Down
49 changes: 49 additions & 0 deletions test/e2e/nodecadaemon_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package e2e

import (
"testing"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

imageregistryapiv1 "github.com/openshift/api/imageregistry/v1"
operatorapiv1 "github.com/openshift/api/operator/v1"

"github.com/openshift/cluster-image-registry-operator/defaults"
"github.com/openshift/cluster-image-registry-operator/test/framework"
)

func TestNodeCADaemonAlwaysDeployed(t *testing.T) {
client := framework.MustNewClientset(t, nil)

defer framework.MustRemoveImageRegistry(t, client)

cr := &imageregistryapiv1.Config{
ObjectMeta: metav1.ObjectMeta{
Name: defaults.ImageRegistryResourceName,
},
Spec: imageregistryapiv1.ImageRegistrySpec{
ManagementState: operatorapiv1.Removed,
Replicas: 1,
},
}
framework.MustDeployImageRegistry(t, client, cr)
framework.MustEnsureImageRegistryIsAvailable(t, client)

t.Log("waiting until the node-ca daemon is deployed")
err := wait.Poll(time.Second, framework.AsyncOperationTimeout, func() (stop bool, err error) {
_, err = client.DaemonSets(defaults.ImageRegistryOperatorNamespace).Get("node-ca", metav1.GetOptions{})
if errors.IsNotFound(err) {
t.Logf("ds/node-ca has not been created yet: %s", err)
return false, nil
} else if err != nil {
return false, err
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
}

0 comments on commit dce92a3

Please sign in to comment.