
**Summary of this Notebook:**

This notebook demonstrates a simplified MLOps process, including:

1. **ETL Process:**
   - Extract data from an operational database (e.g., SQL Server) and save it to a datalake in parquet format.
   - Read and clean the data from the datalake for model training.

2. **MLOps Process:**
   - Use GridSearchCV to find the best model.
   - Compare the current model's score with the previous best score.
   - Deploy the model if it performs better, logging all relevant parameters and features for future comparison.

The process can be automated to run at regular intervals or triggered manually when new data arrives.   

**Warning**: Since this is a demo standard security features are not followed

In [0]:
storage_account_name = "sastest247"
container_name = "price-data-parquet"
account_key = ""


spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    account_key
)


ETL Part : Extracted the data from the sql and saved into the datalake as parquet file

In [0]:
# Read data from SQL Server
jdbc_url = "jdbc:sqlserver://servername.database.windows.net:1433;databaseName=houspredictiondb"
connection_properties = {
    "user": "",
    "password": "",
    "driver": ""
}

df = spark.read.jdbc(url=jdbc_url, table="HouseDetails", properties=connection_properties)

# Save the DataFrame as Parquet in Azure Data Lake
output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/house_details.parquet"
df.write.mode("overwrite").parquet(output_path)


In [0]:
import mlflow
import mlflow.azure
import mlflow.sklearn
from pyspark.sql.functions import col
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
from xgboost import XGBRegressor

import pandas as pd

Data Engineering ETL Part: For simplicity the data is read as parquet format without any manipulations

In [0]:
# Read Parquet file from Azure storage
file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/house_details.parquet"

# Read Parquet file from Azure storage
df = spark.read.parquet(file_path)

# Select relevant features and target with non-null values
selected_features = ["Neighborhood", "YearBuilt", "TotalBsmtSF", "GrLivArea", "OverallQual", "FullBath", "TotRmsAbvGrd", "GarageArea"]
target = "SalePrice"

#sample for data cleaning. This can be extended as per the requirements
data = df.select(selected_features + [target]).dropna().toPandas()



In [0]:
data.head()

Unnamed: 0,Neighborhood,YearBuilt,TotalBsmtSF,GrLivArea,OverallQual,FullBath,TotRmsAbvGrd,GarageArea,SalePrice
0,Gilbert,1990,1000,1000,4,3,4,3,0.0
1,Gilbert,1900,5000,2000,4,4,5,1000,0.0
2,CollgCr,2003,856,1710,7,2,8,548,208500.0
3,Veenker,1976,1262,1262,6,2,6,460,181500.0
4,CollgCr,2001,920,1786,7,2,6,608,223500.0


ML Part

In [0]:
# Split data into training and test sets
X = data[selected_features]
y = data[target]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Preprocessing pipelines for both numeric and categorical features
numeric_features = ["YearBuilt", "TotalBsmtSF", "GrLivArea", "OverallQual", "FullBath", "TotRmsAbvGrd", "GarageArea"]
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

categorical_features = ["Neighborhood"]
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
    ('scaler', StandardScaler(with_mean=False))
])

# Combine preprocessing steps
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

# Define regression models to evaluate
models = {
    'RandomForest': RandomForestRegressor(random_state=42),
    'LinearRegression': LinearRegression(),
    'SVR': SVR(),
    'XGBoost': XGBRegressor(random_state=42)
}

# Set up parameter grids for each model
param_grids = {
    'RandomForest': {'regressor__n_estimators': [100, 200]},
    'LinearRegression': {},
    'SVR': {'regressor__C': [0.1, 1, 10], 'regressor__kernel': ['linear', 'rbf']},
    'XGBoost': {'regressor__n_estimators': [100, 200], 'regressor__learning_rate': [0.01, 0.1]}
}



In [0]:
best_model = None
best_score = float('-inf')

# Start an MLflow run
with mlflow.start_run() as run:
    for model_name, model in models.items():
        pipeline = Pipeline(steps=[
            ('preprocessor', preprocessor),
            ('regressor', model)
        ])
        
        grid_search = GridSearchCV(pipeline, param_grids[model_name], cv=5, scoring='r2')
        grid_search.fit(X_train, y_train)
        
        if grid_search.best_score_ > best_score:
            best_score = grid_search.best_score_
            best_model = grid_search.best_estimator_
        
        print(f"Model: {model_name}, Best Score: {grid_search.best_score_}, Best Params: {grid_search.best_params_}")
    
    # Log the best model to MLflow and the hyper parameters used
    mlflow.sklearn.log_model(best_model, "house_price_model")
    mlflow.log_metric("best_score", best_score)
    mlflow.log_params(grid_search.best_params_)
    
    
    # Print the model URI
    model_uri = mlflow.get_artifact_uri("house_price_model")
    print(f"Model URI: {model_uri}")

