# Phase 5: Pipeline Orchestration

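This notebook builds and trains an end-to-end scikit-learn pipeline (preprocessing + Random Forest regressor) for vehicle price prediction. It also provides a reusable batch prediction function, which it uses to score the test set and write the predictions to a Delta table.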

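Note: This notebook uses scikit-learn instead of PySpark ML for compatibility with Databricks Community Edition.
The trained model is kept in memory (not saved to disk) because DBFS writes are restricted on the free tier. As a result, it only exists for the lifetime of the notebook session.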

## Import Libraries

In [0]:
from datetime import datetime

import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

## Configuration

In [0]:
# Source Delta tables (train / test splits)
TRAIN_TABLE_NAME = "gold_vehicles_train"
TEST_TABLE_NAME = "gold_vehicles_test"

# Model inputs. Categorical columns are one-hot encoded and numerical columns
# are scaled inside the sklearn pipeline built below.
CATEGORICAL_COLS = [
    "Make",
    "BodyType",
    "FuelType",
    "Transmission",
    "Drivetrain",
]
NUMERICAL_COLS = [
    "Kilometres", "City", "Highway", "vehicle_age",
    "avg_fuel_efficiency", "engine_displacement",
    "cylinder_count", "model_frequency_log",
]

# Regression target
TARGET_COL = "Price"

## Load Training Data

In [0]:
# Load the training split from its Delta table, keeping only the feature and
# target columns so the driver-side pandas copy stays small.
feature_cols = CATEGORICAL_COLS + NUMERICAL_COLS + [TARGET_COL]
train_spark_df = spark.table(TRAIN_TABLE_NAME)
train_df = train_spark_df.select(feature_cols).toPandas()

# Count rows on the collected pandas frame: calling train_spark_df.count() as
# well would launch a second full Spark job over the same table.
print(f"Training data loaded: {len(train_df)} rows")

# Separate features and target
X_train = train_df[CATEGORICAL_COLS + NUMERICAL_COLS]
y_train = train_df[TARGET_COL]

print(f"Training features shape: {X_train.shape}")

Training data loaded: 15634 rows
Training features shape: (15634, 13)


## Build Complete Pipeline

In [0]:
# Preprocessing: one-hot encode categoricals (categories unseen at fit time are
# ignored rather than raising) and standardize the numerical features.
categorical_step = (
    "cat",
    OneHotEncoder(handle_unknown="ignore", sparse_output=False),
    CATEGORICAL_COLS,
)
numerical_step = ("num", StandardScaler(), NUMERICAL_COLS)
preprocessor = ColumnTransformer(transformers=[categorical_step, numerical_step])

# Random Forest (best performing model); the fixed random_state makes fits repeatable.
rf_regressor = RandomForestRegressor(
    n_estimators=100,
    max_depth=10,
    min_samples_leaf=5,
    random_state=42,
    n_jobs=-1,
)
pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("regressor", rf_regressor)])

stage_descriptions = [
    "ColumnTransformer (OneHotEncoder + StandardScaler)",
    "RandomForestRegressor",
]
print("Pipeline created with stages:")
for stage_number, description in enumerate(stage_descriptions, start=1):
    print(f"  {stage_number}. {description}")

Pipeline created with stages:
  1. ColumnTransformer (OneHotEncoder + StandardScaler)
  2. RandomForestRegressor


## Train Pipeline

In [0]:
# Fit the full preprocessing + model pipeline in place and time it.
# `training_time` is reused later by the metadata cell.
print("Training complete pipeline...")
start_time = datetime.now()

pipeline.fit(X_train, y_train)

training_time = (datetime.now() - start_time).total_seconds()
print(f"Pipeline training complete in {training_time:.2f} seconds")

# Note: Model is kept in memory. DBFS write is not available on Community Edition.
# The fitted model lives in the `pipeline` variable itself (no separate
# `trained_pipeline` object exists), so the message names that variable.
print("Model stored in memory (pipeline variable)")

Training complete pipeline...
Pipeline training complete in 5.73 seconds
Model stored in memory (trained_pipeline variable)


## Smoke-Test Pipeline on Sample Training Rows

In [0]:
# Smoke test on the first five *training* rows. These rows were seen during
# fitting, so this only shows the pipeline runs end to end, not how it generalizes.
test_sample = X_train.head(5)
sample_predictions = pipeline.predict(test_sample)

sample_results = test_sample[["Make"]].assign(
    Actual_Price=y_train.head(5).to_numpy(),
    Predicted_Price=sample_predictions,
)

print("Sample predictions:")
print(sample_results.to_string(index=False))

Sample predictions:
 Make  Actual_Price  Predicted_Price
Acura       43880.0     38940.895730
Acura       36486.0     35920.127376
Acura       44599.0     39027.551537
Acura       46989.0     38487.158502
Acura       60899.0     62339.299502


## Batch Prediction Function

In [0]:

def batch_predict(input_table_name, output_table_name, model=None):
    """
    Score every row of a Delta table with a trained pipeline and save the results.

    Parameters:
    - input_table_name: Name of the input Delta table. It must contain every
      column in CATEGORICAL_COLS + NUMERICAL_COLS.
    - output_table_name: Name of the output Delta table for predictions
      (written with mode "overwrite").
    - model: Trained sklearn pipeline (uses global 'pipeline' if not provided)

    Returns:
    - pandas DataFrame holding exactly what was written to the output table:
      whichever of record_id, Make, Model, Year, Kilometres, Price exist in the
      input, followed by prediction, prediction_timestamp and model_version.

    Raises:
    - KeyError: if the input table lacks any of the model's feature columns.
    """
    # Use provided model or global pipeline
    if model is None:
        model = pipeline

    # Features MUST be passed in exactly the same order as during training.
    feature_cols = CATEGORICAL_COLS + NUMERICAL_COLS
    output_cols = [
        "record_id",
        "Make",
        "Model",
        "Year",
        "Kilometres",
        "Price",
        "prediction",
        "prediction_timestamp",
        "model_version",
    ]

    print(f"Loading input data from {input_table_name}...")
    input_spark_df = spark.table(input_table_name)
    available_cols = set(input_spark_df.columns)

    # Fail fast, before collecting any data to the driver, with a message that
    # names the missing columns.
    missing_cols = [c for c in feature_cols if c not in available_cols]
    if missing_cols:
        raise KeyError(
            f"Input table '{input_table_name}' is missing feature columns: {missing_cols}"
        )

    # Collect only the columns needed for scoring or copied to the output.
    # dict.fromkeys de-duplicates while keeping order (Make and Kilometres are
    # both features and output columns), so Spark does not emit duplicate columns.
    needed_cols = list(
        dict.fromkeys(feature_cols + [c for c in output_cols if c in available_cols])
    )
    input_df = input_spark_df.select(needed_cols).toPandas()
    # Counting the collected frame avoids a second Spark job just for the count.
    print(f"Input rows: {len(input_df)}")

    X = input_df[feature_cols].copy()

    print("Generating predictions...")
    predictions = model.predict(X)

    # Create output dataframe; one timestamp is shared by the whole batch.
    output_df = input_df.copy()
    output_df["prediction"] = predictions
    output_df["prediction_timestamp"] = datetime.now()
    output_df["model_version"] = "v1.0_sklearn"

    # Keep only the output columns that exist (e.g. unseen data may lack Price).
    available_output_cols = [c for c in output_cols if c in output_df.columns]
    output_df = output_df[available_output_cols]

    # Convert back to Spark and save to Delta
    output_spark_df = spark.createDataFrame(output_df)
    output_spark_df.write.format("delta").mode("overwrite").saveAsTable(
        output_table_name
    )

    print(f"Predictions saved to '{output_table_name}' table")
    print(f"Total predictions: {len(output_df)}")

    return output_df


## Run Batch Prediction on Test Set

In [0]:
# Score the held-out test split and persist the results to a Delta table.
PREDICTIONS_TABLE_NAME = "vehicle_price_predictions"

print("Running batch prediction on test set...")
predictions_result = batch_predict(
    input_table_name=TEST_TABLE_NAME,
    output_table_name=PREDICTIONS_TABLE_NAME,
    model=pipeline,
)

preview_rows = predictions_result.head(10)
print("\nSample predictions:")
print(preview_rows.to_string(index=False))

Running batch prediction on test set...
Loading input data from gold_vehicles_test...
Input rows: 3827
Generating predictions...
Predictions saved to 'vehicle_price_predictions' table
Total predictions: 3827

Sample predictions:
 record_id  Make Model  Year  Kilometres   Price   prediction       prediction_timestamp model_version
         2 Acura   RDX  2019     33032.0 40888.0 55431.066417 2025-12-05 01:57:49.407402  v1.0_sklearn
         6 Acura   RDX  2020     27800.0 49099.0 59081.510702 2025-12-05 01:57:49.407402  v1.0_sklearn
         8 Acura   RDX  2020     60892.0 38495.0 38593.491441 2025-12-05 01:57:49.407402  v1.0_sklearn
        13 Acura   MDX  2020     65684.0 40995.0 38925.868947 2025-12-05 01:57:49.407402  v1.0_sklearn
        19 Acura   ILX  2017     95000.0 21950.0 22158.743041 2025-12-05 01:57:49.407402  v1.0_sklearn
        23 Acura   TLX  2019     43150.0 33495.0 30181.809181 2025-12-05 01:57:49.407402  v1.0_sklearn
        29 Acura   RDX  2018     98175.0 29800.0 3

## Prediction Statistics

In [0]:
# Metrics are computed from the persisted predictions table, i.e. from exactly
# the rows batch_predict wrote.
predictions_df = spark.table(PREDICTIONS_TABLE_NAME).toPandas()
actual, predicted = predictions_df["Price"], predictions_df["prediction"]

mae = mean_absolute_error(actual, predicted)
rmse = np.sqrt(mean_squared_error(actual, predicted))
r2 = r2_score(actual, predicted)

divider = "=" * 80
report_lines = [
    divider,
    "PREDICTION STATISTICS",
    divider,
    f"Mean Absolute Error: ${mae:,.2f}",
    f"Root Mean Squared Error: ${rmse:,.2f}",
    f"R2 Score: {r2:.4f}",
    f"\nActual Price Range: ${actual.min():,.2f} - ${actual.max():,.2f}",
    f"Predicted Price Range: ${predicted.min():,.2f} - ${predicted.max():,.2f}",
    f"Average Actual Price: ${actual.mean():,.2f}",
    f"Average Predicted Price: ${predicted.mean():,.2f}",
    divider,
]
for report_line in report_lines:
    print(report_line)

PREDICTION STATISTICS
Mean Absolute Error: $8,807.53
Root Mean Squared Error: $20,905.91
R2 Score: 0.8521

Actual Price Range: $1,500.00 - $829,987.00
Predicted Price Range: $5,347.37 - $668,029.58
Average Actual Price: $46,857.95
Average Predicted Price: $46,627.60


## Pipeline Metadata

In [0]:
# Pipeline metadata. The model type and hyperparameters are read back from the
# fitted estimator instead of being retyped, so the recorded values cannot drift
# from what was actually trained if the pipeline definition changes.
fitted_regressor = pipeline.named_steps["regressor"]
fitted_params = fitted_regressor.get_params()

pipeline_metadata = {
    "model_storage": "In-memory (Databricks Community Edition limitation)",
    "created_timestamp": datetime.now().isoformat(),
    "categorical_features": CATEGORICAL_COLS,
    "numerical_features": NUMERICAL_COLS,
    "target_variable": TARGET_COL,
    "model_type": f"{type(fitted_regressor).__name__} (sklearn)",
    "model_params": {
        param: fitted_params[param]
        for param in ("n_estimators", "max_depth", "min_samples_leaf")
    },
    "training_time_seconds": training_time,
}

print("Pipeline Metadata:")
for key, value in pipeline_metadata.items():
    print(f"  {key}: {value}")

Pipeline Metadata:
  model_storage: In-memory (Databricks Community Edition limitation)
  created_timestamp: 2025-12-05T01:57:53.051646
  categorical_features: ['Make', 'BodyType', 'FuelType', 'Transmission', 'Drivetrain']
  numerical_features: ['Kilometres', 'City', 'Highway', 'vehicle_age', 'avg_fuel_efficiency', 'engine_displacement', 'cylinder_count', 'model_frequency_log']
  target_variable: Price
  model_type: RandomForestRegressor (sklearn)
  model_params: {'n_estimators': 100, 'max_depth': 10, 'min_samples_leaf': 5}
  training_time_seconds: 5.730247


## Summary

In [0]:
# Final recap of what this notebook produced and the headline test-set metrics.
banner = "=" * 80
summary_lines = [
    "Pipeline model: Stored in memory (variable: pipeline)",
    f"Test predictions table: {PREDICTIONS_TABLE_NAME}",
    "Batch prediction function: batch_predict()",
    "Model type: sklearn RandomForestRegressor",
    f"Test set RMSE: ${rmse:,.2f}",
    f"Test set R2: {r2:.4f}",
    f"Completion timestamp: {datetime.now()}",
]

print(banner)
print("PIPELINE ORCHESTRATION COMPLETE")
print(banner)
print("\n".join(summary_lines))
print(banner)

PIPELINE ORCHESTRATION COMPLETE
Pipeline model: Stored in memory (variable: pipeline)
Test predictions table: vehicle_price_predictions
Batch prediction function: batch_predict()
Model type: sklearn RandomForestRegressor
Test set RMSE: $20,905.91
Test set R2: 0.8521
Completion timestamp: 2025-12-05 01:57:53.298417
