# Databricks Production Pipeline: Real-Time Electricity Price Forecasting
This notebook demonstrates a production-ready architecture for real-time forecasting using Databricks. It covers ETL with Structured Streaming, feature engineering, model training with MLflow, batch inference, and Delta Table storage. The workflow is designed for deployment as a Databricks Job.

## 1. Configuration & Imports
Set up required libraries, paths, and Databricks configs. Ensure MLflow autologging is enabled for XGBoost.

In [None]:
# Databricks notebook source
import mlflow
import mlflow.xgboost
import xgboost as xgb
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime
import pandas as pd

# Set paths (Spark APIs address DBFS as dbfs:/...; /dbfs/... is the local file mount)
raw_data_path = "dbfs:/FileStore/energy_price/raw/352_2024-06-06T0000_2025-06-06T0000.csv"
delta_feature_table = "dbfs:/FileStore/energy_price/delta/features"
delta_prediction_table = "dbfs:/FileStore/energy_price/delta/predictions"
mlflow.set_experiment("/Users/your_user/energy_price_xgb")
mlflow.xgboost.autolog()
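
On Databricks, Spark readers and writers address DBFS with the `dbfs:/` scheme, while `/dbfs/` is the local mount used by plain Python file APIs such as `open()` or pandas. The sketch below converts between the two forms. The helper names `to_spark_path` and `to_local_path` are hypothetical and not part of any Databricks library.

```python
def to_spark_path(path: str) -> str:
    """Convert a local '/dbfs/...' mount path to the 'dbfs:/...' URI form."""
    prefix = "/dbfs/"
    if path.startswith(prefix):
        return "dbfs:/" + path[len(prefix):]
    return path  # already a URI or not a DBFS mount path; leave untouched


def to_local_path(path: str) -> str:
    """Convert a 'dbfs:/...' URI to the local '/dbfs/...' mount path."""
    prefix = "dbfs:/"
    if path.startswith(prefix):
        return "/dbfs/" + path[len(prefix):]
    return path  # not a DBFS URI; leave untouched


spark_path = to_spark_path("/dbfs/FileStore/energy_price/delta/features")
local_path = to_local_path(spark_path)
print(spark_path)
print(local_path)
```

Keeping one canonical form in the configuration cell and converting at the call site avoids reading from one location and writing to another.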

## 2. ETL & Feature Engineering with Structured Streaming
Use Databricks Auto Loader to ingest CSV as a stream, engineer features, and write to a Delta Table. This simulates real-time ingestion for production.

In [None]:
# Define schema for input data
schema = StructType([
    StructField("start_time", StringType(), True),
    StructField("end_time", StringType(), True),
    StructField("price_eur_mwh", DoubleType(), True)
])

# Read CSV as a stream (Auto Loader)
df_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", True)
    .schema(schema)
    .load(raw_data_path)
)

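# Row-wise transforms such as timestamp parsing are safe on a stream
parsed_stream = df_stream.withColumn("start_time", to_timestamp(col("start_time")))

# Lag and rolling windows ordered by start_time are not time-based windows, so
# Structured Streaming rejects them on a streaming DataFrame. Compute them per
# micro-batch instead. Caveat: lags and rolling means do not span micro-batch
# boundaries, so this is only correct while one batch holds the full history.
def write_features(batch_df, batch_id):
    w = Window.orderBy("start_time")
    feature_df = (
        batch_df
        # Offsets are counted in rows, so the _1d/_7d names assume one row per day
        .withColumn("lag_1d", lag("price_eur_mwh", 1).over(w))
        .withColumn("lag_7d", lag("price_eur_mwh", 7).over(w))
        .withColumn("roll_mean_7d", avg("price_eur_mwh").over(w.rowsBetween(-6, 0)))
        .withColumn("roll_mean_30d", avg("price_eur_mwh").over(w.rowsBetween(-29, 0)))
        .withColumn("day_of_week", dayofweek("start_time"))
        .withColumn("month", month("start_time"))
        # Spark's dayofweek() returns 1 for Sunday through 7 for Saturday
        .withColumn("is_weekend", dayofweek("start_time").isin(1, 7).cast("int"))
        .dropna()
    )
    # Write features to Delta Table
    feature_df.write.format("delta").mode("append").save(delta_feature_table)

feature_query = (
    parsed_stream.writeStream
    .foreachBatch(write_features)
    .option("checkpointLocation", "dbfs:/FileStore/energy_price/checkpoints/features")
    .trigger(availableNow=True)  # process all available files, then stop
    .start()
)
# Block until the available data is processed so later cells see complete features
feature_query.awaitTermination()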
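
To make the feature definitions concrete, here is a minimal pure-Python sketch of the same row-based lags, trailing means, and weekend flag, with no Spark dependency. It assumes one row per time step and mirrors Spark's `dayofweek()` numbering (1 for Sunday through 7 for Saturday). The function names and the generic `lag_<k>` / `roll_mean_<w>` keys are illustrative, not taken from the pipeline.

```python
from datetime import datetime, timedelta


def spark_dayofweek(ts):
    """Mimic Spark's dayofweek(): 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    return ts.isoweekday() % 7 + 1


def build_features(rows, lag_steps=(1, 7), windows=(7, 30)):
    """rows: iterable of (timestamp, price). Returns one dict per usable row."""
    rows = sorted(rows)
    prices = [price for _, price in rows]
    features = []
    for i, (ts, price) in enumerate(rows):
        if i < max(lag_steps):
            continue  # lag values are undefined here; mirrors dropna()
        feat = {"start_time": ts, "price_eur_mwh": price}
        for k in lag_steps:
            feat[f"lag_{k}"] = prices[i - k]
        for w in windows:
            # Same span as rowsBetween(-(w - 1), 0): current row plus w - 1 before it
            window = prices[max(0, i - w + 1): i + 1]
            feat[f"roll_mean_{w}"] = sum(window) / len(window)
        dow = spark_dayofweek(ts)
        feat["day_of_week"] = dow
        feat["is_weekend"] = int(dow in (1, 7))
        features.append(feat)
    return features


# Ten daily rows starting on Saturday 2024-06-01, with price equal to the row index
rows = [(datetime(2024, 6, 1) + timedelta(days=i), float(i)) for i in range(10)]
feats = build_features(rows)
print(len(feats), feats[0]["lag_7"], feats[0]["roll_mean_7"], feats[0]["is_weekend"])
```

Note that the trailing means use whatever history is available, so early rows average fewer than the nominal number of values, just as the Spark window does.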

## 3. Model Training & Registration (XGBoost + MLflow)
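Train XGBoost on the engineered features and log the model to MLflow. The cell below logs the model to the run without adding it to the Model Registry; passing `registered_model_name` to `mlflow.xgboost.log_model` would register it as well. In production, training can be triggered on a schedule or by new data arrival.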

In [None]:
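# Load features from Delta Table for training, sorted by time so the
# unshuffled split below holds out the most recent rows
features_pdf = (
    spark.read.format("delta").load(delta_feature_table)
    .orderBy("start_time")
    .toPandas()
)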

# Prepare features/target
feature_cols = [c for c in features_pdf.columns if c not in ["start_time", "end_time", "price_eur_mwh"]]
X = features_pdf[feature_cols]
y = features_pdf["price_eur_mwh"]

# Train/test split
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, shuffle=False)

# Train and log model
with mlflow.start_run(run_name="xgb_train"):
    model = xgb.XGBRegressor()
    model.fit(X_train, y_train)
    mlflow.xgboost.log_model(model, "model")
    mlflow.log_metric("train_rows", len(X_train))
    mlflow.log_metric("test_rows", len(X_test))
    run_id = mlflow.active_run().info.run_id
    model_uri = f"runs:/{run_id}/model"
    print(f"Model logged to: {model_uri}")
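
The run above logs only row counts, which says nothing about forecast quality. A held-out error metric makes runs comparable. The following is a minimal sketch of RMSE and MAE in plain Python; the function names and the sample values are illustrative only.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error between two equal-length sequences."""
    errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(errors) / len(errors))


def mae(y_true, y_pred):
    """Mean absolute error between two equal-length sequences."""
    errors = [abs(t - p) for t, p in zip(y_true, y_pred)]
    return sum(errors) / len(errors)


# Illustrative values only, not results from the pipeline
actual = [1.0, 2.0, 3.0]
predicted = [1.0, 2.0, 5.0]
print(rmse(actual, predicted), mae(actual, predicted))
```

Inside the training run, the results could be recorded with `mlflow.log_metric`, for example under hypothetical metric names such as `test_rmse` and `test_mae`, using `y_test` and `model.predict(X_test)` as inputs.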

## 4. Batch Inference Pipeline
Load new feature data, run batch inference with the latest XGBoost model, and write predictions to a Delta Table. This can be triggered by new data or on a schedule.

In [None]:
# Load latest model from MLflow
latest_model = mlflow.xgboost.load_model(model_uri)

# Load new features for prediction (simulate with test set)
X_pred = X_test.copy()
# X_test.index holds row labels, so select with .loc rather than positional .iloc
pred_df = features_pdf.loc[X_test.index, ["start_time"]].copy()
pred_df["prediction"] = latest_model.predict(X_pred)

# Convert to Spark DataFrame and write to Delta Table
pred_sdf = spark.createDataFrame(pred_df)
pred_sdf.write.format("delta").mode("append").save(delta_prediction_table)
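
Because the write uses append mode, rerunning inference for the same timestamps adds duplicate rows to the predictions table. One common remedy is to treat `start_time` as a key and upsert, which on Delta corresponds to a `MERGE` operation. The sketch below illustrates only the upsert semantics with a plain dictionary; it is not Delta code, and `upsert_predictions` is a hypothetical helper.

```python
def upsert_predictions(existing, new_rows):
    """Return a copy of `existing` where each (start_time, prediction) pair in
    `new_rows` inserts a new key or overwrites the value for an existing key."""
    merged = dict(existing)
    for start_time, prediction in new_rows:
        merged[start_time] = prediction
    return merged


# Illustrative values only
existing = {"2025-06-01T00:00": 41.5}
new_rows = [("2025-06-01T00:00", 42.0), ("2025-06-01T01:00", 39.0)]
merged = upsert_predictions(existing, new_rows)
print(merged)
```

With this behavior a rerun replaces the earlier prediction for a timestamp instead of stacking a second row next to it.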

## 5. Databricks SQL Dashboard
Create a Databricks SQL Dashboard to visualize predictions from the Delta Table. This step is done in the Databricks UI: create a new dashboard, add a query on the predictions Delta table (the path stored in `delta_prediction_table`), and set auto-refresh.
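
Since the predictions live at a path rather than in a named table, the dashboard query can use Delta's path syntax, ``delta.`<path>` ``. Below is a small sketch that builds such a query string. The helper name, the default row limit, and the example path are assumptions, and whether a SQL warehouse can read that location depends on the workspace setup.

```python
def build_dashboard_query(table_path: str, limit: int = 1000) -> str:
    """Build a SELECT over a path-based Delta table, newest rows first."""
    if "`" in table_path:
        raise ValueError("table_path must not contain backticks")
    return (
        "SELECT start_time, prediction\n"
        f"FROM delta.`{table_path}`\n"
        "ORDER BY start_time DESC\n"
        f"LIMIT {int(limit)}"
    )


query = build_dashboard_query("dbfs:/FileStore/energy_price/delta/predictions", limit=500)
print(query)
```

The resulting text can be pasted into the SQL editor, or run from a notebook with `spark.sql(query)`.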

## 6. Databricks Workflow (Job) Setup
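To automate this pipeline, create a Databricks Job with the following tasks, each running this notebook with a different `step` parameter. As written, the cells above run unconditionally, so the notebook must read `step` (for example with `dbutils.widgets.get("step")`) and branch on it for the tasks to differ: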
- Run this notebook for ETL & feature engineering
- Run this notebook for model training
- Run this notebook for batch inference
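
One way to let a single notebook serve all three tasks is a small dispatcher keyed on the `step` parameter. The sketch below assumes each stage is wrapped in a function; `run_etl`, `run_training`, `run_inference`, and `run_step` are hypothetical placeholders, not code from this notebook.

```python
def run_etl():
    return "etl done"  # placeholder for the ETL and feature engineering cells


def run_training():
    return "train done"  # placeholder for the training cells


def run_inference():
    return "inference done"  # placeholder for the batch inference cells


STEP_HANDLERS = {
    "etl": run_etl,
    "train": run_training,
    "inference": run_inference,
}


def run_step(step, handlers=STEP_HANDLERS):
    """Run the handler registered for `step`, failing loudly on unknown values."""
    if step not in handlers:
        raise ValueError(f"Unknown step {step!r}; expected one of {sorted(handlers)}")
    return handlers[step]()


# In a Databricks notebook the value would come from the job parameter:
#   step = dbutils.widgets.get("step")
step = "train"
print(run_step(step))
```

Failing on an unknown `step` keeps a mistyped job parameter from silently running nothing.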

Example JSON for Databricks Job (replace paths as needed):

In [None]:
# Example Databricks Job JSON (for documentation)
import json
job_json = {
    "name": "Energy Price Forecasting Pipeline",
    "tasks": [
        {
            "task_key": "etl_feature_engineering",
            "notebook_task": {"notebook_path": "/Repos/your_user/energy_price/project/databricks_production_pipeline.ipynb", "base_parameters": {"step": "etl"}},
            # The Jobs API takes existing_cluster_id directly on the task
            "existing_cluster_id": "YOUR_CLUSTER_ID"
        },
        {
            "task_key": "model_training",
            "depends_on": [{"task_key": "etl_feature_engineering"}],
            "notebook_task": {"notebook_path": "/Repos/your_user/energy_price/project/databricks_production_pipeline.ipynb", "base_parameters": {"step": "train"}},
            "existing_cluster_id": "YOUR_CLUSTER_ID"
        },
        {
            "task_key": "batch_inference",
            "depends_on": [{"task_key": "model_training"}],
            "notebook_task": {"notebook_path": "/Repos/your_user/energy_price/project/databricks_production_pipeline.ipynb", "base_parameters": {"step": "inference"}},
            "existing_cluster_id": "YOUR_CLUSTER_ID"
        }
    ]
}
print(json.dumps(job_json, indent=2))
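
Before submitting a job definition, it can help to confirm that every `depends_on` entry names a real task and that the dependencies form a valid order. The following sketch does this with the standard library's `graphlib`. The `task_order` helper is hypothetical, and the job dictionary is a trimmed stand-in that keeps only the keys needed for the check.

```python
from graphlib import TopologicalSorter


def task_order(job):
    """Return task keys in a dependency-respecting order.

    Raises ValueError if a task depends on an unknown task key;
    graphlib raises CycleError if the dependencies contain a cycle.
    """
    task_keys = {task["task_key"] for task in job["tasks"]}
    graph = {}
    for task in job["tasks"]:
        deps = [dep["task_key"] for dep in task.get("depends_on", [])]
        missing = [dep for dep in deps if dep not in task_keys]
        if missing:
            raise ValueError(f"{task['task_key']} depends on unknown tasks: {missing}")
        graph[task["task_key"]] = deps  # node -> list of predecessors
    return list(TopologicalSorter(graph).static_order())


# Trimmed stand-in for the job definition
example_job = {
    "tasks": [
        {"task_key": "batch_inference", "depends_on": [{"task_key": "model_training"}]},
        {"task_key": "model_training", "depends_on": [{"task_key": "etl_feature_engineering"}]},
        {"task_key": "etl_feature_engineering"},
    ]
}
order = task_order(example_job)
print(order)
```

The check runs locally, so a broken dependency is caught before the job is created in the workspace.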

---
**Summary:**
- This notebook is ready for Databricks deployment and can be orchestrated as a Job.
- Features and predictions are stored in Delta Tables for reliability and performance; the raw input remains a CSV file.
- The model is tracked with MLflow; registering it in the Model Registry adds versioning.
- Dashboarding is handled in Databricks SQL.
- Adjust paths and cluster IDs as needed for your workspace.