# Sales Prediction using Lakehouse Data

## Step 1: Create a Feature Dataset for Machine Learning

In this step, we create a machine-learning–ready feature dataset by joining
transactional sales data (`fact_sales`) with product attributes (`dim_products`).

Why this step is important:
- Raw fact tables are not always directly suitable for ML
- Joining dimension data enriches features and improves model quality
- This mirrors real-world feature engineering in production systems

The resulting dataset will be used in subsequent steps for training a sales prediction model.
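The join in this step depends on normalizing the key format. The code keeps only the last three hyphen-separated segments of `dim_products.product_key` so that it matches `fact_sales.product_key`. Spark's `slice` uses 1-based positions, so a start of `size - 2` with length 3 selects the final three elements. The plain-Python sketch below mirrors that rule for a single key. The function name `to_fact_key` and the example key are hypothetical and only illustrate the format.

```python
def to_fact_key(product_key: str) -> str:
    """Keep the last three hyphen-separated segments of a product key.

    Hypothetical helper mirroring
    concat_ws("-", slice(split(key, "-"), size - 2, 3))
    for keys that have at least three segments.
    """
    parts = product_key.split("-")
    return "-".join(parts[-3:])


# Hypothetical dimension key: a two-segment prefix followed by the fact-table key
print(to_fact_key("AB-CD-FR-R92B-58"))  # FR-R92B-58
```

The two versions behave differently for keys with fewer than three segments. Python returns the whole key, but for a two-segment key Spark computes a start position of 0, which it rejects with an error. It may be worth confirming that every dimension key has at least three segments before relying on the join.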



In [0]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Regression (not classification) components, since the target `sales` is continuous
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

import mlflow
import mlflow.spark


In [0]:
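from pyspark.sql.functions import col, concat_ws, size, split
from pyspark.sql.functions import slice as array_slice  # avoid shadowing Python's built-in slice

# Load Gold tables
fact_df = spark.table("bike_data.gold.fact_sales")
dim_products_df = spark.table("bike_data.gold.dim_products")

# Transform product_key in dim_products to match the fact_sales format:
# keep the last three hyphen-separated segments (slice() uses 1-based positions,
# so start = size - 2 and length = 3 select the final three elements)
key_parts = split(col("product_key"), "-")
dim_products_transformed = dim_products_df.withColumn(
    "product_key_fact",
    concat_ws("-", array_slice(key_parts, size(key_parts) - 2, 3))
)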

# Join fact and dimension tables
feature_df = (
    fact_df.alias("f")
    .join(
        dim_products_transformed.alias("p"),
        col("f.product_key") == col("p.product_key_fact"),
        how="inner"
    )
)

# Basic validation
feature_df.printSchema()
feature_df.display()


## Step 2: Define the Target Variable and Select Features

In this step, we prepare the dataset for machine learning by:
- Identifying the target variable (`sales`)
- Selecting relevant numerical and categorical features
- Removing columns that are identifiers or not suitable for model training

The goal is to keep the feature set simple, meaningful, and aligned with
real-world availability at prediction time.



In [0]:
# Step 2: Select target and feature columns

ml_df = feature_df.select(
    "sales",            # target variable
    "quantity",
    "price",
    "product_cost",
    "category",
    "subcategory"
)

# Basic validation
ml_df.printSchema()
ml_df.display()


## Step 3: Encode Categorical Features

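Spark ML algorithms require numeric inputs.
In this step, categorical features are converted into numeric indices using
`StringIndexer`.

Why `StringIndexer` is used:
- Assigns a numeric index to each distinct category (by default, the most frequent category receives index 0)
- With `handleInvalid="skip"`, rows whose category is null or was not seen during fitting are dropped instead of raising an error
- Runs as a distributed Spark ML stage; fitting only needs the distinct labels and their counts, which is cheap for low-cardinality columns such as `category` and `subcategory`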

This step prepares categorical columns for feature vector assembly.


In [0]:
from pyspark.ml.feature import StringIndexer

# Index categorical columns
category_indexer = StringIndexer(
    inputCol="category",
    outputCol="category_idx",
    handleInvalid="skip"
)

subcategory_indexer = StringIndexer(
    inputCol="subcategory",
    outputCol="subcategory_idx",
    handleInvalid="skip"
)

# Fit and transform the dataset
ml_df_indexed = category_indexer.fit(ml_df).transform(ml_df)
ml_df_indexed = subcategory_indexer.fit(ml_df_indexed).transform(ml_df_indexed)

# Validate output
ml_df_indexed.printSchema()
ml_df_indexed.display()


## Step 4: Assemble Feature Vector

In this step, all selected numerical and encoded categorical features
are combined into a single feature vector.

Why this step is required:
- Spark ML estimators expect all features in a single vector column (named `features` by default, configurable via `featuresCol`)
- VectorAssembler consolidates multiple feature columns efficiently
- This keeps the pipeline clean and scalable

Only relevant, model-ready columns are included in the feature vector.


In [0]:
from pyspark.ml.feature import VectorAssembler

# Assemble numerical and indexed categorical features
assembler = VectorAssembler(
    inputCols=[
        "quantity",
        "price",
        "product_cost",
        "category_idx",
        "subcategory_idx"
    ],
    outputCol="features"
)

ml_df_final = assembler.transform(ml_df_indexed)

# Validate assembled features
ml_df_final.select("features", "sales").display()


## Step 5: Train–Test Split

To evaluate the model fairly, the dataset is split into training and test sets.
The training set is used to learn patterns, while the test set is used to
evaluate performance on unseen data.

A fixed random seed is used to ensure reproducibility of results.


In [0]:
# Step 5: Split data into training and testing sets

train_df, test_df = ml_df_final.randomSplit([0.8, 0.2], seed=42)

# Basic validation
print(f"Training records: {train_df.count()}")
print(f"Testing records: {test_df.count()}")


## Step 6: Model Selection and Training

In this step, a regression model is trained to predict `sales` based on the
assembled feature vector.

A **Linear Regression** model is chosen as a baseline because:
- The target variable (`sales`) is continuous
- Linear Regression is simple, fast, and interpretable
- It provides a simple reference point to compare against before trying more complex models

The model is trained using the training dataset created in the previous step.


In [0]:
from pyspark.ml.regression import LinearRegression

# Initialize Linear Regression model
lr = LinearRegression(
    featuresCol="features",
    labelCol="sales"
)

# Train the model
lr_model = lr.fit(train_df)

print("Linear Regression model training completed.")


## Step 7: Generate Predictions

In this step, the trained regression model is applied to the test dataset
to generate sales predictions.

These predictions will be used in the next steps to:
- Evaluate model performance
- Understand how close predicted values are to actual sales
- Validate whether the model learns meaningful patterns


In [0]:
# Step 7: Generate predictions on test data

predictions_df = lr_model.transform(test_df)


In [0]:
final_predictions_df = predictions_df.select(
    "quantity",
    "price",
    "product_cost",
    "category",
    "subcategory",
    "sales",
    "prediction"
)

final_predictions_df.display()


## Step 8: Model Evaluation

In this step, we evaluate the performance of the regression model using
standard regression metrics.

Evaluation helps answer:
- How close are predicted sales values to actual sales?
- How well does the model explain variance in the data?

Two metrics are used:
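- RMSE (Root Mean Squared Error): the square root of the mean squared difference between predicted and actual sales, in the same units as `sales`; lower is better
- R² (Coefficient of Determination): the proportion of variance in `sales` explained by the model; 1.0 is a perfect fit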
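To sanity-check the numbers `RegressionEvaluator` reports, both metrics can be computed by hand. With actual values y, predictions y_hat and mean y_bar, RMSE = sqrt(mean((y - y_hat)^2)) and R² = 1 - SS_res / SS_tot. The plain-Python sketch below applies these formulas to a hypothetical three-point example. The values are not results from this notebook.

```python
import math


def rmse(actual, predicted):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    n = len(actual)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(actual, predicted)) / n)


def r2(actual, predicted):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(actual) / len(actual)
    ss_res = sum((y - p) ** 2 for y, p in zip(actual, predicted))
    ss_tot = sum((y - mean_y) ** 2 for y in actual)
    return 1 - ss_res / ss_tot


# Hypothetical values, not results from this notebook
actual = [3.0, 5.0, 7.0]
predicted = [2.5, 5.0, 8.0]
print(rmse(actual, predicted))  # about 0.6455
print(r2(actual, predicted))    # 0.84375
```

RMSE weights large errors more heavily than small ones. R² can be negative when the model does worse than always predicting the mean.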


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE evaluation
rmse_evaluator = RegressionEvaluator(
    labelCol="sales",
    predictionCol="prediction",
    metricName="rmse"
)

rmse = rmse_evaluator.evaluate(predictions_df)

# R2 evaluation
r2_evaluator = RegressionEvaluator(
    labelCol="sales",
    predictionCol="prediction",
    metricName="r2"
)

r2 = r2_evaluator.evaluate(predictions_df)

print(f"RMSE: {rmse}")
print(f"R2 Score: {r2}")


## Step 9: Experiment Tracking with MLflow

In this step, the trained model and evaluation metrics are logged using MLflow.

Why MLflow is used:
- Tracks experiments and metrics
- Stores trained model artifacts
- Enables reproducibility and comparison of runs
- Demonstrates end-to-end ML lifecycle management in Databricks

This completes the training and evaluation part of the workflow; the final step persists predictions for downstream use.


In [0]:
import os

# Point MLflow's temporary DFS directory (used when saving Spark models) at a Unity Catalog volume; the volume itself is created in the next cell
os.environ["MLFLOW_DFS_TMP"] = "/Volumes/bike_data/ml/mlflow_tmp"


In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS bike_data.ml;
CREATE VOLUME IF NOT EXISTS bike_data.ml.mlflow_tmp;


In [0]:
import mlflow
import mlflow.spark

# Set MLflow experiment
mlflow.set_experiment("/Shared/sales_prediction_experiment")

with mlflow.start_run():
    
    # Log evaluation metrics
    mlflow.log_metric("RMSE", rmse)
    mlflow.log_metric("R2", r2)
    
    # Log trained model
    mlflow.spark.log_model(lr_model, "linear_regression_model")
    
    print("Model and metrics logged to MLflow successfully.")


## Step 10: Persist Predictions Back to Delta Tables

To close the loop between the lakehouse and the model, predictions are written back
to a Gold-layer Delta table. This makes the model outputs available for downstream
analytics, reporting, and business consumption.


In [0]:
from pyspark.sql.functions import current_timestamp

final_predictions_df = predictions_df.select(
    "quantity",
    "price",
    "product_cost",
    "category",
    "subcategory",
    "sales",
    "prediction"
).withColumn(
    "prediction_ts",
    current_timestamp()
)

final_predictions_df.write.mode("overwrite").saveAsTable(
    "bike_data.gold.sales_predictions"
)


In [0]:
final_predictions_df.display()

## Conclusion

In this notebook, we implemented an end-to-end machine learning workflow on top of a
modern lakehouse architecture using Databricks.

Starting from analytics-ready Gold tables, we created a feature dataset by joining
transactional sales data with product dimension attributes. This added product cost,
category, and subcategory to the transactional measures used as model inputs.

A baseline Linear Regression model was trained to predict sales values, demonstrating
how machine learning can be applied to support revenue forecasting and planning use cases.
Model performance was evaluated using RMSE and R² metrics, and experiments were tracked
using MLflow to ensure reproducibility and transparency.

This approach shows how data engineering and ML workloads can share the same Delta
tables, which serve as both analytics sources and ML feature sources.
The workflow can be extended with more advanced models, additional feature
engineering, or real-time prediction use cases.
