#### SparkML Task: Trip Profiling: Predict Likelihood of High Tipping

In [18]:
from pyspark.sql import SparkSession
import os

# Configure Spark environment
os.environ['PYSPARK_PYTHON'] = 'python3'

# Create a Spark session with improved configuration
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("NYC Taxi Model") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.sql.shuffle.partitions", "20") \
    .config("spark.default.parallelism", "20") \
    .getOrCreate()

# Verify Spark session
print("Spark session created successfully.")

Spark session created successfully.


In [19]:
taxi_data = spark.read.parquet("/root/DevDataOps/nyc-taxi-analysis/processed-data/nyc_taxi_processed.parquet")
taxi_data.show()

+---------+-------------------+-------------------+---------------+------------------+--------------------+--------------+-------------------+--------------------+---------------+----------------+-------------+------------------+------------------+-----------+----------+------------------+------------------+------------------+-----------+----------+------------+-----------+--------+------------+------------+---------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|pickup_location_id|    pickup_zone_name|pickup_borough|dropoff_location_id|   dropoff_zone_name|dropoff_borough|is_inter_borough|trip_distance|trip_duration_mins|     avg_speed_mph|fare_amount|tip_amount|    tip_percentage|        total_cost|     cost_per_mile|pickup_hour|pickup_day|pickup_month|pickup_year|day_type| time_of_day|payment_type|rate_code|
+---------+-------------------+-------------------+---------------+------------------+--------------------+--------------+-------------------+--------------

## Setting Up Models For Training

In [20]:
from pyspark.sql.functions import when, col

# Copy Parquet File to not tamper with the original one
taxi_data_copy = taxi_data

# taxi_data_copy.show()
num_columns = len(taxi_data.columns)
print(num_columns)


taxi_data_copy.printSchema()

