Skip to content
Permalink
Browse files
perf: more extensive caching of source files, including wrappers. (#1182
)

* initial draft of more caching of source files

* fixes

* fixes

* add retry behavior

* fixes

* fixes

* current basedir joining

* fixes

* fix config schema handling

* fix

* fix path

* fix

* fix path simplification

* fmt

* fix lints

* fix lints

* handle different path types

* fixes

* fix arg name

* handle local file URLs

* fix

* fixes

* fixes

* remove retry code

* fix archive

* handle is_local

* exception handling fix

* dbg

* fixes

* gitlab support, docs

* minor
  • Loading branch information
johanneskoester committed Sep 24, 2021
1 parent 75a544b commit bdb75f828a3ae27ba97ea6cd5e71a34ac7b27eea
Show file tree
Hide file tree
Showing 17 changed files with 538 additions and 204 deletions.
@@ -69,14 +69,20 @@ Consider the following example:
configfile: "config/config.yaml"
module dna_seq:
snakefile: "https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/raw/v2.0.1/Snakefile"
config: config
snakefile:
# here, it is also possible to provide a plain raw URL like "https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/raw/v2.0.1/workflow/Snakefile"
github("snakemake-workflows/dna-seq-gatk-variant-calling", path="workflow/Snakefile" tag="v2.0.1")
config:
config
use rule * from dna_seq
First, we load a local configuration file.
Next, we define the module ``dna_seq`` to be loaded from the URL ``https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/blob/v2.0.1/Snakefile``, while using the contents of the local configuration file.
Next, we define the module ``dna_seq`` to be loaded from the URL ``https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/raw/v2.0.1/workflow/Snakefile``, while using the contents of the local configuration file.
Note that it is possible to either specify the full URL pointing to the raw Snakefile as a string or to use the github marker as done here.
With the latter, Snakemake can however cache the used source files persistently (if a tag is given), such that they don't have to be downloaded on each invocation.
Finally we declare all rules of the dna_seq module to be used.

This kind of deployment is equivalent to just cloning the original repository and modifying the configuration in it.
However, the advantage here is that we are (a) able to easily extend of modify the workflow, while making the changes transparent, and (b) we can store this workflow in a separate (e.g. private) git repository, along with for example configuration and meta data, without the need to duplicate the workflow code.
Finally, we are always able to later combine another module into the current workflow, e.g. when further kinds of analyses are needed.
@@ -92,7 +98,9 @@ For example, we can easily add another rule to extend the given workflow:
configfile: "config/config.yaml"
module dna_seq:
snakefile: "https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/raw/v2.0.1/Snakefile"
snakefile:
# here, it is also possible to provide a plain raw URL like "https://github.com/snakemake-workflows/dna-seq-gatk-variant-calling/raw/v2.0.1/workflow/Snakefile"
github("snakemake-workflows/dna-seq-gatk-variant-calling", path="workflow/Snakefile" tag="v2.0.1")
config: config
use rule * from dna_seq
@@ -106,6 +114,8 @@ For example, we can easily add another rule to extend the given workflow:
notebook:
"notebooks/plot-vafs.py.ipynb"
Moreover, it is possible to further extend the workflow with other modules, thereby generating an integrative analysis.

