Skip to content

Commit

Permalink
configurable lazy marking of upstream-failed task status
Browse files Browse the repository at this point in the history
  • Loading branch information
mchalek committed Nov 1, 2018
1 parent 66cad8d commit 5853490
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion airflow/ti_deps/deps/trigger_rule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.exceptions import AirflowConfigException


class TriggerRuleDep(BaseTIDep):
Expand Down Expand Up @@ -138,7 +139,8 @@ def _evaluate_trigger_rule(
# bundled together for efficiency.
# handling instant state assignment based on trigger rules
if flag_upstream_failed:
if tr == TR.ALL_SUCCESS:
if tr == TR.ALL_SUCCESS and (
upstream_done or not self._lazily_mark_upstream_failed()):
if upstream_failed or failed:
ti.set_state(State.UPSTREAM_FAILED, session)
elif skipped:
Expand Down Expand Up @@ -197,3 +199,15 @@ def _evaluate_trigger_rule(
else:
yield self._failing_status(
reason="No strategy to evaluate trigger rule '{0}'.".format(tr))

@staticmethod
def _lazily_mark_upstream_failed():
result = False
try:
result = airflow.configuration.get(
'scheduler',
'lazily_mark_upstream_failed')
except AirflowConfigException:
pass

return result

0 comments on commit 5853490

Please sign in to comment.