Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-37917: Add testing against real webDAV server for HttpResourcePath #40

Merged
merged 22 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a9b6f21
Improve handling of DELETE requests
airnandez Feb 7, 2023
0469e84
Test deletion of a directory raises
airnandez Feb 7, 2023
a5a5f12
Add method to explicitly close sessions to avoid warnings when unit t…
airnandez Feb 7, 2023
e0e5ac1
Improve comments for session retries
airnandez Feb 8, 2023
9ac5a1d
Comply with conventions for documenting methods
airnandez Feb 8, 2023
632155b
Add test case using a local webDAV server
airnandez Feb 8, 2023
5bbe8c0
Minor improvements and corrections of typos
airnandez Feb 8, 2023
108c893
Add missing return type annotation
airnandez Feb 8, 2023
f7fe03f
Raise NotADirectoryError rather than ValueError when not a directory
timj Feb 9, 2023
3ce43db
Fix the confusing test case class name for S3 tests
timj Feb 9, 2023
b79d643
Enable standard testing of HTTP WebDav interface
timj Feb 9, 2023
d52dd73
The root URI to test can include a non-empty path
airnandez Feb 10, 2023
554afff
Ensure the parent directory exists before actually uploading data
airnandez Feb 10, 2023
d56425f
Restructure tests in several ways:
airnandez Feb 10, 2023
459bccf
Give priority to test against real servers in development setting if …
airnandez Feb 13, 2023
260b147
Ensure all webDAV requests download the response body immediately
airnandez Feb 13, 2023
a7a3d9d
Parse status element of PROPFIND response for making easier inspectio…
airnandez Feb 13, 2023
3d556fc
Simplify path building
airnandez Feb 13, 2023
67e387f
Ensure absolute path starts by single slash
airnandez Feb 13, 2023
db184ba
Internal method _close_sessions() no longer needed
airnandez Feb 14, 2023
6b8a6bb
Restructure tests and remove testing with mocked responses.
airnandez Feb 14, 2023
4087c3e
Keep naming of test cases consistent
airnandez Feb 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
128 changes: 87 additions & 41 deletions python/lsst/resources/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os
import os.path
import random
import re
import stat
import tempfile
import xml.etree.ElementTree as eTree
Expand Down Expand Up @@ -79,7 +80,7 @@ def _is_webdav_endpoint(path: Union[ResourcePath, str]) -> bool:
try:
ca_cert_bundle = os.getenv("LSST_HTTP_CACERT_BUNDLE")
verify: Union[bool, str] = ca_cert_bundle if ca_cert_bundle else True
resp = requests.options(str(path), verify=verify)
resp = requests.options(str(path), verify=verify, stream=True)

# Check that "1" is part of the value of the "DAV" header. We don't
# use locks, so a server complying with class 1 is enough for our
Expand Down Expand Up @@ -251,12 +252,26 @@ def _make_session(self, rpath: ResourcePath, persist: bool) -> requests.Session:
log.debug("Creating new HTTP session for endpoint %s (persist connection=%s)...", root_uri, persist)

retries = Retry(
# Total number of retries to allow. Takes precedence over other
# counts.
total=3,
# How many connection-related errors to retry on.
connect=3,
# How many times to retry on read errors.
read=3,
# Backoff factor to apply between attempts after the second try
# (seconds)
backoff_factor=5.0 + random.random(),
# How many times to retry on bad status codes
status=3,
status_forcelist=[429, 500, 502, 503, 504],
# HTTP status codes that we should force a retry on
status_forcelist=[
requests.codes.too_many_requests, # 429
requests.codes.internal_server_error, # 500
requests.codes.bad_gateway, # 502
requests.codes.service_unavailable, # 503
requests.codes.gateway_timeout, # 504
],
)

# Persist a single connection to the front end server, if required
Expand All @@ -277,17 +292,17 @@ def _make_session(self, rpath: ResourcePath, persist: bool) -> requests.Session:
HTTPAdapter(pool_connections=1, pool_maxsize=0, pool_block=False, max_retries=retries),
)

