diff --git a/mapchete/_core.py b/mapchete/_core.py
index 620957e6..8b41d2a0 100644
--- a/mapchete/_core.py
+++ b/mapchete/_core.py
@@ -1,13 +1,13 @@
 """Main module managing processes."""
 from cachetools import LRUCache
-import concurrent.futures
 import logging
 import multiprocessing
 import threading
 
 from mapchete.config import MapcheteConfig
 from mapchete.errors import MapcheteNodataTile
+from mapchete.io import tiles_exist
 from mapchete._processing import _run_on_single_tile, _run_area, ProcessInfo, TileProcess
 from mapchete.tile import count_tiles
 from mapchete._timer import Timer
@@ -145,16 +145,10 @@ def skip_tiles(self, tiles=None):
         ------
         tuples : (tile, skip)
         """
-        def _skip(config, tile):
-            return tile, config.output_reader.tiles_exist(tile)
-
         # only check for existing output in "continue" mode
         if self.config.mode == "continue":
-            with concurrent.futures.ThreadPoolExecutor() as executor:
-                for future in concurrent.futures.as_completed(
-                    (executor.submit(_skip, self.config, tile) for tile in tiles)
-                ):
-                    yield future.result()
+            yield from tiles_exist(config=self.config, process_tiles=tiles)
+        # otherwise don't skip tiles
         else:
             for tile in tiles:
                 yield (tile, False)
diff --git a/mapchete/index.py b/mapchete/index.py
index cc3fb33f..b06741ac 100644
--- a/mapchete/index.py
+++ b/mapchete/index.py
@@ -33,7 +33,7 @@
 
 from mapchete.config import get_zoom_levels
 from mapchete.io import (
-    path_exists, path_is_remote, get_boto3_bucket, raster, relative_path
+    path_exists, path_is_remote, get_boto3_bucket, raster, relative_path, tiles_exist
 )
 
 logger = logging.getLogger(__name__)
@@ -148,8 +148,59 @@ def zoom_index_gen(
 
         logger.debug("use the following index writers: %s", index_writers)
 
-        def _worker(tile):
-            # if there are indexes to write to, check if output exists
+        # all output tiles for given process area
+        logger.debug("determine affected output tiles")
+        output_tiles = set(
+            mp.config.output_pyramid.tiles_from_geom(
+                mp.config.area_at_zoom(zoom), zoom
+            )
+        )
+
+        # check which tiles exist in any index
+        logger.debug("check which tiles exist in index(es)")
+        existing_in_any_index = set(
+            tile for tile in output_tiles
+            if any(
+                [
+                    i.entry_exists(
+                        tile=tile,
+                        path=_tile_path(
+                            orig_path=mp.config.output.get_path(tile),
+                            basepath=basepath,
+                            for_gdal=for_gdal
+                        )
+                    )
+                    for i in index_writers
+                ]
+            )
+        )
+
+        logger.debug("{}/{} tiles found in index(es)".format(
+            len(existing_in_any_index), len(output_tiles))
+        )
+        # tiles which do not exist in any index
+        for tile, output_exists in tiles_exist(
+            mp.config, output_tiles=output_tiles.difference(existing_in_any_index)
+        ):
+            tile_path = _tile_path(
+                orig_path=mp.config.output.get_path(tile),
+                basepath=basepath,
+                for_gdal=for_gdal
+            )
+            indexes = [
+                i for i in index_writers
+                if not i.entry_exists(tile=tile, path=tile_path)
+            ]
+            if indexes and output_exists:
+                logger.debug("%s exists", tile_path)
+                logger.debug("write to %s indexes" % len(indexes))
+                for index in indexes:
+                    index.write(tile, tile_path)
+            # yield tile for progress information
+            yield tile
+
+        # tiles which exist in at least one index
+        for tile in existing_in_any_index:
             tile_path = _tile_path(
                 orig_path=mp.config.output.get_path(tile),
                 basepath=basepath,
@@ -160,30 +211,12 @@ def _worker(tile):
                 for_gdal=for_gdal
             )
             indexes = [
                 i for i in index_writers
                 if not i.entry_exists(tile=tile, path=tile_path)
             ]
             if indexes:
-                output_exists = mp.config.output.tiles_exist(output_tile=tile)
-            else:
-                output_exists = None
-            return tile, tile_path, indexes, output_exists
-
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            for task in concurrent.futures.as_completed(
-                (
-                    executor.submit(_worker, i)
-                    for i in mp.config.output_pyramid.tiles_from_geom(
-                        mp.config.area_at_zoom(zoom), zoom
-                    )
-                )
-            ):
-                tile, tile_path, indexes, output_exists = task.result()
-                # only write entries if there are indexes to write to and output
-                # exists
-                if indexes and output_exists:
-                    logger.debug("%s exists", tile_path)
-                    logger.debug("write to %s indexes" % len(indexes))
-                    for index in indexes:
-                        index.write(tile, tile_path)
-                    # yield tile for progress information
-                    yield tile
+                logger.debug("%s exists", tile_path)
+                logger.debug("write to %s indexes" % len(indexes))
+                for index in indexes:
+                    index.write(tile, tile_path)
+            # yield tile for progress information
+            yield tile
 
 
 def _index_file_path(out_dir, zoom, ext):
@@ -496,7 +529,6 @@ def close(self):
             rasterXSize=str(vrt_shape.width),
             rasterYSize=str(vrt_shape.height),
         )
-
         # generate pretty XML and write
         xmlstr = minidom.parseString(ET.tostring(vrt)).toprettyxml(indent="  ")
         if self._bucket:
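Taken together, the two hunks above replace per-tile output.tiles_exist() round trips with a single batched tiles_exist() pass. A minimal sketch of the consumer side, as skip_tiles() now behaves in "continue" mode; the example.mapchete path and the zoom level are hypothetical, and a valid mapchete configuration is assumed:

    import mapchete

    # hypothetical config file; any valid mapchete configuration works here
    with mapchete.open("example.mapchete", mode="continue") as mp:
        # skip_tiles() yields (tile, skip) pairs; in "continue" mode the skip
        # flag now comes from the batched tiles_exist() check
        for tile, skip in mp.skip_tiles(tiles=mp.get_process_tiles(zoom=5)):
            if not skip:
                mp.execute(tile)
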
diff --git a/mapchete/io/__init__.py b/mapchete/io/__init__.py
index e6372b2f..e5eb4909 100644
--- a/mapchete/io/__init__.py
+++ b/mapchete/io/__init__.py
@@ -15,6 +15,7 @@
 from mapchete.io._path import (
     path_is_remote,
     path_exists,
+    tiles_exist,
     absolute_path,
     relative_path,
     makedirs,
@@ -28,6 +29,7 @@
     "tile_to_zoom_level",
     "path_is_remote",
     "path_exists",
+    "tiles_exist",
     "absolute_path",
     "relative_path",
     "makedirs",
diff --git a/mapchete/io/_path.py b/mapchete/io/_path.py
index e9a7b7a1..c35b965f 100644
--- a/mapchete/io/_path.py
+++ b/mapchete/io/_path.py
@@ -1,9 +1,13 @@
+import concurrent.futures
+import logging
 import os
 from urllib.request import urlopen
 from urllib.error import HTTPError
 
 from mapchete.io._misc import get_boto3_bucket
 
+logger = logging.getLogger(__name__)
+
 
 def path_is_remote(path, s3=True):
     """
@@ -42,7 +46,7 @@ def path_exists(path):
         except HTTPError as e:
             if e.code == 404:
                 return False
-            else:
+            else:  # pragma: no cover
                 raise
     elif path.startswith("s3://"):
         bucket = get_boto3_bucket(path.split("/")[2])
@@ -112,3 +116,118 @@ def makedirs(path):
             os.makedirs(path)
         except OSError:
             pass
+
+
+def tiles_exist(config=None, output_tiles=None, process_tiles=None):
+    """
+    Yield tiles and whether their output already exists or not.
+
+    Either "output_tiles" or "process_tiles" has to be provided.
+
+    The S3 part of the function is loosely inspired by
+    https://alexwlchan.net/2019/07/listing-s3-keys/
+
+    Parameters
+    ----------
+    config : mapchete.config.MapcheteConfig
+    output_tiles : iterator
+    process_tiles : iterator
+
+    Yields
+    ------
+    tuple : (tile, exists)
+    """
+    if process_tiles is not None and output_tiles is not None:  # pragma: no cover
+        raise ValueError("just one of 'process_tiles' and 'output_tiles' allowed")
+    elif process_tiles is None and output_tiles is None:  # pragma: no cover
+        raise ValueError("one of 'process_tiles' and 'output_tiles' has to be provided")
+
+    basepath = config.output_reader.path
+    # make tiles unique
+    tiles = set(process_tiles) if process_tiles is not None else set(output_tiles)
+
+    # in case no tiles are provided
+    if not tiles:
+        return
+
+    # only on TileDirectories on S3
+    if (
+        not config.output_reader.path.endswith(config.output_reader.file_extension) and
+        basepath.startswith("s3://")
+    ):
+        import boto3
+        basekey = "/".join(basepath.split("/")[3:])
+        bucket = basepath.split("/")[2]
+        s3 = boto3.client("s3")
+        paginator = s3.get_paginator("list_objects_v2")
+
+        # determine zoom
+        zoom = next(iter(tiles)).zoom
+
+        # get all output tiles
+        if process_tiles:
+            output_tiles = set(
+                [
+                    t
+                    for process_tile in tiles
+                    for t in config.output_pyramid.intersecting(process_tile)
+                ]
+            )
+        else:
+            output_tiles = tiles
+
+        # create a mapping between paths and tiles
+        paths = dict()
+        # remember rows
+        rows = set()
+        for output_tile in output_tiles:
+            if output_tile.zoom != zoom:  # pragma: no cover
+                raise ValueError("tiles of different zoom levels cannot be mixed")
+            path = config.output_reader.get_path(output_tile)
+
+            if process_tiles:
+                paths[path] = config.process_pyramid.intersecting(output_tile)[0]
+            else:
+                paths[path] = output_tile
+
+            rows.add(output_tile.row)
+
+        # remember already yielded tiles
+        yielded = set()
+        for row in rows:
+            # use prefix until row, page through api results
+            logger.debug("check existing tiles in row %s" % row)
+            prefix = os.path.join(*[basekey, str(zoom), str(row)])
+            logger.debug("read keys %s*" % os.path.join("s3://" + bucket, prefix))
+
+            for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
+                logger.debug("read next page")
+                try:
+                    contents = page["Contents"]
+                except KeyError:
+                    break
+
+                for obj in contents:
+                    # get matching tile
+                    tile = paths[os.path.join("s3://" + bucket, obj["Key"])]
+                    # store and yield process tile if it was not already yielded
+                    if tile not in yielded:
+                        yielded.add(tile)
+                        yield (tile, True)
+
+        # finally, yield all tiles which were not yet yielded as False
+        for tile in tiles.difference(yielded):
+            yield (tile, False)
+
+    else:
+        def _exists(tile):
+            if process_tiles:
+                return (tile, config.output_reader.tiles_exist(process_tile=tile))
+            else:
+                return (tile, config.output_reader.tiles_exist(output_tile=tile))
+
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            for future in concurrent.futures.as_completed(
+                (executor.submit(_exists, tile) for tile in tiles)
+            ):
+                yield future.result()
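The S3 branch of tiles_exist() above trades one existence check per tile for a single paginated list_objects_v2 request series per tile row, which is what makes the check fast on large tile directories. A standalone sketch of that prefix-listing pattern; the bucket name and key prefix are made up, and configured boto3 credentials are assumed:

    import boto3

    def keys_under_prefix(bucket, prefix):
        """Yield every object key below a prefix, paging through the API."""
        paginator = boto3.client("s3").get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            # a page without a "Contents" entry means no keys matched
            for obj in page.get("Contents", []):
                yield obj["Key"]

    # one request series covers a whole row of output tiles, e.g. zoom 5, row 10
    existing = set(keys_under_prefix("my-bucket", "output/5/10/"))
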
diff --git a/mapchete/log.py b/mapchete/log.py
index 6d6888a7..33987e76 100644
--- a/mapchete/log.py
+++ b/mapchete/log.py
@@ -7,6 +7,7 @@
 """
 from itertools import chain
 import logging
+import sys
 import warnings
 
 from mapchete._registered import drivers, processes
@@ -67,7 +68,7 @@ def redact(self, msg):
 
 # lower stream output log level
 formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s')
-stream_handler = logging.StreamHandler()
+stream_handler = logging.StreamHandler(sys.stdout)
 stream_handler.setFormatter(formatter)
 stream_handler.setLevel(logging.WARNING)
 stream_handler.addFilter(KeyValueFilter(key_value_replace=key_value_replace_patterns))
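As for the log.py change: logging.StreamHandler() writes to sys.stderr by default, so passing sys.stdout explicitly sends mapchete's log output to standard output instead. A self-contained illustration of the difference:

    import logging
    import sys

    logger = logging.getLogger("demo")
    handler = logging.StreamHandler(sys.stdout)  # the default stream is sys.stderr
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
    )
    logger.addHandler(handler)
    logger.warning("this message goes to stdout instead of stderr")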