Deployment controller updates #28684

Merged (1 commit) on Jul 11, 2016
109 changes: 81 additions & 28 deletions pkg/controller/deployment/deployment_controller.go
@@ -34,6 +34,7 @@ import (
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
@@ -51,6 +52,8 @@ const (
// We must avoid creating new replica set / counting pods until the replica set / pods store has synced.
// If it hasn't synced, to avoid a hot loop, we'll wait this long between checks.
StoreSyncedPollPeriod = 100 * time.Millisecond
// MaxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
MaxRetries = 5
)

// DeploymentController is responsible for synchronizing Deployment objects stored
@@ -70,19 +73,23 @@ type DeploymentController struct {
rsStore cache.StoreToReplicaSetLister
// Watches changes to all ReplicaSets
rsController *framework.Controller
// rsStoreSynced returns true if the ReplicaSet store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rsStoreSynced func() bool
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
// Watches changes to all pods
podController *framework.Controller

// dStoreSynced returns true if the Deployment store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
dStoreSynced func() bool
// rsStoreSynced returns true if the ReplicaSet store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rsStoreSynced func() bool
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool

// Deployments that need to be synced
queue *workqueue.Type
queue workqueue.RateLimitingInterface
}

// NewDeploymentController creates a new DeploymentController.
Expand All @@ -98,7 +105,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
dc := &DeploymentController{
client: client,
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
queue: workqueue.New(),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

dc.dStore.Store, dc.dController = framework.NewInformer(
@@ -158,6 +165,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
)

dc.syncHandler = dc.syncDeployment
dc.dStoreSynced = dc.dController.HasSynced
dc.rsStoreSynced = dc.rsController.HasSynced
dc.podStoreSynced = dc.podController.HasSynced
return dc
@@ -166,17 +174,43 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

go dc.dController.Run(stopCh)
go dc.rsController.Run(stopCh)
go dc.podController.Run(stopCh)

// Wait for the rc and dc stores to sync before starting any work in this controller.
ready := make(chan struct{})
go dc.waitForSyncedStores(ready, stopCh)
select {
case <-ready:
case <-stopCh:
return
}

for i := 0; i < workers; i++ {
go wait.Until(dc.worker, time.Second, stopCh)
}

<-stopCh
glog.Infof("Shutting down deployment controller")
dc.queue.ShutDown()
}

func (dc *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

for !dc.dStoreSynced() || !dc.rsStoreSynced() || !dc.podStoreSynced() {
select {
case <-time.After(StoreSyncedPollPeriod):
case <-stopCh:
return
}
}

close(ready)
}

func (dc *DeploymentController) addDeploymentNotification(obj interface{}) {
d := obj.(*extensions.Deployment)
glog.V(4).Infof("Adding deployment %s", d.Name)
@@ -382,19 +416,40 @@ func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deploym
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
work := func() bool {
key, quit := dc.queue.Get()
if quit {
return true
}
defer dc.queue.Done(key)

err := dc.syncHandler(key.(string))
dc.handleErr(err, key)

return false
}

for {
func() {
key, quit := dc.queue.Get()
if quit {
return
}
defer dc.queue.Done(key)
err := dc.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing deployment %v: %v", key, err)
}
}()
if quit := work(); quit {
return
}
}
}

func (dc *DeploymentController) handleErr(err error, key interface{}) {
if err == nil {
dc.queue.Forget(key)
return
}

if dc.queue.NumRequeues(key) < MaxRetries {
glog.V(2).Infof("Error syncing deployment %v: %v", key, err)
dc.queue.AddRateLimited(key)
return
}

utilruntime.HandleError(err)
dc.queue.Forget(key)
}

// syncDeployment will sync the deployment with the given key.
@@ -405,32 +460,30 @@ func (dc *DeploymentController) syncDeployment(key string) error {
glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
}()

if !dc.rsStoreSynced() || !dc.podStoreSynced() {
// Sleep so we give the replica set / pod reflector goroutine a chance to run.
time.Sleep(StoreSyncedPollPeriod)
glog.Infof("Waiting for replica set / pod controller to sync, requeuing deployment %s", key)
dc.queue.Add(key)
return nil
}

obj, exists, err := dc.dStore.Store.GetByKey(key)
if err != nil {
glog.Infof("Unable to retrieve deployment %v from store: %v", key, err)
dc.queue.Add(key)
return err
}
if !exists {
glog.Infof("Deployment has been deleted %v", key)
return nil
}

d := obj.(*extensions.Deployment)
deployment := obj.(*extensions.Deployment)
everything := unversioned.LabelSelector{}
if reflect.DeepEqual(d.Spec.Selector, &everything) {
dc.eventRecorder.Eventf(d, api.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
if reflect.DeepEqual(deployment.Spec.Selector, &everything) {
dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
return nil
}

// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
d, err := util.DeploymentDeepCopy(deployment)
if err != nil {
return err
}

if d.Spec.Paused {
return dc.sync(d)
}
4 changes: 0 additions & 4 deletions pkg/controller/deployment/sync.go
@@ -273,7 +273,6 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
deploymentutil.SetNewReplicaSetAnnotations(deployment, &newRS, newRevision, false)
createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
if err != nil {
dc.enqueueDeployment(deployment)
return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err)
Contributor commented:

This is OK because it's properly rippling back to the sync?

@0xmichalis (Contributor, Author) commented on Jul 8, 2016:

Yes. Requeueing on all errors was moved into the caller of the controller's sync loop (the worker's handleErr), up to the retry count.
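For illustration only (not part of the PR), below is a minimal standalone sketch of the retry behavior handleErr introduces: a key whose sync keeps failing is re-added with rate limiting until it has been requeued MaxRetries times, after which it is forgotten. It assumes the same workqueue package this controller already imports; the key string and the always-failing failingSync function are made up for the example.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

const maxRetries = 5

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/nginx")

	// failingSync stands in for syncDeployment and always returns an error.
	failingSync := func(key string) error {
		return fmt.Errorf("sync of %q failed", key)
	}

	for {
		key, quit := queue.Get()
		if quit {
			return
		}

		if err := failingSync(key.(string)); err != nil {
			if queue.NumRequeues(key) < maxRetries {
				// Requeue with backoff; the default rate limiter grows the
				// per-key delay exponentially.
				queue.AddRateLimited(key)
			} else {
				// Give up on this key after maxRetries failed syncs.
				fmt.Printf("dropping %v out of the queue after %d retries: %v\n", key, maxRetries, err)
				queue.Forget(key)
				queue.ShutDown()
			}
		} else {
			// On success, reset the per-key failure count.
			queue.Forget(key)
		}
		queue.Done(key)
	}
}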

}
if newReplicasCount > 0 {
@@ -415,9 +414,6 @@ func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newSc
rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
if err == nil {
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
} else {
glog.Warningf("Cannot update replica set %q: %v", rs.Name, err)
dc.enqueueDeployment(deployment)
}
return rs, err
}
12 changes: 12 additions & 0 deletions pkg/controller/deployment/util/deployment_util.go
@@ -753,3 +753,15 @@ func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired

return int32(surge), int32(unavailable), nil
}

func DeploymentDeepCopy(deployment *extensions.Deployment) (*extensions.Deployment, error) {
objCopy, err := api.Scheme.DeepCopy(deployment)
if err != nil {
return nil, err
}
copied, ok := objCopy.(*extensions.Deployment)
if !ok {
return nil, fmt.Errorf("expected Deployment, got %#v", objCopy)
}
return copied, nil
}
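
As an aside, here is a minimal sketch (not from the PR) of why syncDeployment copies before mutating: objects handed out by the informer cache are shared, so writing through them would silently change what every other reader of the cache sees. The sketch only assumes the internal extensions types and the DeploymentDeepCopy helper added above; the object and the field being mutated are arbitrary choices for the example.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/controller/deployment/util"
)

func main() {
	// Pretend this deployment came straight out of the shared informer cache.
	cached := &extensions.Deployment{}
	cached.Name = "example"

	// Copy first, then mutate only the copy, so the cached object stays intact.
	d, err := util.DeploymentDeepCopy(cached)
	if err != nil {
		panic(err)
	}
	d.Spec.Paused = true

	// The cached object is untouched; only the copy was modified.
	fmt.Println(cached.Spec.Paused, d.Spec.Paused) // false true
}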