In [3]:
!pip install pyspark
!pip install scikit-learn



## Importing Libraries for Data Preprocessing

In [4]:
import warnings

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions
from pyspark.sql.types import FloatType

from sklearn.preprocessing import StandardScaler

from math import radians, cos, sin, asin, sqrt
import os

In [5]:
warnings.filterwarnings('ignore')

### Getting Cleaned Data

In [6]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df = spark.read.csv("drive/MyDrive/NYC Home/Data/NYC Taxi Duration Cleaned/*.csv", header=True)
df.printSchema()

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: string (nullable = true)



In [9]:
df.show()

+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1|-73.98215484619139| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423827|40.738563537597656|-73.99948120117188|40.731151580810554|                 N|          663|
|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|              1|-73.97902679443358|40.763938903808594|-74.005332

## Preprocess and add additional data into DataFrame

In [10]:
# Change Datatype for each column.

def change_type(df, column_name, column_type, is_time):
    if column_type == "int" or column_type == "float":
        new_df = df.withColumn(column_name, df[column_name].cast(column_type))
    elif column_type == "bool":
        new_df = df.withColumn(column_name, functions.when(df[column_name] == 'N', 0).otherwise(1))
    elif is_time == True:
        new_df = df.withColumn(column_name, to_timestamp(df[column_name], "yyyy-MM-dd HH:mm:ss"))
    else:
        new_df = df

    return new_df

In [11]:
# Calling change_type function for each column.

column_types = ["int", "time", "time", "int", "float", "float", "float", "float", "bool", "int"]
count = 0

for col in df.dtypes:
    if column_types[count] == "time":
        df = change_type(df, col[0], column_types[count], True)
    else:
        df = change_type(df, col[0], column_types[count], False)
    count += 1

In [12]:
# Printing Datatype for all columns.

for col in df.dtypes:
    print(col[0]+" , "+col[1])

vendor_id , int
pickup_datetime , timestamp
dropoff_datetime , timestamp
passenger_count , int
pickup_longitude , float
pickup_latitude , float
dropoff_longitude , float
dropoff_latitude , float
store_and_fwd_flag , int
trip_duration , int


In [13]:
# Calculate distance between two places.

def distance(pickUpLat1, dropOffLat2, pickUpLon1, dropOffLon2):
    longitude = radians(dropOffLon2) - radians(pickUpLon1)
    x = sin(longitude / 2)**2 + cos(radians(pickUpLat1)) * cos(radians(dropOffLat2)) * sin(longitude / 2)**2

    temp = 2 * asin(sqrt(x))
    earth_radius = 6371
    return(temp * earth_radius)

In [14]:
# Calculating distance between pickup and dropoff latitude and longtitude. In (K.M.)

distance_func = udf(distance, FloatType())
df = df.withColumn("distance", distance_func(df.pickup_latitude, df.dropoff_latitude, df.pickup_longitude, df.dropoff_longitude))

In [15]:
# filter the data where distance is greater than 0.

df = df.filter(df.distance > 0)

In [16]:
# Remove all trips that have trip_duration more than 7200 seconds which is equivalent to more than 2 hours and less than 180 seconds.
# Remove all trips which total distance is less than 1.5 and more than 60 KM because no taxi accepts rides as the destination is more than 60 kilometers far away.

df = df.filter((df.trip_duration > 180) & (df.trip_duration < 7200) & (df.distance > 1.5) & (df.distance < 60))


In [17]:
df.show()

+---------+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+---------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|store_and_fwd_flag|trip_duration| distance|
+---------+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+---------+
|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1|      -73.982155|      40.767937|        -73.96463|       40.765602|                 0|          455|2.4444735|
|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|      -73.980415|      40.738564|        -73.99948|        40.73115|                 0|          663|2.6599078|
|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|              1|       -73.97903|       40.76394|     

### Add timeline for each ride

In [18]:
df = df.withColumn("week_day", date_format(df["pickup_datetime"], "E"))
df = df.withColumn('year', date_format(df["pickup_datetime"], "y"))
df = df.withColumn('month', date_format(df["pickup_datetime"], "M"))
df = df.withColumn('quarter_of_year', quarter('pickup_datetime'))
df = df.withColumn('hour', date_format(df["pickup_datetime"], "H"))

df = df.drop("pickup_datetime")
df = df.drop("dropoff_datetime")

In [19]:
df.show()

+---------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+---------+--------+----+-----+---------------+----+
|vendor_id|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|store_and_fwd_flag|trip_duration| distance|week_day|year|month|quarter_of_year|hour|
+---------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+---------+--------+----+-----+---------------+----+
|        2|              1|      -73.982155|      40.767937|        -73.96463|       40.765602|                 0|          455|2.4444735|     Mon|2016|    3|              1|  17|
|        1|              1|      -73.980415|      40.738564|        -73.99948|        40.73115|                 0|          663|2.6599078|     Sun|2016|    6|              2|   0|
|        2|              1|       -73.97903|       40.76394|        -74.00533|       40.710087|     

### Save Preprocessed and Scaler Data

In [20]:
if os.path.exists("./NYC Taxi Duration Preprocessed") == False:
    df.write.option("header", "true").csv("./NYC Taxi Duration Preprocessed")