Skip to content

Commit

Permalink
fix: deduplicate input files before retrieval from storage (#2600)
Browse files Browse the repository at this point in the history
### Description

<!--Add a description of your PR here-->

### QC
<!-- Make sure that you can tick the boxes below. -->

* [x] The PR contains a test case for the changes or the changes are
already covered by an existing test case.
* [x] The documentation (`docs/`) is updated to reflect the changes or
this is not necessary (e.g. if the change does neither modify the
language nor the behavior or functionalities of Snakemake).
  • Loading branch information
johanneskoester committed Jan 11, 2024
1 parent d6b48a1 commit 37cf475
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions snakemake/dag.py
Expand Up @@ -4,6 +4,7 @@
__license__ = "MIT"

import asyncio
from builtins import ExceptionGroup
import html
import os
import shutil
Expand Down Expand Up @@ -346,12 +347,17 @@ async def retrieve_storage_inputs(self):
self.workflow.remote_exec and not shared_local_copies
):
logger.info("Retrieving input from storage.")
to_retrieve = {
f
for job in self.needrun_jobs()
for f in job.input
if f.is_storage and self.is_external_input(f, job)
}

async with asyncio.TaskGroup() as tg:
for job in self.needrun_jobs():
for f in job.input:
if f.is_storage and self.is_external_input(f, job):
logger.info(f"Retrieving {f} from storage.")
tg.create_task(f.retrieve_from_storage())
for f in to_retrieve:
logger.info(f"Retrieving {f} from storage.")
tg.create_task(f.retrieve_from_storage())

async def store_storage_outputs(self):
if self.workflow.remote_exec:
Expand Down

0 comments on commit 37cf475

Please sign in to comment.