In [1]:
import pandas as pd
import geopandas as gpd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, date_format, when, year, weekofyear, hour, round, expr, log
import pyspark.sql.functions as F
from datetime import datetime

import os
import warnings
warnings.filterwarnings('ignore')


In [2]:
# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 ")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

23/08/20 17:55:49 WARN Utils: Your hostname, LAPTOP-D9335T9D resolves to a loopback address: 127.0.1.1; using 192.168.0.77 instead (on interface wifi0)
23/08/20 17:55:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/20 17:55:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Preprocess the 2019's march tlc taxi data in the same way as we preprocessed training data 

import the location data

In [3]:
file_path = "../data/curated/taxi_zone_gdf/taxi_zone_gdf.geojson"
# Read the GeoPandas DataFrame from the specified file
gdf = gpd.read_file(file_path)
location = spark.createDataFrame(gdf[["LocationID", "Borough"]])


### simple type transformation and filtering to raw data

In [4]:
# saving the taxi data with only relevant features 

# Define the data type conversions for each column
data_type_mappings = {
'tpep_pickup_datetime': 'timestamp',
'tpep_dropoff_datetime': 'timestamp',
'passenger_count': 'int',
'trip_distance': 'double',
'PULocationID': 'int',
'DOLocationID': 'int',
'payment_type': 'int',
'tip_amount': 'double',
'fare_amount': 'double'
}


taxi_sdf = spark.read.parquet('../data/landing/test_taxi_2019/*')

# since we will not include features like VendorID, Extra and MTA_tax. 
# we will remove any data that breaks the bussiness rule for these features
# such we will remove any instance with invalid vendorid, and remove any instance with mta_tax!= 0.5
taxi_sdf = taxi_sdf.filter(taxi_sdf["VendorID"].isin([1,2]))
taxi_sdf = taxi_sdf.filter(F.col('MTA_tax') == 0.5)

# remove any data with invalid Extra any extra other than 0.5 and 1 or 0
taxi_sdf = taxi_sdf.filter(taxi_sdf["Extra"].isin([0.5,1,0]))

# remove any data with invalid RateCodeID
taxi_sdf = taxi_sdf.filter(taxi_sdf["RateCodeID"].isin([1,2,3,4,5,6]))

# retain any record with fare amount less than total amount
taxi_sdf = taxi_sdf.filter(taxi_sdf["Fare_amount"] < taxi_sdf["Total_amount"])

# retain record with valid toll_amount
taxi_sdf = taxi_sdf.filter(F.col('Tolls_amount') >= 0)
taxi_sdf= taxi_sdf.filter(F.col('Tolls_amount') <=38)


# change the casing for feature name 
filter_sdf = taxi_sdf.select('tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 
                         'PULocationID', 'DOLocationID', 'payment_type', 'tip_amount', 'Fare_amount')
for column_name, new_data_type in data_type_mappings.items():
     filter_sdf = filter_sdf.withColumn(column_name, col(column_name).cast(new_data_type))

consistent_col_casing = [F.col(col_name).alias(col_name.lower()) for col_name in filter_sdf.columns]
filter_sdf = filter_sdf.select(*consistent_col_casing)

# focusing on manhattan data
taxi_sdf = taxi_sdf.join(location, location['LocationID']==taxi_sdf["pulocationid"], "inner")
taxi_sdf = taxi_sdf.drop('LocationID')
taxi_sdf = taxi_sdf.filter(F.col("Borough") == 'Manhattan')


output_folder = "../data/raw/test_taxi_2019/03"  # Replace with the desired output folder
filter_sdf.write.parquet(output_folder, mode="overwrite")


                                                                                

### filtering and cleaning to curated data file

In [5]:
# read data from raw
temp_sdf = spark.read.parquet('../data/raw/test_taxi_2019/*')

# remove any record with invalide passenger counts
# valid passenger count are between 1-7
temp_sdf = temp_sdf.where((F.col('passenger_count') > 0) & (F.col('passenger_count')<=7 ))

# remove any record with less than 3 dollar fare_amount and more than $120
temp_sdf = temp_sdf.filter(F.col('fare_amount') > 3)
temp_sdf = temp_sdf.filter(F.col('fare_amount') <120)


# remove any record with negative tipping and over $100 tip
temp_sdf = temp_sdf.filter(F.col('tip_amount') >= 0)
temp_sdf = temp_sdf.filter(F.col('tip_amount') <= 100)

#remove any instance with trip distance less than 0.2 mile 
temp_sdf = temp_sdf.filter(F.col('trip_distance') >= 0.2)    

#Define valid payment types
valid_payment_types = [1, 2, 3, 4, 5, 6]
# Remove rows with invalid payment types
temp_sdf = temp_sdf.filter(temp_sdf["payment_type"].isin(valid_payment_types))

#Define valid location id
location_ids = list(range(1,264))
# Remove rows with invalid pick up location id
# as we are mostly interest in trips that begin inside new york
temp_sdf = temp_sdf.filter(temp_sdf["pulocationid"].isin(location_ids))

#filter out records where payment_type is not equal to 1 but tip_amount is greater than 0
# as record with payment_type!=1 and tip_amount >0 violates the bussiness rule 
### remove negative tip
temp_sdf = temp_sdf.filter((F.col("payment_type") == 1) | (F.col("tip_amount") <= 0))

#filter out all record with non credicard payments 
temp_sdf = temp_sdf.filter((F.col("payment_type") == 1))

