From 6140e29864cf0fcdd83194f218408867c54b730d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Fri, 13 Oct 2023 15:19:03 +0200 Subject: [PATCH] feat: add ability to inject conda environments into running Snakefile (#2479) ### Description ### QC * [x] The PR contains a test case for the changes or the changes are already covered by an existing test case. * [ ] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake). --- .github/workflows/main.yml | 5 +- setup.cfg | 1 + snakemake/api.py | 41 +++++++++------- snakemake/cli.py | 47 ++++++++++++------- snakemake/common/tests/__init__.py | 25 ++++++---- snakemake/parser.py | 7 +++ snakemake/settings.py | 1 + snakemake/spawn_jobs.py | 1 + snakemake/storage.py | 34 ++++++++++++-- snakemake/workflow.py | 39 +++++++++++++++ tests/common.py | 12 +++-- tests/test_conda_global/Snakefile | 12 +++++ tests/test_conda_global/env.yaml | 5 ++ .../expected-results/.gitkeep | 0 tests/test_executor_test_suite.py | 9 +++- tests/tests.py | 30 ++++++++++++ 16 files changed, 214 insertions(+), 55 deletions(-) create mode 100644 tests/test_conda_global/Snakefile create mode 100644 tests/test_conda_global/env.yaml create mode 100644 tests/test_conda_global/expected-results/.gitkeep diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 983e67705..7ce720fd0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -106,7 +106,10 @@ jobs: #PYTHONTRACEMALLOC: 10 shell: bash -el {0} run: | - pytest --show-capture=stderr --splits 10 --group ${{ matrix.test_group }} -v -x tests/test_expand.py tests/test_io.py tests/test_schema.py tests/test_linting.py tests/test_schema.py tests/test_linting.py tests/test_executor_test_suite.py tests/tests.py + # long tests + pytest --show-capture=stderr --splits 10 --group ${{ matrix.test_group }} -v -x tests/tests.py + # other tests + pytest --show-capture=stderr -v -x tests/test_expand.py tests/test_io.py tests/test_schema.py tests/test_linting.py tests/test_executor_test_suite.py build-container-image: runs-on: ubuntu-latest diff --git a/setup.cfg b/setup.cfg index f5c5c36c1..66fa26c5d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -60,6 +60,7 @@ install_requires = toposort >=1.10 wrapt yte >=1.5.1,<2.0 + conda-inject >=1.1.1,<2.0 [options.extras_require] google-cloud = diff --git a/snakemake/api.py b/snakemake/api.py index 7537e0d78..294eeac27 100644 --- a/snakemake/api.py +++ b/snakemake/api.py @@ -38,7 +38,6 @@ from snakemake_interface_executor_plugins.settings import ExecMode, ExecutorSettingsBase from snakemake_interface_executor_plugins.registry import ExecutorPluginRegistry -from snakemake_interface_storage_plugins.settings import StorageProviderSettingsBase from snakemake_interface_common.exceptions import ApiError from snakemake_interface_common.plugin_registry.plugin import TaggedSettings @@ -100,6 +99,7 @@ def workflow( config_settings: Optional[ConfigSettings] = None, storage_settings: Optional[StorageSettings] = None, workflow_settings: Optional[WorkflowSettings] = None, + deployment_settings: Optional[DeploymentSettings] = None, storage_provider_settings: Optional[Mapping[str, TaggedSettings]] = None, snakefile: Optional[Path] = None, workdir: Optional[Path] = None, @@ -124,6 +124,8 @@ def workflow( storage_settings = StorageSettings() if workflow_settings is None: workflow_settings = WorkflowSettings() + if deployment_settings is None: + deployment_settings = DeploymentSettings() if storage_provider_settings is None: storage_provider_settings = dict() @@ -141,6 +143,7 @@ def workflow( resource_settings=resource_settings, storage_settings=storage_settings, workflow_settings=workflow_settings, + deployment_settings=deployment_settings, storage_provider_settings=storage_provider_settings, ) return self._workflow_api @@ -151,11 +154,11 @@ def _cleanup(self): logger.cleanup() if self._workflow_api is not None: self._workflow_api._workdir_handler.change_back() - if ( - self._workflow_api._workflow_store is not None - and self._workflow_api._workflow._workdir_handler is not None - ): - self._workflow_api._workflow._workdir_handler.change_back() + if self._workflow_api._workflow_store is not None: + for conda_env in self._workflow_api._workflow_store.injected_conda_envs: + conda_env.remove() + if self._workflow_api._workflow._workdir_handler is not None: + self._workflow_api._workflow._workdir_handler.change_back() def print_exception(self, ex: Exception): """Print an exception during workflow execution in a human readable way @@ -231,14 +234,15 @@ class WorkflowApi(ApiBase): resource_settings: ResourceSettings storage_settings: StorageSettings workflow_settings: WorkflowSettings + deployment_settings: DeploymentSettings storage_provider_settings: Mapping[str, TaggedSettings] + _workflow_store: Optional[Workflow] = field(init=False, default=None) _workdir_handler: Optional[WorkdirHandler] = field(init=False) def dag( self, dag_settings: Optional[DAGSettings] = None, - deployment_settings: Optional[DeploymentSettings] = None, ): """Create a DAG API. @@ -248,14 +252,11 @@ def dag( """ if dag_settings is None: dag_settings = DAGSettings() - if deployment_settings is None: - deployment_settings = DeploymentSettings() return DAGApi( self.snakemake_api, self, dag_settings=dag_settings, - deployment_settings=deployment_settings, ) def lint(self, json: bool = False): @@ -312,6 +313,7 @@ def _get_workflow(self, **kwargs): config_settings=self.config_settings, resource_settings=self.resource_settings, workflow_settings=self.workflow_settings, + deployment_settings=self.deployment_settings, storage_settings=self.storage_settings, output_settings=self.snakemake_api.output_settings, overwrite_workdir=self.workdir, @@ -344,11 +346,9 @@ class DAGApi(ApiBase): snakemake_api: SnakemakeApi workflow_api: WorkflowApi dag_settings: DAGSettings - deployment_settings: DeploymentSettings def __post_init__(self): self.workflow_api._workflow.dag_settings = self.dag_settings - self.workflow_api._workflow.deployment_settings = self.deployment_settings def execute_workflow( self, @@ -367,7 +367,6 @@ def execute_workflow( executor: str -- The executor to use. execution_settings: ExecutionSettings -- The execution settings for the workflow. resource_settings: ResourceSettings -- The resource settings for the workflow. - deployment_settings: DeploymentSettings -- The deployment settings for the workflow. remote_execution_settings: RemoteExecutionSettings -- The remote execution settings for the workflow. executor_settings: Optional[ExecutorSettingsBase] -- The executor settings for the workflow. updated_files: Optional[List[str]] -- An optional list where Snakemake will put all updated files. @@ -522,17 +521,23 @@ def cleanup_metadata(self, paths: List[Path]): def conda_cleanup_envs(self): """Cleanup the conda environments of the workflow.""" - self.deployment_settings.imply_deployment_method(DeploymentMethod.CONDA) + self.workflow_api.deployment_settings.imply_deployment_method( + DeploymentMethod.CONDA + ) self.workflow_api._workflow.conda_cleanup_envs() def conda_create_envs(self): """Only create the conda environments of the workflow.""" - self.deployment_settings.imply_deployment_method(DeploymentMethod.CONDA) + self.workflow_api.deployment_settings.imply_deployment_method( + DeploymentMethod.CONDA + ) self.workflow_api._workflow.conda_create_envs() def conda_list_envs(self): """List the conda environments of the workflow.""" - self.deployment_settings.imply_deployment_method(DeploymentMethod.CONDA) + self.workflow_api.deployment_settings.imply_deployment_method( + DeploymentMethod.CONDA + ) self.workflow_api._workflow.conda_list_envs() def cleanup_shadow(self): @@ -541,7 +546,9 @@ def cleanup_shadow(self): def container_cleanup_images(self): """Cleanup the container images of the workflow.""" - self.deployment_settings.imply_deployment_method(DeploymentMethod.APPTAINER) + self.workflow_api.deployment_settings.imply_deployment_method( + DeploymentMethod.APPTAINER + ) self.workflow_api._workflow.container_cleanup_images() def list_changes(self, change_type: ChangeType): diff --git a/snakemake/cli.py b/snakemake/cli.py index 2376f3ed0..53c77d55c 100644 --- a/snakemake/cli.py +++ b/snakemake/cli.py @@ -1305,6 +1305,16 @@ def get_argument_parser(profiles=None): default="", help="Specify prefix for default storage provider. E.g. a bucket name.", ) + group_behavior.add_argument( + "--default-storage-provider-auto-deploy", + action="store_true", + help="Automatically deploy the default storage provider if it is not present " + "in the environment. This uses pip and will modify your current environment " + "by installing the storage plugin and all its dependencies if not present. " + "Use this if you run Snakemake with a remote executor plugin like " + "kubernetes where the jobs will run in a container that might not have the " + "required storage plugin installed.", + ) group_behavior.add_argument( "--no-shared-fs", action="store_true", @@ -1836,6 +1846,14 @@ def args_to_api(args, parser): keep_logger=False, ) ) as snakemake_api: + deployment_method = args.software_deployment_method + if args.use_conda: + deployment_method.add(DeploymentMethod.CONDA) + if args.use_apptainer: + deployment_method.add(DeploymentMethod.APPTAINER) + if args.use_envmodules: + deployment_method.add(DeploymentMethod.ENV_MODULES) + try: workflow_api = snakemake_api.workflow( resource_settings=ResourceSettings( @@ -1866,6 +1884,17 @@ def args_to_api(args, parser): workflow_settings=WorkflowSettings( wrapper_prefix=args.wrapper_prefix, ), + deployment_settings=DeploymentSettings( + deployment_method=deployment_method, + conda_prefix=args.conda_prefix, + conda_cleanup_pkgs=args.conda_cleanup_pkgs, + conda_base_path=args.conda_base_path, + conda_frontend=args.conda_frontend, + conda_not_block_search_path_envvars=args.conda_not_block_search_path_envvars, + apptainer_args=args.apptainer_args, + apptainer_prefix=args.apptainer_prefix, + default_storage_provider_auto_deploy=args.default_storage_provider_auto_deploy, + ), snakefile=args.snakefile, workdir=args.directory, ) @@ -1882,14 +1911,6 @@ def args_to_api(args, parser): elif args.print_compilation: workflow_api.print_compilation() else: - deployment_method = args.software_deployment_method - if args.use_conda: - deployment_method.add(DeploymentMethod.CONDA) - if args.use_apptainer: - deployment_method.add(DeploymentMethod.APPTAINER) - if args.use_envmodules: - deployment_method.add(DeploymentMethod.ENV_MODULES) - dag_api = workflow_api.dag( dag_settings=DAGSettings( targets=args.targets, @@ -1907,16 +1928,6 @@ def args_to_api(args, parser): max_inventory_wait_time=args.max_inventory_time, cache=args.cache, ), - deployment_settings=DeploymentSettings( - deployment_method=deployment_method, - conda_prefix=args.conda_prefix, - conda_cleanup_pkgs=args.conda_cleanup_pkgs, - conda_base_path=args.conda_base_path, - conda_frontend=args.conda_frontend, - conda_not_block_search_path_envvars=args.conda_not_block_search_path_envvars, - apptainer_args=args.apptainer_args, - apptainer_prefix=args.apptainer_prefix, - ), ) if args.preemptible_rules is not None: diff --git a/snakemake/common/tests/__init__.py b/snakemake/common/tests/__init__.py index c038b550e..6ee00611a 100644 --- a/snakemake/common/tests/__init__.py +++ b/snakemake/common/tests/__init__.py @@ -57,6 +57,19 @@ def get_default_storage_provider_settings( ) -> Optional[Mapping[str, TaggedSettings]]: ... + def get_remote_execution_settings(self) -> settings.RemoteExecutionSettings: + return settings.RemoteExecutionSettings( + seconds_between_status_checks=0, + envvars=self.get_envvars(), + ) + + def get_deployment_settings( + self, deployment_method=frozenset() + ) -> settings.DeploymentSettings: + return settings.DeploymentSettings( + deployment_method=deployment_method, + ) + def get_assume_shared_fs(self) -> bool: return True @@ -101,26 +114,20 @@ def _run_workflow(self, test_name, tmp_path, deployment_method=frozenset()): default_storage_prefix=self.get_default_storage_prefix(), assume_shared_fs=self.get_assume_shared_fs(), ), + deployment_settings=self.get_deployment_settings(deployment_method), storage_provider_settings=self.get_default_storage_provider_settings(), workdir=Path(tmp_path), snakefile=tmp_path / "Snakefile", ) - dag_api = workflow_api.dag( - deployment_settings=settings.DeploymentSettings( - deployment_method=deployment_method, - ), - ) + dag_api = workflow_api.dag() dag_api.execute_workflow( executor=self.get_executor(), executor_settings=self.get_executor_settings(), execution_settings=settings.ExecutionSettings( latency_wait=self.latency_wait, ), - remote_execution_settings=settings.RemoteExecutionSettings( - seconds_between_status_checks=0, - envvars=self.get_envvars(), - ), + remote_execution_settings=self.get_remote_execution_settings(), ) @handle_testcase diff --git a/snakemake/parser.py b/snakemake/parser.py index 1fd4db4a5..a8b7ab98b 100644 --- a/snakemake/parser.py +++ b/snakemake/parser.py @@ -366,6 +366,12 @@ def keyword(self): return "global_containerized" +class GlobalConda(GlobalKeywordState): + @property + def keyword(self): + return "global_conda" + + class Localrules(GlobalKeywordState): def block_content(self, token): if is_comma(token): @@ -1180,6 +1186,7 @@ class Python(TokenAutomaton): singularity=GlobalSingularity, container=GlobalContainer, containerized=GlobalContainerized, + conda=GlobalConda, scattergather=Scattergather, storage=Storage, resource_scopes=ResourceScope, diff --git a/snakemake/settings.py b/snakemake/settings.py index f5baa5860..e5087541e 100644 --- a/snakemake/settings.py +++ b/snakemake/settings.py @@ -220,6 +220,7 @@ class DeploymentSettings(SettingsBase, DeploymentSettingsExecutorInterface): conda_not_block_search_path_envvars: bool = False apptainer_args: str = "" apptainer_prefix: Optional[Path] = None + default_storage_provider_auto_deploy: bool = False def imply_deployment_method(self, method: DeploymentMethod): self.deployment_method = set(self.deployment_method) diff --git a/snakemake/spawn_jobs.py b/snakemake/spawn_jobs.py index 271071e28..1083954b2 100644 --- a/snakemake/spawn_jobs.py +++ b/snakemake/spawn_jobs.py @@ -164,6 +164,7 @@ def general_args( ), w2a("deployment_settings.apptainer_prefix"), w2a("deployment_settings.apptainer_args"), + w2a("deployment_settings.default_storage_provider_auto_deploy"), w2a("resource_settings.max_threads"), w2a( "execution_settings.keep_metadata", flag="--drop-metadata", invert=True diff --git a/snakemake/storage.py b/snakemake/storage.py index 694370ec4..fd008a785 100644 --- a/snakemake/storage.py +++ b/snakemake/storage.py @@ -1,14 +1,17 @@ import copy, sys +import subprocess from dataclasses import dataclass, field from typing import Any, Dict, Optional from snakemake.workflow import Workflow -from snakemake_interface_common.exceptions import WorkflowError +from snakemake_interface_common.exceptions import WorkflowError, InvalidPluginException from snakemake_interface_storage_plugins.registry import StoragePluginRegistry from snakemake_interface_storage_plugins.storage_provider import StorageProviderBase from snakemake_interface_storage_plugins.storage_object import ( StorageObjectWrite, StorageObjectRead, ) +from snakemake_interface_executor_plugins.settings import DeploymentMethod +from snakemake.common import __version__ class StorageRegistry: @@ -17,6 +20,7 @@ class StorageRegistry: "_storages", "_default_storage_provider", "default_storage_provider", + "_register_default_storage", "register_storage", "infer_provider", "_storage_object", @@ -30,9 +34,31 @@ def __init__(self, workflow: Workflow): self._default_storage_provider = None if self.workflow.storage_settings.default_storage_provider is not None: - self._default_storage_provider = self.register_storage( - self.workflow.storage_settings.default_storage_provider, is_default=True - ) + self._register_default_storage() + + def _register_default_storage(self): + plugin_name = self.workflow.storage_settings.default_storage_provider + if ( + not StoragePluginRegistry().is_installed(plugin_name) + and self.workflow.deployment_settings.default_storage_provider_auto_deploy + ): + try: + subprocess.run( + ["pip", "install", f"snakemake-storage-plugin-{plugin_name}"], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + check=True, + ) + except subprocess.CalledProcessError as e: + raise WorkflowError( + f"Failed to install storage plugin {plugin_name} via pip: {e.stdout.decode()}", + e, + ) + StoragePluginRegistry().collect_plugins() + self._default_storage_provider = self.register_storage( + plugin_name, + is_default=True, + ) @property def default_storage_provider(self): diff --git a/snakemake/workflow.py b/snakemake/workflow.py index aa4342bf8..34fdf8ea8 100644 --- a/snakemake/workflow.py +++ b/snakemake/workflow.py @@ -6,6 +6,7 @@ from dataclasses import dataclass, field import re import os +import subprocess import sys from collections import OrderedDict from collections.abc import Mapping @@ -110,6 +111,7 @@ from snakemake.sourcecache import ( LocalSourceFile, SourceCache, + SourceFile, infer_source_file, ) from snakemake.deployment.conda import Conda @@ -135,6 +137,7 @@ class Workflow(WorkflowExecutorInterface): cache_rules: Mapping[str, str] = field(default_factory=dict) overwrite_workdir: Optional[str] = None _workdir_handler: Optional[WorkdirHandler] = field(init=False, default=None) + injected_conda_envs: List = field(default_factory=list) def __post_init__(self): """ @@ -1727,6 +1730,42 @@ def decorate(ruleinfo): return decorate + def global_conda(self, conda_env): + if DeploymentMethod.CONDA in self.deployment_settings.deployment_method: + from conda_inject import inject_env_file, PackageManager + + try: + package_manager = PackageManager[ + self.deployment_settings.conda_frontend.upper() + ] + except KeyError: + raise WorkflowError( + f"Chosen conda frontend {self.deployment_settings.conda_frontend} is not supported by conda-inject." + ) + + # Handle relative path + if not isinstance(conda_env, SourceFile): + if is_local_file(conda_env) and not os.path.isabs(conda_env): + # Conda env file paths are considered to be relative to the directory of the Snakefile + # hence we adjust the path accordingly. + # This is not necessary in case of receiving a SourceFile. + conda_env = self.current_basedir.join(conda_env) + else: + # infer source file from unmodified uri or path + conda_env = infer_source_file(conda_env) + + logger.info(f"Injecting conda environment {conda_env.get_path_or_uri()}.") + try: + env = inject_env_file( + conda_env.get_path_or_uri(), package_manager=package_manager + ) + except subprocess.CalledProcessError as e: + raise WorkflowError( + f"Failed to inject conda environment {conda_env}: {e.stdout.decode()}", + e, + ) + self.injected_conda_envs.append(env) + def container(self, container_img): def decorate(ruleinfo): # Explicitly set container_img to False if None is passed, indicating that diff --git a/tests/common.py b/tests/common.py index 027622204..20087862d 100644 --- a/tests/common.py +++ b/tests/common.py @@ -220,6 +220,7 @@ def run( cleanup_metadata=None, rerun_triggers=settings.RerunTrigger.all(), storage_provider_settings=None, + default_storage_provider_auto_deploy=False, ): """ Test the Snakefile in the path. @@ -344,6 +345,12 @@ def run( workflow_settings=settings.WorkflowSettings( wrapper_prefix=wrapper_prefix, ), + deployment_settings=settings.DeploymentSettings( + conda_frontend=conda_frontend, + conda_prefix=conda_prefix, + deployment_method=deployment_method, + default_storage_provider_auto_deploy=default_storage_provider_auto_deploy, + ), snakefile=Path(original_snakefile if no_tmpdir else snakefile), workdir=Path(path if no_tmpdir else tmpdir), ) @@ -360,11 +367,6 @@ def run( forceall=forceall, rerun_triggers=rerun_triggers, ), - deployment_settings=settings.DeploymentSettings( - conda_frontend=conda_frontend, - conda_prefix=conda_prefix, - deployment_method=deployment_method, - ), ) if report is not None: diff --git a/tests/test_conda_global/Snakefile b/tests/test_conda_global/Snakefile new file mode 100644 index 000000000..ac1ef7315 --- /dev/null +++ b/tests/test_conda_global/Snakefile @@ -0,0 +1,12 @@ +import sys +import os + +conda: + "env.yaml" + +print(sys.path, file=sys.stderr) +print(os.environ["PATH"], file=sys.stderr) + +import PIL +from snakemake.shell import shell +shell("which rg") diff --git a/tests/test_conda_global/env.yaml b/tests/test_conda_global/env.yaml new file mode 100644 index 000000000..36da17d65 --- /dev/null +++ b/tests/test_conda_global/env.yaml @@ -0,0 +1,5 @@ +channels: + - conda-forge +dependencies: + - ripgrep + - pillow =10.0 \ No newline at end of file diff --git a/tests/test_conda_global/expected-results/.gitkeep b/tests/test_conda_global/expected-results/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_executor_test_suite.py b/tests/test_executor_test_suite.py index 61e4198f9..427da54f4 100644 --- a/tests/test_executor_test_suite.py +++ b/tests/test_executor_test_suite.py @@ -1,5 +1,7 @@ from pathlib import Path -from typing import Optional +from typing import Mapping, Optional + +from snakemake_interface_common.plugin_registry.plugin import TaggedSettings import snakemake.common.tests from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase @@ -23,3 +25,8 @@ def get_default_storage_provider(self) -> Optional[str]: def get_default_storage_prefix(self) -> Optional[str]: return None + + def get_default_storage_provider_settings( + self, + ) -> Optional[Mapping[str, TaggedSettings]]: + return None diff --git a/tests/tests.py b/tests/tests.py index e0468832b..b50e1f65a 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1889,3 +1889,33 @@ def test_config_yte(): def test_load_metawrapper(): run(dpath("test_load_metawrapper"), executor="dryrun") + + +@skip_on_windows +def test_conda_global(): + run( + dpath("test_conda_global"), + deployment_method={DeploymentMethod.CONDA}, + executor="dryrun", + ) + + +@skip_on_windows +def test_default_storage_provider_auto_deploy(s3_storage): + prefix, settings = s3_storage + + subprocess.run(["pip", "uninstall", "--yes", "snakemake-storage-plugin-s3"]) + + try: + run( + dpath("test_default_remote"), + cores=1, + default_storage_provider="s3", + default_storage_prefix=prefix, + storage_provider_settings=settings, + deployment_method={DeploymentMethod.CONDA}, + executor="dryrun", + default_storage_provider_auto_deploy=True, + ) + finally: + subprocess.run(["pip", "install", "snakemake-storage-plugin-s3"])