From e9f6bbe35053e7c438da401d6db8ddfeee3cc768 Mon Sep 17 00:00:00 2001
From: Joachim Ungar
Date: Wed, 23 Feb 2022 11:36:44 +0100
Subject: [PATCH 1/5] fix max_submitted_tasks mismatch; don't raise client
 connection exception when cancelling job

---
 mapchete/_executor.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/mapchete/_executor.py b/mapchete/_executor.py
index 0b9991d1..be5a0b89 100644
--- a/mapchete/_executor.py
+++ b/mapchete/_executor.py
@@ -152,12 +152,16 @@ def cancel(self):
         # reset so futures won't linger here for next call
         self.running_futures = set()

-    def wait(self):
+    def wait(self, raise_exc=False):
         logger.debug("wait for running futures to finish...")
         try:  # pragma: no cover
             self._wait()
         except CancelledError:  # pragma: no cover
             pass
+        except Exception as exc:  # pragma: no cover
+            logger.error("exception caught when waiting for futures: %s", str(exc))
+            if raise_exc:
+                raise exc

     def close(self):  # pragma: no cover
         self.__exit__(None, None, None)
@@ -326,7 +330,6 @@ def as_completed(
                     item, skip, skip_info = item
                     if skip:
                         yield SkippedFuture(item, skip_info=skip_info)
-                        self._submitted -= 1
                         continue

                 # add processing item to chunk
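Usage note: the new raise_exc flag keeps the old behaviour by default (exceptions hit while waiting are only logged) and re-raises them when explicitly requested. A minimal sketch, assuming the Executor factory can be imported from mapchete._executor and accepts a concurrency argument as shown in the later patches; the surrounding workflow is illustrative only:

    from mapchete._executor import Executor  # import path assumed

    with Executor(concurrency="processes") as executor:
        try:
            # ... submit work via executor.as_completed(...) ...
            executor.wait(raise_exc=True)  # re-raise instead of only logging
        except Exception:
            # e.g. a lost dask client connection surfaces here instead of
            # interrupting the cancellation step
            executor.cancel()
            raise
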
From 69bbba3c5ca0feeda00d156b5c5fbd3ca6e53d6d Mon Sep 17 00:00:00 2001
From: Joachim Ungar
Date: Wed, 23 Feb 2022 13:59:46 +0100
Subject: [PATCH 2/5] pass on skip variable to re-enable continue mode for
 task graphs; add geo interface to Job object

---
 mapchete/_processing.py         | 19 ++++++++++++++++---
 mapchete/_tasks.py              |  1 +
 mapchete/cli/default/execute.py | 15 ++++++++-------
 mapchete/commands/_execute.py   |  3 ++-
 4 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/mapchete/_processing.py b/mapchete/_processing.py
index c085b492..4f3af52e 100644
--- a/mapchete/_processing.py
+++ b/mapchete/_processing.py
@@ -5,6 +5,8 @@
 from itertools import chain
 import logging
 import multiprocessing
+from shapely.geometry import mapping
+from tilematrix._funcs import Bounds
 from traceback import format_exc
 from typing import Generator

@@ -57,6 +59,7 @@ def __init__(
         preprocessing_tasks: int = None,
         executor_concurrency: str = "processes",
         executor_kwargs: dict = None,
+        process_area=None,
     ):
         self.func = func
         self.fargs = fargs or ()
@@ -69,10 +72,19 @@ def __init__(
         self.preprocessing_tasks = preprocessing_tasks or 0
         self._total = self.preprocessing_tasks + self.tiles_tasks
         self._as_iterator = as_iterator
+        self._process_area = process_area
+        self.bounds = Bounds(*process_area.bounds) if process_area is not None else None

         if not as_iterator:
             self._results = list(self._run())

+    @property
+    def __geo_interface__(self):
+        if self._process_area is not None:
+            return mapping(self._process_area)
+        else:
+            raise AttributeError(f"{self} has no geo information assigned")
+
     def _run(self):
         if self._total == 0:
             return
@@ -122,6 +134,7 @@ def __repr__(self):  # pragma: no cover


 def task_batches(process, zoom=None, tile=None, skip_output_check=False):
+    """Create task batches for each processing stage."""
     with Timer() as duration:
         # preprocessing tasks
         yield TaskBatch(
@@ -143,7 +156,7 @@ def task_batches(process, zoom=None, tile=None, skip_output_check=False):
         )
         tiles = {
             zoom: (
-                tile
+                (tile, skip, process_msg)
                 for tile, skip, process_msg in _filter_skipable(
                     process=process,
                     tiles_batches=process.get_process_tiles(zoom, batch_by="row"),
@@ -173,9 +186,9 @@ def task_batches(process, zoom=None, tile=None, skip_output_check=False):
                     and process.config.output_reader.tiles_exist(tile)
                 )
                 if skip_output_check
-                else False,
+                else skip,
             )
-            for tile in tiles[zoom]
+            for tile, skip in tiles[zoom]
         ),
         func=func,
         fkwargs=fkwargs,
diff --git a/mapchete/_tasks.py b/mapchete/_tasks.py
index 72a557c4..19e7c2bd 100644
--- a/mapchete/_tasks.py
+++ b/mapchete/_tasks.py
@@ -409,6 +409,7 @@ def to_dask_collection(batches):
     with Timer() as t:
         previous_batch = None
         for batch in batches:
+            logger.debug("converting batch %s", batch)
             if previous_batch:
                 logger.debug("previous batch had %s tasks", len(previous_batch))
             for task in batch.values():
diff --git a/mapchete/cli/default/execute.py b/mapchete/cli/default/execute.py
index 069d71fb..a178bc0f 100644
--- a/mapchete/cli/default/execute.py
+++ b/mapchete/cli/default/execute.py
@@ -48,15 +48,16 @@ def execute(
     for mapchete_file in mapchete_files:
         tqdm.tqdm.write(f"preparing to process {mapchete_file}")
         with mapchete.Timer() as t:
+            job = commands.execute(
+                mapchete_file,
+                *args,
+                as_iterator=True,
+                msg_callback=tqdm.tqdm.write if verbose else None,
+                **kwargs,
+            )
             list(
                 tqdm.tqdm(
-                    commands.execute(
-                        mapchete_file,
-                        *args,
-                        as_iterator=True,
-                        msg_callback=tqdm.tqdm.write if verbose else None,
-                        **kwargs,
-                    ),
+                    job,
                     unit="task",
                     disable=debug or no_pbar,
                 )
diff --git a/mapchete/commands/_execute.py b/mapchete/commands/_execute.py
index 60c364af..68a35697 100755
--- a/mapchete/commands/_execute.py
+++ b/mapchete/commands/_execute.py
@@ -148,7 +148,7 @@ def _empty_callback(_):
         tiles_tasks = 1 if tile else mp.count_tiles()
         total_tasks = preprocessing_tasks + tiles_tasks
         msg_callback(
-            f"processing {preprocessing_tasks} preprocessing tasks and {tiles_tasks} on {workers} worker(s)"
+            f"processing {preprocessing_tasks} preprocessing tasks and {tiles_tasks} tile tasks on {workers} worker(s)"
         )
         # automatically use dask Executor if dask scheduler is defined
         if dask_scheduler or dask_client:  # pragma: no cover
@@ -183,6 +183,7 @@ def _empty_callback(_):
             as_iterator=as_iterator,
             preprocessing_tasks=preprocessing_tasks,
             tiles_tasks=tiles_tasks,
+            process_area=mp.config.init_area,
         )
     # explicitly exit the mp object on failure
     except Exception as exc:  # pragma: no cover

From 47aa4c3d4721ca7ddf6b479e02cca13b81c3f080 Mon Sep 17 00:00:00 2001
From: Joachim Ungar
Date: Wed, 23 Feb 2022 14:00:23 +0100
Subject: [PATCH 3/5] pass on skip variable to re-enable continue mode for
 task graphs; add geo interface to Job object

---
 mapchete/_processing.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mapchete/_processing.py b/mapchete/_processing.py
index 4f3af52e..c90be183 100644
--- a/mapchete/_processing.py
+++ b/mapchete/_processing.py
@@ -79,7 +79,7 @@ def __init__(
             self._results = list(self._run())

     @property
-    def __geo_interface__(self):
+    def __geo_interface__(self):  # pragma: no cover
         if self._process_area is not None:
             return mapping(self._process_area)
         else:
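Usage note: with the geo interface in place, a Job returned by mapchete.commands.execute(..., as_iterator=True) can be handed to any GeoJSON-aware tool. A minimal sketch, assuming a local "example.mapchete" configuration file (placeholder name) and that shapely is installed:

    from shapely.geometry import shape
    from mapchete import commands

    job = commands.execute("example.mapchete", as_iterator=True)

    # __geo_interface__ exposes the process area as a GeoJSON-like mapping
    process_area = shape(job)
    print(process_area.bounds)  # same values as job.bounds

    # iterating the job executes the tasks (this is what the CLI wraps in tqdm)
    for _ in job:
        pass
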
From f80d0442d73f309cfae4e00a05b080237df6656c Mon Sep 17 00:00:00 2001
From: Joachim Ungar
Date: Wed, 23 Feb 2022 14:25:55 +0100
Subject: [PATCH 4/5] fix task graph run; improve log messages

---
 mapchete/_processing.py | 59 +++++++++++++++++------------------
 mapchete/_tasks.py      | 42 ++++++++++++++---------------
 2 files changed, 44 insertions(+), 57 deletions(-)

diff --git a/mapchete/_processing.py b/mapchete/_processing.py
index c90be183..cfbaf023 100644
--- a/mapchete/_processing.py
+++ b/mapchete/_processing.py
@@ -88,12 +88,15 @@ def __geo_interface__(self):  # pragma: no cover
     def _run(self):
         if self._total == 0:
             return
+        logger.debug("opening executor for job %s", repr(self))
         with Executor(
             concurrency=self.executor_concurrency, **self.executor_kwargs
         ) as self.executor:
             self.status = "running"
+            logger.debug("change of job status: %s", self)
             yield from self.func(*self.fargs, executor=self.executor, **self.fkwargs)
             self.status = "finished"
+            logger.debug("change of job status: %s", self)

     def set_executor_kwargs(self, executor_kwargs):
         """
@@ -156,8 +159,8 @@ def task_batches(process, zoom=None, tile=None, skip_output_check=False):
         )
         tiles = {
             zoom: (
-                (tile, skip, process_msg)
-                for tile, skip, process_msg in _filter_skipable(
+                (tile, skip)
+                for tile, skip, _ in _filter_skipable(
                     process=process,
                     tiles_batches=process.get_process_tiles(zoom, batch_by="row"),
                     target_set=None,
@@ -542,13 +545,14 @@ def _compute_task_graph(
             zoom=zoom_levels, tile=tile, skip_output_check=skip_output_check
         )
     )
-    logger.debug("%s dask collection generated in %s", len(coll), t)
+    logger.debug("dask collection with %s tasks generated in %s", len(coll), t)

     # send to scheduler
     with Timer() as t:
         futures = executor._executor.compute(coll, optimize_graph=True, traverse=True)
-        logger.debug("sent to scheduler in %s", t)
+        logger.debug("%s tasks sent to scheduler in %s", len(futures), t)

+    logger.debug("wait for tasks to finish...")
     for future in as_completed(
         futures, with_results=with_results, raise_errors=raise_errors
     ):
@@ -574,29 +578,19 @@ def _compute_tasks(
     skip_output_check=False,
     **kwargs,
 ):
-    num_processed = 0
     if not process.config.preprocessing_tasks_finished:
         tasks = process.config.preprocessing_tasks()
         logger.info(
             "run preprocessing on %s tasks using %s workers", len(tasks), workers
         )
         # process all remaining tiles using todo list from before
-        for i, future in enumerate(
-            executor.as_completed(
-                func=_preprocess_task_wrapper,
-                iterable=tasks.values(),
-                fkwargs=dict(append_data=True),
-                **kwargs,
-            ),
-            1,
+        for future in executor.as_completed(
+            func=_preprocess_task_wrapper,
+            iterable=tasks.values(),
+            fkwargs=dict(append_data=True),
+            **kwargs,
         ):
             result = future.result()
-            logger.debug(
-                "preprocessing task %s/%s %s processed successfully",
-                i,
-                len(tasks),
-                result.task_key,
-            )
             process.config.set_preprocessing_task_result(result.task_key, result.data)
             yield future

@@ -640,23 +634,16 @@ def _compute_tasks(
     else:
         _process_batches = _run_multi_no_overviews

-    for num_processed, future in enumerate(
-        _process_batches(
-            zoom_levels=zoom_levels,
-            executor=executor,
-            func=func,
-            process=process,
-            skip_output_check=skip_output_check,
-            fkwargs=fkwargs,
-            write_in_parent_process=write_in_parent_process,
-            **kwargs,
-        ),
-        1,
+    for future in _process_batches(
+        zoom_levels=zoom_levels,
+        executor=executor,
+        func=func,
+        process=process,
+        skip_output_check=skip_output_check,
+        fkwargs=fkwargs,
+        write_in_parent_process=write_in_parent_process,
+        **kwargs,
     ):
-        logger.debug(
-            "task %s finished",
-            num_processed,
-        )
         yield future


@@ -677,6 +664,7 @@ def _run_multi_overviews(

     for i, zoom in enumerate(zoom_levels):

+        logger.debug("sending tasks to executor %s...", executor)
         # get generator list of tiles, whether they are to be skipped and skip_info
         # from _filter_skipable and pass on to executor
         for future in executor.as_completed(
@@ -760,6 +748,7 @@ def _run_multi_no_overviews(
     dask_max_submitted_tasks=None,
     write_in_parent_process=None,
 ):
+    logger.debug("sending tasks to executor %s...", executor)
     # get generator list of tiles, whether they are to be skipped and skip_info
     # from _filter_skipable and pass on to executor
     for future in executor.as_completed(
a/mapchete/_tasks.py b/mapchete/_tasks.py index 19e7c2bd..43f9c26a 100644 --- a/mapchete/_tasks.py +++ b/mapchete/_tasks.py @@ -406,28 +406,26 @@ def to_dask_collection(batches): from dask.delayed import delayed tasks = {} - with Timer() as t: - previous_batch = None - for batch in batches: - logger.debug("converting batch %s", batch) + previous_batch = None + for batch in batches: + logger.debug("converting batch %s", batch) + if previous_batch: + logger.debug("previous batch had %s tasks", len(previous_batch)) + for task in batch.values(): if previous_batch: - logger.debug("previous batch had %s tasks", len(previous_batch)) - for task in batch.values(): - if previous_batch: - dependencies = { - child.id: tasks[child] - for child in previous_batch.intersection(task) - } - logger.debug( - "found %s dependencies from last batch for task %s", - len(dependencies), - task, - ) - else: - dependencies = {} - tasks[task] = delayed(batch.func)( - task, dependencies=dependencies, **batch.fkwargs + dependencies = { + child.id: tasks[child] + for child in previous_batch.intersection(task) + } + logger.debug( + "found %s dependencies from last batch for task %s", + len(dependencies), + task, ) - previous_batch = batch - logger.debug("%s tasks generated in %s", len(tasks), t) + else: + dependencies = {} + tasks[task] = delayed(batch.func)( + task, dependencies=dependencies, **batch.fkwargs + ) + previous_batch = batch return list(tasks.values()) From d7a13ea5f53384f09087fb21054980264910ed97 Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Wed, 23 Feb 2022 14:49:05 +0100 Subject: [PATCH 5/5] fix single tile run --- mapchete/_processing.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mapchete/_processing.py b/mapchete/_processing.py index cfbaf023..6d1e559a 100644 --- a/mapchete/_processing.py +++ b/mapchete/_processing.py @@ -150,7 +150,8 @@ def task_batches(process, zoom=None, tile=None, skip_output_check=False): with Timer() as duration: if tile: zoom_levels = [tile.zoom] - tiles = {tile.zoom: [tile]} + skip_output_check = True + tiles = {tile.zoom: [(tile, False)]} else: zoom_levels = list( reversed(process.config.zoom_levels) @@ -185,7 +186,7 @@ def task_batches(process, zoom=None, tile=None, skip_output_check=False): tile=tile, config=process.config, skip=( - process.mode == "continue" + process.config.mode == "continue" and process.config.output_reader.tiles_exist(tile) ) if skip_output_check
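Design note: the rewritten to_dask_collection() wires batches together by passing each task's delayed parents into its own delayed call, so dask derives the execution order from the arguments alone. A standalone sketch of that pattern in plain dask (the two-batch setup and task names are made up for illustration and are not part of the patch):

    import dask
    from dask import delayed


    def run_task(name, dependencies=None):
        # a real mapchete task would process a tile here; we only echo inputs
        return f"{name} <- {sorted(dependencies or {})}"


    # batch 1 has no parents, batch 2 depends on every result of batch 1
    batch1 = {key: delayed(run_task)(key, dependencies={}) for key in ("a", "b")}
    batch2 = {"c": delayed(run_task)("c", dependencies=dict(batch1))}

    # because batch 1's delayed objects appear in batch 2's arguments,
    # dask schedules batch 1 first when the collection is computed
    results = dask.compute(list(batch1.values()) + list(batch2.values()))[0]
    print(results)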