Skip to content

Commit

Permalink
Add support for test_cycle from Airflow master
Browse files Browse the repository at this point in the history
  • Loading branch information
KostyaEsmukov authored and kxepal committed Apr 12, 2020
1 parent 75b1d11 commit 4a764af
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
5 changes: 2 additions & 3 deletions src/airflow_declarative/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import absolute_import, division, print_function, unicode_literals

from .compat import test_cycle
from .schema import ensure_schema


Expand Down Expand Up @@ -89,9 +90,7 @@ def build_dag(dag_id, schema, dag_class, operator_class, sensor_class):

build_flow(dict(operators, **sensors), schema.get("flow", {}))

if hasattr(dag, "test_cycle"):
# Since Airflow 1.10
dag.test_cycle()
test_cycle(dag)

return dag

Expand Down
18 changes: 17 additions & 1 deletion src/airflow_declarative/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,20 @@
except ImportError:
from airflow.operators import BaseOperator

__all__ = ("BaseOperator", "BaseSensorOperator", "Iterable", "Mapping")
try:
# Airflow master
from airflow.utils.dag_cycle_tester import test_cycle as _test_cycle
except ImportError:
_test_cycle = None


__all__ = ("BaseOperator", "BaseSensorOperator", "Iterable", "Mapping", "test_cycle")


def test_cycle(dag):
if _test_cycle is not None:
# Airflow master
_test_cycle(dag)
elif hasattr(dag, "test_cycle"):
# Since Airflow 1.10
dag.test_cycle()

0 comments on commit 4a764af

Please sign in to comment.