Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-31295: Cache boto client and add timer messages to logs for some file I/O #552

Merged
merged 5 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/cli/butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ def _getPluginCommands(cls):
with open(pluginName, "r") as resourceFile:
resources = defaultdict(list, yaml.safe_load(resourceFile))
except Exception as err:
log.warning(f"Error loading commands from {pluginName}, skipping. {err}")
log.warning("Error loading commands from %s, skipping. %s", pluginName, err)
continue
if "cmd" not in resources:
log.warning(f"No commands found in {pluginName}, skipping.")
log.warning("No commands found in %s, skipping.", pluginName)
continue
pluginCommands = {cmd: [resources["cmd"]["import"]] for cmd in resources["cmd"]["commands"]}
cls._mergeCommandLists(commands, defaultdict(list, pluginCommands))
Expand Down
7 changes: 5 additions & 2 deletions python/lsst/daf/butler/core/_butlerUri/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,11 @@ def transfer_from(self, src: ButlerURI, transfer: str,
if transfer not in self.transferModes:
raise ValueError(f"Transfer mode '{transfer}' not supported by URI scheme {self.scheme}")

log.debug(f"Transferring {src} [exists: {src.exists()}] -> "
f"{self} [exists: {self.exists()}] (transfer={transfer})")
# Existence checks can take time so only try if the log message
timj marked this conversation as resolved.
Show resolved Hide resolved
# will be issued.
if log.isEnabledFor(logging.DEBUG):
log.debug("Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
src, src.exists(), self, self.exists(), transfer)

# We do not have to special case ButlerFileURI here because
# as_local handles that.
Expand Down
44 changes: 26 additions & 18 deletions python/lsst/daf/butler/core/_butlerUri/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
Union,
)

from ..utils import time_this
from .utils import NoTransaction
from ._butlerUri import ButlerURI

Expand Down Expand Up @@ -327,8 +328,9 @@ def _as_local(self) -> Tuple[str, bool]:
if r.status_code != 200:
raise FileNotFoundError(f"Unable to download resource {self}; status code: {r.status_code}")
with tempfile.NamedTemporaryFile(suffix=self.getExtension(), delete=False) as tmpFile:
for chunk in r.iter_content():
tmpFile.write(chunk)
with time_this(log, msg="Downloading %s to local file", args=(self,)):
for chunk in r.iter_content():
tmpFile.write(chunk)
return tmpFile.name, True