----------------------------------
Uploading workflows to WorkflowHub
----------------------------------
@@ -126,12 +126,14 @@ With Snakemake 6.0 and later, it is possible to define external workflows as mod
min_version("6.0")
module other_workflow:
snakefile: "other_workflow/Snakefile"
snakefile:
# here, plain paths, URLs and the special markers for code hosting providers (see below) are possible.
"other_workflow/Snakefile"
use rule * from other_workflow as other_*
The first statement registers the external workflow as a module, by defining the path to the main snakefile.
The snakefile property of the module can either take a local path or a HTTP/HTTPS url.
Here, plain paths, HTTP/HTTPS URLs and special markers for code hosting providers like Github or Gitlab are possible (see :ref:`snakefile-code-hosting-providers`).
The second statement declares all rules of that module to be used in the current one.
Thereby, the ``as other_*`` at the end renames all those rule with a common prefix.
This can be handy to avoid rule name conflicts (note that rules from modules can otherwise overwrite rules from your current workflow or other modules).
@@ -149,6 +151,7 @@ It is possible to overwrite the global config dictionary for the module, which i
configfile: "config/config.yaml"
module other_workflow:
# here, plain paths, URLs and the special markers for code hosting providers (see below) are possible.
snakefile: "other_workflow/Snakefile"
config: config["other-workflow"]
@@ -167,6 +170,7 @@ This modification can be performed after a general import, and will overwrite an
min_version("6.0")
module other_workflow:
# here, plain paths, URLs and the special markers for code hosting providers (see below) are possible.
snakefile: "other_workflow/Snakefile"
config: config["other-workflow"]
@@ -261,3 +265,34 @@ This function automatically determines the absolute path to the file (here ``../
When executing, snakemake first tries to create (or update, if necessary) ``test.txt`` (and all other possibly mentioned dependencies) by executing the subworkflow.
Then the current workflow is executed.
This can also happen recursively, since the subworkflow may have its own subworkflows as well.


.. _snakefile-code-hosting-providers:

----------------------
Code hosting providers
----------------------

To obtain the correct URL to an external source code resource (e.g. a snakefile, see :ref:`snakefiles-modules`), Snakemake provides markers for code hosting providers.
Currently, Github

.. code-block:: python
github("owner/repo", path="workflow/Snakefile", tag="v1.0.0")
and Gitlab are supported:

.. code-block:: python
gitlab("owner/repo", path="workflow/Snakefile", tag="v1.0.0")
For the latter, it is also possible to specify an alternative host, e.g.

.. code-block:: python
gitlab("owner/repo", path="workflow/Snakefile", tag="v1.0.0", host="somecustomgitlab.org")
While specifying a tag is highly encouraged, it is alternatively possible to specify a `commit` or a `branch` via respective keyword arguments.
Note that only when specifying a tag or a commit, Snakemake is able to persistently cache the source, thereby avoiding to repeatedly query it in case of multiple executions.
@@ -35,6 +35,7 @@ def cwl(
use_singularity,
bench_record,
jobid,
runtime_sourcecache_path,
):
"""
Load cwl from the given basedir + path and execute it.
@@ -5,6 +5,7 @@

import os
import re
from snakemake.sourcecache import LocalGitFile, LocalSourceFile, infer_source_file
import subprocess
import tempfile
from urllib.request import urlopen
@@ -45,7 +46,7 @@ class Env:
def __init__(
self, env_file, workflow, env_dir=None, container_img=None, cleanup=None
):
self.file = env_file
self.file = infer_source_file(env_file)

self.frontend = workflow.conda_frontend
self.workflow = workflow
@@ -161,7 +162,9 @@ def create_archive(self):
try:
# Download
logger.info(
"Downloading packages for conda environment {}...".format(self.file)
"Downloading packages for conda environment {}...".format(
self.file.get_path_or_uri()
)
)
os.makedirs(env_archive, exist_ok=True)
try:
@@ -216,11 +219,16 @@ def create(self, dryrun=False):
env_file = self.file
tmp_file = None

if not is_local_file(env_file) or env_file.startswith("git+file:/"):
if not isinstance(env_file, LocalSourceFile) or isinstance(
env_file, LocalGitFile
):
with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp:
# write to temp file such that conda can open it
tmp.write(self.content)
env_file = tmp.name
tmp_file = tmp.name
else:
env_file = env_file.get_path_or_uri()

env_hash = self.hash
env_path = self.path
@@ -258,13 +266,13 @@ def create(self, dryrun=False):
if dryrun:
logger.info(
"Incomplete Conda environment {} will be recreated.".format(
utils.simplify_path(self.file)
self.file.simplify_path()
)
)
else:
logger.info(
"Removing incomplete Conda environment {}...".format(
utils.simplify_path(self.file)
self.file.simplify_path()
)
)
shutil.rmtree(env_path, ignore_errors=True)
@@ -274,15 +282,13 @@ def create(self, dryrun=False):
if dryrun:
logger.info(
"Conda environment {} will be created.".format(
utils.simplify_path(self.file)
self.file.simplify_path()
)
)
return env_path
conda = Conda(self._container_img)
logger.info(
"Creating conda environment {}...".format(
utils.simplify_path(self.file)
)
"Creating conda environment {}...".format(self.file.simplify_path())
)
# Check if env archive exists. Use that if present.
env_archive = self.archive_file
@@ -164,6 +164,11 @@ def __init__(self, *args, lineno=None, snakefile=None, rule=None):
self.rule = rule


class SourceFileError(WorkflowError):
def __init__(self, msg):
super().__init__("Error in source file definition: {}".format(msg))


class WildcardError(WorkflowError):
pass

@@ -515,6 +515,7 @@ def job_args_and_prepare(self, job):
self.workflow.edit_notebook,
self.workflow.conda_base_path,
job.rule.basedir,
self.workflow.sourcecache.runtime_cache_path,
)

def run_single_job(self, job):
@@ -2296,6 +2297,7 @@ def run_wrapper(
edit_notebook,
conda_base_path,
basedir,
runtime_sourcecache_path,
):
"""
Wrapper around the run method that handles exceptions and benchmarking.
@@ -2376,6 +2378,7 @@ def run_wrapper(
edit_notebook,
conda_base_path,
basedir,
runtime_sourcecache_path,
)
else:
# The benchmarking is started here as we have a run section
@@ -2406,6 +2409,7 @@ def run_wrapper(
edit_notebook,
conda_base_path,
basedir,
runtime_sourcecache_path,
)
# Store benchmark record for this iteration
bench_records.append(bench_record)
@@ -2434,20 +2438,26 @@ def run_wrapper(
edit_notebook,
conda_base_path,
basedir,
runtime_sourcecache_path,
)
except (KeyboardInterrupt, SystemExit) as e:
# Re-raise the keyboard interrupt in order to record an error in the
# scheduler but ignore it
raise e
except (Exception, BaseException) as ex:
log_verbose_traceback(ex)
# this ensures that exception can be re-raised in the parent thread
lineno, file = get_exception_origin(ex, linemaps)
raise RuleException(
format_error(
ex, lineno, linemaps=linemaps, snakefile=file, show_traceback=True
origin = get_exception_origin(ex, linemaps)
if origin is not None:
log_verbose_traceback(ex)
lineno, file = origin
raise RuleException(
format_error(
ex, lineno, linemaps=linemaps, snakefile=file, show_traceback=True
)
)
)
else:
# some internal bug, just reraise
raise ex

if benchmark is not None:
try:
@@ -1345,11 +1345,11 @@ def git_content(git_file):
"""
This function will extract a file from a git repository, one located on
the filesystem.
Expected format is git+file:///path/to/your/repo/path_to_file@@version
Expected format is git+file:///path/to/your/repo/path_to_file@version
Args:
env_file (str): consist of path to repo, @, version and file information
Ex: git+file:////home/smeds/snakemake-wrappers/bio/fastqc/wrapper.py@0.19.3
Ex: git+file:///home/smeds/snakemake-wrappers/bio/fastqc/wrapper.py@0.19.3
Returns:
file content or None if the expected format isn't meet
"""
@@ -10,6 +10,7 @@
from snakemake.logging import logger
from snakemake.common import is_local_file
from snakemake.common import ON_WINDOWS
from snakemake.sourcecache import SourceCache

KERNEL_STARTED_RE = re.compile(r"Kernel started: (?P<kernel_id>\S+)")
KERNEL_SHUTDOWN_RE = re.compile(r"Kernel shutdown: (?P<kernel_id>\S+)")
@@ -152,6 +153,7 @@ def get_preamble(self):
self.bench_iteration,
self.cleanup_scripts,
self.shadow_dir,
self.is_local,
preamble_addendum=preamble_addendum,
)

@@ -218,7 +220,8 @@ def notebook(
bench_iteration,
cleanup_scripts,
shadow_dir,
edit=None,
edit,
runtime_sourcecache_path,
):
"""
Load a script from the given basedir + path and execute it.
@@ -251,9 +254,12 @@ def notebook(
)

if not draft:
path, source, language = get_source(path, basedir, wildcards, params)
path, source, language, is_local = get_source(
path, SourceCache(runtime_sourcecache_path), basedir, wildcards, params
)
else:
source = None
is_local = True

exec_class = get_exec_class(language)

@@ -280,6 +286,7 @@ def notebook(
bench_iteration,
cleanup_scripts,
shadow_dir,
is_local,
)

if draft:
@@ -507,7 +507,7 @@ def start(self):
"resources, log, version, rule, conda_env, container_img, "
"singularity_args, use_singularity, env_modules, bench_record, jobid, "
"is_shell, bench_iteration, cleanup_scripts, shadow_dir, edit_notebook, "
"conda_base_path, basedir):".format(
"conda_base_path, basedir, runtime_sourcecache_path):".format(
rulename=self.rulename
if self.rulename is not None
else self.snakefile.rulecount
@@ -608,7 +608,7 @@ def args(self):
yield (
", basedir, input, output, params, wildcards, threads, resources, log, "
"config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
"bench_record, jobid, bench_iteration, cleanup_scripts, shadow_dir"
"bench_record, jobid, bench_iteration, cleanup_scripts, shadow_dir, runtime_sourcecache_path"
)


@@ -621,7 +621,7 @@ def args(self):
", basedir, input, output, params, wildcards, threads, resources, log, "
"config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
"bench_record, jobid, bench_iteration, cleanup_scripts, shadow_dir, "
"edit_notebook"
"edit_notebook, runtime_sourcecache_path"
)


@@ -634,7 +634,7 @@ def args(self):
", input, output, params, wildcards, threads, resources, log, "
"config, rule, conda_env, conda_base_path, container_img, singularity_args, env_modules, "
"bench_record, workflow.wrapper_prefix, jobid, bench_iteration, "
"cleanup_scripts, shadow_dir"
"cleanup_scripts, shadow_dir, runtime_sourcecache_path"
)


@@ -645,7 +645,7 @@ class CWL(Script):
def args(self):
yield (
", basedir, input, output, params, wildcards, threads, resources, log, "
"config, rule, use_singularity, bench_record, jobid"
"config, rule, use_singularity, bench_record, jobid, runtime_sourcecache_path"
)


@@ -1157,7 +1157,7 @@ def python(self, token):

class Snakefile:
def __init__(self, path, workflow, rulecount=0):
self.path = path
self.path = path.get_path_or_uri()
self.file = workflow.sourcecache.open(path)
self.tokens = tokenize.generate_tokens(self.file.readline)
self.rulecount = rulecount

0 comments on commit bdb75f8

Please sign in to comment.