Skip to content

Commit

Permalink
Migrate utility functions for butlerUri into folder
Browse files Browse the repository at this point in the history
Merge webdavutils.py into http.py for _butlerUri and simply move s3utils
into the folder for now. Someday we can perhaps make s3utils go away but
not today...
  • Loading branch information
William Strecker-Kellogg committed Oct 30, 2020
1 parent e86affe commit b8e3527
Show file tree
Hide file tree
Showing 16 changed files with 295 additions and 330 deletions.
2 changes: 0 additions & 2 deletions python/lsst/daf/butler/core/_butlerUri/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Should only expose ButlerURI

from ._butlerUri import ButlerURI
__all__ = ("ButlerURI",)

2 changes: 1 addition & 1 deletion python/lsst/daf/butler/core/_butlerUri/_butlerUri.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def __new__(cls, uri: Union[str, urllib.parse.ParseResult, ButlerURI],
from .http import ButlerHttpURI
subclass = ButlerHttpURI
elif parsed.scheme == "resource":
# Rules for scheme names disasllow pkg_resource
# Rules for scheme names disallow pkg_resource
from .packageresource import ButlerPackageResourceURI
subclass = ButlerPackageResourceURI
elif parsed.scheme == "mem":
Expand Down
2 changes: 2 additions & 0 deletions python/lsst/daf/butler/core/_butlerUri/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import copy
import logging

__all__ = ('ButlerFileURI',)

from typing import (
TYPE_CHECKING,
cast,
Expand Down
270 changes: 267 additions & 3 deletions python/lsst/daf/butler/core/_butlerUri/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
import tempfile
import logging

__all__ = ('ButlerHttpURI', )

from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

from typing import (
TYPE_CHECKING,
Optional,
Expand All @@ -36,13 +41,275 @@

from .utils import NoTransaction
from ._butlerUri import ButlerURI
from ..location import Location

if TYPE_CHECKING:
from ..datastore import DatastoreTransaction

log = logging.getLogger(__name__)


def getHttpSession() -> requests.Session:
"""Create a requests.Session pre-configured with environment variable data
Returns
-------
session : `requests.Session`
An http session used to execute requests.
Notes
-----
The following environment variables must be set:
- LSST_BUTLER_WEBDAV_CA_BUNDLE: the directory where CA
certificates are stored if you intend to use HTTPS to
communicate with the endpoint.
- LSST_BUTLER_WEBDAV_AUTH: which authentication method to use.
Possible values are X509 and TOKEN
- (X509 only) LSST_BUTLER_WEBDAV_PROXY_CERT: path to proxy
certificate used to authenticate requests
- (TOKEN only) LSST_BUTLER_WEBDAV_TOKEN_FILE: file which
contains the bearer token used to authenticate requests
- (OPTIONAL) LSST_BUTLER_WEBDAV_EXPECT100: if set, we will add an
"Expect: 100-Continue" header in all requests. This is required
on certain endpoints where requests redirection is made.
"""

retries = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])

session = requests.Session()
session.mount("http://", HTTPAdapter(max_retries=retries))
session.mount("https://", HTTPAdapter(max_retries=retries))

log.debug("Creating new HTTP session...")

try:
env_auth_method = os.environ['LSST_BUTLER_WEBDAV_AUTH']
except KeyError:
raise KeyError("Environment variable LSST_BUTLER_WEBDAV_AUTH is not set, "
"please use values X509 or TOKEN")

if env_auth_method == "X509":
log.debug("... using x509 authentication.")
try:
proxy_cert = os.environ['LSST_BUTLER_WEBDAV_PROXY_CERT']
except KeyError:
raise KeyError("Environment variable LSST_BUTLER_WEBDAV_PROXY_CERT is not set")
session.cert = (proxy_cert, proxy_cert)
elif env_auth_method == "TOKEN":
log.debug("... using bearer-token authentication.")
refreshToken(session)
else:
raise ValueError("Environment variable LSST_BUTLER_WEBDAV_AUTH must be set to X509 or TOKEN")

ca_bundle = None
try:
ca_bundle = os.environ['LSST_BUTLER_WEBDAV_CA_BUNDLE']
except KeyError:
log.warning("Environment variable LSST_BUTLER_WEBDAV_CA_BUNDLE is not set: "
"HTTPS requests will fail. If you intend to use HTTPS, please "
"export this variable.")

session.verify = ca_bundle

# This header is required for request redirection, in dCache for example
if "LSST_BUTLER_WEBDAV_EXPECT100" in os.environ:
log.debug("Expect: 100-Continue header enabled.")
session.headers.update({'Expect': '100-continue'})

log.debug("Session configured and ready.")

return session


def isTokenAuth() -> bool:
"""Returns the status of bearer-token authentication.
Returns
-------
isTokenAuth : `bool`
True if LSST_BUTLER_WEBDAV_AUTH is set to TOKEN, False otherwise.
"""
try:
env_auth_method = os.environ['LSST_BUTLER_WEBDAV_AUTH']
except KeyError:
raise KeyError("Environment variable LSST_BUTLER_WEBDAV_AUTH is not set, "
"please use values X509 or TOKEN")

if env_auth_method == "TOKEN":
return True
return False


def refreshToken(session: requests.Session) -> None:
"""Set or update the 'Authorization' header of the session,
configure bearer token authentication, with the value fetched
from LSST_BUTLER_WEBDAV_TOKEN_FILE
Parameters
----------
session : `requests.Session`
Session on which bearer token authentication must be configured
"""
try:
token_path = os.environ['LSST_BUTLER_WEBDAV_TOKEN_FILE']
if not os.path.isfile(token_path):
raise FileNotFoundError(f"No token file: {token_path}")
bearer_token = open(os.environ['LSST_BUTLER_WEBDAV_TOKEN_FILE'], 'r').read().replace('\n', '')
except KeyError:
raise KeyError("Environment variable LSST_BUTLER_WEBDAV_TOKEN_FILE is not set")

session.headers.update({'Authorization': 'Bearer ' + bearer_token})


def webdavCheckFileExists(path: Union[Location, ButlerURI, str],
session: Optional[requests.Session] = None) -> Tuple[bool, int]:
"""Check that a remote HTTP resource exists.
Parameters
----------
path : `Location`, `ButlerURI` or `str`
Location or ButlerURI containing the bucket name and filepath.
session : `requests.Session`, optional
Session object to query.
Returns
-------
exists : `bool`
True if resource exists, False otherwise.
size : `int`
Size of the resource, if it exists, in bytes, otherwise -1
"""
if session is None:
session = getHttpSession()

filepath = _getFileURL(path)

log.debug("Checking if file exists: %s", filepath)

r = session.head(filepath)
return (True, int(r.headers['Content-Length'])) if r.status_code == 200 else (False, -1)


def webdavDeleteFile(path: Union[Location, ButlerURI, str],
session: Optional[requests.Session] = None) -> None:
"""Remove a remote HTTP resource.
Raises a FileNotFoundError if the resource does not exist or on failure.
Parameters
----------
path : `Location`, `ButlerURI` or `str`
Location or ButlerURI containing the bucket name and filepath.
session : `requests.Session`, optional
Session object to query.
"""
if session is None:
session = getHttpSession()

filepath = _getFileURL(path)

log.debug("Removing file: %s", filepath)
r = session.delete(filepath)
if r.status_code not in [200, 202, 204]:
raise FileNotFoundError(f"Unable to delete resource {filepath}; status code: {r.status_code}")


def folderExists(path: Union[Location, ButlerURI, str],
session: Optional[requests.Session] = None) -> bool:
"""Check if the Webdav repository at a given URL actually exists.
Parameters
----------
path : `Location`, `ButlerURI` or `str`
Location or ButlerURI containing the bucket name and filepath.
session : `requests.Session`, optional
Session object to query.
Returns
-------
exists : `bool`
True if it exists, False if no folder is found.
"""
if session is None:
session = getHttpSession()

filepath = _getFileURL(path)

log.debug("Checking if folder exists: %s", filepath)
r = session.head(filepath)
return True if r.status_code == 200 else False


def isWebdavEndpoint(path: Union[Location, ButlerURI, str]) -> bool:
"""Check whether the remote HTTP endpoint implements Webdav features.
Parameters
----------
path : `Location`, `ButlerURI` or `str`
Location or ButlerURI containing the bucket name and filepath.
Returns
-------
isWebdav : `bool`
True if the endpoint implements Webdav, False if it doesn't.
"""
ca_bundle = None
try:
ca_bundle = os.environ['LSST_BUTLER_WEBDAV_CA_BUNDLE']
except KeyError:
log.warning("Environment variable LSST_BUTLER_WEBDAV_CA_BUNDLE is not set: "
"HTTPS requests will fail. If you intend to use HTTPS, please "
"export this variable.")
filepath = _getFileURL(path)

log.debug("Detecting HTTP endpoint type...")
r = requests.options(filepath, verify=ca_bundle)
return True if 'DAV' in r.headers else False


def finalurl(r: requests.Response) -> str:
"""Check whether the remote HTTP endpoint redirects to a different
endpoint, and return the final destination of the request.
This is needed when using PUT operations, to avoid starting
to send the data to the endpoint, before having to send it again once
the 307 redirect response is received, and thus wasting bandwidth.
Parameters
----------
r : `requests.Response`
An HTTP response received when requesting the endpoint
Returns
-------
destination_url: `string`
The final destination to which requests must be sent.
"""
destination_url = r.url
if r.status_code == 307:
destination_url = r.headers['Location']
log.debug("Request redirected to %s", destination_url)
return destination_url


def _getFileURL(path: Union[Location, ButlerURI, str]) -> str:
"""Returns the absolute URL of the resource as a string.
Parameters
----------
path : `Location`, `ButlerURI` or `str`
Location or ButlerURI containing the bucket name and filepath.
Returns
-------
filepath : `str`
The fully qualified URL of the resource.
"""
if isinstance(path, Location):
filepath = path.uri.geturl()
else:
filepath = ButlerURI(path).geturl()
return filepath


class ButlerHttpURI(ButlerURI):
"""General HTTP(S) resource."""
_session = requests.Session()
Expand All @@ -51,7 +318,6 @@ class ButlerHttpURI(ButlerURI):
@property
def session(self) -> requests.Session:
"""Client object to address remote resource."""
from ..webdavutils import refreshToken, isTokenAuth, getHttpSession, isWebdavEndpoint
if ButlerHttpURI._sessionInitialized:
if isTokenAuth():
refreshToken(ButlerHttpURI._session)
Expand Down Expand Up @@ -162,7 +428,6 @@ def write(self, data: bytes, overwrite: bool = True) -> None:
If `True` the resource will be overwritten if it exists. Otherwise
the write will fail.
"""
from ..webdavutils import finalurl
log.debug("Writing to remote resource: %s", self.geturl())
if not overwrite:
if self.exists():
Expand All @@ -187,7 +452,6 @@ def transfer_from(self, src: ButlerURI, transfer: str = "copy",
transaction : `DatastoreTransaction`, optional
Currently unused.
"""
from ..webdavutils import finalurl
# Fail early to prevent delays if remote resources are requested
if transfer not in self.transferModes:
raise ValueError(f"Transfer mode {transfer} not supported by URI scheme {self.scheme}")
Expand Down
2 changes: 2 additions & 0 deletions python/lsst/daf/butler/core/_butlerUri/mem.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

from typing import Tuple

__all__ = ('ButlerInMemoryURI',)

from ._butlerUri import ButlerURI


Expand Down
2 changes: 2 additions & 0 deletions python/lsst/daf/butler/core/_butlerUri/packageresource.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import pkg_resources
import logging

__all__ = ('ButlerPackageResourceURI',)

from ._butlerUri import ButlerURI

log = logging.getLogger(__name__)
Expand Down

0 comments on commit b8e3527

Please sign in to comment.