## List of features from Evidently we will use
Data drift doc: https://docs.seldon.io/projects/alibi-detect/en/stable/examples/cd_ks_cifar10.html  
Data drift ref paper: https://arxiv.org/pdf/1810.11953.pdf
- Data Quality
    - Metrics
        - ConflictPredictionMetric (for cur)
        - ConflictTargetMetric (for ref)
    - Tests
        - TestConflictPrediction (for cur)
        - TestConflictTarget (for ref)
- Data Drift
    - Metrics 
        - EmbeddingsDriftMetric (MMD for UAE)
        - EmbeddingsDriftMetric (Ratio (KS) for UAE)
        - EmbeddingsDriftMetric (Euclidean distance for UAE)
        - EmbeddingsDriftMetric (Model (0.75 thr) for UAE)
        - EmbeddingsDriftMetric (MMD for BBSD)
        - EmbeddingsDriftMetric (Ratio (KS) for BBSD)
    - Tests
        - TestEmbeddingsDrift (MMD for UAE)
        - TestEmbeddingsDrift (Ratio (KS) for UAE)
        - TestEmbeddingsDrift (Euclidean distance for UAE)
        - TestEmbeddingsDrift (Model (0.75 thr) for UAE)
        - TestEmbeddingsDrift (MMD for BBSD)
        - TestEmbeddingsDrift (Ratio (KS) for BBSD)
     
**Note**: MMD is a popular kernel-based technique for multivariate two-sample testing, and KS is a popular alternative to MMD that instead runs multiple univariate tests (one per dimension). In the paper, the authors aggregate the univariate KS results with the Bonferroni correction. Evidently's docs also support the KS test, but they report a single final drift score called the **"share of drifted embeddings"**, which, honestly, we don't fully understand (maybe an average or a percentage?). Anyway, we use Evidently's KS implementation here for the sake of simplicity. If you are interested in KS with the Bonferroni correction as in the paper, check the Alibi Detect documentation, which implements it according to the reference paper.

In [1]:
from evidently import ColumnMapping
from evidently.report import Report
from evidently.test_suite import TestSuite
from evidently.metric_preset import DataDriftPreset
from evidently.test_preset import NoTargetPerformanceTestPreset, DataDriftTestPreset
from evidently.ui.remote import RemoteWorkspace
from evidently.ui.workspace import Workspace, WorkspaceBase
from evidently.metrics import (ConflictPredictionMetric, DatasetCorrelationsMetric,
                               EmbeddingsDriftMetric, ConflictTargetMetric)
from evidently.tests import (TestConflictPrediction, TestPredictionFeaturesCorrelations,
                             TestEmbeddingsDrift, TestConflictTarget)
from evidently.ui.dashboards import (CounterAgg, DashboardPanelCounter, DashboardPanelPlot, 
                                     PanelValue, PlotType, ReportFilter)
from evidently.metrics.data_drift.embedding_drift_methods import model, ratio, distance, mmd

In [2]:
def create_report():
    """Build the Evidently ``Report`` with every data-quality and drift metric.

    Metrics, in order:
      * ``ConflictTargetMetric`` and ``ConflictPredictionMetric`` (data quality);
      * ``EmbeddingsDriftMetric`` on the ``'uae'`` embeddings using MMD,
        per-component KS ratio, Euclidean distance and ``model`` methods;
      * ``EmbeddingsDriftMetric`` on the ``'bbsd'`` embeddings using MMD and
        per-component KS ratio.

    Returns:
        A newly constructed (not yet run) ``Report``.
    """
    uae_drift_methods = [
        # note: currently, mmd failed. please check docs
        mmd(threshold=0.015, quantile_probability=0.95),
        ratio(component_stattest='ks', threshold=0.05),
        distance(dist='euclidean', threshold=0.2),
        model(threshold=0.75),
    ]
    bbsd_drift_methods = [
        mmd(threshold=0.015, quantile_probability=0.95),
        ratio(component_stattest='ks', threshold=0.05),
    ]

    metrics = [ConflictTargetMetric(), ConflictPredictionMetric()]
    metrics.extend(EmbeddingsDriftMetric('uae', drift_method=method) for method in uae_drift_methods)
    metrics.extend(EmbeddingsDriftMetric('bbsd', drift_method=method) for method in bbsd_drift_methods)
    return Report(metrics=metrics)


