# Generate map tiles from NYC taxi trip data using parallel method

在[上一篇文章](https://yeshuanova.github.io/blog/posts/implement-OSM-map-tiles/)中已展示過了簡單建立圖磚資料的方式。本文將改進前篇的圖磚建立方式，加速產生速度以及能處理大檔案。

- Concurrency (平行化)
    - 使用 [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) 套件將任務平行處理，讓多 CPU 能充分發揮處理能力。
    - 盡可能將處理演算法獨立可分散處理的部份盡量平行化。


- Large size file (單一大檔案)
  - 以 [Divide and Conquer](https://en.wikipedia.org/wiki/Divide_and_conquer_algorithm) 概念來分割檔案並單獨處理，最後再將結果合併。
      
    
- 加速 map tiles 建立速度
  - 先建立一個基底 Tiles 後，再由下往上合併的方式 (Bottom-Up)建立新 Tile，避免不必要的運算。


- 不使用 Log 方式而使用 [Equalization Histogram](https://en.wikipedia.org/wiki/Histogram_equalization) 繪製 Map tile 來取得較好的繪圖效果。


- 不繪製 aggregation 中 count 結果不到 **5** 的點位，避免離散雜訊資料影響結果。

## 資料前處理

### 下載 NYC Taxi Trip Data

使用 `wget` 指令取得 NYC Taxi trip data，這裡一樣使用 2016 年 5 月的資料。

```bash
wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2016-05.csv
```

### 將原始 CSV 分割為小檔案

依照資料筆數切割為許多小檔案

```bash
# 建立 split 資料夾
mkdir split

# 以 1,000,000 lines 為單位，將原始 csv 檔案分割為數個檔案（不包含 header）
tail -n +2 yellow_tripdata_2016-05.csv | split -d -l 1000000 - split/trip.csv.
```

完成後可得到 `trip.csv.00`，`trip.csv.01` 等不包含標頭的分割檔

## 轉換 csv 內容到目標格式

將 csv 檔案轉換到包含 Web Mercator 格式座標(epsg:3857)以及所在基底 Tile 的座標資料。

- 建立 csv 來源與目標檔案列表

In [None]:
import os, glob, csv, mercantile
import gzip, pickle, yaml
import numpy as np
import pandas as pd
import concurrent.futures as futures
import datashader as ds

from mercantile import Tile

source_csvs = glob.glob(os.path.join('split', '*.csv*'))
epsg3857_csvs = [os.path.join(f'{os.path.dirname(f)}-epsg3857', os.path.join(os.path.basename(f))) for f in source_csvs]

output_folder = os.path.join('map')
temp_root = os.path.join(output_folder, 'temp')
agg_root = os.path.join(output_folder, 'agg')
tile_root = os.path.join(output_folder, 'tile')

base_zoom = 14


In [None]:
# 轉換 source csv 到  target csv file，並將
def convTripGpsToWebMercator(source, target, zoom):
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(source, 'r') as cf:
        creader = csv.reader(cf, delimiter=',')
        with open(target, 'w') as wf:
            cwriter = csv.writer(wf, delimiter=',')
            cwriter.writerow(['x', 'y', 'zoom', 'xtile', 'ytile'])
            for row in creader:
                try:
                    lng, lat = float(row[5]), float(row[6])
                    if (-180.0 <= lng <= 180.0) and (-85.0511 <= lat <= 85.0511):
                        x, y = ds.utils.lnglat_to_meters(lng, lat)
                        xtile, ytile, tile_zoom = mercantile.tile(lng, lat, zoom)
                        cwriter.writerow([x, y, tile_zoom, xtile, ytile])
                except ValueError:
                    continue

def convTripGpsToWebMercatorWrapper(data):
    source, target, zoom = data
    convTripGpsToWebMercator(source, target, zoom)

- 以 concurrent.futures 同步執行轉換步驟

In [None]:
with futures.ProcessPoolExecutor() as executor:
    datas = zip(source_csvs, epsg3857_csvs, [base_zoom] * len(source_csvs))
    fs = executor.map(convTripGpsToWebMercatorWrapper, datas)
    futures.as_completed(fs)

可得到如下方格式的 csv 檔案

| x | y | zoom | xtile  | ytile |
|-|-|-|-|:-|
| Web Mercator - X | Web Mercator - Y | 地圖基底 zoom | Map Tile X 位置 | Map Tile Y 位置 |

## 建立 map tile aggregation file

建立基底 zoom 中，所有包含資料的 map tile aggregation files

- 用 Pandas 讀取分割的 csv 檔案
- 執行 Groupby() 計算所需建立的 tile group，避免沒必要的 aggregate 計算
- 使用 concurrent.futures 進行平行處理，加快計算速度


In [None]:
# 取得 aggregation file path 
def getAggFilePath(root, x, y, z):
    return os.path.join(root, str(z), str(x), f'{y}.pkl.gz') 

# 取得 aggregation yaml path
def getAggYamlFilePath(root, x, y, z):
    return os.path.join(root, str(z), str(x), f'{y}.yaml')

# 使用 Pickle 序列化 Aggregation 並儲存成  gzip 格式的檔案
def serializeAggToFile(agg, file_path):
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with gzip.open(file_path, mode='wb') as file:
        pickle.dump(agg, file)        

# 建立 Aggregation 檔案的 Yaml 檔
def serializeAggYaml(agg, file_path):
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, mode='w') as file:
        obj = {"max_count": int(agg.values.max())}
        yaml.dump(obj, file, default_flow_style=False)

In [None]:
import datashader as ds

# 依照 Tile 位置建立 datashader.Canvas
def mapTileCanvas(xtile, ytile, zoom, tile_size=(256, 256)):
    bounds = mercantile.xy_bounds(xtile, ytile, zoom)
    canvas = ds.Canvas(plot_width = tile_size[0],
                       plot_height = tile_size[1],
                       x_range = (bounds.left, bounds.right),
                       y_range = (bounds.bottom, bounds.top))
    return canvas

In [None]:
def makeTilesAggregation(source, root):
    
    if not os.path.exists(source):
        raise ValueError(f'Source file {source} doesn\'t exist')
    
    os.makedirs(root, exist_ok=True)
    
    df = pd.read_csv(source,
                     usecols=['x', 'y', 'zoom', 'xtile', 'ytile'],
                     dtype={'x':np.float32,
                            'y':np.float32,
                            'zoom':np.int8,
                            'xtile':np.int32,
                            'ytile':np.int32})
    
    for ((zoom, xtile, ytile), data) in df.groupby(by=['zoom', 'xtile', 'ytile']):
        agg = mapTileCanvas(xtile, ytile, zoom).points(data, 'x', 'y')
        serializeAggToFile(agg, getAggFilePath(root, xtile, ytile, zoom))
        serializeAggYaml(agg, getAggYamlFilePath(root, xtile, ytile, zoom))
        
def makeTilesAggregationWrapper(data):
    source, root = data
    makeTilesAggregation(source, root)

In [None]:
# 平行化建立 Map tiles
with futures.ProcessPoolExecutor() as executor:
    targets_root = [os.path.join(temp_root, os.path.basename(f)) for f in epsg3857_csvs]
    
    datas = zip(epsg3857_csvs, targets_root)
    
    fs = executor.map(makeTilesAggregationWrapper, datas)
    futures.as_completed(fs)    

## Combine multiple aggregation

合併所有 map tiles aggregation files 成為一個包含所有分割 csv 資料的整合 aggregation 結果。

In [None]:
# 由 path 得出 aggregation file 對應的 map tile
# Return: (xtile, ytile, zoom)
def getTileFromPath(agg_path):
    sep = agg_path.split(os.sep)
    if len(sep) < 3:
        raise ValueError("agg_path can not convert to tile path")        
    return (int(sep[-2]), int(sep[-1].split('.')[0]), int(sep[-3]))

def readAggregationFile(file):
    with gzip.open(file, 'rb') as f:
        return pickle.load(f)


In [None]:
# 取得所有需要合併的 map tile 位置
def getCombineTiles(folders):    
    tile_set = set()
    for folder in folders:
        tiles = glob.glob(os.path.join(folder, '*', '*', '*.pkl.gz'))
        for tile_path in tiles:
            x, y, z = getTileFromPath(tile_path)
            tile_set.add((x, y, z))
    return list(tile_set)

In [None]:
from functools import reduce

def combineAggregation(x, y, z, temp_folder, target_folder):
    os.makedirs(target_folder, exist_ok=True)
    files = glob.glob(os.path.join(temp_folder, '*', str(z), str(x), f'{y}.pkl.gz'))
    
    aggs = map(readAggregationFile, files)
    agg = reduce(lambda x, y: x + y, aggs)

    serializeAggToFile(agg, getAggFilePath(target_folder, x, y, z))
    serializeAggYaml(agg, getAggYamlFilePath(target_folder, x, y, z))

def combineAggregationWrapper(datas):
    tiles, temp ,target = datas
    for tile in tiles:
        x, y, z = tile
        combineAggregation(x, y, z, temp, target)

In [None]:
def chunksSize(num, factor):
    s = int(factor * num / os.cpu_count())
    return s if s > 0 else 1
    
def chunks(datas, n):
    for i in range(0, len(datas), n):
        yield datas[i:i + n]

In [None]:
# 平行化合併處理方式
with futures.ProcessPoolExecutor() as executor:
        
    tiles = getCombineTiles(glob.glob(os.path.join(temp_root, '*')))
    tiles_chunk = list(chunks(tiles, chunksSize(len(tiles), 4)))
    tiles_tuple = zip(tiles_chunk, [temp_root] * len(tiles_chunk), [agg_root] * len(tiles_chunk))
    
    fs = executor.map(combineAggregationWrapper, tiles_tuple)
    futures.as_completed(fs)

完成後可以得到合併後的基底 map tile 的 aggregation file

## 使用 Bottom-Up 方式合併並產生新圖層

- 從基底 tile 的 aggregation 的檔案中建立 parents tile 列表
- 讀取 parents tile 中所有的 child tiles 檔案並合併資料後，在寫入 parents tile
  - 如果有的 child tiles 不存在，則建立內部為空值的 map tile
- 不斷往上建立 parents tiles 直到完成為止

### 範例

- 假如現在有一個 tile 座標為 (xtile, ytile, zoom) = (23, 43, 6)，則該 tile 的 parenet tile 為 ptile = (11, 21, 5)。

- 而 pTile 的 四個 children tiles 為 (22, 42, 6), (23, 42, 6), (23, 43, 6), (22, 43, 6)。

- 因此 pTile 的 aggregation file 可從四個 children tiles 中合併得來

In [None]:

# 建立用來讓 canvas 產生 aggregation 的 dummy dataframe
dummy_df = pd.DataFrame.from_dict(data={'x':[0], 'y': [0]})


In [None]:
# 檢查是否為使用 gzip 壓縮後的 pickle file
def isPickleFile(file):
    sep = file.split('.')
    if len(sep) < 3:
        return False
    if sep[-1] != 'gz' or sep[-2] != 'pkl':
        return False
    return True

In [None]:
# 將 matrix size 縮小一半 (ex. 256x256 -> 128x128)
def poolmat(m):
    return m[::2, ::2] + m[::2, 1::2] + m[1::2, 1::2] + m[1::2, ::2]

In [None]:
# 建立特定 Tile 的 Aggregation，若不存在則 則回傳 empty aggregation (created by dummy dataframe)
def makeTileAggegation(agg_root, tile):
    file_path = getAggFilePath(agg_root, *tile)
    if os.path.exists(file_path):
        try:
            with gzip.open(file_path, 'rb') as f:
                return pickle.load(f)
        except:
            return mapTileCanvas(*tile).points(dummy_df, 'x', 'y')
    else:
        return mapTileCanvas(*tile).points(dummy_df, 'x', 'y')

In [None]:

# 以 Bottom-Up 方式建立新的 aggregation data
def combineTileButtomUp(agg_root, x, y, z):
    
    agg = mapTileCanvas(x, y, z).points(dummy_df, 'x', 'y')
    
    row, col = agg.values.shape
    row_c, col_c = int(row/2), int(col/2)
    
    # lt = left-top, rt = right-top, rb = right-bottom, lb = left-bottom
    lt, rt, rb, lb = mercantile.children(Tile(x, y, z)) 
        
    agg.values[0:row_c, 0:col_c] = poolmat(makeTileAggegation(agg_root, lb).values)
    agg.values[0:row_c, col_c:col] = poolmat(makeTileAggegation(agg_root, rb).values)
    agg.values[row_c:row, col_c:col] = poolmat(makeTileAggegation(agg_root, rt).values)
    agg.values[row_c:row, 0:col_c] = poolmat(makeTileAggegation(agg_root, lt).values)
    
    return agg

In [None]:

def getParentTiles(root, zoom):
    ptile_set = set()
    files = glob.glob(os.path.join(root, str(zoom), '*', '*.pkl.gz'))
    for x, y, z in map(getTileFromPath, files):
        ptile = mercantile.parent(Tile(x, y, z))
        ptile_set.add((ptile.x, ptile.y, ptile.z))
    
    return list(ptile_set)

In [None]:
def makeTilesBottomUp(root, x, y, z):
    agg = combineTileButtomUp(root, x, y, z)
    serializeAggToFile(agg, getAggFilePath(root, x, y, z))
    serializeAggYaml(agg, getAggYamlFilePath(root, x, y, z))

def makeTilesBottomUpWrapper(datas):
    for root, x, y, z in datas:
        makeTilesBottomUp(root, x, y, z)
    

In [None]:

for zoom in range(base_zoom, 0, -1):
    print(f'Make parents tiles from zoom {zoom}')
    with futures.ProcessPoolExecutor() as executor:
        
        tiles = [(agg_root, x, y, z) for x, y, z in getParentTiles(agg_root, zoom)]        
        tiles_chunk = chunks(tiles, chunksSize(len(tiles), 4))
        
        fs = executor.map(makeTilesBottomUpWrapper, tiles_chunk)
        futures.as_completed(fs)

## 建立每個 zoom 中的點位最大值資料

In [None]:
# 取得儲存 Yaml 檔案中，最大點數的資訊
def getYamlMaxCount(file_path):
    with open(file_path, 'r') as f:
        try:
            return yaml.load(f)['max_count']
        except BaseException:
            return 0
    return 0

# 計算 zoom 中的最大點數資料
def getZoomMaxCount(zoom_root):
    max_count = 0;
    for root, dirs, files in os.walk(zoom_root):
        for file in files:
            exts = file.split(os.extsep)
            
            if os.path.splitext(file)[1] != '.yaml':
                continue
                
            mc = getYamlMaxCount(os.path.join(root, file))
            max_count = mc if max_count < mc else max_count

    return max_count

In [None]:

for zoom in os.listdir(agg_root):
    
    max_count = getZoomMaxCount(os.path.join(agg_root, zoom))
    zoom_yaml = os.path.join(agg_root, zoom, 'zoom_config.yaml')
    
    with open(zoom_yaml, 'w') as file:
        yaml_obj = {'max_count': max_count}
        yaml.dump(yaml_obj, file, default_flow_style=False)

## 產生 Aggregation 檔案對應的 Tile 影像

In [None]:
import datashader.transfer_functions as tf
from colorcet import fire

def getRenderImage(img_root, agg_path):
    x, y, z = getTileFromPath(agg_path)
    return os.path.join(img_root, f'{z}', f'{x}', f'{y}.png')   


In [None]:
# 從 config.yaml 檔建立
def getMaxCountDict(root):
    max_dict = {}
    
    for folder in os.listdir(agg_root):
        zoom_conf_f = os.path.join(root, folder, 'zoom_config.yaml')
        with open(zoom_conf_f, 'r') as f:
            try:
                obj = yaml.load(f)
                max_dict[folder] = int(obj['max_count'])
            except ValueError as e:
                print('Get max count in zoom ', folder, ': Error')
                print(e)
    return max_dict

In [None]:
def makeTileImage(source, target, method='eq_hist', min_pts = 0, max_dict = {}):

    if not isPickleFile(source):
        return
    
    with gzip.open(source, 'rb') as f:
        
        x, y, z = getTileFromPath(source)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        
        agg = pickle.load(f)
        
        if method == 'eq_hist':
            img = tf.shade(agg.where(agg > min_pts), cmap=fire)
        elif method == 'log':
            img = tf.shade(agg.where(agg > min_pts), cmap=fire, how='log', span=[0, max_dict[str(z)]])
        else:
            raise ValueError(f'No render method {method}')

        with open(target, mode='wb') as out:
            out.write(img.to_bytesio(format='png').read())

def makeTileImageWrapper(datas):
    files, method, min_pts, max_count_dict = datas
    for source, target in files:
        makeTileImage(source, target, method, min_pts, max_count_dict)

In [None]:

def renderTileList(agg_root, tile_root):
    render_agg_list = glob.glob(os.path.join(agg_root, '*', '*', '*.pkl.gz'))
    render_img_list = [getRenderImage(tile_root, agg_path) for agg_path in render_agg_list]
    return list(zip(render_agg_list, render_img_list))

def parallelData(render_list, method='eq_hist', min_pts=0):
    
    datas = list(chunks(render_list, chunksSize(len(render_list), 4)))
    
    method_list = [method] * len(datas)
    min_pts_list = [min_pts] * len(datas)
    
    max_dict = {}
    if method == 'log':
        max_dict = getMaxCountDict(agg_root)
    max_dict_list = [max_dict] * len(datas)
    
    datas = list(zip(datas, method_list, min_pts_list, max_dict_list))

    return datas


In [None]:
def renderMapTiles(agg_root, tile_root, method='eq_hist', min_pts=0):
    with futures.ProcessPoolExecutor() as executor:
        render_list = renderTileList(agg_root, tile_root)
        datas = parallelData(render_list, method=method, min_pts=min_pts)
        
        fs = executor.map(makeTileImageWrapper, datas)
        futures.as_completed(fs)

- Generate tile images

In [None]:
renderMapTiles(agg_root, os.path.join(output_folder, 'tile'))
renderMapTiles(agg_root, os.path.join(output_folder, 'tile_log_10p'), method='log', min_pts=10)

## 使用 Folium 顯示圖層

In [None]:
import folium

# 使用 Carto Dark 建立底圖
fmap = folium.Map(location=[40.772562, -73.974039],
                  tiles='https://cartodb-basemaps-{s}.global.ssl.fastly.net/dark_all/{z}/{x}/{y}.png',
                  max_zoom=14,
                  zoom_start=12,
                  attr='Carto Dark')

# 加入放在 GitHub 存放的 map tiles 位置
fmap.add_tile_layer(tiles='https://raw.githubusercontent.com/yeshuanova/nyc_taxi_trip_map/master/map/tile/{z}/{x}/{y}.png',
                    attr='NYC taxi pickup Heatmap',
                    max_zoom=14)

# 儲存成 html 檔案
fmap.save('index.html')

# 顯示地圖
fmap