Skip to content

Commit

Permalink
Merge pull request #356 from bgounon/tickets/DM-26383
Browse files Browse the repository at this point in the history
DM-26383: Add webdav datastore tests to daf_butler
  • Loading branch information
timj committed Aug 28, 2020
2 parents 6682cc8 + b8ae901 commit 3815080
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 18 deletions.
40 changes: 35 additions & 5 deletions python/lsst/daf/butler/core/_butlerUri.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,27 @@ def dirname(self) -> ButlerURI:
"""
return self.split()[0]

def parent(self) -> ButlerURI:
"""Returns a ButlerURI containing all the directories of the path
attribute, minus the last one.
Returns
-------
head : `ButlerURI`
Everything except the tail of path attribute, expanded and
normalized as per ButlerURI rules.
"""
# When self is file-like, return self.dirname()
if not self.dirLike:
return self.dirname()
# When self is dir-like, return its parent directory,
# regardless of the presence of a trailing separator
originalPath = self._pathLib(self.path)
parentPath = originalPath.parent
parentURI = self._uri._replace(path=str(parentPath))

return ButlerURI(parentURI, forceDirectory=True)

def replace(self, **kwargs: Any) -> ButlerURI:
"""Replace components in a URI with new values and return a new
instance.
Expand Down Expand Up @@ -1241,11 +1262,12 @@ class ButlerHttpURI(ButlerURI):
def session(self) -> requests.Session:
"""Client object to address remote resource."""
from .webdavutils import getHttpSession, isWebdavEndpoint
if isWebdavEndpoint(self):
log.debug("%s looks like a Webdav endpoint.", self.geturl())
baseURL = self.scheme + "://" + self.netloc
if isWebdavEndpoint(baseURL):
log.debug("%s looks like a Webdav endpoint.", baseURL)
return getHttpSession()

log.debug("%s looks like a standard HTTP endpoint.", self.geturl())
log.debug("%s looks like a standard HTTP endpoint.", baseURL)
return requests.Session()

def exists(self) -> bool:
Expand All @@ -1272,6 +1294,12 @@ def mkdir(self) -> None:
raise ValueError(f"Can not create a 'directory' for file-like URI {self}")

