# Intro to RAPIDS Using the New York City Yellow Taxi Data
Light on data science, heavy on comparisons.

This notebook was prepared for the Toronto Machine Learning Summit, Nov 16–29, 2020.


This notebook covers:

* cudf - for basic ETL and some __statistical analysis__ 
* cugraph - for some __graph analysis__
* cuxfilter - for __visualization__


----
# Setup

In [None]:
# load the libraries
import cudf

import numpy as np
import pandas as pd
import math

import os
import gc

from collections import OrderedDict
import argparse
import datetime
import time

In [None]:
# Make sure tqdm is importable, installing it into *this* kernel's interpreter if needed.
# NOTE(review): tqdm is not used anywhere in the cells visible here — confirm it is still needed.
try:
    import tqdm
except ModuleNotFoundError:
    import importlib
    import subprocess
    import sys

    # `sys.executable -m pip` targets the same Python environment the kernel runs in
    # (a bare `pip` via os.system may not), and check_call raises if the install fails
    # instead of silently continuing to a confusing second ImportError.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "tqdm"])
    # The import system caches directory listings; refresh them so the freshly
    # installed package can be found in this same process.
    importlib.invalidate_caches()
    import tqdm

In [None]:
# Switch the RAPIDS Memory Manager to Unified (managed) Memory so that GPU
# allocations can oversubscribe device memory, which helps avoid OOM errors.
import rmm

rmm.reinitialize(
    pool_allocator=False,  # no memory pool (False is the default)
    managed_memory=True,   # managed memory allows oversubscription of the GPU
    devices=0,             # GPU device ID(s) to register; by default only GPU 0
)

## Download the data

In [None]:
# Root directory for downloads, and the folder the taxi data lands in beneath it.
top_dir, data_dir = "./", "./nyctaxi"

In [None]:
# Download the 2016 taxi data into top_dir unless it is already present.
# Check the 2016 sub-folder that the loading cell reads (data_dir + "/2016"), not only
# data_dir itself: a pre-existing `nyctaxi/` without 2016 data would otherwise skip the download.
# NOTE(review): assumes nyctaxi_data.download_nyctaxi_data(["2016"], top_dir) writes to
# <top_dir>/nyctaxi/2016 — that layout is inferred from the file-listing cell; confirm.
if not os.path.isdir(os.path.join(data_dir, "2016")):
    import nyctaxi_data

    print("downloading data")
    nyctaxi_data.download_nyctaxi_data(["2016"], top_dir)
    

----

# cuDF - Accelerated Data Frame 

In [None]:
# Collect the yellow-cab files in the 2016 data folder, in sorted filename order.
# os.path.join keeps the path correct even if top_dir lacks a trailing slash.
data_path = os.path.join(top_dir, "nyctaxi", "2016")

files = [
    os.path.join(data_path, name)
    for name in sorted(os.listdir(data_path))
    if name.startswith('yellow')
]


In [None]:
# Display the list of yellow-cab file paths that will be loaded below.
files

In [None]:
# Show the total on-disk size of the data folder (IPython `!` shell escape; `$data_path` interpolates the Python variable).
!du -sh $data_path

## Loading data performance test

In [None]:
def read_pandas(f, **read_csv_kwargs):
    """Read one CSV file into a pandas DataFrame and time the read.

    Parameters
    ----------
    f : path (or any source pandas.read_csv accepts) of the CSV to load.
    **read_csv_kwargs : extra keyword arguments forwarded unchanged to
        pandas.read_csv (e.g. ``usecols``). None are passed by default, so
        existing calls behave exactly as before.

    Returns
    -------
    (df, elapsed) : the loaded DataFrame and the elapsed seconds the read took.
    """
    # perf_counter is monotonic and high-resolution; time.time() can jump if the
    # system clock is adjusted during the measurement.
    start_t = time.perf_counter()
    df = pd.read_csv(f, **read_csv_kwargs)
    end_t = time.perf_counter() - start_t

    return df, end_t

In [None]:
def read_cudf(f, **read_csv_kwargs):
    """Read one CSV file into a cuDF (GPU) DataFrame and time the read.

    Parameters
    ----------
    f : path (or any source cudf.read_csv accepts) of the CSV to load.
    **read_csv_kwargs : extra keyword arguments forwarded unchanged to
        cudf.read_csv (e.g. ``usecols``). None are passed by default, so
        existing calls behave exactly as before.

    Returns
    -------
    (df, elapsed) : the loaded cudf.DataFrame and the elapsed seconds the read took.
    """
    # perf_counter is monotonic and high-resolution; time.time() can jump if the
    # system clock is adjusted during the measurement.
    start_t = time.perf_counter()
    df = cudf.read_csv(f, **read_csv_kwargs)
    end_t = time.perf_counter() - start_t

    return df, end_t

