From 720d2d065a8b7fcfe69ddb0d28d430a7c340d551 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Fri, 23 Dec 2022 09:24:30 -0600 Subject: [PATCH] Add back join to zombie query that was dropped in #28198 (#28544) #28198 accidentally dropped a join in a query, leading to this: airflow/jobs/scheduler_job.py:1547 SAWarning: SELECT statement has a cartesian product between FROM element(s) "dag_run_1", "task_instance", "job" and FROM element "dag". Apply join condition(s) between each element to resolve. (cherry picked from commit a24d18a534ddbcefbcf0d8790d140ff496781f8b) --- airflow/jobs/scheduler_job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index ae1f76966bb5b..2d91cc28ac0bb 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1486,7 +1486,8 @@ def _find_zombies(self) -> None: zombies: list[tuple[TI, str, str]] = ( session.query(TI, DM.fileloc, DM.processor_subdir) .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql") - .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id) + .join(LocalTaskJob, TI.job_id == LocalTaskJob.id) + .join(DM, TI.dag_id == DM.dag_id) .filter(TI.state == TaskInstanceState.RUNNING) .filter( or_(