Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Controller changes for perma failed deployments #35691

Merged
merged 3 commits into from Nov 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 20 additions & 4 deletions pkg/controller/controller_utils.go
Expand Up @@ -343,6 +343,22 @@ func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *U
return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
}

// Reasons for pod events
const (
// FailedCreatePodReason is added in an event and in a replica set condition
// when a pod for a replica set is failed to be created.
FailedCreatePodReason = "FailedCreate"
// SuccessfulCreatePodReason is added in an event when a pod for a replica set
// is successfully created.
SuccessfulCreatePodReason = "SuccessfulCreate"
// FailedDeletePodReason is added in an event and in a replica set condition
// when a pod for a replica set is failed to be deleted.
FailedDeletePodReason = "FailedDelete"
// SuccessfulDeletePodReason is added in an event when a pod for a replica set
// is successfully deleted.
SuccessfulDeletePodReason = "SuccessfulDelete"
)

// PodControlInterface is an interface that knows how to add or delete pods
// created as an interface to allow testing.
type PodControlInterface interface {
Expand Down Expand Up @@ -485,7 +501,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod
return fmt.Errorf("unable to create pods, no labels")
}
if newPod, err := r.KubeClient.Core().Pods(namespace).Create(pod); err != nil {
r.Recorder.Eventf(object, api.EventTypeWarning, "FailedCreate", "Error creating: %v", err)
r.Recorder.Eventf(object, api.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
return fmt.Errorf("unable to create pods: %v", err)
} else {
accessor, err := meta.Accessor(object)
Expand All @@ -494,7 +510,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod
return nil
}
glog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name)
r.Recorder.Eventf(object, api.EventTypeNormal, "SuccessfulCreate", "Created pod: %v", newPod.Name)
r.Recorder.Eventf(object, api.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name)
}
return nil
}
Expand All @@ -505,11 +521,11 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime
return fmt.Errorf("object does not have ObjectMeta, %v", err)
}
if err := r.KubeClient.Core().Pods(namespace).Delete(podID, nil); err != nil {
r.Recorder.Eventf(object, api.EventTypeWarning, "FailedDelete", "Error deleting: %v", err)
r.Recorder.Eventf(object, api.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
return fmt.Errorf("unable to delete pods: %v", err)
} else {
glog.V(4).Infof("Controller %v deleted pod %v", accessor.GetName(), podID)
r.Recorder.Eventf(object, api.EventTypeNormal, "SuccessfulDelete", "Deleted pod: %v", podID)
r.Recorder.Eventf(object, api.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID)
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/deployment/BUILD
Expand Up @@ -14,6 +14,7 @@ go_library(
name = "go_default_library",
srcs = [
"deployment_controller.go",
"progress.go",
"recreate.go",
"rollback.go",
"rolling.go",
Expand Down
15 changes: 15 additions & 0 deletions pkg/controller/deployment/deployment_controller.go
Expand Up @@ -350,6 +350,21 @@ func (dc *DeploymentController) syncDeployment(key string) error {
return nil
}

// Update deployment conditions with an Unknown condition when pausing/resuming
// a deployment. In this way, we can be sure that we won't timeout when a user
// resumes a Deployment with a set progressDeadlineSeconds.
if err = dc.checkPausedConditions(d); err != nil {
return err
}

_, err = dc.hasFailed(d)
if err != nil {
return err
}
// TODO: Automatically rollback here if we failed above. Locate the last complete
// revision and populate the rollback spec with it.
// See https://github.com/kubernetes/kubernetes/issues/23211.

if d.Spec.Paused {
return dc.sync(d)
}
Expand Down
8 changes: 0 additions & 8 deletions pkg/controller/deployment/deployment_controller_test.go
Expand Up @@ -152,14 +152,6 @@ func (f *fixture) expectCreateRSAction(rs *extensions.ReplicaSet) {
f.actions = append(f.actions, core.NewCreateAction(unversioned.GroupVersionResource{Resource: "replicasets"}, rs.Namespace, rs))
}

func (f *fixture) expectUpdateRSAction(rs *extensions.ReplicaSet) {
f.actions = append(f.actions, core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "replicasets"}, rs.Namespace, rs))
}

func (f *fixture) expectListPodAction(namespace string, opt api.ListOptions) {
f.actions = append(f.actions, core.NewListAction(unversioned.GroupVersionResource{Resource: "pods"}, namespace, opt))
}

func newFixture(t *testing.T) *fixture {
f := &fixture{}
f.t = t
Expand Down
188 changes: 188 additions & 0 deletions pkg/controller/deployment/progress.go
@@ -0,0 +1,188 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
"fmt"
"reflect"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/controller/deployment/util"
)

// hasFailed determines if a deployment has failed or not by estimating its progress.
// Progress for a deployment is considered when a new replica set is created or adopted,
// and when new pods scale up or old pods scale down. Progress is not estimated for paused
// deployments or when users don't really care about it ie. progressDeadlineSeconds is not
// specified.
func (dc *DeploymentController) hasFailed(d *extensions.Deployment) (bool, error) {
if d.Spec.ProgressDeadlineSeconds == nil || d.Spec.RollbackTo != nil || d.Spec.Paused {
return false, nil
}

newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
if err != nil {
return false, err
}

// There is a template change so we don't need to check for any progress right now.
if newRS == nil {
return false, nil
}

// Look at the status of the deployment - if there is already a NewRSAvailableReason
// then we don't need to estimate any progress. This is needed in order to avoid
// estimating progress for scaling events after a rollout has finished.
cond := util.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing)
if cond != nil && cond.Reason == util.NewRSAvailableReason {
return false, nil
}

// TODO: Look for permanent failures here.
// See https://github.com/kubernetes/kubernetes/issues/18568

allRSs := append(oldRSs, newRS)
newStatus := dc.calculateStatus(allRSs, newRS, d)

// If the deployment is complete or it is progressing, there is no need to check if it
// has timed out.
if util.DeploymentComplete(d, &newStatus) || util.DeploymentProgressing(d, &newStatus) {
return false, nil
}

// Check if the deployment has timed out.
return util.DeploymentTimedOut(d, &newStatus), nil
}

// syncRolloutStatus updates the status of a deployment during a rollout. There are
// cases this helper will run that cannot be prevented from the scaling detection,
// for example a resync of the deployment after it was scaled up. In those cases,
// we shouldn't try to estimate any progress.
func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
newStatus := dc.calculateStatus(allRSs, newRS, d)

// If there is no progressDeadlineSeconds set, remove any Progressing condition.
if d.Spec.ProgressDeadlineSeconds == nil {
util.RemoveDeploymentCondition(&newStatus, extensions.DeploymentProgressing)
}

// If there is only one replica set that is active then that means we are not running
// a new rollout and this is a resync where we don't need to estimate any progress.
// In such a case, we should simply not estimate any progress for this deployment.
currentCond := util.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing)
isResyncEvent := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
// Check for progress only if there is a progress deadline set and the latest rollout
// hasn't completed yet.
if d.Spec.ProgressDeadlineSeconds != nil && !isResyncEvent {
switch {
case util.DeploymentComplete(d, &newStatus):
// Update the deployment conditions with a message for the new replica set that
// was successfully deployed. If the condition already exists, we ignore this update.
msg := fmt.Sprintf("Replica set %q has successfully progressed.", newRS.Name)
condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, util.NewRSAvailableReason, msg)
util.SetDeploymentCondition(&newStatus, *condition)

case util.DeploymentProgressing(d, &newStatus):
// If there is any progress made, continue by not checking if the deployment failed. This
// behavior emulates the rolling updater progressDeadline check.
msg := fmt.Sprintf("Replica set %q is progressing.", newRS.Name)
condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
// Update the current Progressing condition or add a new one if it doesn't exist.
// If a Progressing condition with status=true already exists, we should update
// everything but lastTransitionTime. SetDeploymentCondition already does that but
// it also is not updating conditions when the reason of the new condition is the
// same as the old. The Progressing condition is a special case because we want to
// update with the same reason and change just lastUpdateTime iff we notice any
// progress. That's why we handle it here.
if currentCond != nil {
if currentCond.Status == api.ConditionTrue {
condition.LastTransitionTime = currentCond.LastTransitionTime
}
util.RemoveDeploymentCondition(&newStatus, extensions.DeploymentProgressing)
}
util.SetDeploymentCondition(&newStatus, *condition)

case util.DeploymentTimedOut(d, &newStatus):
// Update the deployment with a timeout condition. If the condition already exists,
// we ignore this update.
msg := fmt.Sprintf("Replica set %q has timed out progressing.", newRS.Name)
condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionFalse, util.TimedOutReason, msg)
util.SetDeploymentCondition(&newStatus, *condition)
}
}

