Skip to content
Permalink
Browse files
feat: adding service jobs, i.e. the ability to define jobs that provide a resource for consumers (like a shared memory device or a database), and will be automatically terminated by Snakemake once all consumers are finished. (#1413)

* feat: ability to flag output files as service, meaning that the generating job is expected to stay active until the file is not needed anymore by consuming jobs

* handle service job termination

* docs: added service rule/job documentation

* add missing testcase
  • Loading branch information
johanneskoester committed Feb 22, 2022
1 parent 23c943f commit a471adbb785e5ac7f0c854fd09781c502b577c65
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 44 deletions.
@@ -1521,6 +1521,8 @@ This enables to almost arbitrarily partition the DAG, e.g. in order to save network

For execution on the cloud using Google Life Science API and preemptible instances, we expect all rules in the group to be homogeneously set as preemptible instances (e.g., with command-line option ``--preemptible-rules``), such that a preemptible VM is requested for the execution of the group job.

.. _snakefiles-piped-output:

Piped output
------------

@@ -1555,6 +1557,50 @@ It is possible to combine explicit group definition as above with pipe outputs.
Thereby, pipe jobs can live within, or (automatically) extend existing groups.
However, the two jobs connected by a pipe may not exist in conflicting groups.


.. _snakefiles-service-rules:

Service rules/jobs
------------------

From Snakemake 7.0 on, it is possible to define so-called service rules.
Jobs spawned from such rules provide at least one special output file that is marked as ``service``, which means that it is considered to provide a resource that shall be kept available until all consuming jobs are finished.
This can for example be the socket of a database, a shared memory device, a ramdisk, and so on.
It can even just be a dummy file, and access to the service might happen via a different channel (e.g. a local http port).
Service jobs are expected to not exit after creating that resource, but instead wait until Snakemake terminates them (e.g. via SIGTERM on Unix-like systems).

Consider the following example:

.. code-block:: python

    rule the_service:
        output:
            service("foo.socket")
        shell:
            # here we simulate some kind of server process that provides data via a socket
            "ln -s /dev/random {output}; sleep 10000"

    rule consumer1:
        input:
            "foo.socket"
        output:
            "test.txt"
        shell:
            "head -n1 {input} > {output}"

    rule consumer2:
        input:
            "foo.socket"
        output:
            "test2.txt"
        shell:
            "head -n1 {input} > {output}"
Snakemake will schedule the service with all consumers to the same physical node (in the future we might provide further controls and other modes of operation).
Once all consumer jobs are finished, the service job will be terminated automatically by Snakemake, and the service output will be removed.

.. _snakefiles-paramspace:

Parameter space exploration
@@ -494,7 +494,7 @@ def check_and_touch_output(
expanded_output,
latency_wait=wait,
force_stay_on_remote=force_stay_on_remote,
ignore_pipe=True,
ignore_pipe_or_service=True,
)
except IOError as e:
raise MissingOutputException(
@@ -1215,14 +1215,14 @@ def postprocess(self, update_needrun=True):
if update_needrun:
self.update_needrun()
self.update_priority()
self.handle_pipes()
self.handle_pipes_and_services()
self.update_groups()
self.update_ready()
self.close_remote_objects()
self.update_checkpoint_outputs()

def handle_pipes(self):
"""Use pipes to determine job groups. Check if every pipe has exactly
def handle_pipes_and_services(self):
"""Use pipes and services to determine job groups. Check if every pipe has exactly
one consumer"""

visited = set()
@@ -1231,9 +1231,11 @@ def handle_pipes(self):
if job.group is not None:
candidate_groups.add(job.group)
all_depending = set()
has_pipe = False
has_pipe_or_service = False
for f in job.output:
if is_flagged(f, "pipe"):
is_pipe = is_flagged(f, "pipe")
is_service = is_flagged(f, "service")
if is_pipe or is_service:
if job.is_run:
raise WorkflowError(
"Rule defines pipe output but "
@@ -1244,11 +1246,11 @@ def handle_pipes(self):
rule=job.rule,
)

has_pipe = True
has_pipe_or_service = True
depending = [
j for j, files in self.depending[job].items() if f in files
]
if len(depending) > 1:
if is_pipe and len(depending) > 1:
raise WorkflowError(
"Output file {} is marked as pipe "
"but more than one job depends on "
@@ -1259,40 +1261,40 @@ def handle_pipes(self):
)
elif len(depending) == 0:
raise WorkflowError(
"Output file {} is marked as pipe "
"Output file {} is marked as pipe or service "
"but it has no consumer. This is "
"invalid because it can lead to "
"a dead lock.".format(f),
rule=job.rule,
)

depending = depending[0]

if depending.is_run:
raise WorkflowError(
"Rule consumes pipe input but "
"uses a 'run' directive. This is "
"not possible for technical "
"reasons. Consider using 'shell' or "
"'script'.",
rule=job.rule,
)
for dep in depending:
if dep.is_run:
raise WorkflowError(
"Rule consumes pipe or service input but "
"uses a 'run' directive. This is "
"not possible for technical "
"reasons. Consider using 'shell' or "
"'script'.",
rule=job.rule,
)

all_depending.add(depending)
if depending.group is not None:
candidate_groups.add(depending.group)
if not has_pipe:
all_depending.add(dep)
if dep.group is not None:
candidate_groups.add(dep.group)
if not has_pipe_or_service:
continue

if len(candidate_groups) > 1:
if all(isinstance(group, CandidateGroup) for group in candidate_groups):
# all candidates are newly created groups, merge them into one
group = candidate_groups.pop()
for g in candidate_groups:
group.merge(g)
else:
raise WorkflowError(
"An output file is marked as "
"pipe, but consuming jobs "
"pipe or service, but consuming jobs "
"are part of conflicting "
"groups.",
rule=job.rule,
@@ -540,16 +540,17 @@ def run_single_job(self, job):
return future

def run_group_job(self, job):
"""Run a pipe group job.
"""Run a pipe or service group job.
This lets all items run simultaneously."""
# we only have to consider pipe groups because in local running mode,
# we only have to consider pipe or service groups because in local running mode,
# these are the only groups that will occur

futures = [self.run_single_job(j) for j in job]
n_non_service = sum(1 for j in job if not j.is_service)

while True:
k = 0
n_finished = 0
for f in futures:
if f.done():
ex = f.exception()
@@ -561,8 +562,19 @@ def run_group_job(self, job):
shell.kill(j.jobid)
raise ex
else:
k += 1
if k == len(futures):
n_finished += 1
if n_finished >= n_non_service:
# terminate all service jobs since all consumers are done
for j in job:
if j.is_service:
logger.info(
f"Terminating service job {j.jobid} since all consuming jobs are finished."
)
shell.terminate(j.jobid)
logger.info(
f"Service job {j.jobid} has been successfully terminated."
)

return
time.sleep(1)

@@ -790,7 +790,7 @@ def __hash__(self):


def wait_for_files(
files, latency_wait=3, force_stay_on_remote=False, ignore_pipe=False
files, latency_wait=3, force_stay_on_remote=False, ignore_pipe_or_service=False
):
"""Wait for given files to be present in the filesystem."""
files = list(files)
@@ -807,7 +807,10 @@ def get_missing():
and (force_stay_on_remote or f.should_stay_on_remote)
)
else os.path.exists(f)
if not (is_flagged(f, "pipe") and ignore_pipe)
if not (
(is_flagged(f, "pipe") or is_flagged(f, "service"))
and ignore_pipe_or_service
)
else True
)
]
@@ -1018,6 +1021,14 @@ def pipe(value):
return flag(value, "pipe", not ON_WINDOWS)