# If the remote endpoint doesn't use secure HTTP, we don't include bearer
# tokens in the requests, nor do we need to authenticate the remote server.
if rpath.scheme != "https":
return session

# Should we use a specific CA cert bundle for authenticating the
# server?
session.verify = True
if ca_bundle := os.getenv("LSST_HTTP_CACERT_BUNDLE"):
session.verify = ca_bundle

# If the remote endpoint doesn't use secure HTTP, don't include bearer
# tokens in the requests.
if rpath.scheme != "https":
return session

# Should we use bearer tokens for client authentication?
if token := os.getenv("LSST_HTTP_AUTH_BEARER_TOKEN"):
log.debug("... using bearer token authentication")
Expand Down Expand Up @@ -395,19 +410,20 @@ def exists(self) -> bool:
# request, even if the behavior for such a request against a
# directory is not specified, so it depends on the server
# implementation.
resp = self.session.head(self.geturl(), timeout=TIMEOUT, allow_redirects=True)
resp = self.session.head(self.geturl(), timeout=TIMEOUT, allow_redirects=True, stream=True)
return resp.status_code == requests.codes.ok # 200

# The remote endpoint is a webDAV server: send a PROPFIND request
# requesting only the 'getlastmodified' property.
request_body = """
<?xml version="1.0" encoding="utf-8" ?>
<D:propfind xmlns:D="DAV:"><D:prop><D:getlastmodified/></D:prop></D:propfind>
"""
request_body = (
"""<?xml version="1.0" encoding="utf-8" ?>"""
"""<D:propfind xmlns:D="DAV:"><D:prop><D:getlastmodified/></D:prop></D:propfind>"""
)
resp = self._propfind(request_body)
if resp.status_code == requests.codes.multi_status: # 207
# Retrieve the status of the first and only element in the response
propfind_resp = _parse_propfind_response_body(resp.text)[0]
return propfind_resp.status == "HTTP/1.1 200 OK"
return propfind_resp.status_code == requests.codes.ok
elif resp.status_code == requests.codes.not_found: # 404
return False
else:
Expand All @@ -423,7 +439,7 @@ def size(self) -> int:
if not self.is_webdav_endpoint:
# The remote is a plain HTTP server. Send a HEAD request to
# retrieve the size of the resource.
resp = self.session.head(self.geturl(), timeout=TIMEOUT, allow_redirects=True)
resp = self.session.head(self.geturl(), timeout=TIMEOUT, allow_redirects=True, stream=True)
if resp.status_code == requests.codes.ok: # 200
if "Content-Length" in resp.headers:
return int(resp.headers["Content-Length"])
Expand All @@ -440,16 +456,16 @@ def size(self) -> int:

# The remote is a webDAV server: send a PROPFIND request to retrieve
# the 'getcontentlength' property of the resource.
request_body = """
<?xml version="1.0" encoding="utf-8" ?>
<D:propfind xmlns:D="DAV:"><D:prop><D:getcontentlength/></D:prop></D:propfind>
"""
request_body = (
"""<?xml version="1.0" encoding="utf-8" ?>"""
"""<D:propfind xmlns:D="DAV:"><D:prop><D:getcontentlength/></D:prop></D:propfind>"""
)
resp = self._propfind(body=request_body)
if resp.status_code == requests.codes.multi_status: # 207
# Parse the response body and retrieve the 'getcontentlength'
# property
propfind_resp = _parse_propfind_response_body(resp.text)[0]
if propfind_resp.status == "HTTP/1.1 200 OK":
if propfind_resp.status_code == requests.codes.ok: # 200
return propfind_resp.getcontentlength
else:
raise FileNotFoundError(f"Resource {self} does not exist")
Expand All @@ -464,10 +480,12 @@ def mkdir(self) -> None:
"""Create the directory resource if it does not already exist."""
# Creating directories is only available on WebDAV backends.
if not self.is_webdav_endpoint:
raise NotImplementedError("Endpoint does not implement WebDAV functionality")
raise NotImplementedError(
f"Creation of directory {self} is not implemented by plain HTTP servers"
)

