Skip to content

Commit

Permalink
Merge pull request #310 from lyft/lyft-test-2.3.4
Browse files Browse the repository at this point in the history
apply lyft patch for dag file processor
  • Loading branch information
leifrf committed Nov 6, 2023
2 parents 8d2638d + deaa05b commit 926b42c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
31 changes: 30 additions & 1 deletion airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,23 @@ def _run_parsing_loop(self):
if standalone_dag_processor:
self._fetch_callbacks(max_callbacks_per_loop)
self._deactivate_stale_dags()
self._refresh_dag_dir()

# Lyft-specific patch
# https://jira.lyft.net/browse/DATAOR-1099
# self._file_path_queue never drains to empty because of the number of callbacks that are
# enqueued during the _do_scheduling run in the scheduler (scheduler_job.py:935).
# As a result, DAGs that do not have a callback defined and triggered are never re-queued,
# and changes made to those DAGs since the scheduler started are not picked up.
#
# Fix: re-add files to the queue periodically.
# The 'dag_dir_list_interval' setting is re-used as the period, for simplicity.
# The prepare_file_path_queue function is modified as part of this patch to exclude
# files already in the queue from being re-added, which ensures there are no duplicates.
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_dag_dir_refresh_time).total_seconds()
if elapsed_time_since_refresh > self.dag_dir_list_interval:
self._refresh_dag_dir()
self.prepare_file_path_queue()

self._kill_timed_out_processors()

Expand Down Expand Up @@ -747,6 +763,11 @@ def _print_stat(self):
if 0 < self.print_stats_interval < time.monotonic() - self.last_stat_print_time:
if self._file_paths:
self._log_file_processing_stats(self._file_paths)

# Lyft-specific patch
# https://jira.lyft.net/browse/DATAOR-1099
Stats.gauge('dag_processing.file_path_queue_length', len(self._file_path_queue))

self.last_stat_print_time = time.monotonic()

@provide_session
Expand Down Expand Up @@ -1066,8 +1087,16 @@ def prepare_file_path_queue(self):

# Do not convert the following list to set as set does not preserve the order
# and we need to maintain the order of file_paths for `[scheduler] file_parsing_sort_mode`
#
# Lyft-specific Patch
# https://jira.lyft.net/browse/DATAOR-1099
# self._file_path_queue is not guaranteed to be empty when this function is called, so
# avoid adding duplicates to the queue by excluding files that are already in it.
self.log.debug("Excluding files already in file queue from being added to processing again:\n\t%s",
"\n\t".join(self._file_path_queue))
files_paths_to_queue = [
file_path for file_path in file_paths if file_path not in file_paths_to_exclude
and file_path not in self._file_path_queue
]

for file_path, processor in self._processors.items():
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

logger = logging.getLogger(__name__)

version = '2.3.4.post25'
version = '2.3.4.post30'

AIRFLOW_SOURCES_ROOT = Path(__file__).parent.resolve()
my_dir = dirname(__file__)
Expand Down

0 comments on commit 926b42c

Please sign in to comment.