# PyMapD and PyGDF Demo on NY Taxi Data Subset

In this example, we will use PyMapD to create and populate a table of NY Taxi data from a CSV file.  Then, we query the MapD database to get a PyGDF GPU dataframe and manipulate the data using groupby, join and other dataframe operations.

In [1]:
import sys
sys.path.append('/rapids/dask-cuml/')
import dask.delayed
import dask_cuml as cuml
import dask.dataframe as dd
import cudf
import numpy as np
import pandas as pd
import dask_cudf as dcudf
import os
import gzip

  data = yaml.load(f.read()) or {}


## Helper Functions

### Load data from CSV

In [3]:
def load_data(nrows, ncols, cached = '/rapids/notebooks/cuml/data/nytaxi_pre_mapd_200k.csv.gz'):
    if os.path.exists(cached):
        print('use nytaxi data')
        with gzip.open(cached) as f:
            X = cudf.read_csv(cached)
            #X = cudf.DataFrame.from_pandas(cached)
    #else:
    #    print('use random data')
    #    X = np.random.rand(nrows,ncols)
    #df = pd.DataFrame({'fea%d'%i:X[:,i] for i in range(X.shape[1])})
    return X

In [2]:
def load_dask_data(nrows, ncols, cached = '/rapids/notebooks/cuml/data/nytaxi_pre_mapd_200k.csv.gz'):
    if os.path.exists(cached):
        print('use nytaxi data')
        with gzip.open(cached) as f:
            X = dcudf.read_csv(cached)
            #X = cudf.DataFrame.from_pandas(cached)
    #else:
    #    print('use random data')
    #    X = np.random.rand(nrows,ncols)
    #df = pd.DataFrame({'fea%d'%i:X[:,i] for i in range(X.shape[1])})
    return X

## Get Data and put it into a cudf Dataframe

In [5]:
nrows = 2**25
ncols = 40

df = load_data(nrows,ncols)
print('data',df.shape)

use nytaxi data
data (200000, 25)


In [6]:
ddf = load_dask_data(nrows,ncols)
print('data',ddf.shape)

use nytaxi data
data (Delayed('int-021fb552-455e-4f50-bfcd-b96e8aec60bb'), 25)


In [7]:
df.dtypes

vendor_id                         object
rate_code                          int64
store_and_fwd_flag                object
passenger_count                    int64
trip_time_in_secs                  int64
trip_distance                    float64
pickup_longitude                 float64
pickup_latitude                  float64
dropoff_longitude         datetime64[ms]
dropoff_latitude          datetime64[ms]
tolls_amount                     float64
tip_amount                       float64
total_amount                     float64
mta_tax                          float64
fare_amount                      float64
payment_type                      object
surcharge                        float64
pickup_datetime_year               int64
pickup_datetime_month              int64
pickup_datetime_day                int64
pickup_datetime_hours              int64
dropoff_datetime_year              int64
dropoff_datetime_month             int64
dropoff_datetime_day               int64
dropoff_datetime

### Make table

In [8]:
ddf.dtypes

vendor_id                         object
rate_code                          int64
store_and_fwd_flag                object
passenger_count                    int64
trip_time_in_secs                  int64
trip_distance                    float64
pickup_longitude                 float64
pickup_latitude                  float64
dropoff_longitude         datetime64[ms]
dropoff_latitude          datetime64[ms]
tolls_amount                     float64
tip_amount                       float64
total_amount                     float64
mta_tax                          float64
fare_amount                      float64
payment_type                      object
surcharge                        float64
pickup_datetime_year               int64
pickup_datetime_month              int64
pickup_datetime_day                int64
pickup_datetime_hours              int64
dropoff_datetime_year              int64
dropoff_datetime_month             int64
dropoff_datetime_day               int64
dropoff_datetime

In [11]:
print(df.head(5))

   vendor_id  rate_code  store_and_fwd_flag  passenger_count  trip_time_in_secs       trip_distance    pickup_longitude ...  dropoff_datetime_hours
0        CMT          1                   N                4                382                 1.0          -73.978165 ...                      15
1        CMT          1                   N                1                259                 1.5  -74.00668300000001 ...                       0
2        CMT          1                   N                1                282                 1.1          -74.004707 ...                      18
3        CMT          1                   N                2                244  0.7000000000000001          -73.974602 ...                      23
4        CMT          1                   N                1                560                 2.1  -73.97625000000001 ...                      23
[17 more columns]


