feat: Adding --cluster-cancel and --cluster-cancel-nargs (#1395)
* Adding support for --cluster-[m]cancel

* Adding support for --cluster-[m]cancel.

* Addressing points from code review

* [ci skip] fix typos

Co-authored-by: Johannes Köster <johannes.koester@uni-due.de>
holtgrewe and johanneskoester committed Feb 17, 2022
1 parent 0246bc9 commit 0593de134499712929ba75e65f65df90491eac2e
Showing 14 changed files with 206 additions and 34 deletions.
@@ -48,8 +48,7 @@ and forward it to the cluster scheduler:
In order to avoid specifying ``runtime_min`` for each rule, you can make use of the ``--default-resources`` flag, see ``snakemake --help``.

If your cluster system supports `DRMAA <https://www.drmaa.org/>`_, Snakemake can make use of that to increase the control over jobs.
E.g. jobs can be cancelled upon pressing ``Ctrl+C``, which is not possible with the generic ``--cluster`` support.
If your cluster system supports `DRMAA <https://www.drmaa.org/>`_, Snakemake can make use of that to control jobs.
With DRMAA, no ``qsub`` command needs to be provided, but system-specific arguments can still be given as a string, e.g.

.. code-block:: console
@@ -61,6 +60,13 @@ Else, the arguments will be interpreted as part of the normal Snakemake argument

Adapting to a specific cluster can involve quite a lot of options. It is therefore a good idea to set up :ref:`a profile <profiles>`.

.. note::
    Are you using the SLURM job scheduler?

    In this case, it will be more robust to use the ``--cluster``, ``--cluster-status``, and ``--cluster-cancel`` arguments than DRMAA.
    The reason is that the slurm-drmaa package is not maintained by the SLURM vendor SchedMD and is less well supported.
    Effectively, you will run into timeouts in DRMAA calls sooner.

--------------
Job Properties
--------------
@@ -272,6 +272,13 @@ To use this script call snakemake similar to below, where ``status.py`` is the s
$ snakemake all --jobs 100 --cluster "sbatch --cpus-per-task=1 --parsable" --cluster-status ./status.py
Using --cluster-cancel
::::::::::::::::::::::

When Snakemake is terminated by pressing ``Ctrl-C``, it will cancel all currently running jobs when using ``--drmaa``.
You can get the same behaviour with ``--cluster`` by adding ``--cluster-cancel`` and passing a command to use for canceling jobs by their job id (e.g., ``scancel`` for SLURM or ``qdel`` for SGE).
Most job schedulers can be passed multiple job ids, and you can use ``--cluster-cancel-nargs`` to limit the number of job ids passed per call (the default of 1000 is reasonable for most schedulers).
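For example, on a SLURM cluster, submission, status checks, and cancellation might be combined as follows (the target ``all`` and the batch size of 20 are illustrative):

.. code-block:: console

    $ snakemake all --jobs 100 --cluster "sbatch --cpus-per-task=1 --parsable" --cluster-cancel scancel --cluster-cancel-nargs 20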

Constraining wildcards
::::::::::::::::::::::

@@ -166,6 +166,8 @@ def snakemake(
tibanna_config=False,
assume_shared_fs=True,
cluster_status=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
export_cwl=None,
show_failed_logs=False,
keep_incomplete=False,
@@ -296,6 +298,8 @@ def snakemake(
tibanna_config (list): Additional tibanna config e.g. --tibanna-config spot_instance=true subnet=<subnet_id> security group=<security_group_id>
assume_shared_fs (bool): assume that cluster nodes share a common filesystem (default true).
cluster_status (str): status command for cluster execution. If None, Snakemake will rely on flag files. Otherwise, it expects the command to return "success", "failure" or "running" when executing with a cluster jobid as a single argument.
cluster_cancel (str): command to cancel multiple job IDs (like SLURM 'scancel') (default None)
cluster_cancel_nargs (int): maximal number of job ids to pass to cluster_cancel (default 1000)
export_cwl (str): Compile workflow to CWL and save to given file
log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has the following entries:
keep_incomplete (bool): keep incomplete output files of failed jobs
@@ -691,6 +695,8 @@ def snakemake(
tibanna_config=tibanna_config,
assume_shared_fs=assume_shared_fs,
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
max_jobs_per_second=max_jobs_per_second,
max_status_checks_per_second=max_status_checks_per_second,
overwrite_groups=overwrite_groups,
@@ -777,6 +783,8 @@ def snakemake(
conda_create_envs_only=conda_create_envs_only,
assume_shared_fs=assume_shared_fs,
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
report=report,
report_stylesheet=report_stylesheet,
export_cwl=export_cwl,
@@ -2131,6 +2139,19 @@ def get_argument_parser(profile=None):
"'success' if the job was successful, 'failed' if the job failed and "
"'running' if the job still runs.",
)
group_cluster.add_argument(
"--cluster-cancel",
default=None,
help="Specify a command that allows canceling currently running jobs. "
"The command will be passed the job id(s) to cancel as arguments.",
)
group_cluster.add_argument(
"--cluster-cancel-nargs",
type=int,
default=1000,
help="Specify the maximal number of job ids to pass to the --cluster-cancel "
"command, defaults to 1000.",
)
group_cluster.add_argument(
"--drmaa-log-dir",
metavar="DIR",
@@ -2403,8 +2424,9 @@ def adjust_path(f):
args.cluster_config = adjust_path(args.cluster_config)
if args.cluster_sync:
args.cluster_sync = adjust_path(args.cluster_sync)
if args.cluster_status:
args.cluster_status = adjust_path(args.cluster_status)
for key in "cluster_status", "cluster_cancel":
if getattr(args, key):
setattr(args, key, adjust_path(getattr(args, key)))
if args.report_stylesheet:
args.report_stylesheet = adjust_path(args.report_stylesheet)

@@ -2876,6 +2898,8 @@ def open_browser():
default_remote_prefix=args.default_remote_prefix,
assume_shared_fs=not args.no_shared_fs,
cluster_status=args.cluster_status,
cluster_cancel=args.cluster_cancel,
cluster_cancel_nargs=args.cluster_cancel_nargs,
export_cwl=args.export_cwl,
show_failed_logs=args.show_failed_logs,
keep_incomplete=args.keep_incomplete,
@@ -108,7 +108,7 @@ def get_set_resources_args(self):
"{}:{}={}".format(rule, name, value)
for rule, res in self.workflow.overwrite_resources.items()
for name, value in res.items()
),
)
)
return ""

@@ -923,6 +923,8 @@ def __init__(
cores,
submitcmd="qsub",
statuscmd=None,
cancelcmd=None,
cancelnargs=None,
cluster_config=None,
jobname="snakejob.{rulename}.{jobid}.sh",
printreason=False,
@@ -944,7 +946,12 @@ def __init__(
)

self.statuscmd = statuscmd
self.cancelcmd = cancelcmd
self.cancelnargs = cancelnargs
self.external_jobid = dict()
# Collect all external job ids so that we can cancel them properly even
# while the status update queue is still running.
self.all_ext_jobids = list()

super().__init__(
workflow,
@@ -976,8 +983,38 @@ def __init__(
)

def cancel(self):
logger.info("Will exit after finishing currently running jobs.")
self.shutdown()
def _chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]

if self.cancelcmd: # We have --cluster-[m]cancel
# Enumerate job IDs and create chunks. If cancelnargs evaluates to false (0/None)
# then pass all job ids at once
jobids = list(self.all_ext_jobids)
chunks = list(_chunks(jobids, self.cancelnargs or len(jobids)))
# Go through the chunks and cancel the jobs, warn in case of failures.
failures = 0
for chunk in chunks:
try:
cancel_timeout = 2 # rather fail on timeout than miss canceling all
subprocess.check_call(
[self.cancelcmd] + chunk, shell=False, timeout=cancel_timeout
)
except subprocess.SubprocessError:
failures += 1
if failures:
logger.info(
(
"{} out of {} calls to --cluster-cancel failed. This is safe to "
"ignore in most cases."
).format(failures, len(chunks))
)
else:
logger.info(
"No --cluster-cancel given. Will exit after finishing currently running jobs."
)
self.shutdown()

def register_job(self, job):
# Do not register job here.
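The batching performed by ``_chunks`` in ``cancel()`` above can be sketched standalone. The job ids below are made up for illustration; with a limit of 3 per call, seven ids produce three invocations of the cancel command:

```python
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i : i + n]

# Seven hypothetical external job ids, cancelled at most 3 per call,
# mirroring how cancelnargs limits the arguments per cancel command.
jobids = ["101", "102", "103", "104", "105", "106", "107"]
batches = list(chunks(jobids, 3))
print(batches)  # [['101', '102', '103'], ['104', '105', '106'], ['107']]
```

Passing ``len(jobids)`` as the chunk size (the ``cancelnargs or len(jobids)`` fallback) yields a single batch containing every id.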
@@ -1008,6 +1045,7 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
)
submit_callback(job)
with self.lock:
self.all_ext_jobids.append(ext_jobid)
self.active_jobs.append(
GenericClusterJob(
job,
@@ -1063,6 +1101,7 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
submit_callback(job)

with self.lock:
self.all_ext_jobids.append(ext_jobid)
self.active_jobs.append(
GenericClusterJob(
job,
@@ -4,6 +4,7 @@
__license__ = "MIT"

import os, signal, sys
import datetime
import threading
import operator
import time
@@ -63,6 +64,8 @@ def __init__(
cluster_status=None,
cluster_config=None,
cluster_sync=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
drmaa=None,
drmaa_log_dir=None,
kubernetes=None,
@@ -194,6 +197,8 @@ def __init__(
constructor = partial(
GenericClusterExecutor,
statuscmd=cluster_status,
cancelcmd=cluster_cancel,
cancelnargs=cluster_cancel_nargs,
max_status_checks_per_second=max_status_checks_per_second,
)

@@ -449,7 +454,7 @@ def schedule(self):
if user_kill or (not self.keepgoing and errors) or executor_error:
if user_kill == "graceful":
logger.info(
"Will exit after finishing " "currently running jobs."
"Will exit after finishing currently running jobs (scheduler)."
)

if executor_error:
@@ -581,10 +586,7 @@ def _free_resources(self, job):
value = self.calc_resource(name, value)
self.resources[name] += value

def _proceed(
self,
job,
):
def _proceed(self, job):
"""Do stuff after job is finished."""
with self._lock:
self._tofinish.append(job)
@@ -653,10 +655,7 @@ def job_selector_ilp(self, jobs):
# assert self.resources["_cores"] > 0
scheduled_jobs = {
job: pulp.LpVariable(
"job_{}".format(idx),
lowBound=0,
upBound=1,
cat=pulp.LpInteger,
"job_{}".format(idx), lowBound=0, upBound=1, cat=pulp.LpInteger
)
for idx, job in enumerate(jobs)
}
@@ -615,6 +615,8 @@ def execute(
conda_create_envs_only=False,
assume_shared_fs=True,
cluster_status=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
report=None,
report_stylesheet=None,
export_cwl=False,
@@ -982,6 +984,8 @@ def files(items):
touch=touch,
cluster=cluster,
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
cluster_config=cluster_config,
cluster_sync=cluster_sync,
jobname=jobname,
@@ -4,8 +4,11 @@
__license__ = "MIT"

import os
import signal
import sys
import shlex
import shutil
import time
from os.path import join
import tempfile
import hashlib
@@ -98,6 +101,7 @@ def run(
targets=None,
container_image=os.environ.get("CONTAINER_IMAGE", "snakemake/snakemake:latest"),
shellcmd=None,
sigint_after=None,
**params,
):
"""
@@ -156,16 +160,24 @@ def run(
raise ValueError("shellcmd does not start with snakemake")
shellcmd = "{} -m {}".format(sys.executable, shellcmd)
try:
subprocess.check_output(
shellcmd,
cwd=path if no_tmpdir else tmpdir,
shell=True,
)
success = True
if sigint_after is None:
subprocess.check_output(
shellcmd, cwd=path if no_tmpdir else tmpdir, shell=True
)
success = True
else:
with subprocess.Popen(
shlex.split(shellcmd), cwd=path if no_tmpdir else tmpdir
) as process:
time.sleep(sigint_after)
process.send_signal(signal.SIGINT)
# poll() is needed to populate returncode before checking it
success = process.poll() == 0
except subprocess.CalledProcessError as e:
success = False
print(e.stderr, file=sys.stderr)
else:
assert sigint_after is None, "Cannot send SIGINT when calling directly"
success = snakemake(
snakefile=original_snakefile if no_tmpdir else snakefile,
cores=cores,
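The SIGINT branch of the harness above can be exercised standalone. This sketch uses ``sleep`` as a stand-in for the workflow process and assumes POSIX signal semantics (a child killed by signal N reports a returncode of -N):

```python
import signal
import subprocess
import time

# Spawn a long-running child, interrupt it with SIGINT, and inspect the
# result, mirroring the sigint_after branch of the test harness.
with subprocess.Popen(["sleep", "30"]) as process:
    time.sleep(0.2)  # give the child a moment to start
    process.send_signal(signal.SIGINT)
    process.wait(timeout=5)

# A negative returncode means the child died from that signal number.
print(process.returncode)  # -2 on POSIX (SIGINT)
```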
@@ -0,0 +1,13 @@
from snakemake import shell


rule all:
input: 'f.1', 'f.2'

rule one:
output: 'f.1'
shell: "sleep 120s; touch {output}"

rule two:
output: 'f.2'
shell: "sleep 120s; touch {output}"
@@ -0,0 +1,2 @@
cancel
cancel
@@ -0,0 +1,10 @@
#!/bin/bash
set -x
echo --sbatch-- >> sbatch.log
echo `date` >> sbatch.log
tail -n1 $1 >> sbatch.log
cat $1 >> sbatch.log
# daemonize job script
nohup sh $1 0<&- &>/dev/null &
# print PID for job number
echo $!
@@ -0,0 +1,20 @@
#!/bin/bash
set -x
echo `date`
echo cancel $* >>scancel.txt

list_descendants ()
{
local children=$(ps -o pid= --ppid "$1")

for pid in $children
do
list_descendants "$pid"
done

echo "$children"
}

for x in $*; do
kill $(list_descendants $x)
done
@@ -0,0 +1 @@
echo running
@@ -0,0 +1 @@
testz0r