if not self.dirLike:
raise ValueError(f"Can not create a 'directory' for file-like URI {self}")
raise NotADirectoryError(f"Can not create a 'directory' for file-like URI {self}")

if not self.exists():
# We need to test the absence of the parent directory,
Expand Down Expand Up @@ -522,6 +540,10 @@ def write(self, data: bytes, overwrite: bool = True) -> None:
if self.exists():
raise FileExistsError(f"Remote resource {self} exists and overwrite has been disabled")

# Ensure the parent directory exists
self.parent().mkdir()

# Upload the data
with time_this(log, msg="Write to remote %s (%d bytes)", args=(self, len(data))):
self._put(data=data)

Expand Down Expand Up @@ -660,13 +682,10 @@ def _send_webdav_request(
if url is None:
url = self.geturl()

# Strip any whitespace at the beginning of the request body, as some
# XML parsers don't handle that correctly.
body = body.strip() if body is not None else None

for _ in range(max_redirects := 5):
req = requests.Request(method, url, data=body, headers=headers)
resp = self.session.send(req.prepare(), timeout=TIMEOUT, allow_redirects=False)
resp = self.session.request(
method, url, data=body, headers=headers, stream=True, timeout=TIMEOUT, allow_redirects=False
)
if resp.is_redirect:
url = resp.headers["Location"]
else:
Expand Down Expand Up @@ -702,6 +721,16 @@ def _propfind(self, body: Optional[str] = None, depth: str = "0") -> requests.Re
)
return self._send_webdav_request("PROPFIND", headers=headers, body=body)

def _options(self) -> requests.Response:
"""Send a OPTIONS webDAV request for this resource."""

return self._send_webdav_request("OPTIONS")

def _head(self) -> requests.Response:
"""Send a HEAD webDAV request for this resource."""

return self._send_webdav_request("HEAD")

def _mkcol(self) -> None:
"""Send a MKCOL webDAV request to create a collection. The collection
may already exist.
Expand All @@ -718,17 +747,27 @@ def _mkcol(self) -> None:

def _delete(self) -> None:
"""Send a DELETE webDAV request for this resource."""
# TODO: should we first check that the resource is not a directory?
# TODO: we should remove Depth header which should not be used for
# directories and systematically check if the return code is
# multistatus, in which case we need to look for errors in the
# response.
resp = self._send_webdav_request("DELETE", headers={"Depth": "0"})

log.debug("Deleting %s ...", self.geturl())

# If this is a directory, ensure the remote is a webDAV server because
# plain HTTP servers don't support DELETE requests on non-file
# paths.
if self.dirLike and not self.is_webdav_endpoint:
raise NotImplementedError(
f"Deletion of directory {self} is not implemented by plain HTTP servers"
)

resp = self._send_webdav_request("DELETE")
if resp.status_code in (requests.codes.ok, requests.codes.accepted, requests.codes.no_content):
return
elif resp.status_code == requests.codes.not_found:
raise FileNotFoundError(f"Resource {self} does not exist, status code: {resp.status_code}")
else:
# TODO: the response to a DELETE request against a webDAV server
# may be multistatus. If so, we need to parse the response body to
# determine more precisely the reason for the failure (e.g. a lock)
# and provide a more helpful error message.
raise ValueError(f"Unable to delete resource {self}; status code: {resp.status_code}")

def _copy_via_local(self, src: ResourcePath) -> None:
Expand Down Expand Up @@ -827,14 +866,15 @@ def _put(self, data: Union[BinaryIO, bytes]) -> None:

url = self.geturl()
log.debug("Sending empty PUT request to %s", url)
req = requests.Request("PUT", url, data=None, headers=headers)
resp = self.session.send(req.prepare(), timeout=TIMEOUT, allow_redirects=False)
resp = self.session.request(
"PUT", url, data=None, headers=headers, stream=True, timeout=TIMEOUT, allow_redirects=False
)
if resp.is_redirect:
url = resp.headers["Location"]

# Send data to its final destination using the PUT session
log.debug("Uploading data to %s", url)
resp = self.put_session.put(url, data=data, timeout=TIMEOUT, allow_redirects=False)
resp = self.put_session.put(url, data=data, timeout=TIMEOUT, allow_redirects=False, stream=True)
if resp.status_code not in (requests.codes.ok, requests.codes.created, requests.codes.no_content):
raise ValueError(f"Can not write file {self}, status code: {resp.status_code}")

Expand All @@ -845,9 +885,8 @@ def _openImpl(
*,
encoding: Optional[str] = None,
) -> Iterator[ResourceHandleProtocol]:
url = self.geturl()
response = self.session.head(url, timeout=TIMEOUT, allow_redirects=True)
accepts_range = "Accept-Ranges" in response.headers
resp = self._head()
accepts_range = resp.status_code == requests.codes.ok and resp.headers.get("Accept-Ranges") == "bytes"
handle: ResourceHandleProtocol
if mode in ("rb", "r") and accepts_range:
handle = HttpReadResourceHandle(
Expand Down Expand Up @@ -985,8 +1024,13 @@ class PropfindResponse:
a single resource.
"""

