diff --git a/README.md b/README.md
index acff988..951a38c 100644
--- a/README.md
+++ b/README.md
@@ -54,4 +54,14 @@ class Executor(RemoteExecutor)
         )
         # access executor specific settings
         self.executor_settings
+        # access workflow
+        self.workflow
+
+        # IMPORTANT: in your plugin, only access methods and properties of Snakemake objects
+        # (like Workflow, Persistence, etc.) that are defined in the interfaces found in THIS
+        # package. Other parts of those objects are NOT guaranteed to remain stable across releases.
+
+        # To ensure that the used interfaces do not change, depend on this package as
+        # >=a.b.c,<d with d=a+1, i.e. at least the version you developed against and
+        # below the next major release, which may introduce breaking changes.
 
diff --git a/snakemake_interface_executor_plugins/cli.py b/snakemake_interface_executor_plugins/cli.py
new file mode 100644
--- /dev/null
+++ b/snakemake_interface_executor_plugins/cli.py
@@ -0,0 +1,11 @@
+from abc import ABC, abstractmethod
+
+
+class SpawnedJobArgsFactoryExecutorInterface(ABC):
+    @abstractmethod
+    def general_args(
+        self,
+        pass_default_remote_provider_args: bool = True,
+        pass_default_resources_args: bool = True,
+    ) -> str:
+        ...
diff --git a/snakemake_interface_executor_plugins/exceptions.py b/snakemake_interface_executor_plugins/exceptions.py
index b54da35..2b17af6 100644
--- a/snakemake_interface_executor_plugins/exceptions.py
+++ b/snakemake_interface_executor_plugins/exceptions.py
@@ -3,45 +3,12 @@
 __email__ = "johannes.koester@uni-due.de"
 __license__ = "MIT"
 
-import textwrap
+from snakemake_interface_common.exceptions import ApiError
 
-class InvalidPluginException(Exception):
+
+class InvalidPluginException(ApiError):
     def __init__(self, plugin_name: str, message: str):
         super().__init__(
             f"Snakemake executor plugin {plugin_name} is invalid: {message}"
         )
-
-
-class WorkflowError(Exception):
-    @staticmethod
-    def format_arg(arg):
-        if isinstance(arg, str):
-            return arg
-        elif isinstance(arg, WorkflowError):
-            spec = ""
-            if arg.rule is not None:
-                spec += f"rule {arg.rule}"
-            if arg.snakefile is not None:
-                if spec:
-                    spec += ", "
-                spec += f"line {arg.lineno}, {arg.snakefile}"
-
-            if spec:
-                spec = f" ({spec})"
-
-            return "{}{}:\n{}".format(
-                arg.__class__.__name__, spec, textwrap.indent(str(arg), "    ")
-            )
-        else:
-            return f"{arg.__class__.__name__}: {arg}"
-
-    def __init__(self, *args, lineno=None, snakefile=None, rule=None):
-        super().__init__("\n".join(self.format_arg(arg) for arg in args))
-        if rule is not None:
-            self.lineno = rule.lineno
-            self.snakefile = rule.snakefile
-        else:
-            self.lineno = lineno
-            self.snakefile = snakefile
-        self.rule = rule
diff --git a/snakemake_interface_executor_plugins/executors/base.py b/snakemake_interface_executor_plugins/executors/base.py
index 770855a..d8ef3db 100644
--- a/snakemake_interface_executor_plugins/executors/base.py
+++ b/snakemake_interface_executor_plugins/executors/base.py
@@ -6,9 +6,9 @@
 from abc import ABC, abstractmethod
 from typing import List
 
-from snakemake_interface_executor_plugins.dag import DAGExecutorInterface
 from snakemake_interface_executor_plugins.jobs import ExecutorJobInterface
-from snakemake_interface_executor_plugins.utils import format_cli_arg, join_cli_args
+from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
+from snakemake_interface_executor_plugins.utils import format_cli_arg
 from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface
 
 
@@ -16,49 +16,11 @@ class AbstractExecutor(ABC):
     def __init__(
         self,
         workflow: WorkflowExecutorInterface,
-        dag: DAGExecutorInterface,
-        printreason=False,
-        quiet=False,
-        printshellcmds=False,
-        printthreads=True,
-        keepincomplete=False,
+        logger: LoggerExecutorInterface,
     ):
         self.workflow = workflow
-        self.dag = dag
-        self.quiet = quiet
-        self.printreason = printreason
-        self.printshellcmds = printshellcmds
-        self.printthreads = printthreads
-        self.latency_wait = workflow.latency_wait
-        self.keepincomplete = workflow.keep_incomplete
-
-    def get_default_remote_provider_args(self):
-        return join_cli_args(
-            [
-                self.workflow_property_to_arg("default_remote_prefix"),
-                self.workflow_property_to_arg("default_remote_provider", attr="name"),
-            ]
-        )
-
-    def get_set_resources_args(self):
-        return format_cli_arg(
-            "--set-resources",
-            [
-                f"{rule}:{name}={value}"
-                for rule, res in self.workflow.overwrite_resources.items()
-                for name, value in res.items()
-            ],
-            skip=not self.workflow.overwrite_resources,
-        )
-
-    def get_default_resources_args(self, default_resources=None):
-        default_resources = default_resources or self.workflow.default_resources
-        return format_cli_arg("--default-resources", default_resources.args)
-
-    def get_resource_scopes_args(self):
-        return format_cli_arg(
-            "--set-resource-scopes", self.workflow.overwrite_resource_scopes
-        )
+        self.dag = workflow.dag
+        self.logger = logger
 
     def get_resource_declarations_dict(self, job: ExecutorJobInterface):
         def isdigit(i):
diff --git a/snakemake_interface_executor_plugins/executors/real.py b/snakemake_interface_executor_plugins/executors/real.py
index b883823..2601141 100644
--- a/snakemake_interface_executor_plugins/executors/real.py
+++ b/snakemake_interface_executor_plugins/executors/real.py
@@ -5,18 +5,13 @@
 from abc import abstractmethod
 import os
-import sys
-from typing import Optional
 
-from snakemake_interface_executor_plugins import ExecutorSettingsBase
-from snakemake_interface_executor_plugins.dag import DAGExecutorInterface
 from snakemake_interface_executor_plugins.executors.base import AbstractExecutor
 from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
-from snakemake_interface_executor_plugins.persistence import StatsExecutorInterface
+from snakemake_interface_executor_plugins.settings import ExecMode
 from snakemake_interface_executor_plugins.utils import (
     encode_target_jobs_cli_args,
     format_cli_arg,
     join_cli_args,
-    lazy_property,
 )
 from snakemake_interface_executor_plugins.jobs import ExecutorJobInterface
 from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface
 
@@ -26,29 +21,33 @@ class RealExecutor(AbstractExecutor):
     def __init__(
         self,
         workflow: WorkflowExecutorInterface,
-        dag: DAGExecutorInterface,
-        stats: StatsExecutorInterface,
         logger: LoggerExecutorInterface,
-        executor_settings: Optional[ExecutorSettingsBase],
-        job_core_limit: Optional[int] = None,
+        pass_default_remote_provider_args: bool = True,
+        pass_default_resources_args: bool = True,
+        pass_envvar_declarations_to_cmd: bool = True,
     ):
         super().__init__(
             workflow,
-            dag,
+            logger,
         )
-        self.cores = job_core_limit if job_core_limit else "all"
-        self.executor_settings = executor_settings
-        self.assume_shared_fs = workflow.assume_shared_fs
-        self.stats = stats
-        self.logger = logger
+        self.executor_settings = self.workflow.executor_settings
         self.snakefile = workflow.main_snakefile
+        self.pass_default_remote_provider_args = pass_default_remote_provider_args
+        self.pass_default_resources_args = pass_default_resources_args
+        self.pass_envvar_declarations_to_cmd = pass_envvar_declarations_to_cmd
+
+    @property
+    @abstractmethod
+    def cores(self):
+        # return "all" in case of remote executors,
+        # otherwise self.workflow.resource_settings.cores
+        ...
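A sketch of how a plugin might satisfy the new abstract `cores` contract, following the comment above (the class name is hypothetical and other abstract members are omitted; not part of this diff):

```python
from snakemake_interface_executor_plugins.executors.real import RealExecutor


class LocalExecutorSketch(RealExecutor):
    """Hypothetical non-remote executor; other abstract members omitted."""

    @property
    def cores(self):
        # a non-remote executor is bounded by the cores configured for the
        # run, while remote executors simply return "all" (see remote.py)
        return self.workflow.resource_settings.cores
```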
 
     def register_job(self, job: ExecutorJobInterface):
         job.register()
 
     def _run(self, job: ExecutorJobInterface, callback=None, error_callback=None):
         super()._run(job)
-        self.stats.report_job_start(job)
 
         try:
             self.register_job(job)
@@ -74,96 +73,11 @@ def handle_job_success(
             handle_log=handle_log,
             handle_touch=handle_touch,
             ignore_missing_output=ignore_missing_output,
-            latency_wait=self.latency_wait,
-            assume_shared_fs=self.assume_shared_fs,
-            keep_metadata=self.workflow.keep_metadata,
         )
-        self.stats.report_job_end(job)
 
     def handle_job_error(self, job: ExecutorJobInterface, upload_remote=True):
         job.postprocess(
             error=True,
-            assume_shared_fs=self.assume_shared_fs,
-            latency_wait=self.latency_wait,
         )
-
-    def workflow_property_to_arg(
-        self, property, flag=None, quote=True, skip=False, invert=False, attr=None
-    ):
-        if skip:
-            return ""
-
-        value = getattr(self.workflow, property)
-
-        if value is not None and attr is not None:
-            value = getattr(value, attr)
-
-        if flag is None:
-            flag = f"--{property.replace('_', '-')}"
-
-        if invert and isinstance(value, bool):
-            value = not value
-
-        return format_cli_arg(flag, value, quote=quote)
-
-    @lazy_property
-    def general_args(self):
-        """Return a string to add to self.exec_job that includes additional
-        arguments from the command line. This is currently used in the
-        ClusterExecutor and CPUExecutor, as both were using the same
-        code. Both have base class of the RealExecutor.
-        """
-        w2a = self.workflow_property_to_arg
-
-        return join_cli_args(
-            [
-                "--force",
-                "--keep-target-files",
-                "--keep-remote",
-                "--max-inventory-time 0",
-                "--nocolor",
-                "--notemp",
-                "--no-hooks",
-                "--nolock",
-                "--ignore-incomplete",
-                format_cli_arg("--keep-incomplete", self.keepincomplete),
-                w2a("rerun_triggers"),
-                w2a("cleanup_scripts", flag="--skip-script-cleanup"),
-                w2a("shadow_prefix"),
-                w2a("use_conda"),
-                w2a("conda_frontend"),
-                w2a("conda_prefix"),
-                w2a("conda_base_path", skip=not self.assume_shared_fs),
-                w2a("use_singularity"),
-                w2a("singularity_prefix"),
-                w2a("singularity_args"),
-                w2a("execute_subworkflows", flag="--no-subworkflows", invert=True),
-                w2a("max_threads"),
-                w2a("use_env_modules", flag="--use-envmodules"),
-                w2a("keep_metadata", flag="--drop-metadata", invert=True),
-                w2a("wrapper_prefix"),
-                w2a("overwrite_threads", flag="--set-threads"),
-                w2a("overwrite_scatter", flag="--set-scatter"),
-                w2a("local_groupid", skip=self.job_specific_local_groupid),
-                w2a("conda_not_block_search_path_envvars"),
-                w2a("overwrite_configfiles", flag="--configfiles"),
-                w2a("config_args", flag="--config"),
-                w2a("printshellcmds"),
-                w2a("latency_wait"),
-                w2a("scheduler_type", flag="--scheduler"),
-                format_cli_arg(
-                    "--scheduler-solver-path",
-                    os.path.dirname(sys.executable),
-                    skip=not self.assume_shared_fs,
-                ),
-                self.get_set_resources_args(),
-                self.get_default_remote_provider_args(),
-                self.get_default_resources_args(),
-                self.get_resource_scopes_args(),
-                self.get_workdir_arg(),
-                join_cli_args(self.additional_general_args()),
-                format_cli_arg("--mode", self.get_exec_mode()),
-            ]
-        )
 
     def additional_general_args(self):
@@ -212,11 +126,17 @@ def get_python_executable(self):
         ...
 
     @abstractmethod
-    def get_exec_mode(self):
+    def get_exec_mode(self) -> ExecMode:
         ...
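`get_exec_mode` is now typed against the `ExecMode` enum introduced in `settings.py` further down. `RemoteExecutor` returns `ExecMode.REMOTE`; a non-remote executor would presumably return `ExecMode.SUBPROCESS` for the jobs it spawns, roughly like this (hypothetical sketch, not part of this diff):

```python
from snakemake_interface_executor_plugins.executors.real import RealExecutor
from snakemake_interface_executor_plugins.settings import ExecMode


class LocalExecutorSketch(RealExecutor):
    """Hypothetical non-remote executor; other abstract members omitted."""

    def get_exec_mode(self) -> ExecMode:
        # DEFAULT marks the main Snakemake process, REMOTE a remotely
        # executed job; locally spawned jobs run as plain subprocesses
        return ExecMode.SUBPROCESS
```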
 
     def get_envvar_declarations(self):
-        return ""
+        if self.pass_envvar_declarations_to_cmd:
+            return " ".join(
+                f"{var}={repr(os.environ[var])}"
+                for var in self.workflow.remote_execution_settings.envvars
+            )
+        else:
+            return ""
 
     def get_job_exec_prefix(self, job: ExecutorJobInterface):
         return ""
@@ -231,6 +151,10 @@ def format_job_exec(self, job: ExecutorJobInterface):
         suffix = self.get_job_exec_suffix(job)
         if suffix:
             suffix = f"&& {suffix}"
+        general_args = self.workflow.spawned_job_args_factory.general_args(
+            pass_default_remote_provider_args=self.pass_default_remote_provider_args,
+            pass_default_resources_args=self.pass_default_resources_args,
+        )
         return join_cli_args(
             [
                 prefix,
@@ -239,7 +163,16 @@
                 "-m snakemake",
                 format_cli_arg("--snakefile", self.get_snakefile()),
                 self.get_job_args(job),
-                self.general_args,
+                self.get_default_remote_provider_args(),
+                self.get_workdir_arg(),
+                general_args,
+                self.additional_general_args(),
+                format_cli_arg("--mode", self.get_exec_mode()),
+                format_cli_arg(
+                    "--local-groupid",
+                    self.workflow.group_settings.local_groupid,
+                    skip=self.job_specific_local_groupid,
+                ),
                 suffix,
             ]
         )
diff --git a/snakemake_interface_executor_plugins/executors/remote.py b/snakemake_interface_executor_plugins/executors/remote.py
index ffb53a8..b017c9b 100644
--- a/snakemake_interface_executor_plugins/executors/remote.py
+++ b/snakemake_interface_executor_plugins/executors/remote.py
@@ -12,15 +12,12 @@
 import sys
 import tempfile
 import threading
-from typing import Optional
 
-from snakemake_interface_executor_plugins import ExecutorSettingsBase
-from snakemake_interface_executor_plugins.dag import DAGExecutorInterface
-from snakemake_interface_executor_plugins.exceptions import WorkflowError
+from snakemake_interface_common.exceptions import WorkflowError
 from snakemake_interface_executor_plugins.executors.real import RealExecutor
 from snakemake_interface_executor_plugins.jobs import ExecutorJobInterface
 from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
-from snakemake_interface_executor_plugins.persistence import StatsExecutorInterface
-from snakemake_interface_executor_plugins.utils import ExecMode, format_cli_arg
+from snakemake_interface_executor_plugins.settings import ExecMode
+from snakemake_interface_executor_plugins.utils import format_cli_arg
 from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface
 from throttler import Throttler
 
@@ -42,31 +39,29 @@ class RemoteExecutor(RealExecutor, ABC):
     def __init__(
         self,
         workflow: WorkflowExecutorInterface,
-        dag: DAGExecutorInterface,
-        stats: StatsExecutorInterface,
         logger: LoggerExecutorInterface,
-        executor_settings: Optional[ExecutorSettingsBase],
-        jobname="snakejob.{name}.{jobid}.sh",
-        max_status_checks_per_second=1,
-        disable_default_remote_provider_args=False,
-        disable_default_resources_args=False,
-        disable_envvar_declarations=False,
+        pass_default_remote_provider_args: bool = True,
+        pass_default_resources_args: bool = True,
+        pass_envvar_declarations_to_cmd: bool = False,
     ):
         super().__init__(
             workflow,
-            dag,
-            stats,
             logger,
-            executor_settings,
+            pass_default_remote_provider_args=pass_default_remote_provider_args,
+            pass_default_resources_args=pass_default_resources_args,
+            pass_envvar_declarations_to_cmd=pass_envvar_declarations_to_cmd,
         )
-        self.max_status_checks_per_second = max_status_checks_per_second
+        self.max_status_checks_per_second = (
+            self.workflow.remote_execution_settings.max_status_checks_per_second
+        )
+        self.jobname = self.workflow.remote_execution_settings.jobname
 
-        if not self.assume_shared_fs:
+        if not self.workflow.storage_settings.assume_shared_fs:
             # use relative path to Snakefile
             self.snakefile = os.path.relpath(workflow.main_snakefile)
 
         self.is_default_jobscript = False
-        jobscript = workflow.jobscript
+        jobscript = workflow.remote_execution_settings.jobscript
         if jobscript is None:
             jobscript = os.path.join(os.path.dirname(__file__), self.default_jobscript)
             self.is_default_jobscript = True
@@ -76,12 +71,12 @@
         except IOError as e:
             raise WorkflowError(e)
 
-        if "{jobid}" not in jobname:
+        if "{jobid}" not in self.jobname:
             raise WorkflowError(
-                f'Defined jobname ("{jobname}") has to contain the wildcard {{jobid}}.'
+                f'Defined jobname ("{self.jobname}") has '
+                f"to contain the wildcard {{jobid}}."
             )
 
-        self.jobname = jobname
         self._tmpdir = None
 
         self.active_jobs = list()
@@ -91,20 +86,20 @@
         self.wait_thread.daemon = True
         self.wait_thread.start()
 
-        self.disable_default_remote_provider_args = disable_default_remote_provider_args
-        self.disable_default_resources_args = disable_default_resources_args
-        self.disable_envvar_declarations = disable_envvar_declarations
-
         max_status_checks_frac = Fraction(
-            max_status_checks_per_second
+            self.max_status_checks_per_second
         ).limit_denominator()
         self.status_rate_limiter = Throttler(
             rate_limit=max_status_checks_frac.numerator,
             period=max_status_checks_frac.denominator,
         )
 
-    def get_exec_mode(self):
-        return ExecMode.remote
+    @property
+    def cores(self):
+        return "all"
+
+    def get_exec_mode(self) -> ExecMode:
+        return ExecMode.REMOTE
 
     def get_default_remote_provider_args(self):
         if not self.disable_default_remote_provider_args:
@@ -112,31 +107,21 @@ def get_default_remote_provider_args(self):
         else:
             return ""
 
-    def get_default_resources_args(self, default_resources=None):
-        if not self.disable_default_resources_args:
-            return super().get_default_resources_args(default_resources)
-        else:
-            return ""
-
     def get_workdir_arg(self):
-        if self.assume_shared_fs:
+        if self.workflow.storage_settings.assume_shared_fs:
             return super().get_workdir_arg()
         return ""
 
-    def get_envvar_declarations(self):
-        if not self.disable_envvar_declarations:
-            return " ".join(
-                f"{var}={repr(os.environ[var])}" for var in self.workflow.envvars
-            )
-        else:
-            return ""
-
     def get_python_executable(self):
-        return sys.executable if self.assume_shared_fs else "python"
+        return (
+            sys.executable
+            if self.workflow.storage_settings.assume_shared_fs
+            else "python"
+        )
 
     def get_job_args(self, job: ExecutorJobInterface):
         waitfiles_parameter = ""
-        if self.assume_shared_fs:
+        if self.workflow.storage_settings.assume_shared_fs:
             wait_for_files = []
             wait_for_files.append(self.tmpdir)
             wait_for_files.extend(job.get_wait_for_files())
@@ -171,7 +156,7 @@ def shutdown(self):
         with self.lock:
             self.wait = False
         self.wait_thread.join()
-        if not self.workflow.immediate_submit:
+        if not self.workflow.remote_execution_settings.immediate_submit:
             # Only delete tmpdir (containing jobscripts) if not using
             # immediate_submit. With immediate_submit, jobs can be scheduled
             # after this method is completed. Hence we have to keep the
@@ -182,7 +167,7 @@ def cancel(self):
         self.shutdown()
 
     def _run(self, job: ExecutorJobInterface, callback=None, error_callback=None):
-        if self.assume_shared_fs:
+        if self.workflow.storage_settings.assume_shared_fs:
             job.remove_existing_output()
             job.download_remote_input()
         super()._run(job, callback=callback, error_callback=error_callback)
@@ -221,7 +206,7 @@ def write_jobscript(self, job: ExecutorJobInterface, jobscript):
         else:
             raise WorkflowError(
                 "Error formatting custom jobscript "
-                f"{self.workflow.jobscript}: value for {e} not found.\n"
+                f"{self.jobscript}: value for {e} not found.\n"
                 "Make sure that your custom jobscript is defined as "
                 "expected."
             )
@@ -245,7 +230,7 @@ def handle_job_error(self, job: ExecutorJobInterface):
         # It will be removed by the CPUExecutor in case of a shared FS,
         # but we might not see the removal due to filesystem latency.
         # By removing it again, we make sure that it is gone on the host FS.
-        if not self.keepincomplete:
+        if not self.workflow.execution_settings.keep_incomplete:
             self.workflow.persistence.cleanup(job)
         # Also cleanup the jobs output files, in case the remote job
         # was not able to, due to e.g. timeout.
diff --git a/snakemake_interface_executor_plugins/persistence.py b/snakemake_interface_executor_plugins/persistence.py
index 742781a..0cc838a 100644
--- a/snakemake_interface_executor_plugins/persistence.py
+++ b/snakemake_interface_executor_plugins/persistence.py
@@ -5,8 +5,6 @@
 
 from abc import ABC, abstractmethod
 
-from snakemake_interface_executor_plugins.jobs import ExecutorJobInterface
-
 
 class PersistenceExecutorInterface(ABC):
     @abstractmethod
@@ -22,32 +20,3 @@ def path(self):
     @abstractmethod
     def aux_path(self):
         ...
-
-
-class StatsExecutorInterface(ABC):
-    @abstractmethod
-    def report_job_start(self, job: ExecutorJobInterface):
-        ...
-
-    @abstractmethod
-    def report_job_end(self, job: ExecutorJobInterface):
-        ...
-
-    @property
-    @abstractmethod
-    def rule_stats(self):
-        ...
-
-    @property
-    @abstractmethod
-    def file_stats(self):
-        ...
-
-    @property
-    @abstractmethod
-    def overall_runtime(self):
-        ...
-
-    @abstractmethod
-    def to_json(self, path):
-        ...
diff --git a/snakemake_interface_executor_plugins/registry/plugin.py b/snakemake_interface_executor_plugins/registry/plugin.py
index 8e33f37..ffe5909 100644
--- a/snakemake_interface_executor_plugins/registry/plugin.py
+++ b/snakemake_interface_executor_plugins/registry/plugin.py
@@ -63,7 +63,7 @@ def has_executor_settings(self):
             self._executor_settings_cls, ExecutorSettingsBase
         )
 
-    def get_executor_settings(self, args) -> Optional[ExecutorSettingsBase]:
+    def get_executor_settings(self, args) -> ExecutorSettingsBase:
         """Return an instance of self.executor_settings with values from args.
 
         This helper function will select executor plugin namespaces arguments
diff --git a/snakemake_interface_executor_plugins/resources.py b/snakemake_interface_executor_plugins/resources.py
new file mode 100644
index 0000000..eacdc8b
--- /dev/null
+++ b/snakemake_interface_executor_plugins/resources.py
@@ -0,0 +1,9 @@
+from abc import ABC, abstractmethod
+from typing import List
+
+
+class DefaultResourcesExecutorInterface(ABC):
+    @property
+    @abstractmethod
+    def args(self) -> List[str]:
+        ...
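This new interface replaces the direct access to `workflow.default_resources` removed from `executors/base.py` above, where `args` was rendered into the `--default-resources` CLI flag. A minimal test double, assuming Snakemake's usual `key=value` resource strings, might look like this (hypothetical, for illustration only):

```python
from typing import List

from snakemake_interface_executor_plugins.resources import (
    DefaultResourcesExecutorInterface,
)


class DefaultResourcesStub(DefaultResourcesExecutorInterface):
    """Hypothetical stand-in for tests; Snakemake provides the real object."""

    @property
    def args(self) -> List[str]:
        # one "name=value" string per default resource
        return ["mem_mb=1000", "disk_mb=1000"]
```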
diff --git a/snakemake_interface_executor_plugins/settings.py b/snakemake_interface_executor_plugins/settings.py
new file mode 100644
index 0000000..4c010bb
--- /dev/null
+++ b/snakemake_interface_executor_plugins/settings.py
@@ -0,0 +1,85 @@
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import List, Optional, Set
+
+from snakemake_interface_common.settings import SettingsEnumBase
+
+
+class RemoteExecutionSettingsExecutorInterface(ABC):
+    @property
+    @abstractmethod
+    def jobname(self) -> str:
+        ...
+
+    @property
+    @abstractmethod
+    def jobscript(self) -> str:
+        ...
+
+    @property
+    @abstractmethod
+    def immediate_submit(self) -> bool:
+        ...
+
+    @property
+    @abstractmethod
+    def envvars(self) -> Optional[List[str]]:
+        ...
+
+    @property
+    @abstractmethod
+    def max_status_checks_per_second(self) -> int:
+        ...
+
+
+class ExecMode(SettingsEnumBase):
+    """
+    Enum for the execution mode of Snakemake.
+    This handles the behavior of e.g. the logger.
+    """
+
+    DEFAULT = 0
+    SUBPROCESS = 1
+    REMOTE = 2
+
+
+class ExecutionSettingsExecutorInterface(ABC):
+    @property
+    @abstractmethod
+    def keep_incomplete(self) -> bool:
+        ...
+
+    @property
+    @abstractmethod
+    def debug(self) -> bool:
+        ...
+
+    @property
+    @abstractmethod
+    def cleanup_scripts(self) -> bool:
+        ...
+
+    @property
+    @abstractmethod
+    def edit_notebook(self) -> Optional[Path]:
+        ...
+
+
+class StorageSettingsExecutorInterface(ABC):
+    @property
+    @abstractmethod
+    def assume_shared_fs(self) -> bool:
+        ...
+
+
+class DeploymentMethod(SettingsEnumBase):
+    CONDA = 0
+    APPTAINER = 1
+    ENV_MODULES = 2
+
+
+class DeploymentSettingsExecutorInterface(ABC):
+    @property
+    @abstractmethod
+    def deployment_method(self) -> Set[DeploymentMethod]:
+        ...
diff --git a/snakemake_interface_executor_plugins/utils.py b/snakemake_interface_executor_plugins/utils.py
index d1e2ae9..8712bd5 100644
--- a/snakemake_interface_executor_plugins/utils.py
+++ b/snakemake_interface_executor_plugins/utils.py
@@ -6,21 +6,12 @@
 import asyncio
 import os
 from collections import UserDict
-from typing import List
+from typing import Any, List
 from urllib.parse import urlparse
 import collections
 from collections import namedtuple
 
-
-class ExecMode:
-    """
-    Enum for execution mode of Snakemake.
-    This handles the behavior of e.g. the logger.
-    """
-
-    default = 0
-    subprocess = 1
-    remote = 2
+from snakemake_interface_common.settings import SettingsEnumBase
 
 
 def not_iterable(value):
@@ -54,11 +45,20 @@ def format_cli_arg(flag, value, quote=True, skip=False):
 
 def format_cli_pos_arg(value, quote=True):
     if isinstance(value, (dict, UserDict)):
-        return join_cli_args(repr(f"{key}={val}") for key, val in value.items())
+        return join_cli_args(
+            repr(f"{key}={format_cli_value(val)}") for key, val in value.items()
+        )
     elif not_iterable(value):
-        return repr(value)
+        return format_cli_value(value)
     else:
-        return join_cli_args(repr(v) for v in value)
+        return join_cli_args(format_cli_value(v) for v in value)
+
+
+def format_cli_value(value: Any) -> str:
+    if isinstance(value, SettingsEnumBase):
+        return value.item_to_choice()
+    else:
+        return repr(value)
 
 
 def join_cli_args(args):
diff --git a/snakemake_interface_executor_plugins/workflow.py b/snakemake_interface_executor_plugins/workflow.py
index 22769ba..3578dac 100644
--- a/snakemake_interface_executor_plugins/workflow.py
+++ b/snakemake_interface_executor_plugins/workflow.py
@@ -5,133 +5,46 @@
 
 from abc import ABC, abstractmethod
-from typing import Dict, List, Optional
 
+from snakemake_interface_executor_plugins.cli import (
+    SpawnedJobArgsFactoryExecutorInterface,
+)
 from snakemake_interface_executor_plugins.persistence import (
     PersistenceExecutorInterface,
 )
 from snakemake_interface_executor_plugins.scheduler import JobSchedulerExecutorInterface
+from snakemake_interface_executor_plugins.settings import (
+    DeploymentSettingsExecutorInterface,
+    ExecutionSettingsExecutorInterface,
+    RemoteExecutionSettingsExecutorInterface,
+    StorageSettingsExecutorInterface,
+)
 
 
 class WorkflowExecutorInterface(ABC):
     @property
     @abstractmethod
-    def assume_shared_fs(self) -> bool:
-        ...
-
-    @property
-    @abstractmethod
-    def keep_incomplete(self) -> bool:
-        ...
-
-    @property
-    @abstractmethod
-    def latency_wait(self) -> int:
-        ...
-
-    @property
-    @abstractmethod
-    def rerun_triggers(self) -> Optional[List[str]]:
-        ...
-
-    @property
-    @abstractmethod
-    def shadow_prefix(self) -> Optional[str]:
-        ...
-
-    @property
-    @abstractmethod
-    def conda_frontend(self) -> Optional[str]:
-        ...
-
-    @property
-    @abstractmethod
-    def conda_prefix(self) -> Optional[str]:
-        ...
-
-    @property
-    @abstractmethod
-    def conda_base_path(self) -> Optional[str]:
-        ...
-
-    @property
-    @abstractmethod
-    def singularity_args(self) -> Optional[str]:
+    def spawned_job_args_factory(self) -> SpawnedJobArgsFactoryExecutorInterface:
         ...
 
     @property
     @abstractmethod
-    def execute_subworkflows(self) -> bool:
+    def execution_settings(self) -> ExecutionSettingsExecutorInterface:
         ...
 
     @property
     @abstractmethod
-    def max_threads(self) -> Optional[int]:
+    def remote_execution_settings(self) -> RemoteExecutionSettingsExecutorInterface:
         ...
 
     @property
     @abstractmethod
-    def keep_metadata(self) -> bool:
+    def storage_settings(self) -> StorageSettingsExecutorInterface:
         ...
 
     @property
     @abstractmethod
-    def wrapper_prefix(self) -> Optional[str]:
-        ...
-
-    @property
-    @abstractmethod
-    def overwrite_threads(self) -> Dict[str, int]:
-        ...
-
-    @property
-    @abstractmethod
-    def overwrite_scatter(self) -> Dict[str, int]:
-        ...
-
-    @property
-    @abstractmethod
-    def local_groupid(self):
-        ...
-
-    @property
-    @abstractmethod
-    def conda_not_block_search_path_envvars(self):
-        ...
-
-    @property
-    @abstractmethod
-    def overwrite_configfiles(self):
-        ...
-
-    @property
-    @abstractmethod
-    def config_args(self):
-        ...
-
-    @property
-    @abstractmethod
-    def printshellcmds(self):
-        ...
-
-    @property
-    @abstractmethod
-    def scheduler_type(self):
-        ...
-
-    @property
-    @abstractmethod
-    def overwrite_resources(self):
-        ...
-
-    @property
-    @abstractmethod
-    def default_resources(self):
-        ...
-
-    @property
-    @abstractmethod
-    def overwrite_resource_scopes(self):
+    def deployment_settings(self) -> DeploymentSettingsExecutorInterface:
         ...
 
     @property
@@ -143,11 +56,6 @@ def resource_scopes(self):
     def get_cache_mode(self, rule):
         ...
 
-    @property
-    @abstractmethod
-    def output_file_cache(self):
-        ...
-
     @property
     @abstractmethod
     def main_snakefile(self):
@@ -168,76 +76,12 @@ def linemaps(self):
     def workdir_init(self):
         ...
 
-    @property
-    @abstractmethod
-    def use_conda(self):
-        ...
-
-    @property
-    @abstractmethod
-    def use_singularity(self):
-        ...
-
-    @property
-    @abstractmethod
-    def use_env_modules(self):
-        ...
-
-    @property
-    @abstractmethod
-    def debug(self):
-        ...
-
-    @property
-    @abstractmethod
-    def cleanup_scripts(self):
-        ...
-
-    @property
-    @abstractmethod
-    def edit_notebook(self):
-        ...
-
     @property
     @abstractmethod
     def sourcecache(self):
         ...
 
-    @property
-    @abstractmethod
-    def verbose(self):
-        ...
-
-    @property
-    @abstractmethod
-    def jobscript(self):
-        ...
-
-    @property
-    @abstractmethod
-    def envvars(self):
-        ...
-
     @property
     @abstractmethod
     def scheduler(self) -> JobSchedulerExecutorInterface:
         ...
-
-    @property
-    @abstractmethod
-    def immediate_submit(self):
-        ...
-
-    @property
-    @abstractmethod
-    def default_remote_prefix(self):
-        ...
-
-    @property
-    @abstractmethod
-    def rules(self):
-        ...
-
-    @abstractmethod
-    def get_rule(self, name):
-        ...
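Taken together, the workflow interface now exposes a few grouped settings objects instead of one flat property per CLI option, so adding an option to Snakemake no longer forces a change to this package. A sketch of the resulting access pattern inside a plugin (hypothetical class and method; the attribute paths are the interfaces defined in this diff):

```python
from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor


class MyExecutor(RemoteExecutor):
    """Hypothetical plugin executor; abstract members omitted for brevity."""

    def describe_run(self) -> str:
        # each option group is reached through its dedicated settings object
        shared_fs = self.workflow.storage_settings.assume_shared_fs
        keep_incomplete = self.workflow.execution_settings.keep_incomplete
        envvars = self.workflow.remote_execution_settings.envvars or []
        return (
            f"shared fs: {shared_fs}, keep incomplete: {keep_incomplete}, "
            f"envvars passed to spawned jobs: {', '.join(envvars) or 'none'}"
        )
```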