27
root
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- pickup_zone_name: string (nullable = true)
 |-- pickup_borough: string (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- dropoff_zone_name: string (nullable = true)
 |-- dropoff_borough: string (nullable = true)
 |-- is_inter_borough: boolean (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- trip_duration_mins: double (nullable = true)
 |-- avg_speed_mph: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tip_percentage: double (nullable = true)
 |-- total_cost: double (nullable = true)
 |-- cost_per_mile: double (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_day: integer (nullable = true)
 |-- pickup_mon

In [21]:
from pyspark.sql.functions import when, col, hour, dayofweek, expr

# Create binary target column for high_tip (1 if tip_amount > 0.15 * fare_amount, 0 otherwise)
taxi_data_ml = taxi_data_copy.withColumn(
    "high_tip", 
    when(col("tip_amount") > (0.15 * col("fare_amount")), 1).otherwise(0)
)

# Feature engineering
# 1. Extract time features
taxi_data_ml = taxi_data_ml.withColumn("pickup_hour", hour(col("pickup_datetime")))
taxi_data_ml = taxi_data_ml.withColumn("pickup_day_of_week", dayofweek(col("pickup_datetime")))

# 2. Create fare_per_mile feature
taxi_data_ml = taxi_data_ml.withColumn(
    "fare_per_mile",
    when(col("trip_distance") > 0, col("fare_amount") / col("trip_distance")).otherwise(0)
)

# 3. Calculate fare_per_minute feature
taxi_data_ml = taxi_data_ml.withColumn(
    "fare_per_minute",
    when(col("trip_duration_mins") > 0, col("fare_amount") / col("trip_duration_mins")).otherwise(0)
)

# Display the first few rows with the new features
taxi_data_ml.select(
    "pickup_hour", "pickup_day_of_week", "passenger_count", "trip_distance", 
    "trip_duration_mins", "fare_amount", "fare_per_mile", "fare_per_minute",
    "pickup_borough", "dropoff_borough", "tip_amount", "high_tip"
).show(5)

+-----------+------------------+---------------+-------------+------------------+-----------+------------------+------------------+--------------+---------------+----------+--------+
|pickup_hour|pickup_day_of_week|passenger_count|trip_distance|trip_duration_mins|fare_amount|     fare_per_mile|   fare_per_minute|pickup_borough|dropoff_borough|tip_amount|high_tip|
+-----------+------------------+---------------+-------------+------------------+-----------+------------------+------------------+--------------+---------------+----------+--------+
|         21|                 4|            1.0|          8.2|              23.0|       26.0|3.1707317073170733|1.1304347826086956|     Manhattan|      Manhattan|      5.45|       1|
|         11|                 7|            1.0|          2.0|              20.0|       13.5|              6.75|             0.675|     Manhattan|      Manhattan|      2.15|       1|
|         20|                 6|            2.0|          1.9|              15.0|    

In [22]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Handle categorical variables: pickup_borough and dropoff_borough
# Step 1: Convert string columns to indices
indexers = [
    StringIndexer(inputCol=col_name, outputCol=f"{col_name}_index", handleInvalid="keep")
    for col_name in ["pickup_borough", "dropoff_borough"]
]

# Step 2: Apply the indexers
for indexer in indexers:
    taxi_data_ml = indexer.fit(taxi_data_ml).transform(taxi_data_ml)

# Step 3: One-hot encode the indexed columns
encoder = OneHotEncoder(
    inputCols=["pickup_borough_index", "dropoff_borough_index"],
    outputCols=["pickup_borough_vec", "dropoff_borough_vec"]
)
taxi_data_ml = encoder.fit(taxi_data_ml).transform(taxi_data_ml)

# Show the encoded data
taxi_data_ml.select(
    "pickup_borough", "pickup_borough_index", "pickup_borough_vec",
    "dropoff_borough", "dropoff_borough_index", "dropoff_borough_vec"
).show(5)

                                                                                

+--------------+--------------------+------------------+---------------+---------------------+-------------------+
|pickup_borough|pickup_borough_index|pickup_borough_vec|dropoff_borough|dropoff_borough_index|dropoff_borough_vec|
+--------------+--------------------+------------------+---------------+---------------------+-------------------+
|     Manhattan|                 0.0|     (6,[0],[1.0])|      Manhattan|                  0.0|      (6,[0],[1.0])|
|     Manhattan|                 0.0|     (6,[0],[1.0])|      Manhattan|                  0.0|      (6,[0],[1.0])|
|     Manhattan|                 0.0|     (6,[0],[1.0])|      Manhattan|                  0.0|      (6,[0],[1.0])|
|     Manhattan|                 0.0|     (6,[0],[1.0])|      Manhattan|                  0.0|      (6,[0],[1.0])|
|     Manhattan|                 0.0|     (6,[0],[1.0])|      Manhattan|                  0.0|      (6,[0],[1.0])|
+--------------+--------------------+------------------+---------------+--------

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Select feature columns
feature_cols = [
    "passenger_count",
    "trip_distance",
    "trip_duration_mins",
    "fare_amount",
    "pickup_hour",
    "pickup_day_of_week",
    "fare_per_mile",
    "fare_per_minute",
    "pickup_borough_vec",
    "dropoff_borough_vec",
    "payment_type"  # Added Payment Type
]

# Assemble features into a single vector
vector_assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_unscaled",
    handleInvalid="skip"
)

assembled_data = vector_assembler.transform(taxi_data_ml)

# Standardize the features
scaler = StandardScaler(
    inputCol="features_unscaled",
    outputCol="features",
    withStd=True,
    withMean=True
)

scaled_data = scaler.fit(assembled_data).transform(assembled_data)

# Show the assembled and scaled features
scaled_data.select("features_unscaled", "features", "high_tip").show(5, truncate=False)

                                                                                

+-------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|features_unscaled                                                                                            |features                                                                                                                                                                                                                                                                                                                                   

In [24]:
# Split data into training and testing sets (70% training, 30% testing)
train_data, test_data = scaled_data.randomSplit([0.7, 0.3], seed=42)

print(f"Training data count: {train_data.count()}")
print(f"Testing data count: {test_data.count()}")

# Cache the datasets to improve performance
train_data.cache()
test_data.cache()

                                                                                

Training data count: 721031




Testing data count: 308375


                                                                                

DataFrame[vendor_id: int, pickup_datetime: timestamp, dropoff_datetime: timestamp, passenger_count: double, pickup_location_id: int, pickup_zone_name: string, pickup_borough: string, dropoff_location_id: int, dropoff_zone_name: string, dropoff_borough: string, is_inter_borough: boolean, trip_distance: double, trip_duration_mins: double, avg_speed_mph: double, fare_amount: double, tip_amount: double, tip_percentage: double, total_cost: double, cost_per_mile: double, pickup_hour: int, pickup_day: int, pickup_month: int, pickup_year: int, day_type: string, time_of_day: string, payment_type: int, rate_code: int, high_tip: int, pickup_day_of_week: int, fare_per_mile: double, fare_per_minute: double, pickup_borough_index: double, dropoff_borough_index: double, pickup_borough_vec: vector, dropoff_borough_vec: vector, features_unscaled: vector, features: vector]

In [25]:
# Check class distribution of high_tip target
class_counts = taxi_data_ml.groupBy("high_tip").count().orderBy("high_tip")
class_counts.show()

# Calculate class proportions
total = taxi_data_ml.count()
class_counts_with_proportions = class_counts.withColumn(
    "proportion", 
    (col("count") / total) * 100
)
class_counts_with_proportions.show()

+--------+------+
|high_tip| count|
+--------+------+
|       0|439918|
|       1|589488|
+--------+------+

+--------+------+-----------------+
|high_tip| count|       proportion|
+--------+------+-----------------+
|       0|439918| 42.7351307453036|
|       1|589488|57.26486925469639|
+--------+------+-----------------+



## Model 1: Logistic Regression

In [26]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Create and train the improved Logistic Regression model
lr = LogisticRegression(
    featuresCol="features",
    labelCol="high_tip",
    maxIter=30,          # Increased from 10
    regParam=0.1,        # Decreased from 0.3 for less regularization
    elasticNetParam=0.5, # Changed from 0.8 for more L2 regularization
    threshold=0.4,       # Adjusted for potential class imbalance
    standardization=True # Ensure internal standardization
)

lr_model = lr.fit(train_data)

# Make predictions on test data
lr_predictions = lr_model.transform(test_data)

# Show sample predictions
print("Sample predictions from Logistic Regression:")
lr_predictions.select("high_tip", "prediction", "probability").show(10)

# Evaluate the model
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="high_tip", 
    rawPredictionCol="rawPrediction", 
    metricName="areaUnderROC"
)

multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol="high_tip",
    predictionCol="prediction",
    metricName="accuracy"
)

