Skip to content

Commit

Permalink
[AIRFLOW-1561] Fix scheduler to pick up example DAGs without other DA…
Browse files Browse the repository at this point in the history
…Gs (apache#2635)
  • Loading branch information
mrkm4ntr authored and wayne.morris committed Jul 29, 2019
1 parent f4e7c6f commit ec7caf9
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 11 deletions.
2 changes: 1 addition & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
simple_dags = []

try:
dagbag = models.DagBag(file_path)
dagbag = models.DagBag(file_path, include_examples=False)
except Exception:
self.log.exception("Failed at reloading the DAG file %s", file_path)
Stats.incr('dag_file_refresh_error', 1, 1)
Expand Down
12 changes: 4 additions & 8 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,7 @@ def __init__(
self.import_errors = {}
self.has_logged = False

if include_examples:
example_dag_folder = os.path.join(
os.path.dirname(__file__),
'example_dags')
self.collect_dags(example_dag_folder)
self.collect_dags(dag_folder)
self.collect_dags(dag_folder, include_examples)

def size(self):
"""
Expand Down Expand Up @@ -531,7 +526,8 @@ def bag_dag(self, dag, parent_dag, root_dag):
def collect_dags(
self,
dag_folder=None,
only_if_updated=True):
only_if_updated=True,
include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES')):
"""
Given a file path or a folder, this method looks for python modules,
imports them and adds them to the dagbag collection.
Expand All @@ -551,7 +547,7 @@ def collect_dags(
stats = []
FileLoadStat = namedtuple(
'FileLoadStat', "file duration dag_num task_num dags")
for filepath in list_py_file_paths(dag_folder):
for filepath in list_py_file_paths(dag_folder, include_examples):
try:
ts = timezone.utcnow()
found_dags = self.process_file(
Expand Down
7 changes: 6 additions & 1 deletion airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ def get_dag(self, dag_id):
return self.dag_id_to_simple_dag[dag_id]


def list_py_file_paths(directory, safe_mode=True):
def list_py_file_paths(directory, safe_mode=True,
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES')):
"""
Traverse a directory and look for Python files.
Expand Down Expand Up @@ -284,6 +285,10 @@ def list_py_file_paths(directory, safe_mode=True):
except Exception:
log = LoggingMixin().log
log.exception("Error while examining %s", f)
if include_examples:
import airflow.example_dags
example_dag_folder = airflow.example_dags.__path__[0]
file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, False))
return file_paths


Expand Down
14 changes: 13 additions & 1 deletion tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from airflow import AirflowException, settings, models
from airflow import configuration
from airflow.bin import cli
import airflow.example_dags
from airflow.executors import BaseExecutor, SequentialExecutor
from airflow.jobs import BaseJob, BackfillJob, SchedulerJob, LocalTaskJob
from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
Expand Down Expand Up @@ -3335,7 +3336,18 @@ def test_list_py_file_paths(self):
if file_name not in ignored_files:
expected_files.add(
'{}/{}'.format(TEST_DAGS_FOLDER, file_name))
for file_path in list_py_file_paths(TEST_DAGS_FOLDER):
for file_path in list_py_file_paths(TEST_DAGS_FOLDER, include_examples=False):
detected_files.add(file_path)
self.assertEqual(detected_files, expected_files)

example_dag_folder = airflow.example_dags.__path__[0]
for root, dirs, files in os.walk(example_dag_folder):
for file_name in files:
if file_name.endswith('.py') or file_name.endswith('.zip'):
if file_name not in ['__init__.py']:
expected_files.add(os.path.join(root, file_name))
detected_files.clear()
for file_path in list_py_file_paths(TEST_DAGS_FOLDER, include_examples=True):
detected_files.add(file_path)
self.assertEqual(detected_files, expected_files)

Expand Down

0 comments on commit ec7caf9

Please sign in to comment.