Skip to content
Permalink
Browse files
fix: keep flags with apply_wildcards on cloned IOFile (#1416)
* fix: default remote application on input files after checkpoints

* fix: keep flags with apply_wildcards on cloned IOFile

* undo dbg changes

* fix: iterate over requested files instead of all output files.
  • Loading branch information
johanneskoester committed Feb 22, 2022
1 parent b007979 commit 23c943f0e285f2dc725aa3e4a2e8798021085cb3
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 6 deletions.
@@ -280,7 +280,7 @@ def create_conda_envs(
env_set = {
(job.conda_env_spec, job.container_img_url)
for job in jobs
if job.conda_env_spec
if job.conda_env_spec and (self.workflow.run_local or job.is_local)
}
# Then based on md5sum values
self.conda_envs = dict()
@@ -62,7 +62,7 @@ def __init__(
# Attach variables for easy access
self.workflow = workflow
self.quiet = quiet
self.workdir = os.path.dirname(self.workflow.persistence.path)
self.workdir = os.path.realpath(os.path.dirname(self.workflow.persistence.path))
self._save_storage_cache = cache

# Relative path for running on instance
@@ -631,7 +631,7 @@ def _generate_build_source_package(self):
"""
# Workflow sources for cloud executor must all be under same workdir root
for filename in self.workflow_sources:
if self.workdir not in filename:
if self.workdir not in os.path.realpath(filename):
raise WorkflowError(
"All source files must be present in the working directory, "
"{workdir} to be uploaded to a build package that respects "
@@ -735,7 +735,7 @@ def format_dynamic(self):
def clone_flags(self, other):
if isinstance(self._file, str):
self._file = AnnotatedString(self._file)
if isinstance(other._file, AnnotatedString):
if isinstance(other._file, AnnotatedString) or isinstance(other._file, _IOFile):
self._file.flags = getattr(other._file, "flags", {}).copy()
if "remote_object" in self._file.flags:
self._file.flags["remote_object"] = copy.copy(
@@ -892,7 +892,6 @@ def concretize_param(p, wildcards, is_from_callable):
def handle_incomplete_checkpoint(exception):
"""If checkpoint is incomplete, target it such that it is completed
before this rule gets executed."""
print(exception.targetfile)
if exception.targetfile in input:
return TBDString()
else:
@@ -19,9 +19,14 @@ checkpoint copy:
run:
shell("cp {input} {output}")

def get_pack_input(wildcards):
output = checkpoints.copy.get().output[0]
return output


rule pack:
input:
lambda wildcards: checkpoints.copy.get().output[0]
get_pack_input
output:
"landsat-data.txt.bz2"
conda:

0 comments on commit 23c943f

Please sign in to comment.