lr_auc = binary_evaluator.evaluate(lr_predictions)
lr_accuracy = multiclass_evaluator.evaluate(lr_predictions)

# Also evaluate F1 score which is better for imbalanced datasets
lr_f1 = MulticlassClassificationEvaluator(
    labelCol="high_tip",
    predictionCol="prediction",
    metricName="f1"
).evaluate(lr_predictions)

print(f"Logistic Regression - Area under ROC: {lr_auc:.4f}")
print(f"Logistic Regression - Accuracy: {lr_accuracy:.4f}")
print(f"Logistic Regression - F1 Score: {lr_f1:.4f}")

# Display model coefficients
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

                                                                                

Sample predictions from Logistic Regression:


                                                                                

+--------+----------+--------------------+
|high_tip|prediction|         probability|
+--------+----------+--------------------+
|       1|       1.0|[0.25455326922552...|
|       0|       1.0|[0.25455326922552...|
|       1|       1.0|[0.25455326922552...|
|       0|       1.0|[0.25455326922552...|
|       0|       1.0|[0.25455326922552...|
|       0|       0.0|[0.82168725221710...|
|       1|       1.0|[0.25455326922552...|
|       1|       1.0|[0.25455326922552...|
|       0|       0.0|[0.82168725221710...|
|       1|       1.0|[0.25455326922552...|
+--------+----------+--------------------+
only showing top 10 rows



                                                                                

Logistic Regression - Area under ROC: 0.8547
Logistic Regression - Accuracy: 0.8761
Logistic Regression - F1 Score: 0.8716
Coefficients: (21,[20],[-1.2419024166453427])
Intercept: 0.26908406188758877


## Model 2: Decision Tree

In [27]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Create and train the improved Decision Tree model
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="high_tip",
    maxDepth=8,         # Increased from 5
    maxBins=64,         # Added parameter
    minInstancesPerNode=10, # Added parameter
    impurity="gini",    # Specified impurity measure
    seed=42
)

dt_model = dt.fit(train_data)

# Make predictions
dt_predictions = dt_model.transform(test_data)

# Show sample predictions
print("Sample predictions from Decision Tree:")
dt_predictions.select("high_tip", "prediction", "probability").show(10)

# Evaluate the model
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol="high_tip",
    predictionCol="prediction"
)
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="high_tip",
    rawPredictionCol="prediction"
)

dt_accuracy = multiclass_evaluator.evaluate(dt_predictions)
dt_auc = binary_evaluator.evaluate(dt_predictions)

# Calculate F1 score
dt_f1 = MulticlassClassificationEvaluator(
    labelCol="high_tip",
    predictionCol="prediction",
    metricName="f1"
).evaluate(dt_predictions)

print(f"Decision Tree - Area under ROC: {dt_auc:.4f}")
print(f"Decision Tree - Accuracy: {dt_accuracy:.4f}")
print(f"Decision Tree - F1 Score: {dt_f1:.4f}")

# Display feature importances
print("Feature Importances:")
for feature, importance in zip(feature_cols, dt_model.featureImportances.toArray()):
    print(f"{feature}: {importance:.4f}")

                                                                                

Sample predictions from Decision Tree:
+--------+----------+--------------------+
|high_tip|prediction|         probability|
+--------+----------+--------------------+
|       1|       1.0|[0.24040914256850...|
|       0|       1.0|[0.24040914256850...|
|       1|       1.0|[0.24040914256850...|
|       0|       1.0|[0.21839183985949...|
|       0|       1.0|[0.24040914256850...|
|       0|       0.0|[0.99994068937199...|
|       1|       1.0|[0.11249784816663...|
|       1|       1.0|[0.24040914256850...|
|       0|       0.0|[0.99994068937199...|
|       1|       1.0|[0.21839183985949...|
+--------+----------+--------------------+
only showing top 10 rows

Decision Tree - Area under ROC: 0.8556
Decision Tree - Accuracy: 0.8722
Decision Tree - F1 Score: 0.8722
Feature Importances:
passenger_count: 0.0000
trip_distance: 0.0000
trip_duration_mins: 0.0006
fare_amount: 0.0154
pickup_hour: 0.0006
pickup_day_of_week: 0.0000
fare_per_mile: 0.0002
fare_per_minute: 0.0001
pickup_borough_vec: 0

## Model 3: Random Forest

In [28]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Create and train the improved Random Forest model
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="high_tip",
    numTrees=100,      # Increased from 20
    maxDepth=10,       # Increased from 5
    minInstancesPerNode=5, # Added parameter
    maxBins=128,       # Added parameter
    bootstrap=True,    # Enable bootstrap sampling
    impurity="gini",   # Specified impurity measure
    seed=42
)

rf_model = rf.fit(train_data)

# Make predictions
rf_predictions = rf_model.transform(test_data)

# Show sample predictions
print("Sample predictions from Random Forest:")
rf_predictions.select("high_tip", "prediction", "probability").show(10)

# Evaluate the model
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol="high_tip",
    predictionCol="prediction"
)
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="high_tip",
    rawPredictionCol="prediction"
)

rf_accuracy = multiclass_evaluator.evaluate(rf_predictions)
rf_auc = binary_evaluator.evaluate(rf_predictions)

# Calculate F1 score
rf_f1 = MulticlassClassificationEvaluator(
    labelCol="high_tip",
    predictionCol="prediction",
    metricName="f1"
).evaluate(rf_predictions)

print(f"Random Forest - Area under ROC: {rf_auc:.4f}")
print(f"Random Forest - Accuracy: {rf_accuracy:.4f}")
print(f"Random Forest - F1 Score: {rf_f1:.4f}")

# Display feature importances
print("Feature Importances:")
for feature, importance in zip(feature_cols, rf_model.featureImportances.toArray()):
    print(f"{feature}: {importance:.4f}")

25/05/23 23:00:54 WARN DAGScheduler: Broadcasting large task binary with size 1525.1 KiB
25/05/23 23:01:15 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
25/05/23 23:01:44 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
25/05/23 23:02:18 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
25/05/23 23:02:53 WARN DAGScheduler: Broadcasting large task binary with size 1182.7 KiB
                                                                                

Sample predictions from Random Forest:


25/05/23 23:02:52 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB


+--------+----------+--------------------+
|high_tip|prediction|         probability|
+--------+----------+--------------------+
|       1|       1.0|[0.24921694381596...|
|       0|       1.0|[0.26819814216803...|
|       1|       1.0|[0.23831019510976...|
|       0|       1.0|[0.21672361186197...|
|       0|       1.0|[0.29064978043296...|
|       0|       0.0|[0.96699949866913...|
|       1|       1.0|[0.12211167092472...|
|       1|       1.0|[0.25181373912848...|
|       0|       0.0|[0.96699624419378...|
|       1|       1.0|[0.21604001703933...|
+--------+----------+--------------------+
only showing top 10 rows



25/05/23 23:02:53 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
25/05/23 23:02:56 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
25/05/23 23:02:59 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB

Random Forest - Area under ROC: 0.8556
Random Forest - Accuracy: 0.8723
Random Forest - F1 Score: 0.8723
Feature Importances:
passenger_count: 0.0002
trip_distance: 0.0038
trip_duration_mins: 0.0035
fare_amount: 0.0066
pickup_hour: 0.0021
pickup_day_of_week: 0.0005
fare_per_mile: 0.0021
fare_per_minute: 0.0018
pickup_borough_vec: 0.0007
dropoff_borough_vec: 0.0011
payment_type: 0.0004


                                                                                

## Model Comparison and Conclusion

## Model 4: Gradient Boosted Trees

In [29]:
from pyspark.ml.classification import GBTClassifier

# Create and train the Gradient Boosted Trees model
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="high_tip",
    maxIter=50,        # Number of iterations
    maxDepth=8,        # Tree depth
    stepSize=0.1,      # Learning rate
    minInstancesPerNode=10,
    maxBins=64,
    seed=42
)

gbt_model = gbt.fit(train_data)

# Make predictions
gbt_predictions = gbt_model.transform(test_data)

# Show sample predictions
print("Sample predictions from Gradient Boosted Trees:")
gbt_predictions.select("high_tip", "prediction").show(10)

# Evaluate the model
gbt_accuracy = multiclass_evaluator.evaluate(gbt_predictions)
gbt_auc = binary_evaluator.evaluate(gbt_predictions)

# Calculate F1 score
gbt_f1 = MulticlassClassificationEvaluator(
    labelCol="high_tip",
    predictionCol="prediction",
    metricName="f1"
).evaluate(gbt_predictions)

print(f"Gradient Boosted Trees - Area under ROC: {gbt_auc:.4f}")
print(f"Gradient Boosted Trees - Accuracy: {gbt_accuracy:.4f}")
print(f"Gradient Boosted Trees - F1 Score: {gbt_f1:.4f}")

# Display feature importances
print("Feature Importances:")
for feature, importance in zip(feature_cols, gbt_model.featureImportances.toArray()):
    print(f"{feature}: {importance:.4f}")

25/05/23 23:03:52 WARN DAGScheduler: Broadcasting large task binary with size 1006.0 KiB
25/05/23 23:03:53 WARN DAGScheduler: Broadcasting large task binary with size 1008.1 KiB
25/05/23 23:03:54 WARN DAGScheduler: Broadcasting large task binary with size 1008.5 KiB
25/05/23 23:03:54 WARN DAGScheduler: Broadcasting large task binary with size 1009.2 KiB
25/05/23 23:03:51 WARN DAGScheduler: Broadcasting large task binary with size 1010.2 KiB
25/05/23 23:03:51 WARN DAGScheduler: Broadcasting large task binary with size 1012.5 KiB
25/05/23 23:03:51 WARN DAGScheduler: Broadcasting large task binary with size 1016.8 KiB
25/05/23 23:03:52 WARN DAGScheduler: Broadcasting large task binary with size 1024.5 KiB
25/05/23 23:03:52 WARN DAGScheduler: Broadcasting large task binary with size 1038.4 KiB
25/05/23 23:03:52 WARN DAGScheduler: Broadcasting large task binary with size 1040.5 KiB
25/05/23 23:03:53 WARN DAGScheduler: Broadcasting large task binary with size 1041.0 KiB
25/05/23 23:03:53 WAR

Sample predictions from Gradient Boosted Trees:


25/05/23 23:04:47 WARN DAGScheduler: Broadcasting large task binary with size 1727.6 KiB
25/05/23 23:04:47 WARN DAGScheduler: Broadcasting large task binary with size 1747.0 KiB


+--------+----------+
|high_tip|prediction|
+--------+----------+
|       1|       1.0|
|       0|       1.0|
|       1|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       1|       1.0|
|       1|       1.0|
|       0|       0.0|
|       1|       1.0|
+--------+----------+
only showing top 10 rows



25/05/23 23:04:49 WARN DAGScheduler: Broadcasting large task binary with size 1735.8 KiB
25/05/23 23:04:50 WARN DAGScheduler: Broadcasting large task binary with size 1747.0 KiB
[Stage 1922:>                                                      (0 + 8) / 17]

Gradient Boosted Trees - Area under ROC: 0.8564
Gradient Boosted Trees - Accuracy: 0.8729
Gradient Boosted Trees - F1 Score: 0.8729
Feature Importances:
passenger_count: 0.0025
trip_distance: 0.0076
trip_duration_mins: 0.0071
fare_amount: 0.0382
pickup_hour: 0.0127
pickup_day_of_week: 0.0046
fare_per_mile: 0.0084
fare_per_minute: 0.0070
pickup_borough_vec: 0.0016
dropoff_borough_vec: 0.0027
payment_type: 0.0020


                                                                                

In [30]:
# Compare model performances
model_performance = {
    "Logistic Regression": {"AUC": lr_auc, "Accuracy": lr_accuracy, "F1": lr_f1},
    "Decision Tree": {"AUC": dt_auc, "Accuracy": dt_accuracy, "F1": dt_f1},
    "Random Forest": {"AUC": rf_auc, "Accuracy": rf_accuracy, "F1": rf_f1},
    "Gradient Boosted Trees": {"AUC": gbt_auc, "Accuracy": gbt_accuracy, "F1": gbt_f1}
}

print("Model Comparison:")
print("-" * 60)
print(f"{'Model':<25} {'AUC':<10} {'Accuracy':<10} {'F1 Score':<10}")
print("-" * 60)
for model, metrics in model_performance.items():
    print(f"{model:<25} {metrics['AUC']:<10.4f} {metrics['Accuracy']:<10.4f} {metrics['F1']:<10.4f}")

# Find the best model based on AUC
best_model_auc = max(model_performance.items(), key=lambda x: x[1]['AUC'])
print(f"\nBest model based on AUC: {best_model_auc[0]} with AUC = {best_model_auc[1]['AUC']:.4f}")

# Find the best model based on F1 score (better for imbalanced datasets)
best_model_f1 = max(model_performance.items(), key=lambda x: x[1]['F1'])
print(f"Best model based on F1 Score: {best_model_f1[0]} with F1 = {best_model_f1[1]['F1']:.4f}")

# Analyze important features from the best model
print("\nTop 5 Most Important Features:")
if best_model_auc[0] == "Random Forest" or best_model_f1[0] == "Random Forest":
    rf_importances = sorted(list(zip(feature_cols, rf_model.featureImportances.toArray())), key=lambda x: x[1], reverse=True)
    for feature, importance in rf_importances[:5]:
        print(f"{feature}: {importance:.4f}")
elif best_model_auc[0] == "Gradient Boosted Trees" or best_model_f1[0] == "Gradient Boosted Trees":
    gbt_importances = sorted(list(zip(feature_cols, gbt_model.featureImportances.toArray())), key=lambda x: x[1], reverse=True)
    for feature, importance in gbt_importances[:5]:
        print(f"{feature}: {importance:.4f}")

Model Comparison:
------------------------------------------------------------
Model                     AUC        Accuracy   F1 Score  
------------------------------------------------------------
Logistic Regression       0.8547     0.8761     0.8716    
Decision Tree             0.8556     0.8722     0.8722    
Random Forest             0.8556     0.8723     0.8723    
Gradient Boosted Trees    0.8564     0.8729     0.8729    

Best model based on AUC: Gradient Boosted Trees with AUC = 0.8564
Best model based on F1 Score: Gradient Boosted Trees with F1 = 0.8729

Top 5 Most Important Features:
fare_amount: 0.0382
pickup_hour: 0.0127
fare_per_mile: 0.0084
trip_distance: 0.0076
trip_duration_mins: 0.0071


## Optimizing Prediction Threshold

Prompt used: "Review the notebook and propose areas for improvement. Optimize one of the 3 models implemented to increase its accuracy even further"

In [31]:
# Let's optimize the threshold for models that support probability predictions
# GBT doesn't support probability predictions, so we'll use a different approach
best_model_name = best_model_auc[0]
best_predictions = None

if best_model_name == "Logistic Regression":
    best_predictions = lr_predictions
elif best_model_name == "Random Forest":
    best_predictions = rf_predictions
elif best_model_name == "Decision Tree":
    best_predictions = dt_predictions
elif best_model_name == "Gradient Boosted Trees":
    print(f"Note: {best_model_name} doesn't support probability predictions.")
    print("Using raw prediction scores for threshold optimization instead.")
    best_predictions = gbt_predictions

if best_predictions is not None:
    # Check if the model supports probability predictions
    has_probability = "probability" in best_predictions.columns
    
    if has_probability and best_model_name != "Gradient Boosted Trees":
        # For models with probability predictions (LR, RF, DT)
        def evaluate_threshold(threshold):
            from pyspark.sql.functions import when, col
            from pyspark.ml.linalg import VectorUDT
            from pyspark.sql.functions import udf
            from pyspark.sql.types import DoubleType
            
            # Extract probability of positive class using UDF
            def extract_prob(probability_vector):
                if probability_vector is not None:
                    return float(probability_vector[1])  # probability of class 1
                return 0.0
            
            extract_prob_udf = udf(extract_prob, DoubleType())
            
            # Create custom prediction using threshold
            threshold_predictions = best_predictions.withColumn(
                "prob_positive",
                extract_prob_udf(col("probability"))
            ).withColumn(
                "custom_prediction",
                when(col("prob_positive") > threshold, 1.0).otherwise(0.0)
            )
            
            # Calculate metrics
            accuracy = MulticlassClassificationEvaluator(
                labelCol="high_tip",
                predictionCol="custom_prediction",
                metricName="accuracy"
            ).evaluate(threshold_predictions)
            
            f1 = MulticlassClassificationEvaluator(
                labelCol="high_tip",
                predictionCol="custom_prediction",
                metricName="f1"
            ).evaluate(threshold_predictions)
            
            return threshold, accuracy, f1
            
    else:
        # For GBT - use raw prediction scores
        def evaluate_threshold(threshold):
            from pyspark.sql.functions import when, col
            
            # For GBT, we can't optimize threshold in the same way
            # since it doesn't provide probability scores
            # We'll return the original metrics
            accuracy = MulticlassClassificationEvaluator(
                labelCol="high_tip",
                predictionCol="prediction",
                metricName="accuracy"
            ).evaluate(best_predictions)
            
            f1 = MulticlassClassificationEvaluator(
                labelCol="high_tip",
                predictionCol="prediction",
                metricName="f1"
            ).evaluate(best_predictions)
            
            return threshold, accuracy, f1
    
    # Test different thresholds
    if best_model_name != "Gradient Boosted Trees":
        thresholds = [0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7]
        print(f"\nOptimizing threshold for {best_model_name}...")
        threshold_metrics = [evaluate_threshold(t) for t in thresholds]
        
        # Print threshold evaluation results
        print("\nThreshold Optimization Results:")
        print("-" * 50)
        print(f"{'Threshold':<10} {'Accuracy':<10} {'F1 Score':<10}")
        print("-" * 50)
        for threshold, accuracy, f1 in threshold_metrics:
            print(f"{threshold:<10.2f} {accuracy:<10.4f} {f1:<10.4f}")
        
        # Find best threshold
        best_threshold = max(threshold_metrics, key=lambda x: x[2])[0]
        print(f"\nBest threshold: {best_threshold} (optimized for F1 score)")
        
        # Apply the best threshold for final evaluation
        from pyspark.sql.functions import udf
        from pyspark.sql.types import DoubleType
        
        def extract_prob(probability_vector):
            if probability_vector is not None:
                return float(probability_vector[1])
            return 0.0
        
        extract_prob_udf = udf(extract_prob, DoubleType())
        
        optimized_predictions = best_predictions.withColumn(
            "prob_positive",
            extract_prob_udf(col("probability"))
        ).withColumn(
            "optimized_prediction",
            when(col("prob_positive") > best_threshold, 1.0).otherwise(0.0)
        )
        
        # Calculate final metrics with optimized threshold
        opt_accuracy = MulticlassClassificationEvaluator(
            labelCol="high_tip",
            predictionCol="optimized_prediction",
            metricName="accuracy"
        ).evaluate(optimized_predictions)
        
        opt_f1 = MulticlassClassificationEvaluator(
            labelCol="high_tip",
            predictionCol="optimized_prediction",
            metricName="f1"
        ).evaluate(optimized_predictions)
        
        print(f"\nFinal optimized metrics for {best_model_name}:")
        print(f"Accuracy: {opt_accuracy:.4f}")
        print(f"F1 Score: {opt_f1:.4f}")
        
    else:
        # For GBT, show that threshold optimization isn't applicable
        print(f"\nThreshold optimization not applicable for {best_model_name}")
        print("GBT uses internal threshold optimization during training.")
        print(f"Current metrics for {best_model_name}:")
        print(f"Accuracy: {gbt_accuracy:.4f}")
        print(f"F1 Score: {gbt_f1:.4f}")

Note: Gradient Boosted Trees doesn't support probability predictions.
Using raw prediction scores for threshold optimization instead.

Threshold optimization not applicable for Gradient Boosted Trees
GBT uses internal threshold optimization during training.
Current metrics for Gradient Boosted Trees:
Accuracy: 0.8729
F1 Score: 0.8729
