feat: add option to control software deployment mode (shared or non shared FS) (#2525)

### Description


### QC

* [x] The PR contains a test case for the changes or the changes are
already covered by an existing test case.
* [x] The documentation (`docs/`) is updated to reflect the changes, or this is
not necessary (e.g. if the change neither modifies the language nor the
behavior or functionality of Snakemake).
johanneskoester committed Nov 29, 2023
1 parent a70ca76 commit 04ec2c0
Showing 8 changed files with 83 additions and 29 deletions.
8 changes: 8 additions & 0 deletions snakemake/api.py
@@ -36,6 +36,7 @@
RemoteExecutionSettings,
ResourceSettings,
StorageSettings,
DeploymentFSMode,
)

from snakemake_interface_executor_plugins.settings import ExecMode, ExecutorSettingsBase
@@ -133,6 +134,13 @@ def workflow(
if storage_provider_settings is None:
storage_provider_settings = dict()

if deployment_settings.fs_mode is None:
deployment_settings.fs_mode = (
DeploymentFSMode.SHARED
if storage_settings.assume_shared_fs
else DeploymentFSMode.NOT_SHARED
)

self._check_is_in_context()

self._setup_logger(mode=workflow_settings.exec_mode)
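Note: a minimal sketch of the fallback added above, runnable on its own. That StorageSettings.assume_shared_fs defaults to True is an assumption; everything else mirrors the diff.

    # Hedged sketch: if the user does not set fs_mode explicitly, it is
    # inferred from whether storage assumes a shared filesystem.
    from snakemake.settings import DeploymentFSMode, DeploymentSettings, StorageSettings

    storage_settings = StorageSettings()        # assume_shared_fs: assumed True by default
    deployment_settings = DeploymentSettings()  # fs_mode defaults to None

    if deployment_settings.fs_mode is None:
        deployment_settings.fs_mode = (
            DeploymentFSMode.SHARED
            if storage_settings.assume_shared_fs
            else DeploymentFSMode.NOT_SHARED
        )

    assert deployment_settings.assume_shared_fs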
8 changes: 8 additions & 0 deletions snakemake/cli.py
@@ -38,6 +38,7 @@
StorageSettings,
WorkflowSettings,
GroupSettings,
DeploymentFSMode,
)

from snakemake_interface_executor_plugins.settings import ExecMode
@@ -1498,6 +1499,12 @@ def get_argument_parser(profiles=None):
default=set(),
help="Specify software environment deployment method.",
)
group_deployment.add_argument(
"--software-deployment-fs-mode",
choices=DeploymentFSMode.choices(),
parse_func=DeploymentFSMode.parse_choice,
help="Whether or not to assume a shared filesystem for software deployment.",
)
group_deployment.add_argument(
"--container-cleanup-images",
action="store_true",
@@ -1862,6 +1869,7 @@ def args_to_api(args, parser):
),
deployment_settings=DeploymentSettings(
deployment_method=deployment_method,
fs_mode=args.software_deployment_fs_mode,
conda_prefix=args.conda_prefix,
conda_cleanup_pkgs=args.conda_cleanup_pkgs,
conda_base_path=args.conda_base_path,
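Note: a hedged usage example for the new flag. The choice strings follow the SettingsEnumBase convention (lower case, underscores rendered as dashes), so "shared"/"not-shared" is an educated guess, as is the accompanying --software-deployment-method spelling:

    # Deploy conda envs per node instead of once on a shared filesystem:
    snakemake --cores 1 --jobs 10 \
        --software-deployment-method conda \
        --software-deployment-fs-mode not-shared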
7 changes: 6 additions & 1 deletion snakemake/dag.py
@@ -296,7 +296,12 @@ def update_conda_envs(self):
(job.conda_env_spec, job.container_img_url)
for job in self.jobs
if job.conda_env_spec
and (job.is_local or self.workflow.global_or_node_local_shared_fs)
and (
job.is_local
or self.workflow.deployment_settings.assume_shared_fs
or self.workflow.remote_exec
and not self.workflow.deployment_settings.assume_shared_fs
)
}

# Then based on md5sum values
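Note on the new guard in update_conda_envs: by boolean algebra, A or (B and not A) reduces to A or B, so the whole condition reads "the job is local, a shared FS is assumed for deployment, or we are already executing remotely". A hedged restatement (the helper name is hypothetical, not part of the commit):

    # Equivalent form of the guard above, for readability only.
    def conda_env_needed_here(job, workflow) -> bool:
        return (
            job.is_local
            or workflow.deployment_settings.assume_shared_fs
            or workflow.remote_exec
        )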
2 changes: 1 addition & 1 deletion snakemake/io.py
@@ -1419,7 +1419,7 @@ def __init__(
elif plainstr:
self.extend(
# use original query if storage is not retrieved by snakemake
(x if x.storage_object.retrieve else x.storage_object.query)
(str(x) if x.storage_object.retrieve else x.storage_object.query)
if isinstance(x, _IOFile) and x.storage_object is not None
else str(x)
for x in toclone
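Note: the io.py change is a one-character bug fix; with str(x), every branch of the clone expression now yields a plain string instead of sometimes leaking the _IOFile instance. A hedged summary of the three branches:

    # x is an _IOFile with a storage object:
    #   retrieved by snakemake  -> str(x)                  (the fix)
    #   not retrieved           -> x.storage_object.query  (original query)
    # anything else             -> str(x)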
57 changes: 35 additions & 22 deletions snakemake/jobs.py
@@ -5,13 +5,11 @@

import asyncio
import os
import sys
import base64
import tempfile
import json
import shutil

from collections import defaultdict
from itertools import chain, filterfalse
from operator import attrgetter
from typing import Optional
@@ -29,6 +27,7 @@
from snakemake.io import (
_IOFile,
IOFile,
is_callable,
Wildcards,
Resources,
is_flagged,
@@ -37,7 +36,7 @@
)
from snakemake.resources import GroupResources
from snakemake.target_jobs import TargetSpec
from snakemake.utils import format, listfiles
from snakemake.utils import format
from snakemake.exceptions import RuleException, ProtectedOutputException, WorkflowError

from snakemake.logging import logger
@@ -89,22 +88,28 @@ def has_products(self, include_logfiles=True):
return True
return False


def _get_scheduler_resources(job):
if job._scheduler_resources is None:
if job.dag.workflow.local_exec or job.is_local:
job._scheduler_resources = job.resources
else:
job._scheduler_resources = Resources(
fromdict={
k: job.resources[k]
for k in (
set(job.resources.keys())
- job.dag.workflow.resource_scopes.locals
)
}
)
return job._scheduler_resources
def _get_scheduler_resources(self):
if self._scheduler_resources is None:
if self.dag.workflow.local_exec or self.is_local:
self._scheduler_resources = Resources(
fromdict={
k: v
for k, v in self.resources.items()
if not isinstance(self.resources[k], TBDString)
}
)
else:
self._scheduler_resources = Resources(
fromdict={
k: self.resources[k]
for k in (
set(self.resources.keys())
- self.dag.workflow.resource_scopes.locals
)
if not isinstance(self.resources[k], TBDString)
}
)
return self._scheduler_resources


class JobFactory:
@@ -387,10 +392,18 @@ def attempt(self, attempt):
def resources(self):
if self._resources is None:
if self.dag.workflow.local_exec or self.is_local:
skip_evaluation = None
skip_evaluation = set()
else:
# tmpdir should be evaluated in the context of the actual execution
skip_evaluation = {"tmpdir"}
if not self._params_and_resources_resetted:
# initial evaluation, input files of job are probably not yet present.
# Therefore skip all functions
skip_evaluation.update(
name
for name, val in self.rule.resources.items()
if is_callable(val)
)
self._resources = self.rule.expand_resources(
self.wildcards_dict,
self.input,
@@ -401,7 +414,7 @@

@property
def scheduler_resources(self):
return _get_scheduler_resources(self)
return self._get_scheduler_resources()

def reset_params_and_resources(self):
if not self._params_and_resources_resetted:
@@ -1275,7 +1288,7 @@ def resources(self):

@property
def scheduler_resources(self):
return _get_scheduler_resources(self)
return self._get_scheduler_resources()

@property
def input(self):
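Note: two ideas in this jobs.py refactor deserve spelling out. Scheduler resources now drop TBDString placeholders (values that cannot be known before execution), and on the first evaluation all callable resources are skipped because the job's input files usually do not exist yet. A hedged sketch of the placeholder filtering; the TBDString import path is an assumption:

    from snakemake.common.tbdstring import TBDString

    # Placeholder values must never reach the scheduler.
    resources = {"mem_mb": 1000, "runtime": 30, "tmpdir": TBDString()}
    scheduler_view = {
        k: v for k, v in resources.items() if not isinstance(v, TBDString)
    }
    assert scheduler_view == {"mem_mb": 1000, "runtime": 30}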
13 changes: 13 additions & 0 deletions snakemake/settings.py
@@ -193,6 +193,11 @@ class CondaCleanupPkgs(SettingsEnumBase):
CACHE = 1


class DeploymentFSMode(SettingsEnumBase):
SHARED = 0
NOT_SHARED = 1


@dataclass
class DeploymentSettings(SettingsBase, DeploymentSettingsExecutorInterface):
"""
@@ -214,6 +219,7 @@ class DeploymentSettings(SettingsBase, DeploymentSettingsExecutorInterface):
"""

deployment_method: Set[DeploymentMethod] = frozenset()
fs_mode: Optional[DeploymentFSMode] = None
conda_prefix: Optional[Path] = None
conda_cleanup_pkgs: Optional[CondaCleanupPkgs] = None
conda_base_path: Optional[Path] = None
@@ -226,6 +232,13 @@ def imply_deployment_method(self, method: DeploymentMethod):
self.deployment_method = set(self.deployment_method)
self.deployment_method.add(method)

@property
def assume_shared_fs(self):
assert (
self.fs_mode is not None
), "bug: called DeploymentSettings.assume_shared_fs before fs_mode has been inferred from StorageSettings"
return True if self.fs_mode == DeploymentFSMode.SHARED else False


@dataclass
class SchedulingSettings(SettingsBase):
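Note: a hedged usage sketch for the new enum and property. DeploymentFSMode.choices() should yield "shared" and "not-shared" under the usual SettingsEnumBase convention, but that rendering is an assumption:

    from snakemake.settings import DeploymentFSMode, DeploymentSettings

    settings = DeploymentSettings(fs_mode=DeploymentFSMode.NOT_SHARED)
    assert not settings.assume_shared_fs

    # Before fs_mode has been inferred (see snakemake/api.py above), the
    # property is deliberately unusable:
    try:
        DeploymentSettings().assume_shared_fs
    except AssertionError:
        pass  # expected: fs_mode is still None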
5 changes: 3 additions & 2 deletions snakemake/spawn_jobs.py
@@ -200,14 +200,15 @@ def general_args(
w2a("deployment_settings.conda_prefix"),
w2a(
"conda_base_path",
skip=not self.workflow.storage_settings.assume_shared_fs,
skip=not self.workflow.deployment_settings.assume_shared_fs,
),
w2a("deployment_settings.apptainer_prefix"),
w2a("deployment_settings.apptainer_args"),
w2a("resource_settings.max_threads"),
w2a(
"storage_settings.assume_shared_fs", flag="--no-shared-fs", invert=True
),
w2a("deployment_settings.fs_mode", flag="--software-deployment-fs-mode"),
w2a(
"execution_settings.keep_metadata", flag="--drop-metadata", invert=True
),
@@ -223,7 +224,7 @@
format_cli_arg(
"--scheduler-solver-path",
os.path.dirname(sys.executable),
skip=not self.workflow.storage_settings.assume_shared_fs,
skip=not self.workflow.deployment_settings.assume_shared_fs,
),
w2a(
"overwrite_workdir",
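Note: the practical effect in spawn_jobs.py is that conda_base_path and the scheduler solver path are now gated on the deployment-level shared-FS assumption rather than the storage-level one, and fs_mode itself is forwarded to spawned jobs. A hedged sketch of the fragment a remote job might receive when fs_mode is NOT_SHARED (exact rendering is an assumption):

    --software-deployment-fs-mode not-shared --no-shared-fs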
12 changes: 9 additions & 3 deletions snakemake/workflow.py
@@ -959,7 +959,7 @@ def conda_list_envs(self):

if (
DeploymentMethod.APPTAINER in self.deployment_settings.deployment_method
and self.storage_settings.assume_shared_fs
and self.deployment_settings.assume_shared_fs
):
self.dag.pull_container_imgs()
self.dag.create_conda_envs(
@@ -987,7 +987,7 @@ def conda_create_envs(self):

if (
DeploymentMethod.APPTAINER in self.deployment_settings.deployment_method
and self.storage_settings.assume_shared_fs
and self.deployment_settings.assume_shared_fs
):
self.dag.pull_container_imgs()
self.dag.create_conda_envs()
@@ -1083,14 +1083,18 @@ def execute(
f for job in self.dag.needrun_jobs() for f in job.output
)

if self.global_or_node_local_shared_fs:
if self.deployment_settings.assume_shared_fs or (
self.remote_exec and not self.deployment_settings.assume_shared_fs
):
if (
DeploymentMethod.APPTAINER
in self.deployment_settings.deployment_method
):
self.dag.pull_container_imgs()
if DeploymentMethod.CONDA in self.deployment_settings.deployment_method:
self.dag.create_conda_envs()

if self.global_or_node_local_shared_fs:
async_run(self.dag.retrieve_storage_inputs())

if (
@@ -1518,6 +1522,7 @@ def decorate(ruleinfo):
rule.set_output(*ruleinfo.output.paths, **ruleinfo.output.kwpaths)
if ruleinfo.params:
rule.set_params(*ruleinfo.params[0], **ruleinfo.params[1])

# handle default resources
if self.resource_settings.default_resources is not None:
rule.resources = copy.deepcopy(
@@ -1572,6 +1577,7 @@ def decorate(ruleinfo):
)
else:
rule.shadow_depth = ruleinfo.shadow_depth

if ruleinfo.resources:
args, resources = ruleinfo.resources
if args:
