## Data Preprocessing Stage 1
Melissa Putri (1389438)
### Importing Libraries Loading PARQUET Data

In [1]:
# Initialise a spark session
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from collections import Counter
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import year, month, dayofmonth, hour, col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, unix_timestamp


spark = (
    SparkSession.builder.appName("HVFHV Data Processing")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "8g")  # Increase driver memory
    .config("spark.executor.memory", "8g")  # Increase executor memory
    .config("spark.executor.instances", "4")  # Increase the number of executor instances
    .config("spark.driver.maxResultSize", "2g")
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/23 13:47:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Define the directory containing the PARQUET files
landing_folder = "../data/landing/"

# List all files in the directory
all_files = os.listdir(landing_folder)

# Filter for PARQUET files that contain 'fhvhv_tripdata' in their file names
parquet_files = [f for f in all_files if f.endswith(".parquet") and 'fhvhv_tripdata' in f]

# Full paths to the filtered files
parquet_paths = [os.path.join(landing_folder, f) for f in parquet_files]

# Check if any files were found
if parquet_paths:
    # Load and combine the filtered PARQUET files with PySpark
    HVFHV_all = spark.read.parquet(*parquet_paths)
    HVFHV_all.show()  # Display some rows for confirmation
else:
    print("No 'fhvhv_tripdata' PARQUET files found in the directory.")

                                                                                

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+-----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee| tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+

In [3]:
HVFHV_all.limit(5)

hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
HV0003,B03404,B03404,2024-02-01 00:13:55,2024-02-01 00:19:59,2024-02-01 00:20:54,2024-02-01 00:27:54,149,210,1.23,420,8.88,0.0,0.24,0.79,0.0,0.0,0.0,5.57,N,N,N,N,N
HV0003,B03404,B03404,2024-02-01 00:00:14,2024-02-01 00:03:28,2024-02-01 00:04:37,2024-02-01 00:09:37,127,136,1.28,300,7.91,0.0,0.22,0.7,0.0,0.0,0.0,5.39,N,N,N,N,N
HV0003,B03404,B03404,2024-02-01 00:13:57,2024-02-01 00:15:44,2024-02-01 00:17:46,2024-02-01 00:31:54,127,69,3.93,848,17.6,0.0,0.48,1.56,0.0,0.0,0.0,13.13,N,N,N,N,N
HV0005,B03406,,2024-02-01 00:40:21,,2024-02-01 00:41:55,2024-02-01 00:50:56,169,169,1.252,541,11.4,0.0,0.31,1.01,0.0,0.0,0.0,6.79,N,N,N,N,N
HV0003,B03404,B03404,2024-02-01 00:14:14,2024-02-01 00:15:55,2024-02-01 00:16:39,2024-02-01 00:27:37,142,152,3.77,658,11.39,0.0,0.31,1.01,2.75,0.0,3.0,15.14,N,N,N,N,Y


In [4]:
# Exploring data structure
print(f'Num of Instances: {HVFHV_all.count()}')
print(f'Num of Columns/Features: {len(HVFHV_all.columns)}')
HVFHV_all.printSchema()

Num of Instances: 237385342
Num of Columns/Features: 24
root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- sh

                                                                                

In [5]:
# HVFHV Companies Present in the Dataset
HVFHV_all.groupby('hvfhs_license_num').count()

                                                                                

hvfhs_license_num,count
HV0005,65242190
HV0003,172143152


## Handling Missing Values

In [6]:
null_counts = HVFHV_all.agg(*(F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in HVFHV_all.columns))
null_counts.show()

24/08/23 13:47:53 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors

+-----------------+--------------------+--------------------+----------------+-----------------+---------------+----------------+------------+------------+----------+---------+-------------------+-----+---+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|request_datetime|on_scene_datetime|pickup_datetime|dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls|bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+----------------+-----------------+---------------+----------------+------------+------------+----------+---------+-------------------+-----+---+---------+--------------------+-----------+----+----------+-------------------+-----

                                                                                

In [7]:
# Group by 'hvfhs_license_num' and count nulls for each column within the groups
null_counts_by_license = HVFHV_all.groupBy("hvfhs_license_num").agg(
    *(F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in HVFHV_all.columns if c != 'hvfhs_license_num')
)

# Show the result
null_counts_by_license.show()



+-----------------+--------------------+--------------------+----------------+-----------------+---------------+----------------+------------+------------+----------+---------+-------------------+-----+---+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|request_datetime|on_scene_datetime|pickup_datetime|dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls|bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+----------------+-----------------+---------------+----------------+------------+------------+----------+---------+-------------------+-----+---+---------+--------------------+-----------+----+----------+-------------------+-----

                                                                                

### Reformulating on_scene_datetime feature
Transform on_scene_datetime column into is_accessible column. Where on_scene_datetime is missing, is_accessible will be 0 and vice versa. This is based on the data dictionary provided

