Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix for binding JS_PIPELINE_PATH differences between run and pipelines #164

Merged
merged 4 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions jetstream/backends/slurm_singularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,14 +666,17 @@ async def sbatch(cmd, identity, singularity_image, singularity_executable="singu
else:
singularity_args.extend(runner_args)

singularity_exec_args = "--bind $JS_PIPELINE_PATH --bind $PWD --pwd $PWD --workdir /tmp --cleanenv --contain"
singularity_exec_args = "--bind $PWD --pwd $PWD --workdir /tmp --cleanenv --contain"
if os.getenv('JS_PIPELINE_PATH') is not None:
singularity_exec_args += " --bind {}".format(os.getenv('JS_PIPELINE_PATH'))
sbatch_script += "export SINGULARITYENV_JS_PIPELINE_PATH={}\n".format(os.getenv('JS_PIPELINE_PATH'))

if any('gpu' in s for s in [singularity_args, sbatch_args]):
if all('--nv' not in s for s in singularity_args):
singularity_exec_args += ' --nv'

for arg in singularity_args:
singularity_exec_args += f" {arg}"
singularity_exec_args += f" {arg}"

singularity_hostname_arg = ""
if singularity_hostname is not None:
Expand All @@ -689,8 +692,8 @@ async def sbatch(cmd, identity, singularity_image, singularity_executable="singu
# We set the SINGULARITY_CACHEDIR to the default if it isn't defined by the user
sbatch_script += f"[[ -v SINGULARITY_CACHEDIR ]] || SINGULARITY_CACHEDIR=$HOME/.singularity/cache\n"
# Searching for the cached image and using it if it exists
sbatch_script += f"for file in $(find $SINGULARITY_CACHEDIR -type f -name \"{singularity_image_digest}\"); do\n"
sbatch_script += f" {singularity_executable} inspect $file > /dev/null 2>&1 && IMAGE_PATH=$file\n"
sbatch_script += f"for file in $(find $SINGULARITY_CACHEDIR/oci-tmp -type f); do\n"
sbatch_script += f" {singularity_executable} inspect $file 2> /dev/null | grep -E '{singularity_image_digest}|{singularity_image}' && IMAGE_PATH=$file && break\n"
sbatch_script += f"done\n"
sbatch_script += f"if [[ -v IMAGE_PATH ]] ; then\n"
sbatch_script += f" {singularity_run_env_vars}{singularity_executable} exec {singularity_exec_args} {singularity_hostname_arg}{singularity_mounts_string} $IMAGE_PATH bash {cmd_script_filename}\n"
Expand Down
13 changes: 10 additions & 3 deletions jetstream/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import hashlib
import logging
import os
import re
import urllib.parse
import textwrap
from collections.abc import Mapping
Expand Down Expand Up @@ -76,12 +77,17 @@ def raise_helper(ctx, msg):

@pass_context
def log_helper(ctx, msg, level='INFO'):
    """Allow "log('msg')" to be used in templates"""
    # Translate the level name (e.g. 'INFO') into its numeric value so
    # log.log() accepts it; note _checkLevel is a private logging API.
    numeric_level = logging._checkLevel(level)
    log.log(numeric_level, f'{ctx.name}: {msg}')
    # Helpers used in templates must render to nothing.
    return ''


def expand_path(ctx, path):
    """Expand environment variables in *path*, then replace any leftover
    ``$JS_PIPELINE_PATH`` / ``${JS_PIPELINE_PATH}`` reference with the
    current pipeline's path taken from the render context.

    This covers the case where a template references the pipeline path but
    ``JS_PIPELINE_PATH`` is not set in the environment (it differs between
    ``jetstream run`` and ``jetstream pipelines`` invocations).

    :param ctx: Jinja2 render context (anything mapping-like with ``.get``);
        must contain ``__pipeline__`` with a ``path`` entry.
    :param path: path string, possibly containing variable references
    :return: the fully expanded path string
    """
    pipeline_path = ctx.get("__pipeline__")["path"]
    # Only substitute the JS_PIPELINE_PATH variable — matching any ${...}
    # here would clobber unrelated, unset variables. A callable replacement
    # is used so backslashes in pipeline_path are not treated as escapes.
    return re.sub(
        r'\$(JS_PIPELINE_PATH\b|\{JS_PIPELINE_PATH\})',
        lambda _match: pipeline_path,
        os.path.expandvars(path),
    )


def basename(path):
    """Jinja filter returning the final path component, so templates can
    write "{{ path|basename }}"."""
    _head, tail = os.path.split(path)
    return tail
Expand All @@ -105,12 +111,13 @@ def sha256(value):
return h.hexdigest()


@pass_context
def md5(ctx, path):
    """Allow "{{ path|md5 }}" to be used in templates. A good
    use case is to track the md5sum of a script or other file that may
    change over time. Causing the render to update on file change.

    :param ctx: Jinja2 render context, forwarded to expand_path so that
        $JS_PIPELINE_PATH references in *path* resolve correctly
    :param path: path to the file to hash (may contain env-var references)
    :return: hex md5 digest of the file contents
    """
    hash_md5 = hashlib.md5()
    # Expand variables (including JS_PIPELINE_PATH from the context) before
    # opening, and hash in 4 KiB chunks so large files stay memory-cheap.
    with open(expand_path(ctx=ctx, path=path), "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
Expand Down