def create_test_suite():
    """Build the Evidently ``TestSuite`` mirroring the metrics of the report.

    Tests, in order:
      * ``TestConflictTarget`` and ``TestConflictPrediction`` (data quality);
      * ``TestEmbeddingsDrift`` on the ``'uae'`` embeddings using MMD,
        per-component KS ratio, Euclidean distance and ``model`` methods;
      * ``TestEmbeddingsDrift`` on the ``'bbsd'`` embeddings using MMD and
        per-component KS ratio.

    Returns:
        A newly constructed (not yet run) ``TestSuite``.
    """
    uae_drift_methods = [
        mmd(threshold=0.015, quantile_probability=0.95),
        ratio(component_stattest='ks', threshold=0.05),
        distance(dist='euclidean', threshold=0.2),
        model(threshold=0.75),
    ]
    bbsd_drift_methods = [
        mmd(threshold=0.015, quantile_probability=0.95),
        ratio(component_stattest='ks', threshold=0.05),
    ]

    tests = [TestConflictTarget(), TestConflictPrediction()]
    tests.extend(TestEmbeddingsDrift('uae', drift_method=method) for method in uae_drift_methods)
    tests.extend(TestEmbeddingsDrift('bbsd', drift_method=method) for method in bbsd_drift_methods)
    return TestSuite(tests=tests)

In [3]:
def modify_dashboard(project, project_desc: str):
    """Set the project description, add the monitoring panels and save.

    Panels are added in this order:
      1. "Production Model Monitor!" and "Production Data Quality" (counter
         panels with ``CounterAgg.NONE`` and no value, used as section titles);
      2. counters for the last number of prediction conflicts and target
         conflicts;
      3. a "Drift Detection" title panel and a counter for the last drift score.

    Args:
        project: Evidently UI project to configure; it is mutated in place.
        project_desc: Text assigned to ``project.description``.

    Returns:
        The same ``project`` object, after ``project.save()``.
    """
    def unfiltered():
        # A new, unconstrained filter (no metadata/tag requirements) for each panel.
        return ReportFilter(metadata_values={}, tag_values=[])

    def header(title):
        return DashboardPanelCounter(filter=unfiltered(), agg=CounterAgg.NONE, title=title)

    def last_value_counter(title, value, text, size):
        return DashboardPanelCounter(
            title=title,
            filter=unfiltered(),
            value=value,
            text=text,
            agg=CounterAgg.LAST,
            size=size,
        )

    project.description = project_desc

    panels = [
        header("Production Model Monitor!"),
        header("Production Data Quality"),
        last_value_counter(
            "Number of conflicts in Prediction",
            PanelValue(
                metric_id="ConflictPredictionMetric",
                field_path=ConflictPredictionMetric.fields.current.number_not_stable_prediction,
            ),
            text="count",
            size=1,
        ),
        last_value_counter(
            "Number of conflicts in Target (GT)",
            PanelValue(
                metric_id="ConflictTargetMetric",
                field_path=ConflictTargetMetric.fields.number_not_stable_target,
            ),
            text="count",
            size=1,
        ),
        header("Drift Detection"),
        # NOTE(review): create_report adds several EmbeddingsDriftMetric instances
        # (different embeddings/methods); this value is not narrowed to one of them,
        # so which drift score the panel shows is unclear — confirm.
        last_value_counter(
            "Drift Score",
            PanelValue(
                metric_id="EmbeddingsDriftMetric",
                field_path=EmbeddingsDriftMetric.fields.drift_score,
                legend="score",
            ),
            text="latest",
            size=2,
        ),
    ]
    for panel in panels:
        project.dashboard.add_panel(panel)

    project.save()
    return project

In [4]:
import json
import os
from collections import defaultdict
from typing import Dict, List, Sequence, Union

import numpy as np
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from db_tables import APILogTable, PredictionsTable

In [5]:
def open_db_session(engine: sqlalchemy.engine.Engine) -> sqlalchemy.orm.Session:
    """Open a new ORM session bound to *engine*.

    The caller owns the returned session and must close it, e.g. by using it
    as a context manager: ``with open_db_session(engine) as session: ...``.

    Args:
        engine: SQLAlchemy ``Engine`` the session should use for connections.

    Returns:
        A new ``sqlalchemy.orm.Session``.
    """
    # The annotation is the Engine class (the previous one named the
    # ``sqlalchemy.engine`` module, which is not a type).
    session_factory = sessionmaker(bind=engine)
    return session_factory()

