Merged in job-groups2 (pull request #290)

Group job support

johanneskoester committed May 13, 2018
2 parents 42a2f09 + 966ffaf commit 6fa49e7
Showing 30 changed files with 1,104 additions and 248 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -11,3 +11,9 @@ dist/
.snakemake*

.idea

.pytest*
.cache
.ipynb*
.ropeproject
.test*
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,15 @@
# Change Log

## [5.0.0] - 2018-05-11
### Added
- Group jobs for reduced queuing and network overhead, in particular with short running jobs.
- Output files can be marked as pipes, such that producing and consuming jobs are executed simultaneously and information is transferred directly without using disk.
- Command line flags to clean output files.
- Command line flag to list files in working directory that are not tracked by Snakemake.
### Changed
- Fix of --default-remote-prefix in case of input functions returning lists or dicts.
- Scheduler no longer prefers jobs with many downstream jobs.

## [4.8.1] - 2018-04-25
### Added
- Allow URLs for the conda directive.
11 changes: 11 additions & 0 deletions docs/executable.rst
@@ -114,6 +114,11 @@ Of course, if any input or output already defines a different remote location, t
Importantly, this means that Snakemake does **not** require a shared network
filesystem to work in the cloud.


.. sidebar:: Note

    Consider :ref:`grouping jobs <snakefiles-grouping>` in order to minimize overhead, in particular for short-running jobs.

Currently, this mode requires that the Snakemake workflow is stored in a git repository.
Snakemake uses git to query necessary source files (the Snakefile, scripts, config, ...)
for workflow execution and encodes them into the kubernetes job.
@@ -144,6 +149,11 @@ In this case, Snakemake simply needs to be given a submit command that accepts a
Here, ``-j`` denotes the number of jobs being submitted to the cluster at the same time (here 32).
The cluster command can be decorated with job specific information, e.g.

.. sidebar:: Note

    Consider :ref:`grouping jobs <snakefiles-grouping>` in order to minimize overhead, in particular for short-running jobs.


.. code-block:: console

    $ snakemake --cluster "qsub {threads}"
@@ -176,6 +186,7 @@ With DRMAA, no ``qsub`` command needs to be provided, but system specific argume
Note that the string has to contain a leading whitespace.
Otherwise, the arguments will be interpreted as part of the normal Snakemake arguments, and execution will fail.


Job Properties
..............

86 changes: 86 additions & 0 deletions docs/snakefiles/rules.rst
@@ -922,3 +922,89 @@ The resulting tsv file can be used as input for other rules, just like any other
Note that benchmarking is only possible in a reliable fashion for subprocesses (thus for tasks run through the ``shell``, ``script``, and ``wrapper`` directive).
In the ``run`` block, the variable ``bench_record`` is available that you can pass to ``shell()`` as ``bench_record=bench_record``.
When using ``shell(..., bench_record=bench_record)``, the maximum of all measurements across all ``shell()`` calls will be used, while the recorded running time is that of the entire rule execution, including any Python code.
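
As an illustration, here is a minimal sketch of such a ``run`` block; the rule and file names are hypothetical:

.. code-block:: python

    rule stats:
        input:
            "data/raw.txt"
        output:
            "results/stats.txt"
        benchmark:
            "benchmarks/stats.tsv"
        run:
            # bench_record is available inside run blocks; passing it on
            # lets the shell() call contribute its measurements.
            shell("wc -l {input} > {output}", bench_record=bench_record)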

.. _snakefiles-grouping:

Defining groups for execution
-----------------------------

From Snakemake 5.0 on, it is possible to assign rules to groups.
Such groups will be executed together in cluster or cloud mode, as a so-called **group job**, i.e., all jobs of a particular group will be submitted at once, to the same computing node.
This way, queueing and execution time can be saved, in particular if one or several short-running rules are involved.
Groups can be defined via the ``group`` keyword, e.g.:

.. code-block:: python

    samples = [1, 2, 3, 4, 5]

    rule all:
        input:
            "test.out"

    rule a:
        output:
            "a/{sample}.out"
        group: "mygroup"
        shell:
            "touch {output}"

    rule b:
        input:
            "a/{sample}.out"
        output:
            "b/{sample}.out"
        group: "mygroup"
        shell:
            "touch {output}"

    rule c:
        input:
            expand("b/{sample}.out", sample=samples)
        output:
            "test.out"
        shell:
            "touch {output}"

Here, jobs from rule ``a`` and ``b`` end up in one group ``mygroup``, whereas jobs from rule ``c`` are executed separately.
Note that Snakemake always determines a **connected subgraph** with the same group id to be a **group job**.
Here, this means that, e.g., the jobs creating ``a/1.out`` and ``b/1.out`` will be in one group, and the jobs creating ``a/2.out`` and ``b/2.out`` will be in a separate group.
However, if we added ``group: "mygroup"`` to rule ``c`` as well, all jobs would end up in a single group, including the one spawned from rule ``c``, because ``c`` connects all the other jobs.
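
For illustration, this is a sketch of rule ``c`` with that group assignment added; the rest of the Snakefile from above stays unchanged:

.. code-block:: python

    rule c:
        input:
            expand("b/{sample}.out", sample=samples)
        output:
            "test.out"
        # c now connects the jobs of rules a and b, so the whole
        # workflow forms a single group job.
        group: "mygroup"
        shell:
            "touch {output}"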

Piped output
------------

From Snakemake 5.0 on, it is possible to mark output files as pipes, via the ``pipe`` flag, e.g.:

.. code-block:: python

    rule all:
        input:
            expand("test.{i}.out", i=range(2))

    rule a:
        output:
            pipe("test.{i}.txt")
        shell:
            "for i in {{0..2}}; do echo {wildcards.i} >> {output}; done"

    rule b:
        input:
            "test.{i}.txt"
        output:
            "test.{i}.out"
        shell:
            "grep {wildcards.i} < {input} > {output}"

If an output file is marked to be a pipe, then Snakemake will first create a `named pipe <https://en.wikipedia.org/wiki/Named_pipe>`_ with the given name and then execute the creating job simultaneously with the consuming job, inside a **group job** (see above).
Naturally, a pipe output may only have a single consumer.
It is possible to combine explicit group definitions as above with pipe outputs.
This way, pipe jobs can live within, or (automatically) extend, existing groups.
However, the two jobs connected by a pipe may not exist in conflicting groups.
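
As a sketch of such a combination, the pipe example from above could assign both ends of the pipe to one explicit group; the group name here is an arbitrary choice for illustration:

.. code-block:: python

    rule a:
        output:
            pipe("test.{i}.txt")
        # Producer and consumer share a non-conflicting group; without the
        # explicit assignment, the pipe would group them automatically.
        group: "pipegroup"
        shell:
            "for i in {{0..2}}; do echo {wildcards.i} >> {output}; done"

    rule b:
        input:
            "test.{i}.txt"
        output:
            "test.{i}.out"
        group: "pipegroup"
        shell:
            "grep {wildcards.i} < {input} > {output}"
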
9 changes: 6 additions & 3 deletions snakemake/__init__.py
@@ -285,6 +285,8 @@ def snakemake(snakefile,
else:
cluster_config_content = dict()

run_local = not (cluster or cluster_sync or drmaa or kubernetes)

# force thread use for any kind of cluster
use_threads = force_use_threads or (os.name != "posix") or cluster or cluster_sync or drmaa
if not keep_logger:
@@ -381,7 +383,8 @@ restart_times=restart_times,
restart_times=restart_times,
attempt=attempt,
default_remote_provider=_default_remote_provider,
default_remote_prefix=default_remote_prefix)
default_remote_prefix=default_remote_prefix,
run_local=run_local)
success = True
workflow.include(snakefile,
overwrite_first_rule=True,
@@ -974,10 +977,10 @@ def get_argument_parser(profile=None):
"installation directory.")
parser.add_argument(
"--jobname", "--jn",
default="snakejob.{rulename}.{jobid}.sh",
default="snakejob.{name}.{jobid}.sh",
metavar="NAME",
help="Provide a custom name for the jobscript that is submitted to the "
"cluster (see --cluster). NAME is \"snakejob.{rulename}.{jobid}.sh\" "
"cluster (see --cluster). NAME is \"snakejob.{name}.{jobid}.sh\" "
"per default. The wildcard {jobid} has to be present in the name.")
parser.add_argument(
"--cluster-status",
17 changes: 3 additions & 14 deletions snakemake/cwl.py
@@ -13,11 +13,11 @@
from snakemake.utils import format
from snakemake.logging import logger
from snakemake.exceptions import WorkflowError

from snakemake.shell import shell


def cwl(path, basedir, input, output, params, wildcards, threads, resources,
log, config, rulename, use_singularity, bench_record):
log, config, rulename, use_singularity, bench_record, jobid):
"""
Load cwl from the given basedir + path and execute it.
"""
@@ -55,15 +55,4 @@ def file_spec(f):
json.dump(inputs, input_file)
input_file.flush()
cmd = "cwltool {} {} {}".format(args, sourceurl, input_file.name)
try:
subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
raise WorkflowError("Error executing cwltool:\n" + e.stdout.decode())


# import cwltool.factory
# fac = cwltool.factory.Factory()
#
# tool = fac.make(sourceurl)
#
# tool(**inputs)
shell(cmd, bench_record=bench_record)
