Skip to content

Commit

Permalink
Don't emit first_task_scheduling_delay metric for only-once dags (apa…
Browse files Browse the repository at this point in the history
…che#12835)

Dags with a schedule interval of None, or `@once` don't have a following
schedule, so we can't realistically calculate this metric.

Additionally, this changes the emitted metric from seconds to
milliseconds -- all timers to statsd should be in milliseconds -- this
is what Statsd and apps that consume data from there expect. See apache#10629
for more details.

This will be a "breaking" change from 1.10.14, where the metric was
back-ported to, but was (incorrectly) emitting seconds.

(cherry picked from commit 4a02e0a)
  • Loading branch information
ashb authored and kaxil committed Dec 7, 2020
1 parent e9940ab commit 2c92b02
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 30 deletions.
22 changes: 14 additions & 8 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,23 +371,29 @@ def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis):
Note, the stat will only be emitted if the DagRun is a scheduler triggered one
(i.e. external_trigger is False).
"""
if self.state == State.RUNNING:
return
if self.external_trigger:
return
if not finished_tis:
return

try:
if self.state == State.RUNNING:
return
if self.external_trigger:
return
if not finished_tis:
return
dag = self.get_dag()

if not self.dag.schedule_interval or self.dag.schedule_interval == "@once":
# We can't emit this metric if there is no following schedule to cacluate from!
return

ordered_tis_by_start_date = [ti for ti in finished_tis if ti.start_date]
ordered_tis_by_start_date.sort(key=lambda ti: ti.start_date, reverse=False)
first_start_date = ordered_tis_by_start_date[0].start_date
if first_start_date:
# dag.following_schedule calculates the expected start datetime for a scheduled dagrun
# i.e. a daily flow for execution date 1/1/20 actually runs on 1/2/20 hh:mm:ss,
# and ti.start_date will be 1/2/20 hh:mm:ss so the following schedule is comparison
true_delay = (first_start_date - dag.following_schedule(self.execution_date)).total_seconds()
if true_delay >= 0:
true_delay = first_start_date - dag.following_schedule(self.execution_date)
if true_delay.total_seconds() > 0:
Stats.timing('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay)
except Exception as e:
self.log.warning('Failed to record first_task_scheduling_delay metric:\n', e)
Expand Down
2 changes: 1 addition & 1 deletion docs/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,5 @@ Name Description
``dagrun.duration.failed.<dag_id>`` Milliseconds taken for a DagRun to reach failed state
``dagrun.schedule_delay.<dag_id>`` Milliseconds of delay between the scheduled DagRun
start date and the actual DagRun start date
``dagrun.<dag_id>.first_task_scheduling_delay`` Seconds elapsed between first task start_date and dagrun expected start
``dagrun.<dag_id>.first_task_scheduling_delay`` Milliseconds elapsed between first task start_date and dagrun expected start
================================================= =======================================================================
67 changes: 46 additions & 21 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from airflow.utils.trigger_rule import TriggerRule
from tests.compat import mock, call
from tests.models import DEFAULT_DATE
from tests.test_utils import db


class DagRunTest(unittest.TestCase):
Expand Down Expand Up @@ -628,31 +629,55 @@ def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock):
self.assertNotIn(call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)),
stats_mock.mock_calls)

@mock.patch.object(Stats, 'timing')
def test_emit_scheduling_delay(self, stats_mock):
@parameterized.expand(
[
("*/5 * * * *", True),
(None, False),
("@once", False),
]
)
def test_emit_scheduling_delay(self, schedule_interval, expected):
"""
Tests that dag scheduling delay stat is set properly once running scheduled dag.
dag_run.update_state() invokes the _emit_true_scheduling_delay_stats_for_finished_state method.
"""
dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1))
# Cleanup
db.clear_db_runs()
db.clear_db_dags()

dag = DAG(dag_id='test_emit_dag_stats', start_date=days_ago(1), schedule_interval=schedule_interval)
dag_task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')

session = settings.Session()
orm_dag = DagModel(dag_id=dag.dag_id, is_active=True)
session.add(orm_dag)
session.flush()
dag_run = dag.create_dagrun(
run_id="test",
state=State.SUCCESS,
execution_date=dag.start_date,
start_date=dag.start_date,
session=session,
)
ti = dag_run.get_task_instance(dag_task.task_id)
ti.set_state(State.SUCCESS, session)
session.commit()
session.close()
dag_run.update_state()
true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date)).total_seconds()
sched_delay_stat_call = call('dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id), true_delay)
self.assertIn(sched_delay_stat_call, stats_mock.mock_calls)
try:
orm_dag = DagModel(dag_id=dag.dag_id, is_active=True)
session.add(orm_dag)
session.flush()
dag_run = dag.create_dagrun(
run_id="test",
state=State.SUCCESS,
execution_date=dag.start_date,
start_date=dag.start_date,
session=session,
)
ti = dag_run.get_task_instance(dag_task.task_id, session)
ti.set_state(State.SUCCESS, session)
session.flush()

with mock.patch.object(Stats, 'timing') as stats_mock:
dag_run.update_state(session)

metric_name = 'dagrun.{}.first_task_scheduling_delay'.format(dag.dag_id)

if expected:
true_delay = (ti.start_date - dag.following_schedule(dag_run.execution_date))
sched_delay_stat_call = call(metric_name, true_delay)
assert sched_delay_stat_call in stats_mock.mock_calls
else:
# Assert that we never passed the metric
sched_delay_stat_call = call(metric_name, mock.ANY)
assert sched_delay_stat_call not in stats_mock.mock_calls
finally:
# Don't write anything to the DB
session.rollback()
session.close()

0 comments on commit 2c92b02

Please sign in to comment.