Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle namespace deletion more gracefully in built-in controllers #84123

Merged
merged 7 commits into from Nov 4, 2019
5 changes: 4 additions & 1 deletion pkg/controller/controller_utils.go
Expand Up @@ -577,7 +577,10 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodT
}
newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod)
if err != nil {
r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
// only send an event if the namespace isn't terminating
if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
}
return err
}
accessor, err := meta.Accessor(object)
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/cronjob/BUILD
Expand Up @@ -20,6 +20,7 @@ go_library(
"//staging/src/k8s.io/api/batch/v1:go_default_library",
"//staging/src/k8s.io/api/batch/v1beta1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/cronjob/cronjob_controller.go
Expand Up @@ -39,6 +39,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -333,7 +334,11 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo
}
jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
if err != nil {
recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
// If the namespace is being torn down, we can safely ignore
// this error since all subsequent creations will fail.
if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
}
return
}
klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
Expand Down
25 changes: 16 additions & 9 deletions pkg/controller/daemon/daemon_controller.go
Expand Up @@ -1053,15 +1053,22 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod
ds, metav1.NewControllerRef(ds, controllerKind))
}

if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
if err != nil {
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore
// this error since all subsequent creations will fail.
return
}
if errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
}
if err != nil {
klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/deployment/deployment_controller.go
Expand Up @@ -475,7 +475,7 @@ func (dc *DeploymentController) processNextWorkItem() bool {
}

func (dc *DeploymentController) handleErr(err error, key interface{}) {
if err == nil {
if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will work only for unwrapped errors, but we might make it better when we get golang 1.13

I don't see the calls there wrapping the errors with context so this should be good for now

dc.queue.Forget(key)
return
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/deployment/sync.go
Expand Up @@ -256,6 +256,9 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount)
}
return nil, err
case errors.HasStatusCause(err, v1.NamespaceTerminatingCause):
// if the namespace is terminating, all subsequent creates will fail and we can safely do nothing
return nil, err
case err != nil:
msg := fmt.Sprintf("Failed to create new replica set %q: %v", newRS.Name, err)
if deploymentutil.HasProgressDeadline(d) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/endpoint/endpoints_controller.go
Expand Up @@ -506,6 +506,11 @@ func (e *EndpointController) syncService(key string) error {
// 2. policy is misconfigured, in which case no service would function anywhere.
// Given the frequency of 1, we log at a lower level.
klog.V(5).Infof("Forbidden from creating endpoints: %v", err)

// If the namespace is terminating, creates will continue to fail. Simply drop the item.
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
return nil
}
}

