In [1]:
import dask.distributed
import dask.dataframe as dd
import pandas as pd
import numpy as np

In [2]:
import geopandas
from shapely.geometry import Point

ImportError: dlopen(/Users/shekhar/anaconda3/lib/python3.5/site-packages/fiona/ogrext.cpython-35m-darwin.so, 2): Library not loaded: @rpath/libnetcdf.11.dylib
  Referenced from: /Users/shekhar/anaconda3/lib/libgdal.20.dylib
  Reason: Incompatible library version: libgdal.20.dylib requires version 12.0.0 or later, but libnetcdf.11.dylib provides version 11.0.0

In [None]:
# Connect to a dask.distributed scheduler. Called with no arguments, Client()
# starts a local cluster on this machine and attaches to it.
client = dask.distributed.Client()

In [4]:
def assign_taxi_zones(df, lon_var, lat_var, locid_var):
    """Fill missing taxi-zone ids by spatially joining points to NYC taxi zones.

    Builds a point from the `lon_var` / `lat_var` columns of every row in `df`
    and performs a left spatial join (``intersects``) against the taxi zones
    shapefile at ``../shapefiles/taxi_zones_latlon.shp``. The shapefile path is
    hard coded and the points are declared to be in EPSG:4326 (lon/lat).

    Missing coordinates (NaN) are replaced by 0 and the pair lon=0 / lat=0 is
    used as the "no coordinate" sentinel, so a coordinate that is exactly 0 is
    treated as invalid. That is reasonable for a dataset of New York, but bad
    for a global dataset.

    Only rows where `locid_var` is null and both coordinates are non-zero
    receive the joined ``LocationID``; every other row keeps the value it
    already had in `locid_var`. No bounding-box check is performed: a point
    that falls outside every zone simply ends up with a null id.

    This function is designed to be used per partition via
    ``dask.dataframe.DataFrame.map_partitions``.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame (typically one dask partition) containing the latitude,
        longitude, and location-id columns. It is not modified.
    lon_var : string
        Name of column in `df` containing longitude values. Invalid values
        should be np.nan.
    lat_var : string
        Name of column in `df` containing latitude values. Invalid values
        should be np.nan.
    locid_var : string
        Name of column in `df` containing taxi_zone location ids. Rows with
        non-null values are not overwritten.

    Returns
    -------
    pandas.Series
        Series named `locid_var` carrying the same index as `df`. If no row
        needs an id, or if the spatial join raises ``ValueError``, the
        original ``df[locid_var]`` is returned unchanged (best effort).
    """
    # Local import: only needed on the error path, and keeps this cell
    # self-contained when it is shipped to dask workers.
    import traceback

    localdf = df[[lon_var, lat_var, locid_var]].copy()
    localdf[lon_var] = localdf[lon_var].fillna(value=0.)
    localdf[lat_var] = localdf[lat_var].fillna(value=0.)
    localdf['replace_locid'] = (localdf[locid_var].isnull()
                                & (localdf[lon_var] != 0.)
                                & (localdf[lat_var] != 0.))

    # Nothing to fill in: skip reading the shapefile and the join entirely.
    if not localdf['replace_locid'].any():
        return df[locid_var]

    # Work on a positional index. The de-duplication step below relies on the
    # index identifying exactly one input row; an incoming partition whose
    # index has repeated labels would otherwise lose rows.
    original_index = localdf.index
    localdf = localdf.reset_index(drop=True)

    shape_df = geopandas.read_file('../shapefiles/taxi_zones_latlon.shp')
    shape_df = shape_df.drop(
        ['OBJECTID', "Shape_Area", "Shape_Leng", "borough", "zone"], axis=1)

    try:
        local_gdf = geopandas.GeoDataFrame(
            localdf, crs={'init': 'epsg:4326'},
            geometry=[Point(xy) for xy in
                      zip(localdf[lon_var], localdf[lat_var])])

        local_gdf = geopandas.sjoin(
            local_gdf, shape_df, how='left', op='intersects')

        # one point can intersect more than one zone -- for example if on
        # the boundary between two zones. Deduplicate by taking first valid.
        local_gdf = local_gdf[~local_gdf.index.duplicated(keep='first')]
        # Guarantee one row per input row, in input order.
        local_gdf = local_gdf.reindex(localdf.index)

        # Take the joined zone id only where a replacement was requested;
        # everywhere else keep the pre-existing value. `where` upcasts the
        # dtype as needed, unlike writing into `.values` in place.
        zone_ids = local_gdf['LocationID'].where(
            local_gdf['replace_locid'], local_gdf[locid_var])
        zone_ids.index = original_index

        return zone_ids.rename(locid_var)
    except ValueError:
        # Best effort: report the failure and fall back to the input column.
        traceback.print_exc()
        return df[locid_var]


In [5]:
# Open the Citi Bike trips parquet dataset as a lazy dask DataFrame; no data is
# loaded until a computation is requested.
df1 = dd.read_parquet('/bigdata/citibike.parquet')

In [6]:
# Add the two zone-id columns as all-NaN placeholders. assign_taxi_zones only
# fills rows whose location id is null, so NaN marks "to be looked up".
df1['start_taxizone_id'] = np.nan
df1['end_taxizone_id'] = np.nan

In [7]:
# Sanity check: number of non-null start_station_id values in the raw dataset.
# .compute() forces evaluation across all partitions.
df1.start_station_id.count().compute()

36902025

In [8]:
# Preview the first rows before the spatial join; both zone-id columns are
# still empty at this point.
df1.head()

