In [1]:
import requests
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import io
from PIL import Image
import shapely.geometry
import pyproj
from dataclasses import dataclass
import json
import re
import math

BASE_URL = 'https://maps.googleapis.com/maps/api/staticmap'
PROJ_DIR = './'
CACHE_DIR = PROJ_DIR + 'cache/'
TESTS_DIR = PROJ_DIR + 'tests/'
STITCHED_DIR = PROJ_DIR + 'stitched/'
RAW_TILES_PATH = 'raw_tiles/'
CROP_TILES_PATH = 'crop_tiles/'

In [81]:
# Helpers
from math import log, pow
import os, os.path

def get_number_of_files_in_dir(dir=PROJ_DIR) -> int:
    return len([name for name in os.listdir(dir)])

def zoom_level_to_metres(zoom: int) -> float:
    return (40000 / pow(2, zoom)) * 2000

def metres_to_zoom_level(metres: float) -> float:
    """
    meters per pixel
    """
    return log(40000 / (metres / 2000), 2)

def get_google_api_key() -> str:
    with open(PROJ_DIR + 'google_api_key') as f:
        return f.readline()

def format_geojson(gridpoints: list[shapely.geometry.Point]):
    features = []
    for p in gridpoints:
        features.append({
            'type': 'Feature', 
            # 'properties': {'marker-color': '#fffb00', 'marker-size': 'small', 'marker-symbol': ''}, 
            'properties': {}, 
            'geometry': {'type': 'Point', 'coordinates': [p.lon, p.lat]}
        })
    geojson = {'type': 'FeatureCollection', 'features': features}
    return json.dumps(geojson, indent=4, sort_keys=True)

def delete_dir_files(dir):
    for f in os.listdir(dir):
        path = os.path.join(dir, f)
        if not os.path.isdir(path):
            os.remove(path)

def image_to_byte_array(image: Image):
    imgByteArr = io.BytesIO()
    image.save(imgByteArr, format=image.format)
    imgByteArr = imgByteArr.getvalue()
    return imgByteArr


assert metres_to_zoom_level(zoom_level_to_metres(20)) == 20

# https://github.com/hulleylm/google_maps_tiler

def latLngToPoint(mapWidth, mapHeight, lat, lng):
    x = (lng + 180) * (mapWidth/360)
    y = ((1 - math.log(math.tan(lat * math.pi / 180) + 1 / math.cos(lat * math.pi / 180)) / math.pi) / 2) * mapHeight
    return(x, y)

def pointToLatLng(mapWidth, mapHeight, x, y):
    lng = x / mapWidth * 360 - 180
    n = math.pi - 2 * math.pi * y / mapHeight
    lat = (180 / math.pi * math. atan(0.5 * (math.exp(n) - math.exp(-n))))
    return(lat, lng)

def getImageBounds(mapWidth, mapHeight, xScale, yScale, lat, lng):
    centreX, centreY = latLngToPoint(mapWidth, mapHeight, lat, lng)
    southWestX = centreX - (mapWidth/2)/ xScale
    southWestY = centreY + (mapHeight/2)/ yScale
    SWlat, SWlng = pointToLatLng(mapWidth, mapHeight, southWestX, southWestY)
    northEastX = centreX + (mapWidth/2)/ xScale
    northEastY = centreY - (mapHeight/2)/ yScale
    NElat, NElng = pointToLatLng(mapWidth, mapHeight, northEastX, northEastY)
    return[SWlat, SWlng, NElat, NElng]

def getLatStep(mapWidth, mapHeight, yScale, lat, lng):
    pointX, pointY = latLngToPoint(mapWidth, mapHeight, lat, lng)
    steppedPointY = pointY - ((mapHeight)/ yScale)
    newLat, originalLng = pointToLatLng(mapWidth, mapHeight, pointX, steppedPointY)
    latStep = lat - newLat
    return (latStep)