if createEndpoints {
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/endpointslice/reconciler.go
Expand Up @@ -23,6 +23,7 @@ import (

corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -150,6 +151,10 @@ func (r *reconciler) finalize(
addTriggerTimeAnnotation(endpointSlice, triggerTime)
_, err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Create(endpointSlice)
if err != nil {
// If the namespace is terminating, creates will continue to fail. Simply drop the item.
if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
return nil
}
errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err))
}
}
Expand Down
25 changes: 16 additions & 9 deletions pkg/controller/job/job_controller.go
Expand Up @@ -771,15 +771,22 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
go func() {
defer wait.Done()
err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
if err != nil {
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore
// this error since all subsequent creations will fail.
return
}
if errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
}
if err != nil {
defer utilruntime.HandleError(err)
Expand Down
25 changes: 16 additions & 9 deletions pkg/controller/replicaset/replica_set.go
Expand Up @@ -523,15 +523,22 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
// event spam that those failures would generate.
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return nil
if err != nil {
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: not sure why you need to have two ifs if both return the same, comment for both handled in one should be sufficient.

// if the namespace is being terminated, we don't have to do
// anything because any creation will fail
return nil
}
if errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return nil
}
}
return err
})
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/serviceaccount/serviceaccounts_controller.go
Expand Up @@ -213,7 +213,10 @@ func (c *ServiceAccountsController) syncNamespace(key string) error {
sa.Namespace = ns.Name

if _, err := c.client.CoreV1().ServiceAccounts(ns.Name).Create(&sa); err != nil && !apierrs.IsAlreadyExists(err) {
createFailures = append(createFailures, err)
// we can safely ignore terminating namespace errors
if !apierrs.HasStatusCause(err, v1.NamespaceTerminatingCause) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this could be placed next to !apierrs.IsAlreadyExists(err)

createFailures = append(createFailures, err)
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/serviceaccount/tokens_controller.go
Expand Up @@ -408,6 +408,10 @@ func (e *TokensController) ensureReferencedToken(serviceAccount *v1.ServiceAccou
// Save the secret
createdToken, err := e.client.CoreV1().Secrets(serviceAccount.Namespace).Create(secret)
if err != nil {
// if the namespace is being terminated, create will fail no matter what
if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
return false, err
}
// retriable error
return true, err
}
Expand Down
6 changes: 6 additions & 0 deletions staging/src/k8s.io/api/core/v1/types.go
Expand Up @@ -4659,6 +4659,12 @@ const (
NamespaceTerminating NamespacePhase = "Terminating"
)

const (
// NamespaceTerminatingCause is returned as a defaults.cause item when a change is
// forbidden due to the namespace being terminated.
NamespaceTerminatingCause metav1.CauseType = "NamespaceTerminating"
)

type NamespaceConditionType string

// These are valid conditions of a namespace.
Expand Down
22 changes: 22 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go
Expand Up @@ -70,6 +70,28 @@ func (e *StatusError) DebugError() (string, []interface{}) {
return "server response object: %#v", []interface{}{e.ErrStatus}
}

// HasStatusCause returns true if the provided error has a details cause
// with the provided type name.
func HasStatusCause(err error, name metav1.CauseType) bool {
_, ok := StatusCause(err, name)
return ok
}

// StatusCause returns the named cause from the provided error if it exists and
// the error is of the type APIStatus. Otherwise it returns false.
func StatusCause(err error, name metav1.CauseType) (metav1.StatusCause, bool) {
apierr, ok := err.(APIStatus)
if !ok || apierr == nil || apierr.Status().Details == nil {
return metav1.StatusCause{}, false
}
for _, cause := range apierr.Status().Details.Causes {
if cause.Type == name {
return cause, true
}
}
return metav1.StatusCause{}, false
}

// UnexpectedObjectError can be returned by FromObject if it's passed a non-status object.
type UnexpectedObjectError struct {
Object runtime.Object
Expand Down
Expand Up @@ -34,10 +34,12 @@ go_test(
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
Expand Down
Expand Up @@ -170,8 +170,15 @@ func (l *Lifecycle) Admit(ctx context.Context, a admission.Attributes, o admissi
return nil
}

// TODO: This should probably not be a 403
return admission.NewForbidden(a, fmt.Errorf("unable to create new content in namespace %s because it is being terminated", a.GetNamespace()))
err := admission.NewForbidden(a, fmt.Errorf("unable to create new content in namespace %s because it is being terminated", a.GetNamespace()))
if apierr, ok := err.(*errors.StatusError); ok {
apierr.ErrStatus.Details.Causes = append(apierr.ErrStatus.Details.Causes, metav1.StatusCause{
Type: v1.NamespaceTerminatingCause,
Message: fmt.Sprintf("namespace %s is being terminated", a.GetNamespace()),
Field: "metadata.namespace",
})
}
return err
}

return nil
Expand Down
Expand Up @@ -19,14 +19,17 @@ package lifecycle
import (
"context"
"fmt"
"reflect"
"testing"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
Expand Down Expand Up @@ -192,6 +195,14 @@ func TestAdmissionNamespaceTerminating(t *testing.T) {
if err == nil {
t.Errorf("Expected error rejecting creates in a namespace when it is terminating")
}
expectedCause := metav1.StatusCause{
Type: v1.NamespaceTerminatingCause,
Message: fmt.Sprintf("namespace %s is being terminated", namespace),
Field: "metadata.namespace",
}
if cause, ok := errors.StatusCause(err, v1.NamespaceTerminatingCause); !ok || !reflect.DeepEqual(expectedCause, cause) {
t.Errorf("Expected status cause indicating the namespace is terminating: %t %s", ok, diff.ObjectReflectDiff(expectedCause, cause))
}

// verify update operations in the namespace can proceed
err = handler.Admit(context.TODO(), admission.NewAttributesRecord(&pod, nil, v1.SchemeGroupVersion.WithKind("Pod").GroupKind().WithVersion("version"), pod.Namespace, pod.Name, v1.Resource("pods").WithVersion("version"), "", admission.Update, &metav1.UpdateOptions{}, false, nil), nil)
Expand Down