From 10b4ec7b47bfc5c820d16399841c96a0cb7da156 Mon Sep 17 00:00:00 2001 From: Michail Kargakis Date: Sun, 12 Feb 2017 17:42:26 +0100 Subject: [PATCH] controller: cleanup workload controllers a bit * Switches glog.Errorf to utilruntime.HandleError in DS and RC controllers * Drops a couple of unused variables in the DS, SS, and Deployment controllers * Updates some comments --- pkg/controller/daemon/daemoncontroller.go | 21 ++++++++---------- .../deployment/deployment_controller.go | 22 +++++++------------ .../replication/replication_controller.go | 20 ++++++++--------- pkg/controller/statefulset/stateful_set.go | 10 ++++----- 4 files changed, 31 insertions(+), 42 deletions(-) diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index b9f1916bf0e4..7212eca42810 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -52,9 +52,6 @@ import ( ) const ( - // Daemon sets will periodically check that their daemon pods are running as expected. - FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable. - // The value of 250 is chosen b/c values that are too high can cause registry DoS issues BurstReplicas = 250 @@ -188,12 +185,12 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Couldn't get object from tombstone %#v", obj) + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) return } ds, ok = tombstone.Obj.(*extensions.DaemonSet) if !ok { - glog.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj) + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj)) return } } @@ -249,7 +246,7 @@ func (dsc *DaemonSetsController) processNextWorkItem() bool { func (dsc *DaemonSetsController) enqueueDaemonSet(ds *extensions.DaemonSet) { key, err := controller.KeyFunc(ds) if err != nil { - glog.Errorf("Couldn't get key for object %#v: %v", ds, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err)) return } @@ -263,7 +260,7 @@ func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.Daemon ds, ok := obj.(*extensions.DaemonSet) if !ok { // This should not happen - glog.Errorf("lookup cache does not retuen a ReplicationController object") + utilruntime.HandleError(fmt.Errorf("lookup cache does not return a DaemonSet object")) return nil } if dsc.isCacheValid(pod, ds) { @@ -279,7 +276,7 @@ func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.Daemon // More than two items in this list indicates user error. If two daemon // sets overlap, sort by creation timestamp, subsort by name, then pick // the first. - glog.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels) + utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels)) sort.Sort(byCreationTimestamp(sets)) } @@ -324,7 +321,7 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) { if ds := dsc.getPodDaemonSet(pod); ds != nil { dsKey, err := controller.KeyFunc(ds) if err != nil { - glog.Errorf("Couldn't get key for object %#v: %v", ds, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err)) return } dsc.expectations.CreationObserved(dsKey) @@ -369,12 +366,12 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Couldn't get object from tombstone %#v", obj) + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - glog.Errorf("Tombstone contained object that is not a pod %#v", obj) + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) return } } @@ -382,7 +379,7 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) { if ds := dsc.getPodDaemonSet(pod); ds != nil { dsKey, err := controller.KeyFunc(ds) if err != nil { - glog.Errorf("Couldn't get key for object %#v: %v", ds, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err)) return } dsc.expectations.DeletionObserved(dsKey) diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 182fcc9304e2..54d6ece1d24c 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -54,15 +54,8 @@ import ( ) const ( - // FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas - // of all deployments. - // This recomputation happens based on contents in the local caches. - FullDeploymentResyncPeriod = 30 * time.Second - // We must avoid creating new replica set / counting pods until the replica set / pods store has synced. - // If it hasn't synced, to avoid a hot loop, we'll wait this long between checks. - StoreSyncedPollPeriod = 100 * time.Millisecond - // MaxRetries is the number of times a deployment will be retried before it is dropped out of the queue. - MaxRetries = 5 + // maxRetries is the number of times a deployment will be retried before it is dropped out of the queue. + maxRetries = 5 ) func getDeploymentKind() schema.GroupVersionKind { @@ -72,6 +65,7 @@ func getDeploymentKind() schema.GroupVersionKind { // DeploymentController is responsible for synchronizing Deployment objects stored // in the system with actual running replica sets and pods. type DeploymentController struct { + // rsControl is used for adopting/releasing replica sets. rsControl controller.RSControlInterface client clientset.Interface eventRecorder record.EventRecorder @@ -310,12 +304,12 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod)) + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) return } rs, ok = tombstone.Obj.(*extensions.ReplicaSet) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod)) + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v", obj)) return } } @@ -336,12 +330,12 @@ func (dc *DeploymentController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates pod", obj, FullDeploymentResyncPeriod)) + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v, could take up to %v before a deployment recreates/updates pods", obj, FullDeploymentResyncPeriod)) + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) return } } @@ -438,7 +432,7 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) { return } - if dc.queue.NumRequeues(key) < MaxRetries { + if dc.queue.NumRequeues(key) < maxRetries { glog.V(2).Infof("Error syncing deployment %v: %v", key, err) dc.queue.AddRateLimited(key) return diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index a0c96c1d127b..5cd1587c851c 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -182,7 +182,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr controller, ok := obj.(*v1.ReplicationController) if !ok { // This should not happen - glog.Errorf("lookup cache does not return a ReplicationController object") + utilruntime.HandleError(fmt.Errorf("lookup cache does not return a ReplicationController object")) return nil } if cached && rm.isCacheValid(pod, controller) { @@ -205,7 +205,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr // More than two items in this list indicates user error. If two replication-controller // overlap, sort by creation timestamp, subsort by name, then pick // the first. - glog.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels) + utilruntime.HandleError(fmt.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels)) sort.Sort(OverlappingControllers(controllers)) } @@ -291,7 +291,7 @@ func (rm *ReplicationManager) addPod(obj interface{}) { } rcKey, err := controller.KeyFunc(rc) if err != nil { - glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)) return } @@ -371,12 +371,12 @@ func (rm *ReplicationManager) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Couldn't get object from tombstone %#v", obj) + utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - glog.Errorf("Tombstone contained object that is not a pod %#v", obj) + utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) return } } @@ -384,7 +384,7 @@ func (rm *ReplicationManager) deletePod(obj interface{}) { if rc := rm.getPodController(pod); rc != nil { rcKey, err := controller.KeyFunc(rc) if err != nil { - glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)) return } rm.expectations.DeletionObserved(rcKey, controller.PodKey(pod)) @@ -396,7 +396,7 @@ func (rm *ReplicationManager) deletePod(obj interface{}) { func (rm *ReplicationManager) enqueueController(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } @@ -413,7 +413,7 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) { func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) { key, err := controller.KeyFunc(obj) if err != nil { - glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } @@ -616,7 +616,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { // anymore but has the stale controller ref. pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything()) if err != nil { - glog.Errorf("Error getting pods for rc %q: %v", key, err) + utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err)) rm.queue.Add(key) return err } @@ -657,7 +657,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { } else { pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated()) if err != nil { - glog.Errorf("Error getting pods for rc %q: %v", key, err) + utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err)) rm.queue.Add(key) return err } diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 4726aa7cc199..5041529c8fd9 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -45,8 +45,6 @@ import ( ) const ( - // Time to sleep before polling to see if the pod cache has synced. - PodStoreSyncedPollPeriod = 100 * time.Millisecond // period to relist statefulsets and verify pets statefulSetResyncPeriod = 30 * time.Second ) @@ -55,18 +53,18 @@ const ( type StatefulSetController struct { // client interface kubeClient clientset.Interface - // newSyncer returns an interface capable of syncing a single pet. + // control returns an interface capable of syncing a stateful set. // Abstracted out for testing. control StatefulSetControlInterface // podStore is a cache of watched pods. podStore listers.StoreToPodLister // podStoreSynced returns true if the pod store has synced at least once. podStoreSynced cache.InformerSynced - // A store of StatefulSets, populated by the psController. + // A store of StatefulSets, populated by setController. setStore listers.StoreToStatefulSetLister // Watches changes to all StatefulSets. setController cache.Controller - // Controllers that need to be synced. + // StatefulSets that need to be synced. queue workqueue.RateLimitingInterface } @@ -128,6 +126,7 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) { defer ssc.queue.ShutDown() glog.Infof("Starting statefulset controller") if !cache.WaitForCacheSync(stopCh, ssc.podStoreSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) return } go ssc.setController.Run(stopCh) @@ -267,7 +266,6 @@ func (ssc *StatefulSetController) processNextWorkItem() bool { // worker runs a worker goroutine that invokes processNextWorkItem until the the controller's queue is closed func (ssc *StatefulSetController) worker() { for ssc.processNextWorkItem() { - } }