# Dask Spatially Sorted: spatialpandas spatial-join benchmark

In [1]:
import logging
import time
from datetime import datetime
from pathlib import Path
from shapely.geometry import Polygon, box
from polygon_geohasher.polygon_geohasher import polygon_to_geohashes, geohashes_to_polygon
import geohash
from functools import reduce

import numpy as np
import pandas as pd
import geopandas as gpd
import dask.dataframe as dd
from distributed import LocalCluster, Client

import spatialpandas as spd
from spatialpandas.io import read_parquet, read_parquet_dask
import geopandas as gpd
from pathlib import Path
from distributed import LocalCluster, Client
import numpy as np
import dask.dataframe as dd

In [2]:
# Resolve the data directory relative to the notebook's working directory:
# two levels up from the current directory, then into 'data'.
base_path = Path.cwd().parent.parent
data_dir = base_path / 'data'

In [3]:
# Start a local Dask cluster with a single worker and attach a client to it.
# (Add silence_logs=logging.ERROR to the options to quiet the cluster logs.)
cluster_kwargs = {
    'dashboard_address': ':8790',
    'n_workers': 1,
    'threads_per_worker': 4,
    'memory_limit': '8 GB',
}
cluster = LocalCluster(**cluster_kwargs)
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8790/status,

0,1
Dashboard: http://127.0.0.1:8790/status,Workers: 1
Total threads: 4,Total memory: 7.45 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:44985,Workers: 1
Dashboard: http://127.0.0.1:8790/status,Total threads: 4
Started: Just now,Total memory: 7.45 GiB

0,1
Comm: tcp://192.168.0.118:40151,Total threads: 4
Dashboard: http://192.168.0.118:38007/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:37049,
Local directory: /home/joris/scipy/repos/scipy2020_spatial_algorithms_at_scale/02_compared_cases/spatialsort/dask-worker-space/worker-an_g7099,Local directory: /home/joris/scipy/repos/scipy2020_spatial_algorithms_at_scale/02_compared_cases/spatialsort/dask-worker-space/worker-an_g7099


In [4]:
# Open the spatially sorted US point data set with spatialpandas' Dask reader
# and preview the first two rows.
spatial_sort_path = data_dir / 'us_cont_spatiallysorted.parquet'
df = read_parquet_dask(spatial_sort_path)
df.head(2)

Unnamed: 0_level_0,position,latitude,longitude
hilbert_distance,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
25629,"Point([-124.443, 24.447])",24.447,-124.443
99850,"Point([-124.532, 24.704])",24.704,-124.532


In [5]:
# Count the rows of the point frame once; the value is reused by the
# benchmark and summary cells below.
len_df = len(df)
len_df

113944489

In [6]:
# load various size subsets of the zip code data as spatialpandas.geodataframes
zips_1 = gpd.read_file(data_dir.joinpath('zip_codes', 'zips_1.geojson')).loc[:, ['geometry']]
zips_1 = spd.geodataframe.GeoDataFrame(zips_1, geometry='geometry')
zips_10 = gpd.read_file(data_dir.joinpath('zip_codes', 'zips_10.geojson')).loc[:, ['geometry']]
zips_10 = spd.geodataframe.GeoDataFrame(zips_10, geometry='geometry')
zips_100 = gpd.read_file(data_dir.joinpath('zip_codes', 'zips_100.geojson')).loc[:, ['geometry']]
zips_100 = spd.geodataframe.GeoDataFrame(zips_100, geometry='geometry')
zips_1000 = gpd.read_file(data_dir.joinpath('zip_codes', 'zips_1000.geojson')).loc[:, ['geometry']]
zips_1000 = spd.geodataframe.GeoDataFrame(zips_1000, geometry='geometry')
zips_10000 = gpd.read_file(data_dir.joinpath('zip_codes', 'zips_10000.geojson')).loc[:, ['geometry']]
zips_10000 = spd.geodataframe.GeoDataFrame(zips_10000, geometry='geometry')

In [8]:
%%time
# Benchmark the spatial join of the point frame against each zip-code subset,
# recording polygon count, elapsed seconds, result size and result partitions.
total_points = len_df
num_partitions = df.npartitions
num_polygons = []
time_sec = []
num_result_points = []
num_result_partitions = []
num_points = len_df  # reused as a column when the summary table is built

t00 = time.time()  # wall-clock start of the whole benchmark (not read again in this cell)
for zip_gdf in [zips_1, zips_10, zips_100, zips_1000, zips_10000]:
    num_polygons.append(len(zip_gdf))
    # perf_counter is monotonic, so the measured interval cannot be distorted
    # by system clock adjustments the way time.time() differences can.
    t0 = time.perf_counter()

    rdf = spd.sjoin(df, zip_gdf, how='inner')
    # calculate length as cheap operation to avoid computing the full resulting frame into memory
    len_result = len(rdf)

    time_sec.append(time.perf_counter() - t0)
    num_result_points.append(len_result)
    num_result_partitions.append(rdf.npartitions)
    print(f'num_polygons[-1]: {num_polygons[-1]}, time_sec[-1]: {time_sec[-1]:.0f} s')

num_polygons[-1]: 1, time_sec[-1]: 9 s
num_polygons[-1]: 10, time_sec[-1]: 3 s
num_polygons[-1]: 100, time_sec[-1]: 22 s
num_polygons[-1]: 1000, time_sec[-1]: 85 s
num_polygons[-1]: 10000, time_sec[-1]: 208 s
CPU times: user 53.6 s, sys: 3.11 s, total: 56.7 s
Wall time: 5min 28s


In [9]:
# Collect the per-run measurements into a summary table and save it to a CSV file.
results_df = pd.DataFrame({
    'num_polygons': num_polygons,
    'num_points': num_points,  # scalar, repeated on every row
    'num_result_partitions': num_result_partitions,
    'num_result_points': num_result_points,
    'time_min': np.asarray(time_sec) / 60,  # seconds -> minutes
})
# Format the timestamp explicitly: str(datetime.now()) contains a space and
# colons, and colons are not permitted in file names on Windows.
run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
results_df.to_csv(f'spatially_sorted_results_{run_stamp}.csv')
results_df

Unnamed: 0,num_polygons,num_points,num_result_partitions,num_result_points,time_min
0,1,113944489,3,1031,0.007984
1,10,113944489,21,6551,0.031385
2,100,113944489,179,203284,0.408854
3,1000,113944489,648,2403824,1.535418
4,10000,113944489,1035,25877947,3.644482


In [10]:
# Release the Dask workers by scaling the local cluster down to zero workers.
cluster.scale(0)

In [11]:
# Shut down via the client now that the benchmark is finished.
# NOTE(review): presumably this also stops the scheduler of the LocalCluster
# created above — confirm against the distributed Client.shutdown docs.
client.shutdown()