Skip to content

Commit

Permalink
Merge branch 'dev' of github.com:epigen/looper into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
nsheff committed Jul 21, 2017
2 parents 5f14011 + aeabf1b commit 0be1a7a
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 66 deletions.
87 changes: 32 additions & 55 deletions looper/looper.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def parse_arguments():
Argument Parsing.
:return argparse.Namespace, list[str]: namespace parsed according to
arguments defined here, them undefined arguments
arguments defined here, and additional options arguments undefined
here and to be handled downstream
"""

# Main looper program help text messages
Expand Down Expand Up @@ -390,26 +391,37 @@ def run(prj, args, remaining_args):
_LOGGER.warn("Submission settings "
"lack memory specification")

# Add the command string and job name to the submit_settings object
# Add command string and job name to the submit_settings object.
submit_settings["JOBNAME"] = \
sample.sample_name + "_" + pipeline_key
submit_settings["CODE"] = cmd

# Submit job!
_LOGGER.debug("Attempting job submission: '%s' ('%s')",
sample.sample_name, pl_name)
submitted = cluster_submit(
sample, prj.compute.submission_template,
prj.compute.submission_command, submit_settings,
prj.metadata.submission_subdir, sample_output_folder,
pl_name, args.time_delay, submit=True,
dry_run=args.dry_run, ignore_flags=args.ignore_flags,
remaining_args=remaining_args)
if submitted:
# Create submission script (write script to disk)!
_LOGGER.debug("Creating submission script for pipeline %s: '%s'",
pl_name, sample.sample_name)
submit_script = create_submission_script(
sample, prj.compute.submission_template, submit_settings,
submission_folder=prj.metadata.submission_subdir,
pipeline_name=pl_name, remaining_args=remaining_args)

# Determine how to update submission counts and (perhaps) submit.
flag_files = glob.glob(os.path.join(
sample_output_folder, pl_name + "*.flag"))
if not args.ignore_flags and len(flag_files) > 0:
_LOGGER.info("> Not submitting, flag(s) found: {}".
format(flag_files))
_LOGGER.debug("NOT SUBMITTED")
else:
if args.dry_run:
_LOGGER.info("> DRY RUN: I would have submitted this: '%s'",
submit_script)
else:
submission_command = "{} {}".format(
prj.compute.submission_command, submit_script)
subprocess.call(submission_command, shell=True)
time.sleep(args.time_delay) # Delay next job's submission.
_LOGGER.debug("SUBMITTED")
submit_count += 1
else:
_LOGGER.debug("NOT SUBMITTED")

# Report what went down.
_LOGGER.info("Looper finished")
Expand Down Expand Up @@ -630,38 +642,22 @@ def _submission_status_text(curr, total, sample_name, sample_library):



def cluster_submit(
sample, submit_template, submission_command, variables_dict,
submission_folder, sample_output_folder, pipeline_name, time_delay,
submit=False, dry_run=False, ignore_flags=False, remaining_args=None):
def create_submission_script(
sample, submit_template, variables_dict,
submission_folder, pipeline_name, remaining_args=None):
"""
Write cluster submission script to disk and submit job for given Sample.
:param models.Sample sample: the Sample object for submission
:param str submit_template: path to submission script template
:param str submission_command: actual command with which to execute the
submission of the cluster job for the given sample
:param variables_dict: key-value pairs to use to populate fields in
the submission template
:param str submission_folder: path to the folder in which to place
submission files
:param str sample_output_folder: path to folder into which the pipeline
will write file(s), and where to search for flag file to check
if a sample's already been submitted
:param str pipeline_name: name of the pipeline that the job will run
:param int time_delay: number of seconds by which to delay submission
of next job
:param bool submit: whether to even attempt to actually submit the job;
this is useful for skipping certain samples within a project
:param bool dry_run: whether the call is a test and thus the cluster job
created should not actually be submitted; in this case, the return
is a true proxy for whether the job would've been submitted
:param bool ignore_flags: whether to ignore the presence of flag file(s)
in making the determination of whether to submit the job
:param Iterable[str] remaining_args: arguments for this submission,
unconsumed by previous option/argument parsing
:return bool: whether the submission was done,
or would've been if not a dry run
:return str: filepath to submission script
"""

# Create the script and logfile paths.
Expand Down Expand Up @@ -706,26 +702,7 @@ def cluster_submit(
name_sample_subtype, sample.name)
sample.to_yaml(subs_folder_path=submission_folder)

# Check if job is already submitted (unless ignore_flags is set to True)
if not ignore_flags:
flag_files = glob.glob(os.path.join(
sample_output_folder, pipeline_name + "*.flag"))
if len(flag_files) > 0:
_LOGGER.info("> Not submitting, flag(s) found: {}".
format(flag_files))
submit = False
else:
pass

if not submit:
return False
if dry_run:
_LOGGER.info("> DRY RUN: I would have submitted this: '%s'",
submit_script)
else:
subprocess.call(submission_command + " " + submit_script, shell=True)
time.sleep(time_delay) # Delay next job's submission.
return True
return submit_script



Expand Down
28 changes: 17 additions & 11 deletions looper/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,13 @@ def build_submission_bundles(self, protocol, priority=True):
strict_pipe_key, full_pipe_path, full_pipe_path_with_flags = \
proto_iface.finalize_pipeline_key_and_paths(
pipeline_key)

# Skip and warn about nonexistent alleged pipeline path.
if not _os.path.exists(full_pipe_path):
_LOGGER.warn(
"Missing pipeline script: '%s'", full_pipe_path)
continue

# Determine which interface and Sample subtype to use.
sample_subtype = \
proto_iface.fetch_sample_subtype(
Expand Down Expand Up @@ -1128,7 +1135,7 @@ def parse_config_file(self, subproject=None):
# Parse yaml into the project's attributes.
_LOGGER.debug("Adding attributes for {}: {}".format(
self.__class__.__name__, config.keys()))
_LOGGER.debug("Config metadata: {}")
_LOGGER.debug("Config metadata: {}".format(config["metadata"]))
self.add_entries(config)
_LOGGER.debug("{} now has {} keys: {}".format(
self.__class__.__name__, len(self.keys()), self.keys()))
Expand Down Expand Up @@ -1348,7 +1355,7 @@ def _ensure_absolute(self, maybe_relpath):
_LOGGER.log(5, "Already absolute")
return maybe_relpath
# Maybe we have env vars that make the path absolute?
expanded = _os.path.expandvars(maybe_relpath)
expanded = _os.path.expanduser(_os.path.expandvars(maybe_relpath))
_LOGGER.log(5, "Expanded: '%s'", expanded)
if _os.path.isabs(expanded):
_LOGGER.log(5, "Expanded is absolute")
Expand Down Expand Up @@ -1834,8 +1841,7 @@ def set_transcriptome(self, transcriptomes):

def _set_assembly(self, ome, assemblies):
if not assemblies:
_LOGGER.debug("Empty/null assemblies mapping: {} ({})".
format(assemblies, type(assemblies)))
_LOGGER.debug("Empty/null assemblies mapping")
return
try:
assembly = assemblies[self.organism]
Expand Down Expand Up @@ -2619,11 +2625,14 @@ def finalize_pipeline_key_and_paths(self, pipeline_key):
# The strict key is the script name itself, something like "ATACseq.py"
strict_pipeline_key, _, pipeline_key_args = pipeline_key.partition(' ')

if self.pipe_iface.get_attribute(strict_pipeline_key, "path"):
script_path_only = self.pipe_iface.get_attribute(
strict_pipeline_key, "path")[0].strip()
full_pipe_path = \
self.pipe_iface.get_attribute(strict_pipeline_key, "path")
if full_pipe_path:
script_path_only = _os.path.expanduser(_os.path.expandvars(full_pipe_path[0].strip()))
if _os.path.isdir(script_path_only):
script_path_only = _os.path.join(script_path_only, pipeline_key)
script_path_with_flags = \
" ".join([script_path_only, pipeline_key_args])
"{} {}".format(script_path_only, pipeline_key_args)
else:
# backwards compatibility w/ v0.5
script_path_only = strict_pipeline_key
Expand All @@ -2639,9 +2648,6 @@ def finalize_pipeline_key_and_paths(self, pipeline_key):
self.pipelines_path, script_path_with_flags)
_LOGGER.log(5, "Absolute script path with flags: '%s'",
script_path_with_flags)
if not _os.path.exists(script_path_only):
_LOGGER.warn(
"Missing pipeline script: '%s'", script_path_only)

return strict_pipeline_key, script_path_only, script_path_with_flags

Expand Down

0 comments on commit 0be1a7a

Please sign in to comment.