Skip to content

Commit

Permalink
Remove finalizer when orphaned
Browse files Browse the repository at this point in the history
Change-Id: Id88a28755660812a274dffab2693cb8a0ef4235c
  • Loading branch information
alculquicondor committed Mar 25, 2022
1 parent 56d9c45 commit f75e1b0
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
41 changes: 32 additions & 9 deletions pkg/controller/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (jm *Controller) resolveControllerRef(namespace string, controllerRef *meta
return job
}

// When a pod is created, enqueue the controller that manages it and update it's expectations.
// When a pod is created, enqueue the controller that manages it and update its expectations.
func (jm *Controller) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
if pod.DeletionTimestamp != nil {
Expand All @@ -261,7 +261,12 @@ func (jm *Controller) addPod(obj interface{}) {
return
}

// Otherwise, it's an orphan. Get a list of all matching controllers and sync
// Otherwise, it's an orphan.
// Clean the finalizer.
if hasJobTrackingFinalizer(pod) {
jm.enqueueOrphanPod(pod)
}
// Get a list of all matching controllers and sync
// them to see if anyone wants to adopt it.
// DO NOT observe creation because no controller should be waiting for an
// orphan.
Expand Down Expand Up @@ -331,7 +336,12 @@ func (jm *Controller) updatePod(old, cur interface{}) {
return
}

// Otherwise, it's an orphan. If anything changed, sync matching controllers
// Otherwise, it's an orphan.
// Clean the finalizer.
if hasJobTrackingFinalizer(curPod) {
jm.enqueueOrphanPod(curPod)
}
// If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if labelChanged || controllerRefChanged {
Expand Down Expand Up @@ -364,17 +374,18 @@ func (jm *Controller) deletePod(obj interface{}, final bool) {
}

controllerRef := metav1.GetControllerOf(pod)
hasFinalizer := hasJobTrackingFinalizer(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
// But this pod might have belonged to a Job and the GC removed the reference.
if hasJobTrackingFinalizer(pod) {
if hasFinalizer {
jm.enqueueOrphanPod(pod)
}
return
}
job := jm.resolveControllerRef(pod.Namespace, controllerRef)
if job == nil {
if hasJobTrackingFinalizer(pod) {
if hasFinalizer {
jm.enqueueOrphanPod(pod)
}
return
Expand All @@ -387,7 +398,7 @@ func (jm *Controller) deletePod(obj interface{}, final bool) {

// Consider the finalizer removed if this is the final delete. Otherwise,
// it's an update for the deletion timestamp, then check finalizer.
if final || !hasJobTrackingFinalizer(pod) {
if final || !hasFinalizer {
jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID))
}

Expand Down Expand Up @@ -441,10 +452,14 @@ func (jm *Controller) deleteJob(obj interface{}) {
}
}
// Listing pods shouldn't really fail, as we are just querying the informer cache.
pods, _ := jm.podStore.Pods(jobObj.Namespace).List(labels.Everything())
selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("parsing deleted job selector: %v", err))
return
}
pods, _ := jm.podStore.Pods(jobObj.Namespace).List(selector)
for _, pod := range pods {
controllerRef := metav1.GetControllerOf(pod)
if (controllerRef == nil || controllerRef.UID == jobObj.UID) && hasJobTrackingFinalizer(pod) {
if metav1.IsControlledBy(pod, jobObj) && hasJobTrackingFinalizer(pod) {
jm.enqueueOrphanPod(pod)
}
}
Expand Down Expand Up @@ -567,6 +582,14 @@ func (jm Controller) syncOrphanPod(ctx context.Context, key string) error {
}
return err
}
// Make sure the pod is still orphaned.
if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil {
job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef)
if job != nil {
// The pod was adopted. Do not remove finalizer.
return nil
}
}
if patch := removeTrackingFinalizerPatch(sharedPod); patch != nil {
if err := jm.podControl.PatchPod(ctx, ns, name, patch); err != nil && !apierrors.IsNotFound(err) {
return err
Expand Down
6 changes: 3 additions & 3 deletions test/integration/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func TestIndexedJob(t *testing.T) {
// Disable feature gate and restart controller.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, false)()
cancel()
ctx, cancel = startJobController(restConfig, clientSet)
ctx, cancel = startJobController(restConfig)
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
if err != nil {
t.Fatal(err)
Expand All @@ -439,7 +439,7 @@ func TestIndexedJob(t *testing.T) {
// Re-enable feature gate and restart controller. Failed Pod should be recreated now.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
cancel()
ctx, cancel = startJobController(restConfig, clientSet)
ctx, cancel = startJobController(restConfig)

validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Expand Down Expand Up @@ -779,7 +779,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
// Disable feature gate and restart controller to test that pods get created.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, false)()
cancel()
ctx, cancel = startJobController(restConfig, clientSet)
ctx, cancel = startJobController(restConfig)
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job: %v", err)
Expand Down

0 comments on commit f75e1b0

Please sign in to comment.