Skip to content
Permalink
Browse files
feat: cluster sidecar (#1397)
  • Loading branch information
holtgrewe committed Feb 17, 2022
1 parent 0593de1 commit b992cd19dc1c011f536e3662a3ddffc8b1bb9f67
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 3 deletions.
@@ -279,6 +279,19 @@ When snakemake is terminated by pressing ``Ctrl-C``, it will cancel all currentl
You can get the same behaviour with ``--cluster`` by adding ``--cluster-cancel`` and passing a command to use for canceling jobs by their jobid (e.g., ``scancel`` for SLURM or ``qdel`` for SGE).
Most job schedulers can be passed multiple jobids and you can use ``--cluster-cancel-nargs`` to limit the number of arguments (default is 1000 which is reasonable for most schedulers).

Using --cluster-sidecar
:::::::::::::::::::::::

In certain situations, it is necessary to avoid calling cluster commands directly and to use a "sidecar" process instead, e.g., one providing a REST API.
One example is when using SLURM where regular calls to ``scontrol show job JOBID`` or ``sacct -j JOBID`` puts a high load on the controller.
Rather, it is better to use the ``squeue`` command with the ``-i/--iterate`` option.

When using ``--cluster``, you can use ``--cluster-sidecar`` to pass in a command that starts a sidecar server.
The command should print one line to stdout and then block and accept connections.
The line will subsequently be available in the calls to ``--cluster``, ``--cluster-status``, and ``--cluster-cancel`` in the environment variable ``SNAKEMAKE_CLUSTER_SIDECAR_VARS``.
In the case of a REST server, you can use this to return the port that the server is listening on and credentials.
When the Snakemake process terminates, the sidecar process will be terminated as well.

Constraining wildcards
::::::::::::::::::::::

@@ -168,6 +168,7 @@ def snakemake(
cluster_status=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
cluster_sidecar=None,
export_cwl=None,
show_failed_logs=False,
keep_incomplete=False,
@@ -300,6 +301,7 @@ def snakemake(
cluster_status (str): status command for cluster execution. If None, Snakemake will rely on flag files. Otherwise, it expects the command to return "success", "failure" or "running" when executing with a cluster jobid as a single argument.
cluster_cancel (str): command to cancel multiple job IDs (like SLURM 'scancel') (default None)
cluster_cancel_nargs (int): maximal number of job ids to pass to cluster_cancel (default 1000)
cluster_sidecar (str): command that starts a sidecar process, see cluster documentation (default None)
export_cwl (str): Compile workflow to CWL and save to given file
log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has to following entries:
keep_incomplete (bool): keep incomplete output files of failed jobs
@@ -697,6 +699,7 @@ def snakemake(
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
cluster_sidecar=cluster_sidecar,
max_jobs_per_second=max_jobs_per_second,
max_status_checks_per_second=max_status_checks_per_second,
overwrite_groups=overwrite_groups,
@@ -785,6 +788,7 @@ def snakemake(
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
cluster_sidecar=cluster_sidecar,
report=report,
report_stylesheet=report_stylesheet,
export_cwl=export_cwl,
@@ -2152,6 +2156,12 @@ def get_argument_parser(profile=None):
help="Specify maximal number of job ids to pass to --cluster-cancel "
"command, defaults to 1000.",
)
group_cluster.add_argument(
"--cluster-sidecar",
default=None,
help="Optional command to start a sidecar process during cluster "
"execution. Only active when --cluster is given as well.",
)
group_cluster.add_argument(
"--drmaa-log-dir",
metavar="DIR",
@@ -2424,7 +2434,7 @@ def adjust_path(f):
args.cluster_config = adjust_path(args.cluster_config)
if args.cluster_sync:
args.cluster_sync = adjust_path(args.cluster_sync)
for key in "cluster_status", "cluster_cancel":
for key in "cluster_status", "cluster_cancel", "cluster_sidecar":
if getattr(args, key):
setattr(args, key, adjust_path(getattr(args, key)))
if args.report_stylesheet:
@@ -2900,6 +2910,7 @@ def open_browser():
cluster_status=args.cluster_status,
cluster_cancel=args.cluster_cancel,
cluster_cancel_nargs=args.cluster_cancel_nargs,
cluster_sidecar=args.cluster_sidecar,
export_cwl=args.export_cwl,
show_failed_logs=args.show_failed_logs,
keep_incomplete=args.keep_incomplete,
@@ -18,6 +18,7 @@
import subprocess
import signal
import tempfile
import threading
from functools import partial
from itertools import chain
from collections import namedtuple
@@ -925,6 +926,7 @@ def __init__(
statuscmd=None,
cancelcmd=None,
cancelnargs=None,
sidecarcmd=None,
cluster_config=None,
jobname="snakejob.{rulename}.{jobid}.sh",
printreason=False,
@@ -947,6 +949,7 @@ def __init__(

self.statuscmd = statuscmd
self.cancelcmd = cancelcmd
self.sidecarcmd = sidecarcmd
self.cancelnargs = cancelnargs
self.external_jobid = dict()
# We need to collect all external ids so we can properly cancel even if
@@ -970,6 +973,10 @@ def __init__(
keepmetadata=keepmetadata,
)

self.sidecar_vars = None
if self.sidecarcmd:
self._launch_sidecar()

if statuscmd:
self.exec_job += " && exit 0 || exit 1"
elif assume_shared_fs:
@@ -982,13 +989,53 @@ def __init__(
"specify a cluster status command."
)

def _launch_sidecar(self):
    """Start the sidecar process given by ``self.sidecarcmd``.

    The first line the sidecar prints to stdout is stored in
    ``self.sidecar_vars`` (trailing CR/LF stripped) and later exported to
    cluster submit/status/cancel commands via the environment variable
    ``SNAKEMAKE_CLUSTER_SIDECAR_VARS``.  Two helper threads are started:
    one that mirrors the sidecar's stdout to our stdout, and one that
    terminates the sidecar once ``self.wait`` becomes falsy.
    """

    def copy_stdout(executor, process):
        """Copy the sidecar process' stdout to our stdout."""
        while process.poll() is None and executor.wait:
            buf = process.stdout.readline()
            if buf:
                self.stdout.write(buf)
        # One final read to flush output buffered before termination.
        buf = process.stdout.readline()
        if buf:
            self.stdout.write(buf)

    def wait(executor, process):
        """Terminate the sidecar once the executor stops waiting."""
        while executor.wait:
            time.sleep(0.5)
        process.terminate()
        process.wait()
        logger.info(
            "Cluster sidecar process has terminated (retcode=%d)."
            % process.returncode
        )

    logger.info("Launch sidecar process and read first output line.")
    process = subprocess.Popen(
        self.sidecarcmd, stdout=subprocess.PIPE, shell=False, encoding="utf-8"
    )
    # NOTE: this blocks until the sidecar prints its first line.
    self.sidecar_vars = process.stdout.readline()
    while self.sidecar_vars and self.sidecar_vars[-1] in "\n\r":
        self.sidecar_vars = self.sidecar_vars[:-1]
    logger.info("Done reading first output line.")

    thread_stdout = threading.Thread(
        target=copy_stdout, name="sidecar_stdout", args=(self, process)
    )
    thread_stdout.start()
    # BUGFIX: this thread was previously also named "sidecar_stdout",
    # which made the two sidecar threads indistinguishable in debuggers
    # and thread dumps.
    thread_wait = threading.Thread(
        target=wait, name="sidecar_wait", args=(self, process)
    )
    thread_wait.start()

def cancel(self):
def _chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i : i + n]

if self.cancelcmd: # We have --cluster-[m]cancel
if self.cancelcmd: # We have --cluster-cancel
# Enumerate job IDs and create chunks. If cancelnargs evaluates to false (0/None)
# then pass all job ids at once
jobids = list(self.all_ext_jobids)
@@ -998,8 +1045,14 @@ def _chunks(lst, n):
for chunk in chunks:
try:
cancel_timeout = 2 # rather fail on timeout than miss canceling all
env = dict(os.environ)
if self.sidecar_vars:
env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
subprocess.check_call(
[self.cancelcmd] + chunk, shell=False, timeout=cancel_timeout
[self.cancelcmd] + chunk,
shell=False,
timeout=cancel_timeout,
env=env,
)
except subprocess.SubprocessError:
failures += 1
@@ -1070,12 +1123,16 @@ def run(self, job, callback=None, submit_callback=None, error_callback=None):
raise WorkflowError(str(e), rule=job.rule if not job.is_group() else None)

try:
env = dict(os.environ)
if self.sidecar_vars:
env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
ext_jobid = (
subprocess.check_output(
'{submitcmd} "{jobscript}"'.format(
submitcmd=submitcmd, jobscript=jobscript
),
shell=True,
env=env,
)
.decode()
.split("\n")
@@ -1124,11 +1181,15 @@ def _wait_for_jobs(self):
def job_status(job, valid_returns=["running", "success", "failed"]):
try:
# this command shall return "success", "failed" or "running"
env = dict(os.environ)
if self.sidecar_vars:
env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
ret = subprocess.check_output(
"{statuscmd} {jobid}".format(
jobid=job.jobid, statuscmd=self.statuscmd
),
shell=True,
env=env,
).decode()
except subprocess.CalledProcessError as e:
if e.returncode < 0:
@@ -66,6 +66,7 @@ def __init__(
cluster_sync=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
cluster_sidecar=None,
drmaa=None,
drmaa_log_dir=None,
kubernetes=None,
@@ -199,6 +200,7 @@ def __init__(
statuscmd=cluster_status,
cancelcmd=cluster_cancel,
cancelnargs=cluster_cancel_nargs,
sidecarcmd=cluster_sidecar,
max_status_checks_per_second=max_status_checks_per_second,
)

@@ -617,6 +617,7 @@ def execute(
cluster_status=None,
cluster_cancel=None,
cluster_cancel_nargs=None,
cluster_sidecar=None,
report=None,
report_stylesheet=None,
export_cwl=False,
@@ -986,6 +987,7 @@ def files(items):
cluster_status=cluster_status,
cluster_cancel=cluster_cancel,
cluster_cancel_nargs=cluster_cancel_nargs,
cluster_sidecar=cluster_sidecar,
cluster_config=cluster_config,
cluster_sync=cluster_sync,
jobname=jobname,
@@ -0,0 +1,13 @@
# Trivial workflow for the --cluster-sidecar test: two independent rules,
# each touching one output file, collected by the "all" target.
from snakemake import shell


# Default target: require both output files.
rule all:
input: 'f.1', 'f.2'

# Create the first output file.
rule one:
output: 'f.1'
shell: "touch {output}"

# Create the second output file.
rule two:
output: 'f.2'
shell: "touch {output}"
Empty file.
Empty file.
@@ -0,0 +1,2 @@
SNAKEMAKE_CLUSTER_SIDECAR_VARS=FIRST_LINE
SNAKEMAKE_CLUSTER_SIDECAR_VARS=FIRST_LINE
@@ -0,0 +1,2 @@
sidecar started
sidecar stopped
@@ -0,0 +1,11 @@
#!/bin/bash
# Fake "sbatch" used by the sidecar test: logs its invocation, runs the
# job script in the background, and prints the background PID, which
# Snakemake then treats as the external cluster job id.
set -x
# Record the sidecar variables Snakemake exported for cluster calls so the
# test can assert that SNAKEMAKE_CLUSTER_SIDECAR_VARS was passed through.
echo "SNAKEMAKE_CLUSTER_SIDECAR_VARS=$SNAKEMAKE_CLUSTER_SIDECAR_VARS" >>launched.txt
echo --sbatch-- >> sbatch.log
echo `date` >> sbatch.log
tail -n1 $1 >> sbatch.log
cat $1 >> sbatch.log
# daemonize job script
nohup sh $1 0<&- &>/dev/null &
# print PID for job number
echo $!
@@ -0,0 +1,20 @@
#!/bin/bash

set -ex

echo "FIRST_LINE"
echo "sidecar started" > sidecar.txt
sleep infinity &
pid=$!

catch()
{
set -x
kill -TERM $pid || true
echo "sidecar stopped" >> sidecar.txt
exit 0
}

trap catch SIGTERM SIGINT

wait
@@ -0,0 +1 @@
testz0r
@@ -135,6 +135,14 @@ def test_cluster_cancelscript():
assert len(scancel_lines[0].split(" ")) == 3


@skip_on_windows
def test_cluster_sidecar():
    """Run the sidecar test workflow with a fake sbatch and a sidecar script."""
    cmd = "snakemake -j 10 --cluster=./sbatch --cluster-sidecar=./sidecar.sh"
    run(dpath("test_cluster_sidecar"), shellcmd=cmd)


@skip_on_windows
def test_cluster_cancelscript_nargs1():
outdir = run(

0 comments on commit b992cd1

Please sign in to comment.