In [0]:
import os
import pickle 
import numpy as np
import pyspark.sql.functions as F
import pyspark.pandas as ps

from pyspark.sql import DataFrame
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.ensemble import RandomForestRegressor

Ingest Data

In [0]:
# Load the NYC taxi trips sample table through the notebook-provided `spark` session.
source_table = "samples.nyctaxi.trips"
df = spark.table(source_table)

In [0]:
df.printSchema()

# Work on a small development sample of the table.
# A bare `limit()` has no ordering guarantee, so Spark is free to return
# different rows on different runs, which would make the metrics computed later
# in the notebook non-reproducible even though the model itself is seeded.
# Ordering by a hash of each row's contents pins the sample to the same rows on
# every run without simply taking the earliest trips.
# NOTE(review): assumes every column is a type xxhash64 accepts (no map-typed
# columns) -- confirm against the schema printed above.
SAMPLE_SIZE = 200
df = df.orderBy(F.xxhash64(*df.columns)).limit(SAMPLE_SIZE)

Preprocess Data

In [0]:
# Keep only trips with a positive fare and a positive distance, then discard
# rows where either the pickup or the dropoff timestamp is missing.
timestamp_columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
df_clean = df.filter("fare_amount > 0").filter("trip_distance > 0")
df_clean = df_clean.dropna(subset=timestamp_columns)


In [0]:
# Replace the pickup column with its Spark-timestamp form (column name unchanged).
pickup_ts = F.to_timestamp("tpep_pickup_datetime")
df_clean = df_clean.withColumn("tpep_pickup_datetime", pickup_ts)

In [0]:
# Expose the cleaned Spark DataFrame through the pandas-on-Spark API so the
# feature-engineering cells can use pandas-style column assignment and `.dt`.
psdf = df_clean.pandas_api()

Feature Engineering

In [0]:
# Derive calendar features from the pickup time.
# The preprocessing step already casts tpep_pickup_datetime with F.to_timestamp,
# so the column arrives here as a datetime and the `.dt` accessor can be used
# directly; the extra ps.to_datetime() pass that used to precede these lines
# re-converted an already-converted column and has been dropped.
psdf["hour"] = psdf["tpep_pickup_datetime"].dt.hour
psdf["day"] = psdf["tpep_pickup_datetime"].dt.dayofweek  # 0 = Monday ... 6 = Sunday
psdf["month"] = psdf["tpep_pickup_datetime"].dt.month


In [0]:
# Target column plus the candidate predictors; any row with a missing value in
# one of these columns is discarded.
model_columns = ["fare_amount", "trip_distance", "hour", "day", "month"]
model_psdf = psdf[model_columns].dropna()


In [0]:
# Materialise the modelling frame as a plain in-memory pandas DataFrame;
# the scikit-learn calls later in the notebook operate on this object.
pdf = model_psdf.to_pandas() # For sklearn

Feature Selection

In [0]:
# Rank the numeric columns by the absolute value of their correlation with the
# fare, strongest first.
corr_matrix = pdf.corr(numeric_only=True)
corr = corr_matrix["fare_amount"].abs().sort_values(ascending=False)


In [0]:
# Display the ranking. It is taken from the "fare_amount" column of the
# correlation matrix, so fare_amount's correlation with itself is listed too.
corr

In [0]:
# Predictors passed to the model. The list is hand-written rather than derived
# from `corr`; "month" is computed earlier but not included here.
selected_features = [
    "trip_distance",
    "hour",
    "day",
]


Modelling

In [0]:
# Feature matrix and regression target.
X, y = pdf[selected_features], pdf["fare_amount"]

# Hold out 20% of the rows for evaluation; the fixed seed keeps the split stable.
split_options = {"test_size": 0.2, "random_state": 42}
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)


In [0]:
# Random-forest hyper-parameters, gathered in one place; the seed fixes the
# forest's own randomness.
rf_params = {"n_estimators": 200, "max_depth": 14, "random_state": 42}
model = RandomForestRegressor(**rf_params)

# Left as the final expression so the fitted estimator is shown as the cell output.
model.fit(X_train, y_train)


Evaluation

In [0]:
# Score the held-out rows.
preds = model.predict(X_test)

mae = mean_absolute_error(y_test, preds)
mse = mean_squared_error(y_test, preds)
rmse = np.sqrt(mse)  # root of the mean squared error

# Report both metrics, one per line.
for label, value in (("MAE:", mae), ("RMSE:", rmse)):
    print(label, value)


Saving models and features

In [0]:

model_filename = "model_1.pkl"
features_filename = "features_1.pkl"

# Pickle the fitted model and the feature list, each to its own file; the
# relative names resolve against the current working directory.
for target_path, artifact in (
    (model_filename, model),
    (features_filename, selected_features),
):
    with open(target_path, "wb") as f:
        pickle.dump(artifact, f)

# Confirmation messages are printed only after both files have been written.
print("Model Saved")
print("Features Saved")