remote.s3: Streaming compression of uploaded files
The previous implementation compressed the entire file contents in
memory, which unnecessarily bloated the CLI's memory usage,
particularly for large files.

I'd wanted a streaming implementation originally, but to my surprise
there wasn't an appropriate existing one available.
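
To make the tradeoff concrete, here is an illustrative sketch (not code
from this commit; "source" is any readable binary stream and consume()
is a hypothetical stand-in for a chunk-wise consumer such as an uploader):

    from io import BytesIO
    from gzip import GzipFile
    import shutil

    # Old shape: the whole compressed copy is materialized in memory
    # before the consumer sees any of it.
    gzstream = BytesIO()
    with GzipFile(fileobj = gzstream, mode = "wb") as gzfile:
        shutil.copyfileobj(source, gzfile)   # peak memory ~ compressed size
    gzstream.seek(0)
    consume(gzstream)

    # New shape: each read() compresses only the next chunk of the
    # source, so peak memory stays near the chunk size.
    with GzipCompressingReader(source) as gzdata:
        consume(gzdata)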
tsibley committed Dec 12, 2019
1 parent e268bc1 commit c55d1a3
Showing 2 changed files with 59 additions and 21 deletions.
57 changes: 57 additions & 0 deletions nextstrain/cli/gzip.py
@@ -0,0 +1,57 @@
"""
Gzip stream utilities.
"""
import zlib
from io import BufferedIOBase
from typing import BinaryIO


class GzipCompressingReader(BufferedIOBase):
"""
Compress a data stream as it is being read.
The constructor takes an existing, readable byte *stream*. Calls to this
class's :meth:`.read` method will read data from the source *stream* and
return a compressed copy.
"""
def __init__(self, stream: BinaryIO):
if not stream.readable():
raise ValueError('"stream" argument must be readable.')

self.stream = stream
self.__gzip = zlib.compressobj(
level = zlib.Z_BEST_COMPRESSION,
wbits = 16 + zlib.MAX_WBITS, # Offset of 16 is gzip encapsulation
memLevel = 9, # Memory is ~cheap; use it for better compression
)

def readable(self):
return True

def read(self, size = None):
return self._compress(self.stream.read(size))

def read1(self, size = None):
return self._compress(self.stream.read1(size)) # type: ignore

def _compress(self, data: bytes):
if self.__gzip:
if data:
return self.__gzip.compress(data)
else:
# EOF on underlying stream, flush any remaining compressed
# data. On the next call, we'll return EOF too.
try:
return self.__gzip.flush(zlib.Z_FINISH)
finally:
self.__gzip = None # type: ignore
else:
# Already hit EOF on the underlying stream and flushed.
return b''

def close(self):
if self.stream:
try:
self.stream.close()
finally:
self.stream = None
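
As a quick sanity check (illustrative only, not part of the commit), the
reader's output round-trips through the standard library's decompressor.
Note the read semantics: a read() with no size compresses everything the
source yields, a second read() flushes the gzip trailer, and every read
after that returns b'' to signal EOF.  This assumes the module is
importable as nextstrain.cli.gzip:

    import gzip
    from io import BytesIO
    from nextstrain.cli.gzip import GzipCompressingReader

    data = b"example payload " * 1024

    with GzipCompressingReader(BytesIO(data)) as reader:
        # zlib may buffer internally, so the complete gzip stream is
        # everything returned up to and including the flush at source EOF.
        compressed = reader.read() + reader.read()
        assert reader.read() == b""

    # The concatenated output is a well-formed gzip stream.
    assert gzip.decompress(compressed) == data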
23 changes: 2 additions & 21 deletions nextstrain/cli/remote/s3.py
@@ -7,17 +7,15 @@
 import boto3
 import mimetypes
 import re
-import shutil
 import urllib.parse
 from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError, WaiterError
-from gzip import GzipFile
-from io import BytesIO
 from os.path import commonprefix
 from pathlib import Path
 from textwrap import dedent
 from time import time
 from typing import List, Tuple
 from .. import aws
+from ..gzip import GzipCompressingReader
 from ..util import warn, remove_prefix
 from ..errors import UserError

@@ -97,7 +95,7 @@ def upload(local_files: List[Path], bucket, prefix: str) -> List[str]:
         print("Deploying", local_file, "as", remote_file)

         # Upload compressed data
-        with local_file.open("rb") as data, gzip_stream(data) as gzdata:
+        with GzipCompressingReader(local_file.open("rb")) as gzdata:
             bucket.upload_fileobj(
                 gzdata,
                 remote_file,
@@ -106,23 +104,6 @@ def upload(local_files: List[Path], bucket, prefix: str) -> List[str]:
     return [ remote for local, remote in files ]


-def gzip_stream(stream):
-    """
-    Takes an IO stream and compresses it in-memory with gzip. Returns a
-    BytesIO stream of compressed data.
-    """
-    gzstream = BytesIO()
-
-    # Pass the original contents through gzip into memory
-    with GzipFile(fileobj = gzstream, mode = "wb") as gzfile:
-        shutil.copyfileobj(stream, gzfile)
-
-    # Re-seek the compressed data to the start
-    gzstream.seek(0)
-
-    return gzstream
-
-
 def content_type(path: Path) -> str:
     """
     Guess the content type of *path* from its name.
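
The ExtraArgs passed to upload_fileobj are collapsed in the diff view
above.  For context, here is a hedged reconstruction of the new upload
path; the ContentType and ContentEncoding values are assumptions
suggested by the content_type() helper and the gzip encoding, not
confirmed by the visible diff:

    # Hypothetical sketch, not verbatim from this commit.
    with GzipCompressingReader(local_file.open("rb")) as gzdata:
        bucket.upload_fileobj(
            gzdata,
            remote_file,
            ExtraArgs = {
                "ContentType":     content_type(local_file),  # assumed
                "ContentEncoding": "gzip",                    # assumed
            })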