// Move failure conditions of all replica sets in deployment conditions. For now,
// only one failure condition is returned from getReplicaFailures.
if replicaFailureCond := dc.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 {
// There will be only one ReplicaFailure condition on the replica set.
util.SetDeploymentCondition(&newStatus, replicaFailureCond[0])
} else {
util.RemoveDeploymentCondition(&newStatus, extensions.DeploymentReplicaFailure)
}

// Do not update if there is nothing new to add.
if reflect.DeepEqual(d.Status, newStatus) {
// TODO: If there is no sign of progress at this point then there is a high chance that the
// deployment is stuck. We should resync this deployment at some point[1] in the future[2] and
// check if it has timed out. We definitely need this, otherwise we depend on the controller
// resync interval. See https://github.com/kubernetes/kubernetes/issues/34458.
//
// [1] time.Now() + progressDeadlineSeconds - lastUpdateTime (of the Progressing condition).
// [2] Use dc.queue.AddAfter
return nil
}

newDeployment := d
newDeployment.Status = newStatus
_, err := dc.client.Extensions().Deployments(newDeployment.Namespace).UpdateStatus(newDeployment)
return err
}

// getReplicaFailures will convert replica failure conditions from replica sets
// to deployment conditions.
func (dc *DeploymentController) getReplicaFailures(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) []extensions.DeploymentCondition {
var conditions []extensions.DeploymentCondition
if newRS != nil {
for _, c := range newRS.Status.Conditions {
if c.Type != extensions.ReplicaSetReplicaFailure {
continue
}
conditions = append(conditions, util.ReplicaSetToDeploymentCondition(c))
}
}

// Return failures for the new replica set over failures from old replica sets.
if len(conditions) > 0 {
return conditions
}

for i := range allRSs {
rs := allRSs[i]
if rs == nil {
continue
}

for _, c := range rs.Status.Conditions {
if c.Type != extensions.ReplicaSetReplicaFailure {
continue
}
conditions = append(conditions, util.ReplicaSetToDeploymentCondition(c))
}
}
return conditions
}
6 changes: 3 additions & 3 deletions pkg/controller/deployment/recreate.go
Expand Up @@ -42,7 +42,7 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen
}
if scaledDown {
// Update DeploymentStatus
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, deployment)
}

// Wait for all old replica set to scale down to zero.
Expand All @@ -67,13 +67,13 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen
}
if scaledUp {
// Update DeploymentStatus
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, deployment)
}

dc.cleanupDeployment(oldRSs, deployment)

// Sync deployment status
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, deployment)
}

// scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate"
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/deployment/rolling.go
Expand Up @@ -42,7 +42,7 @@ func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment
}
if scaledUp {
// Update DeploymentStatus
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, deployment)
}

// Scale down, if we can.
Expand All @@ -52,13 +52,13 @@ func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment
}
if scaledDown {
// Update DeploymentStatus
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, deployment)
}

dc.cleanupDeployment(oldRSs, deployment)

// Sync deployment status
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, deployment)
}

func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
Expand Down