def service(value):
    """Flag an output file as a service.

    A service output is provided by a long-running job that is kept alive
    until all consuming jobs are finished, after which Snakemake terminates
    the job and removes the output (see the service rules documentation).

    Raises:
        SyntaxError: if *value* is already flagged as ``protected`` or
            ``remote`` — those flags are incompatible with services.
    """
    # Bug fix: the original messages said "Pipes may not be ...", copied
    # from the pipe() flag above; this is the service flag.
    if is_flagged(value, "protected"):
        raise SyntaxError("Service files may not be protected.")
    if is_flagged(value, "remote"):
        raise SyntaxError("Service files may not be remote files.")
    return flag(value, "service")


def temporary(value):
    """Alias for :func:`temp`: flag *value* as a temporary file."""
    return temp(value)
@@ -38,9 +38,11 @@
def format_files(job, io, dynamicio):
for f in io:
if f in dynamicio:
yield "{} (dynamic)".format(f.format_dynamic())
yield f"{f.format_dynamic()} (dynamic)"
elif is_flagged(f, "pipe"):
yield "{} (pipe)".format(f)
yield f"{f} (pipe)"
elif is_flagged(f, "service"):
yield f"{f} (service)"
elif is_flagged(f, "checkpoint_target"):
yield TBDString()
else:
@@ -186,7 +188,6 @@ def __init__(
self._attempt = self.dag.workflow.attempt

# TODO get rid of these
self.pipe_output = set(f for f in self.output if is_flagged(f, "pipe"))
self.dynamic_output, self.dynamic_input = set(), set()
self.temp_output, self.protected_output = set(), set()
self.touch_output = set()
@@ -466,7 +467,11 @@ def is_run(self):

@property
def is_pipe(self):
return any([is_flagged(o, "pipe") for o in self.output])
return any(is_flagged(o, "pipe") for o in self.output)

@property
def is_service(self):
return any(is_flagged(o, "service") for o in self.output)

@property
def expanded_output(self):
@@ -562,9 +567,9 @@ def output_mintime(self):

def missing_output(self, requested):
def handle_file(f):
# pipe output is always declared as missing
# pipe or service output is always declared as missing
# (even if it might be present on disk for some reason)
if f in self.pipe_output or not f.exists:
if is_flagged(f, "pipe") or is_flagged(f, "service") or not f.exists:
yield f

if self.dynamic_output:
@@ -1256,7 +1261,9 @@ def check_string_resource(res, value1, value2):

self._resources = defaultdict(int)
self._resources["_nodes"] = 1
pipe_group = any([job.is_pipe for job in self.jobs])
pipe_or_service_group = any(
[job.is_pipe or job.is_service for job in self.jobs]
)
# iterate over siblings that can be executed in parallel
for siblings in self.toposorted:
sibling_resources = defaultdict(int)
@@ -1287,7 +1294,7 @@ def check_string_resource(res, value1, value2):
for res, value in sibling_resources.items():
if isinstance(value, int):
if res != "_nodes":
if self.dag.workflow.run_local or pipe_group:
if self.dag.workflow.run_local or pipe_or_service_group:
# in case of local execution, this must be a
# group of jobs that are connected with pipes
# and have to run simultaneously
@@ -1375,11 +1382,11 @@ def postprocess(self, error=False, **kwargs):
if not error:
for job in self.jobs:
self.dag.handle_temp(job)
# remove all pipe outputs since all jobs of this group are done and the
# pipes are no longer needed
# remove all pipe and service outputs since all jobs of this group are done and the
# outputs are no longer needed
for job in self.jobs:
for f in job.output:
if is_flagged(f, "pipe"):
if is_flagged(f, "pipe") or is_flagged(f, "service"):
f.remove()

@property
@@ -1510,6 +1517,7 @@ class Reason:
"nooutput",
"derived",
"pipe",
"service",
"target",
"finished",
]
@@ -1525,6 +1533,7 @@ def __init__(self):
self.nooutput = False
self.derived = True
self.pipe = False
self.service = False

