Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add preconditions to pod/rs update retry func #22392

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
66 changes: 42 additions & 24 deletions pkg/util/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/integer"
intstrutil "k8s.io/kubernetes/pkg/util/intstr"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
Expand Down Expand Up @@ -187,27 +188,32 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf
}))
rsUpdated := false
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
if len(updatedRS.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 {
updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS,
func(updated *extensions.ReplicaSet) error {
// Precondition: the RS doesn't contain the new hash in its pod template label.
if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
return errors.ErrPreconditionViolated
}
updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
return nil
})
if err != nil {
return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
}
if rsUpdated {
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err)
}
if err != nil {
return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
}
if rsUpdated {
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err)
}
glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
} else {
// If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error.
// Return here and retry in the next sync loop.
return &rs, nil
}
glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
} else {
// If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error.
// Return here and retry in the next sync loop.
return &rs, nil
}
glog.V(4).Infof("Observed the update of rs %s's pod template with hash %s.", rs.Name, hash)

// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector)
Expand All @@ -231,10 +237,16 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf

// 3. Update rs label and selector to include the new hash label
// Copy the old selector, so that we can scrub out any orphaned pods
if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
}); err != nil {
if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS,
func(updated *extensions.ReplicaSet) error {
// Precondition: the RS doesn't contain the new hash in its label or selector.
if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
return errors.ErrPreconditionViolated
}
updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
return nil
}); err != nil {
return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
}
if rsUpdated {
Expand Down Expand Up @@ -264,14 +276,20 @@ func labelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c client
for _, pod := range podList.Items {
// Only label the pod that doesn't already have the new hash
if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, func(podToUpdate *api.Pod) {
podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
}); err != nil {
if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod,
func(podToUpdate *api.Pod) error {
// Precondition: the pod doesn't contain the new hash in its label.
if podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
return errors.ErrPreconditionViolated
}
podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
return nil
}); err != nil {
return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err)
} else if podUpdated {
glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash)
} else {
// If the pod wasn't updated but didn't return error when we try to update it, we've hit a pod not found error.
// If the pod wasn't updated but didn't return error when we try to update it, we've hit "pod not found" or "precondition violated" error.
// Then we can't say all pods are labeled
allPodsLabeled = false
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/util/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ limitations under the License.

package errors

import "fmt"
import (
"errors"
"fmt"
)

// Aggregate represents an object that contains multiple errors, but does not
// necessarily have singular semantic meaning.
Expand Down Expand Up @@ -148,3 +151,6 @@ func AggregateGoroutines(funcs ...func() error) Aggregate {
}
return NewAggregate(errs)
}

// ErrPreconditionViolated is returned when the precondition is violated
var ErrPreconditionViolated = errors.New("precondition is violated")
18 changes: 15 additions & 3 deletions pkg/util/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
errorsutil "k8s.io/kubernetes/pkg/util/errors"
hashutil "k8s.io/kubernetes/pkg/util/hash"
"k8s.io/kubernetes/pkg/util/wait"
)
Expand All @@ -38,7 +39,7 @@ func GetPodTemplateSpecHash(template api.PodTemplateSpec) uint32 {

// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479
type updatePodFunc func(pod *api.Pod)
type updatePodFunc func(pod *api.Pod) error

// UpdatePodWithRetries updates a pod with given applyUpdate function. Note that pod not found error is ignored.
// The returned bool value can be used to tell if the pod is actually updated.
Expand All @@ -52,8 +53,9 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod,
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
// TODO: add precondition for update
applyUpdate(pod)
if err = applyUpdate(pod); err != nil {
return false, err
}
if pod, err = podClient.Update(pod); err == nil {
// Update successful.
return true, nil
Expand All @@ -66,12 +68,22 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod,
podUpdated = true
}

// Handle returned error from wait poll
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("timed out trying to update pod: %+v", oldPod)
}
// Ignore the pod not found error, but the pod isn't updated.
if errors.IsNotFound(err) {
glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name)
err = nil
}
// Ignore the precondition violated error, but the pod isn't updated.
if err == errorsutil.ErrPreconditionViolated {
glog.V(4).Infof("%s %s/%s precondition doesn't hold, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name)
err = nil
}

// If the error is non-nil the returned pod cannot be trusted; if podUpdated is false, the pod isn't updated;
// if the error is nil and podUpdated is true, the returned pod contains the applied update.
return pod, podUpdated, err
}
19 changes: 14 additions & 5 deletions pkg/util/replicaset/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apis/extensions"
unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
errorsutil "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/wait"
)

// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479
type updateRSFunc func(rs *extensions.ReplicaSet)
type updateRSFunc func(rs *extensions.ReplicaSet) error

// UpdateRSWithRetries updates a RS with given applyUpdate function. Note that RS not found error is ignored.
// The returned bool value can be used to tell if the RS is actually updated.
Expand All @@ -43,8 +44,9 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
// TODO: add precondition for update
applyUpdate(rs)
if err = applyUpdate(rs); err != nil {
return false, err
}
if rs, err = rsClient.Update(rs); err == nil {
// Update successful.
return true, nil
Expand All @@ -57,6 +59,7 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs
rsUpdated = true
}

// Handle returned error from wait poll
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("timed out trying to update RS: %+v", oldRs)
}
Expand All @@ -65,7 +68,13 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs
glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name)
err = nil
}
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
// controller contains the applied update.
// Ignore the precondition violated error, but the RS isn't updated.
if err == errorsutil.ErrPreconditionViolated {
glog.V(4).Infof("%s %s/%s precondition doesn't hold, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name)
err = nil
}

// If the error is non-nil the returned RS cannot be trusted; if rsUpdated is false, the contoller isn't updated;
// if the error is nil and rsUpdated is true, the returned RS contains the applied update.
return rs, rsUpdated, err
}