# Data Processing With Pyspark
## Preamble
This ananlysis including three steps, **data Processing**, **exploratory data analysis** and **model building**. This notebook is the first part of analysis, processing raw data of taxi transaction in NYC. The report includes follow aspects:
- Desription of original dataset 
- Data cleaning and processing 
- Feature engineering 
- Summary of the output dataset
Since the dataset is too large to process in the local python. Hadoop HDFS and Pyspark were adopted in this analysis.

## Description of original dataset
As mentioned previously, the dataset is about NYC taxi transaction. The original dataset is provided by **NYC Taxi & Limousine Commission**. (http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml) In this case, only tansaction between 2016 to 2018 were used. There are three type of taxis tansaction provided on the site, Green, Yellow and FHV. See the descriptions of three types of taxi below.

|Index |Taxi Type  | Description | 
| :----------:|:----------: |:---------------------------------------------------------------:|
|1| Green| Hybrid between yellow taxis and FHV, permitted to additionally accept street-hails outside Manhattan |
|2| Yellow| Tranditional NYC taxis that provide transportation exclusively through street-hails. |
|3| FHV| Community car services (livery), black car services, or luxury limo services. |

In this analysis, only data of **Yellow** and **Green** taxis were used because the FHV transaction didn't contain enough information for analysis. There are 6 total csv files for this analysis(Green and Yellow for each year).Noted that the data of 2018 only contain half year of transaction since the website update files semiannually. These 6 datasets mainly incuding follow information:
- **VendorID**: A code indicating the LPEP provider that provided the record. 
- **lpep_pickup_datetime**: The date and time when the meter was engaged.
- **Lpep_dropoff_datetime**: The date and time when the meter was disengaged. 
- **Passenger_count** : The number of passengers in the vehicle. This is a driver-entered value.
- **PULocationID** : Taxi Zone in which the taximeter was engaged
- **DOLocationID** : Taxi Zone in which the taximeter was disengaged
- **RateCodeID** : The final rate code in effect at the end of the trip.
- **Store_and_fwd_flag** : This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, because the vehicle did not have a connection to the server.
- **Payment_type** : A numeric code signifying how the passenger paid for the trip. 
- **Fare_amount** : The time-and-distance fare calculated by the meter. 
- **Extra** : Miscellaneous extras and surcharges. Currently, this only includes the 0.50 and 1 rush hour and overnight charges.
- **MTA_tax** : 0.50 MTA tax that is automatically triggered based on the metered rate in use.
- **Improvement_surcharge** : 0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began in 2015.
- **Tip_amount** : This field is automatically populated for credit card tips. Cash tips are not included.
- **Tolls_amount** : Total amount of all tolls paid in trip. 
- **Total_amount** : The total amount charged to passengers. Does not include cash tips.
- **Trip_type** : A code indicating whether the trip was a street-hail or a dispatch that is automatically assigned based on the metered rate in use but can be altered by the driver.

Since there are some noises in the original datasets, plus there are some features irrelevant to the analysis. Some measures of  data cleaning and processing must be implemented to get the datasets ready for futher analysis.



## Data cleaning and processing 
Since the files are really huge(over 50GB), some pre-processing were made to reduce the overall size, including dropping columns irrelevant to the analysis and take out rows that have different numbers of columns. After the pre-processing, the files were upload to the Haddoop HDFS to begin the data cleaning and processing. The process of data cleaning includes follow steps:
- **Deploy Spark cluster and read mutiple csv files**
- **Check and handle missing value** 

### Deploying Spark cluster and read files
Since we are using **Hadoop Distributed File System** to store our data and **PySpark** to perform analysis, it is important to set a full-fledged Hadoop cluster and use **Yarn** to deploy the cluster. In this case, 63 instances were used for computing.

There are 6 files to analysis, so we decided to read them to separete dataframes and processe them separately, and the process is over, these dataframe would be combined together for exploratory data analysis.

