Skip to content

Commit

Permalink
fix: fix wait for files in case of using remote storage and remote ex…
Browse files Browse the repository at this point in the history
…ecution (#2718)

### Description

<!--Add a description of your PR here-->

### QC
<!-- Make sure that you can tick the boxes below. -->

* [x] The PR contains a test case for the changes or the changes are
already covered by an existing test case.
* [x] The documentation (`docs/`) is updated to reflect the changes or
this is not necessary (e.g. if the change does neither modify the
language nor the behavior or functionalities of Snakemake).
  • Loading branch information
johanneskoester committed Feb 24, 2024
1 parent 7a47924 commit eec3a5f
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion snakemake/jobs.py
Expand Up @@ -1048,6 +1048,10 @@ async def postprocess(
SharedFSUsage.INPUT_OUTPUT
in self.dag.workflow.storage_settings.shared_fs_usage
)
wait_for_local = (
SharedFSUsage.STORAGE_LOCAL_COPIES
in self.dag.workflow.storage_settings.shared_fs_usage
)
if (
self.dag.workflow.exec_mode == ExecMode.SUBPROCESS
or shared_input_output
Expand All @@ -1063,14 +1067,24 @@ async def postprocess(
self,
wait=self.dag.workflow.execution_settings.latency_wait,
ignore_missing_output=ignore_missing_output,
wait_for_local=True,
wait_for_local=wait_for_local,
)
self.dag.unshadow_output(self, only_log=error)
await self.dag.handle_storage(
self, store_in_storage=store_in_storage, store_only_log=error
)
if not error:
self.dag.handle_protected(self)
elif not shared_input_output and not wait_for_local:
expanded_output = list(self.output)
if self.benchmark:
expanded_output.append(self.benchmark)
await wait_for_files(
expanded_output,
wait_for_local=False,
latency_wait=self.dag.workflow.execution_settings.latency_wait,
ignore_pipe_or_service=True,
)
if not error:
try:
await self.dag.workflow.persistence.finished(self)
Expand Down

0 comments on commit eec3a5f

Please sign in to comment.