Skip to content

Commit

Permalink
Merge 4df5ddc into 427a0b8
Browse files Browse the repository at this point in the history
  • Loading branch information
ungarj committed Jun 17, 2019
2 parents 427a0b8 + 4df5ddc commit f921663
Show file tree
Hide file tree
Showing 18 changed files with 673 additions and 313 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -14,6 +14,8 @@ Changelog
* all CLI multiword options are separated by an hyphen (`-`) instead of underscore (`_`)
(#189)

* overview tiles get also updated if child baselevel tile changes (#179)
* on `batch_process` check which process output exists and only use parallelization for process tiles which will be processed (#)
* fixed `area_at_zoom()` when using input groups (#181)
* fixed single GeoTIFF output bounds should use process area (#182)
* fixed YAML warning (#167)
Expand Down
103 changes: 53 additions & 50 deletions mapchete/_core.py
@@ -1,18 +1,17 @@
"""Main module managing processes."""

from cachetools import LRUCache
import concurrent.futures
import logging
import multiprocessing
import threading

from mapchete.config import MapcheteConfig
from mapchete.errors import MapcheteNodataTile
from mapchete._processing import (
_run_on_single_tile, _run_without_multiprocessing, _run_with_multiprocessing,
ProcessInfo, TileProcess
)
from mapchete.tile import BufferedTile, count_tiles
from mapchete._processing import _run_on_single_tile, _run_area, ProcessInfo, TileProcess
from mapchete.tile import count_tiles
from mapchete._timer import Timer
from mapchete._validate import validate_tile, validate_zooms

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -131,6 +130,35 @@ def get_process_tiles(self, zoom=None):
):
yield tile

def skip_tiles(self, tiles=None):
"""
Quickly determine whether tiles can be skipped for processing.
The skip value is True if process mode is 'continue' and process output already
exists. In all other cases, skip is False.
Parameters
----------
tiles : list of process tiles
Yields
------
tuples : (tile, skip)
"""
def _skip(config, tile):
return tile, config.output_reader.tiles_exist(tile)

# only check for existing output in "continue" mode
if self.config.mode == "continue":
with concurrent.futures.ThreadPoolExecutor() as executor:
for future in concurrent.futures.as_completed(
(executor.submit(_skip, self.config, tile) for tile in tiles)
):
yield future.result()
else:
for tile in tiles:
yield (tile, False)

def batch_process(
self,
zoom=None,
Expand Down Expand Up @@ -181,7 +209,8 @@ def batch_processor(
multi=multiprocessing.cpu_count(),
max_chunksize=1,
multiprocessing_module=multiprocessing,
multiprocessing_start_method="fork"
multiprocessing_start_method="fork",
skip_output_check=False
):
"""
Process a large batch of tiles and yield report messages per tile.
Expand All @@ -205,6 +234,9 @@ def batch_processor(
multiprocessing_start_method : str
"fork", "forkserver" or "spawn"
(default: "fork")
skip_output_check : bool
skip checking whether process tiles already have existing output before
starting to process;
"""
if zoom and tile:
raise ValueError("use either zoom or tile")
Expand All @@ -215,22 +247,16 @@ def batch_processor(
process=self,
tile=self.config.process_pyramid.tile(*tuple(tile))
)
# run sequentially
elif multi == 1:
for process_info in _run_without_multiprocessing(
process=self,
zoom_levels=list(_get_zoom_level(zoom, self))
):
yield process_info
# run concurrently
elif multi > 1:
for process_info in _run_with_multiprocessing(
# run area
else:
for process_info in _run_area(
process=self,
zoom_levels=list(_get_zoom_level(zoom, self)),
multi=multi,
max_chunksize=max_chunksize,
multiprocessing_module=multiprocessing_module,
multiprocessing_start_method=multiprocessing_start_method
multiprocessing_start_method=multiprocessing_start_method,
skip_output_check=skip_output_check
):
yield process_info

Expand Down Expand Up @@ -274,11 +300,7 @@ def execute(self, process_tile, raise_nodata=False):
data : NumPy array or features
process output
"""
process_tile = (
self.config.process_pyramid.tile(*process_tile)
if isinstance(process_tile, tuple)
else process_tile
)
process_tile = validate_tile(process_tile, self.config.process_pyramid)
try:
return self.config.output.streamline_output(
TileProcess(tile=process_tile, config=self.config).execute()
Expand All @@ -304,16 +326,9 @@ def read(self, output_tile):
data : NumPy array or features
process output
"""
output_tile = validate_tile(output_tile, self.config.output_pyramid)
if self.config.mode not in ["readonly", "continue", "overwrite"]:
raise ValueError("process mode must be readonly, continue or overwrite")
output_tile = (
self.config.output_pyramid.tile(*output_tile)
if isinstance(output_tile, tuple)
else output_tile
)
if not isinstance(output_tile, BufferedTile):
raise TypeError("output_tile must be tuple or BufferedTile")

return self.config.output.read(output_tile)

def write(self, process_tile, data):
Expand All @@ -327,13 +342,7 @@ def write(self, process_tile, data):
data : NumPy array or features
data to be written
"""
process_tile = (
self.config.process_pyramid.tile(*process_tile)
if isinstance(process_tile, tuple)
else process_tile
)
if not isinstance(process_tile, BufferedTile):
raise ValueError("invalid process_tile type: %s" % type(process_tile))
process_tile = validate_tile(process_tile, self.config.process_pyramid)
if self.config.mode not in ["continue", "overwrite"]:
raise ValueError("cannot write output in current process mode")

Expand Down Expand Up @@ -390,11 +399,12 @@ def get_raw_output(self, tile, _baselevel_readonly=False):
data : NumPy array or features
process output
"""
tile = self.config.output_pyramid.tile(*tile) if isinstance(tile, tuple) else tile
if not isinstance(tile, BufferedTile):
raise TypeError("'tile' must be a tuple or BufferedTile")
if _baselevel_readonly:
tile = self.config.baselevels["tile_pyramid"].tile(*tile.id)
tile = validate_tile(tile, self.config.output_pyramid)
tile = (
self.config.baselevels["tile_pyramid"].tile(*tile.id)
if _baselevel_readonly
else tile
)

# Return empty data if zoom level is outside of process zoom levels.
if tile.zoom not in self.config.zoom_levels:
Expand Down Expand Up @@ -520,11 +530,4 @@ def __exit__(self, exc_type, exc_value, exc_traceback):

def _get_zoom_level(zoom, process):
"""Determine zoom levels."""
if zoom is None:
return reversed(process.config.zoom_levels)
if isinstance(zoom, int):
return [zoom]
elif len(zoom) == 2:
return reversed(range(min(zoom), max(zoom)+1))
elif len(zoom) == 1:
return zoom
return reversed(process.config.zoom_levels) if zoom is None else validate_zooms(zoom)

0 comments on commit f921663

Please sign in to comment.