From 9c969620c6d9784bad7f953813185c10bfb3f5f8 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Sun, 17 Dec 2023 14:27:18 -0600
Subject: [PATCH 1/5] wait for non-empty event log files in dbfs before downloading

---
 sync/_databricks.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/sync/_databricks.py b/sync/_databricks.py
index cb85fd1..b839941 100644
--- a/sync/_databricks.py
+++ b/sync/_databricks.py
@@ -1725,6 +1725,13 @@ def _dbfs_directory_has_all_rollover_logs(contents: dict, run_end_time_millis: f
     )
 
 
+def _dbfs_any_file_has_zero_size(contents: dict):
+    any_zeros = any(file["file_size"] == 0 for file in contents["files"])
+    if any_zeros:
+        logger.info("One or more dbfs event log files has a file size of zero")
+    return any_zeros
+
+
 def _event_log_poll_duration_seconds():
     """Convenience function to aid testing"""
     return 15
@@ -1821,7 +1828,8 @@ def _get_eventlog_from_dbfs(
     poll_max_attempts = 20  # 5 minutes / 15 seconds = 20 attempts
 
     while (
-        not _dbfs_directory_has_all_rollover_logs(eventlog_dir, run_end_time_millis)
+        _dbfs_any_file_has_zero_size(eventlog_dir)
+        or not _dbfs_directory_has_all_rollover_logs(eventlog_dir, run_end_time_millis)
         and poll_num_attempts < poll_max_attempts
     ):
         if poll_num_attempts > 0:
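A note on the `while` condition above: Python's `and` binds more tightly than `or`, so `A or B and C` is evaluated as `A or (B and C)`. As written, the `poll_num_attempts < poll_max_attempts` cap only bounds the rollover-log check, not the new zero-size check; PATCH 4/5 below regroups the condition. A minimal standalone sketch of the grouping (the function and names here are illustrative, not the library's code):

```python
# Standalone illustration of operator precedence in the polling condition:
# `and` binds tighter than `or`, so `a or b and c` parses as `a or (b and c)`.
def should_keep_polling(any_zero_size: bool, missing_logs: bool, under_limit: bool) -> bool:
    # Mirrors the grouping of the condition introduced in PATCH 1/5.
    return any_zero_size or missing_logs and under_limit

# With a zero-size file present, the attempt cap no longer stops the loop:
assert should_keep_polling(any_zero_size=True, missing_logs=False, under_limit=False) is True
# Explicit parentheses show the grouping Python actually applies:
assert (True or (False and False)) is True
```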
From 3c5b0dd19ffeade3855dfb877bc781ce8b90d0ef Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Sun, 17 Dec 2023 14:28:03 -0600
Subject: [PATCH 2/5] bump version

---
 sync/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sync/__init__.py b/sync/__init__.py
index 57e6122..4e46d50 100644
--- a/sync/__init__.py
+++ b/sync/__init__.py
@@ -1,4 +1,4 @@
 """Library for leveraging the power of Sync"""
-__version__ = "0.5.1"
+__version__ = "0.5.3"
 
 TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

From 7c1341c081760f8921811b96eff30318d9458153 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Wed, 20 Dec 2023 07:15:45 -0600
Subject: [PATCH 3/5] wait for eventlog directory to be unchanging with nonzero file sizes

---
 Makefile            |  6 ++++++
 sync/_databricks.py | 23 +++++++++++++++++++++--
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/Makefile b/Makefile
index a31f45e..1cc1e76 100644
--- a/Makefile
+++ b/Makefile
@@ -19,3 +19,9 @@ endif
 
 .PHONY: tidy
 tidy: format lint
+
+# Removes the directory that contains bytecode cache files
+# that are automatically generated by python.
+.PHONY: clean
+clean:
+	find . -type f -name "*.pyc" | xargs rm -fr
+	find . -type d -name __pycache__ | xargs rm -fr

diff --git a/sync/_databricks.py b/sync/_databricks.py
index b839941..aacf946 100644
--- a/sync/_databricks.py
+++ b/sync/_databricks.py
@@ -1725,13 +1725,24 @@ def _dbfs_directory_has_all_rollover_logs(contents: dict, run_end_time_millis: f
     )
 
 
-def _dbfs_any_file_has_zero_size(contents: dict):
-    any_zeros = any(file["file_size"] == 0 for file in contents["files"])
+def _dbfs_any_file_has_zero_size(dbfs_contents: Dict):
+    any_zeros = any(file["file_size"] == 0 for file in dbfs_contents["files"])
     if any_zeros:
         logger.info("One or more dbfs event log files has a file size of zero")
     return any_zeros
 
 
+def _check_total_file_size_changed(
+    last_total_file_size: int, dbfs_contents: Dict
+) -> Tuple[bool, int]:
+
+    new_total_file_size = sum([file.get("file_size", 0) for file in dbfs_contents.get("files", {})])
+    if new_total_file_size == last_total_file_size:
+        return False, new_total_file_size
+    else:
+        return True, new_total_file_size
+
+
 def _event_log_poll_duration_seconds():
     """Convenience function to aid testing"""
     return 15
@@ -1827,11 +1838,15 @@ def _get_eventlog_from_dbfs(
     poll_num_attempts = 0
     poll_max_attempts = 20  # 5 minutes / 15 seconds = 20 attempts
 
+    total_file_size = 0
+    file_size_changed, total_file_size = _check_total_file_size_changed(0, eventlog_dir)
     while (
         _dbfs_any_file_has_zero_size(eventlog_dir)
         or not _dbfs_directory_has_all_rollover_logs(eventlog_dir, run_end_time_millis)
         and poll_num_attempts < poll_max_attempts
+        or file_size_changed
     ):
+
         if poll_num_attempts > 0:
             logger.info(
                 f"No or incomplete event log data detected - attempting again in {poll_duration_seconds} seconds"
             )
             sleep(poll_duration_seconds)
             eventlog_dir = dbx_client.list_dbfs_directory(matching_subdirectory["path"])
 
+        file_size_changed, total_file_size = _check_total_file_size_changed(
+            total_file_size, eventlog_dir
+        )
+
         poll_num_attempts += 1
 
     eventlog_zip = io.BytesIO()
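The helper added in PATCH 3/5 makes the download wait not just for rollover logs but for the directory listing to stop changing: the loop re-lists the DBFS directory on each attempt and keeps polling while any file is empty or the total size differs from the previous poll. A standalone sketch of that stabilization pattern (the `list_files` callable and the function name are illustrative, not the library's API):

```python
import time
from typing import Callable, Dict, List


def wait_for_stable_listing(
    list_files: Callable[[], List[Dict]],
    poll_seconds: float = 15,
    max_attempts: int = 20,
) -> List[Dict]:
    """Poll a file listing until no file is zero bytes and the total size
    is unchanged between consecutive polls, or the attempt cap is reached."""
    last_total = -1  # sentinel so the first comparison always counts as a change
    for _ in range(max_attempts):
        files = list_files()
        total = sum(f.get("file_size", 0) for f in files)
        if files and total == last_total and not any(f.get("file_size", 0) == 0 for f in files):
            return files  # listing is non-empty, non-zero, and stable
        last_total = total
        time.sleep(poll_seconds)
    return list_files()  # best effort after exhausting attempts
```

In the patch itself the same idea is folded into the existing `_get_eventlog_from_dbfs` polling loop, seeded with `_check_total_file_size_changed(0, eventlog_dir)` before the first iteration.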
From 947c493eb9e4ad523e1c1ce03f00326a9a0220b5 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Wed, 20 Dec 2023 07:58:19 -0600
Subject: [PATCH 4/5] fix logic. Add logging statement

---
 sync/_databricks.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/sync/_databricks.py b/sync/_databricks.py
index aacf946..14e7409 100644
--- a/sync/_databricks.py
+++ b/sync/_databricks.py
@@ -1740,6 +1740,7 @@ def _check_total_file_size_changed(
     if new_total_file_size == last_total_file_size:
         return False, new_total_file_size
     else:
+        logger.info("Total file size of eventlog directory changed")
         return True, new_total_file_size
@@ -1840,13 +1841,11 @@ def _get_eventlog_from_dbfs(
     total_file_size = 0
     file_size_changed, total_file_size = _check_total_file_size_changed(0, eventlog_dir)
-    while (
-        _dbfs_any_file_has_zero_size(eventlog_dir)
-        or not _dbfs_directory_has_all_rollover_logs(eventlog_dir, run_end_time_millis)
-        and poll_num_attempts < poll_max_attempts
+    while (poll_num_attempts < poll_max_attempts) and (
+        not _dbfs_directory_has_all_rollover_logs(eventlog_dir, run_end_time_millis)
+        or _dbfs_any_file_has_zero_size(eventlog_dir)
         or file_size_changed
     ):
-
         if poll_num_attempts > 0:
             logger.info(
                 f"No or incomplete event log data detected - attempting again in {poll_duration_seconds} seconds"
             )
             sleep(poll_duration_seconds)
             eventlog_dir = dbx_client.list_dbfs_directory(matching_subdirectory["path"])

From a8128e8000008397ccbcf99c7b1dfaf6ffab44f4 Mon Sep 17 00:00:00 2001
From: Sean Gorsky
Date: Wed, 20 Dec 2023 09:31:43 -0600
Subject: [PATCH 5/5] add type hint

---
 sync/_databricks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sync/_databricks.py b/sync/_databricks.py
index 14e7409..27d0131 100644
--- a/sync/_databricks.py
+++ b/sync/_databricks.py
@@ -1725,7 +1725,7 @@ def _dbfs_directory_has_all_rollover_logs(contents: dict, run_end_time_millis: f
     )
 
 
-def _dbfs_any_file_has_zero_size(dbfs_contents: Dict):
+def _dbfs_any_file_has_zero_size(dbfs_contents: Dict) -> bool:
     any_zeros = any(file["file_size"] == 0 for file in dbfs_contents["files"])
     if any_zeros:
         logger.info("One or more dbfs event log files has a file size of zero")
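Both helpers in this series operate on the plain dict shape returned by the DBFS list call (a "files" key holding entries with a "file_size"), so they can be exercised with literal dicts. A rough test sketch (the pytest-style layout and the sample values are assumptions, not part of the patches):

```python
# Rough unit-test sketch for the helpers added in this series.
from sync._databricks import (
    _check_total_file_size_changed,
    _dbfs_any_file_has_zero_size,
)


def test_zero_size_detection():
    assert _dbfs_any_file_has_zero_size({"files": [{"file_size": 0}, {"file_size": 128}]}) is True
    assert _dbfs_any_file_has_zero_size({"files": [{"file_size": 128}]}) is False


def test_total_size_change_tracking():
    listing = {"files": [{"file_size": 100}, {"file_size": 50}]}
    # A stale total reports a change and returns the new total.
    changed, total = _check_total_file_size_changed(0, listing)
    assert changed is True and total == 150
    # The same listing on the next poll reports no change.
    changed, total = _check_total_file_size_changed(total, listing)
    assert changed is False and total == 150
```

For loop-level tests, the module already exposes `_event_log_poll_duration_seconds` as a "convenience function to aid testing", which can be patched to avoid real 15-second sleeps.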