# Import modules

In [None]:
import numpy as np
import pandas as pd
import math

import cudf
import cugraph

import os
from collections import OrderedDict
import datetime
import subprocess

# Load, clean, and featurize the data

In [None]:
def week_day(day, month, year, day_of_week):
    """Write the weekday of each (day, month, year) row into ``day_of_week``.

    Row kernel passed to ``DataFrame.apply_rows`` in ``add_features``. It loops
    over the input columns and stores one value per row at the same position
    of the ``day_of_week`` output column. Keep it to plain loops and integer
    arithmetic so ``apply_rows`` can compile it.

    The weekday uses Zeller's congruence for the Gregorian calendar, so the
    encoding is 0 = Saturday, 1 = Sunday, 2 = Monday, ..., 6 = Friday. The
    century is derived from the year rather than fixed, so any positive
    year is handled.
    """
    for i, (d, mo, yr) in enumerate(zip(day, month, year)):
        # Zeller's congruence counts January and February as months 13 and
        # 14 of the *previous* year; treating them as early months gives a
        # wrong weekday for every Jan/Feb date.
        if mo < 3:
            zm = mo + 12
            zy = yr - 1
        else:
            zm = mo
            zy = yr
        century = zy // 100
        year_of_century = zy % 100
        # (13 * (zm + 1)) // 5 == floor(2.6 * (zm + 1)), computed exactly in
        # integers. Every term is non-negative, so `% 7` needs no sign care.
        day_of_week[i] = (
            d
            + (13 * (zm + 1)) // 5
            + year_of_century
            + year_of_century // 4
            + century // 4
            + 5 * century
        ) % 7

In [None]:
def add_features(df):
    """Derive calendar and duration features from the trip timestamps.

    Adds ``hour``, ``year``, ``month`` and ``day`` (all taken from
    ``pickup_datetime``) and ``diff`` (drop-off minus pickup, computed on the
    timestamps' int64 representation; presumably milliseconds for the
    ``'date'`` dtype used at load time, so confirm before relying on units).
    It then runs ``week_day`` through ``apply_rows`` to add an int32
    ``day_of_week`` column and drops the two raw timestamp columns.

    Note: the first five columns are assigned onto ``df`` itself, so the
    caller's frame gains them too. Use the returned frame.
    """
    pickup = df['pickup_datetime']
    for part in ('hour', 'year', 'month', 'day'):
        df[part] = getattr(pickup.dt, part)
    df['diff'] = df['dropoff_datetime'].astype('int64') - pickup.astype('int64')

    featurized = df.apply_rows(
        week_day,
        incols=['day', 'month', 'year'],
        outcols=dict(day_of_week=np.int32),
        kwargs=dict(),
    )
    return featurized.drop(columns=['pickup_datetime', 'dropoff_datetime'])

In [None]:
# Column names and dtypes of the 2016 yellow-taxi CSV files, in file column
# order. 'date' is the cuDF alias used for the timestamp columns.
columns_dtypes = OrderedDict(
    [
        ('vendor_id', 'int32'),
        ('pickup_datetime', 'date'),
        ('dropoff_datetime', 'date'),
        ('passenger_count', 'int32'),
        # Decimal distance in the TLC data (e.g. 1.50); int32 would not parse it.
        ('trip_distance', 'float64'),
        ('pickup_longitude', 'float64'),
        ('pickup_latitude', 'float64'),
        ('rate_code', 'int32'),
        # 'Y' / 'N' flag in the TLC data, not a number.
        ('store_and_fwd_flag', 'str'),
        ('dropoff_longitude', 'float64'),
        ('dropoff_latitude', 'float64'),
        ('payment_type', 'int32'),
        ('fare_amount', 'float64'),
        ('extra', 'float64'),
        ('mta_tax', 'float64'),
        ('tip_amount', 'float64'),
        ('tolls_amount', 'float64'),
        ('surcharge', 'float64'),
        ('total_amount', 'float64'),
    ]
)

# Subset of the columns above that is actually loaded (passed as `usecols`).
use_col = [
    'pickup_datetime',
    'dropoff_datetime',
    'passenger_count',
    'pickup_longitude',
    'pickup_latitude',
    'dropoff_longitude',
    'dropoff_latitude',
]

In [None]:
# One yellow-taxi CSV per month of `nyctaxi_year`. The year drives both the
# directory and the file names, so they cannot drift apart.
nyctaxi_year = 2016
nyctaxi_months = range(1, 13)
nyctaxi_dir = f's3://bsql/data/nytaxi/yellow/{nyctaxi_year}'
nyctaxi_files = [
    f'{nyctaxi_dir}/yellow_tripdata_{nyctaxi_year}-{month:02}.csv'
    for month in nyctaxi_months
]

