diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 93dc060a20401..58ef6fc95e4db 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -587,7 +587,23 @@ def _run_parsing_loop(self): if standalone_dag_processor: self._fetch_callbacks(max_callbacks_per_loop) self._deactivate_stale_dags() - self._refresh_dag_dir() + + # Lyft-specific patch + # https://jira.lyft.net/browse/DATAOR-1099 + # self._file_path_queue does not reach 0 due to the number of callbacks which are enqueued + # during the _do_scheduling run in the scheduler (scheduler_job.py:935) + # as a result, dags without a callback defined and triggered are never re-queued, and + # changes to those dags since the start of the scheduler are not picked up + # + # To address: re-add files to the queue periodically. + # The 'dag_dir_list_interval' is re-used for simplicity. + # The prepare_file_path_queue function is modified as part of this patch to exclude + # files already in the queue from being re-added, to ensure there are no duplicates + now = timezone.utcnow() + elapsed_time_since_refresh = (now - self.last_dag_dir_refresh_time).total_seconds() + if elapsed_time_since_refresh > self.dag_dir_list_interval: + self._refresh_dag_dir() + self.prepare_file_path_queue() self._kill_timed_out_processors() @@ -747,6 +763,11 @@ def _print_stat(self): if 0 < self.print_stats_interval < time.monotonic() - self.last_stat_print_time: if self._file_paths: self._log_file_processing_stats(self._file_paths) + + # Lyft-specific patch + # https://jira.lyft.net/browse/DATAOR-1099 + Stats.gauge('dag_processing.file_path_queue_length', len(self._file_path_queue)) + self.last_stat_print_time = time.monotonic() @provide_session @@ -1066,8 +1087,16 @@ def prepare_file_path_queue(self): # Do not convert the following list to set as set does not preserve the order # and we need to maintain the order of file_paths for `[scheduler] file_parsing_sort_mode` + # + # 
Lyft-specific patch + # https://jira.lyft.net/browse/DATAOR-1099 + # self._file_path_queue is not guaranteed to be empty when this function is called + # ensure not adding duplicates to the queue by excluding files already in queue + self.log.debug("Excluding files already in file queue from being added to processing again:\n\t%s", + "\n\t".join(self._file_path_queue)) files_paths_to_queue = [ file_path for file_path in file_paths if file_path not in file_paths_to_exclude + and file_path not in self._file_path_queue ] for file_path, processor in self._processors.items(): diff --git a/setup.py b/setup.py index 7f9bb3eb87a58..533d9ed346cb4 100644 --- a/setup.py +++ b/setup.py @@ -45,7 +45,7 @@ logger = logging.getLogger(__name__) -version = '2.3.4.post25' +version = '2.3.4.post30' AIRFLOW_SOURCES_ROOT = Path(__file__).parent.resolve() my_dir = dirname(__file__)