Skip to content

Commit

Permalink
Try to add 429 retry to S3
Browse files Browse the repository at this point in the history
Catch the 429 client error and translate it to something
that backoff can catch.
  • Loading branch information
timj committed Mar 24, 2022
1 parent 4c3dfcb commit b5272a5
Showing 1 changed file with 83 additions and 23 deletions.
106 changes: 83 additions & 23 deletions python/lsst/resources/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ def on_exception(func: Callable, *args: Any, **kwargs: Any) -> Callable:

backoff = Backoff


class _TooManyRequestsException(Exception):
"""Private exception that can be used for 429 retry.
botocore refuses to deal with 429 error itself so issues a generic
ClientError.
"""

pass


# settings for "backoff" retry decorators. these retries are belt-and-
# suspenders along with the retries built into Boto3, to account for
# semantic differences in errors between S3-like providers.
Expand All @@ -64,6 +75,8 @@ def on_exception(func: Callable, *args: Any, **kwargs: Any) -> Callable:
# built-ins
TimeoutError,
ConnectionError,
# private
_TooManyRequestsException,
)

# Client error can include NoSuchKey so retry may not be the right
Expand Down Expand Up @@ -114,6 +127,26 @@ def __call__(self, bytes_amount: int) -> None:
)


def _translate_client_error(err: ClientError) -> None:
"""Translate a ClientError into a specialist error if relevant.
Parameters
----------
err : `ClientError`
Exception to translate.
Raises
------
_TooManyRequestsException
Raised if the `ClientError` looks like a 429 retry request.
"""
if "(429)" in str(err):
# ClientError includes the error code in the message
# but no direct way to access it without looking inside the
# response.
raise _TooManyRequestsException(str(err)) from err


class S3ResourcePath(ResourcePath):
"""S3 URI resource path implementation class."""

Expand Down Expand Up @@ -192,6 +225,23 @@ def mkdir(self) -> None:
self.client.put_object(Bucket=self.netloc, Key=self.relativeToPathRoot)

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def _download_file(self, local_file: ResourcePath, progress: Optional[ProgressPercentage]) -> None:
"""Download the remote resource to a local file.
Helper routine for _as_local to allow backoff without regenerating
the temporary file.
"""
try:
self.client.download_fileobj(self.netloc, self.relativeToPathRoot, local_file, Callback=progress)
except (
self.client.exceptions.NoSuchKey,
self.client.exceptions.NoSuchBucket,
) as err:
raise FileNotFoundError(f"No such resource: {self}") from err
except ClientError as err:
_translate_client_error(err)
raise

def _as_local(self) -> Tuple[str, bool]:
"""Download object from S3 and place in temporary directory.
Expand All @@ -209,19 +259,39 @@ def _as_local(self) -> Tuple[str, bool]:
if log.isEnabledFor(ProgressPercentage.log_level)
else None
)
try:
self.client.download_fileobj(
self.netloc, self.relativeToPathRoot, tmpFile, Callback=progress
)
except (
ClientError,
self.client.exceptions.NoSuchKey,
self.client.exceptions.NoSuchBucket,
) as err:
raise FileNotFoundError(f"No such resource: {self}") from err
self._download_file(tmpFile, progress)
return tmpFile.name, True

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def _upload_file(self, local_file: ResourcePath, progress: Optional[ProgressPercentage]) -> None:
"""Upload a local file with backoff.
Helper method to wrap file uploading in backoff for transfer_from.
"""
try:
self.client.upload_file(
local_file.ospath, self.netloc, self.relativeToPathRoot, Callback=progress
)
except self.client.exceptions.NoSuchBucket as err:
raise NotADirectoryError(f"Target does not exist: {err}") from err
except ClientError as err:
_translate_client_error(err)
raise

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def _copy_from(self, src: ResourcePath) -> None:
copy_source = {
"Bucket": src.netloc,
"Key": src.relativeToPathRoot,
}
try:
self.client.copy_object(CopySource=copy_source, Bucket=self.netloc, Key=self.relativeToPathRoot)
except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
raise FileNotFoundError("No such resource to transfer: {self}") from err
except ClientError as err:
_translate_client_error(err)
raise

def transfer_from(
self,
src: ResourcePath,
Expand Down Expand Up @@ -281,17 +351,9 @@ def transfer_from(
# Looks like an S3 remote uri so we can use direct copy
# note that boto3.resource.meta.copy is cleverer than the low
# level copy_object
copy_source = {
"Bucket": src.netloc,
"Key": src.relativeToPathRoot,
}
with time_this(log, msg=timer_msg, args=timer_args):
try:
self.client.copy_object(
CopySource=copy_source, Bucket=self.netloc, Key=self.relativeToPathRoot
)
except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
raise FileNotFoundError("No such resource to transfer: {self}") from err
self._copy_from(src)

else:
# Use local file and upload it
with src.as_local() as local_uri:
Expand All @@ -301,9 +363,7 @@ def transfer_from(
else None
)
with time_this(log, msg=timer_msg, args=timer_args):
self.client.upload_file(
local_uri.ospath, self.netloc, self.relativeToPathRoot, Callback=progress
)
self._upload_file(local_uri, progress)

# This was an explicit move requested from a remote resource
# try to remove that resource
Expand Down

0 comments on commit b5272a5

Please sign in to comment.