@lazy_property
def updated_input(self):
@@ -1582,6 +1591,14 @@ def __str__(self):
", ".join(self.updated_input_run)
)
)
if self.pipe:
s.append(
"Output file is a pipe and has to be filled for consuming job."
)
if self.service:
s.append(
"Job provides a service which has to be kept active until all consumers are finished."
)
s = "; ".join(s)
if self.finished:
return "Finished (was: {s})".format(s=s)
@@ -1596,5 +1613,6 @@ def __bool__(self):
or self.noio
or self.nooutput
or self.pipe
or self.service
)
return v and not self.finished
@@ -531,6 +531,7 @@ def _set_inoutput_item(self, item, output=False, name=None):
"directory",
"touch",
"pipe",
"service",
]:
logger.warning(
"The flag '{}' used in rule {} is only valid for outputs, not inputs.".format(
@@ -911,7 +911,7 @@ def calc_resource(self, name, value):
raise WorkflowError(
"Job needs {name}={res} but only {name}={gres} "
"are available. This is likely because two "
"jobs are connected via a pipe and have to run "
"jobs are connected via a pipe or a service output and have to run "
"simultaneously. Consider providing more "
"resources (e.g. via --cores).".format(name=name, res=value, gres=gres)
)

0 comments on commit a471adb

Please sign in to comment.