In [81]:
"""
Just run all of these to completion to transform the data and load it into the curated folder

"""

'\nJust run all of these to completion to transform the data and load it into the curated folder\n\n'

In [82]:
"""
Create a Spark Session

"""

from pyspark.sql import SparkSession
import pyarrow.parquet as pq
# Configure the session builder step by step, then create the session
# (or reuse one that already exists).
_session_builder = SparkSession.builder.appName("MAST30034 Assignment")
# Eager evaluation for DataFrames in the REPL / notebook
_session_builder = _session_builder.config("spark.sql.repl.eagerEval.enabled", True)
_session_builder = _session_builder.config("spark.sql.parquet.cacheMetadata", "true")
# Fix the session time zone to UTC
_session_builder = _session_builder.config("spark.sql.session.timeZone", "Etc/UTC")
spark = _session_builder.getOrCreate()

In [83]:
"""
Data transforming for yellow taxi tripdata 2019/02-07 parquet files 

"""

from pyspark.sql.functions import col, month, hour, to_date, dayofweek, unix_timestamp
from pyspark.sql.functions import stddev, mean

# Open the pre-processed parquet file we saved from the landing folder.
yellow_taxi = spark.read.parquet("data/raw/raw_yellow_taxi.parquet")

# Remove trips with trip distance = 0 (too small of a trip).
yellow_taxi = yellow_taxi.filter(col("trip_distance") != 0)

# Separate the pickup timestamp into hour, date and day of week.
yellow_taxi = (
    yellow_taxi
    .withColumn("hour", hour(col("tpep_pickup_datetime")))
    .withColumn("date", to_date(col("tpep_pickup_datetime")))
    .withColumn("day of week", dayofweek(col("tpep_pickup_datetime")))
)

# Trip duration in seconds (difference of the two unix timestamps), then
# revenue per minute and revenue per mile derived from it.
yellow_taxi = (
    yellow_taxi
    .withColumn("pickup_unix", unix_timestamp("tpep_pickup_datetime"))
    .withColumn("dropoff_unix", unix_timestamp("tpep_dropoff_datetime"))
    .withColumn("trip duration", col("dropoff_unix") - col("pickup_unix"))
    .withColumn("revenue/minute", col("total_amount") / col("trip duration") * 60)
    .withColumn("revenue/mile", col("total_amount") / col("trip_distance"))
)

# Columns that are screened for outliers with a z-score filter.
outlier_columns = ["total_amount", "trip duration", "revenue/minute", "revenue/mile"]

# Compute the mean and standard deviation of every screened column in a single
# pass over the data. The statistics are keyed by the column's position in
# `outlier_columns`, so each column is always paired with its OWN mean/stddev
# (previously revenue/minute and revenue/mile were mistakenly scaled by the
# trip-duration standard deviation).
outlier_stats = yellow_taxi.select(
    *[mean(col(name)).alias("mean_{}".format(idx)) for idx, name in enumerate(outlier_columns)],
    *[stddev(col(name)).alias("stddev_{}".format(idx)) for idx, name in enumerate(outlier_columns)]
).first()

# Set a threshold for z-scores (e.g., 2 or 3)
zscore_threshold = 3

# Keep only rows whose z-score lies within [-threshold, +threshold] for every
# screened column. All statistics were taken before any filtering, so the
# order in which the filters are applied does not matter.
for idx, name in enumerate(outlier_columns):
    zscore = (col(name) - outlier_stats["mean_{}".format(idx)]) / outlier_stats["stddev_{}".format(idx)]
    yellow_taxi = yellow_taxi.filter((zscore >= -zscore_threshold) & (zscore <= zscore_threshold))

# Drop the helper columns and the raw fields that are no longer needed downstream.
yellow_taxi = yellow_taxi.drop(
    "tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance", "dropoff_unix",
    "pickup_unix", "trip duration", "revenue/minute", "revenue/mile", "DOLocationID", "fare_amount"
)
yellow_taxi.show()

                                                                                

