# Using MLFlow and Evidently to Evaluate Data Drift

In this example, we will explore the MLflow integration with Evidently.

This notebook shows how you can use the Evidently and MLflow to:
* calculate data drift for the model, performed as batch checks 
* log data drift using MLflow Tracking
* explore the result using MLflow UI


In [1]:
%load_ext autoreload
%autoreload 2

import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')

In [17]:
import json
import pandas as pd
import plotly.graph_objects as go
import requests
import zipfile
import io

from datetime import datetime
from typing import List, Dict, Tuple, Union, Optional

from evidently.pipeline.column_mapping import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient

More information about the dataset can be found in Kaggle Playground Competition: https://www.kaggle.com/c/bike-sharing-demand/data?select=train.csv

Acknowledgement: Fanaee-T, Hadi, and Gama, Joao, 'Event labeling combining ensemble detectors and background knowledge', Progress in Artificial Intelligence (2013): pp. 1-15, Springer Berlin Heidelberg

In [3]:
#load data
content = requests.get("https://archive.ics.uci.edu/static/public/275/bike+sharing+dataset.zip").content
with zipfile.ZipFile(io.BytesIO(content)) as arc:
    raw_data = pd.read_csv(arc.open("day.csv"), header=0, sep=',', parse_dates=['dteday'])

In [4]:
#observe data structure
raw_data.head()

Unnamed: 0,instant,dteday,season,yr,mnth,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt
0,1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
1,2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
2,3,2011-01-03,1,0,1,0,1,1,1,0.196364,0.189405,0.437273,0.248309,120,1229,1349
3,4,2011-01-04,1,0,1,0,2,1,1,0.2,0.212122,0.590435,0.160296,108,1454,1562
4,5,2011-01-05,1,0,1,0,3,1,1,0.226957,0.22927,0.436957,0.1869,82,1518,1600


In [5]:
#set column mapping for Evidently Profile
data_columns = ColumnMapping()
data_columns.datetime = 'dteday'
data_columns.numerical_features = ['weathersit', 'temp', 'atemp', 'hum', 'windspeed']
data_columns.categorical_features = ['holiday', 'workingday']
data_columns.target = 'cnt'

## Dataset Drift

In [6]:

# Evaluate data drift with Evidently Profile
def get_data_drift_report(reference, production, column_mapping) -> Report:
    """
    Returns a list with pairs (feature_name, drift_score)
    Drift Score depends on the selected statistical test or distance and the threshold
    """    
    data_drift_report = Report(metrics=[
        DataDriftPreset(drift_share=0.4)
        ])
    data_drift_report.run(reference_data=reference, current_data=production, column_mapping=column_mapping)

    return data_drift_report

def get_data_drift_metrics(report: Dict) -> Dict:
    """
    Returns a list with pairs (feature_name, drift_score)
    Drift Score depends on the selected statistical test or distance and the threshold
    """ 
    metrics = {}   
    for metric in ['dataset_drift', 
                   'number_of_drifted_columns', 
                   'share_of_drifted_columns']: 
        metrics.update({metric: report["metrics"][0]["result"][metric]})

    return metrics

## Setup Experiments

- option 1: each period (fold) is a separate experiment Run -> save metrics for each Run
- option 2: all folds are part of a single expeirment -> aggrregate metrics
- 

In [7]:
# Set reference dates
reference_dates = ('2011-01-01 00:00:00','2011-01-28 23:00:00')

# Set experiment batches dates
experiment_batches = [
    ('2011-01-01 00:00:00','2011-01-29 23:00:00'),
    ('2011-01-29 00:00:00','2011-02-07 23:00:00'),
    ('2011-02-07 00:00:00','2011-02-14 23:00:00'),
    ('2011-02-15 00:00:00','2011-02-21 23:00:00'),  
]

## Setup MLFLow Experiment

In [10]:
from config import EXPERIMENT_NAME, MLFLOW_TRACKING_URI


mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
client = MlflowClient()
print(f"Client tracking uri: {client.tracking_uri}")

EXPERIMENT_NAME = "Data Drift"

# Get or Create an experiment by name 
experiment = client.get_experiment_by_name(EXPERIMENT_NAME)

if experiment: 
    
    experiment_id = experiment.experiment_id if experiment else None
    # Show experiment info
    print("Name: {}".format(experiment.name))
    print("Experiment ID: {}".format(experiment.experiment_id))
    print("Artifact Location: {}".format(experiment.artifact_location))
    print("Lifecycle_stage: {}".format(experiment.lifecycle_stage))

else:
    
    # Create_experiment
    experiment_id = client.create_experiment(
        EXPERIMENT_NAME ,
        # artifact_location=MLFLOW_TRACKING_URI,
        # tags={"version": "v1", "priority": "P1"},
    )
    print("Experiment ID: {}".format(experiment_id))

Client tracking uri: http://localhost:5000
Experiment ID: 234193429068143937


### Option 1 

In [11]:
#start new run
for date in experiment_batches:
    with mlflow.start_run(experiment_id=experiment_id) as run: 
        
        
        # Log parameters
        mlflow.log_param("begin", date[0])
        mlflow.log_param("end", date[1])

        # Log metrics
        report: Dict = get_data_drift_report(
            raw_data.loc[raw_data.dteday.between(reference_dates[0], reference_dates[1])], 
            raw_data.loc[raw_data.dteday.between(date[0], date[1])], 
            column_mapping=data_columns)
        
        metrics: Dict = get_data_drift_metrics(report.as_dict())
        
        for metric in metrics.keys():
            mlflow.log_metric(metric, round(metrics[metric], 3))

        print(run.info)

<RunInfo: artifact_uri='mlflow-artifacts:/234193429068143937/22cb151987d146458c17ac951631063b/artifacts', end_time=None, experiment_id='234193429068143937', lifecycle_stage='active', run_id='22cb151987d146458c17ac951631063b', run_name='big-snake-660', run_uuid='22cb151987d146458c17ac951631063b', start_time=1688577393424, status='RUNNING', user_id='mnrozhkov'>
<RunInfo: artifact_uri='mlflow-artifacts:/234193429068143937/1b1c3dea510644b49c41537753bab7fc/artifacts', end_time=None, experiment_id='234193429068143937', lifecycle_stage='active', run_id='1b1c3dea510644b49c41537753bab7fc', run_name='dapper-mink-125', run_uuid='1b1c3dea510644b49c41537753bab7fc', start_time=1688577393657, status='RUNNING', user_id='mnrozhkov'>
<RunInfo: artifact_uri='mlflow-artifacts:/234193429068143937/f75efe3a785840a6b0bd15b2b2595ff8/artifacts', end_time=None, experiment_id='234193429068143937', lifecycle_stage='active', run_id='f75efe3a785840a6b0bd15b2b2595ff8', run_name='big-stork-138', run_uuid='f75efe3a7858

In [12]:
# from pprint import pprint

# feature = 'temp'

# # report["metrics"][1]["result"]["drift_by_columns"][feature]["drift_score"]
# pprint(list(report["metrics"][1]["result"].keys()))
# pprint(report["metrics"][1]["result"]['share_of_drifted_columns'])
# pprint(report["metrics"][1]["result"]['dataset_drift'])
# pprint(report["metrics"][0])

# data_drift_metrics = report["metrics"][0]["result"]
# pprint(data_drift_metrics)

### Option 2

In [13]:
#evaluate data drift with Evidently Profile
def detect_dataset_drift(reference, production, column_mapping, get_ratio=False):
    """
    Returns True if Data Drift is detected, else returns False.
    If get_ratio is True, returns the share of drifted features.
    The Data Drift detection depends on the confidence level and the threshold.
    For each individual feature Data Drift is detected with the selected confidence (default value is 0.95).
    Data Drift for the dataset is detected if share of the drifted features is above the selected threshold (default value is 0.5).
    """
    
    data_drift_report = Report(metrics=[
        DataDriftPreset(drift_share=0.4)
        ])
    data_drift_report.run(reference_data=reference, current_data=production, column_mapping=column_mapping)
    report = data_drift_report.as_dict()
    
    if get_ratio:
        return report["metrics"][0]["result"]["drift_share"]
    else:
        return report["metrics"][0]["result"]["dataset_drift"]
    
    