In [None]:
print(dask.delayed(df.head(5))

As you can see, unfortunately, Dask_cudf currently is a work in progress, with many limiations, and this is why we are still using cudf for the data pre-processing

In [14]:
ddf = dcudf.from_cudf(df, npartitions=2)

Decompress CSV archive

In [16]:
type(df)

cudf.dataframe.dataframe.DataFrame

In [17]:
print('nrows', len(df))

nrows 200000


Inspect column types

In [18]:
df.dtypes

vendor_id                         object
rate_code                          int64
store_and_fwd_flag                object
passenger_count                    int64
trip_time_in_secs                  int64
trip_distance                    float64
pickup_longitude                 float64
pickup_latitude                  float64
dropoff_longitude         datetime64[ms]
dropoff_latitude          datetime64[ms]
tolls_amount                     float64
tip_amount                       float64
total_amount                     float64
mta_tax                          float64
fare_amount                      float64
payment_type                      object
surcharge                        float64
pickup_datetime_year               int64
pickup_datetime_month              int64
pickup_datetime_day                int64
pickup_datetime_hours              int64
dropoff_datetime_year              int64
dropoff_datetime_month             int64
dropoff_datetime_day               int64
dropoff_datetime

In [20]:
df.head().to_pandas()

Unnamed: 0,vendor_id,rate_code,store_and_fwd_flag,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,...,payment_type,surcharge,pickup_datetime_year,pickup_datetime_month,pickup_datetime_day,pickup_datetime_hours,dropoff_datetime_year,dropoff_datetime_month,dropoff_datetime_day,dropoff_datetime_hours
0,CMT,1,N,4,382,1.0,-73.978165,40.757977,2016-09-26 22:34:40,1969-12-31 23:59:59.999,...,CSH,0.0,2013,1,1,15,2013,1,1,15
1,CMT,1,N,1,259,1.5,-74.006683,40.731781,1914-03-03 12:05:20,1969-12-31 23:59:59.999,...,CSH,0.5,2013,1,6,0,2013,1,6,0
2,CMT,1,N,1,282,1.1,-74.004707,40.73777,2005-09-24 07:19:28,1969-12-31 23:59:59.999,...,CSH,1.0,2013,1,5,18,2013,1,5,18
3,CMT,1,N,2,244,0.7,-73.974602,40.759945,1948-07-07 22:00:32,1969-12-31 23:59:59.999,...,CSH,0.5,2013,1,7,23,2013,1,7,23
4,CMT,1,N,1,560,2.1,-73.97625,40.748528,1971-02-22 14:17:36,1969-12-31 23:59:59.999,...,CSH,0.5,2013,1,7,23,2013,1,7,23


## Groupby lat lon grid

We want to group each record by their pickup location. We will to round the lat lon with the ``round_latlon`` method.  By using ``.applymap``, the rounding method will be compiled into GPU code.

In [21]:
from math import floor

def round_latlon(x):
    scale = 5
    return floor(x * scale) / scale

In [22]:
group_df = df.loc[:, ['pickup_longitude', 'pickup_latitude', 'tip_amount', 'fare_amount']] 

group_df['pickup_longitude'] = group_df['pickup_longitude'].applymap(round_latlon)
group_df['pickup_latitude'] = group_df['pickup_latitude'].applymap(round_latlon)

group_df['tip_ratio'] = group_df['tip_amount'] / group_df['fare_amount']


In [23]:
group_df.dtypes

pickup_longitude    float64
pickup_latitude     float64
tip_amount          float64
fare_amount         float64
tip_ratio           float64
dtype: object

In [24]:
group_df.head().to_pandas()

Unnamed: 0,pickup_longitude,pickup_latitude,tip_amount,fare_amount,tip_ratio
0,-74.0,40.6,0.0,6.5,0.0
1,-74.2,40.6,0.0,6.0,0.0
2,-74.2,40.6,0.0,5.5,0.0
3,-74.0,40.6,0.0,5.0,0.0
4,-74.0,40.6,0.0,9.5,0.0


Here, we run groupby and specify the aggregating methods on each column.

In [34]:
from numba import cuda,jit,float32

@cuda.jit(device=True)
def compute_std_with_mean(array,std,mean):
    # std is a shared memory array
    # mean is a scaler, the mean value of array
    # len(std) == TPB
    # the kernel has only one TB
    # the final result is in std[0]
    tid = cuda.threadIdx.x
    initialize(std,0,len(std))
    cuda.syncthreads()

    tid = cuda.threadIdx.x
    for i in range(cuda.threadIdx.x, len(array), cuda.blockDim.x):
        std[tid] += (array[i]-mean)**2
    cuda.syncthreads()

    reduction_sum_SM(std)
    if tid == 0:
        if len(array)>1:
            std[0] = math.sqrt(std[0]/(len(array)-1))
        else:
            std[0] = 0
    cuda.syncthreads()

@cuda.jit(device=True)            
def compute_mean(array,mean): 
    # mean is a shared memory array
    # the kernel has only one TB
    # the final result is in mean[0]
    tid = cuda.threadIdx.x
    initialize(mean,0,TPB)
    cuda.syncthreads()
   
    tid = cuda.threadIdx.x 
    for i in range(cuda.threadIdx.x, len(array), cuda.blockDim.x):
        mean[tid] += array[i]
    cuda.syncthreads()

    reduction_sum_SM(mean)
    if tid == 0: 
        mean[0]/=len(array)
    cuda.syncthreads()

def compute_std(array,std):
    # std is a shared memory array
    # len(std) == TPB
    # the kernel has only one TB
    # the final result is in std[0]
    compute_mean(array,std)
    mean = std[0]
    cuda.syncthreads()
    compute_std_with_mean(array,std,mean)

In [33]:
from collections import OrderedDict

# Aggregating methods to apply to each column
aggs = OrderedDict()
aggs['tip_amount'] = 'mean'
aggs['fare_amount'] = ['mean','count']
aggs['tip_ratio'] = 'mean'
compute_

grouped_stats = group_df.groupby(['pickup_longitude', 'pickup_latitude']).agg(aggs)
print('total groups', len(grouped_stats))
grouped_stats.head().to_pandas()

RuntimeError: ERROR: this aggregator has not been implemented yet

Reorder the grouped dataframe by `count_fare_amount`

In [30]:
grouped_stats.sort_values('count_fare_amount', ascending=False).head().to_pandas()

Unnamed: 0,pickup_longitude,pickup_latitude,mean_tip_amount,mean_fare_amount,count_fare_amount,mean_tip_ratio
13,-74.0,40.6,1.175462,11.251773,162910,0.099798
9,-74.2,40.6,1.279872,11.780425,24133,0.112059
20,-73.8,40.6,4.731888,45.513919,5011,0.097381
14,-74.0,40.8,1.2763,12.8891,4211,0.09165
43,0.0,0.0,1.88457,15.715963,3582,0.16114


## Groupby payment type

We can also group by categorical columns.

In [31]:
group_pay = df.loc[:, ['payment_type', 'tip_amount', 'fare_amount']]
group_pay['tip_ratio'] = group_df['tip_ratio']

groupby_payment = group_pay.groupby(['payment_type']).mean()
groupby_payment.sort_values('tip_ratio', ascending=False).to_pandas()

NotImplementedError: Strings are not yet supported in the index

## Join table with payment_type meaning

We can use `.join()` to add a description column for each payment type

In [None]:
import pandas
import numpy as np

payment_code = {
    'CRD': 'Credit Card',
    'CSH': 'Cash',
    'NOC': 'No Charge',
    'DIS': 'Dispute',
    'UNK': 'Unknown',
}

payment_meaning = pygdf.DataFrame()

# Customize codes.dtype to match storage type from mapd
src_cat = group_pay.payment_type
cat = pandas.Categorical(payment_code.keys(), categories=src_cat.cat.categories)
payment_meaning['payment_type'] = pygdf.Series.from_categorical(cat, codes=cat.codes.astype(src_cat.data.dtype))

payment_meaning['payment_meaning'] = pandas.Categorical(payment_code.values())
payment_meaning = payment_meaning.set_index('payment_type')

payment_meaning.to_pandas()

In [None]:
joined = groupby_payment.set_index('payment_type').join(payment_meaning)
joined.sort_values('tip_ratio', ascending=False).to_pandas()