Skip to content

Commit

Permalink
Fix DetachedInstanceError when finding zombies in Dag Parsing process (apache#28198)
Browse files Browse the repository at this point in the history

```
[2022-12-06T14:20:21.622+0000] {base_job.py:229} DEBUG - [heartbeat]
[2022-12-06T14:20:21.623+0000] {scheduler_job.py:1495} DEBUG - Finding 'running' jobs without a recent heartbeat
[2022-12-06T14:20:21.637+0000] {scheduler_job.py:1515} WARNING - Failing (2) jobs without heartbeat after 2022-12-06 14:15:21.623199+00:00
[2022-12-06T14:20:21.641+0000] {scheduler_job.py:1526} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/xxx_dag.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'xxx', 'Task Id': 'xxx', 'Run Id': 'scheduled__2022-12-05T00:15:00+00:00', 'Hostname': 'airflow-worker-0.airflow-worker.airflow2.svc.cluster.local', 'External Executor Id': '9520cb9f-3245-497a-8e17-e9dec29d4549'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f1cd4de4130>, 'is_failure_callback': True}
[2022-12-06T14:20:21.645+0000] {scheduler_job.py:763} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/local/lib/python3.10/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
    action(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
    processor_subdir=ti.dag_model.processor_subdir,
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
[2022-12-06T14:20:21.647+0000] {celery_executor.py:443} DEBUG - Inquiring about 5 celery task(s)
[2022-12-06T14:20:21.669+0000] {celery_executor.py:602} DEBUG - Fetched 5 state(s) for 5 task(s)
[2022-12-06T14:20:21.669+0000] {celery_executor.py:446} DEBUG - Inquiries completed.
[2022-12-06T14:20:21.669+0000] {scheduler_job.py:775} INFO - Exited execute loop
[2022-12-06T14:20:21.674+0000] {cli_action_loggers.py:83} DEBUG - Calling callbacks: []
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 39, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 103, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 85, in scheduler
    _run_scheduler_job(args=args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 50, in _run_scheduler_job
    job.run()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 247, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/local/lib/python3.10/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
    action(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
    processor_subdir=ti.dag_model.processor_subdir,
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
```

When running in standalone dag processor mode, the scheduler uses `DatabaseCallbackSink`.

The `_find_zombies` function calls `self.executor.send_callback(request)`, but it does not propagate the ORM `session`; instead, `provide_session` opens a new session inside the `send` function. The `TaskInstance` objects gathered by `_find_zombies` are therefore detached from any session, and lazily loading `ti.dag_model` raises `DetachedInstanceError`.

```
class DatabaseCallbackSink(BaseCallbackSink):
    """Sends callbacks to database."""

    @provide_session
    def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None:
        """Sends callback for execution."""
        db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
        session.add(db_callback)
```

Signed-off-by: BobDu <i@bobdu.cc>

(cherry picked from commit 4b340b7)
  • Loading branch information
BobDu authored and shinny-taojiachun committed Apr 2, 2024
1 parent ed2d3be commit 0fe76f6
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 39 deletions.
34 changes: 16 additions & 18 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1471,8 +1471,7 @@ def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
if num_timed_out_tasks:
self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)

