diff --git a/mapchete/_core.py b/mapchete/_core.py
index 620957e6..bde9f21c 100644
--- a/mapchete/_core.py
+++ b/mapchete/_core.py
@@ -1,13 +1,13 @@
 """Main module managing processes."""
 from cachetools import LRUCache
-import concurrent.futures
 import logging
 import multiprocessing
 import threading
 
 from mapchete.config import MapcheteConfig
 from mapchete.errors import MapcheteNodataTile
+from mapchete.io import process_tiles_exist
 from mapchete._processing import _run_on_single_tile, _run_area, ProcessInfo, TileProcess
 from mapchete.tile import count_tiles
 from mapchete._timer import Timer
@@ -145,16 +145,10 @@ def skip_tiles(self, tiles=None):
         ------
         tuples : (tile, skip)
         """
-        def _skip(config, tile):
-            return tile, config.output_reader.tiles_exist(tile)
-
         # only check for existing output in "continue" mode
         if self.config.mode == "continue":
-            with concurrent.futures.ThreadPoolExecutor() as executor:
-                for future in concurrent.futures.as_completed(
-                    (executor.submit(_skip, self.config, tile) for tile in tiles)
-                ):
-                    yield future.result()
+            yield from process_tiles_exist(config=self.config, process_tiles=tiles)
+        # otherwise don't skip tiles
         else:
             for tile in tiles:
                 yield (tile, False)
diff --git a/mapchete/io/__init__.py b/mapchete/io/__init__.py
index e6372b2f..5bbda3eb 100644
--- a/mapchete/io/__init__.py
+++ b/mapchete/io/__init__.py
@@ -15,6 +15,7 @@
 from mapchete.io._path import (
     path_is_remote,
     path_exists,
+    process_tiles_exist,
     absolute_path,
     relative_path,
     makedirs,
@@ -28,6 +29,7 @@
     "tile_to_zoom_level",
     "path_is_remote",
     "path_exists",
+    "process_tiles_exist",
     "absolute_path",
     "relative_path",
     "makedirs",
diff --git a/mapchete/io/_path.py b/mapchete/io/_path.py
index e9a7b7a1..de719502 100644
--- a/mapchete/io/_path.py
+++ b/mapchete/io/_path.py
@@ -1,9 +1,14 @@
+from collections import defaultdict
+import concurrent.futures
+import logging
 import os
 from urllib.request import urlopen
 from urllib.error import HTTPError
 
 from mapchete.io._misc import get_boto3_bucket
 
+logger = logging.getLogger(__name__)
+
 
 def path_is_remote(path, s3=True):
     """
@@ -112,3 +117,94 @@ def makedirs(path):
         os.makedirs(path)
     except OSError:
         pass
+
+
+def process_tiles_exist(config=None, process_tiles=None):
+    """
+    Yield process tiles and whether their output already exists or not.
+
+    The S3 part of the function is loosely inspired by
+    https://alexwlchan.net/2019/07/listing-s3-keys/
+
+    Parameters
+    ----------
+    config : mapchete.config.MapcheteConfig
+    process_tiles : iterator
+
+    Yields
+    ------
+    tuple : (process_tile, exists)
+    """
+    basepath = config.output_reader.path
+
+    # only use the batched S3 check for TileDirectory outputs on S3
+    if (
+        not config.output_reader.path.endswith(config.output_reader.file_extension) and
+        basepath.startswith("s3://")
+    ):
+        basekey = "/".join(basepath.split("/")[3:])
+        bucket = basepath.split("/")[2]
+        import boto3
+        s3 = boto3.client("s3")
+        paginator = s3.get_paginator("list_objects_v2")
+
+        # make process tiles unique
+        process_tiles = set(process_tiles)
+        # determine current zoom
+        zoom = next(iter(process_tiles)).zoom
+        # get all output tiles for process tiles
+        output_tiles = set(
+            [
+                t
+                for process_tile in process_tiles
+                for t in config.output_pyramid.intersecting(process_tile)
+            ]
+        )
+        # create a mapping between paths and process tiles
+        paths = dict()
+        # group process tiles by the output tiles' rows, as the S3 key prefix
+        rowgroups = defaultdict(list)
+        # remember already yielded process tiles
+        yielded = set()
+        for output_tile in output_tiles:
+            if output_tile.zoom != zoom:
+                raise ValueError("tiles of different zoom levels cannot be mixed")
+            path = config.output_reader.get_path(output_tile)
+            process_tile = config.process_pyramid.intersecting(output_tile)[0]
+            paths[path] = process_tile
+            rowgroups[output_tile.row].append(process_tile)
+        # use prefix until row, page through API results
+        for row, tiles in rowgroups.items():
+            logger.debug("check existing tiles in row %s" % row)
+            prefix = os.path.join(*[basekey, str(zoom), str(row)])
+            logger.debug(
+                "read keys %s*" % os.path.join("s3://" + bucket, prefix)
+            )
+
+            for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
+                logger.debug("read next page")
+                try:
+                    contents = page["Contents"]
+                except KeyError:
+                    break
+                for obj in contents:
+                    path = obj["Key"]
+                    # get matching process tile
+                    process_tile = paths[os.path.join("s3://" + bucket, path)]
+                    # store and yield process tile if it was not already yielded
+                    if process_tile not in yielded:
+                        yielded.add(process_tile)
+                        yield (process_tile, True)
+
+        # finally, yield all process tiles which were not yet yielded as False
+        for process_tile in process_tiles.difference(yielded):
+            yield (process_tile, False)
+
+    else:
+        def _exists(tile):
+            return (tile, config.output_reader.tiles_exist(tile))
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            for future in concurrent.futures.as_completed(
+                (executor.submit(_exists, tile) for tile in process_tiles)
+            ):
+                yield future.result()
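
For context, a minimal sketch of how the refactored skip_tiles() is driven; the config file name and zoom level below are placeholders for illustration. Only "continue" mode routes the check through process_tiles_exist():

    import mapchete

    # open a process in "continue" mode so existing output can be skipped
    with mapchete.open("example.mapchete", mode="continue") as mp:
        tiles = mp.get_process_tiles(zoom=7)
        for tile, skip in mp.skip_tiles(tiles=tiles):
            if skip:
                continue  # output already exists
            mp.execute(tile)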
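
The batching itself rests on the TileDirectory key layout basekey/zoom/row/col.ext: all tiles of one row share a key prefix, so one paginated list_objects_v2 request per row replaces one existence check per tile. A standalone sketch of that pattern, with a hypothetical bucket and prefix:

    import boto3

    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")

    existing = set()
    # one paginated listing per row prefix instead of one request per tile
    for page in paginator.paginate(Bucket="my-bucket", Prefix="output/7/42/"):
        # a page without a "Contents" key means no objects matched the prefix
        for obj in page.get("Contents", []):
            existing.add(obj["Key"])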