In [1]:
%%time
#init
#set path
import pandas as pd
import numpy as np
import math
from pyspark.sql import SparkSession
from pyspark.sql.functions import sqrt
from pyspark.sql import functions as F
from pyspark.sql import types
import os
import subprocess
mypath = "/user/hadoop/NYCTaxi/16-18" #your dir
cmd = ('hdfs dfs -ls '+mypath).split()
files = subprocess.check_output(cmd).strip().split()[3:]
fullpath=[]
for i in range(7,len(files),8):
    fullpath.append(str(files[i])[2:-1])
files=[i[27:] for i in fullpath]
df = ['PU_Time','DO_Time','Trip_Distance','Rate_Code','PU_Lon','PU_Lat','DO_Lon','DO_Lat','Fare_Amt','Extra','Tip_Amt','PULocationID','DOLocationID']
fullpath=fullpath[6:-1]
files=files[6:-1]

CPU times: user 174 ms, sys: 141 ms, total: 315 ms
Wall time: 3.62 s


In [2]:
%%time
#infile
spark = SparkSession.builder.master("yarn").config('spark.executor.instances','48').appName("badass").getOrCreate()
print('目錄下的檔案有:')
t=[]
for  f in fullpath:
    print(f)
    t += [spark.read.csv(f,sep = ',',header = True,inferSchema = True)]
df0=t[0]
print('第一個檔案的原始欄位')
print(df0.count(),len(df0.columns))
print('---------')
print(df0.dtypes)