In [6]:
def get_cur_df_from_query(sql_ret, use_cols: Sequence[str] = ('id', 'uae_feats', 'bbsd_feats', 'prediction_json')):
    """Collect selected attributes of query-result rows into a DataFrame indexed by ``id``.

    Args:
        sql_ret: Iterable of row objects (e.g. ORM instances); every name in
            ``use_cols`` is read from each row with ``getattr``.
        use_cols: Attribute names to extract. Must include ``'id'``, which
            becomes the index. Defaults to a tuple so the default can never be
            mutated and shared between calls.

    Returns:
        DataFrame with one row per element of ``sql_ret`` (in iteration order),
        the remaining ``use_cols`` as columns and ``id`` as the index. An empty
        ``sql_ret`` yields an empty frame that still has those columns.
    """
    # Pre-seed every column so an empty result still produces an 'id' column;
    # otherwise set_index('id') would raise KeyError on an empty query.
    current_data = {col: [] for col in use_cols}
    for row in sql_ret:
        for col in use_cols:
            current_data[col].append(getattr(row, col))
    return pd.DataFrame(current_data).set_index('id')

In [7]:
def convert_pred_json_to_df(pred_json_col: Union[pd.DataFrame, pd.Series]):
    """Decode a column of JSON strings into a DataFrame with one column per key.

    Each element is parsed with ``json.loads`` and the resulting objects become
    the rows of a new DataFrame. The result gets a fresh 0..n-1 index (the
    input's index is dropped), so it can be concatenated positionally with
    other freshly built frames.

    Note: only a Series works in practice; ``DataFrame.apply`` would hand whole
    columns to ``json.loads``.
    """
    parsed_rows = pred_json_col.apply(json.loads)
    return pd.DataFrame(parsed_rows.tolist())

In [8]:
def make_cur_evidently_compat(cur_df: pd.DataFrame, uae_feats_col: str= 'uae_feats',
                              bbsd_feats_col: str='bbsd_feats', pred_json_col: str = 'prediction_json'):
    """Flatten current (production) rows into a DataFrame Evidently can consume.

    The per-row vectors in ``uae_feats_col`` and ``bbsd_feats_col`` are stacked
    and expanded into one column per dimension (``uae_feat_i`` / ``bbsd_feat_i``).
    The UAE features are also copied into ``numerical_feat_i`` columns, and the
    JSON in ``pred_json_col`` is decoded into one column per key. An all-NaN
    ``label`` column is appended so the schema matches the reference data.

    Returns:
        ``(final_df, num_feat_cols, uae_feat_cols, bbsd_feat_cols)``.
    """
    def to_matrix(column, prefix):
        # Stack the per-row vectors into a 2-D array and name one column per dimension.
        matrix = np.stack(cur_df[column])
        return matrix, [f'{prefix}_{idx}' for idx in range(matrix.shape[1])]

    uae_matrix, uae_feat_cols = to_matrix(uae_feats_col, 'uae_feat')
    # uae_feat_* columns serve as embeddings for drift computation, while the
    # numerical_feat_* copy serves as numerical features for the data-quality
    # tests (TestConflictTarget & TestConflictPrediction). Evidently does not
    # support using the same columns twice in a column mapping, hence the copy.
    num_feat_cols = [f'numerical_feat_{idx}' for idx in range(len(uae_feat_cols))]
    bbsd_matrix, bbsd_feat_cols = to_matrix(bbsd_feats_col, 'bbsd_feat')

    # All parts carry a fresh 0..n-1 index, so they line up row by row.
    parts = [
        pd.DataFrame(uae_matrix, columns=num_feat_cols),
        pd.DataFrame(uae_matrix, columns=uae_feat_cols),
        pd.DataFrame(bbsd_matrix, columns=bbsd_feat_cols),
        convert_pred_json_to_df(cur_df[pred_json_col]),
    ]
    final_df = pd.concat(parts, axis=1)

    # 'label' exists only in the reference data; fill it with NaN here so both
    # schemas are identical.
    final_df['label'] = [np.nan] * len(final_df)

    return final_df, num_feat_cols, uae_feat_cols, bbsd_feat_cols