In [3]:
class Tile:
    SUCCESS = "SUCCESS"
    FROM_CACHE = "FROM_CACHE"

    def __init__(self, lat: float, lon: float, scale: int, zoom: int, size: int=200, type: str='satellite', overwrite: bool=False):
        self.copyright_height = 50
        self.lat = lat
        self.lon = lon
        self.zoom = zoom
        self.scale = scale
        self.width = size
        self.height = size + self.copyright_height*2
        self.map_type = type
        self.url = self._get_tile_url()
        self.overwrite = overwrite
    

    def _get_tile_url(self) -> str:
        url = BASE_URL + '?key=' + get_google_api_key()
        url += '&maptype=' + self.map_type
        url += '&center=' + str(self.lat) + ',' + str(self.lon)
        url += '&zoom=' + str(self.zoom)
        url += '&scale=' + str(self.scale)
        url += '&size=' + str(self.width) + 'x' + str(self.height)
        return url
    

    def _fetch(self, url) -> bytes:
        filename = re.sub(r'[^\w\-_\. ]', '_', self.url)
        path = os.path.join(CACHE_DIR, filename)
        if os.path.exists(path):
            img_data = image_to_byte_array(Image.open(path))
            return [img_data, None, True]

        r = requests.get(url)
        response = r.content
        if r.status_code != 200:
            return [None, str(response), False]
        else:
            self._save_tile(response, CACHE_DIR, filename)
            return [response, None, False]


    def _save_tile(self, img_data: bytes, dir: str, file_name: str) -> str:
        path = dir + file_name
        with open(path, 'wb') as handler:
            handler.write(img_data)
        return path


    def _remove_google_logo(self, path):
        img = Image.open(path)
        width, height = img.size
        left = 0
        top = self.copyright_height*self.scale
        right = width
        bottom = height - self.copyright_height*self.scale
        cropped = img.crop((left, top, right, bottom))
        byte_arr = io.BytesIO()
        cropped.save(byte_arr, format='PNG')
        return byte_arr.getvalue()


    def _show_tile(self, img_path):
        img_data = mpimg.imread(img_path)
        plt.figure(figsize = (10,10))
        plt.imshow(img_data)
        plt.axis('off')


    def _file_exists(self, file_name, dir):
        if self.overwrite: 
            return False
        raw_path = dir + RAW_TILES_PATH + file_name
        crop_path = dir + CROP_TILES_PATH + file_name
        return os.path.exists(raw_path) and os.path.exists(crop_path)


    def run(self, test_name="test", show_image=True) -> str:
        file_name = str(self.lat) + "_" + str(self.lon) + '.png'
        dir = TESTS_DIR + test_name + "/"
        if not self._file_exists(file_name, dir):
            print("Fetching new tile... ")
            print(self.url)
            [raw_img_data, img_fetch_error, is_cached] = self._fetch(self.url)
            if raw_img_data:
                raw_img_path = self._save_tile(raw_img_data, dir + RAW_TILES_PATH, file_name)
                cropped_data = self._remove_google_logo(raw_img_path)
                cropped_img_path = self._save_tile(cropped_data, dir + CROP_TILES_PATH, file_name)
                if show_image:
                    self._show_tile(cropped_img_path)
                return Tile.FROM_CACHE if is_cached else Tile.SUCCESS
            else:
                return "Failed to fetch image from Google Maps: " + img_fetch_error
        else:
            return "File " + file_name + " already exists, skipping..."


In [42]:
@dataclass
class Coordinate:
    lat: float
    lon: float

@dataclass
class MapGridArea:
    nw: Coordinate
    se: Coordinate


class MapGridTiles:
    def __init__(self, zoom, tile_size):
        # Step size in metres
        self.zoom = zoom
        self.stepsize = zoom_level_to_metres(zoom) 
        # Set up transformers, EPSG:3857 is metric, same as EPSG:900913
        self.to_proxy_transformer = pyproj.Transformer.from_crs('epsg:4326', 'epsg:3857')
        self.to_original_transformer = pyproj.Transformer.from_crs('epsg:3857', 'epsg:4326')
        self.size = (0, 0)
        self.map_size = 256
        self.tile_size = tile_size

    def add_m_to_coordinate(self, lat, lon, x, y):
        coordinate = shapely.geometry.Point((lat, lon))
        transformed = self.to_proxy_transformer.transform(coordinate.x, coordinate.y)
        x = transformed[0] + x
        y = transformed[1] + y
        return shapely.geometry.Point(self.to_original_transformer.transform(x, y))

    def create_grid(self, area: MapGridArea) -> list[Coordinate]:
        xScale = math.pow(2, self.zoom) / (self.tile_size / self.map_size)
        yScale = math.pow(2, self.zoom) / (self.tile_size / self.map_size)

        startLat = area.nw.lat
        startLng = area.nw.lon

        startCorners = getImageBounds(self.map_size, self.map_size, xScale, yScale, startLat, startLng)
        lngStep = startCorners[3] - startCorners[1]

        col = 0
        row = 0
        lat = startLat
        gridpoints = []
        while (lat >= area.se.lat):
            lng = startLng
            row = 0

            while lng <= area.se.lon:
                gridpoints.append(Coordinate(lat, lng))
                row += 1
                lng += lngStep

            col -= 1
            lat += getLatStep(self.map_size, self.map_size, yScale, lat, lng)
        self.size = (abs(row), abs(col))
        return gridpoints


    # def save_to_csv(self, gridpoints, test_name: str, file_name: str='gridpoints'):
    #     path = TESTS_DIR + test_name + '/' + file_name + '.csv'
    #     d = self.stepsize / 2
    #     with open(path, 'w') as of:
    #         of.write('lat,long,tl_lat,tl_lon,tr_lat,tr_lon,bl_lat,bl_lon,br_lat,br_lon,size\n')
    #         for p in gridpoints:
    #             tl = self.add_m_to_coordinate(p.x, p.y, -d, -d)
    #             tr = self.add_m_to_coordinate(p.x, p.y, +d, -d)
    #             bl = self.add_m_to_coordinate(p.x, p.y, -d, +d)
    #             br = self.add_m_to_coordinate(p.x, p.y, +d, +d)
    #             of.write('{:f},{:f},{:f},{:f},{:f},{:f},{:f},{:f},{:f},{:f},{:f}\n'.format(
    #                 p.x, p.y,
    #                 tl.x, tl.y,
    #                 tr.x, tr.y,
    #                 bl.x, bl.y,
    #                 br.x, br.y,
    #                 self.stepsize
    #             ))
    
    def save_to_geo_json(self, gridpoints: list[shapely.geometry.Point], test_name: str, file_name: str='geojson'):
        path = TESTS_DIR + test_name + '/' + file_name + '.geojson'
        with open(path, 'w') as of:
            of.write(format_geojson(gridpoints))
            

