# Module 6 - E2E


To create an end-to-end machine learning solution in Microsoft Fabric using PySpark, we can follow a structured workflow that includes:

* Data Preparation: Loading data from both a Lakehouse and Data Lake, and merging them.
* Exploratory Data Analysis (EDA): Understanding the dataset through visualization and descriptive statistics.
* Feature Engineering: Preprocessing the data to prepare it for machine learning.
* Model Building: Training a prediction model.
* Model Evaluation: Testing the model on unseen data.
* API or Inference: Implementing a simple method for inference to make predictions using the trained model.

## Fabric Prerequisites

You need a Lakehouse that is enabled and attached to your notebook.

Links to the Lakehouse:
- Tables: `abfss://Fabric_2024@onelake.dfs.fabric.microsoft.com/LK_flights.Lakehouse/Tables`
- Files: `abfss://Fabric_2024@onelake.dfs.fabric.microsoft.com/LK_flights.Lakehouse/Files`

You will also need:
- The California Housing Prices dataset from Kaggle: https://www.kaggle.com/datasets/camnugent/california-housing-prices
- A PySpark notebook connected to a Fabric standard session
- Data for inference (a small sample is listed in the next section)

## Dataset: California Housing Prices

- Dataset Source: The California Housing Prices dataset on Kaggle.
- File Format: CSV. (A version of this data is also available through `sklearn.datasets.fetch_california_housing`, but its column names differ from the Kaggle CSV used here.)
- Target Variable: **median_house_value**
- Features: Includes _median_income_, _housing_median_age_, _total_rooms_, _population_, etc.

In addition, save the following values as a CSV file to use as inference data (Step 6 reads it as `InferenceData.csv`):

```
longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income
-122.23,37.88,41,880,129,322,126,8.3252
-122.22,37.86,21,7099,1106,2401,1138,8.3014
-122.24,37.85,52,1467,190,496,177,7.2574
-122.25,37.85,52,1274,235,558,219,5.6431
-122.25,37.85,52,1627,280,565,259,3.8462
```
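Before uploading the inference file, it can help to check that it parses cleanly and has the expected columns. The following is a minimal sketch in plain Python (no Spark needed); the names `INFERENCE_CSV`, `EXPECTED_COLUMNS`, and `load_inference_rows` are illustrative assumptions, not part of the notebook.

```python
import csv
import io

# The five sample rows from this section, kept inline for illustration.
INFERENCE_CSV = """longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income
-122.23,37.88,41,880,129,322,126,8.3252
-122.22,37.86,21,7099,1106,2401,1138,8.3014
-122.24,37.85,52,1467,190,496,177,7.2574
-122.25,37.85,52,1274,235,558,219,5.6431
-122.25,37.85,52,1627,280,565,259,3.8462
"""

# Columns expected in the inference file (copied from the header above).
EXPECTED_COLUMNS = [
    "longitude", "latitude", "housing_median_age", "total_rooms",
    "total_bedrooms", "population", "households", "median_income",
]


def load_inference_rows(text):
    """Parse CSV text into a list of dicts with float values."""
    reader = csv.DictReader(io.StringIO(text))
    if reader.fieldnames != EXPECTED_COLUMNS:
        raise ValueError(f"Unexpected columns: {reader.fieldnames}")
    return [{name: float(value) for name, value in row.items()} for row in reader]


rows = load_inference_rows(INFERENCE_CSV)
print(len(rows), "rows parsed; first median_income =", rows[0]["median_income"])
```

Note that the inference data has no `median_house_value` column, since that is the value the model is meant to predict.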


## Step 1: Data preparation

```python
from pyspark.sql import SparkSession

# Reuse the notebook's Spark session if one exists, otherwise create it
spark = SparkSession.builder.appName("CaliforniaHousingML").getOrCreate()

# Load historical data from Lakehouse
lakehouse_data_path = "Files/Users/Lakehouse/CaliforniaHousing.csv"
df_lakehouse = spark.read.csv(lakehouse_data_path, header=True, inferSchema=True)

# Load additional data from Data Lake
data_lake_path = "Files/Users/DataLake/NewCaliforniaHousing.csv"
df_datalake = spark.read.csv(data_lake_path, header=True, inferSchema=True)

# Compare the inferred schemas before merging
df_lakehouse.printSchema()
df_datalake.printSchema()

# Merge the datasets (unionByName matches columns by name, so both files need the same columns)
df = df_lakehouse.unionByName(df_datalake)

df.show(5)
```


## Step 2: Exploratory Data Analysis (EDA)

Perform Exploratory Data Analysis to understand key characteristics of the dataset. This includes viewing the distribution of key features, checking for missing data, and visualizing relationships.

```python
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, corr, count, isnan, mean, when

# Step 1: Check for missing data
# isnan() is only meaningful for float/double columns, so other columns are checked for NULL only
float_cols = {name for name, dtype in df.dtypes if dtype in ("double", "float")}

def is_missing(c):
    return (col(c).isNull() | isnan(col(c))) if c in float_cols else col(c).isNull()

df.select([count(when(is_missing(c), c)).alias(c) for c in df.columns]).show()

# Step 2: Compute summary statistics for numerical columns
df.describe().show()

# Step 3: Calculate correlation between features and the target variable
df.select(corr("median_income", "median_house_value").alias("corr_income_housevalue")).show()
df.select(corr("total_rooms", "median_house_value").alias("corr_rooms_housevalue")).show()

# Visualization example (assuming matplotlib is supported for plots)
# Convert to Pandas for quick visualizations (collects all rows to the driver)
pandas_df = df.toPandas()

# Plotting median income vs house value
plt.scatter(pandas_df['median_income'], pandas_df['median_house_value'])
plt.xlabel("Median Income")
plt.ylabel("Median House Value")
plt.title("Income vs House Value")
plt.show()
```
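The `corr` function used in this step returns the Pearson correlation coefficient. To make that number less of a black box, here is a small plain-Python sketch of the same calculation. The function name `pearson_corr` and the toy values are invented for illustration and are not taken from the dataset.

```python
import math


def pearson_corr(xs, ys):
    """Pearson correlation: covariance divided by the product of the spreads."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("Need two sequences of equal length with at least 2 values")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return float("nan")  # undefined when one variable is constant
    return cov / math.sqrt(var_x * var_y)


# Toy values, invented for illustration only
income = [1.0, 2.0, 3.0, 4.0]
value = [2.0, 4.0, 6.0, 8.0]
print(pearson_corr(income, value))
```

A value close to 1 or -1 indicates a strong linear relationship, while a value near 0 suggests that a linear model will get little signal from that feature on its own.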


## Step 3: Feature Engineering

Now, we prepare the dataset for modeling by performing feature engineering, which may include handling missing values, normalizing features, and creating a feature vector.

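```python
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import mean
from pyspark.sql.types import NumericType

# Step 1: Handle missing values (impute numeric columns with their mean, for simplicity)
# String columns are skipped because a mean is not defined for them
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]
mean_values = df.agg(*[mean(c).alias(c) for c in numeric_cols]).first().asDict()
df = df.fillna({name: value for name, value in mean_values.items() if value is not None})

# Step 2: Feature Engineering - Assemble all features into a single feature vector
assembler = VectorAssembler(
    inputCols=["median_income", "housing_median_age", "total_rooms", "total_bedrooms", "population"],
    outputCol="features"
)

df_final = assembler.transform(df)

# Select features and the target column for modeling
df_final = df_final.select("features", "median_house_value")
df_final.show(5)
```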
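This step mentions normalizing features, although the cell above only imputes and assembles them. As a rough illustration of what standardization does to a single column, here is a plain-Python sketch of z-scoring; the helper name `standardize` is an assumption made for this example. In a PySpark pipeline, the `StandardScaler` transformer in `pyspark.ml.feature` is one way to apply this kind of scaling to the assembled vector column.

```python
import statistics


def standardize(values):
    """Return z-scores: (value - mean) / sample standard deviation."""
    mu = statistics.mean(values)
    sigma = statistics.stdev(values)  # sample standard deviation (n - 1)
    if sigma == 0:
        raise ValueError("Cannot standardize a constant column")
    return [(v - mu) / sigma for v in values]


# median_income values from the five inference rows listed in this module
median_income = [8.3252, 8.3014, 7.2574, 5.6431, 3.8462]
z_scores = standardize(median_income)
print([round(z, 3) for z in z_scores])
```

After scaling, every feature has mean 0 and unit spread, which keeps columns with large raw values (such as `total_rooms`) from dominating columns with small ones (such as `median_income`).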


## Step 4: Build and Train the Model

We will now build a Linear Regression model to predict house prices (**median_house_value**) based on the feature vector created in the previous step.

```python
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: Split the data into training and test sets
train_df, test_df = df_final.randomSplit([0.8, 0.2], seed=42)

# Step 2: Initialize the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="median_house_value")

# Step 3: Train the model
lr_model = lr.fit(train_df)

# Step 4: Print model coefficients and intercept
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")
```
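The printed coefficients and intercept fully describe a linear regression model: a prediction is the intercept plus the sum of each coefficient multiplied by its feature value. The sketch below shows that arithmetic in plain Python. The function name `predict` and all numbers are hypothetical and chosen only to keep the sum easy to follow; they are not the values the trained model produces.

```python
def predict(coefficients, intercept, features):
    """Linear model prediction: intercept + sum(coefficient_i * feature_i)."""
    if len(coefficients) != len(features):
        raise ValueError("coefficients and features must have the same length")
    return intercept + sum(c * x for c, x in zip(coefficients, features))


# Hypothetical model with two features (illustrative numbers only)
coefficients = [40000.0, 1000.0]
intercept = 50000.0
features = [3.0, 20.0]

# 50000 + 40000 * 3 + 1000 * 20
print(predict(coefficients, intercept, features))
```

Reading the coefficients this way also explains their sign: a positive coefficient means the predicted house value rises as that feature increases, with the other features held fixed.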


## Step 5: Model Evaluation

Evaluate the model’s performance using **Root Mean Squared Error** (RMSE) and R-squared to understand how well it predicts house prices.

```python
# Step 1: Make predictions on the test set
predictions = lr_model.transform(test_df)

# Step 2: Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
# Reuse the same evaluator, overriding the metric for this call only
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"RMSE: {rmse}")
print(f"R-Squared: {r2}")

# Step 3: Show sample predictions
predictions.select("features", "median_house_value", "prediction").show(5)
```


Note!

The RMSE measures the typical error between predicted and actual house prices, expressed in the same units as the target. The R-squared value is the proportion of the variance in house prices that the model explains.

Finally, sample predictions are displayed to show how the model performed on the test data.
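To make the two metrics concrete, the following plain-Python sketch computes RMSE and R-squared by hand from their standard definitions. The helper names and the toy values are assumptions for illustration; they are not results from the notebook.

```python
import math


def rmse(actual, predicted):
    """Root Mean Squared Error: square root of the mean squared residual."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def r_squared(actual, predicted):
    """R-squared: 1 - (residual sum of squares / total sum of squares)."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1 - ss_res / ss_tot


# Toy values, invented for illustration only
actual = [100.0, 200.0, 300.0]
predicted = [110.0, 190.0, 310.0]

print("RMSE:", rmse(actual, predicted))
print("R-squared:", r_squared(actual, predicted))
```

Here every prediction is off by 10, so the RMSE is 10, and the residuals are small relative to the spread of the actual values, so R-squared is close to 1.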

## Step 6: Model Inference and API Implementation

In this step, we implement a simple function to perform inference. This can be integrated into an API or used as part of a pipeline to make predictions on new data.

```python
def predict_house_price(new_data):
    """
    Function to make predictions on new data.
    Input: new_data - A PySpark DataFrame containing the same features as used in training
    Output: Predicted house prices
    """
    # Ensure the new data has the same feature columns as training data
    # (VectorAssembler was imported in Step 3)
    assembler = VectorAssembler(
        inputCols=["median_income", "housing_median_age", "total_rooms", "total_bedrooms", "population"],
        outputCol="features"
    )

    new_data_transformed = assembler.transform(new_data)

    # Use the trained model to make predictions
    predictions = lr_model.transform(new_data_transformed)

    return predictions.select("features", "prediction")

# Example usage with new data
new_data_path = "Files/Users/DataLake/InferenceData.csv"
new_df = spark.read.csv(new_data_path, header=True, inferSchema=True)
predictions = predict_house_price(new_df)
predictions.show(5)
```


The function **predict_house_price** takes new data as input, applies the same feature assembly step used in training, and uses the trained model to make predictions. It does not impute missing values, so the input rows must be complete.

This could be wrapped into an API (e.g., using Flask or FastAPI) to provide real-time predictions.
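Whichever web framework is used, an endpoint would first need to turn a request payload into a feature row with the columns in the expected order. The sketch below shows only that framework-independent step in plain Python. The names `FEATURE_COLUMNS` and `payload_to_feature_row` are assumptions made for this example, and the column list assumes the model uses the five features named in the inference CSV header.

```python
# Feature columns in the order the model expects them (assumed for this sketch)
FEATURE_COLUMNS = [
    "median_income", "housing_median_age", "total_rooms",
    "total_bedrooms", "population",
]


def payload_to_feature_row(payload):
    """Validate a JSON-like dict and return feature values in model order."""
    missing = [name for name in FEATURE_COLUMNS if name not in payload]
    if missing:
        raise ValueError(f"Missing features: {missing}")
    # Extra keys (for example longitude) are ignored; values are coerced to float
    return [float(payload[name]) for name in FEATURE_COLUMNS]


# Example payload built from the first inference row in this module
payload = {
    "longitude": -122.23, "latitude": 37.88, "housing_median_age": 41,
    "total_rooms": 880, "total_bedrooms": 129, "population": 322,
    "households": 126, "median_income": 8.3252,
}
print(payload_to_feature_row(payload))
```

Rejecting incomplete payloads early gives the caller a clear error message instead of a failure deep inside the Spark transformation.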

## Conclusion

This end-to-end PySpark notebook demonstrates how to build a machine learning solution in Microsoft Fabric:

- Data is loaded from both Data Lake and Lakehouse.
- Exploratory Data Analysis (EDA) helps understand the dataset.
- A Linear Regression model is built to predict house prices.
- The model is evaluated and inference is performed on new data.

You can expand this workflow by adding more sophisticated preprocessing, trying different models, or automating the pipeline for real-time data updates and model retraining.