@provide_session
def _find_zombies(self, session: Session) -> None:
def _find_zombies(self) -> None:
"""
Find zombie task instances, which are tasks haven't heartbeated for too long
or have a no-longer-running LocalTaskJob, and create a TaskCallbackRequest
Expand All @@ -1483,31 +1482,30 @@ def _find_zombies(self, session: Session) -> None:
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)

zombies = (
session.query(TaskInstance, DagModel.fileloc)
.with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')
.join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
.join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
.filter(TaskInstance.state == TaskInstanceState.RUNNING)
.filter(
or_(
LocalTaskJob.state != State.RUNNING,
LocalTaskJob.latest_heartbeat < limit_dttm,
with create_session() as session:
zombies: list[tuple[TI, str, str]] = (
session.query(TI, DM.fileloc, DM.processor_subdir)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
.filter(TI.state == TaskInstanceState.RUNNING)
.filter(
or_(
LocalTaskJob.state != State.RUNNING,
LocalTaskJob.latest_heartbeat < limit_dttm,
)
)
.filter(TI.queued_by_job_id == self.id)
.all()
)
.filter(TaskInstance.queued_by_job_id == self.id)
.all()
)

if zombies:
self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)

for ti, file_loc in zombies:

for ti, file_loc, processor_subdir in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
processor_subdir=ti.dag_model.processor_subdir,
processor_subdir=processor_subdir,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=str(zombie_message_details),
)
Expand Down
41 changes: 20 additions & 21 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4005,14 +4005,13 @@ def test_timeout_triggers(self, dag_maker):
assert ti2.state == State.DEFERRED

def test_find_zombies_nothing(self):
with create_session() as session:
executor = MockExecutor(do_update=False)
self.scheduler_job = SchedulerJob(executor=executor)
self.scheduler_job.processor_agent = mock.MagicMock()
executor = MockExecutor(do_update=False)
self.scheduler_job = SchedulerJob(executor=executor)
self.scheduler_job.processor_agent = mock.MagicMock()

self.scheduler_job._find_zombies(session=session)
self.scheduler_job._find_zombies()

self.scheduler_job.executor.callback_sink.send.assert_not_called()
self.scheduler_job.executor.callback_sink.send.assert_not_called()

def test_find_zombies(self):
dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
Expand Down Expand Up @@ -4055,20 +4054,21 @@ def test_find_zombies(self):
ti.queued_by_job_id = self.scheduler_job.id
session.flush()

self.scheduler_job._find_zombies(session=session)
self.scheduler_job._find_zombies()

self.scheduler_job.executor.callback_sink.send.assert_called_once()
requests = self.scheduler_job.executor.callback_sink.send.call_args[0]
assert 1 == len(requests)
assert requests[0].full_filepath == dag.fileloc
assert requests[0].msg == str(self.scheduler_job._generate_zombie_message_details(ti))
assert requests[0].is_failure_callback is True
assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
assert ti.dag_id == requests[0].simple_task_instance.dag_id
assert ti.task_id == requests[0].simple_task_instance.task_id
assert ti.run_id == requests[0].simple_task_instance.run_id
assert ti.map_index == requests[0].simple_task_instance.map_index
self.scheduler_job.executor.callback_sink.send.assert_called_once()
requests = self.scheduler_job.executor.callback_sink.send.call_args[0]
assert 1 == len(requests)
assert requests[0].full_filepath == dag.fileloc
assert requests[0].msg == str(self.scheduler_job._generate_zombie_message_details(ti))
assert requests[0].is_failure_callback is True
assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
assert ti.dag_id == requests[0].simple_task_instance.dag_id
assert ti.task_id == requests[0].simple_task_instance.task_id
assert ti.run_id == requests[0].simple_task_instance.run_id
assert ti.map_index == requests[0].simple_task_instance.map_index

with create_session() as session:
session.query(TaskInstance).delete()
session.query(LocalTaskJob).delete()

Expand Down Expand Up @@ -4143,12 +4143,11 @@ def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_proce
Check that the same set of failure callback with zombies are passed to the dag
file processors until the next zombie detection logic is invoked.
"""
with conf_vars({('core', 'load_examples'): 'False'}):
with conf_vars({("core", "load_examples"): "False"}), create_session() as session:
dagbag = DagBag(
dag_folder=os.path.join(settings.DAGS_FOLDER, "test_example_bash_operator.py"),
read_dags_from_db=False,
)
session = settings.Session()
session.query(LocalTaskJob).delete()
dag = dagbag.get_dag('test_example_bash_operator')
dag.sync_to_db(processor_subdir=TEST_DAG_FOLDER)
Expand Down Expand Up @@ -4177,7 +4176,7 @@ def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_proce
self.scheduler_job.executor = MockExecutor()
self.scheduler_job.processor_agent = mock.MagicMock()

self.scheduler_job._find_zombies(session=session)
self.scheduler_job._find_zombies()

self.scheduler_job.executor.callback_sink.send.assert_called_once()

Expand Down

0 comments on commit 0fe76f6

Please sign in to comment.