# Create new feature trip_duration to calculate trip duration in minute
temp_sdf = temp_sdf.withColumn("trip_duration_minutes", round((unix_timestamp(col("tpep_dropoff_datetime")).cast("double") - unix_timestamp(col("tpep_pickup_datetime")).cast("double"))/ 60, 3))

# remove any trip/instance with trip duration less than a minute and longer than 2.5 hours 
temp_sdf = temp_sdf.filter(F.col('trip_duration_minutes') >= 1)
temp_sdf = temp_sdf.filter(F.col('trip_duration_minutes') <= 150)


# Create new feature of pickup_day and hour (the assitant feature) from pickup_datetime attribute
temp_sdf = temp_sdf.withColumn("pickup_day", date_format(col("tpep_pickup_datetime"), "yyyy-MM-dd"))
temp_sdf = temp_sdf.withColumn("hour", date_format(col("tpep_pickup_datetime"), "HH").cast("int"))

#### Remove any record that is not in our research time period 
# Define the start and end dates of the desired period
start_date = datetime.strptime("2019-03-01", "%Y-%m-%d")
end_date = datetime.strptime("2019-03-31", "%Y-%m-%d")
# Filter out records not within the desired period
temp_sdf = temp_sdf.filter((col("pickup_day") >= start_date) & (col("pickup_day") <= end_date))


# Include new feature week number
temp_sdf = temp_sdf.withColumn("week_number", weekofyear("pickup_day"))



# Drop the tpep_pickup_datetime, tpep_dropoff_datetime, and other temporary/assistant features like hour and what_day, since we no longer using them
temp_sdf = temp_sdf.drop("tpep_pickup_datetime", "tpep_dropoff_datetime", "payment_type", "dolocationid")

# dropping any row/instance of data contain missing value
temp_sdf = temp_sdf.na.drop()

output_folder = "../data/curated/test_taxi_data/"  # Replace with the desired output folder
temp_sdf.write.parquet(output_folder, mode="overwrite")

                                                                                

In [6]:
test_taxi_sdf = spark.read.parquet('../data/curated/test_taxi_data/*')
test_taxi_sdf.count()

3443252

## Now preprocess the weather test data 

In [7]:
weather_sdf = spark.read.csv("../data/landing/test_weather_data/test_weather_data.csv", header=True, inferSchema=True)

In [8]:
columns_to_keep =  ["datetime", "temp", 'feelslike', "snow","windspeed", 'cloudcover', "humidity", "sealevelpressure", "conditions"]
weather_sdf = weather_sdf.select(*columns_to_keep)

# save to raw 
output_folder = "../data/raw/test_weather_data/"  # Replace with the desired output folder
weather_sdf.write.parquet(output_folder, mode="overwrite")

### Data cleaning

In [9]:
sdf = spark.read.parquet("../data/raw/test_weather_data/*")

In [10]:
# snow depth cannot be negative 
sdf = sdf.filter(F.col("snow") >= 0) 
# wind speed cannot be negative 
sdf = sdf.filter(F.col("windspeed") >= 0) 
# assume temperate of a day in new york in 2018 does not 
# exceed the maximum temperature and minimum temperature record in US
sdf = sdf.filter(F.col("temp") < 56) 
sdf = sdf.filter(F.col("temp") > -62)
#remove any record with null 
sdf = sdf.na.drop()

# create a new feature to store the hour time of the day
sdf = sdf.withColumn("hour_of_the_day", hour(col("datetime")))
sdf = sdf.withColumn("date", date_format(col("datetime"), "yyyy-MM-dd"))

# Extract day of the week from pickup_time (Like Monday, Tuesday.....etc)
sdf = sdf.withColumn("what_day", date_format(col("date"), "EEEE"))
# Create a new column for weekday or weekend from what day 
sdf = sdf.withColumn("day_of_the_week", when(col("what_day").isin("Saturday", "Sunday"), "Weekend").otherwise("Weekday"))

# Drop the original "datetime" column as we will focuse on hourly analysi
sdf = sdf.drop("datetime", "what_day")

In [11]:
# save to curated 
output= "../data/curated/test_weather_data/"  # Replace with the desired output folder
sdf.write.parquet(output, mode="overwrite")

## Now merge the two data to form our test data

In [12]:
taxi_test = spark.read.parquet('../data/curated/test_taxi_data/*')
weather = spark.read.parquet('../data/curated/test_weather_data/*')

#### group the taxi test based one the date and hourly time

In [13]:
# Group by the hour of the day and calculate the average trip duration and tip amount
gb_taxi = taxi_test.groupBy('pickup_day',"hour").agg(
    F.count("*").alias("trip_count"),
)

### join with weather 

In [14]:
# Join the DataFrames using pickup_date and formatted_datetime columns
# also rename to sample for convenient 
test_sdf = gb_taxi.join(weather, (gb_taxi["pickup_day"] == weather["date"]) & (gb_taxi["hour"] == weather["hour_of_the_day"]), "inner")


test_sdf = test_sdf.withColumn('log(windspeed)', log(test_sdf['windspeed']+1)) # add 1 to aviod zero windspeed

# drop the duplictae coloumn
test_sdf = test_sdf.drop("windspeed", "pickup_day", "hour_of_the_day", 'snow', 'feelslike', 'sealevelpressure','date', 'cloudcover')


In [15]:
test_sdf.count()


                                                                                

719

#### save the test sample to curated file 

In [16]:
# save to curated 
output_folder = "../data/curated/test_data/"  # Replace with the desired output folder
test_sdf.write.parquet(output_folder, mode="overwrite")

                                                                                