In [None]:
# Read each monthly file and stack them into a single frame. Column names come
# from `columns_dtypes`, so the file's own header line is skipped. Only the
# `use_col` subset is materialized.
data = [
    cudf.read_csv(
        nyctaxi_file,
        names=list(columns_dtypes.keys()),
        dtype=list(columns_dtypes.values()),
        skiprows=1,  # cuDF (like pandas) spells this `skiprows`
        usecols=use_col,
        storage_options={'anon': True},  # anonymous S3 access
    )
    for nyctaxi_file in nyctaxi_files
]

# ignore_index gives the result one 0..N-1 index instead of twelve
# overlapping per-file ranges.
taxi_df = cudf.concat(data, ignore_index=True)
del data  # release the per-month frames; only the concatenated copy is kept

In [None]:
# Peek at the first rows of the raw, concatenated trip data.
taxi_df.head(n=5)

In [None]:
print('Raw number of rows: {:,}'.format(len(taxi_df)))

In [None]:
# Filter out records with missing or outlier values (passenger count and
# coordinates outside a plausible NYC range), plus trips whose pickup and
# drop-off locations are identical. Every fragment ends with a space so the
# concatenated expression stays well separated.
query_frags = (
    "(passenger_count > 0 and passenger_count < 6) "
    "and (pickup_longitude > -75 and pickup_longitude < -73) "
    "and (dropoff_longitude > -75 and dropoff_longitude < -73) "
    "and (pickup_latitude > 40 and pickup_latitude < 42) "
    "and (dropoff_latitude > 40 and dropoff_latitude < 42) "
    # Same location means BOTH coordinates coincide. Requiring each
    # coordinate to differ would also drop real trips that share only a
    # latitude or only a longitude.
    "and (pickup_latitude != dropoff_latitude "
    "or pickup_longitude != dropoff_longitude)"
)

taxi_df = taxi_df.query(query_frags)

In [None]:
# Replace the raw timestamps with derived calendar/duration features.
taxi_df = add_features(df=taxi_df)

In [None]:
# Inspect the filtered frame with its engineered feature columns.
taxi_df.head(n=5)

In [None]:
print('Filtered number of rows: {:,}'.format(len(taxi_df)))

# Building and analyzing the graph
## Create the list of nodes

In [None]:
# Every distinct pickup or drop-off coordinate becomes a node. Its id is the
# row position (0..N-1) in the de-duplicated coordinate table.
nodes = [
    taxi_df[[f'{side}_longitude', f'{side}_latitude']]
    .drop_duplicates()
    .rename(columns={f'{side}_longitude': 'long', f'{side}_latitude': 'lat'})
    for side in ('pickup', 'dropoff')
]

nodes = (
    cudf.concat(nodes)
    .drop_duplicates()
    .reset_index(drop=True)
    .reset_index()
    .rename(columns={'index': 'id'})
)
nodes.head()

In [None]:
print(f'Total number of geo points in the dataset: {len(nodes):,}')

## Create a list of edges

In [None]:
# One edge per distinct (pickup location, drop-off location) pair. Each
# endpoint's coordinates are mapped to its node id by an inner join against
# `nodes` renamed to that endpoint's column names.
edges = (
    taxi_df[['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']]
    .drop_duplicates()
    .merge(
        nodes.rename(columns={'long': 'pickup_longitude', 'lat': 'pickup_latitude', 'id': 'pickup_id'}),
        on=['pickup_latitude', 'pickup_longitude'],
    )
    .merge(
        nodes.rename(columns={'long': 'dropoff_longitude', 'lat': 'dropoff_latitude', 'id': 'dropoff_id'}),
        on=['dropoff_latitude', 'dropoff_longitude'],
    )
)[['pickup_id', 'dropoff_id']]

edges.head()

In [None]:
# Build the trip graph from the pickup_id -> dropoff_id edge list.
g = cugraph.from_cudf_edgelist(edges, source='pickup_id', destination='dropoff_id')

In [None]:
# The analysis cells below work on `g` only; drop the intermediate tables.
del nodes, edges

## Degree distribution of the graph

In [None]:
%matplotlib inline
g.degree().to_pandas()['degree'].hist(bins=100)

## PageRank

In [None]:
page = cugraph.pagerank(g, alpha=0.85, max_iter=1000, tol=1e-5)
# Highest PageRank scores first.
page.sort_values('pagerank', ascending=False).head()

## Jaccard similarity

In [None]:
df = cugraph.jaccard(g)
# Rows with the highest Jaccard coefficient first.
df.sort_values('jaccard_coeff', ascending=False).head()