# **ML pipeline** for traning, montoring and tracking ML system at Production level using MLFlow and Evidently AI tools.

In [None]:
# Using ML flow for traning, montoring and tracking ML system at Production level using MLFlow and Evidently AI tools.

In [None]:
# requirements.txt
"""
!pip install svc
!pip install PyYAML
!pip install evidently
!pip install pandas
!pip install numpy
!pip install scikit-learn
!pip install mlflow
"""

In [1]:
# Import libraries

import datetime
import io
import numpy as np
import requests
import zipfile
import pandas as pd
import json
import pprint

from sklearn import datasets, model_selection, ensemble
from scipy.stats import anderson_ksamp

# evidently libraries for monitoring the change in datadrift and modelshift
from evidently.metrics import RegressionQualityMetric, RegressionErrorDistribution, RegressionErrorPlot
from evidently.metric_preset import DataDriftPreset, RegressionPreset
from evidently.pipeline.column_mapping import ColumnMapping
from evidently.options import DataDriftOptions
from evidently.report import Report

# Import MLFlow libraries for tracking training
import mlflow
from mlflow.tracking import MlflowClient

In [2]:
# Warning logging
import warnings
warnings.simplefilter('ignore')
warnings.filterwarnings('ignore')

In [3]:
# Reading the csv data of hourly bike ride sharing
raw_data = pd.read_csv('/content/drive/MyDrive/bike_sharing_dataset/hour.csv', sep=',', header=0, parse_dates=['dteday'])

In [4]:
raw_data.columns

Index(['instant', 'dteday', 'season', 'yr', 'mnth', 'hr', 'holiday', 'weekday',
       'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed',
       'casual', 'registered', 'cnt'],
      dtype='object')

In [7]:
# combining date and hour coloumn to create index for hourly bike ride sharing dataset
raw_data.index = raw_data.apply(lambda row : datetime.datetime.combine(row.dteday.date(), datetime.time(row.hr)), axis=1)

In [9]:
raw_data.head()

Unnamed: 0,instant,dteday,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt
2011-01-01 00:00:00,1,2011-01-01,1,0,1,0,0,6,0,1,0.24,0.2879,0.81,0.0,3,13,16
2011-01-01 01:00:00,2,2011-01-01,1,0,1,1,0,6,0,1,0.22,0.2727,0.8,0.0,8,32,40
2011-01-01 02:00:00,3,2011-01-01,1,0,1,2,0,6,0,1,0.22,0.2727,0.8,0.0,5,27,32
2011-01-01 03:00:00,4,2011-01-01,1,0,1,3,0,6,0,1,0.24,0.2879,0.75,0.0,3,10,13
2011-01-01 04:00:00,5,2011-01-01,1,0,1,4,0,6,0,1,0.24,0.2879,0.75,0.0,0,1,1


In [10]:
raw_data.tail()

Unnamed: 0,instant,dteday,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt
2012-12-31 19:00:00,17375,2012-12-31,1,1,12,19,0,1,1,2,0.26,0.2576,0.6,0.1642,11,108,119
2012-12-31 20:00:00,17376,2012-12-31,1,1,12,20,0,1,1,2,0.26,0.2576,0.6,0.1642,8,81,89
2012-12-31 21:00:00,17377,2012-12-31,1,1,12,21,0,1,1,1,0.26,0.2576,0.6,0.1642,7,83,90
2012-12-31 22:00:00,17378,2012-12-31,1,1,12,22,0,1,1,1,0.26,0.2727,0.56,0.1343,13,48,61
2012-12-31 23:00:00,17379,2012-12-31,1,1,12,23,0,1,1,1,0.26,0.2727,0.65,0.1343,12,37,49


In [11]:
# Above dataset contains hourly bike ride sharing hourly data from 1st of January 2011 to 31st of December 2012 ( 2 years)
raw_data.size

295443

In [12]:
raw_data.describe()