#evaluate data drift with Evidently Profile
def detect_features_drift(reference, production, column_mapping, get_scores=False):
    """
    Returns True if Data Drift is detected, else returns False. 
    If get_scores is True, returns scores value (like p-value) for each feature.
    The Data Drift detection depends on the confidence level and the threshold.
    For each individual feature Data Drift is detected with the selected confidence (default value is 0.95).
    """
    
    data_drift_report = Report(metrics=[DataDriftPreset()])
    data_drift_report.run(reference_data=reference, current_data=production, column_mapping=column_mapping)
    report = data_drift_report.as_dict()
    
    drifts = []
    num_features = column_mapping.numerical_features if column_mapping.numerical_features else []
    cat_features = column_mapping.categorical_features if column_mapping.categorical_features else []
    for feature in num_features + cat_features:
        drift_score = report["metrics"][1]["result"]["drift_by_columns"][feature]["drift_score"]
        if get_scores:
            drifts.append((feature, drift_score))
        else:
            drifts.append((feature, report["metrics"][1]["result"]["drift_by_columns"][feature]["drift_detected"]))
             
    return drifts


In [14]:
def plot_drifted_features(drifted_features_data, experiment_batches):
    """
    Plots the drifted features for each batch in the experiment.
    """
    
    features_historical_drift_frame = pd.DataFrame(
        drifted_features_data, 
        columns = data_columns.numerical_features + data_columns.categorical_features)
    
    fig = go.Figure(data=go.Heatmap(
                       z = features_historical_drift_frame.astype(int).transpose(),
                       x = [x[1] for x in experiment_batches],
                       y = data_columns.numerical_features,
                       hoverongaps = False,
                       xgap = 1,
                       ygap = 1,
                       zmin = 0,
                       zmax = 1,
                       showscale = False,
                       colorscale = 'Bluered'
    ))

    fig.update_xaxes(side="top")

    fig.update_layout(
        xaxis_title = "Timestamp",
        yaxis_title = "Feature Drift"
    )

    return fig
    
# fig = plot_drifted_features(features_historical_drift, experiment_batches)
# fig.show()

In [15]:
def plot_drifted_feature_scores(drifted_features_data, experiment_batches):
    """
    Plots the drifted features for each batch in the experiment.
    """
    

    drifted_features_data = pd.DataFrame(
        drifted_features_data, 
        columns = data_columns.numerical_features + data_columns.categorical_features)


    fig = go.Figure(data=go.Heatmap(
                    z = drifted_features_data.transpose(),
                    x = [x[1] for x in experiment_batches],
                    y = drifted_features_data.columns,
                    hoverongaps = False,
                    xgap = 1,
                    ygap = 1,
                    zmin = 0,
                    zmax = 1,
                    colorscale = 'reds_r'
                    )
                )

    fig.update_xaxes(side="top")

    fig.update_layout(
        xaxis_title = "Timestamp",
        yaxis_title = "p-value"
    )
    return fig

# fig = plot_drifted_feature_scores(features_historical_drift_pvalues, experiment_batches)
# fig.show("notebook")

In [18]:
###start new run
    