In [None]:
# Untimed warm-up read of the first file before the timed loops below.
# NOTE(review): presumably this primes the OS file cache so the first timed read is not
# penalised by cold disk I/O — confirm intent.
# Bind to a throwaway name and delete it immediately: assigning to `_` would keep a whole
# month of data alive for the rest of the session (and shadow IPython's `_` output history).
_warmup = read_pandas(files[0])
del _warmup

In [None]:
# Load every monthly file with pandas, concatenate them, and report per-file and total load times.
assert files, f"no yellow-cab files found in {data_path}"

data = []

start_t = time.perf_counter()

for f in files:
    print("\treading " + f, end='')
    df, t = read_pandas(f)
    print(f" ... in time of {t:.2f} seconds")
    data.append(df)

# ignore_index=True gives the combined frame a fresh 0..N-1 index instead of
# repeating each file's own 0-based row labels.
taxi_pdf = pd.concat(data, ignore_index=True)

end_t = time.perf_counter()

# `.2f` = two decimal places (the previous `2f` meant "minimum width 2").
print(f"loaded {len(taxi_pdf):,} records in {(end_t - start_t):.2f} seconds")

# Free the per-file chunks; `df` would otherwise keep the last month alive.
del data, df

In [None]:
# Load every monthly file with cuDF, concatenate them on the GPU, and report per-file and total load times.
assert files, f"no yellow-cab files found in {data_path}"

data = []

start_t = time.perf_counter()

for f in files:
    print("\treading " + f, end='')
    df, t = read_cudf(f)
    print(f" ... in time of {t:.2f} seconds")
    data.append(df)

# ignore_index=True gives the combined frame a fresh 0..N-1 index instead of
# repeating each file's own 0-based row labels.
taxi_gdf = cudf.concat(data, ignore_index=True)

end_t = time.perf_counter()

# `.2f` = two decimal places (the previous `2f` meant "minimum width 2").
print(f"loaded {len(taxi_gdf):,} records in {(end_t - start_t):.2f} seconds")

# Free the per-file chunks; `df` would otherwise keep the last month in GPU memory.
del data, df

In [None]:
# Peek at the first rows of the combined GPU frame (head() defaults to 5 rows).
taxi_gdf.head()

## Sort Comparisons - Single Field

In [None]:
%%time
sp = taxi_pdf.sort_values(by='trip_distance',ascending=False)

In [None]:
# Top of the pandas-sorted result.
sp.head()

In [None]:
%%time
sg = taxi_gdf.sort_values(by='trip_distance',ascending=False)

In [None]:
# Top of the cuDF-sorted result.
sg.head()

## Group By - Single Column 

In [None]:
%%time
gbp = taxi_pdf.groupby('passenger_count').count()

In [None]:
# First groups of the pandas result.
gbp.head()

In [None]:
%%time
gbg = taxi_gdf.groupby('passenger_count').count()

In [None]:
# First groups of the cuDF result.
gbg.head()

## Fun with Data

In [None]:
%%time
print(f"Max fare was ${taxi_pdf['fare_amount'].max():,}")

In [None]:
%%time
print(f"Max fare was ${taxi_gdf['fare_amount'].max():,}")

In [None]:
# Show the row(s) carrying that maximum fare.
maxf = taxi_gdf['fare_amount'].max()
taxi_gdf[taxi_gdf['fare_amount'] == maxf]

In [None]:
# Longest recorded trip distance.
print(f"Farthest trip was {taxi_gdf.trip_distance.max():,} miles")

In [None]:
# Show the row(s) for the longest trip, e.g. to see how long it took.
maxd = taxi_gdf['trip_distance'].max()
taxi_gdf[taxi_gdf['trip_distance'] == maxd]

### Changing data types

In [None]:
# Convert both trip timestamp columns to millisecond-resolution datetimes.
taxi_gdf = taxi_gdf.astype(
    {col: 'datetime64[ms]' for col in ('tpep_pickup_datetime', 'tpep_dropoff_datetime')}
)

### Filtering data

