Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller: cleanup workload controllers a bit #41300

Merged
merged 1 commit into from Feb 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 9 additions & 12 deletions pkg/controller/daemon/daemoncontroller.go
Expand Up @@ -52,9 +52,6 @@ import (
)

const (
// Daemon sets will periodically check that their daemon pods are running as expected.
FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.

// The value of 250 is chosen b/c values that are too high can cause registry DoS issues
BurstReplicas = 250

Expand Down Expand Up @@ -188,12 +185,12 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}
ds, ok = tombstone.Obj.(*extensions.DaemonSet)
if !ok {
glog.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj)
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj))
return
}
}
Expand Down Expand Up @@ -249,7 +246,7 @@ func (dsc *DaemonSetsController) processNextWorkItem() bool {
func (dsc *DaemonSetsController) enqueueDaemonSet(ds *extensions.DaemonSet) {
key, err := controller.KeyFunc(ds)
if err != nil {
glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
return
}

Expand All @@ -263,7 +260,7 @@ func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.Daemon
ds, ok := obj.(*extensions.DaemonSet)
if !ok {
// This should not happen
glog.Errorf("lookup cache does not retuen a ReplicationController object")
utilruntime.HandleError(fmt.Errorf("lookup cache does not return a DaemonSet object"))
return nil
}
if dsc.isCacheValid(pod, ds) {
Expand All @@ -279,7 +276,7 @@ func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.Daemon
// More than two items in this list indicates user error. If two daemon
// sets overlap, sort by creation timestamp, subsort by name, then pick
// the first.
glog.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels)
utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
sort.Sort(byCreationTimestamp(sets))
}

Expand Down Expand Up @@ -324,7 +321,7 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
if ds := dsc.getPodDaemonSet(pod); ds != nil {
dsKey, err := controller.KeyFunc(ds)
if err != nil {
glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
return
}
dsc.expectations.CreationObserved(dsKey)
Expand Down Expand Up @@ -369,20 +366,20 @@ func (dsc *DaemonSetsController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %#v", obj)
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
return
}
}
glog.V(4).Infof("Pod %s deleted.", pod.Name)
if ds := dsc.getPodDaemonSet(pod); ds != nil {
dsKey, err := controller.KeyFunc(ds)
if err != nil {
glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
return
}
dsc.expectations.DeletionObserved(dsKey)
Expand Down
22 changes: 8 additions & 14 deletions pkg/controller/deployment/deployment_controller.go
Expand Up @@ -54,15 +54,8 @@ import (
)

const (
// FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas
// of all deployments.
// This recomputation happens based on contents in the local caches.
FullDeploymentResyncPeriod = 30 * time.Second
// We must avoid creating new replica set / counting pods until the replica set / pods store has synced.
// If it hasn't synced, to avoid a hot loop, we'll wait this long between checks.
StoreSyncedPollPeriod = 100 * time.Millisecond
// MaxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
MaxRetries = 5
// maxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
maxRetries = 5
)

func getDeploymentKind() schema.GroupVersionKind {
Expand All @@ -72,6 +65,7 @@ func getDeploymentKind() schema.GroupVersionKind {
// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
// rsControl is used for adopting/releasing replica sets.
rsControl controller.RSControlInterface
client clientset.Interface
eventRecorder record.EventRecorder
Expand Down Expand Up @@ -310,12 +304,12 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}
rs, ok = tombstone.Obj.(*extensions.ReplicaSet)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v", obj))
return
}
}
Expand All @@ -336,12 +330,12 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates pod", obj, FullDeploymentResyncPeriod))
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v, could take up to %v before a deployment recreates/updates pods", obj, FullDeploymentResyncPeriod))
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
return
}
}
Expand Down Expand Up @@ -438,7 +432,7 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) {
return
}

if dc.queue.NumRequeues(key) < MaxRetries {
if dc.queue.NumRequeues(key) < maxRetries {
glog.V(2).Infof("Error syncing deployment %v: %v", key, err)
dc.queue.AddRateLimited(key)
return
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/replication/replication_controller.go
Expand Up @@ -182,7 +182,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr
controller, ok := obj.(*v1.ReplicationController)
if !ok {
// This should not happen
glog.Errorf("lookup cache does not return a ReplicationController object")
utilruntime.HandleError(fmt.Errorf("lookup cache does not return a ReplicationController object"))
return nil
}
if cached && rm.isCacheValid(pod, controller) {
Expand All @@ -205,7 +205,7 @@ func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationContr
// More than two items in this list indicates user error. If two replication-controller
// overlap, sort by creation timestamp, subsort by name, then pick
// the first.
glog.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels)
utilruntime.HandleError(fmt.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels))
sort.Sort(OverlappingControllers(controllers))
}

Expand Down Expand Up @@ -291,7 +291,7 @@ func (rm *ReplicationManager) addPod(obj interface{}) {
}
rcKey, err := controller.KeyFunc(rc)
if err != nil {
glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for replication controller %#v: %v", rc, err))
return
}

Expand Down Expand Up @@ -371,20 +371,20 @@ func (rm *ReplicationManager) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %#v", obj)
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %#v", obj)
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
return
}
}
glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v, labels %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod.Labels)
if rc := rm.getPodController(pod); rc != nil {
rcKey, err := controller.KeyFunc(rc)
if err != nil {
glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for replication controller %#v: %v", rc, err))
return
}
rm.expectations.DeletionObserved(rcKey, controller.PodKey(pod))
Expand All @@ -396,7 +396,7 @@ func (rm *ReplicationManager) deletePod(obj interface{}) {
func (rm *ReplicationManager) enqueueController(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}

Expand All @@ -413,7 +413,7 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) {
func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}

Expand Down Expand Up @@ -616,7 +616,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
// anymore but has the stale controller ref.
pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err))
rm.queue.Add(key)
return err
}
Expand Down Expand Up @@ -657,7 +657,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
} else {
pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err))
rm.queue.Add(key)
return err
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/controller/statefulset/stateful_set.go
Expand Up @@ -45,8 +45,6 @@ import (
)

const (
// Time to sleep before polling to see if the pod cache has synced.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
// period to relist statefulsets and verify pets
statefulSetResyncPeriod = 30 * time.Second
)
Expand All @@ -55,18 +53,18 @@ const (
type StatefulSetController struct {
// client interface
kubeClient clientset.Interface
// newSyncer returns an interface capable of syncing a single pet.
// control returns an interface capable of syncing a stateful set.
// Abstracted out for testing.
control StatefulSetControlInterface
// podStore is a cache of watched pods.
podStore listers.StoreToPodLister
// podStoreSynced returns true if the pod store has synced at least once.
podStoreSynced cache.InformerSynced
// A store of StatefulSets, populated by the psController.
// A store of StatefulSets, populated by setController.
setStore listers.StoreToStatefulSetLister
// Watches changes to all StatefulSets.
setController cache.Controller
// Controllers that need to be synced.
// StatefulSets that need to be synced.
queue workqueue.RateLimitingInterface
}

Expand Down Expand Up @@ -128,6 +126,7 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
defer ssc.queue.ShutDown()
glog.Infof("Starting statefulset controller")
if !cache.WaitForCacheSync(stopCh, ssc.podStoreSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
go ssc.setController.Run(stopCh)
Expand Down Expand Up @@ -267,7 +266,6 @@ func (ssc *StatefulSetController) processNextWorkItem() bool {
// worker runs a worker goroutine that invokes processNextWorkItem until the the controller's queue is closed
func (ssc *StatefulSetController) worker() {
for ssc.processNextWorkItem() {

}
}

Expand Down