with mlflow.start_run(experiment_id=experiment_id) as run: 
    
    
    features_historical_drift = []
    features_historical_drift_pvalues = []
    metrics_parent = {}

    for date in experiment_batches:
        print(date)
        
        # Calculate Dataset Drift for the fold
        with mlflow.start_run(run_name=date[1], 
                              nested=True,
                              experiment_id=experiment_id) as child_run:

            # Log parameters
            mlflow.log_param("begin", date[0])
            mlflow.log_param("end", date[1])

            # Log metrics
            report: Dict = get_data_drift_report(
                raw_data.loc[raw_data.dteday.between(reference_dates[0], reference_dates[1])], 
                raw_data.loc[raw_data.dteday.between(date[0], date[1])], 
                column_mapping=data_columns)
            
            metrics: Dict = get_data_drift_metrics(report.as_dict())
            ts = int(datetime.strptime(date[1], '%Y-%m-%d %H:%M:%S').timestamp())
            
            for metric in metrics.keys():
                mlflow.log_metric(
                    key=metric, 
                    value=round(metrics[metric], 3), 
                    step=ts) # use last date as step

            print(run.info)
            
            if metrics['dataset_drift'] is True:
                date2str = date[1].replace(" ", "-").replace(":", "-")
                report_path = f"data_drift_report_{date2str}.html"
                report.save_html(report_path)
                mlflow.log_artifact(report_path)
                
                metrics_parent.update({"dataset_drift": metrics['dataset_drift']})

        # Log parent Run metrics
        mlflow.log_metrics(metrics_parent)
        
        # Calculate Features Drift 
        drift_features = detect_features_drift(raw_data.loc[raw_data.dteday.between(reference_dates[0],reference_dates[1])], 
                           raw_data.loc[raw_data.dteday.between(date[0], date[1])], 
                           column_mapping=data_columns)
        features_historical_drift.append([x[1] for x in drift_features])
        fig = plot_drifted_features(features_historical_drift, experiment_batches)
        fig.write_html("features_drift.html")
        mlflow.log_artifact("features_drift.html")
        
        
        # Calculate Features Drift scores (p-value)
        drift_feature_scores = detect_features_drift(
            raw_data.loc[raw_data.dteday.between(reference_dates[0], reference_dates[1])], 
            raw_data.loc[raw_data.dteday.between(date[0], date[1])],
            column_mapping=data_columns,
            get_scores=True)
        features_historical_drift_pvalues.append([x[1] for x in drift_feature_scores])
        fig = plot_drifted_feature_scores(features_historical_drift_pvalues, experiment_batches)
        fig.write_html("features_drift_pvalues.html")
        mlflow.log_artifact("features_drift_pvalues.html")
    

('2011-01-01 00:00:00', '2011-01-29 23:00:00')
<RunInfo: artifact_uri='mlflow-artifacts:/234193429068143937/60c4cf25133945a1867b8939ccb32712/artifacts', end_time=None, experiment_id='234193429068143937', lifecycle_stage='active', run_id='60c4cf25133945a1867b8939ccb32712', run_name='spiffy-kit-798', run_uuid='60c4cf25133945a1867b8939ccb32712', start_time=1688577508828, status='RUNNING', user_id='mnrozhkov'>
('2011-01-29 00:00:00', '2011-02-07 23:00:00')
<RunInfo: artifact_uri='mlflow-artifacts:/234193429068143937/60c4cf25133945a1867b8939ccb32712/artifacts', end_time=None, experiment_id='234193429068143937', lifecycle_stage='active', run_id='60c4cf25133945a1867b8939ccb32712', run_name='spiffy-kit-798', run_uuid='60c4cf25133945a1867b8939ccb32712', start_time=1688577508828, status='RUNNING', user_id='mnrozhkov'>
('2011-02-07 00:00:00', '2011-02-14 23:00:00')
<RunInfo: artifact_uri='mlflow-artifacts:/234193429068143937/60c4cf25133945a1867b8939ccb32712/artifacts', end_time=None, experiment_i

In [19]:
metrics

{'dataset_drift': True,
 'number_of_drifted_columns': 4,
 'share_of_drifted_columns': 0.5}

True

# Prdiction Monitoring

- monitor predictions for period 
- use date as a Run name 
- save report if there is a Drift 
- run from Python script

