In [2]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *

import os
import glob
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
#import plotly_express as px

# Wrap the (reused or newly started) SparkContext in a session, then expose
# that same context as `sc` so both handles refer to one underlying context.
spark = SparkSession(SparkContext.getOrCreate())
sc = spark.sparkContext

### Read the Taxi Zone Lookup Table

In [2]:
def taxiZone(path="/home/yte9pc/Project/Yellow_Cab_Data/Taxi_Zone/taxi_zone_lookup.csv"):
    """Load the taxi zone lookup CSV as a Spark DataFrame.

    Parameters
    ----------
    path : str
        Location of the lookup CSV. Defaults to the original project path,
        so existing no-argument calls behave exactly as before.

    Returns
    -------
    pyspark.sql.DataFrame
        The lookup table read with its header row. No schema inference is
        requested, so columns (e.g. LocationID, Borough, Zone, service_zone)
        come back as strings.
    """
    return spark.read.csv(path, header = True)

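### Read the Yellow Cab Trip CSVs

Only the first three CSV files (in sorted filename order) are loaded and stacked into one DataFrame.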

In [32]:
def yellowCSVToDF(location="/home/yte9pc/Project/Yellow_Cab_Data/", max_files=3):
    """Read the Yellow Cab trip CSVs and stack them into one DataFrame.

    Files matching ``*.csv`` directly under ``location`` are sorted by path.
    The first ``max_files`` of them are read with their header row, and each
    path is printed as it is loaded. The results are combined with
    ``DataFrame.union``.

    Parameters
    ----------
    location : str
        Directory containing the trip CSVs (trailing separator optional).
        Defaults to the original project path.
    max_files : int or None
        Number of sorted files to load. The default of 3 matches the previous
        hard-coded cut-off. Pass ``None`` to load every file.

    Returns
    -------
    pyspark.sql.DataFrame

    Raises
    ------
    FileNotFoundError
        If no CSV files are selected, instead of failing on an unbound
        local variable.
    """
    from functools import reduce

    # Slicing replaces the old loop, which kept iterating past the cut-off
    # doing nothing; a slice end of None keeps every file.
    files = sorted(glob.glob(os.path.join(location, '*.csv')))[:max_files]
    if not files:
        raise FileNotFoundError("No CSV files selected from %s" % location)

    frames = []
    for f in files:
        frames.append(spark.read.csv(f, header = True))
        print(f)

    # NOTE: union resolves columns by position, not by name, so every file
    # must share the same column order.
    return reduce(lambda acc, df: acc.union(df), frames)

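### Add Pickup and Dropoff Taxi Zones

The zone lookup is inner-joined twice (on `PULocationID` and on `DOLocationID`), so trips whose location IDs are missing from the lookup are dropped. Note that the service-zone columns are spelled `PUSerivce_Zone` / `DOSerivce_Zone`.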

In [33]:
def yellowTaxiZone():
    """Attach pickup and dropoff zone details to every Yellow Cab trip.

    Side effects: drops and then re-registers the session temp views
    ``taxiZone`` (zone lookup) and ``yellowCab`` (trip data).

    The lookup is INNER JOINed twice, once on ``PULocationID`` and once on
    ``DOLocationID``. Trips whose IDs have no lookup match are therefore
    excluded.

    Returns
    -------
    pyspark.sql.DataFrame
        All trip columns, followed by PUBorough, PUZone, PUSerivce_Zone,
        DOBorough, DOZone and DOSerivce_Zone. The "Serivce" spelling is part
        of the existing output schema.
    """
    # Clear any views left over from a previous run; createTempView refuses
    # to overwrite an existing view.
    for view_name in ("taxiZone", "yellowCab"):
        spark.catalog.dropTempView(view_name)

    zones = taxiZone()
    zones.createTempView("taxiZone")
    trips = yellowCSVToDF()
    trips.createTempView("yellowCab")

    joined = spark.sql("SELECT yellowCab.*, \
                     PU.Borough as PUBorough, PU.Zone as PUZone, PU.service_zone as PUSerivce_Zone, \
                     DO.Borough as DOBorough, DO.Zone as DOZone, DO.service_zone as DOSerivce_Zone\
                     FROM yellowCab \
                     INNER JOIN taxiZone PU\
                            ON yellowCab.PULocationID = PU.LocationID \
                     INNER JOIN taxiZone DO\
                            ON yellowCab.DOLocationID = DO.LocationID")
    return joined

In [34]:
# Build the trip DataFrame with pickup/dropoff zone columns joined on.
# yellowCSVToDF prints each CSV path as it reads it (see output below).
data = yellowTaxiZone()

/home/yte9pc/Project/Yellow_Cab_Data/yellow_tripdata_2019-01.csv
/home/yte9pc/Project/Yellow_Cab_Data/yellow_tripdata_2019-02.csv
/home/yte9pc/Project/Yellow_Cab_Data/yellow_tripdata_2019-03.csv
22354693


In [31]:
#data.count()

22354693

In [27]:
# Select the joined zone columns by name instead of positionally
# (data.columns[-6:]). The feature-engineering cell further down appends
# columns, and after it has run, the last six columns are no longer the
# zone columns.
zone_cols = ["PUBorough", "PUZone", "PUSerivce_Zone",
             "DOBorough", "DOZone", "DOSerivce_Zone"]
data.select(zone_cols).show(1)

+---------+----------------+--------------+---------+--------------------+--------------+
|PUBorough|          PUZone|PUSerivce_Zone|DOBorough|              DOZone|DOSerivce_Zone|
+---------+----------------+--------------+---------+--------------------+--------------+
|Manhattan|Manhattan Valley|   Yellow Zone|Manhattan|Upper West Side S...|   Yellow Zone|
+---------+----------------+--------------+---------+--------------------+--------------+
only showing top 1 row



### Cast Columns to Proper Types

The CSVs are read without schema inference, so every column starts out as a string.

In [28]:
# Cast the raw string columns to typed columns.
# Monetary and distance columns use 64-bit DoubleType rather than 32-bit
# FloatType. Single-precision values pick up rounding noise (e.g. 12.3 ->
# 12.300000190734863) that compounds when aggregating ~22M rows.
timestamp_cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
double_cols = ["trip_distance", "fare_amount", "extra", "mta_tax",
               "tip_amount", "tolls_amount", "improvement_surcharge",
               "total_amount", "congestion_surcharge"]

for c in timestamp_cols:
    data = data.withColumn(c, col(c).cast(TimestampType()))
data = data.withColumn("passenger_count", col("passenger_count").cast(IntegerType()))
for c in double_cols:
    data = data.withColumn(c, col(c).cast(DoubleType()))

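### Derive Time-Based Features

For both pickup and dropoff, add the day-of-week name and the hour, plus the trip duration in minutes.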

In [29]:
# Reusable column handles for the two trip timestamps.
pickup_ts = F.col("tpep_pickup_datetime")
dropoff_ts = F.col("tpep_dropoff_datetime")
# Casting a timestamp to long yields epoch seconds; the difference is the
# trip length in seconds, divided by 60 below to get minutes.
duration_seconds = dropoff_ts.cast(LongType()) - pickup_ts.cast(LongType())

data = (
    data
    .withColumn("pickup_week_day", F.date_format(pickup_ts, "EEEE"))
    .withColumn("pickup_hour", F.hour(pickup_ts))
    .withColumn("dropoff_week_day", F.date_format(dropoff_ts, "EEEE"))
    .withColumn("dropoff_hour", F.hour(dropoff_ts))
    .withColumn("trip_time(mins)", duration_seconds / 60)
)

preview_cols = ["tpep_pickup_datetime", "pickup_hour", "pickup_week_day",
                "tpep_dropoff_datetime", "dropoff_week_day", "dropoff_hour",
                "trip_time(mins)"]
data.select(preview_cols).show(3)

+--------------------+-----------+---------------+---------------------+----------------+------------+-----------------+
|tpep_pickup_datetime|pickup_hour|pickup_week_day|tpep_dropoff_datetime|dropoff_week_day|dropoff_hour|  trip_time(mins)|
+--------------------+-----------+---------------+---------------------+----------------+------------+-----------------+
| 2019-01-01 00:46:40|          0|        Tuesday|  2019-01-01 00:53:20|         Tuesday|           0|6.666666666666667|
| 2019-01-01 00:59:47|          0|        Tuesday|  2019-01-01 01:18:59|         Tuesday|           1|             19.2|
| 2018-12-21 13:48:30|         13|         Friday|  2018-12-21 13:52:40|          Friday|          13|4.166666666666667|
+--------------------+-----------+---------------+---------------------+----------------+------------+-----------------+
only showing top 3 rows