In [9]:
def make_ref_evidently_compat(ref_df: pd.DataFrame, classes: List[str], uae_feats_col: str= 'uae_feats',
                              bbsd_feats_col: str='bbsd_feats', label_col: str = 'label'):
    """Flatten the reference dataset into a DataFrame Evidently can consume.

    The per-row vectors in ``uae_feats_col`` and ``bbsd_feats_col`` are stacked
    and expanded into one column per dimension (``uae_feat_i`` / ``bbsd_feat_i``).
    The UAE features are also copied into ``numerical_feat_i`` columns. One
    all-NaN column per entry of ``classes`` is added so the schema matches the
    current data. ``label`` holds ``np.argmax`` of each entry of ``label_col``.

    Args:
        ref_df: Reference data; its index is ignored (rows are used in order).
        classes: Names of the prediction columns to create (filled with NaN).
        uae_feats_col: Column holding the UAE feature vectors.
        bbsd_feats_col: Column holding the BBSD feature vectors.
        label_col: Column whose entries are reduced with ``np.argmax``.

    Returns:
        ``(final_df, num_feat_cols, uae_feat_cols, bbsd_feat_cols)``.
    """
    uae_feats_arr = np.stack(ref_df[uae_feats_col])
    uae_n_feats = uae_feats_arr.shape[1]
    uae_feat_cols = [f'uae_feat_{i}' for i in range(uae_n_feats)]
    # Duplicate of the UAE features: Evidently does not accept the same columns
    # twice in a column mapping, so one copy serves as embeddings and the other
    # as numerical features for the data-quality (conflict) checks.
    num_feat_cols = [f'numerical_feat_{i}' for i in range(uae_n_feats)]

    bbsd_feats_arr = np.stack(ref_df[bbsd_feats_col])
    bbsd_feat_cols = [f'bbsd_feat_{i}' for i in range(bbsd_feats_arr.shape[1])]

    final_df = pd.concat([
        pd.DataFrame(uae_feats_arr, columns=num_feat_cols),
        pd.DataFrame(uae_feats_arr, columns=uae_feat_cols),
        pd.DataFrame(bbsd_feats_arr, columns=bbsd_feat_cols),
    ], axis=1)

    # Fill columns that exist in cur but not in ref (prediction cols) with NaN
    # to make the schemas of both frames identical.
    for class_name in classes:
        final_df[class_name] = [np.nan] * len(final_df)

    # final_df has a fresh 0..n-1 index. Assigning a Series would align on
    # ref_df's index, and any non-default index (e.g. one preserved in the
    # parquet file) would silently turn the labels into NaN. Assign by position.
    final_df['label'] = np.array([np.argmax(x) for x in ref_df[label_col]])

    return final_df, num_feat_cols, uae_feat_cols, bbsd_feat_cols

In [10]:
def create_col_mapping(classes, num_feat_cols, uae_feat_cols, bbsd_feat_cols):
    """Describe the flattened ref/cur DataFrames to Evidently.

    Args:
        classes: Prediction column names (one per class).
        num_feat_cols: Columns exposed as numerical features.
        uae_feat_cols: Columns forming the ``'uae'`` embedding.
        bbsd_feat_cols: Columns forming the ``'bbsd'`` embedding.

    Returns:
        A ``ColumnMapping`` with target ``'label'`` and no id/datetime column.
    """
    return ColumnMapping(
        target='label',
        prediction=classes,
        numerical_features=num_feat_cols,
        embeddings={'uae': uae_feat_cols, 'bbsd': bbsd_feat_cols},
        id=None,
        datetime=None,
    )

In [11]:
# Class names of the animals-10 classifier. They name the per-class prediction
# columns (column_mapping.prediction) and the NaN placeholder columns added to
# the reference data.
# NOTE(review): presumably listed in the model's output order, which the argmax
# reference labels index into — confirm.
classes = ['butterfly', 'cat', 'chicken', 'cow', 'dog', 'elephant', 'horse', 'sheep', 'spider', 'squirrel']

## Get current data from db

In [12]:
# Database connection settings, overridable via environment variables.
# POSTGRES_PORT only affects the fallback URL used when DB_CONNECTION_URL is unset.
# NOTE(review): the fallback URL embeds a plaintext username/password; prefer
# requiring DB_CONNECTION_URL from the environment or a secret store outside local dev.
POSTGRES_PORT = os.getenv('POSTGRES_PORT', '5432')
DB_CONNECTION_URL = os.getenv('DB_CONNECTION_URL', f'postgresql://dlservice_user:SuperSecurePwdHere@postgres:{POSTGRES_PORT}/dlservice_pg_db')

In [13]:
# SQLAlchemy engine for the prediction-logging database.
engine = create_engine(DB_CONNECTION_URL)

