In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import shapefile

# Silence ALL Python warnings for the rest of the session. No category is
# passed, so this ignores every warning, not only deprecation warnings.
import warnings
warnings.filterwarnings("ignore")

# Spark

In [2]:
from pyspark.sql import SparkSession

# `warnings` is already imported and set to "ignore" in the first cell, so it
# is not repeated here.

# Create the Spark session that runs all of the jobs below (getOrCreate reuses
# an existing session in this kernel if there is one).
spark = SparkSession.builder.getOrCreate()

21/08/15 13:23:58 WARN Utils: Your hostname, LIVIA resolves to a loopback address: 127.0.1.1; using 172.22.214.215 instead (on interface eth0)
21/08/15 13:23:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/08/15 13:24:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Import CSV from January 2019 to March 2019

In [3]:
# First pass: read the three monthly CSVs without a schema (every column comes
# back as a string). This gives us the header names and a baseline row count.
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

sdf = spark.read.csv('../data/yellow_tripdata_2019-0[1-3].csv', header=True)

n_raw_rows = sdf.count()
f"{n_raw_rows:,} rows."

                                                                                

'22,519,712 rows.'

In [4]:
# Render a DataFrame as a table when it is the last expression of a cell.
spark.conf.set(key='spark.sql.repl.eagerEval.enabled', value=True)
sdf.limit(num=5)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.5,1,N,151,239,1,7.0,0.5,0.5,1.65,0,0.3,9.95,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.6,1,N,239,246,1,14.0,0.5,0.5,1.0,0,0.3,16.3,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.0,1,N,236,236,1,4.5,0.5,0.5,0.0,0,0.3,5.8,
2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,0.0,1,N,193,193,2,3.5,0.5,0.5,0.0,0,0.3,7.55,
2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,0.0,2,N,193,193,2,52.0,0.0,0.5,0.0,0,0.3,55.55,


## Create a Schema

In [5]:
import pyspark.sql.functions as F

from pyspark.sql.types import *
from pyspark.sql.functions import col

In [6]:
# Column names of the yellow-taxi CSV, grouped by the Spark type each one
# should be parsed as.
ints = ('VendorID', 'passenger_count', 'RatecodeID',
        'PULocationID', 'DOLocationID', 'payment_type')
doubles = ('trip_distance', 'fare_amount', 'extra',
           'mta_tax', 'tip_amount', 'tolls_amount',
           'improvement_surcharge', 'total_amount', 'congestion_surcharge')
strings = ('store_and_fwd_flag',)
dtimes = ('tpep_pickup_datetime', 'tpep_dropoff_datetime')

# Flatten the groups into a single {column name: Spark type instance} lookup.
dtypes = {
    name: spark_type()
    for spark_type, names in ((IntegerType, ints), (DoubleType, doubles),
                              (StringType, strings), (TimestampType, dtimes))
    for name in names
}

In [7]:
# Typed schema in the same column order as the CSV header (taken from the
# string-typed first read). Every field is nullable, e.g. congestion_surcharge
# is blank in the early rows.
schema = StructType([StructField(name, dtypes[name], True) for name in sdf.columns])

In [9]:
# Second pass: re-read the same files, this time parsing each column with the
# typed `schema`. The Arrow setting (already set before the first read) is
# repeated so this cell stands on its own.
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

raw_trips_path = '../data/yellow_tripdata_2019-0[1-3].csv'
sdf = spark.read.csv(raw_trips_path, header=True, schema=schema)

"{:,} rows.".format(sdf.count())

                                                                                

'22,519,712 rows.'

In [8]:
# NOTE: the saved output below has execution count 8. It ran before cell 9
# re-read the CSV with `schema`, so it still shows the all-string schema of
# the first read. Re-run this cell to display the typed schema.
sdf.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)



In [10]:
# Preview the typed DataFrame; eager evaluation renders it as a table.
spark.conf.set(key='spark.sql.repl.eagerEval.enabled', value=True)
sdf.limit(num=5)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.5,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.6,1,N,239,246,1,14.0,0.5,0.5,1.0,0.0,0.3,16.3,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.0,1,N,236,236,1,4.5,0.5,0.5,0.0,0.0,0.3,5.8,
2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,0.0,1,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,7.55,
2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,0.0,2,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,


## Cleaning the Data

### Initial Cleaning

Keep only trips that both start and end in 2019, and whose pickup time is strictly before the dropoff time.

In [11]:
# Keep trips that both start and end in calendar year 2019. The datetime
# columns are TimestampType, so compare their year directly. A string-prefix
# match would depend on Spark implicitly casting the timestamp to a string.
# NOTE(review): this keeps any 2019 date, including months outside Jan-Mar if
# such rows occur in these files -- confirm whether those should be dropped.
sdf = sdf.filter(
    (F.year(col('tpep_pickup_datetime')) == 2019)
    & (F.year(col('tpep_dropoff_datetime')) == 2019)
)

# The dropoff must be strictly after the pickup.
sdf = sdf.filter(sdf.tpep_dropoff_datetime > sdf.tpep_pickup_datetime)

Keep only trips with at least one passenger and no more than the maximum passenger count (5).

In [12]:
# Passenger count must be 1..5 (both bounds on the integer column in one filter).
sdf = sdf.filter((col('passenger_count') > 0) & (col('passenger_count') < 6))

Keep only trips whose fare is strictly above the minimum fare ($2.50).

In [13]:
# Strictly above the $2.50 minimum fare: trips charged exactly 2.5 are dropped too.
sdf = sdf.where(col('fare_amount') > 2.5)

Keep only trips whose recorded distance is greater than 0.1 miles.

In [14]:
# Drop zero-length and near-zero-length trips.
sdf = sdf.where(col('trip_distance') > 0.1)

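Keep only trips paid by credit card (payment type 1) or with an unknown payment type (5). Tips are only recorded for credit-card payments (cash tips are not captured), so cash trips would distort the tip percentage.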

In [15]:
# payment_type codes to keep: 1 (credit card) and 5 (unknown).
payment_id = [1, 5]
sdf = sdf.where(col('payment_type').isin(*payment_id))

Remove trips with an unknown location ID (IDs 264 and 265 denote unknown locations).

In [16]:
# IDs 264 and 265 are unknown locations, so both ends of the trip must be < 264.
sdf = sdf.filter((col('PULocationID') < 264) & (col('DOLocationID') < 264))

In [17]:
# Add duration per second for each record
diff_hour_col = (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long"))/3600
sdf = sdf.withColumn( "diff_hour", diff_hour_col )

# Remove trips less than a minute and more than 10 hours

sdf = sdf.filter(sdf.diff_hour <= 10)
sdf = sdf.filter(sdf.diff_hour > (1/60))

In [18]:
# tip_pct = 100 * tip_amount / total_amount. The denominator already includes
# the tip itself, so this is the tip's share of the whole bill, not a
# percentage of the fare.
tip_pct_col = (col("tip_amount") / col("total_amount")) * 100
sdf = sdf.withColumn(colName="tip_pct", col=tip_pct_col)

In [19]:
# Spot-check diff_hour and tip_pct on a single row.
sdf.limit(num=1)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,diff_hour,tip_pct
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.5,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,,0.1111111111111111,16.582914572864322


After initial cleaning, we have:

In [20]:
"{:,} rows.".format(sdf.count())

                                                                                

'15,345,130 rows.'

### Adding columns for hour, month, and day_of_week

NOTE: `pickup_day_of_week` and `dropoff_day_of_week` use Spark's `dayofweek` numbering: 1 = Sunday, 2 = Monday, 3 = Tuesday, …, 7 = Saturday.

In [20]:
# Calendar features for both ends of the trip. Spark's dayofweek numbers days
# 1 = Sunday, 2 = Monday, ..., 7 = Saturday.
sdf = sdf.withColumn('pickup_month', F.month(sdf.tpep_pickup_datetime))
sdf = sdf.withColumn('pickup_day_of_week', F.dayofweek(sdf.tpep_pickup_datetime))
sdf = sdf.withColumn('pickup_hour', F.hour(sdf.tpep_pickup_datetime))

sdf = sdf.withColumn('dropoff_month', F.month(sdf.tpep_dropoff_datetime))
sdf = sdf.withColumn('dropoff_day_of_week', F.dayofweek(sdf.tpep_dropoff_datetime))
sdf = sdf.withColumn('dropoff_hour', F.hour(sdf.tpep_dropoff_datetime))

# Every later cell runs a Spark action on this DataFrame: a preview, five
# groupBy counts and two toPandas exports. Without caching, each action
# re-reads the CSVs and re-applies all of the filters above. cache() keeps the
# cleaned rows once they are first computed. It also means the two separate
# toPandas exports read the same materialized data.
sdf = sdf.cache()

In [21]:
# Preview the new calendar columns.
sdf.limit(num=5)

21/08/15 13:28:22 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,diff_hour,tip_pct,pickup_month,pickup_day_of_week,pickup_hour,dropoff_month,dropoff_day_of_week,dropoff_hour
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.5,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,,0.1111111111111111,16.582914572864322,1,3,0,1,3,0
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.6,1,N,239,246,1,14.0,0.5,0.5,1.0,0.0,0.3,16.3,,0.32,6.134969325153374,1,3,0,1,3,1
1,2019-01-01 00:21:28,2019-01-01 00:28:37,1,1.3,1,N,163,229,1,6.5,0.5,0.5,1.25,0.0,0.3,9.05,,0.1191666666666666,13.812154696132596,1,3,0,1,3,0
1,2019-01-01 00:32:01,2019-01-01 00:45:39,1,3.7,1,N,229,7,1,13.5,0.5,0.5,3.7,0.0,0.3,18.5,,0.2272222222222222,20.0,1,3,0,1,3,0
1,2019-01-01 00:57:32,2019-01-01 01:09:32,2,2.1,1,N,141,234,1,10.0,0.5,0.5,1.7,0.0,0.3,13.0,,0.2,13.076923076923078,1,3,0,1,3,1


## Aggregating the Data

### Pickup

In [26]:
# Group trips by pickup zone.
sdf_pickup = sdf.groupby("PULocationID")

In [27]:
# Trips per pickup zone, collected to pandas. The relative path writes to the
# kernel's current directory, unlike the final exports, which go to ../data/.
sdf_pickup_count = sdf_pickup.count().toPandas()
sdf_pickup_count.to_pickle(path="pickup_count.pkl")

                                                                                

### Dropoff

In [28]:
# Group trips by dropoff zone.
sdf_dropoff = sdf.groupby("DOLocationID")

In [29]:
# Trips per dropoff zone, collected to pandas and pickled to the current directory.
sdf_dropoff_count = sdf_dropoff.count().toPandas()
sdf_dropoff_count.to_pickle(path="dropoff_count.pkl")

                                                                                

### Pickup Day of Week

In [30]:
# Group trips by pickup day of week (1 = Sunday ... 7 = Saturday).
sdf_pickup_day = sdf.groupby("pickup_day_of_week")

In [31]:
# Trips per pickup weekday, collected to pandas and pickled to the current directory.
sdf_pickup_day_count = sdf_pickup_day.count().toPandas()
sdf_pickup_day_count.to_pickle(path="pickup_day_count.pkl")

                                                                                

### Pickup Hour

In [32]:
# Group trips by pickup hour of day.
sdf_pickup_hour = sdf.groupby("pickup_hour")

In [33]:
# Trips per pickup hour, collected to pandas and pickled to the current directory.
sdf_pickup_hour_count = sdf_pickup_hour.count().toPandas()
sdf_pickup_hour_count.to_pickle(path="pickup_hour_count.pkl")

                                                                                

### Pickup Month

In [34]:
# Group trips by pickup month.
sdf_pickup_month = sdf.groupby("pickup_month")

In [35]:
# Trips per pickup month, collected to pandas and pickled to the current directory.
# NOTE(review): the saved output of this cell is a Spark heartbeat-timeout
# trace, so it is unclear whether that run wrote pickup_month_count.pkl --
# re-run and confirm the file exists.
sdf_pickup_month_count = sdf_pickup_month.count().toPandas()
sdf_pickup_month_count.to_pickle(path="pickup_month_count.pkl")

21/08/15 19:50:10 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 147391 ms exceeds timeout 120000 ms
21/08/15 19:50:10 WARN NettyRpcEnv: Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 172.22.214.215:44193 in 10000 milliseconds
21/08/15 19:50:10 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
	at org

## Dropping Columns

In [23]:
# Drop identifiers, the raw timestamps (already decomposed into month /
# day-of-week / hour columns), and the payment and surcharge fields.
# tip_amount and total_amount go too; tip_pct, computed from them, is kept.
dropped_cols = ["VendorID", "store_and_fwd_flag", "tpep_pickup_datetime", "tpep_dropoff_datetime",
                "payment_type", "extra", "mta_tax", "tip_amount", "total_amount",
                "improvement_surcharge", "congestion_surcharge"]
final_sdf = sdf.drop(*dropped_cols)

In [24]:
# Preview the columns that will be exported.
final_sdf.limit(num=3)

passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,fare_amount,tolls_amount,diff_hour,tip_pct,pickup_month,pickup_day_of_week,pickup_hour,dropoff_month,dropoff_day_of_week,dropoff_hour
1,1.5,1,151,239,7.0,0.0,0.1111111111111111,16.582914572864322,1,3,0,1,3,0
1,2.6,1,239,246,14.0,0.0,0.32,6.134969325153374,1,3,0,1,3,1
1,1.3,1,163,229,6.5,0.0,0.1191666666666666,13.812154696132596,1,3,0,1,3,0


## Export as Pickled Pandas DataFrames

In [22]:
# First group of export columns, collected to the driver as a pandas DataFrame.
# NOTE(review): this and the final_df_2 export are separate Spark jobs with no
# shared key column. Recombining them side by side assumes both jobs return
# rows in the same order -- confirm before concatenating.
final_df_1 = final_sdf.select("passenger_count", "trip_distance", "RatecodeID",
                              "PULocationID", "DOLocationID", "fare_amount").toPandas()

                                                                                

In [23]:
# Remaining export columns (tolls, duration, tip share and calendar features),
# collected to the driver as a second pandas DataFrame.
# NOTE(review): row alignment with final_df_1 relies on both jobs returning
# rows in the same order; there is no shared key column.
export_cols_2 = ["tolls_amount", "diff_hour", "tip_pct", "pickup_month",
                 "pickup_day_of_week", "pickup_hour", "dropoff_month",
                 "dropoff_day_of_week", "dropoff_hour"]
final_df_2 = final_sdf.select(export_cols_2).toPandas()

                                                                                

In [27]:
# Persist the first export frame next to the raw data.
final_df_1.to_pickle(path="../data/final_df_1.pkl")

In [28]:
# Persist the second export frame next to the raw data.
final_df_2.to_pickle(path="../data/final_df_2.pkl")