if not self.exists():
# We need to test the absence of the parent directory,
# but also if parent URL is different from self URL,
# otherwise we could be stuck in a recursive loop
# where self == parent
if not self.parent().exists() and self.parent().geturl() != self.geturl():
self.parent().mkdir()
log.debug("Creating new directory: %s", self.geturl())
r = self.session.request("MKCOL", self.geturl())
if r.status_code != 201:
Expand Down Expand Up @@ -1371,17 +1399,19 @@ def transfer_from(self, src: ButlerURI, transfer: str = "copy",
if isinstance(src, type(self)):
if transfer == "move":
self.session.request("MOVE", src.geturl(), headers={"Destination": self.geturl()})
log.debug("Direct move via MOVE operation executed.")
else:
self.session.request("COPY", src.geturl(), headers={"Destination": self.geturl()})
log.debug("Direct copy via COPY operation executed.")
else:
# Use local file and upload it
local_src, is_temporary = src.as_local()
f = open(local_src, "rb")
files = {"file": f}
self.session.post(self.geturl(), files=files)
self.session.put(self.geturl(), data=f)
f.close()
if is_temporary:
os.remove(local_src)
log.debug("Indirect copy via temporary file executed.")


class ButlerInMemoryURI(ButlerURI):
Expand Down
3 changes: 3 additions & 0 deletions python/lsst/daf/butler/datastores/fileLikeDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,9 @@ def _extractIngestInfo(self, path: Union[str, ButlerURI], ref: DatasetRef, *,
# Work out the name we want this ingested file to have
# inside the datastore
tgtLocation = self._calculate_ingested_datastore_name(srcUri, ref, formatter)
if not tgtLocation.uri.dirname().exists():
log.debug("Folder %s does not exist yet.", tgtLocation.uri.dirname())
tgtLocation.uri.dirname().mkdir()

# if we are transferring from a local file to a remote location
# it may be more efficient to get the size and checksum of the
Expand Down
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/datastores/remoteFileDatastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
# non-zero time for network).
log.warning("Object %s exists in datastore for ref %s", location.uri, ref)

if not location.uri.dirname().exists():
log.debug("Folder %s does not exist yet.", location.uri.dirname())
location.uri.dirname().mkdir()

if self._transaction is None:
raise RuntimeError("Attempting to write artifact without transaction enabled")

Expand Down
2 changes: 1 addition & 1 deletion tests/config/basic/webdavDatastore.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
datastore:
cls: lsst.daf.butler.datastores.webdavDatastore.WebdavDatastore
root: https://some.webdav.server/butlerRoot
root: http://anywebdavserver/butlerRoot
templates: !include templates.yaml
formatters: !include formatters.yaml
composites: !include composites.yaml
178 changes: 178 additions & 0 deletions tests/test_butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import pickle
import string
import random
import time
import socket

try:
import boto3
Expand All @@ -43,7 +45,15 @@ def mock_s3(cls):
"""
return cls

try:
from cheroot import wsgi
from wsgidav.wsgidav_app import WsgiDAVApp
except ImportError:
WsgiDAVApp = None

import astropy.time
from threading import Thread
from tempfile import gettempdir
from lsst.utils import doImport
from lsst.daf.butler.core.utils import safeMakeDir
from lsst.daf.butler import Butler, Config, ButlerConfig
Expand All @@ -58,6 +68,7 @@ def mock_s3(cls):
from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG
from lsst.daf.butler.core.s3utils import (setAwsEnvCredentials,
unsetAwsEnvCredentials)
from lsst.daf.butler.core.webdavutils import isWebdavEndpoint

from lsst.daf.butler.tests import MultiDetectorFormatter, MetricsExample

Expand Down Expand Up @@ -1193,5 +1204,172 @@ def tearDown(self):
unsetAwsEnvCredentials()


@unittest.skipIf(WsgiDAVApp is None, "Warning: wsgidav/cheroot not found!")
# Mock required environment variables during tests
@unittest.mock.patch.dict(os.environ, {"WEBDAV_AUTH_METHOD": "TOKEN",
"WEBDAV_BEARER_TOKEN": "XXXXXX"})
class WebdavDatastoreButlerTestCase(FileLikeDatastoreButlerTests, unittest.TestCase):
"""WebdavDatastore specialization of a butler; a Webdav storage Datastore +
a local in-memory SqlRegistry.
"""
configFile = os.path.join(TESTDIR, "config/basic/butler-webdavstore.yaml")
fullConfigKey = None
validationCanFail = True

serverName = "localhost"
"""Name of the server that will be used in the tests.
"""

portNumber = 8080
"""Port on which the webdav server listens. Automatically chosen
at setUpClass via the _getfreeport() method
"""

root = "butlerRoot/"
"""Root repository directory expected to be used in case useTempRoot=False.
Otherwise the root is set to a 20 characters long randomly generated string
during set-up.
"""

datastoreStr = [f"datastore={root}"]
"""Contains all expected root locations in a format expected to be
returned by Butler stringification.
"""

datastoreName = ["WebdavDatastore@https://{serverName}/{root}"]
"""The expected format of the WebdavDatastore string."""

registryStr = ":memory:"
"""Expected format of the Registry string."""

serverThread = None
"""Thread in which the local webdav server will run"""

stopWebdavServer = False
"""This flag will cause the webdav server to
gracefully shut down when True
"""

def genRoot(self):
"""Returns a random string of len 20 to serve as a root
name for the temporary bucket repo.
This is equivalent to tempfile.mkdtemp as this is what self.root
becomes when useTempRoot is True.
"""
rndstr = "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(20)
)
return rndstr + "/"

@classmethod
def setUpClass(cls):
# Do the same as inherited class
cls.storageClassFactory = StorageClassFactory()
cls.storageClassFactory.addFromConfig(cls.configFile)

cls.portNumber = cls._getfreeport()
# Run a local webdav server on which tests will be run
cls.serverThread = Thread(target=cls._serveWebdav,
args=(cls, cls.portNumber, lambda: cls.stopWebdavServer),
daemon=True)
cls.serverThread.start()
# Wait for it to start
time.sleep(3)

@classmethod
def tearDownClass(cls):
# Ask for graceful shut down of the webdav server
cls.stopWebdavServer = True
# Wait for the thread to exit
cls.serverThread.join()

# Mock required environment variables during tests
@unittest.mock.patch.dict(os.environ, {"WEBDAV_AUTH_METHOD": "TOKEN",
"WEBDAV_BEARER_TOKEN": "XXXXXX"})
def setUp(self):
config = Config(self.configFile)

if self.useTempRoot:
self.root = self.genRoot()
self.rooturi = f"http://{self.serverName}:{self.portNumber}/{self.root}"
config.update({"datastore": {"datastore": {"root": self.rooturi}}})

self.datastoreStr = f"datastore={self.root}"
self.datastoreName = [f"WebdavDatastore@{self.rooturi}"]

if not isWebdavEndpoint(self.rooturi):
raise OSError("Webdav server not running properly: cannot run tests.")

Butler.makeRepo(self.rooturi, config=config, forceConfigRoot=False)
self.tmpConfigFile = posixpath.join(self.rooturi, "butler.yaml")

# Mock required environment variables during tests
@unittest.mock.patch.dict(os.environ, {"WEBDAV_AUTH_METHOD": "TOKEN",
"WEBDAV_BEARER_TOKEN": "XXXXXX"})
def tearDown(self):
# Clear temporary directory
ButlerURI(self.rooturi).remove()

def _serveWebdav(self, port: int, stopWebdavServer):
"""Starts a local webdav-compatible HTTP server,
Listening on http://localhost:8080
This server only runs when this test class is instantiated,
and then shuts down. Must be started is a separate thread.
Parameters
----------
port : `int`
The port number on which the server should listen
"""
root_path = gettempdir()

config = {
"host": "0.0.0.0",
"port": port,
"provider_mapping": {"/": root_path},
"http_authenticator": {
"domain_controller": None
},
"simple_dc": {"user_mapping": {"*": True}},
"verbose": 0,
}
app = WsgiDAVApp(config)

server_args = {
"bind_addr": (config["host"], config["port"]),
"wsgi_app": app,
}
server = wsgi.Server(**server_args)
server.prepare()

try:
# Start the actual server in a separate thread
t = Thread(target=server.serve, daemon=True)
t.start()
# watch stopWebdavServer, and gracefully
# shut down the server when True
while True:
if stopWebdavServer():
break
time.sleep(1)
except KeyboardInterrupt:
print("Caught Ctrl-C, shutting down...")
finally:
server.stop()
t.join()

def _getfreeport():
"""
Determines a free port using sockets.
"""
free_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
free_socket.bind(('0.0.0.0', 0))
free_socket.listen()
port = free_socket.getsockname()[1]
free_socket.close()
return port


if __name__ == "__main__":
unittest.main()
30 changes: 19 additions & 11 deletions tests/test_uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ def testQuoting(self):
self.assertEqual(child.unquoted_path, "/" + subpath)


# Mock required environment variables during tests
@unittest.mock.patch.dict(os.environ, {"WEBDAV_AUTH_METHOD": "TOKEN",
"WEBDAV_BEARER_TOKEN": "XXXXXX"})
class WebdavURITestCase(unittest.TestCase):

def setUp(self):
Expand All @@ -363,6 +366,8 @@ def setUp(self):
existingFileName = "existingFile"
notExistingFileName = "notExistingFile"

self.baseURL = ButlerURI(
f"https://{serverRoot}", forceDirectory=True)
self.existingFileButlerURI = ButlerURI(
f"https://{serverRoot}/{existingFolderName}/{existingFileName}")
self.notExistingFileButlerURI = ButlerURI(
Expand All @@ -374,17 +379,8 @@ def setUp(self):

# Need to declare the options
responses.add(responses.OPTIONS,
self.existingFileButlerURI.geturl(),
headers={'not': '1024'}, status=200)
responses.add(responses.OPTIONS,
self.notExistingFileButlerURI.geturl(),
headers={'not': '1024'}, status=200)
responses.add(responses.OPTIONS,
self.notExistingFolderButlerURI.geturl(),
headers={'not': '1024'}, status=200)
responses.add(responses.OPTIONS,
self.existingFolderButlerURI.geturl(),
headers={'not': '1024'}, status=200)
self.baseURL.geturl(),
status=200, headers={"DAV": "1,2,3"})

# Used by ButlerHttpURI.exists()
responses.add(responses.HEAD,
Expand Down Expand Up @@ -434,6 +430,9 @@ def setUp(self):
responses.add(responses.HEAD,
self.existingFolderButlerURI.geturl(),
status=200, headers={'Content-Length': '1024'})
responses.add(responses.HEAD,
self.baseURL.geturl(),
status=200, headers={'Content-Length': '1024'})
responses.add(responses.HEAD,
self.notExistingFolderButlerURI.geturl(),
status=404)
Expand Down Expand Up @@ -499,6 +498,15 @@ def testTransfer(self):
src=self.existingFileButlerURI,
transfer="unsupported")

def testParent(self):

self.assertEqual(self.existingFolderButlerURI.geturl(),
self.notExistingFileButlerURI.parent().geturl())
self.assertEqual(self.baseURL.geturl(),
self.baseURL.parent().geturl())
self.assertEqual(self.existingFileButlerURI.parent().geturl(),
self.existingFileButlerURI.dirname().geturl())


if __name__ == "__main__":
unittest.main()
2 changes: 1 addition & 1 deletion tests/test_webdavutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class WebdavUtilsTestCase(unittest.TestCase):
"""Test for the Webdav related utilities.
"""
session = requests.Session()
serverRoot = "www.lsstnowebdav.orgx"
serverRoot = "www.lsstwithwebdav.orgx"
wrongRoot = "www.lsstwithoutwebdav.org"
existingfolderName = "testFolder"
notExistingfolderName = "testFolder_not_exist"
Expand Down

0 comments on commit 3815080

Please sign in to comment.