In [5]:
# -----------------------------------------------------------
# NYC Yellow-Taxi duration model — 50 % sample, 4 GB driver
# -----------------------------------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, hour, dayofweek
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [6]:
# Spark session with larger heap & fewer shuffle files
spark = (SparkSession.builder
         .appName("NYC Duration RF 50pct")
         .master("local[*]")
         .config("spark.driver.memory", "4g")          #  ➜ 4 GB heap
         .config("spark.sql.shuffle.partitions", "48") #  ➜ fewer temp files
         .getOrCreate())

In [7]:
# Load parquet slice
df = spark.read.parquet("data/nyc_yellow_2023-01.parquet")

row_cnt = df.count()
col_cnt = len(df.columns)
print(f"df.shape  → ({row_cnt:,}, {col_cnt})")       

print("\n--- df.info()  (via printSchema) ---")
df.printSchema()

print("\n--- first 5 rows (via show) ---")
df.show(5, truncate=False)

df.shape  → (4,591,845, 20)

--- df.info()  (via printSchema) ---
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)


--- firs

In [8]:
# Feature-engineering

taxi = (df
        .withColumn("duration_min",
                    (unix_timestamp("tpep_dropoff_datetime") -
                     unix_timestamp("tpep_pickup_datetime")) / 60)
        .filter((col("duration_min") > 0) & (col("duration_min") < 180))
        .withColumn("hour",  hour("tpep_pickup_datetime"))
        .withColumn("wday",  dayofweek("tpep_pickup_datetime"))
        .select("passenger_count", "trip_distance",
                "hour", "wday", "PULocationID", "DOLocationID",
                "duration_min"))

In [9]:
# Drop remaining nulls + 50 % sample
taxi_clean  = taxi.na.drop()
taxi_sample = taxi_clean.sample(fraction=0.5, seed=42)

print(f"\nRows after cleaning = {taxi_clean.count():,}")
print(f"Rows in 50 % sample  = {taxi_sample.count():,}")

                                                                                


Rows after cleaning = 3,330,149
Rows in 50 % sample  = 1,666,724


                                                                                

In [10]:
# ML pipeline
assembler = VectorAssembler(
    inputCols=["passenger_count","trip_distance","hour",
               "wday","PULocationID","DOLocationID"],
    outputCol="features",
    handleInvalid="skip")

rf = RandomForestRegressor(
        labelCol="duration_min",
        featuresCol="features",
        numTrees=50,           # lighter memory than 100
        maxDepth=12,
        seed=42)

model  = Pipeline(stages=[assembler, rf]).fit(taxi_sample)
preds  = model.transform(taxi_sample)

25/06/25 23:48:20 WARN DAGScheduler: Broadcasting large task binary with size 1022.8 KiB
25/06/25 23:48:35 WARN DAGScheduler: Broadcasting large task binary with size 1978.8 KiB
25/06/25 23:48:52 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
25/06/25 23:49:11 WARN DAGScheduler: Broadcasting large task binary with size 1205.5 KiB
25/06/25 23:49:13 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB
25/06/25 23:49:43 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/06/25 23:49:45 WARN DAGScheduler: Broadcasting large task binary with size 13.7 MiB
25/06/25 23:50:25 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
25/06/25 23:50:29 WARN DAGScheduler: Broadcasting large task binary with size 25.6 MiB
25/06/25 23:51:22 WARN DAGScheduler: Broadcasting large task binary with size 7.7 MiB


In [11]:
# Metrics

ev = RegressionEvaluator(labelCol="duration_min", predictionCol="prediction")
print("\n--- Model metrics on 50 % sample ---")
print("RMSE:", round(ev.setMetricName("rmse").evaluate(preds), 3))
print("MAE :", round(ev.setMetricName("mae").evaluate(preds), 3))
print("R²  :", round(ev.setMetricName("r2").evaluate(preds), 3))


--- Model metrics on 50 % sample ---


                                                                                

RMSE: 6.715


                                                                                

MAE : 4.116




R²  : 0.807


                                                                                

### Results (50 % sample, 50-tree RF)

* **RMSE**: 6.72 min  
* **MAE** : 4.12 min  
* **R²**  : 0.81

Reducing the dataset to 50 % and the forest size to 50 trees lowers
memory footprint from >4 GB to <3 GB, at the cost of ~3 min higher RMSE.
The model still explains 81 % of variance, sufficient for a coursework
demonstration of Spark-based trip-duration prediction.
