Skip to content

Commit

Permalink
Merge pull request #435 from ungarj/future_batches
Browse files Browse the repository at this point in the history
use future batches when yielding dask futures
  • Loading branch information
ungarj committed Mar 1, 2022
2 parents d787b46 + 1f34a2f commit 95bffc6
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions mapchete/_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,23 +554,24 @@ def _compute_task_graph(
logger.debug("%s tasks sent to scheduler in %s", len(futures), t)

logger.debug("wait for tasks to finish...")
for future in as_completed(
for batch in as_completed(
futures,
with_results=with_results,
raise_errors=raise_errors,
loop=executor._executor.loop,
):
futures.remove(future)
if process.config.output.write_in_parent_process:
yield FinishedFuture(
result=_write(
process_info=future.result(),
output_writer=process.config.output,
append_output=True,
).batches():
for future in batch:
futures.remove(future)
if process.config.output.write_in_parent_process:
yield FinishedFuture(
result=_write(
process_info=future.result(),
output_writer=process.config.output,
append_output=True,
)
)
)
else:
yield future
else:
yield future


def _compute_tasks(
Expand Down

0 comments on commit 95bffc6

Please sign in to comment.