Commit 753e30b

Merge 0d1b26f into ca7c450

ungarj committed Jul 6, 2020
2 parents ca7c450 + 0d1b26f
Showing 5 changed files with 186 additions and 39 deletions.
12 changes: 3 additions & 9 deletions mapchete/_core.py
@@ -1,13 +1,13 @@
"""Main module managing processes."""

from cachetools import LRUCache
import concurrent.futures
import logging
import multiprocessing
import threading

from mapchete.config import MapcheteConfig
from mapchete.errors import MapcheteNodataTile
from mapchete.io import tiles_exist
from mapchete._processing import _run_on_single_tile, _run_area, ProcessInfo, TileProcess
from mapchete.tile import count_tiles
from mapchete._timer import Timer
@@ -145,16 +145,10 @@ def skip_tiles(self, tiles=None):
------
tuples : (tile, skip)
"""
def _skip(config, tile):
return tile, config.output_reader.tiles_exist(tile)

# only check for existing output in "continue" mode
if self.config.mode == "continue":
with concurrent.futures.ThreadPoolExecutor() as executor:
for future in concurrent.futures.as_completed(
(executor.submit(_skip, self.config, tile) for tile in tiles)
):
yield future.result()
yield from tiles_exist(config=self.config, process_tiles=tiles)
# otherwise don't skip tiles
else:
for tile in tiles:
yield (tile, False)
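
For context, a minimal sketch of how the reworked skip_tiles() could be driven after this change; the configuration file name and zoom level are illustrative assumptions, not part of the commit:

    import mapchete

    # open a process in "continue" mode so existing output gets skipped
    with mapchete.open("example.mapchete", mode="continue") as mp:
        tiles = mp.config.process_pyramid.tiles_from_bounds(mp.config.bounds, 7)
        for tile, skip in mp.skip_tiles(tiles=tiles):
            if skip:
                continue  # output was already written in an earlier run
            print("tile %s still needs processing" % str(tile))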
88 changes: 60 additions & 28 deletions mapchete/index.py
@@ -33,7 +33,7 @@

from mapchete.config import get_zoom_levels
from mapchete.io import (
path_exists, path_is_remote, get_boto3_bucket, raster, relative_path
path_exists, path_is_remote, get_boto3_bucket, raster, relative_path, tiles_exist
)

logger = logging.getLogger(__name__)
@@ -148,8 +148,59 @@ def zoom_index_gen(

logger.debug("use the following index writers: %s", index_writers)

def _worker(tile):
# if there are indexes to write to, check if output exists
# all output tiles for given process area
logger.debug("determine affected output tiles")
output_tiles = set(
mp.config.output_pyramid.tiles_from_geom(
mp.config.area_at_zoom(zoom), zoom
)
)

# check which tiles exist in any index
logger.debug("check which tiles exist in index(es)")
existing_in_any_index = set(
tile for tile in output_tiles
if any(
[
i.entry_exists(
tile=tile,
path=_tile_path(
orig_path=mp.config.output.get_path(tile),
basepath=basepath,
for_gdal=for_gdal
)
)
for i in index_writers
]
)
)

logger.debug("{}/{} tiles found in index(es)".format(
len(existing_in_any_index), len(output_tiles))
)
# tiles which do not exist in any index
for tile, output_exists in tiles_exist(
mp.config, output_tiles=output_tiles.difference(existing_in_any_index)
):
tile_path = _tile_path(
orig_path=mp.config.output.get_path(tile),
basepath=basepath,
for_gdal=for_gdal
)
indexes = [
i for i in index_writers
if not i.entry_exists(tile=tile, path=tile_path)
]
if indexes and output_exists:
logger.debug("%s exists", tile_path)
logger.debug("write to %s indexes" % len(indexes))
for index in indexes:
index.write(tile, tile_path)
# yield tile for progress information
yield tile

# tiles which exist in at least one index
for tile in existing_in_any_index:
tile_path = _tile_path(
orig_path=mp.config.output.get_path(tile),
basepath=basepath,
@@ -160,30 +211,12 @@
if not i.entry_exists(tile=tile, path=tile_path)
]
if indexes:
output_exists = mp.config.output.tiles_exist(output_tile=tile)
else:
output_exists = None
return tile, tile_path, indexes, output_exists

with concurrent.futures.ThreadPoolExecutor() as executor:
for task in concurrent.futures.as_completed(
(
executor.submit(_worker, i)
for i in mp.config.output_pyramid.tiles_from_geom(
mp.config.area_at_zoom(zoom), zoom
)
)
):
tile, tile_path, indexes, output_exists = task.result()
# only write entries if there are indexes to write to and output
# exists
if indexes and output_exists:
logger.debug("%s exists", tile_path)
logger.debug("write to %s indexes" % len(indexes))
for index in indexes:
index.write(tile, tile_path)
# yield tile for progress information
yield tile
logger.debug("%s exists", tile_path)
logger.debug("write to %s indexes" % len(indexes))
for index in indexes:
index.write(tile, tile_path)
# yield tile for progress information
yield tile
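
With this change the generator batch-checks output existence through tiles_exist() instead of asking the output reader once per tile. A sketch of how zoom_index_gen() might be consumed; keyword arguments other than mp and zoom are assumptions here:

    from mapchete.index import zoom_index_gen

    # the generator yields each handled tile, which makes it easy to hook up
    # a progress counter
    for count, tile in enumerate(
        zoom_index_gen(mp=mp, zoom=7, out_dir=mp.config.output.path, geojson=True), 1
    ):
        print("%s tiles done" % count)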


def _index_file_path(out_dir, zoom, ext):
@@ -496,7 +529,6 @@ def close(self):
rasterXSize=str(vrt_shape.width),
rasterYSize=str(vrt_shape.height),
)

# generate pretty XML and write
xmlstr = minidom.parseString(ET.tostring(vrt)).toprettyxml(indent=" ")
if self._bucket:
2 changes: 2 additions & 0 deletions mapchete/io/__init__.py
@@ -15,6 +15,7 @@
from mapchete.io._path import (
path_is_remote,
path_exists,
tiles_exist,
absolute_path,
relative_path,
makedirs,
@@ -28,6 +29,7 @@
"tile_to_zoom_level",
"path_is_remote",
"path_exists",
"tiles_exist",
"absolute_path",
"relative_path",
"makedirs",
120 changes: 119 additions & 1 deletion mapchete/io/_path.py
@@ -1,9 +1,13 @@
import concurrent.futures
import logging
import os
from urllib.request import urlopen
from urllib.error import HTTPError

from mapchete.io._misc import get_boto3_bucket

logger = logging.getLogger(__name__)


def path_is_remote(path, s3=True):
"""
@@ -42,7 +46,7 @@ def path_exists(path):
except HTTPError as e:
if e.code == 404:
return False
else:
else: # pragma: no cover
raise
elif path.startswith("s3://"):
bucket = get_boto3_bucket(path.split("/")[2])
@@ -112,3 +116,117 @@ def makedirs(path):
os.makedirs(path)
except OSError:
pass


def tiles_exist(config=None, output_tiles=None, process_tiles=None):
"""
Yield tiles and whether their output already exists or not.
Either "output_tiles" or "process_tiles" have to be provided.
The S3 part of the function are loosely inspired by
https://alexwlchan.net/2019/07/listing-s3-keys/
Parameters
----------
config : mapchete.config.MapcheteConfig
process_tiles : iterator
Yields
------
tuple : (tile, exists)
"""
if process_tiles is not None and output_tiles is not None: # pragma: no cover
raise ValueError("just one of 'process_tiles' and 'output_tiles' allowed")
elif process_tiles is None and output_tiles is None: # pragma: no cover
raise ValueError("one of 'process_tiles' and 'output_tiles' has to be provided")

basepath = config.output_reader.path
# make tiles unique
tiles = set(process_tiles) if process_tiles is not None else set(output_tiles)

# in case no tiles are provided
if not tiles:
return

# only on TileDirectories on S3
if (
not config.output_reader.path.endswith(config.output_reader.file_extension) and
basepath.startswith("s3://")
):
import boto3
basekey = "/".join(basepath.split("/")[3:])
bucket = basepath.split("/")[2]
s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

# determine zoom
zoom = next(iter(tiles)).zoom

# get all output tiles
if process_tiles:
output_tiles = set(
[
t
for process_tile in tiles
for t in config.output_pyramid.intersecting(process_tile)
]
)
else:
output_tiles = tiles

# create a mapping between paths and tiles
paths = dict()
# remember rows
rows = set()
for output_tile in output_tiles:
if output_tile.zoom != zoom: # pragma: no cover
raise ValueError("tiles of different zoom levels cannot be mixed")
path = config.output_reader.get_path(output_tile)

if process_tiles:
paths[path] = config.process_pyramid.intersecting(output_tile)[0]
else:
paths[path] = output_tile

rows.add(output_tile.row)

# remember already yielded tiles
yielded = set()
for row in rows:
# use prefix until row, page through api results
logger.debug("check existing tiles in row %s" % row)
prefix = os.path.join(*[basekey, str(zoom), str(row)])
logger.debug("read keys %s*" % os.path.join("s3://" + bucket, prefix))

for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
logger.debug("read next page")
try:
contents = page["Contents"]
except KeyError:
break

for obj in contents:
# get matching tile
tile = paths[os.path.join("s3://" + bucket, obj["Key"])]
# store and yield process tile if it was not already yielded
if tile not in yielded:
yielded.add(tile)
yield (tile, True)

# finally, yield all tiles which were not yet yielded as False
for tile in tiles.difference(yielded):
yield (tile, False)

else:
def _exists(tile):
if process_tiles:
return (tile, config.output_reader.tiles_exist(process_tile=tile))
else:
return (tile, config.output_reader.tiles_exist(output_tile=tile))

with concurrent.futures.ThreadPoolExecutor() as executor:
for future in concurrent.futures.as_completed(
(executor.submit(_exists, tile) for tile in tiles)
):
yield future.result()
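
A minimal usage sketch of the new helper, assuming mp is an open mapchete process and tiles is an iterable of process tiles as in the hunks above:

    from mapchete.io import tiles_exist

    # batch-check which tiles already have output; tiles_exist() yields
    # (tile, exists) tuples in arbitrary order
    missing = [
        tile
        for tile, exists in tiles_exist(config=mp.config, process_tiles=tiles)
        if not exists
    ]
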
3 changes: 2 additions & 1 deletion mapchete/log.py
@@ -7,6 +7,7 @@
"""
from itertools import chain
import logging
import sys
import warnings

from mapchete._registered import drivers, processes
@@ -67,7 +68,7 @@ def redact(self, msg):

# lower stream output log level
formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s')
stream_handler = logging.StreamHandler()
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.WARNING)
stream_handler.addFilter(KeyValueFilter(key_value_replace=key_value_replace_patterns))
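
logging.StreamHandler() falls back to sys.stderr when no stream is passed, so this one-line change redirects mapchete's stream output to stdout. A standalone sketch of the same pattern; the logger name is illustrative:

    import logging
    import sys

    formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
    # without the sys.stdout argument the handler would write to sys.stderr
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(formatter)
    stream_handler.setLevel(logging.WARNING)
    logging.getLogger("mapchete").addHandler(stream_handler)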
