Skip to content

Commit

Permalink
Use shared informers in origin namespace finalizer controller
Browse files Browse the repository at this point in the history
  • Loading branch information
smarterclayton committed May 25, 2017
1 parent d65e605 commit 5f78700
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 83 deletions.
7 changes: 2 additions & 5 deletions pkg/cmd/server/origin/run_components.go
Expand Up @@ -81,11 +81,8 @@ func (c *MasterConfig) RunProjectAuthorizationCache() {
// RunOriginNamespaceController starts the controller that takes part in namespace termination of openshift content
func (c *MasterConfig) RunOriginNamespaceController() {
kclient := c.OriginNamespaceControllerClient()
factory := projectcontroller.NamespaceControllerFactory{
KubeClient: kclient,
}
controller := factory.Create()
controller.Run()
controller := projectcontroller.NewProjectFinalizerController(c.Informers.InternalKubernetesInformers().Core().InternalVersion().Namespaces(), kclient)
go controller.Run(utilwait.NeverStop, 5)
}

// RunServiceAccountsController starts the service account controller
Expand Down
59 changes: 0 additions & 59 deletions pkg/project/controller/factory.go

This file was deleted.

131 changes: 118 additions & 13 deletions pkg/project/controller/project_controller.go
@@ -1,21 +1,130 @@
package controller

import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
kapi "k8s.io/kubernetes/pkg/api"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion"

"github.com/golang/glog"
projectutil "github.com/openshift/origin/pkg/project/util"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)

// NamespaceController is responsible for participating in Kubernetes Namespace termination
// Use the NamespaceControllerFactory to create this controller.
type NamespaceController struct {
// KubeClient is a Kubernetes client.
KubeClient internalclientset.Interface
// ProjectFinalizerController is responsible for participating in Kubernetes Namespace termination
type ProjectFinalizerController struct {
client internalclientset.Interface

queue workqueue.RateLimitingInterface
maxRetries int

controller cache.Controller
cache cache.Store

// extracted for testing
syncHandler func(key string) error
}

func NewProjectFinalizerController(namespaces informers.NamespaceInformer, client internalclientset.Interface) *ProjectFinalizerController {
c := &ProjectFinalizerController{
client: client,
controller: namespaces.Informer().GetController(),
cache: namespaces.Informer().GetStore(),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
maxRetries: 10,
}
namespaces.Informer().AddEventHandlerWithResyncPeriod(
// TODO: generalize naiveResourceEventHandler and use it here
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueNamespace(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.enqueueNamespace(newObj)
},
},
10*time.Minute,
)

c.syncHandler = c.syncNamespace
return c
}

// Run starts the workers for this controller.
func (c *ProjectFinalizerController) Run(stopCh <-chan struct{}, workers int) {
defer runtime.HandleCrash()
defer c.queue.ShutDown()

// Wait for the stores to fill
if !cache.WaitForCacheSync(stopCh, c.controller.HasSynced) {
return
}

glog.V(5).Infof("Starting workers")
for i := 0; i < workers; i++ {
go c.worker()
}
<-stopCh
glog.V(1).Infof("Shutting down")
}

func (c *ProjectFinalizerController) enqueueNamespace(obj interface{}) {
ns, ok := obj.(*kapi.Namespace)
if !ok {
return
}
c.queue.Add(ns.Name)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (c *ProjectFinalizerController) worker() {
for {
if !c.work() {
return
}
}
}

// work returns true if the worker thread should continue
func (c *ProjectFinalizerController) work() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)

if err := c.syncHandler(key.(string)); err == nil {
// this means the request was successfully handled. We should "forget" the item so that any retry
// later on is reset
c.queue.Forget(key)
} else {
// if we had an error it means that we didn't handle it, which means that we want to requeue the work
runtime.HandleError(fmt.Errorf("error syncing namespace, it will be retried: %v", err))
c.queue.AddRateLimited(key)
}
return true
}

// syncNamespace will sync the namespace with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (c *ProjectFinalizerController) syncNamespace(key string) error {
item, exists, err := c.cache.GetByKey(key)
if err != nil {
return err
}
if !exists {
return nil
}
return c.finalize(item.(*kapi.Namespace))
}

// Handle processes a namespace and deletes content in origin if its terminating
func (c *NamespaceController) Handle(namespace *kapi.Namespace) (err error) {
// finalize processes a namespace and deletes content in origin if its terminating
func (c *ProjectFinalizerController) finalize(namespace *kapi.Namespace) error {
// if namespace is not terminating, ignore it
if namespace.Status.Phase != kapi.NamespaceTerminating {
return nil
Expand All @@ -27,10 +136,6 @@ func (c *NamespaceController) Handle(namespace *kapi.Namespace) (err error) {
}

// we have removed content, so mark it finalized by us
_, err = projectutil.Finalize(c.KubeClient, namespace)
if err != nil {
return err
}

return nil
_, err := projectutil.Finalize(c.client, namespace)
return err
}
12 changes: 6 additions & 6 deletions pkg/project/controller/project_controller_test.go
Expand Up @@ -14,8 +14,8 @@ import (

func TestSyncNamespaceThatIsTerminating(t *testing.T) {
mockKubeClient := &fake.Clientset{}
nm := NamespaceController{
KubeClient: mockKubeClient,
nm := &ProjectFinalizerController{
client: mockKubeClient,
}
now := metav1.Now()
testNamespace := &kapi.Namespace{
Expand All @@ -31,7 +31,7 @@ func TestSyncNamespaceThatIsTerminating(t *testing.T) {
Phase: kapi.NamespaceTerminating,
},
}
err := nm.Handle(testNamespace)
err := nm.finalize(testNamespace)
if err != nil {
t.Errorf("Unexpected error when handling namespace %v", err)
}
Expand All @@ -52,8 +52,8 @@ func TestSyncNamespaceThatIsTerminating(t *testing.T) {

func TestSyncNamespaceThatIsActive(t *testing.T) {
mockKubeClient := &fake.Clientset{}
nm := NamespaceController{
KubeClient: mockKubeClient,
nm := &ProjectFinalizerController{
client: mockKubeClient,
}
testNamespace := &kapi.Namespace{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -67,7 +67,7 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
Phase: kapi.NamespaceActive,
},
}
err := nm.Handle(testNamespace)
err := nm.finalize(testNamespace)
if err != nil {
t.Errorf("Unexpected error when handling namespace %v", err)
}
Expand Down

0 comments on commit 5f78700

Please sign in to comment.