In [8]:
# Create the is_accessible column
HVFHV_all = HVFHV_all.withColumn('is_accessible_flag', F.when(HVFHV_all.on_scene_datetime.isNull(), 0).otherwise(1))

# Drop the on_scene_datetime column
HVFHV_all = HVFHV_all.drop('on_scene_datetime')

In [9]:
# Define datetime and categorical columns
datetime_cols = ['request_datetime', 'pickup_datetime', 'dropoff_datetime']
categorical_cols = ['hvfhs_license_num', 'dispatching_base_num', 'shared_request_flag', 'shared_match_flag', 'access_a_ride_flag', 'wav_request_flag', 'wav_match_flag']

### Imputing Missing Values of originating_base_num for Lyft Taxi Type by ML Imputing

In [10]:
LYFT_data = HVFHV_all.filter(F.col('hvfhs_license_num') == 'HV0005')

In [11]:
LYFT_data.groupby('originating_base_num').count()

                                                                                

originating_base_num,count
,65083038
B03380,99
B03406,159053


In [12]:
LYFT_data.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nullable = true)
 |-- wav_request_fla

In [13]:
train = LYFT_data.filter(F.col('originating_base_num').isNotNull())
test = LYFT_data.filter(F.col('originating_base_num').isNull())

In [14]:
# Step 1: Convert categorical columns to numerical using StringIndexer
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep").fit(train)
    for col in ["hvfhs_license_num", "dispatching_base_num", "originating_base_num"]
]

for indexer in indexers:
    train = indexer.transform(train)
    test = indexer.transform(test)

                                                                                

In [15]:
# Step 2: Extract features from datetime columns
def add_datetime_features(df):
    return df.withColumn("request_year", year(col("request_datetime"))) \
             .withColumn("request_month", month(col("request_datetime"))) \
             .withColumn("request_day", dayofmonth(col("request_datetime"))) \
             .withColumn("request_hour", hour(col("request_datetime"))) \
             .withColumn("pickup_year", year(col("pickup_datetime"))) \
             .withColumn("pickup_month", month(col("pickup_datetime"))) \
             .withColumn("pickup_day", dayofmonth(col("pickup_datetime"))) \
             .withColumn("pickup_hour", hour(col("pickup_datetime"))) \
             .withColumn("dropoff_year", year(col("dropoff_datetime"))) \
             .withColumn("dropoff_month", month(col("dropoff_datetime"))) \
             .withColumn("dropoff_day", dayofmonth(col("dropoff_datetime"))) \
             .withColumn("dropoff_hour", hour(col("dropoff_datetime")))

train = add_datetime_features(train)
test = add_datetime_features(test)

In [17]:
# Step 3: Prepare the features vector
feature_cols = ["hvfhs_license_num_index", "dispatching_base_num_index", "PULocationID", "DOLocationID", 
                "trip_miles", "trip_time", "base_passenger_fare", "tolls", "bcf", "sales_tax", 
                "congestion_surcharge", "airport_fee", "tips", "driver_pay", "is_accessible_flag",
                "request_year", "request_month", "request_day", "request_hour",
                "pickup_year", "pickup_month", "pickup_day", "pickup_hour",
                "dropoff_year", "dropoff_month", "dropoff_day", "dropoff_hour"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train = assembler.transform(train)
test = assembler.transform(test)

In [18]:
# Step 4: Train the model
rf = RandomForestClassifier(labelCol="originating_base_num_index", featuresCol="features", numTrees=10)
model = rf.fit(train)

                                                                                

In [19]:
single_test_row = test.limit(1)
        
# Make prediction on the single row
single_prediction = model.transform(single_test_row)

# Convert prediction back to original categorical values if necessary
imputed_single_row = single_prediction.withColumn("imputed_originating_base_num_index", col("prediction"))

# Use StringIndexer to convert the index back to the original string values
converter = IndexToString(inputCol="imputed_originating_base_num_index", outputCol="imputed_originating_base_num", labels=indexers[2].labels)
imputed_single_row = converter.transform(imputed_single_row)

# Show the single row with the prediction
print("Single Test Row Prediction:")
imputed_single_row.show(truncate=False)

Single Test Row Prediction:
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+------------------+-----------------------+--------------------------+--------------------------+------------+-------------+-----------+------------+-----------+------------+----------+-----------+------------+-------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------+---------------------------------------------+----------+----------------------------------+----------------------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|request_datetime 

24/08/23 13:49:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [20]:
# Step 4: Train the model
if train.count() > 0:
    rf = RandomForestClassifier(labelCol="originating_base_num_index", featuresCol="features", numTrees=10)
    model = rf.fit(train)
    
    # Step 5: Predict missing values
    if test.count() > 0:
        
        # Predict on the entire test set
        predictions = model.transform(test)
        
        # Convert prediction back to original categorical values if necessary
        imputed_df = predictions.withColumn("imputed_originating_base_num_index", col("prediction"))

        # Use StringIndexer to convert the index back to the original string values
        converter = IndexToString(inputCol="imputed_originating_base_num_index", outputCol="imputed_originating_base_num", labels=indexers[2].labels)
        imputed_df = converter.transform(imputed_df)

        # Update the originating_base_num with the imputed values
        imputed_df = imputed_df.withColumn("originating_base_num", 
                                           when(col("originating_base_num").isNull(), col("imputed_originating_base_num"))
                                           .otherwise(col("originating_base_num")))

        # Select only the original columns & combine with training set
        final_df = train.union(imputed_df.select(train.columns))

        # Show the final DataFrame with imputed values
        print("Final DataFrame with imputed values:")
        final_df.show(truncate=False)

    else:
        print("Test DataFrame is empty after filtering.")
else:
    print("Training DataFrame is empty after filtering.")

                                                                                

Final DataFrame with imputed values:
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+------------------+-----------------------+--------------------------+--------------------------+------------+-------------+-----------+------------+-----------+------------+----------+-----------+------------+-------------+-----------+------------+---------------------------------------------------------------------------------------------------------------------------------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|request_datetime   |pickup_datetime    |dropoff_datetime   |PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls|bcf |sales_tax|congestion_surcharge|

In [21]:
# Check that there is no more missing values
null_lyft = final_df.agg(*(F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in final_df.columns))
null_lyft.show()



+-----------------+--------------------+--------------------+----------------+---------------+----------------+------------+------------+----------+---------+-------------------+-----+---+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+------------------+-----------------------+--------------------------+--------------------------+------------+-------------+-----------+------------+-----------+------------+----------+-----------+------------+-------------+-----------+------------+--------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|request_datetime|pickup_datetime|dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls|bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|is_accessible_flag|hvfhs_license_num_index|dispatching_base_num_index

                                                                                

In [22]:
final_df.groupby('originating_base_num').count()

ERROR:root:KeyboardInterrupt while sending command.===>           (74 + 8) / 94]
Traceback (most recent call last):
  File "/Users/melissaputri/anaconda3/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/melissaputri/anaconda3/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/melissaputri/anaconda3/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 