In [14]:
# Size of the "current" window: the most recent predictions (highest ids first).
N_CURRENT_ROWS = 100

# Use the session as a context manager so it is closed (and its connection
# released) right after the query instead of staying open for the rest of the run.
# NOTE(review): the returned rows are read after the session closes; this assumes
# the needed columns are loaded by the query (not deferred/lazy) — confirm.
with open_db_session(engine) as session:
    ret = session.query(PredictionsTable).order_by(PredictionsTable.id.desc()).limit(N_CURRENT_ROWS).all()

In [15]:
# Turn the queried rows into a DataFrame, then flatten it into the Evidently-ready
# "current" frame. The cur_* column lists are not used below: the column mapping
# is built from the reference lists, and the schema check verifies both frames agree.
temp_cur_df = get_cur_df_from_query(ret, use_cols=['id', 'uae_feats', 'bbsd_feats', 'prediction_json'])
cur_df, cur_num_feat_cols, cur_uae_feat_cols, cur_bbsd_feat_cols = make_cur_evidently_compat(temp_cur_df)

## Get ref data (from .parquet file)

In [16]:
# Reference dataset (parquet). The default is a machine-specific path; set the
# REF_DATA_PATH environment variable to point elsewhere without editing the notebook.
ref_path = os.getenv(
    'REF_DATA_PATH',
    '/home/ariya/central_storage/ref_data/animals10_classifier_50px_trial1_ref_data.parquet',
)
temp_ref_df = pd.read_parquet(ref_path)

In [17]:
# Flatten the reference data the same way; `classes` adds NaN prediction columns
# so its schema matches cur_df.
ref_df, ref_num_feat_cols, ref_uae_feat_cols, ref_bbsd_feat_cols = make_ref_evidently_compat(temp_ref_df, classes)

In [18]:
# Ref and cur must have exactly the same columns (both builders pad missing
# columns with NaN for this reason). Check both directions: a column present
# only in cur is as much a mismatch as one present only in ref.
mismatched_cols = set(ref_df.columns).symmetric_difference(cur_df.columns)
if mismatched_cols:
    # ValueError subclasses Exception, so existing `except Exception` handlers still apply.
    raise ValueError(
        'Columns of ref and cur data are not equal, please reverify. '
        f'Mismatched columns: {sorted(map(str, mismatched_cols))}'
    )
column_mapping = create_col_mapping(classes, ref_num_feat_cols, ref_uae_feat_cols, ref_bbsd_feat_cols)

## Build reports

In [27]:
# Set to True to run modify_dashboard on the selected project (description +
# dashboard panels). NOTE(review): add_panel presumably appends, so enabling this
# again on an already-configured project would duplicate panels — confirm.
first_time = False

In [20]:
# Evidently workspace that stores the monitoring projects and their runs.
# NOTE(review): Workspace.create presumably creates/opens a local, file-based
# workspace at this name/path — confirm it is where the UI service reads from.
WORKSPACE_NAME = "production_models_monitor"
ws = Workspace.create(WORKSPACE_NAME)

In [26]:
project_name = 'animals10_monitor'
search_results = ws.search_project(project_name)

if not search_results:
    print('Created a new project')
    project = ws.create_project(project_name)
    # A freshly created project has no panels configured here, so always set up
    # its dashboard; otherwise it stays empty unless `first_time` is flipped by hand.
    needs_dashboard = True
else:
    # Several projects can share a name; the last search hit is assumed to be
    # the most recent one.
    print('Project already exists. Use the latest one.')
    project = search_results[-1]
    needs_dashboard = first_time

if needs_dashboard:
    project = modify_dashboard(project, project_desc='Try Evidently')

Project already exists. Use the latest one.


In [22]:
# Fresh (not yet run) report and test suite with the quality/drift configuration.
report = create_report()
test_suite = create_test_suite()

In [23]:
# Run the report and the test suite on the same reference/current data and mapping.
# NOTE: runs take quite long; maybe try setting bootstrap = False to speed them up.
# Known bug (Evidently 0.4.3, reported upstream): the mmd method fails when the
# reference data is more than 1000x larger than the current data.
report.run(reference_data=ref_df, current_data=cur_df, column_mapping=column_mapping)
test_suite.run(reference_data=ref_df, current_data=cur_df, column_mapping=column_mapping)

In [24]:
# Store the run results in the selected workspace project so its dashboard can display them.
ws.add_report(project.id, report)
ws.add_test_suite(project.id, test_suite)