Unnamed: 0,trip_duration,start_time,stop_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,gender,start_taxizone_id,end_taxizone_id
0,634,2013-07-01 00:00:00,2013-07-01 00:10:34,164,E 47 St & 2 Ave,40.753231,-73.970322,504,1 Ave & E 15 St,40.73222,-73.981659,16950,Customer,,0,,
1,1547,2013-07-01 00:00:02,2013-07-01 00:25:49,388,W 26 St & 10 Ave,40.749718,-74.002953,459,W 20 St & 11 Ave,40.746746,-74.007759,19816,Customer,,0,,
2,178,2013-07-01 00:01:04,2013-07-01 00:04:02,293,Lafayette St & E 8 St,40.730286,-73.990768,237,E 11 St & 2 Ave,40.730473,-73.986725,14548,Subscriber,1980.0,2,,
3,1580,2013-07-01 00:01:06,2013-07-01 00:27:26,531,Forsyth St & Broome St,40.718941,-73.992661,499,Broadway & W 60 St,40.769154,-73.981918,16063,Customer,,0,,
4,757,2013-07-01 00:01:10,2013-07-01 00:13:47,382,University Pl & E 14 St,40.734928,-73.992004,410,Suffolk St & Stanton St,40.720665,-73.985176,19213,Subscriber,1986.0,1,,


In [9]:
# Look up the taxi zone of each trip endpoint, one dask partition at a time.
# The start and end columns follow the same naming scheme, so derive the
# column names from the endpoint prefix instead of repeating the call.
for endpoint in ('start', 'end'):
    zone_col = endpoint + '_taxizone_id'
    df1[zone_col] = df1.map_partitions(
        assign_taxi_zones,
        endpoint + '_station_longitude',
        endpoint + '_station_latitude',
        zone_col,
        meta=(zone_col, np.float64))

In [10]:
# Preview the first rows again to confirm the zone-id columns are populated.
# This triggers the spatial join for the partition(s) head() reads from.
df1.head()

Unnamed: 0,trip_duration,start_time,stop_time,start_station_id,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,gender,start_taxizone_id,end_taxizone_id
0,634,2013-07-01 00:00:00,2013-07-01 00:10:34,164,E 47 St & 2 Ave,40.753231,-73.970322,504,1 Ave & E 15 St,40.73222,-73.981659,16950,Customer,,0,233,224
1,1547,2013-07-01 00:00:02,2013-07-01 00:25:49,388,W 26 St & 10 Ave,40.749718,-74.002953,459,W 20 St & 11 Ave,40.746746,-74.007759,19816,Customer,,0,246,246
2,178,2013-07-01 00:01:04,2013-07-01 00:04:02,293,Lafayette St & E 8 St,40.730286,-73.990768,237,E 11 St & 2 Ave,40.730473,-73.986725,14548,Subscriber,1980.0,2,113,79
3,1580,2013-07-01 00:01:06,2013-07-01 00:27:26,531,Forsyth St & Broome St,40.718941,-73.992661,499,Broadway & W 60 St,40.769154,-73.981918,16063,Customer,,0,148,142
4,757,2013-07-01 00:01:10,2013-07-01 00:13:47,382,University Pl & E 14 St,40.734928,-73.992004,410,Suffolk St & Stanton St,40.720665,-73.985176,19213,Subscriber,1986.0,1,113,148


In [11]:
# Persist the zone-annotated trips so the spatial join does not have to be
# recomputed. Writing forces the join to run over every partition.
df1.to_parquet('/bigdata/citibike_locid.parquet', has_nulls=True, compression="SNAPPY",
              object_encoding='json')

In [34]:
# Rebind df1 to the zone-annotated dataset written above, so later cells read
# the stored result instead of re-running the spatial join.
df1 = dd.read_parquet('/bigdata/citibike_locid.parquet')

In [35]:
# Sanity check: the non-null start_station_id count of the reloaded dataset,
# for comparison with the count taken before the join.
df1.start_station_id.count().compute()

36902025

In [44]:
# Keep only trips where both the start and the end station were matched to a
# taxi zone.
has_start_zone = df1.start_taxizone_id.notnull()
has_end_zone = df1.end_taxizone_id.notnull()
df2 = df1[has_start_zone & has_end_zone]

In [45]:
# Number of trips remaining after dropping rows without a zone match.
df2.start_station_id.count().compute()

36901769

In [46]:
# Collect every station id seen as a trip start or a trip end. The combined
# list may contain the same id twice (once from each side); it is
# de-duplicated where it is consumed.
start_ids = df2.start_station_id.unique().compute().values.tolist()
end_ids = df2.end_station_id.unique().compute().values.tolist()
unique_id_values = start_ids + end_ids

In [47]:
# Index the trips by start station, using the sorted, de-duplicated station
# ids as the partition boundaries.
df3 = df2.set_index('start_station_id',
                    divisions=sorted(set(unique_id_values)),
                    npartitions=663)
# Order the rows of each partition chronologically. The lambda must sort the
# partition it is handed; it previously referenced an undefined name `df`,
# which fails with NameError as soon as a partition is computed.
df3 = df3.map_partitions(
    lambda part: part.sort_values(by='start_time'), meta=df3)

In [49]:
# Inspect the partition at position 1 (0-based). Without .compute() this only
# displays the lazy structure (divisions and dtypes), not the rows.
df3.get_partition(1)

Unnamed: 0_level_0,trip_duration,start_time,stop_time,start_station_name,start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude,end_station_longitude,bike_id,user_type,birth_year,gender,start_taxizone_id,end_taxizone_id
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1
79,int32,datetime64[ns],datetime64[ns],object,float32,float32,int32,object,float32,float32,int32,object,float32,int32,float64,float64
82,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
