use boto paging to check multiple tiles at once
ungarj committed Jul 6, 2020
1 parent ca7c450 commit ebb457a
Showing 3 changed files with 101 additions and 9 deletions.
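In short: instead of issuing one existence check per tile, output keys are now listed in bulk through boto3's list_objects_v2 paginator. A minimal, standalone sketch of that pattern (bucket and prefix names are placeholders):

    # sketch of the paging pattern this commit adopts; bucket/prefix are placeholders
    import boto3

    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")

    existing = set()
    for page in paginator.paginate(Bucket="my-bucket", Prefix="tiles/5/17/"):
        # each page carries up to 1,000 keys; continuation tokens are handled internally
        for obj in page.get("Contents", []):
            existing.add(obj["Key"])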
12 changes: 3 additions & 9 deletions mapchete/_core.py
@@ -1,13 +1,13 @@
"""Main module managing processes."""

from cachetools import LRUCache
import concurrent.futures
import logging
import multiprocessing
import threading

from mapchete.config import MapcheteConfig
from mapchete.errors import MapcheteNodataTile
from mapchete.io import process_tiles_exist
from mapchete._processing import _run_on_single_tile, _run_area, ProcessInfo, TileProcess
from mapchete.tile import count_tiles
from mapchete._timer import Timer
@@ -145,16 +145,10 @@ def skip_tiles(self, tiles=None):
------
tuples : (tile, skip)
"""
def _skip(config, tile):
return tile, config.output_reader.tiles_exist(tile)

# only check for existing output in "continue" mode
if self.config.mode == "continue":
with concurrent.futures.ThreadPoolExecutor() as executor:
for future in concurrent.futures.as_completed(
(executor.submit(_skip, self.config, tile) for tile in tiles)
):
yield future.result()
yield from process_tiles_exist(config=self.config, process_tiles=tiles)
# otherwise don't skip tiles
else:
for tile in tiles:
yield (tile, False)
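For context, a sketch of how the reworked generator might be consumed ("example.mapchete" and zoom 5 are made-up placeholders):

    # hypothetical driver loop; config path and zoom level are made up
    import mapchete

    with mapchete.open("example.mapchete", mode="continue") as mp:
        for tile, skip in mp.skip_tiles(tiles=mp.get_process_tiles(zoom=5)):
            if not skip:
                mp.execute(tile)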
2 changes: 2 additions & 0 deletions mapchete/io/__init__.py
@@ -15,6 +15,7 @@
from mapchete.io._path import (
path_is_remote,
path_exists,
process_tiles_exist,
absolute_path,
relative_path,
makedirs,
@@ -28,6 +29,7 @@
"tile_to_zoom_level",
"path_is_remote",
"path_exists",
"process_tiles_exist",
"absolute_path",
"relative_path",
"makedirs",
96 changes: 96 additions & 0 deletions mapchete/io/_path.py
@@ -1,9 +1,14 @@
from collections import defaultdict
import concurrent.futures
import logging
import os
from urllib.request import urlopen
from urllib.error import HTTPError

from mapchete.io._misc import get_boto3_bucket

logger = logging.getLogger(__name__)


def path_is_remote(path, s3=True):
"""
@@ -112,3 +117,94 @@ def makedirs(path):
os.makedirs(path)
except OSError:
pass


def process_tiles_exist(config=None, process_tiles=None):
"""
Yield process tiles and whether their output already exists or not.
The S3 part of the function are loosely inspired by
https://alexwlchan.net/2019/07/listing-s3-keys/
Parameters
----------
config : mapchete.config.MapcheteConfig
process_tiles : iterator
Yields
------
tuple : (process_tile, exists)
"""
basepath = config.output_reader.path

    # only check TileDirectory outputs on S3
    if (
        not basepath.endswith(config.output_reader.file_extension) and
        basepath.startswith("s3://")
    ):
basekey = "/".join(basepath.split("/")[3:])
bucket = basepath.split("/")[2]
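        # imported here so boto3 is only required when S3 paths are actually used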
import boto3
s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

# make process tiles unique
process_tiles = set(process_tiles)
# determine current zoom
zoom = next(iter(process_tiles)).zoom
# get all output tiles for process tiles
    output_tiles = {
        t
        for process_tile in process_tiles
        for t in config.output_pyramid.intersecting(process_tile)
    }
# create a mapping between paths and process_tiles
paths = dict()
# group process_tiles by row
rowgroups = defaultdict(list)
# remember already yielded process_tiles
yielded = set()
for output_tile in output_tiles:
if output_tile.zoom != zoom:
raise ValueError("tiles of different zoom levels cannot be mixed")
path = config.output_reader.get_path(output_tile)
process_tile = config.process_pyramid.intersecting(output_tile)[0]
paths[path] = process_tile
rowgroups[process_tile.row].append(process_tile)
# use prefix until row, page through api results
for row, tiles in rowgroups.items():
logger.debug("check existing tiles in row %s" % row)
prefix = os.path.join(*[basekey, str(zoom), str(row)])
logger.debug(
"read keys %s*" % os.path.join("s3://" + bucket, prefix)
)

for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
logger.debug("read next page")
try:
contents = page["Contents"]
except KeyError:
break
for obj in contents:
path = obj["Key"]
# get matching process_tile
process_tile = paths[os.path.join("s3://" + bucket, path)]
# store and yield process tile if it was not already yielded
if process_tile not in yielded:
yielded.add(process_tile)
yield (process_tile, True)

# finally, yield all process tiles which were not yet yielded as False
for process_tile in process_tiles.difference(yielded):
yield (process_tile, False)

else:
def _exists(tile):
return (tile, config.output_reader.tiles_exist(tile))
with concurrent.futures.ThreadPoolExecutor() as executor:
for future in concurrent.futures.as_completed(
(executor.submit(_exists, tile) for tile in process_tiles)
):
yield future.result()
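A usage sketch of the new helper ("example.mapchete" and zoom 5 are placeholders); note that the S3 branch requires all process tiles to come from a single zoom level and raises ValueError otherwise:

    # hedged sketch: feeding one zoom level's process tiles through the new helper
    import mapchete
    from mapchete.io import process_tiles_exist

    with mapchete.open("example.mapchete", mode="continue") as mp:
        for tile, exists in process_tiles_exist(
            config=mp.config, process_tiles=mp.get_process_tiles(zoom=5)
        ):
            if exists:
                print("skipping", tile.id)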