目錄下的檔案有:
/user/hadoop/NYCTaxi/16-18/cleaned_2016_Green.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2016_Yellow.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2017_Green.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2017_Yellow.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Green-01.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Green-02.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Green-03.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Green-04.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Green-05.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Green-06.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Yellow-01.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Yellow-02.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Yellow-03.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Yellow-04.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Yellow-05.csv
/user/hadoop/NYCTaxi/16-18/cleaned_2018_Yellow-06.csv
第一個檔案的原始欄位
16385532 13
---------
[('PU_Time', 'string'), ('DO_Time', 'string'), ('Rate_Code', 'int'), ('PU_Lon', 'double'), ('PU_Lat', 'double'), (

### Check and handle missing values
Since there are sufficient data for analysis, all rows with missing vlaue were dropped by decision.

In [1]:
%%time
#Check NA values
for i in range(0,len(t)):
    for col in df:
        if col not in t[i].columns:
            print('Table',files[i],'do not have column', col)
        else:
            tmp=t[i].filter(t[i][col].isNull()).count()
            if tmp!=0:
                print('In table',files[i],'column',col,'has',tmp,'NA values.')
    print('---------------------------------------------')

NameError: name 't' is not defined

In [4]:
%%time
#Handle NA values
for i in range(2):
    for j in ['PU_Lon','PU_Lat','DO_Lon','DO_Lat']:
        t[i]=t[i].drop(j)

#Drop few NA rows
for i in range(0,len(t)):
     t[i] = t[i].na.drop()

CPU times: user 26.1 ms, sys: 387 µs, total: 26.5 ms
Wall time: 353 ms


**After dropping rows with missing value, the shape of all csv files were checked to see if they still have sufficient data** 

In [7]:
for i in range(len(t)):
    print(files[i],[t[i].count(),len(t[i].columns)])

cleaned_2016_Green.csv [7367502, 9]
cleaned_2016_Yellow.csv [61758523, 9]
cleaned_2017_Green.csv [11740667, 9]
cleaned_2017_Yellow.csv [113496874, 9]
cleaned_2018_Green-01.csv [793529, 9]
cleaned_2018_Green-02.csv [769940, 9]
cleaned_2018_Green-03.csv [837149, 9]
cleaned_2018_Green-04.csv [800084, 9]
cleaned_2018_Green-05.csv [797233, 9]
cleaned_2018_Green-06.csv [739373, 9]
cleaned_2018_Yellow-01.csv [8759874, 9]
cleaned_2018_Yellow-02.csv [8492076, 9]
cleaned_2018_Yellow-03.csv [9430376, 9]
cleaned_2018_Yellow-04.csv [9305515, 9]
cleaned_2018_Yellow-05.csv [9224063, 9]
cleaned_2018_Yellow-06.csv [8713831, 9]


## Feature engineering 
After the basic data processing, the step of feature engineering took place to get further imformation of the original dataset. The process of Feature engineering includes follow steps:
- Feature extraction 
- Check and handle values that don't make logical sense
- Check and handle outlier
- Combine all the dataframes into one

In [6]:
%%time
#New Columns


# timeDiff = (F.unix_timestamp('EndDateTime')
#             - F.unix_timestamp('StartDateTime'))
# df = df.withColumn("Duration", timeDiff)

for i in range(len(t)):
    t[i]=t[i].withColumn('PU_Time',F.unix_timestamp(t[i]['PU_Time'], 'MM/dd/yyyy h:mm:ss a').cast('timestamp'))
    t[i]=t[i].withColumn('DO_Time',F.unix_timestamp(t[i]['DO_Time'], 'MM/dd/yyyy h:mm:ss a').cast('timestamp'))


for i in range(len(t)):
    tt = F.unix_timestamp(t[i].DO_Time)-F.unix_timestamp(t[i].PU_Time)
    t[i] = t[i].withColumn('Trip_Time', tt)
        

CPU times: user 29.3 ms, sys: 14.2 ms, total: 43.5 ms
Wall time: 826 ms


### Feature Extraction 
When analyzing traffic, time is always a critical factor. In this analysis, the Pick Up time, which means the begining time of a transaction, was breakdown to several categorical features: Year, Month, Day, Hour, Minute, Second, WeekDay with the attempt to find the specific period of time when the duration of the taxi rides were affected by the traffic.
In addition, the speed were calculated to if there were peculiar value between time and distance.

In [7]:
#PU_Year
#PU_Month
#PU_Day
#PU_Hour
#PU_Minute
#PU_Second
#PU_WeekDay(0:6)
#Trip_Distance :miles to km
#Speed(km/hr)
for i in range(len(t)):
    t[i] = t[i].withColumn('PU_Year',F.year(t[i].PU_Time))
    t[i] = t[i].withColumn('PU_Month',F.month(t[i].PU_Time))
    t[i] = t[i].withColumn('PU_Day',F.dayofmonth(t[i].PU_Time))
    t[i] = t[i].withColumn('PU_Hour',F.hour(t[i].PU_Time))
    t[i] = t[i].withColumn('PU_Minute',F.minute(t[i].PU_Time))
    t[i] = t[i].withColumn('PU_Second',F.second(t[i].PU_Time))
    t[i] = t[i].withColumn('PU_WeekDay',F.dayofweek(t[i].PU_Time))
    t[i] = t[i].withColumn('Trip_Distance',t[i].Trip_Distance*1.609344)
    t[i] = t[i].withColumn('Speed',t[i].Trip_Distance*3600/t[i].Trip_Time)
print(t[0].columns)
print(t[0].head())

['PU_Time', 'DO_Time', 'Rate_Code', 'Trip_Distance', 'Fare_Amt', 'Extra', 'Tip_Amt', 'PULocationID', 'DOLocationID', 'Trip_Time', 'PU_Year', 'PU_Month', 'PU_Day', 'PU_Hour', 'PU_Minute', 'PU_Second', 'PU_WeekDay', 'Speed']
Row(PU_Time=datetime.datetime(2016, 7, 1, 0, 38, 47), DO_Time=datetime.datetime(2016, 7, 1, 0, 40, 53), Rate_Code=1, Trip_Distance=1.11044736, Fare_Amt=-4.0, Extra=-0.5, Tip_Amt=0.0, PULocationID=95, DOLocationID=95, Trip_Time=126, PU_Year=2016, PU_Month=7, PU_Day=1, PU_Hour=0, PU_Minute=38, PU_Second=47, PU_WeekDay=6, Speed=31.727067428571427)


### Check and handle values that don't make logical sense

In [13]:
#Trip_Time<=0
#Trip_Distance<=0
#'Fare_Amt', 'Extra', 'Tip_Amt'<0
#Trip_Distance<True_Distance
#True_Distance>=5
for i in range(len(t)):
    print(files[i])
    tttmp=0
    tmp0 = t[i].filter(t[i].Trip_Time<=0).count()
    tmp1 = t[i].filter(t[i].Trip_Distance<=0).count()
    tmp2 = t[i].filter(t[i].Fare_Amt<0).count()
    tmp3 = t[i].filter(t[i].Extra<0).count()
    tmp4 = t[i].filter(t[i].Tip_Amt<0).count()
    if tmp0>0 :
        tttmp+=tmp0
        print('Trip_Time<=0 的資料有',tmp0,'筆')
    if tmp1>0 :
        tttmp+=tmp1
        print('Trip_Distance<=0 的資料有',tmp1,'筆')
    if tmp2>0 :
        tttmp+=tmp2
        print('Fare_Amt<0 的資料有',tmp1,'筆')
    if tmp3>0 :
        tttmp+=tmp3
        print('df0.Extra<0 的資料有',tmp3,'筆')
    if tmp4>0 :
        tttmp+=tmp4
        print('Tip_Amt<0 的資料有',tmp4,'筆')
        
    if 'True_Distance' in t[i].columns:
        tt0 = t[i].filter(t[i].Trip_Distance<=t[i].True_Distance).count()
        if tt0>0 :
            tttmp+=tt0
            print('Trip_Distance<=True_Distance 的資料有',tt0,'筆')
        tt = t[i].filter(t[i].True_Distance>=5).count()
        if tt>0 :
            tttmp+=tt
            print('True_Distance>=5 的資料有',tt,'筆')
    print()
    print('Total:',tttmp)
    print('---------------------------------')

cleaned_2016_Green.csv
Trip_Time<=0 的資料有 5103 筆
Trip_Distance<=0 的資料有 99175 筆
Fare_Amt<0 的資料有 99175 筆
df0.Extra<0 的資料有 7904 筆
Tip_Amt<0 的資料有 127 筆

Total: 127550
---------------------------------
cleaned_2016_Yellow.csv
Trip_Time<=0 的資料有 64171 筆
Trip_Distance<=0 的資料有 377078 筆
Fare_Amt<0 的資料有 377078 筆
df0.Extra<0 的資料有 13632 筆
Tip_Amt<0 的資料有 482 筆

Total: 483790
---------------------------------
cleaned_2017_Green.csv
Trip_Time<=0 的資料有 6694 筆
Trip_Distance<=0 的資料有 135743 筆
Fare_Amt<0 的資料有 135743 筆
df0.Extra<0 的資料有 14104 筆
Tip_Amt<0 的資料有 208 筆

Total: 183388
---------------------------------
cleaned_2017_Yellow.csv
Trip_Time<=0 的資料有 108458 筆
Trip_Distance<=0 的資料有 743152 筆
Fare_Amt<0 的資料有 743152 筆
df0.Extra<0 的資料有 27486 筆
Tip_Amt<0 的資料有 844 筆

Total: 936029
---------------------------------
cleaned_2018_Green-01.csv
Trip_Time<=0 的資料有 462 筆
Trip_Distance<=0 的資料有 9216 筆
Fare_Amt<0 的資料有 9216 筆
df0.Extra<0 的資料有 1017 筆
Tip_Amt<0 的資料有 13 筆

Total: 12647
---------------------------------
cleaned_

In [8]:
#Handle nonsense values
for j in range(0,len(t)):
    t[j]=t[j].filter(t[j]['Trip_Time']>0)
    t[j]=t[j].filter(t[j].Trip_Distance>0)
    t[j]=t[j].filter(t[j].Fare_Amt>=0)
    t[j]=t[j].filter(t[j].Extra>=0)
    t[j]=t[j].filter(t[j].Tip_Amt>=0)
    if 'True_Distance' in t[j].columns:
        t[j]=t[j].filter(t[j].True_Distance<5)
        t[j]=t[j].filter(t[j].Trip_Distance>t[j].True_Distance)
for j in range(0,len(t)):        
    print(t[j].count(),len(t[j].columns))

7255409 18
61355211 18
11581590 18
112704296 18
782611 18
760605 18
826456 18
789393 18
785784 18
729533 18
8700826 18
8436063 18
9364906 18
9243415 18
9160205 18
8648906 18


In [None]:
# Checking time value that make no logical sense
for i in range(len(t)): 
    tmp1 = t[i].filter(df.PU_Month<0).count() + t[i].filter(df.PU_Month>12).count()
    tmp2 = t[i].filter(df.PU_Day<0).count() + t[i].filter(df.PU_Day>31).count()
    tmp3 = t[i].filter(df.PU_Minute<0).count() + t[i].filter(df.PU_Minute>60).count()
    tmp4 = t[i].filter(df.PU_Second<0).count() + t[i].filter(df.PU_Second>60).count()
    tmp5 = t[i].filter(df.PU_WeekDay<1).count() + t[i].filter(df.PU_WeekDay>7).count()
    tmp6 = t[i].filter(df['PU_Year'] >= 2018) + t[i].filter(df['PU_Year']<=2016)
    tmp7 = t[i].filter(df['Speed']>200)
print(tmp1,tmp2,tmp3,tmp4,tmp5,tmp6,tmp7)

# Get rid of nonse time value
for i in range(len(t)): 
    t[i] = t[i].filter(df['PU_Year']<=2018).filter(df['PU_Year']>=2016)
    t[i] = t[i].filter(df['Speed']<200)

### Combine all the dataframes into one
After the process of data cleaning and feature engineering, all dataframes were combine to one for the sake of convenience whem manipulating dataframe. But before this, the columns of all dataframes should be checked if they were in the same order. If not, the orders should be rearranged, otherwise that might cause prblem in the future.

**First checking the schemas and the orders of all the data frame**

In [19]:
for i in range(len(t)):
    print(t[i].printSchema())

root
 |-- PU_Time: timestamp (nullable = true)
 |-- DO_Time: timestamp (nullable = true)
 |-- Rate_Code: integer (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- Extra: double (nullable = true)
 |-- Tip_Amt: double (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- Trip_Time: long (nullable = true)
 |-- PU_Year: integer (nullable = true)
 |-- PU_Month: integer (nullable = true)
 |-- PU_Day: integer (nullable = true)
 |-- PU_Hour: integer (nullable = true)
 |-- PU_Minute: integer (nullable = true)
 |-- PU_Second: integer (nullable = true)
 |-- PU_WeekDay: integer (nullable = true)
 |-- Speed: double (nullable = true)

None
root
 |-- PU_Time: timestamp (nullable = true)
 |-- DO_Time: timestamp (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Rate_Code: integer (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- Extra: double (nullable = true

**Second unify the Order of columns in every dataframe**


In [9]:
for i in range(len(t)):
    t[i]=t[i].select('PU_Time','DO_Time','Rate_Code','Trip_Distance','Fare_Amt','Extra','Tip_Amt','PULocationID','DOLocationID','Trip_Time','PU_Year','PU_Month','PU_Day','PU_Hour', 'PU_Minute', 'PU_Second', 'PU_WeekDay', 'Speed')

**Then double check the orders**

In [10]:
for i in range(len(t)):
    print(t[i].columns)

['PU_Time', 'DO_Time', 'Rate_Code', 'Trip_Distance', 'Fare_Amt', 'Extra', 'Tip_Amt', 'PULocationID', 'DOLocationID', 'Trip_Time', 'PU_Year', 'PU_Month', 'PU_Day', 'PU_Hour', 'PU_Minute', 'PU_Second', 'PU_WeekDay', 'Speed']
['PU_Time', 'DO_Time', 'Rate_Code', 'Trip_Distance', 'Fare_Amt', 'Extra', 'Tip_Amt', 'PULocationID', 'DOLocationID', 'Trip_Time', 'PU_Year', 'PU_Month', 'PU_Day', 'PU_Hour', 'PU_Minute', 'PU_Second', 'PU_WeekDay', 'Speed']
['PU_Time', 'DO_Time', 'Rate_Code', 'Trip_Distance', 'Fare_Amt', 'Extra', 'Tip_Amt', 'PULocationID', 'DOLocationID', 'Trip_Time', 'PU_Year', 'PU_Month', 'PU_Day', 'PU_Hour', 'PU_Minute', 'PU_Second', 'PU_WeekDay', 'Speed']
['PU_Time', 'DO_Time', 'Rate_Code', 'Trip_Distance', 'Fare_Amt', 'Extra', 'Tip_Amt', 'PULocationID', 'DOLocationID', 'Trip_Time', 'PU_Year', 'PU_Month', 'PU_Day', 'PU_Hour', 'PU_Minute', 'PU_Second', 'PU_WeekDay', 'Speed']
['PU_Time', 'DO_Time', 'Rate_Code', 'Trip_Distance', 'Fare_Amt', 'Extra', 'Tip_Amt', 'PULocationID', 'DOLoca

**Finally all dataframes were combined to one using union function**

In [11]:
for i in t[1:]:
    t[0]=t[0].union(i)
df=t[0]
df.printSchema()

### Change the schema of columns

In [None]:
df=df.withColumn('PU_WeekDay',df['PU_WeekDay'].cast('integer'))
df=df.withColumn('PU_Hour',df['PU_Hour'].cast('integer'))
df=df.withColumn('Rate_Code',df['Rate_Code'].cast('integer'))
df=df.withColumn('Trip_Distance',df['Trip_Distance'].cast('double'))
df=df.withColumn('Fare_Amt',df['Fare_Amt'].cast('double'))
df=df.withColumn('Extra',df['Extra'].cast('double'))
df=df.withColumn('Tip_Amt',df['Tip_Amt'].cast('double'))
df=df.withColumn('PULocationID',df['PULocationID'].cast('integer'))
df=df.withColumn('DOLocationID',df['DOLocationID'].cast('integer'))
df=df.withColumn('Trip_Time',df['Trip_Time'].cast('double'))
df=df.withColumn('PU_Month',df['PU_Month'].cast('integer'))
df=df.withColumn('PU_Year',df['PU_Year'].cast('integer'))
df=df.withColumn('PU_Day',df['PU_Day'].cast('integer'))
df=df.withColumn('PU_Minute',df['PU_Minute'].cast('integer'))
df=df.withColumn('PU_Second',df['PU_Second'].cast('integer'))
df=df.withColumn('Speed',df['Speed'].cast('double'))

### Filter out PU_Time and DO_Time that was out of date(2018/6/30)

In [None]:
expr = "^(2018)(-)(0[7-9]|1[012])"

df.filter(~df["PU_Time"].rlike(expr)).count()

## Summary of the output dataset
After the combination process, some basic description were made to see the full picture of processed dataset.
In the processed dataset, There were:
1. **251125209 rows in total**
1. **18 columns** 

In [15]:
df_final.count()

251125209

In [16]:
# df_final.coalesce(1).write.csv("/user/hadoop/NYCTaxi/16-18/final_df",header=True,sep=',')
df_final.write.csv("/user/hadoop/NYCTaxi/16-18/final_df",header=True,sep=',',timestampFormat='yyyy-MM-dd HH:mm:ss')