In [20]:
from evidently.pipeline.column_mapping import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import TargetDriftPreset, RegressionPreset

In [21]:
# Set reference dates
reference_dates = ('2011-01-01 00:00:00','2011-01-28 23:00:00')

# Set experiment batches dates
experiment_batches = [
    ('2011-01-01 00:00:00','2011-01-29 23:00:00'),
    # ('2011-01-29 00:00:00','2011-02-07 23:00:00'),
    # ('2011-02-07 00:00:00','2011-02-14 23:00:00'),
    # ('2011-02-15 00:00:00','2011-02-21 23:00:00'),  
]

In [22]:
from config import MLFLOW_TRACKING_URI


mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
client = MlflowClient()
print(f"Client tracking uri: {client.tracking_uri}")

EXPERIMENT_NAME = "Prediction Monitoring"

# Get or Create an experiment by name 
experiment = client.get_experiment_by_name(EXPERIMENT_NAME)

if experiment: 
    
    experiment_id = experiment.experiment_id if experiment else None
    # Show experiment info
    print("Name: {}".format(experiment.name))
    print("Experiment ID: {}".format(experiment.experiment_id))
    print("Artifact Location: {}".format(experiment.artifact_location))
    print("Lifecycle_stage: {}".format(experiment.lifecycle_stage))

else:
    
    # Create_experiment
    experiment_id = client.create_experiment(
        EXPERIMENT_NAME,
        # artifact_location=MLFLOW_TRACKING_URI,
        # tags={"version": "v1", "priority": "P1"},
    )
    print("Experiment ID: {}".format(experiment_id))

Client tracking uri: http://localhost:5000
Experiment ID: 337416877405482141


In [23]:
# Evaluate data drift with Evidently Profile
# def get_data_drift_report(reference, production, column_mapping) -> Report:
#     """
#     Returns a list with pairs (feature_name, drift_score)
#     Drift Score depends on the selected statistical test or distance and the threshold
#     """    


#     return num_target_drift_report

def get_data_drift_metrics(report: Dict) -> Dict:
    """
    Returns a list with pairs (feature_name, drift_score)
    Drift Score depends on the selected statistical test or distance and the threshold
    """ 
    metrics = {}   
    for metric in ['dataset_drift', 
                   'number_of_drifted_columns', 
                   'share_of_drifted_columns']: 
        metrics.update({metric: report["metrics"][0]["result"][metric]})

    return metrics

In [24]:
#start new run
for date in experiment_batches:
    with mlflow.start_run(experiment_id=experiment_id) as run: 
        
        
        # Log parameters
        mlflow.log_param("begin", date[0])
        mlflow.log_param("end", date[1])

        # Log metrics
        # report: Dict = get_data_drift_report(
        #     raw_data.loc[raw_data.dteday.between(reference_dates[0], reference_dates[1])], 
        #     raw_data.loc[raw_data.dteday.between(date[0], date[1])], 
        #)
        
        reference = raw_data.loc[raw_data.dteday.between(reference_dates[0], reference_dates[1])]
        production = raw_data.loc[raw_data.dteday.between(date[0], date[1])]
        column_mapping=data_columns
        
        
        num_target_drift_report = Report(metrics=[
            TargetDriftPreset(),
        ])

        num_target_drift_report.run(
            reference_data=reference, 
            current_data=production, 
            column_mapping=column_mapping)
        
        # reg_performance_report = Report(metrics=[
        #     RegressionPreset(),
        # ]) 
        # reg_performance_report.run(reference_data=reference, current_data=production, column_mapping=column_mapping)
        
        
        # metrics: Dict = get_data_drift_metrics(report.as_dict())
        
        # for metric in metrics.keys():
        #     mlflow.log_metric(metric, round(metrics[metric], 3))

        # print(run.info)

In [25]:
target_report = num_target_drift_report.as_dict()

for metric in target_report['metrics']:
    print(metric['metric'])

ColumnDriftMetric
ColumnCorrelationsMetric
TargetByFeaturesTable
