In [1]:
import seaborn as sns
import metapack as mp
import pandas as pd
import geopandas as gpd
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display 
from itertools import chain 
from tqdm import tqdm
import libgeohash as gh
import shapely
from shapely.geometry import Point
from shapely.wkt import loads as loads_wkt
from geoid.censusnames import stusab
import rowgenerators as rg
import utm
import dask
import dask.dataframe as dd
from demosearch.util import munge_pbar,  run_mp, gh_data_path, disaggregate

# Register tqdm's `progress_apply` on pandas objects; later cells call it on
# the geometry columns.
tqdm.pandas()

# Render matplotlib figures inline and initialise seaborn / metapack for notebook use.
%matplotlib inline
sns.set_context('notebook')
mp.jupyter.init()

# EPSG code passed to `to_crs` in later cells to get projected coordinates.
# NOTE(review): 26911 is presumably NAD83 / UTM zone 11N; the sample data below
# also falls in UTM zone 16, so confirm a single zone is acceptable.
utm_crs = 26911


In [2]:
# Alternative opener kept for reference (presumably opens the built package
# rather than the source package -- confirm against the metapack docs).
#pkg = mp.jupyter.open_package()
pkg = mp.jupyter.open_source_package()
pkg

In [3]:
# Tag keys looked up in each record's parsed `other_tags` by `_extract_tags`;
# they also become the tag columns of `tags_df`.
extract_tags = ['amenity', 'tourism', 'shop', 'leisure', 'natural', 'parking']

In [4]:
from dask.distributed import Client
# Local Dask cluster: 10 single-threaded worker processes, each capped at 40GB
# (400GB in total, as reported in the cell output).
client = Client(n_workers=10, threads_per_worker=1, processes=True, memory_limit='40GB')
client

0,1
Client  Scheduler: tcp://127.0.0.1:60640  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 10  Cores: 10  Memory: 400.00 GB


In [11]:
%%time
fp = pkg.reference('lines').resolved_url.fspath
df = dd.read_csv(fp, blocksize='64MB',dtype={'aerialway': 'object', 'barrier': 'object','man_made': 'object'}) 


CPU times: user 35.2 ms, sys: 2.78 ms, total: 38 ms
Wall time: 36.8 ms


In [53]:
from shapely.geometry import LineString

In [58]:

# Parse the WKT geometry column of the first partition only, as a small sample.
geometry = (
    df.partitions[:1]['geometry']
    .apply(loads_wkt, meta=LineString)
    .compute()
)
# centroids = geometry.centroid
# length = geometry.length




100321

In [77]:
from libgeohash import polygon_to_geohash

# For each of the first 500 geometries, the geohashes (precision argument 5)
# returned for its envelope, i.e. its bounding box.
x = [polygon_to_geohash(geom.envelope, 5) for geom in geometry[:500]]

In [80]:
# Distinct 4-character geohash prefixes across all of the per-geometry lists in `x`.
{cell[:4] for cell in chain.from_iterable(x)}

{'9mum',
 '9mup',
 '9muw',
 '9mvx',
 '9q5b',
 '9q5c',
 '9q5d',
 '9q5f',
 '9q5s',
 '9q5v',
 '9q5y',
 '9qh0',
 '9qh2',
 '9qh4',
 '9qhy',
 '9qj0',
 '9qj3',
 '9qjp',
 '9qjr',
 '9rv0',
 '9tbq',
 'dpbm',
 'dpmr',
 'dpsb',
 'dpt2',
 'dpwy',
 'dpwz',
 'dpxn',
 'dpxq',
 'dpxt',
 'dpxv',
 'dpxw',
 'dpxy',
 'dpz8',
 'dpz9',
 'dpzc',
 'dq8y',
 'dqbc',
 'dqbf',
 'dr46',
 'dr8w',
 'dr9m',
 'dr9n',
 'dr9q',
 'dr9t',
 'dr9v',
 'dr9w',
 'dr9x',
 'dr9y',
 'drc7',
 'drce',
 'drcg',
 'drcw',
 'drcx',
 'drcy',
 'drcz',
 'drfz',
 'dxft',
 'dxfv',
 'dxfw',
 'dxfy',
 'f21d',
 'f244',
 'f24g',
 'f255',
 'f256',
 'f25d',
 'f25s',
 'f84f',
 'f84x',
 'f85m',
 'f85n',
 'f85p',
 'f85q',
 'f866',
 'f868',
 'f87b'}