Unnamed: 0,instant,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt
count,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0,17379.0
mean,8690.0,2.50164,0.502561,6.537775,11.546752,0.02877,3.003683,0.682721,1.425283,0.496987,0.475775,0.627229,0.190098,35.676218,153.786869,189.463088
std,5017.0295,1.106918,0.500008,3.438776,6.914405,0.167165,2.005771,0.465431,0.639357,0.192556,0.17185,0.19293,0.12234,49.30503,151.357286,181.387599
min,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.02,0.0,0.0,0.0,0.0,0.0,1.0
25%,4345.5,2.0,0.0,4.0,6.0,0.0,1.0,0.0,1.0,0.34,0.3333,0.48,0.1045,4.0,34.0,40.0
50%,8690.0,3.0,1.0,7.0,12.0,0.0,3.0,1.0,1.0,0.5,0.4848,0.63,0.194,17.0,115.0,142.0
75%,13034.5,3.0,1.0,10.0,18.0,0.0,5.0,1.0,2.0,0.66,0.6212,0.78,0.2537,48.0,220.0,281.0
max,17379.0,4.0,1.0,12.0,23.0,1.0,6.0,1.0,4.0,1.0,1.0,1.0,0.8507,367.0,886.0,977.0


In [13]:
# Model training  part
target = 'cnt'
prediction = 'prediction'
numerical_features = ['temp', 'atemp', 'hum', 'windspeed', 'hr', 'mnth', 'yr']
categorical_features = ['season', 'workingday', 'holiday']


In [15]:
# In order to training model and track model performance with ML-flow, We will
# only use small sequential part dataset for training.

reference = raw_data.loc["2011-01-01 00:00:00":"2011-01-28 23:00:00"] # for training as reference model
current = raw_data.loc["2011-01-29 00:00:00":"2011-02-28 23:00:00"]

In [17]:
# Spliting data for training and testing
train_X, test_X, train_Y, test_Y = model_selection.train_test_split(
    reference[numerical_features + categorical_features],
    reference[target],
    test_size=0.3
)

In [18]:
# Logging model using MLFlow for Experiment tracking

# Initialize MLflowClient in the jupyter notebook
client = MlflowClient()

# Setting the name for model evaluation with time
mlflow.set_experiment("Bike Sharing Model Evaluation")

# Setting the logging based on model used for training
mlflow.sklearn.autolog()   # enable autologging

# Regression model to be used for training
regressor_model = ensemble.RandomForestRegressor(n_estimators=50, random_state=33)

# start training model within context of ML Flow
with mlflow.start_run() as run:
  regressor_model.fit(train_X, train_Y)


2023/10/14 14:27:32 INFO mlflow.tracking.fluent: Experiment with name 'Bike Sharing Model Evaluation' does not exist. Creating a new experiment.


In [19]:
# Function for fetching log data like metrics, artifacts and others for given run_id.
def fetch_logged_data(run_id):

  client = MlflowClient()
  log_data = client.get_run(run_id).data
  tags = {k:v for k,v in log_data.tags.items() if not k.startswith("mlflow.")}
  artifacts = [f.path for f in client.list_artifacts(run_id, "model")]

  return log_data.params, log_data.metrics, tags, artifacts


In [21]:
# printing log data
params, metrics, tags, artifacts = fetch_logged_data(run.info.run_id)

In [23]:
from pprint import pprint
pprint(run.info.run_id)  # run_id
pprint(params)  # params of training

'8fd9e49786c7433c8e5abe0dfa1172cf'
{'bootstrap': 'True',
 'ccp_alpha': '0.0',
 'criterion': 'squared_error',
 'max_depth': 'None',
 'max_features': '1.0',
 'max_leaf_nodes': 'None',
 'max_samples': 'None',
 'min_impurity_decrease': '0.0',
 'min_samples_leaf': '1',
 'min_samples_split': '2',
 'min_weight_fraction_leaf': '0.0',
 'n_estimators': '50',
 'n_jobs': 'None',
 'oob_score': 'False',
 'random_state': '33',
 'verbose': '0',
 'warm_start': 'False'}


In [24]:
pprint(metrics)  # metrics of training

{'training_mean_absolute_error': 4.441018518518519,
 'training_mean_squared_error': 51.86811481481482,
 'training_r2_score': 0.9797407819877882,
 'training_root_mean_squared_error': 7.201952153049534,
 'training_score': 0.9797407819877882}


In [25]:
pprint(tags)   # tags

{'estimator_class': 'sklearn.ensemble._forest.RandomForestRegressor',
 'estimator_name': 'RandomForestRegressor'}


In [26]:
pprint(artifacts)  # artifacts

['model/MLmodel',
 'model/conda.yaml',
 'model/model.pkl',
 'model/python_env.yaml',
 'model/requirements.txt']