+------------+------------+----+----------+-----------+
|PULocationID|total_amount|hour|      date|day of week|
+------------+------------+----+----------+-----------+
|          48|        12.3|   0|2019-02-01|          6|
|         230|        33.3|   0|2019-02-01|          6|
|          95|         6.8|   0|2019-02-01|          6|
|         140|         6.3|   0|2019-02-01|          6|
|         229|         5.8|   0|2019-02-01|          6|
|          75|         6.3|   0|2019-02-01|          6|
|         246|        15.3|   0|2019-02-01|          6|
|          79|        11.8|   0|2019-02-01|          6|
|         170|         7.0|  23|2019-01-31|          5|
|         107|       11.62|  23|2019-01-31|          5|
|         107|       10.56|  23|2019-01-31|          5|
|          68|       12.35|   0|2019-02-01|          6|
|         246|         8.8|   0|2019-02-01|          6|
|         161|         5.8|   0|2019-02-01|          6|
|         161|       24.75|  23|2019-01-31|     

In [84]:
""" 
Save the data for the purposes of our visual analysis, we will of course further transform the data so that it can be used for the modelling as well.

"""
# Save the DataFrame as a single Parquet part file.
# "overwrite" is used instead of "append" so the cell is idempotent: if the
# output folder from an earlier run is still present, re-running replaces it
# rather than adding a second copy of every row.
yellow_taxi.repartition(1).write.format("parquet").mode("overwrite").save("data/curated/curated_yellow_taxi_visualisation")

# Spark writes a folder rather than a single file: please rename the generated parquet file to
# "curated_yellow_taxi_visualisation.parquet", move it to the curated section and delete the generated folder afterwards

                                                                                

In [85]:
"""
Join the weather dataset into the yellow taxi data

"""

import pandas as pd 
# Read the weather CSV with pandas, hand it to Spark, and discard the
# leftover pandas index column.
weather = spark.createDataFrame(
    pd.read_csv("data/raw/raw_weather.csv")
).drop("Unnamed: 0")

# Attach the weather columns to every trip by matching on the shared
# 'date' column; trips with no matching weather row are dropped (inner join).
yellow_taxi_with_weather = yellow_taxi.join(
    weather,
    on='date',
    how="inner",
)
yellow_taxi_with_weather.show()



+----------+------------+------------+----+-----------+----------------+------+------+
|      date|PULocationID|total_amount|hour|day of week|temp_categorized|rained|snowed|
+----------+------------+------------+----+-----------+----------------+------+------+
|2019-05-08|         137|        14.0|   0|          4|        Moderate|    No|    No|
|2019-05-08|         141|         8.3|   0|          4|        Moderate|    No|    No|
|2019-05-08|         141|       14.76|   0|          4|        Moderate|    No|    No|
|2019-05-27|         100|       16.56|  14|          2|        Moderate|    No|    No|
|2019-02-23|         249|         9.8|   0|          7|            Cold|   Yes|    No|
|2019-02-23|         234|       23.14|  18|          7|            Cold|   Yes|    No|
|2019-02-23|         114|        15.3|   0|          7|            Cold|   Yes|    No|
|2019-02-23|          24|       15.38|   0|          7|            Cold|   Yes|    No|
|2019-02-23|          41|         6.8|   0|

                                                                                

In [86]:
"""
Join the taxi zones dataset into the yellow taxi data

"""
# Taxi zone data pre-processing: load the lookup table, rename its key so it
# matches the pickup-location column on the trips, and drop the leftover
# pandas index column.
taxi_zone = (
    spark.createDataFrame(pd.read_csv("data/raw/raw_taxi_zones.csv"))
    .withColumnRenamed("LocationID", "PULocationID")
    .drop("Unnamed: 0")
)

# Inner-join the zone columns onto each trip via the pickup location, then
# discard the location id itself.
yellow_taxi_with_locations = yellow_taxi_with_weather.join(
    taxi_zone,
    on='PULocationID',
    how="inner",
).drop("PULocationID")
yellow_taxi_with_locations.show()