def read(self, size: int = -1) -> bytes:
Expand All @@ -342,7 +344,8 @@ def read(self, size: int = -1) -> bytes:
"""
log.debug("Reading from remote resource: %s", self.geturl())
timj marked this conversation as resolved.
Show resolved Hide resolved
stream = True if size > 0 else False
r = self.session.get(self.geturl(), stream=stream, timeout=TIMEOUT)
with time_this(log, msg="Read from remote resource %s", args=(self,)):
r = self.session.get(self.geturl(), stream=stream, timeout=TIMEOUT)
if r.status_code != 200:
raise FileNotFoundError(f"Unable to read resource {self}; status code: {r.status_code}")
if not stream:
Expand All @@ -367,7 +370,8 @@ def write(self, data: bytes, overwrite: bool = True) -> None:
if self.exists():
raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")
dest_url = finalurl(self._emptyPut())
r = self.session.put(dest_url, data=data, timeout=TIMEOUT)
with time_this(log, msg="Write data to remote %s", args=(self,)):
r = self.session.put(dest_url, data=data, timeout=TIMEOUT)
if r.status_code not in [201, 202, 204]:
raise ValueError(f"Can not write file {self}, status code: {r.status_code}")

Expand All @@ -390,8 +394,11 @@ def transfer_from(self, src: ButlerURI, transfer: str = "copy",
if transfer not in self.transferModes:
raise ValueError(f"Transfer mode {transfer} not supported by URI scheme {self.scheme}")

log.debug(f"Transferring {src} [exists: {src.exists()}] -> "
f"{self} [exists: {self.exists()}] (transfer={transfer})")
# Existence checks cost time so do not call this unless we know
# that debugging is enabled.
if log.isEnabledFor(logging.DEBUG):
log.debug("Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
src, src.exists(), self, self.exists(), transfer)

if self.exists():
raise FileExistsError(f"Destination path {self} already exists.")
Expand All @@ -404,23 +411,24 @@ def transfer_from(self, src: ButlerURI, transfer: str = "copy",
if not self.is_webdav_endpoint:
raise NotImplementedError("Endpoint does not implement WebDAV functionality")

if transfer == "move":
r = self.session.request("MOVE", src.geturl(),
headers={"Destination": self.geturl()},
timeout=TIMEOUT)
log.debug("Running move via MOVE HTTP request.")
else:
r = self.session.request("COPY", src.geturl(),
headers={"Destination": self.geturl()},
timeout=TIMEOUT)
log.debug("Running copy via COPY HTTP request.")
with time_this(log, msg="Transfer from %s to %s directly", args=(src, self)):
if transfer == "move":
r = self.session.request("MOVE", src.geturl(),
headers={"Destination": self.geturl()},
timeout=TIMEOUT)
log.debug("Running move via MOVE HTTP request.")
else:
r = self.session.request("COPY", src.geturl(),
headers={"Destination": self.geturl()},
timeout=TIMEOUT)
log.debug("Running copy via COPY HTTP request.")
else:
# Use local file and upload it
with src.as_local() as local_uri:
with open(local_uri.ospath, "rb") as f:
dest_url = finalurl(self._emptyPut())
r = self.session.put(dest_url, data=f, timeout=TIMEOUT)
log.debug("Uploading URI %s to %s via local file", src, self)
with time_this(log, msg="Transfer from %s to %s via local file", args=(src, self)):
r = self.session.put(dest_url, data=f, timeout=TIMEOUT)

if r.status_code not in [201, 202, 204]:
raise ValueError(f"Can not transfer file {self}, status code: {r.status_code}")
Expand Down
33 changes: 23 additions & 10 deletions python/lsst/daf/butler/core/_butlerUri/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
Union,
)

from ..utils import time_this
from .utils import NoTransaction
from ._butlerUri import ButlerURI
from .s3utils import getS3Client, s3CheckFileExists, bucketExists
Expand Down Expand Up @@ -139,7 +140,8 @@ def read(self, size: int = -1) -> bytes:
**args)
except (self.client.exceptions.NoSuchKey, self.client.exceptions.NoSuchBucket) as err:
raise FileNotFoundError(f"No such resource: {self}") from err
body = response["Body"].read()
with time_this(log, msg="Read from %s", args=(self,)):
body = response["Body"].read()
response["Body"].close()
return body

Expand All @@ -149,8 +151,9 @@ def write(self, data: bytes, overwrite: bool = True) -> None:
if not overwrite:
if self.exists():
raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")
self.client.put_object(Bucket=self.netloc, Key=self.relativeToPathRoot,
Body=data)
with time_this(log, msg="Write to %s", args=(self,)):
self.client.put_object(Bucket=self.netloc, Key=self.relativeToPathRoot,
Body=data)

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
def mkdir(self) -> None:
Expand All @@ -177,7 +180,8 @@ def _as_local(self) -> Tuple[str, bool]:
Always returns `True`. This is always a temporary file.
"""
with tempfile.NamedTemporaryFile(suffix=self.getExtension(), delete=False) as tmpFile:
self.client.download_fileobj(self.netloc, self.relativeToPathRoot, tmpFile)
with time_this(log, msg="Downloading %s to local file", args=(self,)):
self.client.download_fileobj(self.netloc, self.relativeToPathRoot, tmpFile)
return tmpFile.name, True

