
Change Storage client to use exponential backoff. (#107)
* Use exponential backoff for retrying storage errors.

  Removes support for Storage.backoff; instead, each retry calls get_backoff(attempts), which calculates the exponential backoff for the given attempt number.

* Set the default max_backoff from settings.STORAGE_MAX_BACKOFF, a new environment variable.

* Include a UUID in the file upload path to prevent 429s on the same object.

* TrackingConsumer: put the UUID at the beginning of the file path and use the same UUID for each frame of a TIFF stack.
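In effect, every storage call now retries with a jittered, capped exponential delay instead of a fixed per-client one. A minimal sketch of the pattern, assuming a placeholder request function (the real methods retry only on provider-specific network errors, as the storage.py diff below shows):

    import random
    import time

    def get_backoff(attempts, max_backoff=60):
        """Jittered exponential backoff in seconds, capped at max_backoff."""
        jitter = random.randint(1, 1000) / 1000  # up to one second of jitter
        return min(2 ** attempts + jitter, max_backoff)

    attempts = 0
    while True:
        try:
            do_storage_request()  # placeholder for upload/download/etc.
            break
        except ConnectionError:   # stand-in for e.g. TooManyRequests (429)
            time.sleep(get_backoff(attempts))
            attempts += 1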
willgraf committed May 4, 2020
1 parent 9735932 commit c709b49
Showing 6 changed files with 105 additions and 35 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -100,6 +100,7 @@ The consumer is configured using environment variables. Please find a table of a
 | `REDIS_TIMEOUT` | Timeout for each Redis request, in seconds. | `3` |
 | `EMPTY_QUEUE_TIMEOUT` | Time to wait after finding an empty queue, in seconds. | `5` |
 | `DO_NOTHING_TIMEOUT` | Time to wait after finding an item that requires no work, in seconds. | `0.5` |
+| `STORAGE_MAX_BACKOFF` | Maximum time to wait before retrying a Storage request, in seconds. | `60` |
 | `EXPIRE_TIME` | Expire Redis items this many seconds after completion. | `3600` |
 | `METADATA_EXPIRE_TIME` | Expire cached model metadata after this many seconds. | `30` |
 | `TF_HOST` | The IP address or hostname of TensorFlow Serving. | `"tf-serving"` |
4 changes: 3 additions & 1 deletion redis_consumer/consumers/base_consumer.py
@@ -572,13 +572,15 @@ def is_valid_hash(self, redis_hash):
     def _upload_archived_images(self, hvalues, redis_hash):
         """Extract all image files and upload them to storage and redis"""
         all_hashes = set()
+        archive_uuid = uuid.uuid4().hex
         with utils.get_tempdir() as tempdir:
             fname = self.storage.download(hvalues.get('input_file_name'), tempdir)
             image_files = utils.get_image_files_from_dir(fname, tempdir)
             for i, imfile in enumerate(image_files):

                 clean_imfile = settings._strip(imfile.replace(tempdir, ''))
                 # Save each result channel as an image file
-                subdir = os.path.dirname(clean_imfile)
+                subdir = os.path.join(archive_uuid, os.path.dirname(clean_imfile))
                 dest, _ = self.storage.upload(imfile, subdir=subdir)

                 os.remove(imfile)  # remove the file to save some memory
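The effect is that every image extracted from an archive is uploaded under a per-archive prefix, so two jobs containing identically named files no longer write to the same object. A sketch of the resulting path, with hypothetical file names:

    import os
    import uuid

    archive_uuid = uuid.uuid4().hex       # e.g. '1f3a9c...'
    clean_imfile = 'raw/channel_0.tif'    # hypothetical extracted file
    subdir = os.path.join(archive_uuid, os.path.dirname(clean_imfile))
    # subdir == '1f3a9c.../raw' -- a unique prefix per archive, so
    # repeated uploads of the same name cannot collide and trigger 429s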
5 changes: 3 additions & 2 deletions redis_consumer/consumers/tracking_consumer.py
@@ -167,10 +167,11 @@ def _load_data(self, redis_hash, subdir, fname):
         self.logger.debug('tiffstack num_frames %s.', num_frames)

         with utils.get_tempdir() as tempdir:
+            uid = uuid.uuid4().hex
             for (i, img) in enumerate(tiff_stack):
                 # make a file name for this frame
-                segment_fname = '{}-tracking-frame-{}-{}.tif'.format(
-                    hvalues.get('original_name'), i, uuid.uuid4().hex)
+                segment_fname = '{}-{}-tracking-frame-{}.tif'.format(
+                    uid, hvalues.get('original_name'), i)
                 segment_local_path = os.path.join(tempdir, segment_fname)

                 # upload it
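Previously each frame drew its own UUID as a suffix; now a single UUID, generated once per stack, prefixes every frame, so all frames of one stack share a common prefix. Illustrative output, with a hypothetical original_name:

    import uuid

    uid = uuid.uuid4().hex
    original_name = 'cells.tif'  # hypothetical hvalues.get('original_name')
    frames = ['{}-{}-tracking-frame-{}.tif'.format(uid, original_name, i)
              for i in range(3)]
    # ['<uid>-cells.tif-tracking-frame-0.tif',
    #  '<uid>-cells.tif-tracking-frame-1.tif',
    #  '<uid>-cells.tif-tracking-frame-2.tif']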
1 change: 1 addition & 0 deletions redis_consumer/settings.py
@@ -76,6 +76,7 @@ def _strip(x):
 REDIS_TIMEOUT = config('REDIS_TIMEOUT', default=3, cast=int)
 EMPTY_QUEUE_TIMEOUT = config('EMPTY_QUEUE_TIMEOUT', default=5, cast=int)
 DO_NOTHING_TIMEOUT = config('DO_NOTHING_TIMEOUT', default=0.5, cast=float)
+STORAGE_MAX_BACKOFF = config('STORAGE_MAX_BACKOFF', default=60, cast=float)

 # Cloud storage
 CLOUD_PROVIDER = config('CLOUD_PROVIDER', cast=str, default='gke').lower()
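Like the neighboring settings, the value is read from the environment at import time, so the retry cap can be tuned per deployment without code changes. For example (value illustrative):

    # with STORAGE_MAX_BACKOFF=30 set in the consumer's environment:
    from redis_consumer import settings
    assert settings.STORAGE_MAX_BACKOFF == 30.0  # cast=float applies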
52 changes: 36 additions & 16 deletions redis_consumer/storage.py
@@ -44,7 +44,6 @@
 import requests

 from redis_consumer import settings
-from redis_consumer.settings import DOWNLOAD_DIR


 class StorageException(Exception):
@@ -81,14 +80,21 @@ class Storage(object):
         download_dir: path to local directory to save downloaded files
     """

-    def __init__(self, bucket, download_dir=DOWNLOAD_DIR, backoff=None):
+    def __init__(self, bucket,
+                 download_dir=settings.DOWNLOAD_DIR,
+                 max_backoff=settings.STORAGE_MAX_BACKOFF):
         self.bucket = bucket
         self.download_dir = download_dir
         self.output_dir = 'output'
-        if backoff is None:
-            backoff = random.randint(10, 31) / 10.0
-        self.backoff = backoff
         self.logger = logging.getLogger(str(self.__class__.__name__))
+        self.max_backoff = max_backoff
+
+    def get_backoff(self, attempts):
+        """Get backoff time based on previous number of attempts"""
+        milis = random.randint(1, 1000) / 1000
+        exponential = 2 ** attempts + milis
+        backoff = min(exponential, self.max_backoff)
+        return backoff

     def get_storage_client(self):
         """Returns the storage API client"""
@@ -144,8 +150,10 @@ class GoogleStorage(Storage):
         download_dir: path to local directory to save downloaded files
     """

-    def __init__(self, bucket, download_dir=DOWNLOAD_DIR, backoff=1.5):
-        super(GoogleStorage, self).__init__(bucket, download_dir, backoff)
+    def __init__(self, bucket,
+                 download_dir=settings.DOWNLOAD_DIR,
+                 max_backoff=settings.STORAGE_MAX_BACKOFF):
+        super(GoogleStorage, self).__init__(bucket, download_dir, max_backoff)
         self.bucket_url = 'www.googleapis.com/storage/v1/b/{}/o'.format(bucket)
         self._network_errors = (
             socket.gaierror,
@@ -169,10 +177,11 @@ def get_storage_client(self):
                 return google_storage.Client()
             except OSError as err:
                 if attempts < 3:
+                    backoff = self.get_backoff(attempts)
                     attempts += 1
                     self.logger.warning('Encountered error while creating '
                                         'storage client: %s', err)
-                    time.sleep(self.backoff)
+                    time.sleep(backoff)
                 else:
                     raise err
@@ -186,6 +195,7 @@ def get_public_url(self, filepath):
             url: Public URL to download the file
         """
         retrying = True
+        attempts = 0
         while retrying:
             try:
                 client = self.get_storage_client()
@@ -196,10 +206,12 @@
                 return blob.public_url

             except self._network_errors as err:
+                backoff = self.get_backoff(attempts)
                 self.logger.warning('Encountered %s: %s. Backing off for %s '
                                     'seconds...', type(err).__name__, err,
-                                    self.backoff)
-                time.sleep(self.backoff)
+                                    backoff)
+                time.sleep(backoff)
+                attempts += 1
                 retrying = True  # Unnecessary but explicit

             except Exception as err:
@@ -220,6 +232,7 @@
         start = timeit.default_timer()
         self.logger.debug('Uploading %s to bucket %s.', filepath, self.bucket)
         retrying = True
+        attempts = 0
         while retrying:
             client = self.get_storage_client()
             try:
@@ -238,10 +251,12 @@
                 retrying = False
                 return dest, blob.public_url
             except self._network_errors as err:
+                backoff = self.get_backoff(attempts)
                 self.logger.warning('Encountered %s: %s. Backing off for %s '
                                     'seconds...', type(err).__name__, err,
-                                    self.backoff)
-                time.sleep(self.backoff)
+                                    backoff)
+                time.sleep(backoff)
+                attempts += 1
                 retrying = True  # Unnecessary but explicit

             except Exception as err:
@@ -263,6 +278,7 @@
         dest = self.get_download_path(filepath, download_dir)
         self.logger.debug('Downloading %s to %s.', filepath, dest)
         retrying = True
+        attempts = 0
         while retrying:
             client = self.get_storage_client()
             try:
@@ -275,10 +291,12 @@
                 return dest

             except self._network_errors as err:
+                backoff = self.get_backoff(attempts)
                 self.logger.warning('Encountered %s: %s. Backing off for %s '
                                     'seconds...', type(err).__name__, err,
-                                    self.backoff)
-                time.sleep(self.backoff)
+                                    backoff)
+                time.sleep(backoff)
+                attempts += 1
                 retrying = True  # Unnecessary but explicit

             except Exception as err:
@@ -296,8 +314,10 @@ class S3Storage(Storage):
         download_dir: path to local directory to save downloaded files
     """

-    def __init__(self, bucket, download_dir=DOWNLOAD_DIR, backoff=1.5):
-        super(S3Storage, self).__init__(bucket, download_dir, backoff)
+    def __init__(self, bucket,
+                 download_dir=settings.DOWNLOAD_DIR,
+                 max_backoff=settings.STORAGE_MAX_BACKOFF):
+        super(S3Storage, self).__init__(bucket, download_dir, max_backoff)
         self.bucket_url = 's3.amazonaws.com/{}'.format(bucket)

     def get_storage_client(self):
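For intuition about the new schedule: the delay roughly doubles per attempt, carries up to one second of random jitter, and never exceeds max_backoff (60 seconds by default). A quick sketch, using a placeholder bucket name:

    from redis_consumer import storage

    stg = storage.Storage('my-bucket')  # 'my-bucket' is a placeholder
    for attempts in range(7):
        print(attempts, round(stg.get_backoff(attempts), 3))
    # roughly 1.x, 2.x, 4.x, 8.x, 16.x, 32.x, then 60.0 (capped)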
77 changes: 61 additions & 16 deletions redis_consumer/storage_test.py
@@ -39,10 +39,16 @@
 from redis_consumer import utils


+# global var forcing storage clients to throw an error
+global storage_error
+storage_error = True
+
+global critical_error
+critical_error = False
+

 class DummyGoogleClient(object):
     public_url = 'public-url'
-    fail_tolerance = 2
-    fail_count = 0

     def get_bucket(self, *_, **__):
         return self
@@ -51,18 +57,33 @@ def blob(self, *_, **__):
         return self

     def make_public(self, *_, **__):
+        global storage_error
+        global critical_error
+
+        if critical_error:
+            raise Exception('critical-error-thrown-on-purpose')
+
+        if storage_error:
+            storage_error = False
+            raise TooManyRequests('thrown-on-purpose')
+        storage_error = True
         return self

     def upload_from_filename(self, dest, **_):
-        # if self.fail_count < self.fail_tolerance:
-        #     self.fail_count += 1
-        #     raise TooManyRequests('thrown-on-purpose')
+        global storage_error
+
+        if storage_error:
+            storage_error = False
+            raise TooManyRequests('thrown-on-purpose')
+        storage_error = True
         assert os.path.exists(dest)

     def download_to_filename(self, dest, **_):
-        # if self.fail_count < self.fail_tolerance:
-        #     self.fail_count += 1
-        #     raise TooManyRequests('thrown-on-purpose')
+        global storage_error
+        if storage_error:
+            storage_error = False
+            raise TooManyRequests('thrown-on-purpose')
+        storage_error = True
         assert dest.endswith('/test/file.txt')


@@ -91,6 +112,18 @@ def test_get_client():

 class TestStorage(object):

+    def test_get_backoff(self):
+        max_backoff = 30
+        client = storage.Storage('bucket', max_backoff=max_backoff)
+        backoff = client.get_backoff(attempts=0)
+        assert 1 < backoff < 2
+
+        backoff = client.get_backoff(attempts=3)
+        assert 8 < backoff < 9
+
+        backoff = client.get_backoff(attempts=5)
+        assert backoff == max_backoff
+
     def test_get_download_path(self):
         with utils.get_tempdir() as tempdir:
             bucket = 'test-bucket'
@@ -110,20 +143,31 @@ class TestGoogleStorage(object):

     def test_get_public_url(self):
         with utils.get_tempdir() as tempdir:
-            bucket = 'test-bucket'
-            stg_cls = storage.GoogleStorage
-            stg_cls.get_storage_client = DummyGoogleClient
-            stg = stg_cls(bucket, tempdir, backoff=0)
-            url = stg.get_public_url('test')
-            assert url == 'public-url'
+            with tempfile.NamedTemporaryFile(dir=tempdir) as temp:
+                bucket = 'test-bucket'
+                stg_cls = storage.GoogleStorage
+                stg_cls.get_storage_client = DummyGoogleClient
+                stg = stg_cls(bucket, tempdir)
+                stg.get_backoff = lambda x: 0
+                url = stg.get_public_url(temp.name)
+                assert url == 'public-url'
+
+                # test bad filename
+                global critical_error
+                with pytest.raises(Exception):
+                    critical_error = True
+                    # client.make_public() raises error.
+                    stg.get_public_url('file-does-not-exist')
+                critical_error = False

     def test_upload(self):
         with utils.get_tempdir() as tempdir:
             with tempfile.NamedTemporaryFile(dir=tempdir) as temp:
                 bucket = 'test-bucket'
                 stg_cls = storage.GoogleStorage
                 stg_cls.get_storage_client = DummyGoogleClient
-                stg = stg_cls(bucket, tempdir, backoff=0)
+                stg = stg_cls(bucket, tempdir)
+                stg.get_backoff = lambda x: 0

                 # test successful upload
                 dest, url = stg.upload(temp.name)
@@ -146,7 +190,8 @@ def test_download(self):
             bucket = 'test-bucket'
             stg_cls = storage.GoogleStorage
             stg_cls.get_storage_client = DummyGoogleClient
-            stg = stg_cls(bucket, tempdir, backoff=0)
+            stg = stg_cls(bucket, tempdir)
+            stg.get_backoff = lambda x: 0

             # test successful download
             dest = stg.download(remote_file, tempdir)