Model: RandomForest, Best Score: -5.691300159723833, Best Params: {'regressor__n_estimators': 100}
Model: LinearRegression, Best Score: 0.17212479387021576, Best Params: {}
Model: SVR, Best Score: 0.3215192564803498, Best Params: {'regressor__C': 10, 'regressor__kernel': 'linear'}
Model: XGBoost, Best Score: -1.1962150067214454, Best Params: {'regressor__learning_rate': 0.01, 'regressor__n_estimators': 100}




Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Model URI: dbfs:/databricks/mlflow-tracking/874797719351422/a5ad17f6d6de4cbfb499770906599299/artifacts/house_price_model


In [0]:
from sklearn.metrics import r2_score, mean_squared_error

# Test and print the results for the XGBoost model
xgboost_model = models['XGBoost']
pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', xgboost_model)
])

grid_search = GridSearchCV(pipeline, param_grids['XGBoost'], cv=5, scoring='r2')
grid_search.fit(X_train, y_train)

# Predict on the test set
y_pred = grid_search.predict(X_test)

# Evaluate the model
r2 = r2_score(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)

print(f"XGBoost Model Test R2: {r2}")
print(f"XGBoost Model Test MSE: {mse}")

Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

XGBoost Model Test R2: 0.01285546937727755
XGBoost Model Test MSE: 141478693027.778


MLOps Part

In [0]:
from mlflow.tracking import MlflowClient

# Get the best score from the previous run
client = MlflowClient()

experiment_ids = [123455566]
previous_run = client.search_runs(experiment_ids, order_by=["metrics.best_score DESC"], max_results=1)[0]
previous_best_score = previous_run.data.metrics['best_score']
previous_best_score

0.8276443851906403

In [0]:
!pip install azureml-core
!pip install azureml-mlflow
!pip install azureml-core

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


Deploy the model and create a serving endpoint for inference

In [0]:
from mlflow.deployments import get_deploy_client

# Compare the current XGBoost model with the previous best model.If it is better than the previous log and serve the model
if r2 > previous_best_score:
    # Log the new best model
    with mlflow.start_run() as run:
        mlflow.sklearn.log_model(grid_search.best_estimator_, "house_price_model")
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mse", mse)
        mlflow.log_params(grid_search.best_params_)
        
        # Register the model
        model_uri = mlflow.get_artifact_uri("house_price_model")
        model_name = "house_price_model"
        res =  mlflow.register_model(model_uri, model_name)
        print("version",res)
        # Serve the model
        client.transition_model_version_stage(
            name=model_name,
            version=res.version,
            stage="Production"
        )
        deploy_client = get_deploy_client("databricks")

        serving_endpoint = deploy_client.get_endpoint("house_price_model")
        if serving_endpoint is None:
            endpoint = deploy_client.create_endpoint(
            name="house_price_model",
            config={
            "served_entities": [
              {
                "name": "house_price_model",
                "entity_name": "house_price_model",
                "entity_version": res.version,
                "workload_size": "Small",
                "scale_to_zero_enabled": True
                }
            ],
            "traffic_config": {
                "routes": [
                {
                    "served_model_name": "house_price_model",
                    "traffic_percentage": 100
                }
                ]
            }
        })
        else:
           
            deploy_client.update_endpoint(
            endpoint="house_price_model",config={
            "served_entities": [
              {
                "name": "house_price_model",
                "entity_name": "house_price_model",
                "entity_version": res.version,
                "workload_size": "Small",
                "scale_to_zero_enabled": True
                }
            ]})
            print(f"New best model registered and served. Model URI: {model_uri}")
          
else:
    print("Current XGBoost model is not better than the previous best model.")



Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Registered model 'house_price_model' already exists. Creating a new version of this model...
2025/06/09 14:56:48 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: house_price_model, version 9
Created version '9' of model 'house_price_model'.
  client.transition_model_version_stage(
  deploy_client.update_endpoint(


version <ModelVersion: aliases=[], creation_timestamp=1749481008838, current_stage='None', description=None, last_updated_timestamp=1749481008838, name='house_price_model', run_id=None, run_link='', source='dbfs:/databricks/mlflow-tracking/874797719351422/68dac2b7619e44a7a5424d49a8b118d2/artifacts/house_price_model', status='PENDING_REGISTRATION', status_message=None, tags={}, user_id='5289343852503447', version='9'>
New best model registered and served. Model URI: dbfs:/databricks/mlflow-tracking/874797719351422/68dac2b7619e44a7a5424d49a8b118d2/artifacts/house_price_model
