diff --git a/Makefile b/Makefile index a31f45e..1cc1e76 100644 --- a/Makefile +++ b/Makefile @@ -19,3 +19,9 @@ endif .PHONY: tidy tidy: format lint +# Removes the directory that contains bytecode cache files +# that are automatically generated by python. +.PHONY: clean +clean: + find . -type f -name "*.pyc" | xargs rm -fr + find . -type d -name __pycache__ | xargs rm -fr diff --git a/sync/__init__.py b/sync/__init__.py index c44a4a9..4e46d50 100644 --- a/sync/__init__.py +++ b/sync/__init__.py @@ -1,4 +1,4 @@ """Library for leveraging the power of Sync""" -__version__ = "0.5.2" +__version__ = "0.5.3" TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" diff --git a/sync/_databricks.py b/sync/_databricks.py index cb85fd1..27d0131 100644 --- a/sync/_databricks.py +++ b/sync/_databricks.py @@ -1725,6 +1725,25 @@ def _dbfs_directory_has_all_rollover_logs(contents: dict, run_end_time_millis: f ) +def _dbfs_any_file_has_zero_size(dbfs_contents: Dict) -> bool: + any_zeros = any(file["file_size"] == 0 for file in dbfs_contents["files"]) + if any_zeros: + logger.info("One or more dbfs event log files has a file size of zero") + return any_zeros + + +def _check_total_file_size_changed( + last_total_file_size: int, dbfs_contents: Dict +) -> Tuple[bool, int]: + + new_total_file_size = sum([file.get("file_size", 0) for file in dbfs_contents.get("files", {})]) + if new_total_file_size == last_total_file_size: + return False, new_total_file_size + else: + logger.info("Total file size of eventlog directory changed") + return True, new_total_file_size + + def _event_log_poll_duration_seconds(): """Convenience function to aid testing""" return 15 @@ -1820,9 +1839,12 @@ def _get_eventlog_from_dbfs( poll_num_attempts = 0 poll_max_attempts = 20 # 5 minutes / 15 seconds = 20 attempts - while ( + total_file_size = 0 + file_size_changed, total_file_size = _check_total_file_size_changed(0, eventlog_dir) + while (poll_num_attempts < poll_max_attempts) and ( not _dbfs_directory_has_all_rollover_logs(eventlog_dir, run_end_time_millis) - and poll_num_attempts < poll_max_attempts + or _dbfs_any_file_has_zero_size(eventlog_dir) + or file_size_changed ): if poll_num_attempts > 0: logger.info( @@ -1831,6 +1853,10 @@ def _get_eventlog_from_dbfs( sleep(poll_duration_seconds) eventlog_dir = dbx_client.list_dbfs_directory(matching_subdirectory["path"]) + file_size_changed, total_file_size = _check_total_file_size_changed( + total_file_size, eventlog_dir + ) + poll_num_attempts += 1 eventlog_zip = io.BytesIO()