@backoff.on_exception(backoff.expo, all_retryable_errors, max_time=max_retry_time)
Expand All @@ -202,15 +206,21 @@ def transfer_from(self, src: ButlerURI, transfer: str = "copy",
if transfer not in self.transferModes:
raise ValueError(f"Transfer mode '{transfer}' not supported by URI scheme {self.scheme}")

log.debug(f"Transferring {src} [exists: {src.exists()}] -> "
f"{self} [exists: {self.exists()}] (transfer={transfer})")
# Existence checks cost time so do not call this unless we know
# that debugging is enabled.
if log.isEnabledFor(logging.DEBUG):
log.debug("Transferring %s [exists: %s] -> %s [exists: %s] (transfer=%s)",
src, src.exists(), self, self.exists(), transfer)

if not overwrite and self.exists():
raise FileExistsError(f"Destination path '{self}' already exists.")

if transfer == "auto":
transfer = self.transferDefault

timer_msg = "Transfer from %s to %s"
timer_args = (src, self)

if isinstance(src, type(self)):
# Looks like an S3 remote uri so we can use direct copy
# note that boto3.resource.meta.copy is cleverer than the low
Expand All @@ -219,16 +229,19 @@ def transfer_from(self, src: ButlerURI, transfer: str = "copy",
"Bucket": src.netloc,
"Key": src.relativeToPathRoot,
}
self.client.copy_object(CopySource=copy_source, Bucket=self.netloc, Key=self.relativeToPathRoot)
with time_this(log, msg=timer_msg, args=timer_args):
self.client.copy_object(CopySource=copy_source, Bucket=self.netloc,
Key=self.relativeToPathRoot)
else:
# Use local file and upload it
with src.as_local() as local_uri:

# resource.meta.upload_file seems like the right thing
# but we have a low level client
with open(local_uri.ospath, "rb") as fh:
self.client.put_object(Bucket=self.netloc,
Key=self.relativeToPathRoot, Body=fh)
with time_this(log, msg=timer_msg, args=timer_args):
with open(local_uri.ospath, "rb") as fh:
self.client.put_object(Bucket=self.netloc,
Key=self.relativeToPathRoot, Body=fh)

# This was an explicit move requested from a remote resource
# try to remove that resource
Expand Down
7 changes: 7 additions & 0 deletions python/lsst/daf/butler/core/_butlerUri/s3utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
__all__ = ("getS3Client", "s3CheckFileExists", "bucketExists", "setAwsEnvCredentials",
"unsetAwsEnvCredentials")

import functools
import os

from typing import (
Expand Down Expand Up @@ -70,6 +71,12 @@ def getS3Client() -> boto3.client:
if not endpoint:
endpoint = None # Handle ""

return _get_s3_client(endpoint)


@functools.lru_cache()
def _get_s3_client(endpoint: str) -> boto3.client:
# Helper function to cache the client for this endpoint
config = botocore.config.Config(
read_timeout=180,
retries={
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ def names(self, topLevelOnly=False, delimiter=None):
# works.
ntries = 0
while delimiter in combined:
log.debug(f"Delimiter '{delimiter}' could not be used. Trying another.")
log.debug("Delimiter '%s' could not be used. Trying another.", delimiter)
ntries += 1

if ntries > 100:
Expand All @@ -785,7 +785,7 @@ def names(self, topLevelOnly=False, delimiter=None):
if not delimiter.isalnum():
break

log.debug(f"Using delimiter {delimiter!r}")
log.debug("Using delimiter %r", delimiter)

# Form the keys, escaping the delimiter if necessary
strings = [delimiter + delimiter.join(str(s).replace(delimiter, f"\\{delimiter}") for s in k)
Expand Down
59 changes: 59 additions & 0 deletions python/lsst/daf/butler/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"safeMakeDir",
"Singleton",
"stripIfNotNone",
"time_this",
"transactional",
)

Expand All @@ -40,7 +41,9 @@
import fnmatch
import functools
import logging
import time
import re
from contextlib import contextmanager
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -475,3 +478,59 @@ def isplit(string: T, sep: T) -> Iterator[T]:
return
yield string[begin:end]
begin = end + 1


@contextmanager
def time_this(log: Optional[logging.Logger] = None, msg: Optional[str] = None,
              level: int = logging.DEBUG, prefix: Optional[str] = "timer",
              args: Iterable[Any] = ()) -> Iterator[None]:
    """Time the enclosed block and issue a log message.

    Parameters
    ----------
    log : `logging.Logger`, optional
        Logger to use to report the timer message. The root logger will
        be used if none is given.
    msg : `str`, optional
        Context to include in log message.
    level : `int`, optional
        Python logging level to use to issue the log message. If the
        code block raises an exception the log message will automatically
        switch to level ERROR.
    prefix : `str`, optional
        Prefix to use to prepend to the supplied logger to
        create a new logger to use instead. No prefix is used if the value
        is set to `None`. Defaults to "timer".
    args : iterable of any
        Additional parameters passed to the log command that should be
        written to ``msg``.
    """
    if log is None:
        log = logging.getLogger()
    if prefix:
        # The root logger has no meaningful name so use the prefix alone
        # rather than "timer.root".
        log_name = f"{prefix}.{log.name}" if not isinstance(log, logging.RootLogger) else prefix
        log = logging.getLogger(log_name)

    success = False
    # Use a monotonic clock for interval timing: time.time() is wall-clock
    # and can jump (NTP sync, DST) yielding negative or skewed durations.
    start = time.monotonic()
    try:
        yield
        success = True
    finally:
        end = time.monotonic()

        # The message is pre-inserted to allow the logger to expand
        # the additional args provided. Make that easier by converting
        # the None message to empty string.
        if msg is None:
            msg = ""

        if not success:
            # Something went wrong so change the log level to indicate
            # this.
            level = logging.ERROR

        # Specify stacklevel to ensure the message is reported from the
        # caller (1 is this file, 2 is contextlib, 3 is user)
        log.log(level, msg + "%sTook %.4f seconds", *args,
                ": " if msg else "", end - start, stacklevel=3)
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/datastores/chainedDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ def remove(self, ref: DatasetRef) -> None:
Attempt to remove a dataset that does not exist. Raised if none
of the child datastores removed the dataset.
"""
log.debug(f"Removing {ref}")
log.debug("Removing %s", ref)
self.trash(ref, ignore_errors=False)
self.emptyTrash(ignore_errors=False)

Expand Down
14 changes: 9 additions & 5 deletions python/lsst/daf/butler/datastores/fileDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
)

from lsst.daf.butler.core.repoRelocation import replaceRoot
from lsst.daf.butler.core.utils import getInstanceOf, getClassOf, transactional
from lsst.daf.butler.core.utils import getInstanceOf, getClassOf, transactional, time_this
from .genericDatastore import GenericBaseDatastore

if TYPE_CHECKING:
Expand Down Expand Up @@ -1116,7 +1116,8 @@ def _read_artifact_into_memory(self, getInfo: DatastoreFileGetInformation,
formatter = getInfo.formatter
nbytes_max = 10_000_000 # Arbitrary number that we can tune
if resource_size <= nbytes_max and formatter.can_read_bytes():
serializedDataset = uri.read()
with time_this(log, msg="Reading bytes from %s", args=(uri,)):
serializedDataset = uri.read()
log.debug("Deserializing %s from %d bytes from location %s with formatter %s",
f"component {getInfo.component}" if isComponent else "",
len(serializedDataset), uri, formatter.name())
Expand Down Expand Up @@ -1168,7 +1169,10 @@ def _read_artifact_into_memory(self, getInfo: DatastoreFileGetInformation,
uri, msg, formatter.name())
try:
with formatter._updateLocation(newLocation):
result = formatter.read(component=getInfo.component if isComponent else None)
with time_this(log, msg="Reading%s from location %s %s with formatter %s",
args=(f" component {getInfo.component}" if isComponent else "",
uri, msg, formatter.name())):
result = formatter.read(component=getInfo.component if isComponent else None)
except Exception as e:
raise ValueError(f"Failure from formatter '{formatter.name()}' for dataset {ref.id}"
f" ({ref.datasetType.name} from {uri}): {e}") from e
Expand Down Expand Up @@ -1703,8 +1707,8 @@ def trash(self, ref: Union[DatasetRef, Iterable[DatasetRef]], ignore_errors: boo
self.bridge.moveToTrash([ref])
except Exception as e:
if ignore_errors:
log.warning(f"Attempted to mark dataset ({ref}) to be trashed in datastore {self.name} "
f"but encountered an error: {e}")
log.warning("Attempted to mark dataset (%s) to be trashed in datastore %s "
"but encountered an error: %s", ref, self.name, e)
pass
else:
raise
Expand Down