In [None]:
# Filter out records with missing or outlier values. Each entry is one bounds check;
# joining with " and " guarantees every clause is separated by whitespace (the
# hand-concatenated version was missing a space between two clauses).
filter_clauses = [
    "(fare_amount > 0 and fare_amount < 500)",
    # NOTE(review): `< 6` also excludes 6-passenger trips — confirm that is intended.
    "(passenger_count > 0 and passenger_count < 6)",
    "(pickup_longitude > -75 and pickup_longitude < -73)",
    "(dropoff_longitude > -75 and dropoff_longitude < -73)",
    "(pickup_latitude > 40 and pickup_latitude < 42)",
    "(dropoff_latitude > 40 and dropoff_latitude < 42)",
    # NOTE(review): these two clauses drop a trip when EITHER coordinate matches, not only
    # when pickup and dropoff are the same point. If the intent is "pickup != dropoff", use
    # "(pickup_latitude != dropoff_latitude or pickup_longitude != dropoff_longitude)".
    "(pickup_latitude != dropoff_latitude)",
    "(pickup_longitude != dropoff_longitude)",
]
query_frags = " and ".join(filter_clauses)

taxi_gdf = taxi_gdf.query(query_frags)

### Add some new features

In [None]:
# Split the pickup timestamp into hour/year/month/day columns, which are easier to
# group and filter on than full timestamps (added in that order).
_pickup_dt = taxi_gdf['tpep_pickup_datetime'].dt
for _field in ('hour', 'year', 'month', 'day'):
    taxi_gdf[_field] = getattr(_pickup_dt, _field)
del _pickup_dt, _field

# Trip duration as the difference of the raw int64 timestamp values
# (presumably milliseconds, given the datetime64[ms] cast applied to these columns earlier).
taxi_gdf['diff'] = (
    taxi_gdf['tpep_dropoff_datetime'].astype('int64')
    - taxi_gdf['tpep_pickup_datetime'].astype('int64')
)

In [None]:
def day_of_the_week_kernel(day, month, year, day_of_week):
    """Row kernel for cuDF ``apply_rows``: weekday of each (day, month, year) row.

    Uses Zeller's congruence, so the encoding written to ``day_of_week`` is
    0 = Saturday, 1 = Sunday, 2 = Monday, ..., 6 = Friday.

    Parameters
    ----------
    day, month, year : input columns (1-based day of month, 1-12 month, full year).
    day_of_week : output column, filled in place, one value per input row.
    """
    # The enumerate(zip(...)) loop shape follows the cuDF apply_rows convention.
    for i, (d, mo, yr) in enumerate(zip(day, month, year)):
        # Zeller treats January and February as months 13 and 14 of the previous year.
        # (The earlier `shift = month` mapped them to 3 and 5, giving wrong weekdays.)
        if mo < 3:
            m = mo + 13
            adj_year = yr - 1
        else:
            m = mo + 1
            adj_year = yr
        c = adj_year // 100  # century, instead of hard-coding 20 (2000-2099 only)
        y = adj_year % 100   # year within the century
        # (13 * m) // 5 == floor(2.6 * m) in exact integer arithmetic; +5c is
        # congruent to -2c (mod 7) and keeps the sum non-negative before `%`.
        day_of_week[i] = (d + (13 * m) // 5 + y + y // 4 + c // 4 + 5 * c) % 7
        
# Run the row kernel over the day/month/year columns, producing a new int32 'day_of_week' column.
taxi_gdf = taxi_gdf.apply_rows(
    day_of_the_week_kernel,
    incols=['day', 'month', 'year'],
    outcols={'day_of_week': np.int32},
    kwargs={},
)

In [None]:
# Check the newly derived columns on the first rows.
taxi_gdf.head()

---

## Basic Statistical Data Science

### Look at some feature - by Hour

In [None]:
# 1) Let's look at a plot of fare by hour
%matplotlib inline
taxi_gdf.groupby('hour').fare_amount.mean().to_pandas().sort_index().plot(legend=True)

In [None]:
# 2) Tips by hour
%matplotlib inline
taxi_gdf.groupby('hour').tip_amount.mean().to_pandas().sort_index().plot(legend=True)

In [None]:
# 3) Number of taxi rides by Hour
%matplotlib inline
taxi_gdf['hour'].groupby(taxi_gdf['hour']).count().to_pandas().sort_index().plot(legend=True)

In [None]:
# Look at what days are the busiest
%matplotlib inline
taxi_gdf.groupby('day_of_week').day_of_week.count().to_pandas().sort_index().plot(legend=True)

In [None]:
# What days have the best tips
%matplotlib inline
taxi_gdf.groupby('day_of_week').tip_amount.mean().to_pandas().sort_index().plot(legend=True)

## Dropping Columns

In [None]:
# Remove the store_and_fwd_flag column, which is not used in the rest of this notebook.
taxi_gdf = taxi_gdf.drop(columns='store_and_fwd_flag')

In [None]:
# Check the column dtypes after the type changes, derived features and column drop above.
taxi_gdf.dtypes

---
# cuGraph - Accelerated Graph Analytics

We need integer vertex IDs, but our vertices are lat-long pairs (float64). There are two ways to address this: the hard way (build an explicit table of IDs and join it back onto the trips) and the easy way (let cuGraph renumber the lat-long pairs for us).

In [None]:
import cugraph

In [None]:
# Keep just the coordinates and distance the graph needs, plus a constant 'count'
# column (used later as the edge attribute), then release the full frame.
taxi_subset = taxi_gdf[
    ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'trip_distance']
].reset_index()
taxi_subset['count'] = 1
del taxi_gdf