In [16]:
# Turn the WKT strings in `t` into shapely geometries (with a progress bar),
# then wrap the frame as a GeoDataFrame. No CRS is assigned here.
t['geometry'] = t['geometry'].progress_apply(loads_wkt)

gdf = gpd.GeoDataFrame(t, geometry='geometry')

                                     

In [25]:
def utm_zone(p):
    """Return the UTM zone of a lon/lat point as ``(zone_number, zone_letter)``.

    Args:
        p: An object exposing ``x`` (longitude) and ``y`` (latitude) in
            degrees, e.g. a shapely ``Point``.

    Returns:
        The last two items of ``utm.from_latlon``'s result, e.g. ``(16, 'T')``.
    """
    # utm.from_latlon takes (latitude, longitude) and returns
    # (easting, northing, zone_number, zone_letter); see the cell output.
    return tuple(utm.from_latlon(p.y, p.x)[2:])


# UTM easting/northing and zone for every feature centroid.
[ utm.from_latlon(*e) for e in  zip(gdf.centroid.y, gdf.centroid.x)]

[(301033.9246679279, 4934358.050100996, 16, 'T'),
 (294671.8300894175, 4935224.855389891, 16, 'T'),
 (297281.2512604027, 4928388.635446693, 16, 'T'),
 (295780.863919666, 4932773.75872271, 16, 'T'),
 (610215.1387833524, 3733598.8985452726, 11, 'S')]

In [39]:
# WKT of a representative point of the first row's geometry.
t['geometry'].iloc[0].representative_point().wkt

'POINT (-89.50443509999999 44.5350939)'

In [24]:
# (x, y) pairs of the feature centroids.
list(zip(gdf.centroid.x, gdf.centroid.y))

[(-89.50427738814578, 44.53509516507668),
 (-89.5846188108215, 44.54110758253171),
 (-89.54912157860954, 44.48036255523982),
 (-89.56970248573194, 44.519378359705115),
 (-115.81021301852383, 33.736707921492986)]

In [None]:
def _extract_tags(df, extract_tags=None):

    from sqlalchemy.dialects.postgresql import HSTORE

    h = HSTORE()
    f = h.result_processor(None, None)

    # Prune the dataset to just the records that have the tags we want.
    # before getting to the more expensive operation of extracting the tags.
    # This should reduce the dataset from 24M rows to less than 6M.
    t = df.dropna(subset=['other_tags'])
   
    if extract_tags:
        flags = [t.other_tags.str.contains(e) for e in extract_tags]
        comb_flags = [any(e) for e in list(zip(*flags))]

        t = t[comb_flags]

    rows = []
    errors = []
    for idx, r in t.set_index('osm_id')[['other_tags']].iterrows():
        try:
            d = f(r.other_tags)
            if extract_tags:
                rows.append([idx] + [d.get(e) for e in extract_tags])
            else:
                d['idx'] = idx
                rows.append(d)
        except TypeError as e:
            errors.append(r, e)

    return (rows, errors)

In [None]:
# Cut the frame into N_task chunks and hand each chunk, together with the tag
# list, to `_extract_tags` through `run_mp`.
N_task = 200
tasks = [(chunk, extract_tags) for chunk in np.array_split(df, N_task)]

results = run_mp(_extract_tags, tasks, 'Split OSM other_tags')

# Each result is a (rows, errors) pair; flatten both across all chunks.
tags = list(chain.from_iterable(res[0] for res in results))
errors = list(chain.from_iterable(res[1] for res in results))

In [None]:
# One row per parsed record: osm_id followed by one column per extracted tag key.
tags_df = pd.DataFrame(tags, columns=['osm_id', *extract_tags])

# Attach highway and geometry from the source frame by osm_id and wrap the
# result as a GeoDataFrame in EPSG:4326.
tags_df = gpd.GeoDataFrame(
    tags_df.merge(df[['osm_id', 'highway', 'geometry']], on='osm_id'),
    geometry='geometry',
    crs=4326,
)

