In [12]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.appName("Read Parquet Files").getOrCreate()

# Parquet files are self-describing (the schema is stored in the file), so the
# CSV-reader options `header` and `inferSchema` do not apply and are omitted.
input_path = "gs://msca-bdp-student-gcs/Group_8/tripdata"
tripdata_df = spark.read.parquet(input_path)

In [13]:
# Inspect the freshly loaded data: column names and types first, then a
# preview of the leading rows.
tripdata_df.printSchema()
tripdata_df.show()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nu



+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

                                                                                

In [14]:
# Remove the two base-number columns from the working frame.
columns_to_drop = [
    "dispatching_base_num",
    "originating_base_num",
]
tripdata_df = tripdata_df.drop(*columns_to_drop)

In [15]:
# Confirm that dispatching_base_num and originating_base_num are gone.
tripdata_df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nullable = true)
 |-- wav_request_flag: string (nullable = true)
 |-- wav_match_flag: integer (nullable = 

In [16]:
# Notes on missing values in tripdata_df
# A value counts as missing when it is null (or NaN in a float/double column),
# which is the same set of rows DataFrame.dropna() would remove.
from pyspark.sql import functions as F


def _non_missing_count(column_name, column_type):
    """Return an aggregate expression counting the non-missing values of one column."""
    column = F.col(column_name)
    if column_type in ("float", "double"):
        # count() only skips nulls; turn NaN into null so it is skipped too.
        column = F.when(~F.isnan(column), column)
    return F.count(column).alias(column_name)


# A single aggregation job profiles every column, instead of two full scans
# (dropna().count() and count()) for each column checked by hand.
non_missing_row = tripdata_df.agg(
    F.count(F.lit(1)).alias("__total_rows"),
    *[_non_missing_count(name, dtype) for name, dtype in tripdata_df.dtypes],
).first()
total_rows = non_missing_row["__total_rows"]

# Percentage of missing values per column; an empty frame reports 0.0 rather
# than raising ZeroDivisionError.
percent_missing = {
    name: (100 * (1 - non_missing_row[name] / total_rows)) if total_rows else 0.0
    for name in tripdata_df.columns
}

# Same quantity the original cell printed; any other column can be looked up
# the same way, e.g. percent_missing["wav_match_flag"].
percent_missing_a = percent_missing["wav_request_flag"]

print(percent_missing_a)

#Column Name         | % missing data

#hvfhs_license_num   | 0%
#request_datetime    |≈0%
#on_scene_datetime   |≈27.87%
#pickup_datetime     | 0%
#dropoff_datetime    | 0%
#PULocationID        | 0%
#DOLocationID        | 0%
#trip_miles          | 0%
#base_passenger_fare | 0%
#tolls               | 0%
#bcf                 | 0%
#sales_tax           | 0%
#congestion_surcharge|≈0%
#airport_fee         | 100%
#tips                | 0%
#driver_pay          | 0%
#shared_request_flag | 0%
#shared_match_flag   | 0%
#access_a_ride_flag  | 0%
#wav_request_flag    | 0%
#wav_match_flag      | 100%



0.0


                                                                                

In [17]:
#Dropping more columns and removing nulls (based on missing values)
from pyspark.sql.functions import col


# Remove the columns that are mostly or entirely empty.
columns_to_drop = [
    "on_scene_datetime",
    "airport_fee",
    "wav_match_flag",
]
tripdata_df = tripdata_df.drop(*columns_to_drop)

# For the columns with only a few gaps, discard the affected rows instead:
# a row is kept only when both values are present.
has_request_time = col("request_datetime").isNotNull()
has_congestion_surcharge = col("congestion_surcharge").isNotNull()
tripdata_df = tripdata_df.where(has_request_time & has_congestion_surcharge)




In [18]:
#Cleaning data types
from pyspark.sql.functions import col, trim, upper, when

# Location IDs are identifiers, not quantities, so store them as strings.
for location_column in ("PULocationID", "DOLocationID"):
    tripdata_df = tripdata_df.withColumn(
        location_column, col(location_column).cast("string")
    )

# Convert the yes/no flag columns to 0/1 integers.
# NOTE(review): the NYC TLC HVFHS data dictionary documents these flags as
# "Y"/"N"; comparing against the literal "yes" alone would turn every flag
# into 0. Both spellings are accepted (case-insensitive, surrounding
# whitespace ignored) — confirm the real values with
# tripdata_df.select("shared_request_flag").distinct().show().
# NOTE: this cell is not idempotent; run it once on the string-typed flags.
truthy_flag_values = ["Y", "YES"]
flag_columns = (
    "shared_request_flag",
    "shared_match_flag",
    "access_a_ride_flag",
    "wav_request_flag",
)
for flag_column in flag_columns:
    is_set = upper(trim(col(flag_column))).isin(truthy_flag_values)
    # Anything that is not a recognised "yes" value (including null) becomes 0.
    tripdata_df = tripdata_df.withColumn(flag_column, when(is_set, 1).otherwise(0))

In [19]:
# Re-check the schema after the casts: the location IDs should now be strings
# and the four flag columns integers.
tripdata_df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: integer (nullable = false)
 |-- shared_match_flag: integer (nullable = false)
 |-- access_a_ride_flag: integer (nullable = false)
 |-- wav_request_flag: integer (nullable = false)



In [20]:
from pyspark.sql.functions import unix_timestamp


# wait_time = whole seconds elapsed between the ride request and the pickup.
pickup_epoch_seconds = unix_timestamp("pickup_datetime")
request_epoch_seconds = unix_timestamp("request_datetime")
tripdata_df = tripdata_df.withColumn(
    "wait_time", pickup_epoch_seconds - request_epoch_seconds
)

# Preview both timestamps next to the derived wait time.
tripdata_df.select("request_datetime", "pickup_datetime", "wait_time").show()





+-------------------+-------------------+---------+
|   request_datetime|    pickup_datetime|wait_time|
+-------------------+-------------------+---------+
|2019-02-01 00:01:26|2019-02-01 00:05:18|      232|
|2019-02-01 00:26:08|2019-02-01 00:41:29|      921|
|2019-02-01 00:48:58|2019-02-01 00:51:34|      156|
|2019-02-01 00:02:15|2019-02-01 00:03:51|       96|
|2019-02-01 00:06:17|2019-02-01 00:09:44|      207|
|2019-02-01 00:56:01|2019-02-01 00:59:55|      234|
|2019-02-01 00:07:17|2019-02-01 00:12:06|      289|
|2019-02-01 00:43:33|2019-02-01 00:45:35|      122|
|2019-02-01 00:00:35|2019-02-01 00:10:48|      613|
|2019-02-01 00:29:16|2019-02-01 00:32:32|      196|
|2019-02-01 00:55:48|2019-02-01 00:59:54|      246|
|2019-01-31 23:58:20|2019-02-01 00:01:11|      171|
|2019-02-01 00:33:43|2019-02-01 00:36:22|      159|
|2019-02-01 00:54:59|2019-02-01 00:57:50|      171|
|2019-01-31 23:59:01|2019-02-01 00:05:24|      383|
|2019-02-01 00:24:04|2019-02-01 00:27:38|      214|
|2019-02-01 

                                                                                

In [21]:


#Feature engineering for request_datetime
from pyspark.sql.functions import date_format, dayofweek, month, hour

# Function to categorize time of the day
def categorize_time_of_day(df, timestamp_column, output_column="request_time_of_day"):
    """Add a time-of-day label derived from the hour of a timestamp column.

    Hours 5-11 are labelled "morning", 12-16 "afternoon", 17-20 "evening";
    every other case falls through to "night".

    Args:
        df: DataFrame to extend.
        timestamp_column: Name of the timestamp column to bucket.
        output_column: Name of the label column to add. Defaults to
            "request_time_of_day"; pass another name to reuse this function
            for a different timestamp instead of redefining it.

    Returns:
        A new DataFrame with `output_column` added.
    """
    # Extract the hour once and reuse it in every branch.
    hour_of_day = hour(timestamp_column)
    return df.withColumn(
        output_column,
        when(hour_of_day.between(5, 11), "morning")
        .when(hour_of_day.between(12, 16), "afternoon")
        .when(hour_of_day.between(17, 20), "evening")
        .otherwise("night"),
    )

# Label every trip with the time-of-day bucket of its request timestamp.
tripdata_df = categorize_time_of_day(tripdata_df, "request_datetime")

# Derive the hour, day of week and month of the request.
# (In the preview below, Friday 2019-02-01 is reported as day_of_week 6.)
tripdata_df = (
    tripdata_df
    .withColumn("request_hour", hour("request_datetime"))
    .withColumn("request_day_of_week", dayofweek("request_datetime"))
    .withColumn("request_month", month("request_datetime"))
)

# Preview the request timestamp alongside its derived features.
request_feature_columns = [
    "request_datetime",
    "request_time_of_day",
    "request_hour",
    "request_day_of_week",
    "request_month",
]
tripdata_df.select(*request_feature_columns).show(10)

+-------------------+-------------------+------------+-------------------+-------------+
|   request_datetime|request_time_of_day|request_hour|request_day_of_week|request_month|
+-------------------+-------------------+------------+-------------------+-------------+
|2019-02-01 00:01:26|              night|           0|                  6|            2|
|2019-02-01 00:26:08|              night|           0|                  6|            2|
|2019-02-01 00:48:58|              night|           0|                  6|            2|
|2019-02-01 00:02:15|              night|           0|                  6|            2|
|2019-02-01 00:06:17|              night|           0|                  6|            2|
|2019-02-01 00:56:01|              night|           0|                  6|            2|
|2019-02-01 00:07:17|              night|           0|                  6|            2|
|2019-02-01 00:43:33|              night|           0|                  6|            2|
|2019-02-01 00:00:35|

In [22]:
#Feature engineering for pickup_datetime
from pyspark.sql.functions import date_format, dayofweek, month, hour

# Function to categorize time of the day
def categorize_time_of_day(df, timestamp_column, output_column="pickup_time_of_day"):
    """Add a time-of-day label derived from the hour of a timestamp column.

    Hours 5-11 are labelled "morning", 12-16 "afternoon", 17-20 "evening";
    every other case falls through to "night".

    Args:
        df: DataFrame to extend.
        timestamp_column: Name of the timestamp column to bucket.
        output_column: Name of the label column to add. Defaults to
            "pickup_time_of_day"; pass another name to label a different
            timestamp without writing another copy of this function.

    Returns:
        A new DataFrame with `output_column` added.
    """
    # Extract the hour once and reuse it in every branch.
    hour_of_day = hour(timestamp_column)
    return df.withColumn(
        output_column,
        when(hour_of_day.between(5, 11), "morning")
        .when(hour_of_day.between(12, 16), "afternoon")
        .when(hour_of_day.between(17, 20), "evening")
        .otherwise("night"),
    )

# Label every trip with the time-of-day bucket of its pickup timestamp.
tripdata_df = categorize_time_of_day(tripdata_df, "pickup_datetime")

# Derive the hour, day of week and month of the pickup.
# (In the preview below, Friday 2019-02-01 is reported as day_of_week 6.)
tripdata_df = (
    tripdata_df
    .withColumn("pickup_hour", hour("pickup_datetime"))
    .withColumn("pickup_day_of_week", dayofweek("pickup_datetime"))
    .withColumn("pickup_month", month("pickup_datetime"))
)

# Preview the pickup timestamp alongside its derived features.
pickup_feature_columns = [
    "pickup_datetime",
    "pickup_time_of_day",
    "pickup_hour",
    "pickup_day_of_week",
    "pickup_month",
]
tripdata_df.select(*pickup_feature_columns).show(10)



+-------------------+------------------+-----------+------------------+------------+
|    pickup_datetime|pickup_time_of_day|pickup_hour|pickup_day_of_week|pickup_month|
+-------------------+------------------+-----------+------------------+------------+
|2019-02-01 00:05:18|             night|          0|                 6|           2|
|2019-02-01 00:41:29|             night|          0|                 6|           2|
|2019-02-01 00:51:34|             night|          0|                 6|           2|
|2019-02-01 00:03:51|             night|          0|                 6|           2|
|2019-02-01 00:09:44|             night|          0|                 6|           2|
|2019-02-01 00:59:55|             night|          0|                 6|           2|
|2019-02-01 00:12:06|             night|          0|                 6|           2|
|2019-02-01 00:45:35|             night|          0|                 6|           2|
|2019-02-01 00:10:48|             night|          0|             

                                                                                

In [23]:
# Final schema check before writing: the frame should now include wait_time
# plus the request_* and pickup_* features added above.
tripdata_df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: integer (nullable = false)
 |-- shared_match_flag: integer (nullable = false)
 |-- access_a_ride_flag: integer (nullable = false)
 |-- wav_request_flag: integer (nullable = false)
 |-- wait_time: long (nullable = true)
 |-- request_time_of_day: string (nullable = false)
 |-- request_hour: integer (nullab

In [24]:
# Destination in the group's GCS bucket for the cleaned data set.
output_path = "gs://msca-bdp-student-gcs/Group_8/tripdata_cleaned"

# Persist the cleaned frame as Parquet, replacing any previous output at
# that location.
(
    tripdata_df.write
    .mode("overwrite")
    .parquet(output_path)
)


23/11/25 23:41:27 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                