From a255063f212afda55cb9c66c9f990d08737323c6 Mon Sep 17 00:00:00 2001
From: Sebastian Kurscheid <sebastian.kurscheid@anu.edu.au>
Date: Wed, 2 Oct 2019 08:54:45 +0000
Subject: [PATCH] Merged in sebastian_kurscheid/snakemake/azure_integration
 (pull request #378)

Azure integration

* fixes broken conda shell command
* adds methods to access Azure Storage - work in progress!
* adds methods to access Azure Storage
* further programming
* ignores virtualenv directory
* adds exception for Azure file/blob operations
* makes minor changes to private methods
* fixes typo
* changes the conda version from which on "conda activate" is used - see https://docs.conda.io/projects/conda/en/latest/release-notes.html?highlight=release%20notes#id105
* fixes indentation error
* corrects method name
* adds required helper functions for parsing file paths to container/blob names
* amends earlier commit
* adds missing modules
* fixes variable names
* catches up
* still bug hunting
* tries to fix problem with instantiating
* removes argument Azure Storage can't handle
* assigns correct Azure account object to RemoteProvider class
* further fixes
* see previous
* still bug fixing
* removes @lazy_properties
* still working on fixing this mess
* commits remote modifications
* fixes problem with upload
* pre-merge commit
* adds test scenario for testing Azure Storage implementation
* latest update
* adds rudimentary test
* adds more info
* applies patch to enable testing of Azure Storage
* amends doc
* removes clutter; addresses some conflicts with master
* restores accidentally deleted legitimate test
* removes unneeded packages from production environment.yml
* removes commented code block not needed for AzureStorage integration
---
 .gitignore                         |   3 +
 Snakefile                          |  32 +++
 environment.yml                    |   1 +
 snakemake/conda.py                 |   1 -
 snakemake/exceptions.py            |   4 +
 snakemake/remote/AzureStorage.py   | 321 +++++++++++++++++++++++++++++
 snakemake/remote/S3.py             |  18 +-
 tests/test_remote_azure/.gitignore |   2 +
 tests/test_remote_azure/README.md  |   8 +
 tests/test_remote_azure/Snakefile  |  46 +++++
 tests/tests.py                     |  13 +-
 11 files changed, 437 insertions(+), 12 deletions(-)
 create mode 100644 Snakefile
 create mode 100644 snakemake/remote/AzureStorage.py
 create mode 100644 tests/test_remote_azure/.gitignore
 create mode 100644 tests/test_remote_azure/README.md
 create mode 100644 tests/test_remote_azure/Snakefile

diff --git a/.gitignore b/.gitignore
index 1a88a5f63..923121c57 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,9 @@ dist/
 *.egg
 .eggs/
 .snakemake*
+.venv
+.venv/*
+
 .idea
diff --git a/Snakefile b/Snakefile
new file mode 100644
index 000000000..152419f9a
--- /dev/null
+++ b/Snakefile
@@ -0,0 +1,32 @@
+__author__ = "Sebastian Kurscheid (sebastian.kurscheid@anu.edu.au)"
+__license__ = "MIT"
+__date__ = "2018-09-15"
+
+from snakemake.exceptions import MissingInputException
+import os
+
+from snakemake.remote.AzureStorage import RemoteProvider as AzureRemoteProvider
+
+# credentials are read from the environment rather than hard-coded
+account_key = os.environ["AZURE_KEY"]
+account_name = os.environ["AZURE_ACCOUNT"]
+
+AS = AzureRemoteProvider(account_name=account_name, account_key=account_key)
+file_names, = AS.glob_wildcards("experiment/{fn}.gz")
+
+rule glob_test:
+    input:
+        expand("local_data/{fn}.gz", fn=file_names)
+
+rule make_data:
+    input:
+        AS.remote("experiment/aks_test/{fn}.gz")
+    output:
+        "local_data/{fn}.gz"
+    shell:
+        "mv {input[0]} {output[0]}"
+
+rule all:
+    input:
+        AS.remote("e-mtab-6967/E-MTAB-6967.sdrf.txt")
+    run:
+        outputName = os.path.basename(input[0])
+        shell("cp {input} {outputName}")
\ No newline at end of file
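
Note: `glob_wildcards` on a remote provider returns a named tuple with one list per wildcard, so the single `fn` list has to be unpacked before it is handed to `expand`. A minimal sketch of the unpacking (pattern and names as in the Snakefile above, purely illustrative):

    # illustrative: unpacking the single-wildcard result of glob_wildcards
    wildcards = AS.glob_wildcards("experiment/{fn}.gz")   # named tuple of lists
    file_names = wildcards.fn                             # same as: file_names, = AS.glob_wildcards(...)
    targets = expand("local_data/{fn}.gz", fn=file_names)
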
{outputName}") \ No newline at end of file diff --git a/environment.yml b/environment.yml index d51403c70..e01750731 100644 --- a/environment.yml +++ b/environment.yml @@ -20,6 +20,7 @@ dependencies: - pandas - nomkl - google-cloud-storage + - azure - ratelimiter - configargparse - appdirs diff --git a/snakemake/conda.py b/snakemake/conda.py index 80af14755..bcc9cfe14 100644 --- a/snakemake/conda.py +++ b/snakemake/conda.py @@ -280,7 +280,6 @@ def __eq__(self, other): return self.file == other.file return False - class Conda: instances = dict() diff --git a/snakemake/exceptions.py b/snakemake/exceptions.py index 292d7842e..fdee9a945 100644 --- a/snakemake/exceptions.py +++ b/snakemake/exceptions.py @@ -339,6 +339,10 @@ class S3FileException(RuleException): def __init__(self, msg, lineno=None, snakefile=None): super().__init__(msg, lineno=lineno, snakefile=snakefile) +class AzureFileException(RuleException): + def __init__(self, msg, lineno=None, snakefile=None): + super().__init__(msg, lineno=lineno, snakefile=snakefile) + class SFTPFileException(RuleException): def __init__(self, msg, lineno=None, snakefile=None): super().__init__(msg, lineno=lineno, snakefile=snakefile) diff --git a/snakemake/remote/AzureStorage.py b/snakemake/remote/AzureStorage.py new file mode 100644 index 000000000..82b57c695 --- /dev/null +++ b/snakemake/remote/AzureStorage.py @@ -0,0 +1,321 @@ +__author__ = "Sebastian Kurscheid" +__copyright__ = "Copyright 2019, Sebastian Kurscheid" +__email__ = "sebastian.kurscheid@anu.edu.au" +__license__ = "MIT" + +# built-ins +import os +import re +import math +import functools +import concurrent.futures + +# snakemake specific +from snakemake.common import lazy_property + +# module specific +from snakemake.exceptions import WorkflowError, AzureFileException +from snakemake.remote import AbstractRemoteObject, AbstractRemoteProvider + +# service provider support +try: + from azure.storage.common.cloudstorageaccount import CloudStorageAccount as AzureStorageAccount +except ImportError as e: + raise WorkflowError("The Python 3 meta package 'azure' " + "needs to be installed to use Azure Storage remote() file functionality. 
%s" % e.msg) + + +class RemoteProvider(AbstractRemoteProvider): + + supports_default = True + + def __init__(self, *args, stay_on_remote=False, **kwargs): + super(RemoteProvider, self).__init__(*args, stay_on_remote=stay_on_remote, **kwargs) + + self._as = AzureStorageHelper(*args, **kwargs) + + def remote_interface(self): + return self._as + + @property + def default_protocol(self): + """The protocol that is prepended to the path when no protocol is specified.""" + return "ab://" + + @property + def available_protocols(self): + """List of valid protocols for this remote provider.""" + return ["ab://"] + + +class RemoteObject(AbstractRemoteObject): + + def __init__(self, *args, keep_local=False, provider=None, **kwargs): + super(RemoteObject, self).__init__(*args, keep_local=keep_local, provider=provider, **kwargs) + + if provider: + self._as = provider.remote_interface() + else: + self._as = AzureStorageHelper(*args, **kwargs) + + # === Implementations of abstract class members === + def exists(self): + if self._matched_as_path: + return self._as.exists_in_container(self.container_name, self.blob_name) + else: + raise AzureFileException("The file cannot be parsed as an Azure Blob path in form 'container/blob': %s" % self.local_file()) + + def mtime(self): + if self.exists(): + t = self._as.blob_last_modified(self.container_name, self.blob_name) + return t + else: + raise AzureFileException("The file does not seem to exist remotely: %s" % self.local_file()) + + def size(self): + if self.exists(): + return self._as.blob_size(self.container_name, self.blob_name) + else: + return self._iofile.size_local + + def download(self): + if self.exists(): + os.makedirs(os.path.dirname(self.local_file()), exist_ok=True) + self._as.download_from_azure_storage(self.container_name, self.blob_name, destination_path = self.local_file()) + os.sync() + return self.local_file() + return None + + def upload(self): + self._as.upload_to_azure_storage(container_name = self.container_name, blob_name = self.blob_name, file_path = self.local_file()) + + @property + def list(self): + return self._as.list_blobs(self.container_name) + + + # # === Related methods === + @property + def _matched_as_path(self): + return re.search("(?P[^/]*)/(?P.*)", self.local_file()) + + def as_create_stub(self): + if self._matched_as_path: + if not self.exists: + self._as.download_from_azure_storage(self.container_name, self.blob_name, self.file, create_stub_only=True) + else: + raise AzureFileException("The file to be downloaded cannot be parsed as an Azure Storage path in form 'container/blob': %s" % + self.local_file()) + + @property + def container_name(self): + if len(self._matched_as_path.groups()) == 2: + return self._matched_as_path.group("container_name") + return None + + @property + def name(self): + return self.blob_name + + @property + def blob_name(self): + if len(self._matched_as_path.groups()) == 2: + return self._matched_as_path.group("blob_name") +# Actual Azure specific functions, adapted from S3.py + +class AzureStorageHelper(object): + def __init__(self, *args, **kwargs): + if "stay_on_remote" in kwargs: + del kwargs["stay_on_remote"] + + self.azure = AzureStorageAccount(**kwargs).create_block_blob_service() + + def container_exists(self, container_name): + try: + self.azure.exists(container_name=container_name) + return True + except: + return False + + def upload_to_azure_storage( + self, + container_name, + file_path, + blob_name=None, + use_relative_path_for_blob_name=True, + relative_start_dir=None, + 
+
+
+# Actual Azure-specific functions, adapted from S3.py
+class AzureStorageHelper(object):
+    def __init__(self, *args, **kwargs):
+        if "stay_on_remote" in kwargs:
+            del kwargs["stay_on_remote"]
+
+        self.azure = AzureStorageAccount(**kwargs).create_block_blob_service()
+
+    def container_exists(self, container_name):
+        try:
+            return self.azure.exists(container_name=container_name)
+        except Exception:
+            return False
+
+    def upload_to_azure_storage(
+            self,
+            container_name,
+            file_path,
+            blob_name=None,
+            use_relative_path_for_blob_name=True,
+            relative_start_dir=None,
+            extra_args=None):
+        """Upload a file to Azure Storage.
+
+        This function uploads a file to an Azure Storage container as a blob.
+
+        Args:
+            container_name: the name of the Azure container to use
+            file_path: the path of the file to upload
+            blob_name: the name to set for the blob on Azure; if not specified, it is
+                derived from file_path (see use_relative_path_for_blob_name)
+            use_relative_path_for_blob_name: if True (default), the blob name is the path
+                of the file relative to relative_start_dir (or the current working
+                directory); if False, only the file's basename is used
+            relative_start_dir: the directory from which the relative blob name is computed
+
+        Returns:
+            The blob name of the file on Azure if written, None otherwise
+        """
+        file_path = os.path.realpath(os.path.expanduser(file_path))
+
+        assert container_name, "container_name must be specified"
+        assert os.path.exists(file_path), "The file path specified does not exist: %s" % file_path
+        assert os.path.isfile(file_path), "The file path specified does not appear to be a file: %s" % file_path
+
+        if not self.azure.exists(container_name):
+            self.azure.create_container(container_name=container_name)
+        if not blob_name:
+            if use_relative_path_for_blob_name:
+                if relative_start_dir:
+                    path_blob_name = os.path.relpath(file_path, relative_start_dir)
+                else:
+                    path_blob_name = os.path.relpath(file_path)
+            else:
+                path_blob_name = os.path.basename(file_path)
+            blob_name = path_blob_name
+        b = self.azure
+        try:
+            b.create_blob_from_path(container_name,
+                                    blob_name=blob_name,
+                                    file_path=file_path)
+            return b.get_blob_properties(container_name, blob_name=blob_name).name
+        except Exception as e:
+            raise WorkflowError("Error in creating blob. %s" % str(e))
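
When no `blob_name` is passed to `upload_to_azure_storage`, the blob name is derived from the local path; a short sketch of the two naming modes (the file name is made up):

    # illustrative: default blob naming in upload_to_azure_storage
    import os
    file_path = os.path.realpath("data/test.txt.gz")
    name_relative = os.path.relpath(file_path)    # use_relative_path_for_blob_name=True (default)
    name_basename = os.path.basename(file_path)   # use_relative_path_for_blob_name=False
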
+
+    def download_from_azure_storage(
+            self,
+            container_name,
+            blob_name,
+            destination_path=None,
+            expandBlobNameIntoDirs=True,
+            make_dest_dirs=True,
+            create_stub_only=False):
+        """Download a file from Azure Storage.
+
+        This function downloads an object from a specified Azure Storage container.
+
+        Args:
+            container_name: the name of the Azure Storage container to use (container name only)
+            blob_name: the name of the blob to download
+            destination_path: if specified, the file will be saved to this path, otherwise cwd
+            expandBlobNameIntoDirs: since Azure blob names can include slashes, if this is
+                True (default), blob names with slashes are expanded into directories on the
+                receiving end; if False, the blob name is passed to os.path.basename() to
+                get the substring following the last slash
+            make_dest_dirs: if this is True (default) and the destination path includes
+                directories that do not exist, they will be created
+            create_stub_only: if True, only create an empty local file carrying the blob's
+                modification time instead of downloading its content
+
+        Returns:
+            The destination path of the downloaded file on the receiving end, or None if
+            the blob could not be downloaded
+        """
+        assert container_name, "container_name must be specified"
+        assert blob_name, "blob_name must be specified"
+        if destination_path:
+            destination_path = os.path.realpath(os.path.expanduser(destination_path))
+        else:
+            if expandBlobNameIntoDirs:
+                destination_path = os.path.join(os.getcwd(), blob_name)
+            else:
+                destination_path = os.path.join(os.getcwd(), os.path.basename(blob_name))
+        # create missing destination directories if requested
+        if make_dest_dirs:
+            os.makedirs(os.path.dirname(destination_path), exist_ok=True)
+        b = self.azure
+        try:
+            if not create_stub_only:
+                b.get_blob_to_path(container_name=container_name,
+                                   blob_name=blob_name,
+                                   file_path=destination_path)
+            else:
+                # just create an empty file with the right timestamps
+                blob = b.get_blob_properties(container_name, blob_name)
+                last_modified = blob.properties.last_modified.timestamp()
+                with open(destination_path, "wb") as fp:
+                    os.utime(fp.name, (last_modified, last_modified))
+            return destination_path
+        except Exception:
+            return None
+
+    def delete_from_container(self, container_name, blob_name):
+        """Delete a blob from an Azure Storage container.
+
+        This function deletes an object from a specified Azure Storage container.
+
+        Args:
+            container_name: the name of the Azure Storage container to use (container name only, not endpoint)
+            blob_name: the name of the blob to delete from the container
+
+        Returns:
+            nothing
+        """
+        assert container_name, "container_name must be specified"
+        assert blob_name, "blob_name must be specified"
+        b = self.azure
+        b.delete_blob(container_name, blob_name)
+
+    def exists_in_container(self, container_name, blob_name):
+        """Return whether the blob exists in the container.
+
+        Args:
+            container_name: the name of the Azure Storage container (container name only, not endpoint)
+            blob_name: the name of the blob to check
+
+        Returns:
+            True | False
+        """
+        assert container_name, "container_name must be specified"
+        assert blob_name, "blob_name must be specified"
+        try:
+            return self.azure.exists(container_name, blob_name)
+        except Exception:
+            return False
+
+    def blob_size(self, container_name, blob_name):
+        """Return the size of a blob.
+
+        Args:
+            container_name: the name of the Azure Storage container (container name only, not endpoint)
+            blob_name: the name of the blob to query
+
+        Returns:
+            size in kilobytes (content_length // 1024)
+        """
+        assert container_name, "container_name must be specified"
+        assert blob_name, "blob_name must be specified"
+        try:
+            b = self.azure.get_blob_properties(container_name, blob_name)
+            return b.properties.content_length // 1024
+        except Exception:
+            print("blob or container do not exist")
+            return None
+
+    def blob_last_modified(self, container_name, blob_name):
+        """Return the last-modified timestamp of a blob.
+
+        Args:
+            container_name: the name of the Azure Storage container (container name only, not endpoint)
+            blob_name: the name of the blob to query
+
+        Returns:
+            timestamp (seconds since epoch)
+        """
+        assert container_name, "container_name must be specified"
+        assert blob_name, "blob_name must be specified"
+        try:
+            b = self.azure.get_blob_properties(container_name, blob_name)
+            return b.properties.last_modified.timestamp()
+        except Exception:
+            print("blob or container do not exist")
+            return None
+
+    def list_blobs(self, container_name):
+        """Return a list of blob names from the container.
+
+        Args:
+            container_name: the name of the Azure Storage container (container name only, not endpoint)
+
+        Returns:
+            a list of blob names
+        """
+        assert container_name, "container_name must be specified"
+        try:
+            b = self.azure.list_blobs(container_name)
+            return [o.name for o in b]
+        except Exception:
+            print("Did you provide a valid container_name?")
+            return None
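
To see the helper end to end, here is a minimal round-trip sketch; it assumes the "snakemake-test" container and the credential environment variables from the test setup further below, plus a local file data/test.txt.gz:

    # illustrative round trip with AzureStorageHelper
    import os
    from snakemake.remote.AzureStorage import AzureStorageHelper

    helper = AzureStorageHelper(account_name=os.environ["AZURE_ACCOUNT"],
                                account_key=os.environ["AZURE_KEY"])
    blob = helper.upload_to_azure_storage(container_name="snakemake-test",
                                          file_path="data/test.txt.gz")
    print(helper.list_blobs("snakemake-test"))
    helper.download_from_azure_storage("snakemake-test", blob,
                                       destination_path="downloaded.txt.gz")
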
diff --git a/snakemake/remote/S3.py b/snakemake/remote/S3.py
index 67248042e..646b33fa5 100644
--- a/snakemake/remote/S3.py
+++ b/snakemake/remote/S3.py
@@ -23,24 +23,24 @@
     "needs to be installed to use S3 remote() file functionality. %s" % e.msg)
 
 
-class RemoteProvider(AbstractRemoteProvider):
+class RemoteProvider(AbstractRemoteProvider):  # class inherits from AbstractRemoteProvider
 
-    supports_default = True
+    supports_default = True  # class variable
 
-    def __init__(self, *args, stay_on_remote=False, keep_local=False, is_default=False, **kwargs):
-        super(RemoteProvider, self).__init__(*args, stay_on_remote=stay_on_remote, keep_local=keep_local, is_default=is_default, **kwargs)
+    def __init__(self, *args, stay_on_remote=False, keep_local=False, is_default=False, **kwargs):  # this method is evaluated when instantiating this class
+        super(RemoteProvider, self).__init__(*args, stay_on_remote=stay_on_remote, keep_local=keep_local, is_default=is_default, **kwargs)
 
-        self._s3c = S3Helper(*args, **kwargs)
+        self._s3c = S3Helper(*args, **kwargs)  # _private variable by convention
 
     def remote_interface(self):
         return self._s3c
 
-    @property
-    def default_protocol(self):
+    @property  # decorator, so that this function can be accessed as an attribute instead of a method
+    def default_protocol(self):
         """The protocol that is prepended to the path when no protocol is specified."""
         return 's3://'
 
-    @property
+    @property  # decorator, so that this function can be accessed as an attribute instead of a method
     def available_protocols(self):
         """List of valid protocols for this remote provider."""
         return ['s3://']
@@ -230,8 +230,6 @@ def download_from_s3(
         assert bucket_name, "bucket_name must be specified"
         assert key, "Key must be specified"
 
-        b = self.s3.Bucket(bucket_name)
-
         if destination_path:
             destination_path = os.path.realpath(os.path.expanduser(destination_path))
         else:
diff --git a/tests/test_remote_azure/.gitignore b/tests/test_remote_azure/.gitignore
new file mode 100644
index 000000000..b59f82339
--- /dev/null
+++ b/tests/test_remote_azure/.gitignore
@@ -0,0 +1,2 @@
+test.txt.gz
+globbing.done
diff --git a/tests/test_remote_azure/README.md b/tests/test_remote_azure/README.md
new file mode 100644
index 000000000..0f4aa60f7
--- /dev/null
+++ b/tests/test_remote_azure/README.md
@@ -0,0 +1,8 @@
+# Instructions for testing the Azure Storage integration
+* In order to perform this test, an Azure Storage account is required.
+* Both the storage account name and the associated key need to be passed to snakemake at runtime.
+* Currently this is solved by setting and exporting the environment variables:
+  * `$AZURE_ACCOUNT`
+  * `$AZURE_KEY`
+* Furthermore, a container named "snakemake-test" needs to be created in the storage account prior to running the test.
+
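
Assuming both environment variables are exported and the container exists, the test workflow can also be driven through the snakemake API rather than the CLI (a sketch; paths are relative to the repository root):

    # illustrative: running the Azure test Snakefile programmatically
    import os
    from snakemake import snakemake

    assert "AZURE_ACCOUNT" in os.environ and "AZURE_KEY" in os.environ
    snakemake("tests/test_remote_azure/Snakefile",
              targets=["test_download"],
              workdir="tests/test_remote_azure")
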
diff --git a/tests/test_remote_azure/Snakefile b/tests/test_remote_azure/Snakefile
new file mode 100644
index 000000000..2da45e6aa
--- /dev/null
+++ b/tests/test_remote_azure/Snakefile
@@ -0,0 +1,46 @@
+import os
+import fnmatch
+import snakemake
+from snakemake.exceptions import MissingInputException
+from snakemake.remote.AzureStorage import RemoteProvider as AzureRemoteProvider
+
+# set up Azure Storage for remote access
+# for testing, these variables can be added to CircleCI
+account_key = os.environ['AZURE_KEY']
+account_name = os.environ['AZURE_ACCOUNT']
+AS = AzureRemoteProvider(account_name=account_name, account_key=account_key)
+
+
+rule upload_to_azure_storage:
+    input:
+        "data/test.txt.gz"
+    output:
+        AS.remote("snakemake-test/data_upload/test.txt.gz")
+    run:
+        shell("cp {input} {output}")
+
+rule download_from_azure_storage:
+    input:
+        AS.remote("snakemake-test/data_upload/test.txt.gz")
+    output:
+        "test.txt.gz"
+    run:
+        shell("cp {input} {output}")
+
+rule test_globbing:
+    input:
+        AS.remote("snakemake-test/data_upload/test.txt.gz")
+    output:
+        touch("globbing.done")
+    run:
+        basenames, = AS.glob_wildcards("snakemake-test/data_upload/{base}.txt.gz")
+        assert "test" in basenames
+
+rule test_download:
+    input:
+        "globbing.done",
+        "test.txt.gz"
+
+rule test_upload:
+    input:
+        AS.remote("snakemake-test/data_upload/test.txt.gz")
diff --git a/tests/tests.py b/tests/tests.py
index 90d89b948..3789809e4 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -590,10 +590,21 @@ def test_run_namedlist():
 @connected
 @not_ci
 def test_remote_gs():
-    run(dpath("test_remote_gs"))
+    if "CI" not in os.environ:
+        run(dpath("test_remote_gs"))
+    else:
+        print("skipping test_remote_gs in CI")
 
 
+@connected
+def test_remote_azure():
+    if "CI" not in os.environ:
+        run(dpath("test_remote_azure"))
+    else:
+        print("skipping test_remote_azure in CI")
+
+
 @connected
 def test_remote_log():
     run(dpath("test_remote_log"), shouldfail=True)
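
Taken together, a minimal workflow using the new provider would look roughly like this (container and file names are illustrative; credentials come from the environment as in the tests):

    # illustrative Snakefile fragment for the new Azure remote provider
    import os
    from snakemake.remote.AzureStorage import RemoteProvider as AzureRemoteProvider

    AS = AzureRemoteProvider(account_name=os.environ["AZURE_ACCOUNT"],
                             account_key=os.environ["AZURE_KEY"])

    rule fetch:
        input:
            # keep_local=True retains the downloaded copy after the job finishes
            AS.remote("snakemake-test/data_upload/test.txt.gz", keep_local=True)
        output:
            "test.txt.gz"
        shell:
            "cp {input} {output}"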