+----------+------------+----+-----------+----------------+------+------+---------+
|      date|total_amount|hour|day of week|temp_categorized|rained|snowed|  Borough|
+----------+------------+----+-----------+----------------+------+------+---------+
|2019-05-08|        14.0|   0|          4|        Moderate|    No|    No|Manhattan|
|2019-05-08|         8.3|   0|          4|        Moderate|    No|    No|Manhattan|
|2019-05-08|       14.76|   0|          4|        Moderate|    No|    No|Manhattan|
|2019-05-27|       16.56|  14|          2|        Moderate|    No|    No|Manhattan|
|2019-02-23|         9.8|   0|          7|            Cold|   Yes|    No|Manhattan|
|2019-02-23|       23.14|  18|          7|            Cold|   Yes|    No|Manhattan|
|2019-02-23|        15.3|   0|          7|            Cold|   Yes|    No|Manhattan|
|2019-02-23|       15.38|   0|          7|            Cold|   Yes|    No|Manhattan|
|2019-02-23|         6.8|   0|          7|            Cold|   Yes|    No|Man

                                                                                

In [87]:
"""
Join events and the yellow taxi dataset

"""

import pandas as pd
from pyspark.sql.functions import lit

# Event data pre-processing: load the events, drop the leftover pandas index
# column and flag every row with event=True.
# The events are de-duplicated on the join keys first: the left join below
# emits one output row per matching event, so two events sharing the same
# date, borough and hour would otherwise duplicate every matching trip.
event = (
    spark.createDataFrame(pd.read_csv("data/raw/raw_events.csv"))
    .drop("Unnamed: 0")
    .dropDuplicates(["date2", "Borough2", "hour2"])
    .withColumn("event", lit(True))
)

# A trip coincides with an event when date, borough and hour all match.
condition = (
    (yellow_taxi_with_locations['date'] == event['date2'])
    & (yellow_taxi_with_locations['Borough'] == event['Borough2'])
    & (yellow_taxi_with_locations['hour'] == event['hour2'])
)

# Left join keeps every trip; trips without a matching event get a null flag.
yellow_taxi_with_event = yellow_taxi_with_locations.join(event, on=condition, how="left")

# Remove the event-side join keys and turn the null flags into False.
yellow_taxi_with_event = (
    yellow_taxi_with_event
    .drop("Borough2", "date2", "hour2")
    .fillna(False, subset=['event'])
)

yellow_taxi_with_event.show()



+----------+------------+----+-----------+----------------+------+------+---------+-----+
|      date|total_amount|hour|day of week|temp_categorized|rained|snowed|  Borough|event|
+----------+------------+----+-----------+----------------+------+------+---------+-----+
|2019-05-08|        14.0|   0|          4|        Moderate|    No|    No|Manhattan|false|
|2019-05-08|         8.3|   0|          4|        Moderate|    No|    No|Manhattan|false|
|2019-05-08|       14.76|   0|          4|        Moderate|    No|    No|Manhattan|false|
|2019-05-27|       16.56|  14|          2|        Moderate|    No|    No|Manhattan|false|
|2019-02-23|         9.8|   0|          7|            Cold|   Yes|    No|Manhattan|false|
|2019-02-23|       23.14|  18|          7|            Cold|   Yes|    No|Manhattan|false|
|2019-02-23|        15.3|   0|          7|            Cold|   Yes|    No|Manhattan|false|
|2019-02-23|       15.38|   0|          7|            Cold|   Yes|    No|Manhattan|false|
|2019-02-2

                                                                                

In [88]:

"""
Save the data as it is now ready for analysis

"""

# Save the DataFrame as a single Parquet part file.
# "overwrite" is used instead of "append" so the cell is idempotent: if the
# output folder from an earlier run is still present, re-running replaces it
# rather than adding a second copy of every row.
yellow_taxi_with_event.repartition(1).write.format("parquet").mode("overwrite").save("data/curated/curated_yellow_taxi")

# Spark saves to a folder: you will have to rename the actual parquet file inside it to
# "curated_yellow_taxi.parquet", and please remove the output folder after doing so as well

                                                                                

23/08/17 22:31:35 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 362430 ms exceeds timeout 120000 ms
23/08/17 22:31:35 WARN SparkContext: Killing executors is not supported by current scheduler.
23/08/17 22:31:36 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.B