### Create vertices and edges the hard way

In [None]:
# Distinct pickup points and distinct dropoff points, both renamed to common
# (long, lat) columns so they can be combined into a single vertex table.
nodes = [
    taxi_subset[[f'{end}_longitude', f'{end}_latitude']]
    .drop_duplicates()
    .rename(columns={f'{end}_longitude': 'long', f'{end}_latitude': 'lat'})
    for end in ('pickup', 'dropoff')
]

In [None]:
# Union the pickup and dropoff points, de-duplicate, and number each distinct
# point 0..N-1; that number becomes its integer vertex 'id'.
nodes = (
    cudf.concat(nodes)
    .drop_duplicates()
    .reset_index(drop=True)
    .reset_index()
    .rename(columns={'index': 'id'})
)
nodes.head()

In [None]:
# Report how many distinct vertices were found.
print(f'Total number of geo points in the dataset: {len(nodes):,}')

In [None]:
# Map each distinct trip's pickup and dropoff coordinates to their vertex ids by
# joining against `nodes` twice (once per endpoint).
_edge_cols = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'trip_distance']

# Attach the pickup vertex id.
_with_pickup = (
    taxi_subset[_edge_cols]
    .drop_duplicates()
    .rename(columns={'pickup_longitude': 'long', 'pickup_latitude': 'lat'})
    .merge(nodes, on=['lat', 'long'])
    .rename(columns={'long': 'pickup_longitude', 'lat': 'pickup_latitude', 'id': 'pickup_id'})
)

# Attach the dropoff vertex id.
_with_both = (
    _with_pickup
    .rename(columns={'dropoff_longitude': 'long', 'dropoff_latitude': 'lat'})
    .merge(nodes, on=['lat', 'long'])
    .rename(columns={'long': 'dropoff_longitude', 'lat': 'dropoff_latitude', 'id': 'dropoff_id'})
)

edges = _with_both[['pickup_id', 'dropoff_id', 'trip_distance']]
del _edge_cols, _with_pickup, _with_both

edges.head(5)

In [None]:
# Number of rows in the edge list.
edges.shape[0]

In [None]:
# Build a graph from the integer-id edge list constructed above.
g = cugraph.Graph()
g.from_cudf_edgelist(
    edges,
    source='pickup_id',
    destination='dropoff_id',
)

## Pagerank

In [None]:
%%time
page = cugraph.pagerank(g, alpha=.85, max_iter=1000, tol=1.0e-05)

In [None]:
# Five highest-ranked vertices, copied to pandas for display.
page.sort_values('pagerank', ascending=False).head().to_pandas()

## Now the easy way

In [None]:
# Easy way: pass the (longitude, latitude) column pairs directly as multi-column
# vertices and let cuGraph renumber them; 'count' is supplied as the edge attribute.
g2 = cugraph.Graph()
g2.from_cudf_edgelist(
    taxi_subset,
    source=['pickup_longitude', 'pickup_latitude'],
    destination=['dropoff_longitude', 'dropoff_latitude'],
    edge_attr='count',
    renumber=True,
)

In [None]:
# PageRank on the renumbered graph with the same settings; show the five top vertices.
page = cugraph.pagerank(g2, alpha=0.85, max_iter=1000, tol=1e-5)
page.sort_values('pagerank', ascending=False).head().to_pandas()

---