Fixing pod leak in KubernetesJobOperator#1
Merged
Conversation
…sJobOperator KubernetesJobOperator inherited from KubernetesPodOperator but overrode execute() without calling post_complete_action(), so the monitoring / log-streaming pods discovered via get_pods() were never deleted. These pods have no ownerReferences to the V1Job, so ttl_seconds_after_finished and the Foreground cascade in on_kill don't reap them either. - execute() and execute_complete() now wrap their work in try/finally and call post_complete_action() for each pod in self.pods. on_finish_action (delete_pod / delete_succeeded_pod / keep_pod) is now honoured. - on_kill() additionally calls pod_manager.delete_pod() for each monitoring pod (the Job's foreground cascade doesn't reach them). - Per-pod cleanup errors are logged but never mask the in-flight exception, so Job-level failures keep propagating. - execute_complete() resolves monitoring pods once and shares the lookup between the log-retrieval path and the cleanup path. - Added unit tests, a bugfix newsfragment, and an operators.rst section documenting the cleanup contract.
… drop unused import, relax pod-deletion ordering in test, fix trailing comma
There was a problem hiding this comment.
Pull request overview
This PR addresses a resource-leak in KubernetesJobOperator by ensuring the monitoring/log-streaming pods discovered via get_pods() are cleaned up at the end of execution (including kill and deferrable-resume paths), and documents how on_finish_action applies to these pods.
Changes:
- Add monitoring-pod cleanup to
execute()(viafinally),execute_complete(), andon_kill(). - Add unit tests covering cleanup on success/failure, honoring
on_finish_action, deferrable resume cleanup, and kill-path cleanup. - Document the distinction between Job-owned worker pods vs. operator-created monitoring pods, and add a bugfix newsfragment.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py |
Adds cleanup logic for monitoring pods across success/failure/kill/deferrable resume paths. |
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py |
Adds regression tests asserting monitoring-pod cleanup behavior and on_finish_action handling. |
providers/cncf/kubernetes/newsfragments/kubernetes-job-operator-cleanup-monitoring-pods.bugfix.rst |
Release note describing the monitoring-pod leak fix and new behavior. |
providers/cncf/kubernetes/docs/operators.rst |
Documentation explaining monitoring pods and how on_finish_action affects their cleanup. |
In _cleanup_monitoring_pods, remote_pod is resolved via find_pod(), which is designed to locate a single matching pod by task-instance labels and can invoke duplicate-pod resolution logic (process_duplicate_label_pods). For KubernetesJobOperator with parallelism > 1, this lookup can return the wrong pod (or trigger duplicate-handling side effects), so post_complete_action() may receive a mismatched remote_pod. Consider using the already-discovered pod’s name/namespace to refresh state (e.g. via hook.get_pod) or just pass remote_pod=pod when you already have the V1Pod object from get_pods(). Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
The new unit tests add several mock.MagicMock() instances (pods, jobs, TI, etc.) without spec/autospec, and some patch() usages also create non-spec'd mocks by default. Using autospec=True on patches and create_autospec(...)/MagicMock(spec=...) for key Kubernetes objects helps catch typos/attribute mismatches in these tests and aligns with Airflow’s test hardening guidance. Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
…nd test assertions - triggers/job.py: Always include pod_names/pod_namespace in trigger event regardless of get_logs setting, so execute_complete() can reliably clean up monitoring pods even when get_logs=False - operators/job.py: Log unexpected ApiException in on_kill() instead of suppressing all ApiExceptions; remove unused `suppress` import - tests/test_job.py: Rewrite test_execute_respects_keep_pod and test_execute_deletes_pod_default to keep process_pod_deletion real and assert on pod_manager.delete_pod; stub hook.get_pod for remote_pod resolution - tests/test_job.py: Add regression test for get_logs=False deferrable path
…ccidental deletion of method signature
| * ``delete_pod`` (default) — the pod is deleted after the task | ||
| finishes (success or failure). | ||
| * ``delete_succeeded_pod`` — the pod is deleted only when the task | ||
| succeeded. |
Comment on lines
+732
to
+733
| When the task is killed, ``on_kill`` deletes the Job (with foreground cascade) | ||
| and additionally attempts pod deletion directly for the discovered pods. |
| @@ -0,0 +1 @@ | |||
| Fix pod cleanup gaps in ``KubernetesJobOperator``: ``execute()``, ``execute_complete()`` and ``on_kill()`` now consistently clean up pods discovered via ``get_pods()``, including deferrable resume paths where pod lookup can fail. The inherited ``on_finish_action`` parameter (``delete_pod`` / ``delete_succeeded_pod`` / ``keep_pod``) is honoured for these pods, matching ``KubernetesPodOperator`` behavior. Per-pod cleanup errors are logged but never mask a Job-level failure. | |||
jykae
approved these changes
May 22, 2026
jykae
added a commit
that referenced
this pull request
May 22, 2026
* fix(providers/cncf/kubernetes): clean up monitoring pods in KubernetesJobOperator KubernetesJobOperator inherited from KubernetesPodOperator but overrode execute() without calling post_complete_action(), so the monitoring / log-streaming pods discovered via get_pods() were never deleted. These pods have no ownerReferences to the V1Job, so ttl_seconds_after_finished and the Foreground cascade in on_kill don't reap them either. - execute() and execute_complete() now wrap their work in try/finally and call post_complete_action() for each pod in self.pods. on_finish_action (delete_pod / delete_succeeded_pod / keep_pod) is now honoured. - on_kill() additionally calls pod_manager.delete_pod() for each monitoring pod (the Job's foreground cascade doesn't reach them). - Per-pod cleanup errors are logged but never mask the in-flight exception, so Job-level failures keep propagating. - execute_complete() resolves monitoring pods once and shares the lookup between the log-retrieval path and the cleanup path. - Added unit tests, a bugfix newsfragment, and an operators.rst section documenting the cleanup contract. * Address code review feedback: remove dead PodNotFoundException check, drop unused import, relax pod-deletion ordering in test, fix trailing comma * Potential fix for pull request finding In _cleanup_monitoring_pods, remote_pod is resolved via find_pod(), which is designed to locate a single matching pod by task-instance labels and can invoke duplicate-pod resolution logic (process_duplicate_label_pods). For KubernetesJobOperator with parallelism > 1, this lookup can return the wrong pod (or trigger duplicate-handling side effects), so post_complete_action() may receive a mismatched remote_pod. Consider using the already-discovered pod’s name/namespace to refresh state (e.g. via hook.get_pod) or just pass remote_pod=pod when you already have the V1Pod object from get_pods(). Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Use isinstance(exc, TaskDeferred) instead of brittle string comparison * Potential fix for pull request finding The new unit tests add several mock.MagicMock() instances (pods, jobs, TI, etc.) without spec/autospec, and some patch() usages also create non-spec'd mocks by default. Using autospec=True on patches and create_autospec(...)/MagicMock(spec=...) for key Kubernetes objects helps catch typos/attribute mismatches in these tests and aligns with Airflow’s test hardening guidance. Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Address PR review comments: fix trigger pod_names, on_kill logging, and test assertions - triggers/job.py: Always include pod_names/pod_namespace in trigger event regardless of get_logs setting, so execute_complete() can reliably clean up monitoring pods even when get_logs=False - operators/job.py: Log unexpected ApiException in on_kill() instead of suppressing all ApiExceptions; remove unused `suppress` import - tests/test_job.py: Rewrite test_execute_respects_keep_pod and test_execute_deletes_pod_default to keep process_pod_deletion real and assert on pod_manager.delete_pod; stub hook.get_pod for remote_pod resolution - tests/test_job.py: Add regression test for get_logs=False deferrable path * Fix orphaned test_on_kill_deletes_monitoring_pods method body after accidental deletion of method signature * Make pod resolution best-effort in execute_complete * Address remaining KubernetesJobOperator review comments * Finalize review-comment fixes for KubernetesJobOperator * Fix remaining KubernetesJobOperator review comments * Update KubernetesJobOperator docs for action semantics * Improve KubernetesJobOperator newsfragment readability --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: Ville Jyrkkä <vjyrkka@gmail.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.