In [30]:
# calculate predictions on training data
preds_train = regressor_model.predict(train_X)
preds_test = regressor_model.predict(test_X)

In [31]:
# adding these prediction in new column "predictions" in original data
train_X['target'] = train_Y
train_X['prediction'] = preds_train

test_X['target'] = test_Y
test_X['prediction'] = preds_test

In [33]:
# Let's do Column Mapping for Reporting the data analysis in Evidently
# For reference : https://docs.evidentlyai.com/user-guide/input-data/column-mapping
# Column mapping helps correctly process the input data.

column_mapping = ColumnMapping()
column_mapping.target = 'target'
column_mapping.prediction = 'prediction'
column_mapping.numerical_features = numerical_features
column_mapping.categorical_features = categorical_features

In [34]:
# Checking Model performance on reference data and current data
# We need two datasets for comparison: reference and current.

# RegressionPreset already contains metrics related with Regression model for comparison
regression_performance_report = Report(metrics=[RegressionPreset(),])


In [35]:
# Adding data for comparison
regression_performance_report.run(reference_data=train_X.sort_index(),
                                  current_data=test_X.sort_index(),
                                  column_mapping=column_mapping)

regression_performance_report.save_html("regression_performance_report_train_test.html")


In [None]:
# Report for comparison of model performance on below will be shown.
#     1. train_data as reference
#     2. test_data as current

"""
For saving and analysis, reports can be saved in .html, or .json() format.
For more details: https://docs.evidentlyai.com/user-guide/tests-and-reports/run-tests
"""

# Production Model training, monitoring and tracking with respect to reference data

In [36]:
# Lets train the model on the whole reference data taken for first 28 days
regressor_model.fit(reference[numerical_features + categorical_features], reference[target])

2023/10/14 16:00:57 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'e3a1b44b0b6e4278bce4e77a4267ae95', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current sklearn workflow


In [37]:
# Create new column mapping for reference training report
column_mapping = ColumnMapping()
column_mapping.target = target
column_mapping.prediction = prediction
column_mapping.numerical_features = numerical_features
column_mapping.categorical_features = categorical_features

In [38]:
# Creating prediction column for reference data for comparison
ref_preds = regressor_model.predict(reference[numerical_features + categorical_features])
reference['prediction'] = ref_preds

In [39]:
# generating report for reference model only
regression_performance_report = Report(metrics=[RegressionPreset(),])
regression_performance_report.run(reference_data=None, current_data=reference,
                                  column_mapping=column_mapping)

regression_performance_report.save_html("regression_performance_report_reference.html")

# **Monitoring model performance after Deployment**

After reference model is trained and deployed for the predictions at endpoint or batch prediction, We need to keep monitoring its performance at fixed interval as per business need.

For example: Monitoring model performance at end of the week.

# Procedure :

1. Capture all the input data entered for endpoint prediction from model at Serving.
2. Run the model with the weekly data and look for any drift in data distribution or decay in model prediction.
3. If model is performing worse than the threshold limit set, Model needs to be re-trained again on the whole data collected till now along with historical data (if needed).



In [40]:
# Lets check model performance for next 1st week
current_preds = regressor_model.predict(current[numerical_features + categorical_features])
current['prediction'] = current_preds


In [41]:
# generating model performance report on next 1 week data
regression_performance_report = Report(metrics=[RegressionPreset(),])
regression_performance_report.run(reference_data=reference,
                                  current_data=current.loc["2011-01-29 00:00:00":"2011-02-07 23:00:00"],
                                  column_mapping=column_mapping)

regression_performance_report.save_html("regression_performance_report_reference_current_1st_week.html")

In [42]:
# Checking model performance on Week-2 data as compared to reference data
# here we will look more into the Data distribution and drift in input data too.

regression_performance_report = Report(metrics=[RegressionErrorPlot(),
                                                RegressionQualityMetric(),
                                                RegressionErrorDistribution()
                                                ])
regression_performance_report.run(reference_data=reference,
                                  current_data=current.loc["2011-02-08 00:00:00":"2011-02-14 23:00:00"],
                                  column_mapping=column_mapping)

regression_performance_report.save_html("regression_performance_report_reference_current_2nd_week.html")

In [43]:
# Checking model performance on Week-3 data as compared to reference data
# here we will look more into the Data distribution and drift in input data too.