In [23]:
# Exclude Other Columns
selected_columns = final_df.columns[:24]
lyft_complete = final_df.select(*selected_columns)

### Handle Missing Values in Uber taxi type by listwise detection
Since the number of rows with missing values are considered to be a small portion of the Uber dataset, it is reasonable to simply drop the rows.

In [24]:
uber_complete = HVFHV_all.filter((F.col('hvfhs_license_num') == 'HV0003') & (F.col('originating_base_num').isNotNull()))



In [25]:
null_uber = uber_complete.agg(*(F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in uber_complete.columns))
null_uber.show()

ERROR:root:KeyboardInterrupt while sending command.                (0 + 0) / 47]
Traceback (most recent call last):
  File "/Users/melissaputri/anaconda3/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/melissaputri/anaconda3/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/melissaputri/anaconda3/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

## Convert Flags to Boolean Data Type

In [26]:
def convert_flag_columns_to_boolean(data):
    flag_columns = [col_name for col_name in data.columns if 'flag' in col_name]
    
    for col_name in flag_columns:
        data = data.withColumn(
            col_name,
            (F.col(col_name) == '1').cast('BOOLEAN')
        )
    
    return data

uber_data = convert_flag_columns_to_boolean(uber_complete)
lyft_data = convert_flag_columns_to_boolean(lyft_complete)

## Validate Data Range

In [27]:
def validate_record(data):
    return data.withColumn(
        'is_valid_record',
        F.when(
            (F.col('trip_miles') > 0)  # Positive trip distance
            & (F.col('trip_time') > 0)  # Positive trip time
            & (F.col('base_passenger_fare') > 0),  # Positive passenger fare
            True
        ).otherwise(False)
    )

In [28]:
print(f'Num of Uber Instances before validation: {uber_data.count()}')
uber_data = validate_record(uber_data)
print(f'Num of Uber Instances after validation: {uber_data.count()}')
print(f'Num of Lyft Instances before validation: {lyft_data.count()}')
lyft_data = validate_record(lyft_data)
print(f'Num of Uber Instances after validation: {lyft_data.count()}')

                                                                                

Num of Uber Instances before validation: 172142764


                                                                                

Num of Uber Instances after validation: 172142764


                                                                                

Num of Lyft Instances before validation: 65242190




Num of Uber Instances after validation: 65242190


                                                                                

In [29]:
uber_data.write.parquet('../data/raw/cleaned_uber')
lyft_data.write.parquet('../data/raw/cleaned_lyft')

                                                                                

24/08/23 14:57:52 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 285793 ms exceeds timeout 120000 ms
24/08/23 14:57:52 WARN SparkContext: Killing executors is not supported by current scheduler.
24/08/23 14:57:56 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$