
New York Yellow Taxi cabs are an internationally recognized icon, carrying visitors and residents alike throughout the five boroughs. All Yellow Taxis are equipped with GPS monitoring systems that transmit data to a centralized database.

This post focuses on PySpark and how it can be used to ingest ~10 GB of data, perform some analysis on it, and even predict trip duration using PySpark's ML library.


### Importing Libraries & Setting up a PySpark Instance

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
## Setting up PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf, hour, month, from_unixtime, unix_timestamp

# The SparkSession provides the SQL context needed by toDF(); sc is its SparkContext,
# used below to read the raw CSV files as text RDDs
spark = SparkSession.builder.appName("nyc_yellow_taxi").getOrCreate()
sc = spark.sparkContext

### Importing Data
The NYC Taxi and Limousine Commission (TLC) has collected trip data that includes fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The data can be [found here](http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml).

For this analysis, six months of Yellow Taxi data (~10 GB) were used.


Six months of data (January to June 2016) were downloaded from the New York City Taxi Cab website and uploaded to the cluster. Each month was downloaded as a CSV file and read into an RDD. The six monthly data sets were then combined with a union to form one data set comprising all six months of data. A PySpark SQL context was also initialized.

In [None]:
# One RDD of raw CSV lines per month (January to June 2016)
myRDDlist = [sc.textFile('/data/yellow_tripdata_2016-%02d.csv' % m) for m in range(1, 7)]

### Data Preparation
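Each RDD is converted to a Spark SQL DataFrame for analysis: the header line is removed, and each remaining line is split on commas into the 19 columns of the 2016 Yellow Taxi schema.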
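The row-level logic of convert_to_dataframe can be sketched in plain Python, without Spark, to show what happens to each line. This is a minimal sketch: parse_lines is a hypothetical helper (not part of the original notebook), and the sample trip line is made up for illustration.

```python
# Plain-Python sketch of the per-line parsing in convert_to_dataframe (no Spark needed).
COLUMNS = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count",
           "trip_distance", "pickup_longitude", "pickup_latitude", "RatecodeID",
           "store_and_fwd_flag", "dropoff_longitude", "dropoff_latitude", "payment_type",
           "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
           "improvement_surcharge", "total_amount"]


def parse_lines(lines):
    """Drop the header line, split the rest on commas and keep only well-formed rows."""
    header = lines[0]
    rows = []
    for line in lines:
        if line == header:
            continue  # skip every copy of the header line
        fields = tuple(line.split(","))
        if len(fields) == len(COLUMNS):  # skip blank or truncated lines
            rows.append(dict(zip(COLUMNS, fields)))
    return rows


# Made-up sample: header, one trip, and a trailing blank line
sample = [",".join(COLUMNS),
          "2,2016-01-01 00:00:00,2016-01-01 00:12:00,1,3.00,-73.99,40.73,1,N,"
          "-73.98,40.76,1,12.5,0.5,0.5,2.0,0,0.3,15.8",
          ""]
rows = parse_lines(sample)
print(len(rows), rows[0]["trip_distance"])
```

Every field is still a string at this point, which is why the next step casts the numeric columns.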

In [None]:
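def convert_to_dataframe(data):
    # Drop non-ASCII characters; decode back to str so the split below works on Python 3
    trans = data.map(lambda x: x.encode("ascii", "ignore").decode("ascii"))
    tagsheader = trans.first()
    # Remove the header line (a filter avoids the shuffle that RDD.subtract would trigger)
    trans_data = trans.filter(lambda line: line != tagsheader)
    columns = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance",
               "pickup_longitude", "pickup_latitude", "RatecodeID", "store_and_fwd_flag", "dropoff_longitude",
               "dropoff_latitude", "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
               "improvement_surcharge", "total_amount"]
    # Split each line into fields and skip blank or malformed lines
    tuple_data = (trans_data.map(lambda x: tuple(x.split(",")))
                            .filter(lambda t: len(t) == len(columns)))
    df = tuple_data.toDF(columns)
    return df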

### Data Cleaning
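All columns are read in as strings, so the coordinate and distance columns are cast to float. Based on the distribution of trip distances, trips of 20 miles or more are treated as outliers and discarded, and rows with null values (including values that could not be cast) are dropped.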
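The cleaning rules can be illustrated on plain Python dictionaries. This is a sketch, not the Spark code: to_float and clean_rows are hypothetical helpers, to_float only roughly mimics Spark's cast('float') (unparsable strings become None, playing the role of null), and the sample rows are made up.

```python
FLOAT_COLS = ["pickup_longitude", "pickup_latitude", "dropoff_longitude",
              "dropoff_latitude", "trip_distance"]
MAX_MILES = 20.0  # trips of 20 miles or more are treated as outliers


def to_float(value):
    """Rough stand-in for cast('float'): unparsable strings become None (null)."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def clean_rows(rows):
    cleaned = []
    for row in rows:
        row = dict(row)
        for c in FLOAT_COLS:
            row[c] = to_float(row[c])
        if row["trip_distance"] is None or row["trip_distance"] >= MAX_MILES:
            continue  # the filter trip_distance < 20.0 also drops null distances
        if any(v is None for v in row.values()):
            continue  # na.drop(): drop rows containing any null
        cleaned.append(row)
    return cleaned


base = {"pickup_longitude": "-73.99", "pickup_latitude": "40.73",
        "dropoff_longitude": "-73.98", "dropoff_latitude": "40.76"}
rows = [dict(base, trip_distance="3.0"),
        dict(base, trip_distance="25.0"),                     # longer than 20 miles
        dict(base, pickup_latitude="", trip_distance="1.0")]  # null after the cast
kept = clean_rows(rows)
print(len(kept), kept[0]["trip_distance"])  # 1 3.0
```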


In [None]:
def clean_and_filter(df):

    ## Converting to appropriate datatypes
    df = df.withColumn('pickup_longitude', df['pickup_longitude'].cast('float'))
    df = df.withColumn('pickup_latitude', df['pickup_latitude'].cast('float'))
    df = df.withColumn('dropoff_longitude', df['dropoff_longitude'].cast('float'))
    df = df.withColumn('dropoff_latitude', df['dropoff_latitude'].cast('float'))
    df = df.withColumn('trip_distance', df['trip_distance'].cast('float'))
    # removing Outliers ( Checked the distribution of the distance -- Removing distance greater than 20 miles)
    df = df.filter(df.trip_distance<20.0)
    ## Dropping null rows
    df = df.na.drop()
    return df

In [None]:
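## Converting, cleaning and combining all monthly DataFrames into a single DataFrame
from functools import reduce
from pyspark.sql import DataFrame

monthly_dfs = [clean_and_filter(convert_to_dataframe(rdd)) for rdd in myRDDlist]
# union() matches columns by position; all six months share the same 19-column schema
df = reduce(DataFrame.union, monthly_dfs)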

### Feature Engineering
#### Computing Average Speed

Average speed can be a good indicator of underlying traffic conditions and therefore of trip duration. Two new variables are added for each trip: Duration_in_mins, the time between pick-up and drop-off in minutes, and Speed_mph, the average speed in miles per hour, computed as trip_distance / (Duration_in_mins / 60).
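As a worked example, a 3-mile trip that takes 12 minutes has an average speed of 3 / (12 / 60) = 15 mph. The plain-Python sketch below applies the same formula to timestamp strings; duration_and_speed is a hypothetical helper, and returning None for a zero-minute trip mirrors Spark's `/` returning null on division by zero (with ANSI mode disabled).

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # Python equivalent of Spark's "yyyy-MM-dd HH:mm:ss"


def duration_and_speed(pickup, dropoff, trip_distance):
    """Return (Duration_in_mins, Speed_mph) for one trip."""
    start = datetime.strptime(pickup, TS_FORMAT)
    end = datetime.strptime(dropoff, TS_FORMAT)
    duration_in_mins = (end - start).total_seconds() / 60
    if duration_in_mins == 0:
        return duration_in_mins, None  # division by zero: null in Spark
    speed_mph = trip_distance / (duration_in_mins / 60)  # miles per hour
    return duration_in_mins, speed_mph


print(duration_and_speed("2016-01-01 00:00:00", "2016-01-01 00:12:00", 3.0))
```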

#### Additional Features
Information about the day of the week (weekday vs. weekend) can also prove useful in determining trip duration, so the pick-up day of the week is added, along with the pick-up hour and month.
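The weekday logic used by the UDF can be checked in plain Python. The sketch below reuses the same calendar/datetime approach as get_weekday; is_weekend is a hypothetical extra helper (not in the original notebook) showing how a weekday/weekend flag could be derived from the day name. It assumes the default C/English locale, since calendar.day_name is locale-dependent.

```python
import calendar
import datetime


def pickup_day(ts):
    """Day name for a 'yyyy-MM-dd HH:mm:ss' timestamp string (same logic as get_weekday)."""
    year, month, day = (int(x) for x in ts.split(" ")[0].split("-"))
    return calendar.day_name[datetime.date(year, month, day).weekday()]


def is_weekend(ts):
    """Hypothetical weekday/weekend flag derived from the day name."""
    return pickup_day(ts) in ("Saturday", "Sunday")


print(pickup_day("2016-01-01 08:15:00"))  # Friday
print(is_weekend("2016-01-02 23:00:00"))  # True (a Saturday)
```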



In [None]:
## Computing trip duration (minutes), average speed (mph), pick-up hour and month
ts_format = "yyyy-MM-dd HH:mm:ss"
timeDiff = (F.unix_timestamp('tpep_dropoff_datetime', ts_format)
            - F.unix_timestamp('tpep_pickup_datetime', ts_format)) / 60

df = df.withColumn("tpep_pickup_datetime", F.from_unixtime(F.unix_timestamp(df.tpep_pickup_datetime, ts_format)))
df = df.withColumn("pickup_hr", F.hour(df.tpep_pickup_datetime))
df = df.withColumn("Duration_in_mins", timeDiff)
# Speed in miles per hour: distance (miles) divided by duration converted to hours
df = df.withColumn("Speed_mph", df.trip_distance / (df.Duration_in_mins / 60))
df = df.withColumn("pickup_month", F.month(df.tpep_pickup_datetime))

## Computing the day of the week with a User Defined Function
import calendar
import datetime
from pyspark.sql.types import StringType

def get_weekday(date):
    # date is a "yyyy-MM-dd HH:mm:ss" string; keep only the date part
    if date is None:
        return None
    year, month, day = (int(x) for x in date.split(' ')[0].split('-'))
    return calendar.day_name[datetime.date(year, month, day).weekday()]


weekday_udf = F.udf(get_weekday, StringType())
df = df.withColumn('pickup_day', weekday_udf(df.tpep_pickup_datetime))

## Fitting a K-means model: assembling the drop-off coordinates into a feature vector
from pyspark.ml.feature import VectorAssembler

# The coordinate columns were already cast to float in clean_and_filter(), so no further casting is needed
vecAssembler = VectorAssembler(inputCols=["dropoff_latitude", "dropoff_longitude"], outputCol="features")
new_df = vecAssembler.transform(df)


#### Clustering into zones
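Traffic and trip duration also depend on location: certain regions of the city have heavy traffic, so where a trip starts and ends can be a good indicator of its duration. PySpark's ML library provides K-means clustering (pyspark.ml.clustering.KMeans), which is used here to group coordinates into 15 zones. The model is fitted on drop-off coordinates and then used to assign both pick-up and drop-off points to a cluster.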

The cluster information is used to create two additional features:
1. pickup cluster
2. dropoff cluster
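A minimal plain-Python sketch of K-means (Lloyd's algorithm) and of the cluster-assignment step follows. It assumes Euclidean distance on raw latitude/longitude values, as KMeans applies by default to the assembled coordinate vectors, but uses a naive initialization (the first k points) instead of Spark's default k-means|| initialization. The coordinates are made up, and kmeans/assign are hypothetical helpers, not Spark APIs.

```python
def assign(point, centers):
    """Index of the nearest center (squared Euclidean distance on lat/long)."""
    return min(range(len(centers)),
               key=lambda i: (point[0] - centers[i][0]) ** 2 + (point[1] - centers[i][1]) ** 2)


def kmeans(points, k, iterations=20):
    """Lloyd's algorithm: alternate assigning points and recomputing center means."""
    centers = [points[i] for i in range(k)]  # naive init: the first k points
    for _ in range(iterations):
        groups = [[] for _ in range(k)]
        for p in points:
            groups[assign(p, centers)].append(p)
        new_centers = []
        for i, g in enumerate(groups):
            if g:
                new_centers.append((sum(p[0] for p in g) / len(g),
                                    sum(p[1] for p in g) / len(g)))
            else:
                new_centers.append(centers[i])  # keep the center of an empty cluster
        if new_centers == centers:
            break  # converged
        centers = new_centers
    return centers


# Made-up drop-off points around Midtown and JFK; fit on drop-offs as in the notebook
dropoffs = [(40.75, -73.99), (40.64, -73.78), (40.76, -73.98),
            (40.65, -73.79), (40.74, -73.99), (40.64, -73.77)]
centers = kmeans(dropoffs, k=2)
# The same fitted centers assign a (made-up) pick-up point to a zone
pickup_cluster = assign((40.66, -73.80), centers)
```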

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Fit 15 clusters on the drop-off coordinates (the "features" column assembled above)
kmeans = KMeans(k=15, seed=1)
model = kmeans.fit(new_df.select('features'))

### Vectorizing and getting pickup clusters
vecAssembler = VectorAssembler(inputCols=["pickup_latitude", "pickup_longitude"], outputCol="features")
new_df = vecAssembler.transform(df)
df = model.transform(new_df)

## Assigning the prediction to pickup_cluster and dropping the temporary vector column
df = df.withColumnRenamed('prediction', 'pickup_cluster')
df = df.drop('features')

### Vectorizing and getting dropoff clusters
vecAssembler = VectorAssembler(inputCols=["dropoff_latitude", "dropoff_longitude"], outputCol="features")
new_df = vecAssembler.transform(df)
df = model.transform(new_df)

## Assigning the prediction to dropoff_cluster and dropping the temporary vector column
df = df.withColumnRenamed('prediction', 'dropoff_cluster')
df = df.drop('features')

### Visualizing the cluster

The figure below shows the different clusters based on the latitude/longitude of the pick-up coordinates.

In [None]:
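import seaborn as sns
%matplotlib inline
### Visualizing the clusters
# Sample inside Spark before toPandas(): collecting the full ~10 GB to the driver could exhaust its memory
# (lower the fraction if the driver still runs short of memory)
pd_df = (df.filter(df.pickup_latitude != 0.0)
           .select("pickup_latitude", "pickup_longitude", "pickup_cluster")
           .sample(fraction=0.1, seed=1)
           .toPandas())
sns.set_style("whitegrid")
# Longitude on the x-axis and latitude on the y-axis so the plot is oriented like a map
sns.lmplot(x="pickup_longitude", y="pickup_latitude", data=pd_df, fit_reg=False,
           hue="pickup_cluster", height=10, scatter_kws={"s": 100})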

### Predicting Trip Duration

The features used for the prediction are cast to appropriate data types (double).
Null values are dropped from the DataFrame, and the data is then split into training (70%) and test (30%) sets.
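randomSplit([.7, .3]) samples rows independently, so the two sets are only approximately 70% and 30% of the data, and without a seed the split changes between runs. The plain-Python sketch below illustrates this idea with a hypothetical random_split helper; it is not Spark's implementation, which samples each partition separately.

```python
import random


def random_split(rows, weights=(0.7, 0.3), seed=1):
    """Assign each row independently to train or test with probability from the weights."""
    rng = random.Random(seed)  # fixed seed makes the split reproducible
    p_train = weights[0] / sum(weights)  # weights are normalized, as in randomSplit
    train, test = [], []
    for row in rows:
        (train if rng.random() < p_train else test).append(row)
    return train, test


train, test = random_split(list(range(10_000)))
print(len(train), len(test))  # close to 7000 / 3000, but not exact
```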

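A linear regression model from PySpark's ML library (LinearRegression with maxIter=10 and regParam=0.01, preceded by a VectorAssembler and a StandardScaler in a Pipeline) is trained and then evaluated on the test set using the root mean squared error (RMSE), mean squared error (MSE), mean absolute error (MAE) and coefficient of determination (R²). Note that Speed_mph is derived from Duration_in_mins (the label) and is only known once a trip has ended, so using it as a feature leaks target information into the model and makes these error estimates optimistic.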
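The four evaluation metrics follow standard definitions, sketched below in plain Python on a tiny made-up example so the formulas can be checked by hand. regression_metrics is a hypothetical helper; the sketch assumes R² = 1 - SS_res / SS_tot, the usual definition for a model fitted with an intercept.

```python
import math


def regression_metrics(labels, predictions):
    """RMSE, MSE, MAE and R^2 computed from their standard definitions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(e * e for e in errors)                 # residual sum of squares
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)     # total sum of squares
    mse = ss_res / n
    return {"rmse": math.sqrt(mse),
            "mse": mse,
            "mae": sum(abs(e) for e in errors) / n,
            "r2": 1 - ss_res / ss_tot}


# Errors are -2, 2, -3, so MSE = 17/3, MAE = 7/3 and R^2 = 1 - 17/200
m = regression_metrics([10.0, 20.0, 30.0], [12.0, 18.0, 33.0])
print({k: round(v, 3) for k, v in m.items()})
```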

In [None]:
# Cast the remaining numeric columns from string to double
# (store_and_fwd_flag holds 'Y'/'N', so casting it to double would only produce nulls; it is left as a string)
numeric_cols = ['VendorID', 'passenger_count', 'trip_distance', 'RatecodeID', 'payment_type', 'fare_amount',
                'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']
for c in numeric_cols:
    df = df.withColumn(c, df[c].cast('double'))
## Importing ML libraries
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.sql.functions import col

features = ['passenger_count', 'trip_distance',
            'RatecodeID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
            'improvement_surcharge', 'pickup_cluster', 'dropoff_cluster', 'pickup_hr', 'Speed_mph']
lr_data = df.select(col("Duration_in_mins").alias("label"), *features)

## Counting null values per feature in a single pass, then dropping rows that contain any null
lr_data.select([F.count(F.when(col(f).isNull(), f)).alias(f) for f in features]).show()

lr_data = lr_data.dropna()

(training, test) = lr_data.randomSplit([.7, .3])

vectorAssembler = VectorAssembler(inputCols=features, outputCol="unscaled_features")  
standardScaler = StandardScaler(inputCol="unscaled_features", outputCol="features")  
lr = LinearRegression(maxIter=10, regParam=.01)

stages = [vectorAssembler, standardScaler, lr]  
pipeline = Pipeline(stages=stages) 

model = pipeline.fit(training)  
prediction = model.transform(test)

from pyspark.ml.evaluation import RegressionEvaluator
# Named "evaluator" to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = evaluator.evaluate(prediction)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = evaluator.evaluate(prediction, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = evaluator.evaluate(prediction, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluator.evaluate(prediction, {evaluator.metricName: "r2"})
print("r2: %.3f" % r2)