Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: upload/download tracker bugfixes #184

Merged
merged 6 commits into from Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -142,3 +142,6 @@ toolchest_client/demo.py

# Default temporary directory for splitting input files
temp_toolchest*

# Default directories for local runs of integration tests
test_*
46 changes: 33 additions & 13 deletions toolchest_client/files/s3.py
Expand Up @@ -4,6 +4,7 @@

Functions for handling files in AWS S3 buckets.
"""
import math
import os.path
import sys
import threading
Expand Down Expand Up @@ -103,11 +104,20 @@ def inputs_are_in_s3(input_paths):
return [path_is_s3_uri(file_path) for file_path in input_paths]


# Slightly modified from https://boto3.amazonaws.com/v1/documentation/api/latest/_modules/boto3/s3/transfer.html
def pretty_print_file_size(num_bytes):
"""Returns a pretty formatted number of bytes (e.g. 1.80MB)

:param num_bytes size of file in bytes
"""
pretty_abbreviation = ["B", "KB", "MB", "GB", "TB"]
abbreviation_index = math.floor(math.log(num_bytes, 1024))
return f"{(num_bytes / (1024 ** abbreviation_index)):.1f}{pretty_abbreviation[abbreviation_index]}"


class UploadTracker:
def __init__(self, file_path):
self._filename = os.path.basename(file_path)
self._size = float(os.path.getsize(file_path))
self._size = float(os.path.getsize(file_path)) # tracker only used for local files
self._seen_so_far = 0
self._lock = threading.Lock()

Expand All @@ -117,11 +127,16 @@ def __call__(self, bytes_amount):
with self._lock:
self._seen_so_far += bytes_amount
percentage = round((self._seen_so_far / self._size) * 100, 2)
sys.stdout.write(
"\r%s %s / %s bytes (%.2f%%)" % (
self._filename, self._seen_so_far, self._size,
percentage))
sys.stdout.flush()
print(
"\r{} {} of {} ({:.2f}%)".format(
self._filename,
pretty_print_file_size(self._seen_so_far),
pretty_print_file_size(self._size),
percentage
).ljust(100), # pads right end with spaces to flush carriage return
end="",
flush=True,
)
if percentage == 100.00: # Adds newline at end of upload
print()

Expand All @@ -139,10 +154,15 @@ def __call__(self, bytes_amount):
with self._lock:
self._seen_so_far += bytes_amount
percentage = round((self._seen_so_far / self._size) * 100, 2)
sys.stdout.write(
"\r%s %s / %s bytes (%.2f%%)" % (
self._filename, self._seen_so_far, self._size,
percentage))
sys.stdout.flush()
if percentage == 100.00: # Adds newline at end of upload
print(
"\r{} {} of {} ({:.2f}%)".format(
self._filename,
pretty_print_file_size(self._seen_so_far),
pretty_print_file_size(self._size),
percentage
).ljust(100), # pads right end with spaces to flush carriage return
end="",
flush=True,
)
if percentage == 100.00: # Adds newline at end of download
print()
4 changes: 2 additions & 2 deletions toolchest_client/tools/tool.py
Expand Up @@ -317,8 +317,8 @@ def _wait_for_threads_to_finish(self, check_health=True):
for thread in self.query_threads:
thread_name = thread.getName()
statuses.append(self.query_thread_statuses.get(thread_name))
uploading = all(
map(lambda status: status not in [ThreadStatus.UPLOADING, ThreadStatus.INITIALIZED], statuses)
uploading = any(
map(lambda status: status in [ThreadStatus.INITIALIZED, ThreadStatus.UPLOADING], statuses)
)
time.sleep(5)
print("Finished spawning jobs.")
Expand Down