Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,14 @@ class Executor(RemoteExecutor)
)
# access executor specific settings
self.executor_settings
# access workflow
self.workflow

# IMPORTANT: in your plugin, only access methods and properties of Snakemake objects (like Workflow, Persistence, etc.)
# that are defined in the interfaces found in THIS package. Other parts of those objects
# are NOT guaranteed to remain the same across new releases.

# To ensure that the used interfaces are not changing, you should depend on this package as
# >=a.b.c,<d with d=a+1 (i.e. pin the dependency on this package to be at least the version at time of development
# and less than the next major version which would introduce breaking changes).
```
7 changes: 1 addition & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ version = "1.0.1"
argparse-dataclass = "^2.0.0"
python = "^3.9"
throttler = "^1.2.2"
snakemake-interface-common = "^1.3.1"

[tool.poetry.dev-dependencies]
black = "^22.1.0"
Expand All @@ -25,12 +26,6 @@ snakemake-executor-plugin-flux = {git = "https://github.com/snakemake/snakemake-
omit = [".*", "*/site-packages/*"]

[tool.coverage.report]
# exclude CLI handling lines. They cannot be captured properly by coverage, but we have a testcase for them.
exclude_lines = [
"yaml\\.dump\\(result, outfile\\)",
"process_yaml\\(sys.stdin, outfile=sys\\.stdout\\)",
"plac.call\\(cli\\)",
]
fail_under = 64

[build-system]
Expand Down
5 changes: 5 additions & 0 deletions snakemake_interface_executor_plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ class CommonSettings:
"""

non_local_exec: bool
dryrun_exec: bool = False
use_threads: bool = False

@property
def local_exec(self):
return not self.non_local_exec


@dataclass
class ExecutorSettingsBase:
Expand Down
11 changes: 11 additions & 0 deletions snakemake_interface_executor_plugins/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from abc import ABC, abstractmethod


class SpawnedJobArgsFactoryExecutorInterface(ABC):
@abstractmethod
def general_args(
self,
pass_default_remote_provider_args: bool = True,
pass_default_resources_args: bool = False,
) -> str:
...
39 changes: 3 additions & 36 deletions snakemake_interface_executor_plugins/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,12 @@
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"

import textwrap

from snakemake_interface_common.exceptions import ApiError

class InvalidPluginException(Exception):

class InvalidPluginException(ApiError):
def __init__(self, plugin_name: str, message: str):
super().__init__(
f"Snakemake executor plugin {plugin_name} is invalid: {message}"
)


class WorkflowError(Exception):
@staticmethod
def format_arg(arg):
if isinstance(arg, str):
return arg
elif isinstance(arg, WorkflowError):
spec = ""
if arg.rule is not None:
spec += f"rule {arg.rule}"
if arg.snakefile is not None:
if spec:
spec += ", "
spec += f"line {arg.lineno}, {arg.snakefile}"

if spec:
spec = f" ({spec})"

return "{}{}:\n{}".format(
arg.__class__.__name__, spec, textwrap.indent(str(arg), " ")
)
else:
return f"{arg.__class__.__name__}: {arg}"

def __init__(self, *args, lineno=None, snakefile=None, rule=None):
super().__init__("\n".join(self.format_arg(arg) for arg in args))
if rule is not None:
self.lineno = rule.lineno
self.snakefile = rule.snakefile
else:
self.lineno = lineno
self.snakefile = snakefile
self.rule = rule
48 changes: 5 additions & 43 deletions snakemake_interface_executor_plugins/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,21 @@
from abc import ABC, abstractmethod
from typing import List

from snakemake_interface_executor_plugins.dag import DAGExecutorInterface
from snakemake_interface_executor_plugins.jobs import ExecutorJobInterface
from snakemake_interface_executor_plugins.utils import format_cli_arg, join_cli_args
from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
from snakemake_interface_executor_plugins.utils import format_cli_arg
from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface


class AbstractExecutor(ABC):
def __init__(
self,
workflow: WorkflowExecutorInterface,
dag: DAGExecutorInterface,
printreason=False,
quiet=False,
printshellcmds=False,
printthreads=True,
keepincomplete=False,
logger: LoggerExecutorInterface,
):
self.workflow = workflow
self.dag = dag
self.quiet = quiet
self.printreason = printreason
self.printshellcmds = printshellcmds
self.printthreads = printthreads
self.latency_wait = workflow.latency_wait
self.keepincomplete = workflow.keep_incomplete

def get_default_remote_provider_args(self):
return join_cli_args(
[
self.workflow_property_to_arg("default_remote_prefix"),
self.workflow_property_to_arg("default_remote_provider", attr="name"),
]
)

def get_set_resources_args(self):
return format_cli_arg(
"--set-resources",
[
f"{rule}:{name}={value}"
for rule, res in self.workflow.overwrite_resources.items()
for name, value in res.items()
],
skip=not self.workflow.overwrite_resources,
)

def get_default_resources_args(self, default_resources=None):
default_resources = default_resources or self.workflow.default_resources
return format_cli_arg("--default-resources", default_resources.args)

def get_resource_scopes_args(self):
return format_cli_arg(
"--set-resource-scopes", self.workflow.overwrite_resource_scopes
)
self.dag = workflow.dag
self.logger = logger

def get_resource_declarations_dict(self, job: ExecutorJobInterface):
def isdigit(i):
Expand Down
143 changes: 38 additions & 105 deletions snakemake_interface_executor_plugins/executors/real.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@

from abc import abstractmethod
import os
import sys
from typing import Optional
from snakemake_interface_executor_plugins import ExecutorSettingsBase
from snakemake_interface_executor_plugins.dag import DAGExecutorInterface
from snakemake_interface_executor_plugins.executors.base import AbstractExecutor
from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
from snakemake_interface_executor_plugins.persistence import StatsExecutorInterface
from snakemake_interface_executor_plugins.settings import ExecMode
from snakemake_interface_executor_plugins.utils import (
encode_target_jobs_cli_args,
format_cli_arg,
join_cli_args,
lazy_property,
)
from snakemake_interface_executor_plugins.jobs import ExecutorJobInterface
from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface
Expand All @@ -26,29 +21,33 @@ class RealExecutor(AbstractExecutor):
def __init__(
self,
workflow: WorkflowExecutorInterface,
dag: DAGExecutorInterface,
stats: StatsExecutorInterface,
logger: LoggerExecutorInterface,
executor_settings: Optional[ExecutorSettingsBase],
job_core_limit: Optional[int] = None,
pass_default_remote_provider_args: bool = True,
pass_default_resources_args: bool = True,
pass_envvar_declarations_to_cmd: bool = True,
):
super().__init__(
workflow,
dag,
logger,
)
self.cores = job_core_limit if job_core_limit else "all"
self.executor_settings = executor_settings
self.assume_shared_fs = workflow.assume_shared_fs
self.stats = stats
self.logger = logger
self.executor_settings = self.workflow.executor_settings
self.snakefile = workflow.main_snakefile
self.pass_default_remote_provider_args = pass_default_remote_provider_args
self.pass_default_resources_args = pass_default_resources_args
self.pass_envvar_declarations_to_cmd = pass_envvar_declarations_to_cmd

@property
@abstractmethod
def cores(self):
# return "all" in case of remote executors,
# otherwise self.workflow.resource_settings.cores
...

def register_job(self, job: ExecutorJobInterface):
job.register()

def _run(self, job: ExecutorJobInterface, callback=None, error_callback=None):
super()._run(job)
self.stats.report_job_start(job)

try:
self.register_job(job)
Expand All @@ -74,96 +73,11 @@ def handle_job_success(
handle_log=handle_log,
handle_touch=handle_touch,
ignore_missing_output=ignore_missing_output,
latency_wait=self.latency_wait,
assume_shared_fs=self.assume_shared_fs,
keep_metadata=self.workflow.keep_metadata,
)
self.stats.report_job_end(job)

def handle_job_error(self, job: ExecutorJobInterface, upload_remote=True):
job.postprocess(
error=True,
assume_shared_fs=self.assume_shared_fs,
latency_wait=self.latency_wait,
)

def workflow_property_to_arg(
self, property, flag=None, quote=True, skip=False, invert=False, attr=None
):
if skip:
return ""

value = getattr(self.workflow, property)

if value is not None and attr is not None:
value = getattr(value, attr)

if flag is None:
flag = f"--{property.replace('_', '-')}"

if invert and isinstance(value, bool):
value = not value

return format_cli_arg(flag, value, quote=quote)

@lazy_property
def general_args(self):
"""Return a string to add to self.exec_job that includes additional
arguments from the command line. This is currently used in the
ClusterExecutor and CPUExecutor, as both were using the same
code. Both have base class of the RealExecutor.
"""
w2a = self.workflow_property_to_arg

return join_cli_args(
[
"--force",
"--keep-target-files",
"--keep-remote",
"--max-inventory-time 0",
"--nocolor",
"--notemp",
"--no-hooks",
"--nolock",
"--ignore-incomplete",
format_cli_arg("--keep-incomplete", self.keepincomplete),
w2a("rerun_triggers"),
w2a("cleanup_scripts", flag="--skip-script-cleanup"),
w2a("shadow_prefix"),
w2a("use_conda"),
w2a("conda_frontend"),
w2a("conda_prefix"),
w2a("conda_base_path", skip=not self.assume_shared_fs),
w2a("use_singularity"),
w2a("singularity_prefix"),
w2a("singularity_args"),
w2a("execute_subworkflows", flag="--no-subworkflows", invert=True),
w2a("max_threads"),
w2a("use_env_modules", flag="--use-envmodules"),
w2a("keep_metadata", flag="--drop-metadata", invert=True),
w2a("wrapper_prefix"),
w2a("overwrite_threads", flag="--set-threads"),
w2a("overwrite_scatter", flag="--set-scatter"),
w2a("local_groupid", skip=self.job_specific_local_groupid),
w2a("conda_not_block_search_path_envvars"),
w2a("overwrite_configfiles", flag="--configfiles"),
w2a("config_args", flag="--config"),
w2a("printshellcmds"),
w2a("latency_wait"),
w2a("scheduler_type", flag="--scheduler"),
format_cli_arg(
"--scheduler-solver-path",
os.path.dirname(sys.executable),
skip=not self.assume_shared_fs,
),
self.get_set_resources_args(),
self.get_default_remote_provider_args(),
self.get_default_resources_args(),
self.get_resource_scopes_args(),
self.get_workdir_arg(),
join_cli_args(self.additional_general_args()),
format_cli_arg("--mode", self.get_exec_mode()),
]
)

def additional_general_args(self):
Expand Down Expand Up @@ -212,11 +126,17 @@ def get_python_executable(self):
...

@abstractmethod
def get_exec_mode(self):
def get_exec_mode(self) -> ExecMode:
...

def get_envvar_declarations(self):
return ""
if self.pass_envvar_declarations_to_cmd:
return " ".join(
f"{var}={repr(os.environ[var])}"
for var in self.workflow.remote_execution_settings.envvars
)
else:
return ""

def get_job_exec_prefix(self, job: ExecutorJobInterface):
return ""
Expand All @@ -231,6 +151,10 @@ def format_job_exec(self, job: ExecutorJobInterface):
suffix = self.get_job_exec_suffix(job)
if suffix:
suffix = f"&& {suffix}"
general_args = self.workflow.spawned_job_args_factory.general_args(
pass_default_remote_provider_args=self.pass_default_remote_provider_args,
pass_default_resources_args=self.pass_default_resources_args,
)
return join_cli_args(
[
prefix,
Expand All @@ -239,7 +163,16 @@ def format_job_exec(self, job: ExecutorJobInterface):
"-m snakemake",
format_cli_arg("--snakefile", self.get_snakefile()),
self.get_job_args(job),
self.general_args,
self.get_default_remote_provider_args(),
self.get_workdir_arg(),
general_args,
self.additional_general_args(),
format_cli_arg("--mode", self.get_exec_mode()),
format_cli_arg(
"--local-groupid",
self.workflow.group_settings.local_groupid,
skip=self.job_specific_local_groupid,
),
suffix,
]
)
Loading