Skip to content

Commit

Permalink
Add progress reporting for S3 file transfers
Browse files Browse the repository at this point in the history
Only enabled for DEBUG logging.
  • Loading branch information
timj committed Feb 11, 2022
1 parent 531016a commit e47eb1e
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
52 changes: 47 additions & 5 deletions python/lsst/resources/s3.py
Expand Up @@ -14,6 +14,7 @@
import logging
import re
import tempfile
import threading

__all__ = ("S3ResourcePath",)

Expand Down Expand Up @@ -83,6 +84,36 @@ def on_exception(func: Callable, *args: Any, **kwargs: Any) -> Callable:
log = logging.getLogger(__name__)


class ProgressPercentage:
"""Progress bar for S3 file uploads."""

log_level = logging.DEBUG
"""Default log level to use when issuing a message."""

def __init__(self, file: ResourcePath, file_for_msg: Optional[ResourcePath] = None, msg: str = ""):
self._filename = file
self._file_for_msg = str(file_for_msg) if file_for_msg is not None else str(file)
self._size = file.size()
self._seen_so_far = 0
self._lock = threading.Lock()
self._msg = msg

def __call__(self, bytes_amount):
# To simplify, assume this is hooked up to a single filename
with self._lock:
self._seen_so_far += bytes_amount
percentage = (100 * self._seen_so_far) // self._size
log.log(
self.log_level,
"%s %s %s / %s (%s%%)",
self._msg,
self._file_for_msg,
self._seen_so_far,
self._size,
percentage,
)


class S3ResourcePath(ResourcePath):
"""S3 URI resource path implementation class."""

Expand Down Expand Up @@ -173,8 +204,15 @@ def _as_local(self) -> Tuple[str, bool]:
"""
with tempfile.NamedTemporaryFile(suffix=self.getExtension(), delete=False) as tmpFile:
with time_this(log, msg="Downloading %s to local file", args=(self,)):
progress = (
ProgressPercentage(self, msg="Downloading:")
if log.isEnabledFor(ProgressPercentage.log_level)
else None
)
try:
self.client.download_fileobj(self.netloc, self.relativeToPathRoot, tmpFile)
self.client.download_fileobj(
self.netloc, self.relativeToPathRoot, tmpFile, Callback=progress
)
except (
ClientError,
self.client.exceptions.NoSuchKey,
Expand Down Expand Up @@ -257,11 +295,15 @@ def transfer_from(
else:
# Use local file and upload it
with src.as_local() as local_uri:

# resource.meta.upload_file seems like the right thing
# but we have a low level client
progress = (
ProgressPercentage(local_uri, file_for_msg=src, msg="Uploading:")
if log.isEnabledFor(ProgressPercentage.log_level)
else None
)
with time_this(log, msg=timer_msg, args=timer_args):
self.client.upload_file(local_uri.ospath, self.netloc, self.relativeToPathRoot)
self.client.upload_file(
local_uri.ospath, self.netloc, self.relativeToPathRoot, Callback=progress
)

# This was an explicit move requested from a remote resource
# try to remove that resource
Expand Down
15 changes: 15 additions & 0 deletions tests/test_s3.py
Expand Up @@ -82,6 +82,21 @@ def test_bucket_fail(self):
with self.assertRaises(FileNotFoundError):
uri.remove()

def test_transfer_progress(self):
"""Test progress bar reporting for upload and download."""
remote = self.root_uri.join("test.dat")
remote.write(b"42")
with ResourcePath.temporary_uri(suffix=".dat") as tmp:
# Download from S3.
with self.assertLogs("lsst.resources", level="DEBUG") as cm:
tmp.transfer_from(remote, transfer="auto")
self.assertRegex("".join(cm.output), r"test\.dat.*100\%")

# Upload to S3.
with self.assertLogs("lsst.resources", level="DEBUG") as cm:
remote.transfer_from(tmp, transfer="auto", overwrite=True)
self.assertRegex("".join(cm.output), rf"{tmp.basename()}.*100\%")


if __name__ == "__main__":
unittest.main()

0 comments on commit e47eb1e

Please sign in to comment.