# Regular expression to extract the status code and reason from
# the 'status' element of a PROPFIND response.
_status_rex = re.compile(r"^HTTP/.* +(?P<status_code>\d{3}) +(?P<reason>.*)$", re.IGNORECASE)

def __init__(self, response: Optional[eTree.Element]):
self.status: str = ""
self.status_code: int = 0
self.reason: str = ""
self.href: str = ""
self.collection: bool = False
self.getlastmodified: str = ""
Expand All @@ -1000,7 +1044,9 @@ def _parse(self, response: eTree.Element) -> None:
if element is not None:
# We need to use "str(element.text)" instead of "element.text" to
# keep mypy happy
self.status = str(element.text).strip()
if match := self._status_rex.match(str(element.text)):
self.status_code = int(match["status_code"])
self.reason = match["reason"]

# Parse "href"
element = response.find("./{DAV:}href")
Expand Down
13 changes: 11 additions & 2 deletions python/lsst/resources/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import logging
import os
import pathlib
import random
import string
import unittest
import urllib.parse
import uuid
Expand Down Expand Up @@ -119,6 +121,7 @@ class _GenericTestCase:

scheme: Optional[str] = None
netloc: Optional[str] = None
base_path: Optional[str] = None
path1 = "test_dir"
path2 = "file.txt"

Expand All @@ -144,6 +147,9 @@ def _make_uri(self, path: str, netloc: Optional[str] = None) -> str:
netloc = self.netloc
if path.startswith("/"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The path.lstrip at the top makes this line a no-op I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above.

path = path[1:]
if self.base_path is not None:
path = f"{self.base_path}{path}".lstrip("/")

return f"{self.scheme}://{netloc}/{path}"
else:
return path
Expand Down Expand Up @@ -427,8 +433,11 @@ def setUp(self) -> None:
# so relsymlink gets quite confused.
self.tmpdir = ResourcePath(makeTestTempDir(self.testdir))
else:
# Create tmp directory relative to the test root.
self.tmpdir = self.root_uri.join("TESTING/")
# Create random tmp directory relative to the test root.
self.tmpdir = self.root_uri.join(
"TESTING-" + "".join(random.choices(string.ascii_lowercase + string.digits, k=8)),
forceDirectory=True,
)
self.tmpdir.mkdir()

def tearDown(self) -> None:
Expand Down