
fix(executor): override default resources to remove mem/disk (#91)
Closes #90
mdonadoni committed Mar 21, 2024
1 parent b0e3669 commit 9471069
Showing 1 changed file with 21 additions and 11 deletions.
reana_workflow_engine_snakemake/executor.py (32 changes: 21 additions & 11 deletions)
@@ -21,6 +21,7 @@
 from snakemake.common import async_lock
 from snakemake.executors import ClusterExecutor, GenericClusterExecutor
 from snakemake.jobs import Job
+from snakemake.resources import DefaultResources
 from snakemake import scheduler  # for monkeypatch
 
 from reana_workflow_engine_snakemake.config import (
@@ -251,14 +252,27 @@ def run_jobs(
 ):
     """Run Snakemake jobs using custom REANA executor."""
 
-    def _generate_report(workflow_file_path):
+    workflow_file_path = os.path.join(workflow_workspace, workflow_file)
+
+    common_snakemake_args = dict(
+        snakefile=workflow_file_path,
+        config=workflow_parameters,
+        workdir=workflow_workspace,
+        keep_logger=True,
+        # Since Snakemake v7.3.0, if no default resources are specified in
+        # cluster mode, `mem_mb` and `disk_mb` are automatically added to the
+        # resources of each Snakemake rule. This is misleading for users, as
+        # the logs would show memory/disk limits that are not actually applied
+        # to their workflows. For this reason, the default resources are
+        # overridden here with only the "bare" ones, i.e. only `tmpdir`.
+        default_resources=DefaultResources(mode="bare"),
+    )
+
+    def _generate_report():
         """Generate HTML report."""
         success = snakemake(
-            workflow_file_path,
-            config=workflow_parameters,
-            workdir=workflow_workspace,
+            **common_snakemake_args,
             report=operational_options.get("report", DEFAULT_SNAKEMAKE_REPORT_FILENAME),
-            keep_logger=True,
         )
         if not success:
             log.error("Error generating workflow HTML report.")
@@ -269,21 +283,17 @@ def _generate_report(workflow_file_path):
     # Monkeypatch GenericClusterExecutor class in `scheduler` module
     scheduler.GenericClusterExecutor = REANAClusterExecutor
 
-    workflow_file_path = os.path.join(workflow_workspace, workflow_file)
     success = snakemake(
-        workflow_file_path,
+        **common_snakemake_args,
         printshellcmds=True,
         # FIXME: Can be anything as it's not directly used. It's supposed
         # to be the shell command that submits the job, e.g. `condor_q`,
         # but we call the RJC API client instead.
         cluster="reana",
-        config=workflow_parameters,
-        workdir=workflow_workspace,
         notemp=True,
         nodes=SNAKEMAKE_MAX_PARALLEL_JOBS,  # enables DAG parallelization
-        keep_logger=True,
     )
     # Once the workflow is finished, generate the report,
     # taking into account the metadata generated.
-    _generate_report(workflow_file_path)
+    _generate_report()
     return success
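
For context, a minimal sketch of what the two DefaultResources modes expand to. This assumes the Snakemake 7.x API (the `args` attribute and the built-in default expressions); the exact values live in Snakemake itself and may differ across versions:

from snakemake.resources import DefaultResources

# "Full" mode is what Snakemake >= 7.3.0 falls back to in cluster mode when
# no default resources are given: mem_mb and disk_mb are derived from the
# size of each rule's inputs.
full = DefaultResources(mode="full")
print(full.args)
# e.g. ['mem_mb=max(2*input.size_mb, 1000)',
#       'disk_mb=max(2*input.size_mb, 1000)',
#       'tmpdir=system_tmpdir']

# "Bare" mode, as used in this commit: only `tmpdir` is set, so no
# misleading memory/disk figures show up in the users' job logs.
bare = DefaultResources(mode="bare")
print(bare.args)
# e.g. ['tmpdir=system_tmpdir']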
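
The monkeypatch above relies on the fact that snakemake() resolves its cluster executor through the name bound in the `scheduler` module at run time. A stripped-down sketch of the pattern (the class body is illustrative, not the actual REANA implementation):

from snakemake.executors import GenericClusterExecutor
from snakemake import scheduler

class REANAClusterExecutor(GenericClusterExecutor):
    """Illustrative stand-in: the real class submits jobs through the
    RJC API client instead of running a shell submission command."""

# Rebind the name that the scheduler module looks up when `cluster=...`
# is passed, so snakemake() instantiates the subclass instead.
scheduler.GenericClusterExecutor = REANAClusterExecutor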