In [None]:
# Geometry lengths after projecting to the EPSG code held in `utm_crs`.
tags_df.to_crs(epsg=utm_crs).length

In [None]:
# Load one block-group geoframe per state abbreviation and stack them.
frames = [
    rg.geoframe(f'censusgeo://2019/5/{state}/blockgroup')
    for state in stusab.values()
]
bg = pd.concat(frames)
len(bg)

In [None]:
#frames = [rg.geoframe(f'censusgeo://2019/5/{st}/block') for st in tqdm(list(stusab.values()), desc='loading blocks')]
#blk = pd.concat(frames)
#len(blk)

In [None]:
def encode(v):
    """Geohash a WKT point string shaped like ``POINT (x y)``.

    The first seven characters (``"POINT ("``) and the final ``")"`` are
    sliced off, the remaining whitespace-separated numbers are parsed as
    floats, and they are passed to ``gh.encode`` in reversed order
    (presumably latitude first -- confirm against libgeohash).
    """
    coords = [float(token) for token in v[7:-1].split()]
    coords.reverse()
    return gh.encode(*coords)

# Geohash first: `encode` slices the WKT text, so it must run before the
# geometry column is converted to shapely objects.
tags_df['geohash'] = tags_df['geometry'].progress_apply(encode)


tags_df['geometry'] = tags_df['geometry'].progress_apply(loads_wkt)

tags_df = gpd.GeoDataFrame(tags_df, geometry='geometry', crs=4326)

In [None]:
# Preview the first rows with missing values shown as 0; the result is not
# assigned, so `tags_df` itself is unchanged.
tags_df.head().fillna(0)

In [None]:
# Give every feature a single class: the first non-null value across the
# extracted tag columns, scanning left to right. Back-filling along the row
# moves that value into the first column. `fillna(method=...)` is deprecated
# in pandas 2.x, so the dedicated `bfill` method is used instead.
tags_df['class'] = tags_df[list(extract_tags)].bfill(axis=1).iloc[:, 0]

# Fold synonymous classes together before filtering.
replace = {'parking': 'parking_space',
           'pub': 'bar',
           }
# Classes that get their own count column.
cls = ['restaurant', 'bar', 'cafe', 'fast_food', 'supermarket', 'grave_yard', 'playground',
       'bicycle_parking', 'park', 'fuel', 'bank', 'hotel', 'fitness_centre',
       'laundry', 'clothes', 'convenience', 'parking', 'parking_space']

t = tags_df[['geohash', 'class']].replace(replace)
t = t[t['class'].isin(cls)]

# Count of features per (8-character geohash, class), pivoted to one column per class.
cls_df = t.groupby([t.geohash.str.slice(0, 8), 'class']).count().unstack().fillna(0).droplevel(0, axis=1)


# At 8 digits, geohashes are, on average 4m by 20m over the US
# At 6, 146m x 610m
# At 4, 4Km x 20Km
# NOTE(review): these sizes look off against the usual geohash cell-size table
# (roughly 38m x 19m at 8 characters at the equator) -- verify before relying on them.
# Clip to 10 because it's really unlikely that there are actually more than 10
# amenities in a cell.

# Non-null count of each tag column per 8-character geohash, capped at 10.
group_counts = tags_df.groupby(tags_df.geohash.str.slice(0, 8))\
    [list(extract_tags)].count().clip(0, 10)

t = group_counts.join(cls_df, how='outer').fillna(0).astype(int)

# gh.decode output is reversed before building the Point (presumably
# (lat, lon) -> (x=lon, y=lat); confirm against libgeohash).
t['geometry'] = [Point(gh.decode(e)[::-1]) for e in t.index]

In [None]:
# Wrap the per-geohash counts as a GeoDataFrame in EPSG:4326, project it to
# `utm_crs`, and move the geohash index into an ordinary column.
geohash_tags = gpd.GeoDataFrame(t, geometry='geometry', crs=4326)
geohash_tags = geohash_tags.to_crs(utm_crs).reset_index()