From 0f9c49fe8643cca0e42e3b091cf9706a7feb877d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20K=C3=B6ster?=
Date: Sun, 11 Jun 2023 20:15:30 +0200
Subject: [PATCH] feat: Azbatch executor (#1953) (#2246)

### Description

Finish remaining issues with the original implementation of the Azure Batch Executor originally tackled by @andreas-wilm in #533.

- simplified command line options
- batch configuration as a class with sensible defaults, or environment variable overrides.
- made use of existing AzBlob helper where possible.
- updates dockerfile with necessary deps
- container jakevc/snakemake:latest contains these changes for testing

Toyed with the idea of having more configuration from command line options, but this litters the codebase with redundant code (each command line option needs to be repeated a few times in __init__.py, scheduler.py, workflow.py, azure_batch.py). Env Variables seem appropriate with sensible defaults.

### QC

* [x] The PR contains a test case for the changes or the changes are already covered by an existing test case.
* [x] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake).

### Example

Example invocation:

```bash
export AZ_BLOB_ACCOUNT_URL=*********
export AZ_BATCH_ACCOUNT_KEY=*********

snakemake \
    --jobs 3 \
    -rpf --verbose --default-remote-prefix snaketest \
    --use-conda \
    --default-remote-provider AzBlob \
    --envvars AZ_BLOB_ACCOUNT_URL \
    --az-batch \
    --container-image jakevc/snakemake \
    --az-batch-account-url **********
```
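
The batch pool defaults mentioned above can also be overridden with environment variables before invoking Snakemake, for example (illustrative values only, not a recommendation):

```bash
# optional overrides of the batch pool defaults (illustrative values)
export BATCH_POOL_VM_SIZE=Standard_D4_v3
export BATCH_POOL_NODE_COUNT=2
export BATCH_TASKS_PER_NODE=2
```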
--------- Signed-off-by: dependabot[bot] Co-authored-by: Jake VanCampen Co-authored-by: Emmanouil "Manolis" Maragkakis Co-authored-by: Jesse Connell Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: roshnipatel Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Mike Taves Co-authored-by: Rich Abdill Co-authored-by: Koen van Greevenbroek <74298901+koen-vg@users.noreply.github.com> Co-authored-by: Chang Y --- .github/workflows/main.yml | 12 + .github/workflows/scripts/sacct-proxy.py | 2 +- Dockerfile | 2 +- docs/executor_tutorial/azure_batch.rst | 380 +++++++ docs/executor_tutorial/tutorial.rst | 3 +- setup.cfg | 8 + snakemake/__init__.py | 76 +- snakemake/executors/__init__.py | 3 +- snakemake/executors/azure_batch.py | 924 ++++++++++++++++++ snakemake/executors/common.py | 9 + snakemake/jobs.py | 9 +- snakemake/remote/AzBlob.py | 28 +- snakemake/scheduler.py | 33 + snakemake/workflow.py | 6 + test-environment.yml | 6 +- tests/test_azure_batch/Snakefile | 11 + .../expected-results/.gitignore | 0 tests/test_azure_batch_executor.py | 25 + versioneer.py | 307 +++--- 19 files changed, 1709 insertions(+), 135 deletions(-) create mode 100644 docs/executor_tutorial/azure_batch.rst create mode 100644 snakemake/executors/azure_batch.py create mode 100644 tests/test_azure_batch/Snakefile create mode 100644 tests/test_azure_batch/expected-results/.gitignore create mode 100644 tests/test_azure_batch_executor.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f43603930..6a1de2ab9 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -244,6 +244,18 @@ jobs: # shell: bash -el {0} # run: | # pytest -s -v -x tests/test_kubernetes.py + + - name: Test Azure Batch Executor + shell: bash -el {0} + env: + AZ_BLOB_PREFIX: "${{ secrets.AZ_BLOB_PREFIX }}" + AZ_BLOB_ACCOUNT_URL: "${{ secrets.AZ_STORAGE_ACCOUNT_URL }}" + AZ_BLOB_CREDENTIAL: "${{ secrets.AZ_STORAGE_KEY }}" + AZ_BATCH_ACCOUNT_URL: "${{ secrets.AZ_BATCH_ACCOUNT_URL }}" + AZ_BATCH_ACCOUNT_KEY: "${{ secrets.AZ_BATCH_KEY }}" + run: | + pytest -s -v -x tests/test_azure_batch_executor.py + - name: Test GA4GH TES executor shell: bash -el {0} run: | diff --git a/.github/workflows/scripts/sacct-proxy.py b/.github/workflows/scripts/sacct-proxy.py index 8838b51d4..1ba750b8d 100755 --- a/.github/workflows/scripts/sacct-proxy.py +++ b/.github/workflows/scripts/sacct-proxy.py @@ -22,4 +22,4 @@ elif args.name: sp.call(["squeue", "--noheader", "--format", "%F|%T", "--name", args.name]) else: - raise ValueError("Unsupported arguments") \ No newline at end of file + raise ValueError("Unsupported arguments") diff --git a/Dockerfile b/Dockerfile index 015d0c661..e57b4482d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,6 +15,6 @@ RUN /bin/bash -c "mamba create -q -y -c conda-forge -c bioconda -n snakemake sna mamba install -q -y -c conda-forge singularity && \ conda clean --all -y && \ which python && \ - pip install .[reports,messaging,google-cloud]" + pip install .[reports,messaging,google-cloud,azure]" RUN echo "source activate snakemake" > ~/.bashrc ENV PATH /opt/conda/envs/snakemake/bin:${PATH} diff --git a/docs/executor_tutorial/azure_batch.rst b/docs/executor_tutorial/azure_batch.rst new file mode 100644 index 000000000..f02c847d6 --- /dev/null +++ b/docs/executor_tutorial/azure_batch.rst @@ -0,0 +1,380 @@ +.. 
_tutorial-azure-batch:
+
+Azure Batch Tutorial
+---------------------------------------------------------------
+
+.. _Snakemake: http://snakemake.readthedocs.io
+.. _Python: https://www.python.org/
+.. _AZCLI: https://docs.microsoft.com/en-us/cli/azure/install-azure-cli?view=azure-cli-latest
+
+In this tutorial we will show how to execute a Snakemake workflow
+on Azure Batch nodes without a shared file system. One could use attached storage
+solutions as a shared file system, but this adds an unnecessary level of complexity
+and, most importantly, cost. Instead we use cheap Azure Blob storage,
+which is used by Snakemake to automatically stage data in and out for
+every job. Please visit the `Azure Batch Documentation
+`__
+for an overview of the various components of Azure Batch.
+
+Following the steps below you will
+
+#. Set up Azure Blob storage and sync the Snakemake tutorial data to a storage container.
+#. Create an Azure Batch account.
+#. Run the example Snakemake workflow on the Batch account.
+
+
+Setup
+:::::
+
+To go through this tutorial, you need the following software installed:
+
+* Python_ ≥3.6
+* Snakemake_ ≥7.25.4
+* AZCLI_
+
+
+First install conda as outlined in the :ref:`tutorial `,
+and then install the full Snakemake with some additional Azure related optional dependencies:
+
+.. code:: console
+
+    conda create -c bioconda -c conda-forge -n snakemake snakemake azure-batch azure-storage-blob azure-mgmt-batch azure-identity
+
+Naturally, you can omit the deployment of such an environment in case you already have it, or you can update an existing Snakemake environment with the additional dependencies.
+
+Create an Azure storage account and upload example data
+::::::::::::::::::::::::::::::::::::::::::::::::::::::::
+
+We will be starting from scratch, i.e. we will
+create a new resource group and storage account. You can obviously reuse
+existing resources instead.
+
+.. code:: console
+
+    # change the following names as required
+    # azure region where to run:
+    export region=westus
+
+    # name of the resource group to create:
+    export resgroup=snakemaks-rg
+
+    # name of storage account to create (all lowercase, no hyphens etc.):
+    export stgacct=snakemaksstg
+
+    # create a resource group with name and in region as defined above
+    az group create --name $resgroup --location $region
+
+    # create a general purpose storage account with cheapest SKU
+    az storage account create -n $stgacct -g $resgroup --sku Standard_LRS -l $region
+
+Get a key for that account and save it as ``stgkey`` for later use:
+
+.. code:: console
+
+    export stgkey=$(az storage account keys list -g $resgroup -n $stgacct --query "[0].value" -o tsv)
+
+Next, you will create a storage container (think: bucket) to upload the Snakemake tutorial data to:
+
+.. code:: console
+
+    az storage container create --resource-group $resgroup --account-name $stgacct \
+        --account-key $stgkey --name snakemake-tutorial
+
+    cd /tmp
+
+    git clone https://github.com/snakemake/snakemake-tutorial-data.git
+
+    cd snakemake-tutorial-data
+
+    az storage blob upload-batch -d snakemake-tutorial --account-name $stgacct \
+        --account-key $stgkey -s data/ --destination-path data
+
+Here we are using ``az storage blob`` for uploading the tutorial data, because the AZCLI_ is already installed.
+Another CLI tool for uploading to Azure storage is
+`azcopy `__.
+
+Azure Blob Storage Warning
+::::::::::::::::::::::::::
+
+The Snakemake azbatch executor will not work with data in a storage account that has "hierarchical namespace" enabled.
+Azure hierarchical namespace is a newer API for Azure storage that is also called "ADLS Gen2".
+Snakemake does not currently support this storage format because its Python API is distinct from that of traditional blob storage.
+For more details see: https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-namespace.
+
+
+Create an Azure Batch Account
+:::::::::::::::::::::::::::::
+
+Create a new Azure Batch account and capture the Batch account URL and account key as environment variables. The account key will be given to Snakemake so that it can create Batch resources (pool, job, tasks) on your behalf.
+
+.. code:: console
+
+    # set the account name as appropriate
+    export accountname=snakebatch01
+    az batch account create --resource-group $resgroup --name $accountname --location $region
+
+
+The format of the batch account url is :code:`https://${accountname}.${region}.batch.azure.com`, which can be constructed from the output of the command :code:`az batch account list` or copied from the Azure portal overview page of your Batch account.
+
+.. code:: console
+
+    # get batch account url from command line
+    export batch_endpoint=$(az batch account show --name $accountname --resource-group $resgroup --query "accountEndpoint" --output tsv)
+    export batch_account_url="https://${batch_endpoint}"
+
+Fetch a Batch account key:
+
+.. code:: console
+
+    az_batch_account_key=$(az batch account keys list --resource-group $resgroup --name $accountname -o tsv | head -n1 | cut -f2)
+
+
+To run the test workflow, two primary environment variables need to be set locally where Snakemake is invoked: the Azure Batch account key and the Azure storage account URL with an SAS token. More details about ``AZ_BLOB_ACCOUNT_URL``
+are described in the section below.
+
+.. code:: console
+
+    export AZ_BLOB_ACCOUNT_URL="${storage_account_url_with_sas}"
+    export AZ_BATCH_ACCOUNT_KEY="${az_batch_account_key}"
+
+
+Running the workflow
+::::::::::::::::::::
+
+Below we will run an example Snakemake workflow, using conda to install software on the fly.
+Clone the example workflow and cd into the directory:
+
+.. code:: console
+
+    $ git clone https://github.com/jakevc/snakemake-azbatch-example.git
+    $ cd snakemake-azbatch-example
+    $ tree
+    .
+    ├── README.md
+    ├── Snakefile
+    ├── envs
+    │   ├── calling.yaml
+    │   ├── mapping.yaml
+    │   └── stats.yaml
+    ├── run.sh
+    └── src
+        └── plot-quals.py
+
+Now we will need to set up the credentials that allow the batch nodes to
+read and write from blob storage. For the AzBlob storage provider in
+Snakemake this is done through the environment variables
+``AZ_BLOB_ACCOUNT_URL`` and optionally ``AZ_BLOB_CREDENTIAL``. See the
+`documentation `__ for more info.
+``AZ_BLOB_ACCOUNT_URL`` takes the form ``https://<storage-account-name>.blob.core.windows.net/``,
+or may also contain a storage account shared access signature (SAS) token, with the form ``https://<storage-account-name>.blob.core.windows.net/?<sas_token>``,
+which is a powerful way to define fine-grained and even time-controlled access to storage blobs on Azure.
+If the SAS token is not specified as part of the ``AZ_BLOB_ACCOUNT_URL``, it must be specified using ``AZ_BLOB_CREDENTIAL``.
+``AZ_BLOB_CREDENTIAL`` must be a storage account SAS token, and it usually needs to be enclosed in quotes when set from the
+command line, as it contains special characters that need to be escaped.
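+
+As one example, a storage account SAS token can be generated with the Azure CLI and appended to the
+blob account URL (a sketch only; the permissions and expiry below are placeholder values, adjust them
+to your needs):
+
+.. code:: console
+
+    # generate a storage account SAS token (example permissions and expiry)
+    export sas=$(az storage account generate-sas --account-name $stgacct --account-key $stgkey \
+        --services b --resource-types sco --permissions rwdlac --expiry 2024-01-01 -o tsv)
+
+    # blob account URL with the SAS token appended
+    export AZ_BLOB_ACCOUNT_URL="https://${stgacct}.blob.core.windows.net/?${sas}"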
+ +When using azure storage and snakemake without the azure batch executor, it is valid to use storage account key credentials for ``AZ_BLOB_CREDENTIAL``, +but this type of authentication is not supported with Azure batch so we must use a storage account SAS token credential when using the azure batch executor. + +The blob account url combined with SAS token is generally the simplest solution because it results in only needing to specify the ``AZ_BLOB_ACCOUNT_URL``. We’ll pass the ``AZ_BLOB_ACCOUNT_URL`` on to the batch nodes +with ``--envvars`` (see below). If using both AZ_BLOB_ACCOUNT_URL, and AZ_BLOB_CREDENTIAL, you will pass both variables to the --envvars command line argument. + +The following optional environment variables can be set to override their associated default values, +and are used to change the runtime configuration of the batch nodes themselves: + + +.. list-table:: Optional Batch Node Configuration Environment Variables + :widths: 40 40 40 + :header-rows: 1 + + * - Environment Variable + - Default Value + - Description + * - BATCH_POOL_IMAGE_PUBLISHER + - microsoft-azure-batch + - publisher of the vm image for the batch nodes + * - BATCH_POOL_IMAGE_OFFER + - ubuntu-server-container + - vm image offer for the batch nodes + * - BATCH_POOL_IMAGE_SKU + - 20-04-lts + - vm image sku for batch nodes + * - BATCH_POOL_VM_CONTAINER_IMAGE + - ubuntu + - batch nodes vm container image + * - BATCH_POOL_VM_NODE_AGENT_SKU_ID + - batch.node.ubuntu 20.04 + - sku id for batch node vm images + * - BATCH_POOL_VM_SIZE + - Standard_D2_v3 + - batch node vm image size + * - BATCH_POOL_SUBNET_ID + - None + - subnetwork to deploy batch nodes into, requires the configuration of BATCH_MANAGED_IDENTITY + * - BATCH_POOL_NODE_COUNT + - 1 + - batch pool node count + * - BATCH_POOL_RESOURCE_FILE_PREFIX + - resource-files + - container prefix for temporary resource files tar ball (Snakefile, envs) + * - BATCH_NODE_START_TASK_SAS_URL + - None + - specify an SAS url to a bash script start task to run on each batch node + * - BATCH_NODE_FILL_TYPE + - spread + - possible values ("spread", or "pack") + * - BATCH_NODE_COMMUNICATION_SIMPLIFIED + - None, "classic" + - If set, configures the batch pool to use the 'simplified' node communication mode. + * - BATCH_TASKS_PER_NODE + - 1 + - the number of tasks allowed per batch node + * - BATCH_MANAGED_IDENTITY_RESOURCE_ID + - None + - The resource ID of the managed identity to use + * - BATCH_MANAGED_IDENTITY_CLIENT_ID + - None + - The client ID of the managed identity to use + * - BATCH_CONTAINER_REGISTRY_URL + - None + - Container registry url to configure on the batch nodes + * - BATCH_CONTAINER_REGISTRY_USER + - None + - Container registry user, overrides managed identity authentication if set with password. + * - BATCH_CONTAINER_REGISTRY_PASS + - None + - Container registry password + + + +Now you are ready to run the analysis: + +.. 
code:: console + + # required env variables + export AZ_BLOB_PREFIX=snakemake-tutorial + export AZ_BATCH_ACCOUNT_URL="${batch_account_url}" + export AZ_BATCH_ACCOUNT_KEY="${az_batch_account_key}" + export AZ_BLOB_ACCOUNT_URL="${account_url_with_sas}" + + # optional environment variables with defaults listed + + # network and identity + # export BATCH_POOL_SUBNET_ID= + # export BATCH_MANAGED_IDENTITY_RESOURCE_ID= + # export BATCH_MANAGED_IDENTITY_CLIENT_ID= + + # if unset, default is "classic" + # export BATCH_NODE_COMMUNICATION_SIMPLIFIED=true + + # don't recommend changing + # export BATCH_POOL_IMAGE_PUBLISHER=microsoft-azure-batch + # export BATCH_POOL_IMAGE_OFFER=ubuntu-server-container + # export BATCH_POOL_IMAGE_SKU=20-04-lts + # export BATCH_POOL_RESOURCE_FILE_PREFIX=resource-files + + # export BATCH_POOL_VM_CONTAINER_IMAGE=ubuntu + # export BATCH_POOL_VM_NODE_AGENT_SKU_ID="batch.node.ubuntu 20.04" + + # can be used to add a startup task to the batch nodes formatted as an sas url to a bash script + # export BATCH_NODE_START_TASK_SAS_URL= + + # can be useful to alter task distribution across nodes + + # export BATCH_POOL_VM_SIZE=Standard_D2_v3 + # export BATCH_NODE_FILL_TYPE=spread + # export BATCH_POOL_NODE_COUNT=1 + # export BATCH_TASKS_PER_NODE=1 + + # container registry configuration to pull container image from custom registry + # export BATCH_CONTAINER_REGISTRY_URL= + # export BATCH_CONTAINER_REGISTRY_USER= + # export BATCH_CONTAINER_REGISTRY_PASS= + + snakemake \ + --jobs 3 \ + -rpf --verbose --default-remote-prefix $AZ_BLOB_PREFIX \ + --use-conda \ + --default-remote-provider AzBlob \ + --envvars AZ_BLOB_ACCOUNT_URL \ + --az-batch \ + --container-image snakemake/snakemake \ + --az-batch-account-url $AZ_BATCH_ACCOUNT_URL + +This will use the default Snakemake image from Dockerhub. If you would like to use your +own, make sure that the image contains the same Snakemake version as installed locally +and also supports Azure Blob storage. The optional BATCH_CONTAINER_REGISTRY can be configured +to fetch from your own container registry. If that registry is an azure container registry +that the managed identity has access to, then the BATCH_CONTAINER_REGISTRY_USER and BATCH_CONTAINER_REGISTRY_PASS is not needed. + +After completion all results including +logs can be found in the blob container prefix specified by `--default-remote-prefix`. 
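+
+To copy results back to your local machine you can, for example, download everything under the
+``results/`` prefix with the storage account key from the setup step (a sketch; adjust the
+destination directory as needed):
+
+.. code:: console
+
+    az storage blob download-batch -d . --pattern "results/*" -s snakemake-tutorial \
+        --account-name $stgacct --account-key $stgkey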
+ +:: + + $ az storage blob list --container-name snakemake-tutorial --account-name $stgacct --account-key $stgkey -o table + Name IsDirectory Blob Type Blob Tier Length Content Type Last Modified Snapshot + ---------------------------------------------------------------------------------------------- ------------- ----------- ----------- -------- ------------------------ ------------------------- ---------- + data/genome.fa BlockBlob Hot 234112 application/octet-stream 2022-12-14T23:28:00+00:00 + data/genome.fa.amb BlockBlob Hot 2598 application/octet-stream 2022-12-14T23:28:01+00:00 + data/genome.fa.ann BlockBlob Hot 83 application/octet-stream 2022-12-14T23:28:01+00:00 + data/genome.fa.bwt BlockBlob Hot 230320 application/octet-stream 2022-12-14T23:28:01+00:00 + data/genome.fa.fai BlockBlob Hot 18 application/octet-stream 2022-12-14T23:28:01+00:00 + data/genome.fa.pac BlockBlob Hot 57556 application/octet-stream 2022-12-14T23:28:00+00:00 + data/genome.fa.sa BlockBlob Hot 115160 application/octet-stream 2022-12-14T23:28:01+00:00 + data/samples/A.fastq BlockBlob Hot 5752788 application/octet-stream 2022-12-14T23:28:04+00:00 + data/samples/B.fastq BlockBlob Hot 5775000 application/octet-stream 2022-12-14T23:28:06+00:00 + data/samples/C.fastq BlockBlob Hot 5775000 application/octet-stream 2022-12-14T23:28:02+00:00 + logs/mapped_reads/A.log BlockBlob Hot application/octet-stream 2022-12-28T18:14:33+00:00 + logs/mapped_reads/B.log BlockBlob Hot application/octet-stream 2022-12-28T18:15:25+00:00 + logs/mapped_reads/C.log BlockBlob Hot application/octet-stream 2022-12-28T18:16:17+00:00 + results/calls/all.vcf BlockBlob Hot 90962 application/octet-stream 2022-12-28T18:22:20+00:00 + results/mapped_reads/A.bam BlockBlob Hot 2258050 application/octet-stream 2022-12-28T18:14:33+00:00 + results/mapped_reads/B.bam BlockBlob Hot 2262766 application/octet-stream 2022-12-28T18:15:25+00:00 + results/mapped_reads/C.bam BlockBlob Hot 2262766 application/octet-stream 2022-12-28T18:16:17+00:00 + results/plots/quals.svg BlockBlob Hot 12571 application/octet-stream 2022-12-28T19:16:28+00:00 + results/sorted_reads/A.bam BlockBlob Hot 2244652 application/octet-stream 2022-12-28T18:17:10+00:00 + results/sorted_reads/A.bam.bai BlockBlob Hot 344 application/octet-stream 2022-12-28T18:19:48+00:00 + results/sorted_reads/B.bam BlockBlob Hot 2248758 application/octet-stream 2022-12-28T18:18:08+00:00 + results/sorted_reads/B.bam.bai BlockBlob Hot 344 application/octet-stream 2022-12-28T18:20:36+00:00 + results/sorted_reads/C.bam BlockBlob Hot 2248758 application/octet-stream 2022-12-28T18:18:58+00:00 + results/sorted_reads/C.bam.bai BlockBlob Hot 344 application/octet-stream 2022-12-28T18:21:23+00:00 + +Once the execution is complete, the Batch nodes will scale down +automatically. If you are not planning to run anything else, it makes +sense to shut down it down entirely: + +:: + + az batch account delete --name $accountname --resource-group $resgroup + + +Defining a Start Task +::::: +A start task can be optionally specified as a shell scirpt that runs during each node's startup as it's added to the batch pool. +To specify a start task, set the environment variable BATCH_NODE_START_TASK_SAS_URL to the SAS url of a start task shell script. +Store your shell script in a blob storage account and generate an SAS url to a shell script blob object. 
+You can generate an SAS URL to the blob using the azure portal or the command line using the following command structure: + +:: + + container="container-name" + expiry="2024-01-01" + blob_name="starttask.sh" + SAS_TOKEN=$(az storage blob generate-sas --account-name $stgacct --container-name $container --name $blob_name --permissions r --auth-mode login --as-user --expiry $expiry -o tsv) + BLOB_URL=$(az storage blob url --account-name cromwellstorage --container-name snaketest --name starttask.sh --auth-mode login -o tsv) + + # then export the full SAS URL + export BATCH_NODE_START_TASK_SAS_URL="${BLOB_URL}?${SAS_TOKEN}" + + +Autoscaling and Task Distribution +::::: + +The azure batch executor supports autoscaling of the batch nodes by including the flag --az-batch-enable-autoscale. +This flag sets the initial dedicated node count of the pool to zero, and re-evaluates the number of nodes to be spun up or down based on the number of remaining tasks to run over a five minute interval. +Since five minutes is the smallest allowed interval for azure batch autoscaling, this feature becomes more useful for long running jobs. For more information on azure batch autoscaling configuration, see: https://learn.microsoft.com/en-us/azure/batch/batch-automatic-scaling. + +For shorter running jobs it might be more cost/time effective to set VM size with more cores `BATCH_POOL_VM_SIZE` and increase the number of `BATCH_TASKS_PER_NODE`. Or, if you want to keep tasks running on separate nodes, you can set a larger number for `BATCH_POOL_NODE_COUNT`. +It may require experimentation to find the most efficient/cost effective task distribution model for your use case depending on what you are optimizing for. For more details on limitations of azure batch node / task distribution see: https://learn.microsoft.com/en-us/azure/batch/batch-parallel-node-tasks. diff --git a/docs/executor_tutorial/tutorial.rst b/docs/executor_tutorial/tutorial.rst index f1b1006be..a7e20a951 100644 --- a/docs/executor_tutorial/tutorial.rst +++ b/docs/executor_tutorial/tutorial.rst @@ -23,8 +23,9 @@ We ensured that no bioinformatics knowledge is needed to understand the tutorial .. toctree:: :maxdepth: 2 - google_lifesciences + azure_batch azure_aks + google_lifesciences flux diff --git a/setup.cfg b/setup.cfg index 9340b3e1d..7d010c1a9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -57,6 +57,7 @@ install_requires = toposort >=1.10 wrapt yte >=1.0,<2.0 + msrest [options.extras_require] google-cloud = @@ -64,6 +65,13 @@ google-cloud = google-cloud-storage google-crc32c oauth2client +azure = + azure-storage-blob + azure-batch + azure-core + azure-identity + azure-mgmt-batch + messaging = slacker pep = eido diff --git a/snakemake/__init__.py b/snakemake/__init__.py index 8717a1f7b..c8b244250 100644 --- a/snakemake/__init__.py +++ b/snakemake/__init__.py @@ -15,7 +15,9 @@ import importlib import shlex from importlib.machinery import SourceFileLoader +from snakemake.executors.common import url_can_parse from snakemake.target_jobs import parse_target_jobs_cli_args +from snakemake.executors.common import url_can_parse from snakemake.workflow import Workflow from snakemake.dag import Batch @@ -173,6 +175,9 @@ def snakemake( flux=False, tibanna=False, tibanna_sfn=None, + az_batch=False, + az_batch_enable_autoscale=False, + az_batch_account_url=None, google_lifesciences=False, google_lifesciences_regions=None, google_lifesciences_location=None, @@ -313,6 +318,9 @@ def snakemake( default_remote_prefix (str): prefix for default remote provider (e.g. 
name of the bucket). tibanna (bool): submit jobs to AWS cloud using Tibanna. tibanna_sfn (str): Step function (Unicorn) name of Tibanna (e.g. tibanna_unicorn_monty). This must be deployed first using tibanna cli. + az_batch (bool): Submit jobs to azure batch. + az_batch_enable_autoscale (bool): Enable autoscaling of the azure batch pool nodes. This sets the initial dedicated node pool count to zero and resizes the pool only after 5 minutes. So this flag is only recommended for relatively long running jobs., + az_batch_account_url (str): Azure batch account url. google_lifesciences (bool): submit jobs to Google Cloud Life Sciences (pipelines API). google_lifesciences_regions (list): a list of regions (e.g., us-east1) google_lifesciences_location (str): Life Sciences API location (e.g., us-central1) @@ -414,6 +422,11 @@ def snakemake( tibanna_config_dict.update({k: v}) tibanna_config = tibanna_config_dict + # Azure batch uses compute engine and storage + if az_batch: + assume_shared_fs = False + default_remote_provider = "AzBlob" + # Google Cloud Life Sciences API uses compute engine and storage if google_lifesciences: assume_shared_fs = False @@ -452,6 +465,7 @@ def snakemake( or drmaa or kubernetes or tibanna + or az_batch or google_lifesciences or tes or slurm @@ -569,7 +583,7 @@ def snakemake( raise WorkflowError("Unknown default remote provider.") if rmt.RemoteProvider.supports_default: _default_remote_provider = rmt.RemoteProvider( - keep_local=True, is_default=True + keep_local=keep_remote_local, is_default=True ) else: raise WorkflowError( @@ -727,6 +741,9 @@ def snakemake( default_remote_prefix=default_remote_prefix, tibanna=tibanna, tibanna_sfn=tibanna_sfn, + az_batch=az_batch, + az_batch_enable_autoscale=az_batch_enable_autoscale, + az_batch_account_url=az_batch_account_url, google_lifesciences=google_lifesciences, google_lifesciences_regions=google_lifesciences_regions, google_lifesciences_location=google_lifesciences_location, @@ -784,6 +801,9 @@ def snakemake( k8s_cpu_scalar=k8s_cpu_scalar, tibanna=tibanna, tibanna_sfn=tibanna_sfn, + az_batch=az_batch, + az_batch_enable_autoscale=az_batch_enable_autoscale, + az_batch_account_url=az_batch_account_url, google_lifesciences=google_lifesciences, google_lifesciences_regions=google_lifesciences_regions, google_lifesciences_location=google_lifesciences_location, @@ -2362,9 +2382,10 @@ def get_argument_parser(profile=None): group_cloud = parser.add_argument_group("CLOUD") group_flux = parser.add_argument_group("FLUX") group_kubernetes = parser.add_argument_group("KUBERNETES") - group_tibanna = parser.add_argument_group("TIBANNA") group_google_life_science = parser.add_argument_group("GOOGLE_LIFE_SCIENCE") + group_kubernetes = parser.add_argument_group("KUBERNETES") group_tes = parser.add_argument_group("TES") + group_tibanna = parser.add_argument_group("TIBANNA") group_kubernetes.add_argument( "--kubernetes", @@ -2474,6 +2495,26 @@ def get_argument_parser(profile=None): "are deleted at the shutdown step of the workflow.", ) + group_azure_batch = parser.add_argument_group("AZURE_BATCH") + + group_azure_batch.add_argument( + "--az-batch", + action="store_true", + help="Execute workflow on azure batch", + ) + + group_azure_batch.add_argument( + "--az-batch-enable-autoscale", + action="store_true", + help="Enable autoscaling of the azure batch pool nodes, this option will set the initial dedicated node count to zero, and requires five minutes to resize the cluster, so is only recommended for longer running jobs.", + ) + + 
group_azure_batch.add_argument( + "--az-batch-account-url", + nargs="?", + help="Azure batch account url, requires AZ_BATCH_ACCOUNT_KEY environment variable to be set.", + ) + group_flux.add_argument( "--flux", action="store_true", @@ -2701,6 +2742,7 @@ def adjust_path(f): or args.tibanna or args.kubernetes or args.tes + or args.az_batch or args.google_lifesciences or args.drmaa or args.flux @@ -2820,6 +2862,33 @@ def adjust_path(f): ) sys.exit(1) + if args.az_batch: + if not args.default_remote_provider or not args.default_remote_prefix: + print( + "Error: --az-batch must be combined with " + "--default-remote-provider AzBlob and --default-remote-prefix to " + "provide a blob container name\n", + file=sys.stderr, + ) + sys.exit(1) + elif args.az_batch_account_url is None: + print( + "Error: --az-batch-account-url must be set when --az-batch is used\n", + file=sys.stderr, + ) + sys.exit(1) + elif not url_can_parse(args.az_batch_account_url): + print( + "Error: invalide azure batch account url, please use format: https://{account_name}.{location}.batch.azure.com." + ) + sys.exit(1) + elif os.getenv("AZ_BATCH_ACCOUNT_KEY") is None: + print( + "Error: environment variable AZ_BATCH_ACCOUNT_KEY must be set when --az-batch is used\n", + file=sys.stderr, + ) + sys.exit(1) + if args.google_lifesciences: if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"): print( @@ -3018,6 +3087,9 @@ def open_browser(): flux=args.flux, tibanna=args.tibanna, tibanna_sfn=args.tibanna_sfn, + az_batch=args.az_batch, + az_batch_enable_autoscale=args.az_batch_enable_autoscale, + az_batch_account_url=args.az_batch_account_url, google_lifesciences=args.google_lifesciences, google_lifesciences_regions=args.google_lifesciences_regions, google_lifesciences_location=args.google_lifesciences_location, diff --git a/snakemake/executors/__init__.py b/snakemake/executors/__init__.py index 43d5c1f20..1fd3a1db6 100644 --- a/snakemake/executors/__init__.py +++ b/snakemake/executors/__init__.py @@ -484,7 +484,8 @@ def __init__( # Zero thread jobs do not need a thread, but they occupy additional workers. # Hence we need to reserve additional workers for them. 
- self.workers = workers + 5 + workers = workers + 5 if workers is not None else 5 + self.workers = workers self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=self.workers) @property diff --git a/snakemake/executors/azure_batch.py b/snakemake/executors/azure_batch.py new file mode 100644 index 000000000..55256188d --- /dev/null +++ b/snakemake/executors/azure_batch.py @@ -0,0 +1,924 @@ +__author__ = "Johannes Köster, Andreas Wilm, Jake VanCampen" +__copyright__ = "Copyright 2022, Johannes Köster" +__email__ = "johannes.koester@uni-due.de" +__license__ = "MIT" + +import os +from collections import namedtuple +from urllib.parse import urlparse +import datetime +import uuid +import io +import shutil +import tarfile +import tempfile +import sys +import re +import msrest.authentication as msa +from pprint import pformat + +from snakemake.executors import ClusterExecutor, sleep +from snakemake.exceptions import WorkflowError +from snakemake.logging import logger +from snakemake.common import get_container_image, get_file_hash, async_lock +from snakemake.resources import DefaultResources + +AzBatchJob = namedtuple("AzBatchJob", "job jobid task_id callback error_callback") + + +class AzBatchConfig: + def __init__(self, batch_account_url: str): + # configure defaults + self.batch_account_url = batch_account_url + + # parse batch account name + result = urlparse(self.batch_account_url) + self.batch_account_name = str.split(result.hostname, ".")[0] + + self.batch_account_key = self.set_or_default("AZ_BATCH_ACCOUNT_KEY", None) + + # optional subnet config + self.batch_pool_subnet_id = self.set_or_default("BATCH_POOL_SUBNET_ID", None) + + # managed identity resource id configuration + self.managed_identity_resource_id = self.set_or_default( + "BATCH_MANAGED_IDENTITY_RESOURCE_ID", None + ) + + # parse subscription and resource id + if self.managed_identity_resource_id is not None: + self.subscription_id = self.managed_identity_resource_id.split("/")[2] + self.resource_group = self.managed_identity_resource_id.split("/")[4] + + self.managed_identity_client_id = self.set_or_default( + "BATCH_MANAGED_IDENTITY_CLIENT_ID", None + ) + + if self.batch_pool_subnet_id is not None: + if ( + self.managed_identity_client_id is None + or self.managed_identity_resource_id is None + ): + sys.exit( + "Error: BATCH_MANAGED_IDENTITY_RESOURCE_ID, BATCH_MANAGED_IDENTITY_CLIENT_ID must be set when deploying batch nodes into a private subnet!" + ) + + # parse account details necessary for batch client authentication steps + if self.batch_pool_subnet_id.split("/")[2] != self.subscription_id: + raise WorkflowError( + "Error: managed identity must be in the same subscription as the batch pool subnet." + ) + + if self.batch_pool_subnet_id.split("/")[4] != self.resource_group: + raise WorkflowError( + "Error: managed identity must be in the same resource group as the batch pool subnet." 
+ ) + + # sas url to a batch node start task bash script + self.batch_node_start_task_sasurl = os.getenv("BATCH_NODE_START_TASK_SAS_URL") + + # options configured with env vars or default + self.batch_pool_image_publisher = self.set_or_default( + "BATCH_POOL_IMAGE_PUBLISHER", "microsoft-azure-batch" + ) + self.batch_pool_image_offer = self.set_or_default( + "BATCH_POOL_IMAGE_OFFER", "ubuntu-server-container" + ) + self.batch_pool_image_sku = self.set_or_default( + "BATCH_POOL_IMAGE_SKU", "20-04-lts" + ) + self.batch_pool_vm_container_image = self.set_or_default( + "BATCH_POOL_VM_CONTAINER_IMAGE", "ubuntu" + ) + self.batch_pool_vm_node_agent_sku_id = self.set_or_default( + "BATCH_POOL_VM_NODE_AGENT_SKU_ID", "batch.node.ubuntu 20.04" + ) + self.batch_pool_vm_size = self.set_or_default( + "BATCH_POOL_VM_SIZE", "Standard_D2_v3" + ) + + # dedicated pool node count + self.batch_pool_node_count = self.set_or_default("BATCH_POOL_NODE_COUNT", 1) + + # default tasks per node + # see https://learn.microsoft.com/en-us/azure/batch/batch-parallel-node-tasks + self.batch_tasks_per_node = self.set_or_default("BATCH_TASKS_PER_NODE", 1) + + # possible values "spread" or "pack" + # see https://learn.microsoft.com/en-us/azure/batch/batch-parallel-node-tasks + self.batch_node_fill_type = self.set_or_default( + "BATCH_NODE_FILL_TYPE", "spread" + ) + + # enables simplified batch node communication if set + # see: https://learn.microsoft.com/en-us/azure/batch/simplified-compute-node-communication + self.batch_node_communication_mode = self.set_or_default( + "BATCH_NODE_COMMUNICATION_SIMPLIFIED", None + ) + + self.resource_file_prefix = self.set_or_default( + "BATCH_POOL_RESOURCE_FILE_PREFIX", "resource-files" + ) + + self.container_registry_url = self.set_or_default( + "BATCH_CONTAINER_REGISTRY_URL", None + ) + + self.container_registry_user = self.set_or_default( + "BATCH_CONTAINER_REGISTRY_USER", None + ) + + self.container_registry_pass = self.set_or_default( + "BATCH_CONTAINER_REGISTRY_PASS", None + ) + + @staticmethod + def set_or_default(evar: str, default: str | None): + gotvar = os.getenv(evar) + if gotvar is not None: + return gotvar + else: + return default + + +# the usage of this credential helper is required to authenitcate batch with managed identity credentials +# because not all Azure SDKs support the azure.identity credentials yet, and batch is one of them. +# ref1: https://gist.github.com/lmazuel/cc683d82ea1d7b40208de7c9fc8de59d +# ref2: https://gist.github.com/lmazuel/cc683d82ea1d7b40208de7c9fc8de59d +class AzureIdentityCredentialAdapter(msa.BasicTokenAuthentication): + def __init__( + self, + credential=None, + resource_id="https://management.azure.com/.default", + **kwargs, + ): + """Adapt any azure-identity credential to work with SDK that needs azure.common.credentials or msrestazure. 
+ Default resource is ARM (syntax of endpoint v2) + :param credential: Any azure-identity credential (DefaultAzureCredential by default) + :param str resource_id: The scope to use to get the token (default ARM) + """ + try: + from azure.core.pipeline.policies import BearerTokenCredentialPolicy + from azure.identity import DefaultAzureCredential + + except ImportError: + raise WorkflowError( + "The Python 3 packages 'azure-core' and 'azure-identity' are required" + ) + + super(AzureIdentityCredentialAdapter, self).__init__(None) + if credential is None: + credential = DefaultAzureCredential() + self._policy = BearerTokenCredentialPolicy(credential, resource_id, **kwargs) + + def _make_request(self): + try: + from azure.core.pipeline import PipelineRequest, PipelineContext + from azure.core.pipeline.transport import HttpRequest + except ImportError: + raise WorkflowError("The Python 3 package azure-core is required") + + return PipelineRequest( + HttpRequest("AzureIdentityCredentialAdapter", "https://fakeurl"), + PipelineContext(None), + ) + + def set_token(self): + """Ask the azure-core BearerTokenCredentialPolicy policy to get a token. + Using the policy gives us for free the caching system of azure-core. + We could make this code simpler by using private method, but by definition + I can't assure they will be there forever, so mocking a fake call to the policy + to extract the token, using 100% public API.""" + request = self._make_request() + self._policy.on_request(request) + # Read Authorization, and get the second part after Bearer + token = request.http_request.headers["Authorization"].split(" ", 1)[1] + self.token = {"access_token": token} + + def signed_session(self, session=None): + self.set_token() + return super(AzureIdentityCredentialAdapter, self).signed_session(session) + + +class AzBatchExecutor(ClusterExecutor): + "Azure Batch Executor" + + def __init__( + self, + workflow, + dag, + cores, + jobname="snakejob.{name}.{jobid}.sh", + printreason=False, + quiet=False, + printshellcmds=False, + container_image=None, + regions=None, + location=None, + cache=False, + local_input=None, + restart_times=None, + max_status_checks_per_second=1, + az_batch_account_url=None, + az_batch_enable_autoscale=False, + ): + super().__init__( + workflow, + dag, + None, + jobname=jobname, + printreason=printreason, + quiet=quiet, + printshellcmds=printshellcmds, + restart_times=restart_times, + assume_shared_fs=False, + max_status_checks_per_second=1, + ) + + try: + from azure.batch import BatchServiceClient + from azure.mgmt.batch import BatchManagementClient + from azure.batch.batch_auth import SharedKeyCredentials + from azure.identity import DefaultAzureCredential + from snakemake.remote.AzBlob import AzureStorageHelper + + except ImportError: + raise WorkflowError( + "The Python 3 packages 'azure-batch', 'azure-mgmt-batch', and 'azure-identity'" + " must be installed to use Azure Batch" + ) + + AZURE_BATCH_RESOURCE_ENDPOINT = "https://batch.core.windows.net/" + + # Here we validate that az blob credential is SAS + # token because it is specific to azure batch executor + self.validate_az_blob_credential_is_sas() + self.azblob_helper = AzureStorageHelper() + + # get container from remote prefix + self.prefix_container = str.split(workflow.default_remote_prefix, "/")[0] + + # setup batch configuration sets self.az_batch_config + self.batch_config = AzBatchConfig(az_batch_account_url) + logger.debug(f"AzBatchConfig: {self.mask_batch_config_as_string()}") + + self.workflow = workflow + + # handle 
case on OSX with /var/ symlinked to /private/var/ causing + # issues with workdir not matching other workflow file dirs + dirname = os.path.dirname(self.workflow.persistence.path) + osxprefix = "/private" + if osxprefix in dirname: + dirname = dirname.removeprefix(osxprefix) + + self.workdir = dirname + self.workflow.default_resources = DefaultResources(mode="bare") + + # Relative path for running on instance + self._set_snakefile() + + # Prepare workflow sources for build package + self._set_workflow_sources() + + # Pool ids can only contain any combination of alphanumeric characters along with dash and underscore + ts = datetime.datetime.now().strftime("%Y-%m%dT%H-%M-%S") + self.pool_id = "snakepool-{:s}".format(ts) + self.job_id = "snakejob-{:s}".format(ts) + + self.envvars = list(self.workflow.envvars) or [] + + self.container_image = container_image or get_container_image() + + # enable autoscale flag + self.az_batch_enable_autoscale = az_batch_enable_autoscale + + # Package workflow sources files and upload to storage + self._build_packages = set() + targz = self._generate_build_source_package() + + # removed after job failure/success + self.resource_file = self._upload_build_source_package( + targz, resource_prefix=self.batch_config.resource_file_prefix + ) + + # authenticate batch client from SharedKeyCredentials + if ( + self.batch_config.batch_account_key is not None + and self.batch_config.managed_identity_client_id is None + ): + logger.debug("Using batch account key for authentication...") + creds = SharedKeyCredentials( + self.batch_config.batch_account_name, + self.batch_config.batch_account_key, + ) + # else authenticate with managed indentity client id + elif self.batch_config.managed_identity_client_id is not None: + logger.debug("Using managed identity batch authentication...") + creds = DefaultAzureCredential( + managed_identity_client_id=self.batch_config.managed_identity_client_id + ) + creds = AzureIdentityCredentialAdapter( + credential=creds, resource_id=AZURE_BATCH_RESOURCE_ENDPOINT + ) + + self.batch_client = BatchServiceClient( + creds, batch_url=self.batch_config.batch_account_url + ) + + if self.batch_config.managed_identity_resource_id is not None: + self.batch_mgmt_client = BatchManagementClient( + credential=DefaultAzureCredential( + managed_identity_client_id=self.batch_config.managed_identity_client_id + ), + subscription_id=self.batch_config.subscription_id, + ) + + try: + self.create_batch_pool() + except WorkflowError: + logger.debug("Error: Failed to create batch pool, shutting down.") + self.shutdown() + + try: + self.create_batch_job() + except WorkflowError: + logger.debug("Error: Failed to create batch job, shutting down.") + self.shutdown() + + def shutdown(self): + # perform additional steps on shutdown + # if necessary (jobs were cancelled already) + + logger.debug("Deleting AzBatch job") + self.batch_client.job.delete(self.job_id) + + logger.debug("Deleting AzBatch pool") + self.batch_client.pool.delete(self.pool_id) + + logger.debug("Deleting workflow sources from blob") + + self.azblob_helper.delete_from_container( + self.prefix_container, self.resource_file.file_path + ) + + super().shutdown() + + def cancel(self): + for task in self.batch_client.task.list(self.job_id): + # strictly not need as job deletion also deletes task + self.batch_client.task.terminate(self.job_id, task.id) + self.shutdown() + + # mask_dict_vals masks sensitive keys from a dictionary of values for + # logging used to mask dicts with sensitive information from logging 
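+    # Example (hypothetical values):
+    #   mask_dict_vals({"batch_account_key": "abc123", "region": "westus"}, ["batch_account_key"])
+    #   returns {"batch_account_key": "**********", "region": "westus"}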
+ @staticmethod + def mask_dict_vals(mdict: dict, keys: list): + ret_dict = mdict.copy() + for k in keys: + if k in ret_dict.keys() and ret_dict[k] is not None: + ret_dict[k] = 10 * "*" + return ret_dict + + # mask blob url is used to mask url values that may contain SAS + # token information from being printed to the logs + def mask_sas_urls(self, attrs: dict): + attrs_new = attrs.copy() + sas_pattern = r"\?s[v|p]=.+(\'|\"|$)" + mask = 10 * "*" + + for k, value in attrs.items(): + if value is not None and re.search(sas_pattern, str(value)): + attrs_new[k] = re.sub(sas_pattern, mask, value) + + return attrs_new + + def mask_batch_config_as_string(self) -> str: + masked_keys = self.mask_dict_vals( + self.batch_config.__dict__, + [ + "batch_account_key", + "managed_identity_client_id", + ], + ) + masked_urls = self.mask_sas_urls(masked_keys) + return pformat(masked_urls, indent=2) + + def run(self, job, callback=None, submit_callback=None, error_callback=None): + import azure.batch._batch_service_client as batch + import azure.batch.models as batchmodels + + super()._run(job) + + envsettings = [] + for key in self.envvars: + try: + envsettings.append( + batchmodels.EnvironmentSetting(name=key, value=os.environ[key]) + ) + except KeyError: + continue + + exec_job = self.format_job_exec(job) + exec_job = f"/bin/sh -c 'tar xzf {self.resource_file.file_path} && {exec_job}'" + + # A string that uniquely identifies the Task within the Job. + task_uuid = str(uuid.uuid1()) + task_id = f"{job.rule.name}-{task_uuid}" + + # This is the admin user who runs the command inside the container. + user = batchmodels.AutoUserSpecification( + scope=batchmodels.AutoUserScope.pool, + elevation_level=batchmodels.ElevationLevel.admin, + ) + + # This is the docker image we want to run + task_container_settings = batchmodels.TaskContainerSettings( + image_name=self.container_image, container_run_options="--rm" + ) + + # https://docs.microsoft.com/en-us/python/api/azure-batch/azure.batch.models.taskaddparameter?view=azure-python + # all directories recursively below the AZ_BATCH_NODE_ROOT_DIR (the root of Azure Batch directories on the node) + # are mapped into the container, all Task environment variables are mapped into the container, + # and the Task command line is executed in the container + task = batch.models.TaskAddParameter( + id=task_id, + command_line=exec_job, + container_settings=task_container_settings, + resource_files=[self.resource_file], # Snakefile, envs, yml files etc. + user_identity=batchmodels.UserIdentity(auto_user=user), + environment_settings=envsettings, + ) + + # register job as active, using your own namedtuple. + self.batch_client.task.add(self.job_id, task) + self.active_jobs.append( + AzBatchJob(job, self.job_id, task_id, callback, error_callback) + ) + logger.debug(f"Added AzBatch task {task_id}") + logger.debug( + f"Added AzBatch task {pformat(self.mask_sas_urls(task.__dict__), indent=2)}" + ) + + # from https://github.com/Azure-Samples/batch-python-quickstart/blob/master/src/python_quickstart_client.py + @staticmethod + def _read_stream_as_string(stream, encoding): + """Read stream as string + :param stream: input stream generator + :param str encoding: The encoding of the file. The default is utf-8. + :return: The file content. 
+ :rtype: str + """ + output = io.BytesIO() + try: + for data in stream: + output.write(data) + if encoding is None: + encoding = "utf-8" + return output.getvalue().decode(encoding) + finally: + output.close() + + # adopted from https://github.com/Azure-Samples/batch-python-quickstart/blob/master/src/python_quickstart_client.py + def _get_task_output(self, job_id, task_id, stdout_or_stderr, encoding=None): + assert stdout_or_stderr in ["stdout", "stderr"] + fname = stdout_or_stderr + ".txt" + try: + stream = self.batch_client.file.get_from_task(job_id, task_id, fname) + content = self._read_stream_as_string(stream, encoding) + except Exception: + content = "" + + return content + + async def _wait_for_jobs(self): + import azure.batch.models as batchmodels + + while True: + # always use self.lock to avoid race conditions + async with async_lock(self.lock): + if not self.wait: + return + active_jobs = self.active_jobs + self.active_jobs = list() + still_running = list() + + # Loop through active jobs and act on status + for batch_job in active_jobs: + async with self.status_rate_limiter: + logger.debug(f"Monitoring {len(active_jobs)} active AzBatch tasks") + task = self.batch_client.task.get(self.job_id, batch_job.task_id) + + if task.state == batchmodels.TaskState.completed: + dt = ( + task.execution_info.end_time + - task.execution_info.start_time + ) + rc = task.execution_info.exit_code + rt = task.execution_info.retry_count + stderr = self._get_task_output( + self.job_id, batch_job.task_id, "stderr" + ) + stdout = self._get_task_output( + self.job_id, batch_job.task_id, "stdout" + ) + logger.debug( + "task {} completed: result={} exit_code={}\n".format( + batch_job.task_id, task.execution_info.result, rc + ) + ) + logger.debug( + "task {} completed: run_time={}, retry_count={}\n".format( + batch_job.task_id, str(dt), rt + ) + ) + + def print_output(): + logger.debug( + "task {}: stderr='{}'\n".format( + batch_job.task_id, stderr + ) + ) + logger.debug( + "task {}: stdout='{}'\n".format( + batch_job.task_id, stdout + ) + ) + + if ( + task.execution_info.result + == batchmodels.TaskExecutionResult.failure + ): + logger.error( + f"Azure task failed: code={str(task.execution_info.failure_info.code)}, message={str(task.execution_info.failure_info.message)}" + ) + for d in task.execution_info.failure_info.details: + logger.error(f"Error Details: {str(d)}") + print_output() + batch_job.error_callback(batch_job.job) + elif ( + task.execution_info.result + == batchmodels.TaskExecutionResult.success + ): + batch_job.callback(batch_job.job) + else: + logger.error( + "Unknown Azure task execution result: {}".format( + task.execution_info.result + ) + ) + print_output() + batch_job.error_callback(batch_job.job) + + # The operation is still running + else: + logger.debug( + f"task {batch_job.task_id}: creation_time={task.creation_time} state={task.state} node_info={task.node_info}\n" + ) + still_running.append(batch_job) + + # fail if start task fails on a node or node state becomes unusable + # and stream stderr stdout to stream + node_list = self.batch_client.compute_node.list(self.pool_id) + for n in node_list: + # error on unusable node (this occurs if your container image fails to pull) + if n.state == "unusable": + if n.errors is not None: + for e in n.errors: + logger.error( + f"Azure task error: {e.message}, {e.error_details[0].__dict__}" + ) + logger.error( + "A node entered an unusable state, quitting." 
+ ) + return + + if n.start_task_info is not None and ( + n.start_task_info.result + == batchmodels.TaskExecutionResult.failure + ): + try: + stderr_file = ( + self.batch_client.file.get_from_compute_node( + self.pool_id, n.id, "/startup/stderr.txt" + ) + ) + stderr_stream = self._read_stream_as_string( + stderr_file, "utf-8" + ) + except Exception: + stderr_stream = "" + + try: + stdout_file = ( + self.batch_client.file.get_from_compute_node( + self.pool_id, n.id, "/startup/stdout.txt" + ) + ) + stdout_stream = self._read_stream_as_string( + stdout_file, "utf-8" + ) + except Exception: + stdout_stream = "" + + logger.error( + "Azure start task execution failed on node: {}.\nSTART_TASK_STDERR:{}\nSTART_TASK_STDOUT: {}".format( + n.start_task_info.failure_info.message, + stdout_stream, + stderr_stream, + ) + ) + return + + async with async_lock(self.lock): + self.active_jobs.extend(still_running) + await sleep() + + def create_batch_pool(self): + """Creates a pool of compute nodes""" + + import azure.batch._batch_service_client as bsc + import azure.batch.models as batchmodels + import azure.mgmt.batch.models as mgmtbatchmodels + + image_ref = bsc.models.ImageReference( + publisher=self.batch_config.batch_pool_image_publisher, + offer=self.batch_config.batch_pool_image_offer, + sku=self.batch_config.batch_pool_image_sku, + version="latest", + ) + + # optional subnet network configuration + # requires AAD batch auth insead of batch key auth + network_config = None + if self.batch_config.batch_pool_subnet_id is not None: + network_config = batchmodels.NetworkConfiguration( + subnet_id=self.batch_config.batch_pool_subnet_id + ) + + # configure a container registry + + # Specify container configuration, fetching an image + # https://docs.microsoft.com/en-us/azure/batch/batch-docker-container-workloads#prefetch-images-for-container-configuration + container_config = batchmodels.ContainerConfiguration( + container_image_names=[self.container_image] + ) + + user = None + passw = None + identity_ref = None + registry_conf = None + + if self.batch_config.container_registry_url is not None: + if ( + self.batch_config.container_registry_user is not None + and self.batch_config.container_registry_pass is not None + ): + user = self.batch_config.container_registry_user + passw = self.batch_config.container_registry_pass + elif self.batch_config.managed_identity_resource_id is not None: + identity_ref = batchmodels.ComputeNodeIdentityReference( + resource_id=self.batch_config.managed_identity_resource_id + ) + else: + raise WorkflowError( + "No container registry authentication scheme set. Please set the BATCH_CONTAINER_REGISTRY_USER and BATCH_CONTAINER_REGISTRY_PASS or set MANAGED_IDENTITY_CLIENT_ID and MANAGED_IDENTITY_RESOURCE_ID." 
+ ) + + registry_conf = [ + batchmodels.ContainerRegistry( + registry_server=self.batch_config.container_registry_url, + identity_reference=identity_ref, + user_name=str(user), + password=str(passw), + ) + ] + + # Specify container configuration, fetching an image + # https://docs.microsoft.com/en-us/azure/batch/batch-docker-container-workloads#prefetch-images-for-container-configuration + container_config = batchmodels.ContainerConfiguration( + container_image_names=[self.container_image], + container_registries=registry_conf, + ) + + # default to no start task + start_task = None + + # if configured us start task bash script from sas url + if self.batch_config.batch_node_start_task_sasurl is not None: + _SIMPLE_TASK_NAME = "start_task.sh" + start_task_admin = batchmodels.UserIdentity( + auto_user=batchmodels.AutoUserSpecification( + elevation_level=batchmodels.ElevationLevel.admin, + scope=batchmodels.AutoUserScope.pool, + ) + ) + start_task = batchmodels.StartTask( + command_line=f"bash {_SIMPLE_TASK_NAME}", + resource_files=[ + batchmodels.ResourceFile( + file_path=_SIMPLE_TASK_NAME, + http_url=self.batch_config.batch_node_start_task_sasurl, + ) + ], + user_identity=start_task_admin, + ) + + # autoscale requires the initial dedicated node count to be zero + if self.az_batch_enable_autoscale: + self.batch_config.batch_pool_node_count = 0 + + node_communication_strategy = None + if self.batch_config.batch_node_communication_mode is not None: + node_communication_strategy = batchmodels.NodeCommunicationMode.simplified + + new_pool = batchmodels.PoolAddParameter( + id=self.pool_id, + virtual_machine_configuration=batchmodels.VirtualMachineConfiguration( + image_reference=image_ref, + container_configuration=container_config, + node_agent_sku_id=self.batch_config.batch_pool_vm_node_agent_sku_id, + ), + network_configuration=network_config, + vm_size=self.batch_config.batch_pool_vm_size, + target_dedicated_nodes=self.batch_config.batch_pool_node_count, + target_node_communication_mode=node_communication_strategy, + target_low_priority_nodes=0, + start_task=start_task, + task_slots_per_node=self.batch_config.batch_tasks_per_node, + task_scheduling_policy=batchmodels.TaskSchedulingPolicy( + node_fill_type=self.batch_config.batch_node_fill_type + ), + ) + + # create pool if not exists + try: + logger.debug(f"Creating pool: {self.pool_id}") + self.batch_client.pool.add(new_pool) + + if self.az_batch_enable_autoscale: + # define the autoscale formula + formula = """$samples = $PendingTasks.GetSamplePercent(TimeInterval_Minute * 5); + $tasks = $samples < 70 ? max(0,$PendingTasks.GetSample(1)) : max( $PendingTasks.GetSample(1), avg($PendingTasks.GetSample(TimeInterval_Minute * 5))); + $targetVMs = $tasks > 0? 
$tasks:max(0, $TargetDedicatedNodes/2); + $TargetDedicatedNodes = max(0, min($targetVMs, 10)); + $NodeDeallocationOption = taskcompletion;""" + + # Enable autoscale; specify the formula + self.batch_client.pool.enable_auto_scale( + self.pool_id, + auto_scale_formula=formula, + # the minimum allowed autoscale interval is 5 minutes + auto_scale_evaluation_interval=datetime.timedelta(minutes=5), + pool_enable_auto_scale_options=None, + custom_headers=None, + raw=False, + ) + + # update pool with managed identity, enables batch nodes to act as managed identity + if self.batch_config.managed_identity_resource_id is not None: + mid = mgmtbatchmodels.BatchPoolIdentity( + type=mgmtbatchmodels.PoolIdentityType.user_assigned, + user_assigned_identities={ + self.batch_config.managed_identity_resource_id: mgmtbatchmodels.UserAssignedIdentities() + }, + ) + params = mgmtbatchmodels.Pool(identity=mid) + self.batch_mgmt_client.pool.update( + resource_group_name=self.batch_config.resource_group, + account_name=self.batch_config.batch_account_name, + pool_name=self.pool_id, + parameters=params, + ) + + except batchmodels.BatchErrorException as err: + if err.error.code != "PoolExists": + raise WorkflowError( + f"Error: Failed to create pool: {err.error.message}" + ) + else: + logger.debug(f"Pool {self.pool_id} exists.") + + def create_batch_job(self): + """Creates a job with the specified ID, associated with the specified pool""" + import azure.batch._batch_service_client as bsc + + logger.debug(f"Creating job {self.job_id}") + + self.batch_client.job.add( + bsc.models.JobAddParameter( + id=self.job_id, + constraints=bsc.models.JobConstraints(max_task_retry_count=0), + pool_info=bsc.models.PoolInformation(pool_id=self.pool_id), + ) + ) + + @staticmethod + def validate_az_blob_credential_is_sas(): + """ + Validates that the AZ_BLOB_CREDENTIAL is a valid storage account SAS + token, required when using --az-batch with AzBlob remote. + """ + cred = os.environ.get("AZ_BLOB_CREDENTIAL") + if cred is not None: + # regex pattern for Storage Account SAS Token + rgx = r"\?sv=.*&ss=.*&srt=.*&sp=.*&se=.*&st=.*&spr=.*&sig=.*" + if re.compile(rgx).match(cred) is None: + raise WorkflowError( + "AZ_BLOB_CREDENTIAL is not a valid storage account SAS token." + ) + + def _set_snakefile(self): + """The snakefile must be a relative path, which cannot be reliably + derived from the self.workflow.snakefile as we might have moved + execution into a temporary directory, and the initial Snakefile + was somewhere else on the system. + """ + self.snakefile = os.path.basename(self.workflow.main_snakefile) + + # from google_lifesciences.py + def _set_workflow_sources(self): + """We only add files from the working directory that are config related + (e.g., the Snakefile or a config.yml equivalent), or checked into git. + """ + self.workflow_sources = [] + + for wfs in self.dag.get_sources(): + if os.path.isdir(wfs): + for dirpath, dirnames, filenames in os.walk(wfs): + self.workflow_sources.extend( + [ + self.workflow.check_source_sizes(os.path.join(dirpath, f)) + for f in filenames + ] + ) + else: + self.workflow_sources.append( + self.workflow.check_source_sizes(os.path.abspath(wfs)) + ) + + # from google_lifesciences.py + def _generate_build_source_package(self): + """in order for the instance to access the working directory in storage, + we need to upload it. This file is cleaned up at the end of the run. + We do this, and then obtain from the instance and extract. 
+ """ + # Workflow sources for cloud executor must all be under same workdir root + for filename in self.workflow_sources: + if self.workdir not in filename: + raise WorkflowError( + "All source files must be present in the working directory, " + "{workdir} to be uploaded to a build package that respects " + "relative paths, but {filename} was found outside of this " + "directory. Please set your working directory accordingly, " + "and the path of your Snakefile to be relative to it.".format( + workdir=self.workdir, filename=filename + ) + ) + + # We will generate a tar.gz package, renamed by hash + tmpname = next(tempfile._get_candidate_names()) + targz = os.path.join(tempfile.gettempdir(), f"snakemake-{tmpname}.tar.gz") + tar = tarfile.open(targz, "w:gz") + + # Add all workflow_sources files + for filename in self.workflow_sources: + arcname = filename.replace(self.workdir + os.path.sep, "") + tar.add(filename, arcname=arcname) + logger.debug( + f"Created {targz} with the following contents: {self.workflow_sources}" + ) + tar.close() + + # Rename based on hash, in case user wants to save cache + sha256 = get_file_hash(targz) + hash_tar = os.path.join( + self.workflow.persistence.aux_path, f"workdir-{sha256}.tar.gz" + ) + + # Only copy if we don't have it yet, clean up if we do + if not os.path.exists(hash_tar): + shutil.move(targz, hash_tar) + else: + os.remove(targz) + + # We will clean these all up at shutdown + self._build_packages.add(hash_tar) + + return hash_tar + + def _upload_build_source_package(self, targz, resource_prefix=""): + """given a .tar.gz created for a workflow, upload it to the blob + storage account, only if the blob doesn't already exist. + """ + + import azure.batch.models as batchmodels + + blob_name = os.path.join(resource_prefix, os.path.basename(targz)) + + # upload blob to storage using storage helper + bc = self.azblob_helper.upload_to_azure_storage( + self.prefix_container, targz, blob_name=blob_name + ) + + # return resource file + return batchmodels.ResourceFile(http_url=bc.url, file_path=blob_name) diff --git a/snakemake/executors/common.py b/snakemake/executors/common.py index c44972eda..8f1f61df8 100644 --- a/snakemake/executors/common.py +++ b/snakemake/executors/common.py @@ -1,5 +1,6 @@ from collections import UserDict from snakemake.io import not_iterable +from urllib.parse import urlparse def format_cli_arg(flag, value, quote=True, skip=False): @@ -23,3 +24,11 @@ def format_cli_pos_arg(value, quote=True): def join_cli_args(args): return " ".join(arg for arg in args if arg) + + +def url_can_parse(url: str) -> bool: + """ + returns true if urllib.parse.urlparse can parse + scheme and netloc + """ + return all(list(urlparse(url))[:2]) diff --git a/snakemake/jobs.py b/snakemake/jobs.py index 01aff4ac0..c5c65d108 100644 --- a/snakemake/jobs.py +++ b/snakemake/jobs.py @@ -839,8 +839,13 @@ def prepare(self): if self.benchmark: self.benchmark.prepare() - # wait for input files - wait_for_files(self.input, latency_wait=self.dag.workflow.latency_wait) + # wait for input files, respecting keep_remote_local + force_stay_on_remote = not self.dag.keep_remote_local + wait_for_files( + self.input, + force_stay_on_remote=force_stay_on_remote, + latency_wait=self.dag.workflow.latency_wait, + ) if not self.is_shadow: return diff --git a/snakemake/remote/AzBlob.py b/snakemake/remote/AzBlob.py index c2aee98c9..2fbd77100 100644 --- a/snakemake/remote/AzBlob.py +++ b/snakemake/remote/AzBlob.py @@ -167,7 +167,7 @@ def __init__(self, *args, **kwargs): if "stay_on_remote" 
in kwargs: del kwargs["stay_on_remote"] - # if not handed down explicitely, try to read credentials from + # if not handed down explicitly, try to read credentials from # environment variables. for csavar, envvar in [ ("account_url", "AZ_BLOB_ACCOUNT_URL"), @@ -181,11 +181,35 @@ def __init__(self, *args, **kwargs): # remove leading '?' from SAS if needed # if kwargs.get("sas_token", "").startswith("?"): # kwargs["sas_token"] = kwargs["sas_token"][1:] + if kwargs["account_url"] == "" or kwargs["account_url"] is None: + raise ValueError("Blob Account URL is None or empty string") + + if not self.is_valid_azure_storage_account_url(kwargs["account_url"]): + raise ValueError( + "Blob Account URL does not match azure storage blob account url pattern." + ) # by right only account_key or sas_token should be set, but we let # BlobServiceClient deal with the ambiguity self.blob_service_client = BlobServiceClient(**kwargs) + @staticmethod + def is_valid_azure_storage_account_url(blob_account_url: str) -> bool: + """ + Validates if the blob account url is a valid Azure Storage Account URL. + + Args: + blob_account_url (str): The name of the environment variable. + + Returns: + bool: True if the environment variable is a valid Azure Storage Account URL, False otherwise. + """ + url_pattern = re.compile( + r"^https:\/\/[a-z0-9]+(\.[a-z0-9]+)*\.blob\.core\.windows\.net\/?(.+)?$" + ) + + return bool(url_pattern.match(blob_account_url)) + def container_exists(self, container_name): return any( True for _ in self.blob_service_client.list_containers(container_name) @@ -242,7 +266,7 @@ def upload_to_azure_storage( try: with open(file_path, "rb") as data: blob_client.upload_blob(data, blob_type="BlockBlob") - return blob_client.get_blob_properties().name + return blob_client except Exception as e: raise WorkflowError("Error in creating blob. %s" % str(e)) # return None diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py index 9e2cf064a..0ff8f2763 100644 --- a/snakemake/scheduler.py +++ b/snakemake/scheduler.py @@ -83,6 +83,9 @@ def __init__( flux=None, tibanna=None, tibanna_sfn=None, + az_batch=False, + az_batch_enable_autoscale=False, + az_batch_account_url=None, google_lifesciences=None, google_lifesciences_regions=None, google_lifesciences_location=None, @@ -362,6 +365,36 @@ def __init__( printshellcmds=printshellcmds, ) + elif az_batch: + try: + from snakemake.executors.azure_batch import AzBatchExecutor + except ImportError as e: + raise WorkflowError( + "Unable to load Azure Batch executor. 
You have to install " + "the msrest, azure-core, azure-batch, azure-mgmt-batch, and azure-identity packages.", + e, + ) + self._local_executor = CPUExecutor( + workflow, + dag, + local_cores, + printreason=printreason, + quiet=quiet, + printshellcmds=printshellcmds, + cores=local_cores, + ) + self._executor = AzBatchExecutor( + workflow, + dag, + cores, + container_image=container_image, + az_batch_account_url=az_batch_account_url, + az_batch_enable_autoscale=az_batch_enable_autoscale, + printreason=printreason, + quiet=quiet, + printshellcmds=printshellcmds, + ) + elif google_lifesciences: self._local_executor = CPUExecutor( workflow, diff --git a/snakemake/workflow.py b/snakemake/workflow.py index 6bd55065a..543cc5d41 100644 --- a/snakemake/workflow.py +++ b/snakemake/workflow.py @@ -521,6 +521,9 @@ def execute( flux=None, tibanna=None, tibanna_sfn=None, + az_batch=False, + az_batch_enable_autoscale=False, + az_batch_account_url=None, google_lifesciences=None, google_lifesciences_regions=None, google_lifesciences_location=None, @@ -972,6 +975,9 @@ def files(items): flux=flux, tibanna=tibanna, tibanna_sfn=tibanna_sfn, + az_batch=az_batch, + az_batch_enable_autoscale=az_batch_enable_autoscale, + az_batch_account_url=az_batch_account_url, google_lifesciences=google_lifesciences, google_lifesciences_regions=google_lifesciences_regions, google_lifesciences_location=google_lifesciences_location, diff --git a/test-environment.yml b/test-environment.yml index 1019bfc9d..ea153e405 100644 --- a/test-environment.yml +++ b/test-environment.yml @@ -8,7 +8,7 @@ dependencies: - datrie - boto3 - moto - - junit-xml # needed for S3Mocked + - junit-xml # needed for S3Mocked - httpretty - wrapt - pyyaml @@ -66,3 +66,7 @@ dependencies: - graphviz - humanfriendly - throttler + - msrest + - azure-batch + - azure-mgmt-batch + - azure-identity diff --git a/tests/test_azure_batch/Snakefile b/tests/test_azure_batch/Snakefile new file mode 100644 index 000000000..59b6a297a --- /dev/null +++ b/tests/test_azure_batch/Snakefile @@ -0,0 +1,11 @@ +# Test Azure Batch Executor +rule all: + input: + "test.txt", + + +rule write_test: + output: + "test.txt", + shell: + "echo 'test' > {output}" diff --git a/tests/test_azure_batch/expected-results/.gitignore b/tests/test_azure_batch/expected-results/.gitignore new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_azure_batch_executor.py b/tests/test_azure_batch_executor.py new file mode 100644 index 000000000..6d668148e --- /dev/null +++ b/tests/test_azure_batch_executor.py @@ -0,0 +1,25 @@ +import sys +import os +import re + +sys.path.insert(0, os.path.dirname(__file__)) + +from common import * + + +def test_az_batch_executor(): + # AZ_BATCH_ACCOUNT_URL=https://${batch_account_name}.${region}.batch.azure.com + bau = os.getenv("AZ_BATCH_ACCOUNT_URL") + prefix = os.getenv("AZ_BLOB_PREFIX") + wdir = dpath("test_azure_batch") + blob_account_url = os.getenv("AZ_BLOB_ACCOUNT_URL") + assert blob_account_url is not None and blob_account_url.strip() != "" + + run( + path=wdir, + default_remote_prefix=prefix, + container_image="jakevc/snakemake", + envvars=["AZ_BLOB_ACCOUNT_URL", "AZ_BLOB_CREDENTIAL"], + az_batch=True, + az_batch_account_url=bau, + ) diff --git a/versioneer.py b/versioneer.py index 18e34c2f5..ccc663bea 100644 --- a/versioneer.py +++ b/versioneer.py @@ -1,4 +1,3 @@ - # Version: 0.28 """The Versioneer - like a rocketeer, but for versions. 
@@ -348,11 +347,13 @@ def get_root(): setup_py = os.path.join(root, "setup.py") versioneer_py = os.path.join(root, "versioneer.py") if not (os.path.exists(setup_py) or os.path.exists(versioneer_py)): - err = ("Versioneer was unable to run the project root directory. " - "Versioneer requires setup.py to be executed from " - "its immediate directory (like 'python setup.py COMMAND'), " - "or in a way that lets it use sys.argv[0] to find the root " - "(like 'python path/to/setup.py COMMAND').") + err = ( + "Versioneer was unable to run the project root directory. " + "Versioneer requires setup.py to be executed from " + "its immediate directory (like 'python setup.py COMMAND'), " + "or in a way that lets it use sys.argv[0] to find the root " + "(like 'python path/to/setup.py COMMAND')." + ) raise VersioneerBadRootError(err) try: # Certain runtime workflows (setup.py install/develop in a setuptools @@ -365,8 +366,10 @@ def get_root(): me_dir = os.path.normcase(os.path.splitext(my_path)[0]) vsr_dir = os.path.normcase(os.path.splitext(versioneer_py)[0]) if me_dir != vsr_dir and "VERSIONEER_PEP518" not in globals(): - print("Warning: build in %s is using versioneer.py from %s" - % (os.path.dirname(my_path), versioneer_py)) + print( + "Warning: build in %s is using versioneer.py from %s" + % (os.path.dirname(my_path), versioneer_py) + ) except NameError: pass return root @@ -384,9 +387,9 @@ def get_config_from_root(root): section = None if pyproject_toml.exists() and have_tomllib: try: - with open(pyproject_toml, 'rb') as fobj: + with open(pyproject_toml, "rb") as fobj: pp = tomllib.load(fobj) - section = pp['tool']['versioneer'] + section = pp["tool"]["versioneer"] except (tomllib.TOMLDecodeError, KeyError): pass if not section: @@ -398,7 +401,7 @@ def get_config_from_root(root): section = parser["versioneer"] cfg = VersioneerConfig() - cfg.VCS = section['VCS'] + cfg.VCS = section["VCS"] cfg.style = section.get("style", "") cfg.versionfile_source = section.get("versionfile_source") cfg.versionfile_build = section.get("versionfile_build") @@ -421,15 +424,16 @@ class NotThisMethod(Exception): def register_vcs_handler(vcs, method): # decorator """Create decorator to mark a method as the handler of a VCS.""" + def decorate(f): """Store f in HANDLERS[vcs][method].""" HANDLERS.setdefault(vcs, {})[method] = f return f + return decorate -def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False, - env=None): +def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False, env=None): """Call the given command(s).""" assert isinstance(commands, list) process = None @@ -445,10 +449,14 @@ def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False, try: dispcmd = str([command] + args) # remember shell=False, so use git.cmd on windows, not just git - process = subprocess.Popen([command] + args, cwd=cwd, env=env, - stdout=subprocess.PIPE, - stderr=(subprocess.PIPE if hide_stderr - else None), **popen_kwargs) + process = subprocess.Popen( + [command] + args, + cwd=cwd, + env=env, + stdout=subprocess.PIPE, + stderr=(subprocess.PIPE if hide_stderr else None), + **popen_kwargs, + ) break except OSError: e = sys.exc_info()[1] @@ -471,7 +479,9 @@ def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False, return stdout, process.returncode -LONG_VERSION_PY['git'] = r''' +LONG_VERSION_PY[ + "git" +] = r''' # This file helps to compute a version number in source trees obtained from # git-archive tarball (such as those provided by githubs download-from-tag # 
feature). Distribution tarballs (built by setup.py sdist) and build @@ -1187,7 +1197,7 @@ def git_versions_from_keywords(keywords, tag_prefix, verbose): # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of # just "foo-1.0". If we see a "tag: " prefix, prefer those. TAG = "tag: " - tags = {r[len(TAG):] for r in refs if r.startswith(TAG)} + tags = {r[len(TAG) :] for r in refs if r.startswith(TAG)} if not tags: # Either we're using git < 1.8.3, or there really are no tags. We use # a heuristic: assume all version tags have a digit. The old git %d @@ -1196,7 +1206,7 @@ def git_versions_from_keywords(keywords, tag_prefix, verbose): # between branches and tags. By ignoring refnames without digits, we # filter out many common branch names like "release" and # "stabilization", as well as "HEAD" and "master". - tags = {r for r in refs if re.search(r'\d', r)} + tags = {r for r in refs if re.search(r"\d", r)} if verbose: print("discarding '%s', no digits" % ",".join(refs - tags)) if verbose: @@ -1204,24 +1214,31 @@ def git_versions_from_keywords(keywords, tag_prefix, verbose): for ref in sorted(tags): # sorting will prefer e.g. "2.0" over "2.0rc1" if ref.startswith(tag_prefix): - r = ref[len(tag_prefix):] + r = ref[len(tag_prefix) :] # Filter out refs that exactly match prefix or that don't start # with a number once the prefix is stripped (mostly a concern # when prefix is '') - if not re.match(r'\d', r): + if not re.match(r"\d", r): continue if verbose: print("picking %s" % r) - return {"version": r, - "full-revisionid": keywords["full"].strip(), - "dirty": False, "error": None, - "date": date} + return { + "version": r, + "full-revisionid": keywords["full"].strip(), + "dirty": False, + "error": None, + "date": date, + } # no suitable tags, so version is "0+unknown", but full hex is still there if verbose: print("no suitable tags, using unknown + full revision id") - return {"version": "0+unknown", - "full-revisionid": keywords["full"].strip(), - "dirty": False, "error": "no suitable tags", "date": None} + return { + "version": "0+unknown", + "full-revisionid": keywords["full"].strip(), + "dirty": False, + "error": "no suitable tags", + "date": None, + } @register_vcs_handler("git", "pieces_from_vcs") @@ -1243,8 +1260,7 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, runner=run_command): env.pop("GIT_DIR", None) runner = functools.partial(runner, env=env) - _, rc = runner(GITS, ["rev-parse", "--git-dir"], cwd=root, - hide_stderr=not verbose) + _, rc = runner(GITS, ["rev-parse", "--git-dir"], cwd=root, hide_stderr=not verbose) if rc != 0: if verbose: print("Directory %s not under git control" % root) @@ -1252,10 +1268,19 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, runner=run_command): # if there is a tag matching tag_prefix, this yields TAG-NUM-gHEX[-dirty] # if there isn't one, this yields HEX[-dirty] (no NUM) - describe_out, rc = runner(GITS, [ - "describe", "--tags", "--dirty", "--always", "--long", - "--match", f"{tag_prefix}[[:digit:]]*" - ], cwd=root) + describe_out, rc = runner( + GITS, + [ + "describe", + "--tags", + "--dirty", + "--always", + "--long", + "--match", + f"{tag_prefix}[[:digit:]]*", + ], + cwd=root, + ) # --long was added in git-1.5.5 if describe_out is None: raise NotThisMethod("'git describe' failed") @@ -1270,8 +1295,7 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, runner=run_command): pieces["short"] = full_out[:7] # maybe improved later pieces["error"] = None - branch_name, rc = runner(GITS, ["rev-parse", "--abbrev-ref", "HEAD"], - 
cwd=root) + branch_name, rc = runner(GITS, ["rev-parse", "--abbrev-ref", "HEAD"], cwd=root) # --abbrev-ref was added in git-1.6.3 if rc != 0 or branch_name is None: raise NotThisMethod("'git rev-parse --abbrev-ref' returned error") @@ -1311,17 +1335,16 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, runner=run_command): dirty = git_describe.endswith("-dirty") pieces["dirty"] = dirty if dirty: - git_describe = git_describe[:git_describe.rindex("-dirty")] + git_describe = git_describe[: git_describe.rindex("-dirty")] # now we have TAG-NUM-gHEX or HEX if "-" in git_describe: # TAG-NUM-gHEX - mo = re.search(r'^(.+)-(\d+)-g([0-9a-f]+)$', git_describe) + mo = re.search(r"^(.+)-(\d+)-g([0-9a-f]+)$", git_describe) if not mo: # unparsable. Maybe git-describe is misbehaving? - pieces["error"] = ("unable to parse git-describe output: '%s'" - % describe_out) + pieces["error"] = "unable to parse git-describe output: '%s'" % describe_out return pieces # tag @@ -1330,10 +1353,12 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, runner=run_command): if verbose: fmt = "tag '%s' doesn't start with prefix '%s'" print(fmt % (full_tag, tag_prefix)) - pieces["error"] = ("tag '%s' doesn't start with prefix '%s'" - % (full_tag, tag_prefix)) + pieces["error"] = "tag '%s' doesn't start with prefix '%s'" % ( + full_tag, + tag_prefix, + ) return pieces - pieces["closest-tag"] = full_tag[len(tag_prefix):] + pieces["closest-tag"] = full_tag[len(tag_prefix) :] # distance: number of commits since tag pieces["distance"] = int(mo.group(2)) @@ -1407,15 +1432,21 @@ def versions_from_parentdir(parentdir_prefix, root, verbose): for _ in range(3): dirname = os.path.basename(root) if dirname.startswith(parentdir_prefix): - return {"version": dirname[len(parentdir_prefix):], - "full-revisionid": None, - "dirty": False, "error": None, "date": None} + return { + "version": dirname[len(parentdir_prefix) :], + "full-revisionid": None, + "dirty": False, + "error": None, + "date": None, + } rootdirs.append(root) root = os.path.dirname(root) # up a level if verbose: - print("Tried directories %s but none started with prefix %s" % - (str(rootdirs), parentdir_prefix)) + print( + "Tried directories %s but none started with prefix %s" + % (str(rootdirs), parentdir_prefix) + ) raise NotThisMethod("rootdir doesn't start with parentdir_prefix") @@ -1444,11 +1475,13 @@ def versions_from_file(filename): contents = f.read() except OSError: raise NotThisMethod("unable to read _version.py") - mo = re.search(r"version_json = '''\n(.*)''' # END VERSION_JSON", - contents, re.M | re.S) + mo = re.search( + r"version_json = '''\n(.*)''' # END VERSION_JSON", contents, re.M | re.S + ) if not mo: - mo = re.search(r"version_json = '''\r\n(.*)''' # END VERSION_JSON", - contents, re.M | re.S) + mo = re.search( + r"version_json = '''\r\n(.*)''' # END VERSION_JSON", contents, re.M | re.S + ) if not mo: raise NotThisMethod("no version_json in _version.py") return json.loads(mo.group(1)) @@ -1457,8 +1490,7 @@ def versions_from_file(filename): def write_to_version_file(filename, versions): """Write the given version number to the given _version.py file.""" os.unlink(filename) - contents = json.dumps(versions, sort_keys=True, - indent=1, separators=(",", ": ")) + contents = json.dumps(versions, sort_keys=True, indent=1, separators=(",", ": ")) with open(filename, "w") as f: f.write(SHORT_VERSION_PY % contents) @@ -1490,8 +1522,7 @@ def render_pep440(pieces): rendered += ".dirty" else: # exception #1 - rendered = "0+untagged.%d.g%s" % (pieces["distance"], - 
pieces["short"]) + rendered = "0+untagged.%d.g%s" % (pieces["distance"], pieces["short"]) if pieces["dirty"]: rendered += ".dirty" return rendered @@ -1520,8 +1551,7 @@ def render_pep440_branch(pieces): rendered = "0" if pieces["branch"] != "master": rendered += ".dev0" - rendered += "+untagged.%d.g%s" % (pieces["distance"], - pieces["short"]) + rendered += "+untagged.%d.g%s" % (pieces["distance"], pieces["short"]) if pieces["dirty"]: rendered += ".dirty" return rendered @@ -1682,11 +1712,13 @@ def render_git_describe_long(pieces): def render(pieces, style): """Render the given version pieces into the requested style.""" if pieces["error"]: - return {"version": "unknown", - "full-revisionid": pieces.get("long"), - "dirty": None, - "error": pieces["error"], - "date": None} + return { + "version": "unknown", + "full-revisionid": pieces.get("long"), + "dirty": None, + "error": pieces["error"], + "date": None, + } if not style or style == "default": style = "pep440" # the default @@ -1710,9 +1742,13 @@ def render(pieces, style): else: raise ValueError("unknown style '%s'" % style) - return {"version": rendered, "full-revisionid": pieces["long"], - "dirty": pieces["dirty"], "error": None, - "date": pieces.get("date")} + return { + "version": rendered, + "full-revisionid": pieces["long"], + "dirty": pieces["dirty"], + "error": None, + "date": pieces.get("date"), + } class VersioneerBadRootError(Exception): @@ -1735,8 +1771,9 @@ def get_versions(verbose=False): handlers = HANDLERS.get(cfg.VCS) assert handlers, "unrecognized VCS '%s'" % cfg.VCS verbose = verbose or cfg.verbose - assert cfg.versionfile_source is not None, \ - "please set versioneer.versionfile_source" + assert ( + cfg.versionfile_source is not None + ), "please set versioneer.versionfile_source" assert cfg.tag_prefix is not None, "please set versioneer.tag_prefix" versionfile_abs = os.path.join(root, cfg.versionfile_source) @@ -1790,9 +1827,13 @@ def get_versions(verbose=False): if verbose: print("unable to compute version") - return {"version": "0+unknown", "full-revisionid": None, - "dirty": None, "error": "unable to compute version", - "date": None} + return { + "version": "0+unknown", + "full-revisionid": None, + "dirty": None, + "error": "unable to compute version", + "date": None, + } def get_version(): @@ -1845,6 +1886,7 @@ def run(self): print(" date: %s" % vers.get("date")) if vers["error"]: print(" error: %s" % vers["error"]) + cmds["version"] = cmd_version # we override "build_py" in setuptools @@ -1866,8 +1908,8 @@ def run(self): # but the build_py command is not expected to copy any files. 
# we override different "build_py" commands for both environments - if 'build_py' in cmds: - _build_py = cmds['build_py'] + if "build_py" in cmds: + _build_py = cmds["build_py"] else: from setuptools.command.build_py import build_py as _build_py @@ -1884,14 +1926,14 @@ def run(self): # now locate _version.py in the new build/ directory and replace # it with an updated value if cfg.versionfile_build: - target_versionfile = os.path.join(self.build_lib, - cfg.versionfile_build) + target_versionfile = os.path.join(self.build_lib, cfg.versionfile_build) print("UPDATING %s" % target_versionfile) write_to_version_file(target_versionfile, versions) + cmds["build_py"] = cmd_build_py - if 'build_ext' in cmds: - _build_ext = cmds['build_ext'] + if "build_ext" in cmds: + _build_ext = cmds["build_ext"] else: from setuptools.command.build_ext import build_ext as _build_ext @@ -1911,19 +1953,22 @@ def run(self): # it with an updated value if not cfg.versionfile_build: return - target_versionfile = os.path.join(self.build_lib, - cfg.versionfile_build) + target_versionfile = os.path.join(self.build_lib, cfg.versionfile_build) if not os.path.exists(target_versionfile): - print(f"Warning: {target_versionfile} does not exist, skipping " - "version update. This can happen if you are running build_ext " - "without first running build_py.") + print( + f"Warning: {target_versionfile} does not exist, skipping " + "version update. This can happen if you are running build_ext " + "without first running build_py." + ) return print("UPDATING %s" % target_versionfile) write_to_version_file(target_versionfile, versions) + cmds["build_ext"] = cmd_build_ext if "cx_Freeze" in sys.modules: # cx_freeze enabled? from cx_Freeze.dist import build_exe as _build_exe + # nczeczulin reports that py2exe won't like the pep440-style string # as FILEVERSION, but it can be used for PRODUCTVERSION, e.g. # setup(console=[{ @@ -1944,17 +1989,21 @@ def run(self): os.unlink(target_versionfile) with open(cfg.versionfile_source, "w") as f: LONG = LONG_VERSION_PY[cfg.VCS] - f.write(LONG % - {"DOLLAR": "$", - "STYLE": cfg.style, - "TAG_PREFIX": cfg.tag_prefix, - "PARENTDIR_PREFIX": cfg.parentdir_prefix, - "VERSIONFILE_SOURCE": cfg.versionfile_source, - }) + f.write( + LONG + % { + "DOLLAR": "$", + "STYLE": cfg.style, + "TAG_PREFIX": cfg.tag_prefix, + "PARENTDIR_PREFIX": cfg.parentdir_prefix, + "VERSIONFILE_SOURCE": cfg.versionfile_source, + } + ) + cmds["build_exe"] = cmd_build_exe del cmds["build_py"] - if 'py2exe' in sys.modules: # py2exe enabled? + if "py2exe" in sys.modules: # py2exe enabled? 
try: from py2exe.setuptools_buildexe import py2exe as _py2exe except ImportError: @@ -1973,18 +2022,22 @@ def run(self): os.unlink(target_versionfile) with open(cfg.versionfile_source, "w") as f: LONG = LONG_VERSION_PY[cfg.VCS] - f.write(LONG % - {"DOLLAR": "$", - "STYLE": cfg.style, - "TAG_PREFIX": cfg.tag_prefix, - "PARENTDIR_PREFIX": cfg.parentdir_prefix, - "VERSIONFILE_SOURCE": cfg.versionfile_source, - }) + f.write( + LONG + % { + "DOLLAR": "$", + "STYLE": cfg.style, + "TAG_PREFIX": cfg.tag_prefix, + "PARENTDIR_PREFIX": cfg.parentdir_prefix, + "VERSIONFILE_SOURCE": cfg.versionfile_source, + } + ) + cmds["py2exe"] = cmd_py2exe # sdist farms its file list building out to egg_info - if 'egg_info' in cmds: - _egg_info = cmds['egg_info'] + if "egg_info" in cmds: + _egg_info = cmds["egg_info"] else: from setuptools.command.egg_info import egg_info as _egg_info @@ -1997,7 +2050,7 @@ def find_sources(self): # Modify the filelist and normalize it root = get_root() cfg = get_config_from_root(root) - self.filelist.append('versioneer.py') + self.filelist.append("versioneer.py") if cfg.versionfile_source: # There are rare cases where versionfile_source might not be # included by default, so we must be explicit @@ -2010,18 +2063,21 @@ def find_sources(self): # We will instead replicate their final normalization (to unicode, # and POSIX-style paths) from setuptools import unicode_utils - normalized = [unicode_utils.filesys_decode(f).replace(os.sep, '/') - for f in self.filelist.files] - manifest_filename = os.path.join(self.egg_info, 'SOURCES.txt') - with open(manifest_filename, 'w') as fobj: - fobj.write('\n'.join(normalized)) + normalized = [ + unicode_utils.filesys_decode(f).replace(os.sep, "/") + for f in self.filelist.files + ] + + manifest_filename = os.path.join(self.egg_info, "SOURCES.txt") + with open(manifest_filename, "w") as fobj: + fobj.write("\n".join(normalized)) - cmds['egg_info'] = cmd_egg_info + cmds["egg_info"] = cmd_egg_info # we override different "sdist" commands for both environments - if 'sdist' in cmds: - _sdist = cmds['sdist'] + if "sdist" in cmds: + _sdist = cmds["sdist"] else: from setuptools.command.sdist import sdist as _sdist @@ -2043,8 +2099,10 @@ def make_release_tree(self, base_dir, files): # updated value target_versionfile = os.path.join(base_dir, cfg.versionfile_source) print("UPDATING %s" % target_versionfile) - write_to_version_file(target_versionfile, - self._versioneer_generated_versions) + write_to_version_file( + target_versionfile, self._versioneer_generated_versions + ) + cmds["sdist"] = cmd_sdist return cmds @@ -2104,11 +2162,9 @@ def do_setup(): root = get_root() try: cfg = get_config_from_root(root) - except (OSError, configparser.NoSectionError, - configparser.NoOptionError) as e: + except (OSError, configparser.NoSectionError, configparser.NoOptionError) as e: if isinstance(e, (OSError, configparser.NoSectionError)): - print("Adding sample versioneer config to setup.cfg", - file=sys.stderr) + print("Adding sample versioneer config to setup.cfg", file=sys.stderr) with open(os.path.join(root, "setup.cfg"), "a") as f: f.write(SAMPLE_CONFIG) print(CONFIG_ERROR, file=sys.stderr) @@ -2117,15 +2173,18 @@ def do_setup(): print(" creating %s" % cfg.versionfile_source) with open(cfg.versionfile_source, "w") as f: LONG = LONG_VERSION_PY[cfg.VCS] - f.write(LONG % {"DOLLAR": "$", - "STYLE": cfg.style, - "TAG_PREFIX": cfg.tag_prefix, - "PARENTDIR_PREFIX": cfg.parentdir_prefix, - "VERSIONFILE_SOURCE": cfg.versionfile_source, - }) - - ipy = 
os.path.join(os.path.dirname(cfg.versionfile_source), - "__init__.py") + f.write( + LONG + % { + "DOLLAR": "$", + "STYLE": cfg.style, + "TAG_PREFIX": cfg.tag_prefix, + "PARENTDIR_PREFIX": cfg.parentdir_prefix, + "VERSIONFILE_SOURCE": cfg.versionfile_source, + } + ) + + ipy = os.path.join(os.path.dirname(cfg.versionfile_source), "__init__.py") if os.path.exists(ipy): try: with open(ipy, "r") as f: