From 360cb58aec1f4b16bc3f0be0825e023d72eecaaa Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 17 Mar 2017 11:47:41 +0100 Subject: [PATCH] adapt: pkg/deploy informers --- .../controller/deployment/controller.go | 21 +++--- .../controller/deployment/controller_test.go | 29 +++++--- pkg/deploy/controller/deployment/factory.go | 72 ++++++++++--------- .../controller/deploymentconfig/controller.go | 28 ++++---- .../deploymentconfig/controller_test.go | 39 +++------- .../controller/deploymentconfig/factory.go | 60 ++++++++-------- .../controller/generictrigger/controller.go | 4 +- .../generictrigger/controller_test.go | 1 - .../controller/generictrigger/factory.go | 6 +- 9 files changed, 129 insertions(+), 131 deletions(-) diff --git a/pkg/deploy/controller/deployment/controller.go b/pkg/deploy/controller/deployment/controller.go index 5507c71290c0..1740e5355ebd 100755 --- a/pkg/deploy/controller/deployment/controller.go +++ b/pkg/deploy/controller/deployment/controller.go @@ -15,6 +15,7 @@ import ( "k8s.io/client-go/util/workqueue" kapi "k8s.io/kubernetes/pkg/api" kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" + kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" deployapi "github.com/openshift/origin/pkg/deploy/api" deployutil "github.com/openshift/origin/pkg/deploy/util" @@ -65,14 +66,14 @@ type DeploymentController struct { // queue contains replication controllers that need to be synced. queue workqueue.RateLimitingInterface - // rcStore is a store of replication controllers. - rcStore cache.StoreToReplicationControllerLister - // podStore is a store of pods. - podStore cache.StoreToPodLister - // rcStoreSynced makes sure the rc store is synced before reconcling any deployment. - rcStoreSynced func() bool - // podStoreSynced makes sure the pod store is synced before reconcling any deployment. 
- podStoreSynced func() bool + // rcLister can list/get replication controllers from a shared informer's cache + rcLister kcorelisters.ReplicationControllerLister + // rcListerSynced makes sure the rc store is synced before reconciling any deployment. + rcListerSynced cache.InformerSynced + // podLister can list/get pods from a shared informer's cache + podLister kcorelisters.PodLister + // podListerSynced makes sure the pod store is synced before reconciling any deployment. + podListerSynced cache.InformerSynced // deployerImage specifies which Docker image can support the default strategies. deployerImage string @@ -102,7 +103,7 @@ func (c *DeploymentController) handle(deployment *kapi.ReplicationController, wi nextStatus := currentStatus deployerPodName := deployutil.DeployerPodNameForDeployment(deployment.Name) - deployer, deployerErr := c.podStore.Pods(deployment.Namespace).Get(deployerPodName, metav1.GetOptions{}) + deployer, deployerErr := c.podLister.Pods(deployment.Namespace).Get(deployerPodName) if deployerErr == nil { nextStatus = c.nextStatus(deployer, deployment, updatedAnnotations) } @@ -415,7 +416,7 @@ func (c *DeploymentController) makeDeployerContainer(strategy *deployapi.Deploym func (c *DeploymentController) cleanupDeployerPods(deployment *kapi.ReplicationController) error { selector := deployutil.DeployerPodSelector(deployment.Name) - deployerList, err := c.podStore.Pods(deployment.Namespace).List(selector) + deployerList, err := c.podLister.Pods(deployment.Namespace).List(selector) if err != nil { return fmt.Errorf("couldn't fetch deployer pods for %q: %v", deployutil.LabelForDeployment(deployment), err) } diff --git a/pkg/deploy/controller/deployment/controller_test.go b/pkg/deploy/controller/deployment/controller_test.go index 655b0d8f6564..a3f46b46152f 100644 --- a/pkg/deploy/controller/deployment/controller_test.go +++ b/pkg/deploy/controller/deployment/controller_test.go @@ -5,7 +5,6 @@ import ( "reflect" "sort" "testing" - "time" kerrors
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -16,6 +15,7 @@ import ( kapi "k8s.io/kubernetes/pkg/api" kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + kinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" deployapi "github.com/openshift/origin/pkg/deploy/api" _ "github.com/openshift/origin/pkg/deploy/api/install" @@ -29,26 +29,39 @@ var ( codec = kapi.Codecs.LegacyCodec(deployapiv1.SchemeGroupVersion) ) -func okDeploymentController(client kclientset.Interface, deployment *kapi.ReplicationController, hookPodNames []string, related bool, deployerStatus kapi.PodPhase) *DeploymentController { - rcInformer := cache.NewSharedIndexInformer(&cache.ListWatch{}, &kapi.ReplicationController{}, 2*time.Minute, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - podInformer := cache.NewSharedIndexInformer(&cache.ListWatch{}, &kapi.Pod{}, 2*time.Minute, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) +func alwaysReady() bool { return true } + +type deploymentController struct { + *DeploymentController + podIndexer cache.Indexer +} + +func okDeploymentController(client kclientset.Interface, deployment *kapi.ReplicationController, hookPodNames []string, related bool, deployerStatus kapi.PodPhase) *deploymentController { + informerFactory := kinformers.NewSharedInformerFactory(client, 0) + rcInformer := informerFactory.Core().InternalVersion().ReplicationControllers() + podInformer := informerFactory.Core().InternalVersion().Pods() c := NewDeploymentController(rcInformer, podInformer, client, "sa:test", "openshift/origin-deployer", env, codec) + c.podListerSynced = alwaysReady + c.rcListerSynced = alwaysReady // deployer pod if deployment != nil { pod := deployerPod(deployment, "", related) pod.Status.Phase = deployerStatus - c.podStore.Indexer.Add(pod) + 
podInformer.Informer().GetIndexer().Add(pod) } // hook pods for _, name := range hookPodNames { pod := deployerPod(deployment, name, related) - c.podStore.Indexer.Add(pod) + podInformer.Informer().GetIndexer().Add(pod) } - return c + return &deploymentController{ + c, + podInformer.Informer().GetIndexer(), + } } func deployerPod(deployment *kapi.ReplicationController, alternateName string, related bool) *kapi.Pod { @@ -580,7 +593,7 @@ func TestHandle_cleanupPodNoop(t *testing.T) { controller := okDeploymentController(client, deployment, nil, true, kapi.PodSucceeded) pod := deployerPod(deployment, "", true) pod.Labels[deployapi.DeployerPodForDeploymentLabel] = "unrelated" - controller.podStore.Indexer.Update(pod) + controller.podIndexer.Update(pod) if err := controller.handle(deployment, false); err != nil { t.Fatalf("unexpected error: %v", err) diff --git a/pkg/deploy/controller/deployment/factory.go b/pkg/deploy/controller/deployment/factory.go index 8418cfae3198..0bc0739aa7d9 100644 --- a/pkg/deploy/controller/deployment/factory.go +++ b/pkg/deploy/controller/deployment/factory.go @@ -5,15 +5,18 @@ import ( "time" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" kv1core "k8s.io/client-go/kubernetes/typed/core/v1" + kclientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" kapi "k8s.io/kubernetes/pkg/api" kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + kcoreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion" kcontroller "k8s.io/kubernetes/pkg/controller" deployutil "github.com/openshift/origin/pkg/deploy/util" @@ -27,10 +30,18 @@ const ( ) // NewDeploymentController creates a new DeploymentController. 
-func NewDeploymentController(rcInformer, podInformer cache.SharedIndexInformer, kc kclientset.Interface, sa, image string, env []kapi.EnvVar, codec runtime.Codec) *DeploymentController { +func NewDeploymentController( + rcInformer kcoreinformers.ReplicationControllerInformer, + podInformer kcoreinformers.PodInformer, + kc kclientset.Interface, + sa, + image string, + env []kapi.EnvVar, + codec runtime.Codec, +) *DeploymentController { eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")}) - recorder := eventBroadcaster.NewRecorder(kapi.EventSource{Component: "deployer-controller"}) + eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: kv1core.New(kc.Core().RESTClient()).Events("")}) + recorder := eventBroadcaster.NewRecorder(kapi.Scheme, kclientv1.EventSource{Component: "deployer-controller"}) c := &DeploymentController{ rn: kc.Core(), @@ -38,6 +49,11 @@ func NewDeploymentController(rcInformer, podInformer cache.SharedIndexInformer, queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + rcLister: rcInformer.Lister(), + rcListerSynced: rcInformer.Informer().HasSynced, + podLister: podInformer.Lister(), + podListerSynced: podInformer.Informer().HasSynced, + serviceAccount: sa, deployerImage: image, environment: env, @@ -45,57 +61,40 @@ func NewDeploymentController(rcInformer, podInformer cache.SharedIndexInformer, codec: codec, } - c.rcStore.Indexer = rcInformer.GetIndexer() - rcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + rcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addReplicationController, UpdateFunc: c.updateReplicationController, }) - c.podStore.Indexer = podInformer.GetIndexer() - podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: c.updatePod, 
DeleteFunc: c.deletePod, }) - c.rcStoreSynced = rcInformer.HasSynced - c.podStoreSynced = podInformer.HasSynced - return c } // Run begins watching and syncing. func (c *DeploymentController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + glog.Infof("Starting deployer controller") // Wait for the dc store to sync before starting any work in this controller. - ready := make(chan struct{}) - go c.waitForSyncedStores(ready, stopCh) - select { - case <-ready: - case <-stopCh: + if !cache.WaitForCacheSync(stopCh, c.rcListerSynced, c.podListerSynced) { return } + glog.Infof("Deployer controller caches are synced. Starting workers.") + for i := 0; i < workers; i++ { go wait.Until(c.worker, time.Second, stopCh) } - <-stopCh - glog.Infof("Shutting down deployer controller") - c.queue.ShutDown() -} -func (c *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() + <-stopCh - for !c.rcStoreSynced() || !c.podStoreSynced() { - glog.V(4).Infof("Waiting for the rc and pod caches to sync before starting the deployer controller workers") - select { - case <-time.After(storeSyncedPollPeriod): - case <-stopCh: - return - } - } - close(ready) + glog.Infof("Shutting down deployer controller") } func (c *DeploymentController) addReplicationController(obj interface{}) { @@ -211,15 +210,20 @@ func (c *DeploymentController) work() bool { } func (c *DeploymentController) getByKey(key string) (*kapi.ReplicationController, error) { - obj, exists, err := c.rcStore.Indexer.GetByKey(key) + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - c.queue.AddRateLimited(key) return nil, err } - if !exists { + rc, err := c.rcLister.ReplicationControllers(namespace).Get(name) + if errors.IsNotFound(err) { glog.V(4).Infof("Replication controller %q has been deleted", key) return nil, nil } + if err != nil { + glog.Infof("Unable to retrieve replication 
controller %q from store: %v", key, err) + c.queue.AddRateLimited(key) + return nil, err + } - return obj.(*kapi.ReplicationController), nil + return rc, nil } diff --git a/pkg/deploy/controller/deploymentconfig/controller.go b/pkg/deploy/controller/deploymentconfig/controller.go index a09d1359ae01..eef93595c24d 100644 --- a/pkg/deploy/controller/deploymentconfig/controller.go +++ b/pkg/deploy/controller/deploymentconfig/controller.go @@ -7,16 +7,15 @@ import ( "github.com/golang/glog" kapierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" kutilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" kapi "k8s.io/kubernetes/pkg/api" kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" + kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/internalversion" "k8s.io/kubernetes/pkg/client/retry" osclient "github.com/openshift/origin/pkg/client" @@ -63,17 +62,16 @@ type DeploymentConfigController struct { // dcStore provides a local cache for deployment configs. dcStore oscache.StoreToDeploymentConfigLister - // rcStore provides a local cache for replication controllers. - rcStore cache.StoreToReplicationControllerLister - // podStore provides a local cache for pods. - podStore cache.StoreToPodLister - // dcStoreSynced makes sure the dc store is synced before reconcling any deployment config. dcStoreSynced func() bool - // rcStoreSynced makes sure the rc store is synced before reconcling any deployment config. - rcStoreSynced func() bool - // podStoreSynced makes sure the pod store is synced before reconcling any deployment config. 
- podStoreSynced func() bool + // rcLister can list/get replication controllers from a shared informer's cache + rcLister kcorelisters.ReplicationControllerLister + // rcListerSynced makes sure the rc shared informer is synced before reconciling any deployment config. + rcListerSynced func() bool + // podLister can list/get pods from a shared informer's cache + podLister kcorelisters.PodLister + // podListerSynced makes sure the pod shared informer is synced before reconciling any deployment config. + podListerSynced func() bool // codec is used to build deployments from configs. codec runtime.Codec @@ -92,7 +90,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) // Find all deployments owned by the deployment config. selector := deployutil.ConfigSelector(config.Name) - existingDeployments, err := c.rcStore.ReplicationControllers(config.Namespace).List(selector) + existingDeployments, err := c.rcLister.ReplicationControllers(config.Namespace).List(selector) if err != nil { return err } @@ -124,7 +122,7 @@ func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) // Retry faster on conflicts var updatedDeployment *kapi.ReplicationController if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - rc, err := c.rcStore.ReplicationControllers(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{}) + rc, err := c.rcLister.ReplicationControllers(deployment.Namespace).Get(deployment.Name) if kapierrors.IsNotFound(err) { return nil } @@ -241,7 +239,7 @@ func (c *DeploymentConfigController) reconcileDeployments(existingDeployments [] if newReplicaCount != oldReplicaCount { if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { // refresh the replication controller version - rc, err := c.rcStore.ReplicationControllers(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{}) + rc, err := c.rcLister.ReplicationControllers(deployment.Namespace).Get(deployment.Name) if err != nil {
return err } @@ -310,7 +308,7 @@ func (c *DeploymentConfigController) updateStatus(config *deployapi.DeploymentCo func (c *DeploymentConfigController) calculateStatus(config deployapi.DeploymentConfig, deployments []*kapi.ReplicationController, additional ...deployapi.DeploymentCondition) (deployapi.DeploymentConfigStatus, error) { selector := labels.Set(config.Spec.Selector).AsSelector() // TODO: Replace with using rc.status.availableReplicas that comes with the next rebase. - pods, err := c.podStore.Pods(config.Namespace).List(selector) + pods, err := c.podLister.Pods(config.Namespace).List(selector) if err != nil { return config.Status, err } diff --git a/pkg/deploy/controller/deploymentconfig/controller_test.go b/pkg/deploy/controller/deploymentconfig/controller_test.go index 8ad63767fbd5..d462222f5bf7 100644 --- a/pkg/deploy/controller/deploymentconfig/controller_test.go +++ b/pkg/deploy/controller/deploymentconfig/controller_test.go @@ -14,9 +14,9 @@ import ( "k8s.io/client-go/tools/cache" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + kinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "github.com/openshift/origin/pkg/client/testclient" - "github.com/openshift/origin/pkg/controller" deployapi "github.com/openshift/origin/pkg/deploy/api" _ "github.com/openshift/origin/pkg/deploy/api/install" deploytest "github.com/openshift/origin/pkg/deploy/api/test" @@ -24,6 +24,8 @@ import ( deployutil "github.com/openshift/origin/pkg/deploy/util" ) +func alwaysReady() bool { return true } + func TestHandleScenarios(t *testing.T) { type deployment struct { // version is the deployment version @@ -373,36 +375,17 @@ func TestHandleScenarios(t *testing.T) { 2*time.Minute, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) - rcInformer := cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - 
return kc.Core().ReplicationControllers(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return kc.Core().ReplicationControllers(metav1.NamespaceAll).Watch(options) - }, - }, - &kapi.ReplicationController{}, - 2*time.Minute, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - podInformer := cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return kc.Core().Pods(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return kc.Core().Pods(metav1.NamespaceAll).Watch(options) - }, - }, - &kapi.Pod{}, - 2*time.Minute, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + + kubeInformerFactory := kinformers.NewSharedInformerFactory(kc, 0) + rcInformer := kubeInformerFactory.Core().InternalVersion().ReplicationControllers() + podInformer := kubeInformerFactory.Core().InternalVersion().Pods() c := NewDeploymentConfigController(dcInformer, rcInformer, podInformer, oc, kc, codec) + c.dcStoreSynced = alwaysReady + c.podListerSynced = alwaysReady + c.rcListerSynced = alwaysReady for i := range toStore { - c.rcStore.Indexer.Add(toStore[i]) + rcInformer.Informer().GetStore().Add(toStore[i]) } config := deploytest.OkDeploymentConfig(test.newVersion) diff --git a/pkg/deploy/controller/deploymentconfig/factory.go b/pkg/deploy/controller/deploymentconfig/factory.go index 7eef5899bdd9..c703da01952e 100644 --- a/pkg/deploy/controller/deploymentconfig/factory.go +++ b/pkg/deploy/controller/deploymentconfig/factory.go @@ -9,11 +9,13 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" kv1core "k8s.io/client-go/kubernetes/typed/core/v1" + kclientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" kapi 
"k8s.io/kubernetes/pkg/api" kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + kcoreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion" kcontroller "k8s.io/kubernetes/pkg/controller" osclient "github.com/openshift/origin/pkg/client" @@ -28,10 +30,17 @@ const ( ) // NewDeploymentConfigController creates a new DeploymentConfigController. -func NewDeploymentConfigController(dcInformer, rcInformer, podInformer cache.SharedIndexInformer, oc osclient.Interface, kc kclientset.Interface, codec runtime.Codec) *DeploymentConfigController { +func NewDeploymentConfigController( + dcInformer cache.SharedIndexInformer, + rcInformer kcoreinformers.ReplicationControllerInformer, + podInformer kcoreinformers.PodInformer, + oc osclient.Interface, + kc kclientset.Interface, + codec runtime.Codec, +) *DeploymentConfigController { eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")}) - recorder := eventBroadcaster.NewRecorder(kapi.EventSource{Component: "deploymentconfig-controller"}) + eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: kv1core.New(kc.Core().RESTClient()).Events("")}) + recorder := eventBroadcaster.NewRecorder(kapi.Scheme, kclientv1.EventSource{Component: "deploymentconfig-controller"}) c := &DeploymentConfigController{ dn: oc, @@ -39,6 +48,11 @@ func NewDeploymentConfigController(dcInformer, rcInformer, podInformer cache.Sha queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + rcLister: rcInformer.Lister(), + rcListerSynced: rcInformer.Informer().HasSynced, + podLister: podInformer.Lister(), + podListerSynced: podInformer.Informer().HasSynced, + recorder: recorder, codec: codec, } @@ -49,58 +63,42 @@ func NewDeploymentConfigController(dcInformer, rcInformer, podInformer cache.Sha UpdateFunc: 
c.updateDeploymentConfig, DeleteFunc: c.deleteDeploymentConfig, }) - c.rcStore.Indexer = rcInformer.GetIndexer() - rcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + c.dcStoreSynced = dcInformer.HasSynced + + rcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: c.updateReplicationController, DeleteFunc: c.deleteReplicationController, }) - c.podStore.Indexer = podInformer.GetIndexer() - podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: c.updatePod, DeleteFunc: c.deletePod, }) - c.dcStoreSynced = dcInformer.HasSynced - c.rcStoreSynced = rcInformer.HasSynced - c.podStoreSynced = podInformer.HasSynced - return c } // Run begins watching and syncing. func (c *DeploymentConfigController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + glog.Infof("Starting deploymentconfig controller") // Wait for the rc and dc stores to sync before starting any work in this controller. - ready := make(chan struct{}) - go c.waitForSyncedStores(ready, stopCh) - select { - case <-ready: - case <-stopCh: + if !cache.WaitForCacheSync(stopCh, c.dcStoreSynced, c.rcListerSynced, c.podListerSynced) { return } + glog.Info("deploymentconfig controller caches are synced. 
Starting workers.") + for i := 0; i < workers; i++ { go wait.Until(c.worker, time.Second, stopCh) } <-stopCh - glog.Infof("Shutting down deploymentconfig controller") - c.queue.ShutDown() -} - -func (c *DeploymentConfigController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) { - defer utilruntime.HandleCrash() - for !c.dcStoreSynced() || !c.rcStoreSynced() || !c.podStoreSynced() { - glog.V(4).Infof("Waiting for the dc, rc, and pod caches to sync before starting the deployment config controller workers") - select { - case <-time.After(storeSyncedPollPeriod): - case <-stopCh: - return - } - } - close(ready) + glog.Infof("Shutting down deploymentconfig controller") } func (c *DeploymentConfigController) addDeploymentConfig(obj interface{}) { diff --git a/pkg/deploy/controller/generictrigger/controller.go b/pkg/deploy/controller/generictrigger/controller.go index 6094935ba768..6d2c7649d69a 100644 --- a/pkg/deploy/controller/generictrigger/controller.go +++ b/pkg/deploy/controller/generictrigger/controller.go @@ -3,8 +3,8 @@ package generictrigger import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + kcorelisterinternal "k8s.io/kubernetes/pkg/client/listers/core/internalversion" "github.com/golang/glog" osclient "github.com/openshift/origin/pkg/client" @@ -32,7 +32,7 @@ type DeploymentTriggerController struct { // dcListerSynced makes sure the dc store is synced before reconcling any deployment config. dcListerSynced func() bool // rcLister provides a local cache for replication controllers. - rcLister cache.StoreToReplicationControllerLister + rcLister kcorelisterinternal.ReplicationControllerLister // rcListerSynced makes sure the dc store is synced before reconcling any replication controller. 
rcListerSynced func() bool diff --git a/pkg/deploy/controller/generictrigger/controller_test.go b/pkg/deploy/controller/generictrigger/controller_test.go index 62995499eba3..7acf37075393 100644 --- a/pkg/deploy/controller/generictrigger/controller_test.go +++ b/pkg/deploy/controller/generictrigger/controller_test.go @@ -13,7 +13,6 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "github.com/openshift/origin/pkg/client/testclient" - "github.com/openshift/origin/pkg/controller" deployapi "github.com/openshift/origin/pkg/deploy/api" _ "github.com/openshift/origin/pkg/deploy/api/install" testapi "github.com/openshift/origin/pkg/deploy/api/test" diff --git a/pkg/deploy/controller/generictrigger/factory.go b/pkg/deploy/controller/generictrigger/factory.go index f3bb3c9225f6..68b7817f4be8 100644 --- a/pkg/deploy/controller/generictrigger/factory.go +++ b/pkg/deploy/controller/generictrigger/factory.go @@ -12,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + kcorelistersinternal "k8s.io/kubernetes/pkg/client/listers/core/internalversion" kcontroller "k8s.io/kubernetes/pkg/controller" osclient "github.com/openshift/origin/pkg/client" @@ -47,8 +48,9 @@ func NewDeploymentTriggerController(dcInformer, rcInformer, streamInformer cache }) c.dcLister.Indexer = dcInformer.GetIndexer() - c.rcLister.Indexer = rcInformer.GetIndexer() c.dcListerSynced = dcInformer.HasSynced + + c.rcLister = kcorelistersinternal.NewReplicationControllerLister(rcInformer.GetIndexer()) c.rcListerSynced = rcInformer.HasSynced return c } @@ -132,7 +134,7 @@ func (c *DeploymentTriggerController) updateDeploymentConfig(old, cur interface{ // we will try to instantiate a deployment config at the expense of duplicating some of the // work that the instantiate endpoint is already doing but I think this is fine. 
shouldInstantiate := true - latestRc, err := c.rcLister.ReplicationControllers(newDc.Namespace).Get(deployutil.LatestDeploymentNameForConfig(newDc), metav1.GetOptions{}) + latestRc, err := c.rcLister.ReplicationControllers(newDc.Namespace).Get(deployutil.LatestDeploymentNameForConfig(newDc)) if err != nil { // If we get an error here it may be due to the rc cache lagging behind. In such a case // just defer to the api server (instantiate REST) where we will retry this.