In [0]:
import os
import tempfile

import matplotlib.pyplot as plt
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split

# Make dbx_demo the current database so the bare table name below resolves in it
spark.sql("USE dbx_demo")

# Trip duration in minutes: both timestamps cast to long, differenced, divided by 60
pickup_ts = F.col("tpep_pickup_datetime").cast("long")
dropoff_ts = F.col("tpep_dropoff_datetime").cast("long")
duration_expr = (dropoff_ts - pickup_ts) / 60.0

# Row filter: positive fare and distance, duration strictly between 0 and 180 minutes
is_clean_trip = (
    (F.col("fare_amount") > 0)
    & (F.col("trip_distance") > 0)
    & (F.col("duration_min") > 0)
    & (F.col("duration_min") < 180)
)

# Add the derived column, keep only the modelling columns, filter, and cap at 10,000 rows
trips = spark.table("nyc_taxi").withColumn("duration_min", duration_expr)
model_columns = trips.select("trip_distance", "duration_min", "fare_amount")
df = model_columns.where(is_clean_trip).limit(10000)

# Convert the Spark sample to a pandas DataFrame for scikit-learn
pdf = df.toPandas()

# Two predictors (distance, duration) and the fare as the regression target
feature_cols = ["trip_distance", "duration_min"]
X = pdf[feature_cols]
y = pdf["fare_amount"]

# Hold out 20% of the rows for evaluation; fixed seed keeps the split repeatable
split_parts = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_test, y_train, y_test = split_parts

# Look up the current user and point MLflow at an experiment under that user's folder
current_user_rows = spark.sql("SELECT current_user()").collect()
user = current_user_rows[0][0]
experiment_path = f"/Users/{user}/dbx_demo_nyc_taxi"
mlflow.set_experiment(experiment_path)



# Fit a linear fare model, evaluate it on the held-out rows, and record
# parameters, metrics, the model, and two side artifacts in one MLflow run.
#
# The side artifacts are written to a temporary directory that is removed when
# the block exits; each file is logged to the run before that happens.
with mlflow.start_run(), tempfile.TemporaryDirectory() as artifact_dir:
    # These paths were previously never defined, so the run failed with a
    # NameError right after the model had been logged.
    coeffs_path = os.path.join(artifact_dir, "coefficients.csv")
    plot_path = os.path.join(artifact_dir, "pred_vs_actual.png")

    # Model
    model = LinearRegression()
    model.fit(X_train, y_train)

    # Eval on the held-out split
    y_pred = model.predict(X_test)
    r2 = r2_score(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)

    # Log params/metrics (cast to built-in types before handing them to MLflow)
    mlflow.log_param("features", list(X.columns))
    mlflow.log_param("n_rows", int(len(pdf)))
    mlflow.log_metric("r2", float(r2))
    mlflow.log_metric("rmse", float(rmse))
    mlflow.log_metric("mae", float(mae))

    # Log model; the single-row input example is taken from the training data
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        input_example=X_train.iloc[[0]],
    )

    # Coefficients artifact: one row per feature with its fitted coefficient
    coeffs = pd.DataFrame({"feature": X.columns, "coef": model.coef_})
    coeffs.to_csv(coeffs_path, index=False)
    mlflow.log_artifact(coeffs_path)

    # Plot: predicted vs actual fare on the held-out split
    plt.figure()
    plt.scatter(y_test, y_pred, alpha=0.3)
    plt.xlabel("Actual fare_amount")
    plt.ylabel("Predicted fare_amount")
    plt.title("Predicted vs Actual")
    plt.savefig(plot_path, bbox_inches="tight")
    mlflow.log_artifact(plot_path)

    print(f"R^2:  {r2:.3f}")
    print(f"RMSE: {rmse:.3f}")
    print(f"MAE:  {mae:.3f}")