# since categorical values won't contribute much to data drift calculation because
# most of them are periodic
# So, Letsc create new column mapping using only numerical features
column_mapping = ColumnMapping()
column_mapping.target = target
column_mapping.prediction = prediction
column_mapping.numerical_features = numerical_features
column_mapping.categorical_features = []

regression_performance_report = Report(metrics=[RegressionErrorPlot(),
                                                RegressionQualityMetric(),
                                                RegressionErrorDistribution(),
                                                DataDriftPreset()
                                                ])
regression_performance_report.run(reference_data=reference,
                                  current_data=current.loc["2011-02-15 00:00:00":"2011-02-21 23:00:00"],
                                  column_mapping=column_mapping)

regression_performance_report.save_html("regression_performance_report_reference_current_3rd_week.html")

In [45]:
# for Data-Drift testing, It uses various tests like below:
# 1. KOLMOGOROV-SMIRNOV
# 2. T-TEST
# 3. Z-test

# All these tests are statistical methods for calculating distribution drift values.

# **Conituous** Experimentation using **MLFlow** and monitoring logs using **Evidently AI**



In [59]:
# When we need to keep monitoring model experimentation and performance continuously,
# We will be geeting data from different data sources like Cloud storage or data lake or warehouse etc.

# thus, these data will come in batches:
# We can defice an end-to-end flow for model experimentation and performance monitoring

client = MlflowClient()
mlflow.search_experiments("Model Quality Evaluation")

experiment_batches = [
    ("2011-01-29 00:00:00", "2011-02-07 23:00:00"),
    ("2011-02-08 00:00:00", "2011-02-14 23:00:00"),
    ("2011-02-15 00:00:00", "2011-02-21 23:00:00")
]

for dates in experiment_batches:

  with mlflow.start_run() as run:

    # logging start and end dates
    mlflow.log_param("date begin", dates[0])
    mlflow.log_param("date end", dates[1])

    # defining report metrics
    reports = Report(metrics=[RegressionQualityMetric(),
                              RegressionErrorPlot(),
                              RegressionErrorDistribution(),
                              DataDriftPreset()])

    column_mapping_drift = ColumnMapping()
    column_mapping_drift.target = target
    column_mapping_drift.prediction = prediction
    column_mapping_drift.numerical_features = numerical_features
    column_mapping_drift.categorical_features = []

    reports.run(reference_data=reference,
                current_data=current.loc[dates[0]:dates[1]],
                column_mapping=column_mapping_drift)

    logged_data_json = json.loads(reports.json())

    me = logged_data_json['metrics'][0]['result']['rmse_default']
    mae = logged_data_json['metrics'][0]['result']['mean_abs_error_default']
    drift_share = logged_data_json['metrics'][3]['result']['drift_share']

    # log metrics into mlflow
    mlflow.log_metric("me", round(me, 3))
    mlflow.log_metric("mae", round(mae, 3))
    mlflow.log_metric("drift_share", round(drift_share, 3))

    pprint(run.info)





<RunInfo: artifact_uri='file:///content/mlruns/812302269961842814/31d59e5639da4860886a9a6d4cdedcb3/artifacts', end_time=None, experiment_id='812302269961842814', lifecycle_stage='active', run_id='31d59e5639da4860886a9a6d4cdedcb3', run_name='melodic-lamb-999', run_uuid='31d59e5639da4860886a9a6d4cdedcb3', start_time=1697306216004, status='RUNNING', user_id='root'>
<RunInfo: artifact_uri='file:///content/mlruns/812302269961842814/45ca61457e2e4089a4a96c5a37effb34/artifacts', end_time=None, experiment_id='812302269961842814', lifecycle_stage='active', run_id='45ca61457e2e4089a4a96c5a37effb34', run_name='ambitious-snail-939', run_uuid='45ca61457e2e4089a4a96c5a37effb34', start_time=1697306216509, status='RUNNING', user_id='root'>
<RunInfo: artifact_uri='file:///content/mlruns/812302269961842814/b18b3a784e8342a1afc2bf89bf4adea0/artifacts', end_time=None, experiment_id='812302269961842814', lifecycle_stage='active', run_id='b18b3a784e8342a1afc2bf89bf4adea0', run_name='skittish-horse-57', run_uu

In [None]:
# All the experiments details are stored in MLflow runs as per run_ids