Skip to content

Commit

Permalink
Move S3-backoff into actual S3 class
Browse files Browse the repository at this point in the history
Some questions remain about which methods should get all_errors versus just client_errors for
backoff. There is also a question in JIRA about why we do not use boto3's native backoff.

For now this at least passes tests...
  • Loading branch information
William Strecker-Kellogg authored and timj committed Nov 4, 2020
1 parent dbb9b5f commit 34bee24
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 59 deletions.
11 changes: 8 additions & 3 deletions python/lsst/daf/butler/core/_butlerUri/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
from http.client import ImproperConnectionState, HTTPException
from urllib3.exceptions import RequestError, HTTPError



if TYPE_CHECKING:
try:
import boto3
Expand Down Expand Up @@ -100,19 +98,22 @@ def client(self) -> boto3.client:
# Defer import for circular dependencies
return getS3Client()

@backoff.on_exception(backoff.expo, retryable_client_errors, max_time=max_retry_time)
def exists(self) -> bool:
    """Report whether this URI refers to an existing bucket or object.

    Returns
    -------
    exists : `bool`
        True if the resource exists, False otherwise.
    """
    if self.is_root:
        # A root URI carries no key, so existence reduces to the bucket.
        return bucketExists(self.netloc)
    found, _ = s3CheckFileExists(self, client=self.client)
    return found

@backoff.on_exception(backoff.expo, retryable_client_errors, max_time=max_retry_time)
def size(self) -> int:
    """Return the size of the remote object in bytes.

    Directory-like URIs have no backing object and report size 0.
    """
    if self.dirLike:
        return 0
    return s3CheckFileExists(self, client=self.client)[1]

@backoff.on_exception(backoff.expo, retryable_client_errors, max_time=max_retry_time)
def remove(self) -> None:
"""Remove the resource."""

Expand All @@ -122,6 +123,7 @@ def remove(self) -> None:
# response all the time
self.client.delete_object(Bucket=self.netloc, Key=self.relativeToPathRoot)

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def read(self, size: int = -1) -> bytes:
args = {}
if size > 0:
Expand All @@ -136,15 +138,16 @@ def read(self, size: int = -1) -> bytes:
response["Body"].close()
return body

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def write(self, data: bytes, overwrite: bool = True) -> None:
    """Upload ``data`` as the object at this URI.

    Parameters
    ----------
    data : `bytes`
        Payload to store.
    overwrite : `bool`, optional
        If False and the object already exists, refuse to replace it.

    Raises
    ------
    FileExistsError
        Raised when ``overwrite`` is False and the resource exists.
    """
    if not overwrite and self.exists():
        raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")
    self.client.put_object(Bucket=self.netloc, Key=self.relativeToPathRoot, Body=data)

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def mkdir(self) -> None:
# Defer import for circular dependencies
if not bucketExists(self.netloc):
raise ValueError(f"Bucket {self.netloc} does not exist for {self}!")

Expand All @@ -155,6 +158,7 @@ def mkdir(self) -> None:
if not self.path == "/":
self.client.put_object(Bucket=self.netloc, Key=self.relativeToPathRoot)

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def as_local(self) -> Tuple[str, bool]:
"""Download object from S3 and place in temporary directory.
Expand All @@ -169,6 +173,7 @@ def as_local(self) -> Tuple[str, bool]:
self.client.download_fileobj(self.netloc, self.relativeToPathRoot, tmpFile)
return tmpFile.name, True

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def transfer_from(self, src: ButlerURI, transfer: str = "copy",
overwrite: bool = False,
transaction: Optional[Union[DatastoreTransaction, NoTransaction]] = None) -> None:
Expand Down
56 changes: 0 additions & 56 deletions python/lsst/daf/butler/datastores/s3Datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,21 @@

import logging

from botocore.exceptions import ClientError
from http.client import ImproperConnectionState, HTTPException
from urllib3.exceptions import RequestError, HTTPError

from typing import (
TYPE_CHECKING,
Any,
Union,
Callable
)

from lsst.daf.butler import (
DatasetRef,
Location,
StoredFileInfo,
)

from .remoteFileDatastore import RemoteFileDatastore
from lsst.daf.butler.core._butlerUri.s3utils import getS3Client, bucketExists

if TYPE_CHECKING:
from .fileLikeDatastore import DatastoreFileGetInformation
from lsst.daf.butler import DatastoreConfig
from lsst.daf.butler.registry.interfaces import DatastoreRegistryBridgeManager

log = logging.getLogger(__name__)




class S3Datastore(RemoteFileDatastore):
"""Basic S3 Object Storage backed Datastore.
Expand Down Expand Up @@ -98,44 +83,3 @@ def __init__(self, config: Union[DatastoreConfig, str],
# parameters, so for now we do not create a bucket if one is
# missing. Further discussion can make this happen though.
raise IOError(f"Bucket {self.locationFactory.netloc} does not exist!")

@backoff.on_exception(backoff.expo, retryable_client_errors, max_time=max_retry_time)
def _artifact_exists(self, location: Location) -> bool:
    """Check that an artifact exists at the specified location.

    Parameters
    ----------
    location : `Location`
        Expected location of the artifact associated with this datastore.

    Returns
    -------
    exists : `bool`
        True if the location can be found, False otherwise.
    """
    # Overridden solely so the backoff decorator can wrap the
    # inherited implementation with retry behavior.
    return super()._artifact_exists(location)

@backoff.on_exception(backoff.expo, retryable_client_errors, max_time=max_retry_time)
def _delete_artifact(self, location: Location) -> None:
    """Delete the artifact from the datastore.

    Parameters
    ----------
    location : `Location`
        Location of the artifact associated with this datastore.
    """
    # Overridden solely to attach retry-with-backoff to the
    # inherited deletion logic.
    return super()._delete_artifact(location)

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def _read_artifact_into_memory(self, getInfo: DatastoreFileGetInformation,
                               ref: DatasetRef, isComponent: bool = False) -> Any:
    """Read an artifact into memory, retrying transient S3 failures.

    Overridden solely so the backoff decorator can wrap the
    inherited implementation.
    """
    return super()._read_artifact_into_memory(getInfo, ref, isComponent)

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) -> StoredFileInfo:
    """Write an in-memory dataset out, retrying transient S3 failures.

    Overridden solely so the backoff decorator can wrap the
    inherited implementation.
    """
    return super()._write_in_memory_to_artifact(inMemoryDataset, ref)

0 comments on commit 34bee24

Please sign in to comment.