Skip to content

Commit

Permalink
Merge pull request #502 from ungarj/dask_settings
Browse files Browse the repository at this point in the history
make sure dask_compute_graph and dask_chunksize are passed on
  • Loading branch information
ungarj committed Nov 21, 2022
2 parents 6f749b5 + 2f78df0 commit 12bbd8f
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 10 deletions.
4 changes: 4 additions & 0 deletions mapchete/_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ def _compute_tasks(
zoom_levels=None,
tile=None,
skip_output_check=False,
dask_max_submitted_tasks=None,
dask_chunksize=None,
**kwargs,
):
if not process.config.preprocessing_tasks_finished:
Expand All @@ -625,6 +627,8 @@ def _compute_tasks(
func=_preprocess_task_wrapper,
iterable=tasks.values(),
fkwargs=dict(append_data=True),
max_submitted_tasks=dask_max_submitted_tasks,
chunksize=dask_chunksize,
**kwargs,
):
future = future_raise_exception(future)
Expand Down
10 changes: 0 additions & 10 deletions mapchete/commands/_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,12 @@ def _empty_callback(_):
def _process_everything(
msg_callback,
mp,
executor=None,
workers=None,
concurrency=None,
dask_max_submitted_tasks=500,
dask_chunksize=100,
dask_compute_graph=True,
**kwargs,
):
try:
for i, future in enumerate(
mp.compute(
executor=executor,
workers=workers,
dask_max_submitted_tasks=dask_max_submitted_tasks,
dask_chunksize=dask_chunksize,
dask_compute_graph=dask_compute_graph,
**kwargs,
),
1,
Expand Down

0 comments on commit 12bbd8f

Please sign in to comment.