Skip to content

Commit

Permalink
[TWTR][CX-14365] additional logging to help debug scheduler hang, can be removed/reverted once issue is resolved
Browse files Browse the repository at this point in the history
  • Loading branch information
Redwan Rahman authored and aoen committed Jul 2, 2019
1 parent f4915fc commit 99ee040
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 1 deletion.
12 changes: 12 additions & 0 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,10 @@ def helper():
stdout = StreamLogWriter(log, logging.INFO)
stderr = StreamLogWriter(log, logging.WARN)

log.info("Setting log context for file {}".format(file_path))
# log file created here
set_context(log, file_path)
log.info("Successfully set log context for file {}".format(file_path))

try:
# redirect stdout/stderr to log
Expand All @@ -384,6 +387,7 @@ def helper():
log.info("Started process (PID=%s) to work on %s",
os.getpid(), file_path)
scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
log.info("Processing file {}".format(file_path))
result = scheduler_job.process_file(file_path,
pickle_dags)
result_queue.put(result)
Expand Down Expand Up @@ -412,6 +416,7 @@ def start(self):
"""
Launch the process and start processing the DAG.
"""
self.log.info("Launching process to process DAG at {}".format(self.file_path))
self._process = DagFileProcessor._launch_process(
self._result_queue,
self.file_path,
Expand Down Expand Up @@ -1671,6 +1676,7 @@ def _execute_helper(self, processor_manager):
# Kick off new processes and collect results from finished ones
self.log.debug("Heartbeating the process manager")
simple_dags = processor_manager.heartbeat()
self.log.debug("Finished process manager heartbeat")

if self.using_sqlite:
# For the sqlite case w/ 1 thread, wait until the processor
Expand Down Expand Up @@ -1700,6 +1706,9 @@ def _execute_helper(self, processor_manager):
State.SCHEDULED],
State.NONE)

scheduled_dag_ids = ", ".join(simple_dag_bag.dag_ids)
self.log.info('DAGs to be executed: {}'.format(scheduled_dag_ids))

self._execute_task_instances(simple_dag_bag,
(State.SCHEDULED,))

Expand Down Expand Up @@ -1741,14 +1750,17 @@ def _execute_helper(self, processor_manager):
break

# Stop any processors
self.log.info("Terminating DAG processors")
processor_manager.terminate()
self.log.info("All DAG processors terminated")

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
all_files_processed = True
for file_path in known_file_paths:
if processor_manager.get_last_finish_time(file_path) is None:
self.log.info("File {} not processed".format(file_path))
all_files_processed = False
break
if all_files_processed:
Expand Down
6 changes: 6 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4185,6 +4185,8 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
:return: None
"""

self.log.info("Attempting to sync DAG {} to DB".format(self._dag_id))

if owner is None:
owner = self.owner
if sync_time is None:
Expand All @@ -4203,8 +4205,12 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
session.merge(orm_dag)
session.commit()

self.log.info("Synced DAG %s to DB", self._dag_id)

for subdag in self.subdags:
self.log.info("Syncing SubDAG %s", subdag._dag_id)
subdag.sync_to_db(owner=owner, sync_time=sync_time, session=session)
self.log.info("Successfully synced SubDAG %s", subdag._dag_id)

@staticmethod
@provide_session
Expand Down
5 changes: 5 additions & 0 deletions airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,10 +565,15 @@ def heartbeat(self):
processor.pid, file_path
)
self._processors[file_path] = processor

self.log.info("Number of active file processors: {}".format(len(self._processors)))

# Update scheduler heartbeat count.
self._run_count[self._heart_beat_key] += 1

simple_dag_ids = ", ".join([simple_dag.dag_id for simple_dag in simple_dags])
self.log.info("Processed DAGs: {}".format(simple_dag_ids))

return simple_dags

def max_runs_reached(self):
Expand Down
2 changes: 2 additions & 0 deletions airflow/utils/log/file_processor_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,11 @@ def _init_file(self, filename):
directory = os.path.dirname(full_path)

if not os.path.exists(directory):
logging.info("Creating directory {}".format(directory))
os.makedirs(directory)

if not os.path.exists(full_path):
logging.info("Creating file {}".format(full_path))
open(full_path, "a").close()

return full_path
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
# under the License.
#

version = '1.10.0+twtr11'
version = '1.10.0+twtr12'

0 comments on commit 99ee040

Please sign in to comment.