In [5]:
def merge_images(images, rows: int, columns: int, size: float=None):
  """Merge two images into one, displayed side by side
  :param images: PIL list of image files
  :param rows: number of rows in grid
  :param columns: number of columns in grid
  :param image_width: pixel width of each tile
  :param image_height: pixel height of each tile
  :return: the merged Image object
  """

  imgs = images.copy()
  _row, _column = 0, 0

  (_image_width, _image_height) = images[0].size
  image_width = size or _image_width
  image_height = size or _image_height

  result = Image.new('RGB', (rows*image_width, columns*image_height))

  def print_progress(): 
    print(str(len(imgs)) + ' of ' + str(len(images)) + ' images remaining...')
  
  while len(imgs) > 0:
    print_progress()
    img = imgs.pop()
    img.resize((image_width, image_height), Image.ANTIALIAS)
    result.paste(im=img, box=(_row*image_width, _column*image_height))

    _row += 1
    if _row >= rows:
      _row = 0
      _column += 1
  print_progress()

  return result


def stitch_files(dir, rows: int, columns: int, tile_size: float=100):
  filenames = [f for f in os.listdir(dir) if os.path.isfile(os.path.join(dir, f)) and f.endswith('.png')]
  filenames.sort()
  images = list(map(lambda x: Image.open(dir + x), filenames))
  return merge_images(images, rows=rows, columns=columns, size=tile_size)

In [78]:
def run_test(test_name: str, area: MapGridArea, zoom_level: int, tile_size: int, clear=False):
    dir = TESTS_DIR + test_name + '/'
    if clear is True:
        delete_dir_files(dir)
        delete_dir_files(os.path.join(dir + RAW_TILES_PATH))
        delete_dir_files(os.path.join(dir + CROP_TILES_PATH))

    tiles = MapGridTiles(zoom=zoom_level, tile_size=tile_size)
    gridpoints = tiles.create_grid(area)
    l = str(len(gridpoints))
    rows, columns = tiles.size
    print(l + ' tiles: ' + str(rows) + 'x' + str(columns))
    tiles.save_to_geo_json(gridpoints, test_name, 'gridpoints_zl' + str(zoom_level))

    # i = 0
    # for gridpoint in gridpoints:
    #     print(">>> TILE " + str(i) + ' of ' + l)
    #     print(">>>", gridpoint.lat, gridpoint.lon)
    #     tile = Tile(lat=gridpoint.lat, lon=gridpoint.lon, zoom=zoom_level, size=tile_size, scale=1)
    #     status = tile.run(test_name, show_image=False)
    #     print(status)
    #     i += 1

    # stitched = stitch_files(dir + "crop_tiles/", rows, columns, tile_size)
    # stitched.save(STITCHED_DIR + test_name + '.png')
    # print("Saved.")


def how_many_tiles(area: MapGridArea, zoom_level, tile_size: int):
    tiles = MapGridTiles(zoom_level, tile_size)
    gridpoints = tiles.create_grid(area)
    rows, columns = tiles.size
    print('area has ' + str(len(gridpoints)) + ' tiles: ' + str(rows) + 'x' + str(columns))

# Run code below here 👇

In [53]:
ALL_EDINBURGH_MAP_AREA = MapGridArea(nw=Coordinate(55.990219, -3.330831), se=Coordinate(55.881181, -3.110212))
UNIVERSITY_MAP_AREA = MapGridArea(nw=Coordinate(55.9455, -3.192), se=Coordinate(55.942, -3.185))
APPLETON_MAP_AREA = MapGridArea(nw=Coordinate(55.9446, -3.1877), se=Coordinate(55.9440, -3.1860))
PEFF_PITCH_MAP_AREA = MapGridArea(nw=Coordinate(55.932, -3.16), se=Coordinate(55.929, -3.153))

In [86]:
how_many_tiles(PEFF_PITCH_MAP_AREA, 17, 200)

area has 12 tiles: 4x3


In [82]:
run_test('FULL', ALL_EDINBURGH_MAP_AREA, 18, 500, clear=True)

6059 tiles: 83x73
