# Are the explanations faithful to the original classifier?

* Use HPC Time series data
* Train a “golden classifier”, e.g., sparse logistic regression
* Only a few metrics are used for classification
* Get explanations for each sample in the test set
* Report precision/recall for explanations



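- To test the faithfulness of CoMTE, we explain a simple model whose reasoning process is known (a sparse logistic regression that relies on only a few metrics) and report the precision and recall of our explanations, measured against the set of metrics the model actually uses.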

In [1]:
%load_ext autoreload
%autoreload 2

In [3]:
import copy
import gc
import itertools
import logging
from multiprocessing import Pool
import functools
import sys 
import random
from pathlib import Path
import time
import mlrose_ky

from sklearn.metrics import confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import NuSVC
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.preprocessing import MinMaxScaler

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from tqdm import tqdm

In [5]:
import data_loading
import analysis.data
import analysis.classifier
import explainers

In [6]:
# Send DEBUG-and-above records to stderr with a timestamped, aligned format.
logging.basicConfig(
    level=logging.DEBUG,
    stream=sys.stderr,
    format='%(asctime)s %(levelname)-7s %(message)s',
)
# matplotlib is chatty at DEBUG level, so only let its warnings through.
mpl_logger = logging.getLogger('matplotlib')
mpl_logger.setLevel(logging.WARNING)

In [7]:
def get_metrics(pipeline, threshold=0.1):
    """Return the metric names whose features carry a large coefficient.

    The feature generator is read from ``pipeline.steps[2]`` (its
    ``column_names`` attribute) and the fitted linear model from
    ``pipeline.steps[4]`` (its 2-D ``coef_`` attribute), so the pipeline
    must keep that step layout.

    Args:
        pipeline: Fitted pipeline with the step layout described above.
        threshold: A feature column counts as "used" when the absolute
            value of any of its coefficients is strictly greater than this.

    Returns:
        A set of names, one per used feature column, obtained by dropping
        the first ``_``-separated token of the column name (presumably the
        feature/statistic prefix -- confirm against the feature generator).
    """
    column_names = pipeline.steps[2][1].column_names
    coefficients = pipeline.steps[4][1].coef_
    # Axis 1 of coef_ indexes feature columns.  Compute the selected column
    # indices once instead of once per column name, and keep them in a set
    # so the membership test is O(1).
    selected = set(np.where(np.abs(coefficients) > threshold)[1].tolist())
    return {
        '_'.join(name.split('_')[1:])
        for idx, name in enumerate(column_names)
        if idx in selected
    }

In [8]:
# Load the train/test time series and labels, restricted to two classes.
# Previously used alternatives (kept for reference):
#    Path('/home/ates/data/taxonomist/'), window=60, skip=60, make_binary=True)
#    Path('/projectnb/peaclab-mon/ates/taxonomist/'), window=60, skip=60, make_binary=True)
timeseries, labels, test_timeseries, test_labels = data_loading.load_hpc_data(
    Path('/projectnb/peaclab-mon/ates/hpas'),
    classes=['none', 'dcopy'],
)

100%|██████████| 195/195 [00:13<00:00, 14.91it/s]
100%|██████████| 226/226 [00:16<00:00, 14.00it/s]


In [9]:
# Show the run identifier (first level of the index tuple) and the label of
# the first test sample only.
for idx, row in itertools.islice(test_labels.iterrows(), 1):
    print(idx[0])
    print(row['label'])

5c1316901963fd7bbf0f90ab_45
none


In [10]:
extractor = analysis.data.TSFeatureGenerator(trim=0)

# The "golden classifier": feature extraction, scaling to [-1, 1], then an
# L1-penalized logistic regression.  get_metrics() reads steps[2] (features)
# and steps[4] (clf) by position, so keep this step order.
pipeline = Pipeline(
    steps=[
        ('assert1', analysis.classifier.CheckFeatures()),
        ('features', analysis.data.TSFeatureGenerator(trim=0)),
        ('assert2', analysis.classifier.CheckFeatures()),
        ('scaler', MinMaxScaler(feature_range=(-1, 1))),
        # Alternative classifier tried earlier:
        #('clf', RandomForestClassifier(n_estimators=100, class_weight='balanced'))
        ('clf', LogisticRegression(penalty='l1', C=0.1, solver='liblinear')),
    ]
)

In [11]:
#pipeline.classes_[0]

In [12]:
# Train the classifier.
#
# Sweep C over 10 log-spaced values from 1e2 down to 1e-5, refitting each
# time, and stop at the first fit that uses fewer than 10 metrics.  If no
# value gets below 10, the last fit (C=1e-5) is kept.
for C in np.logspace(2, -5, num=10):
    pipeline.set_params(clf__C=C).fit(timeseries, labels)
    all_metrics = get_metrics(pipeline)
    if len(all_metrics) >= 10:
        continue
    break
pipeline

NameError: name 'generate_features' is not defined

In [None]:
# Recompute the set of metrics used by the final fitted pipeline (this is the
# ground truth for the explanation scores below) and display it.
all_metrics = get_metrics(pipeline)
all_metrics

In [None]:
# Predicted labels for the test set; consumed by the evaluation cells below.
preds = pipeline.predict(test_timeseries)

In [None]:
# Weighted-average F1 first, then one F1 score per class label.
print("F1 score:", f1_score(test_labels, preds, average='weighted'))
per_class_f1 = f1_score(
    test_labels, preds, labels=test_labels['label'].unique(), average=None)
for label, i in zip(test_labels['label'].unique(), per_class_f1):
    print("\t", label, i)

# Confusion matrix with every row divided by its own sum, drawn as a heatmap
# with the classifier's class labels on both axes.
label_list = pipeline.steps[4][1].classes_
cf = confusion_matrix(test_labels, preds, labels=label_list).astype(float)
cf = cf / cf.sum(axis=1, keepdims=True)
sns.heatmap(cf, annot=True, xticklabels=label_list, yticklabels=label_list)
plt.show()

In [None]:
# Split the test runs into TP / FP / TN / FN buckets (binary labels assumed).
#
# np.unique returns the labels sorted, so `positive` is the alphabetically
# first label and `negative` the second.  With classes=['none', 'dcopy'] that
# makes 'dcopy' positive and 'none' negative.
class_labels = np.unique(test_labels['label'])
positive = class_labels[0]
negative = class_labels[1]

true_positives = []
false_positives = []
true_negatives = []
false_negatives = []

for pred, (idx, row) in zip(preds, test_labels.iterrows()):
    actual = row['label']
    if actual == pred:
        bucket = true_positives if actual == positive else true_negatives
    elif actual == positive:
        # A positive sample the classifier missed is a false NEGATIVE.
        # (This branch used to append to false_positives.)
        bucket = false_negatives
    else:
        # A negative sample predicted as positive is a false POSITIVE.
        bucket = false_positives
    # idx is an index tuple; its first element identifies the run.
    bucket.append(idx[0])

print(len(true_positives))
print(len(false_positives))
print(len(true_negatives))
print(len(false_negatives))
# Every test sample must land in exactly one bucket.
assert len(true_positives) + len(false_positives) + len(true_negatives) + len(false_negatives) == len(test_labels)

In [None]:
### (Disabled) Pick the first misclassified test sample, plus a correctly classified sample of the same true class to use as its distractor

# for idx, (true, pred) in enumerate(zip(test_labels['label'], preds)):
#     if true != pred:
#         x_test_idx = test_labels.iloc[idx].name[0]
#         true_label = true
#         x_test = test_timeseries.loc[[x_test_idx], :, :]
#         for idx, (true, pred) in enumerate(zip(test_labels['label'], preds)):
#             if true == true_label and pred == true:
#                 distractor_idx = test_labels.iloc[idx].name[0]
#                 distractor = test_timeseries.loc[[distractor_idx], :, :]
#                 break
#         break

In [None]:
# The method under evaluation; its explanation length sets the feature budget
# handed to the baselines in the scoring loops.
our_method = explainers.OptimizedSearch(pipeline, timeseries, labels, silent=False)

# Baseline explainers, keyed by the name reported in the score tables.
# All are built from the same fitted pipeline and training data.
explainer_list = dict(
    bruteforce=explainers.BruteForceSearch(
        pipeline, timeseries, labels, silent=False, dont_stop=False),
    lime=explainers.LimeExplanation(pipeline, timeseries, labels),
    random=explainers.RandomExplanation(pipeline, timeseries, labels),
    shap=explainers.ShapExplanation(pipeline, timeseries, labels),
)

In [None]:
########### TEST ########################
# Smoke test of the scoring loop on a small random sample of true positives.
# For each run: explain with our method, score it against the ground-truth
# metrics, then ask every baseline explainer for the same number of features
# and score those too.

ground_truth = set(all_metrics)
scores = []

no_exp_list_fp = []

#TODO: Update here
# Sample at most 2 runs: random.sample raises ValueError when asked for more
# items than the population contains.
for run in tqdm(random.sample(true_positives, min(2, len(true_positives)))):

    x_test = test_timeseries.loc[[run], :, :]
    explanation = our_method.explain(x_test)
    print("OUR EXP: ", explanation)

    hits = set() if explanation is None else ground_truth.intersection(explanation)

    # Score zero if there is no explanation, it is empty (which would divide
    # by zero below), or it contains anything outside the ground truth.
    if explanation is None or len(explanation) == 0 or len(hits) != len(explanation):
        no_exp_list_fp.append(run)
        target_length = 3 #if our method cannot find an explanation, set target to max number of features
        scores.append({
            'run': run,
            'method': 'our_method',
            'recall': 0,
            'precision': 0
        })

    else:
        scores.append({
            'run': run,
            'method': 'our_method',
            'recall': len(hits) / len(ground_truth),
            'precision': len(hits) / len(explanation)
        })
        target_length = len(explanation)

    for e in explainer_list:
        print(target_length)
        explanation = explainer_list[e].explain(x_test, num_features=target_length)
        # Label the output with the explainer that produced it (this used to
        # print "SHAP EXP:" for every explainer).
        print(f"{e.upper()} EXP:", explanation)

        try:
            hits = ground_truth.intersection(explanation)
            recall = len(hits) / len(ground_truth)
            precision = len(hits) / len(explanation)
        except (TypeError, ZeroDivisionError):
            # TypeError: the explainer returned None / a non-iterable.
            # ZeroDivisionError: empty explanation or empty ground truth.
            recall = 0
            precision = 0

        scores.append({
            'run': run,
            'method': e,
            'recall': recall,
            'precision': precision
        })

In [None]:
# Tabulate the score dicts collected so far (one row per run and method).
results_temp = pd.DataFrame(scores)

In [None]:
# Bar plots of recall and precision per method for the smoke-test scores.
sns.catplot(data=results_temp, x='method', y='recall', kind='bar')
# Called before the precision plot is created, so presumably only the recall
# plot gets the fixed 0-1 y-range.
plt.ylim(0, 1)
sns.catplot(data=results_temp, x='method', y='precision', kind='bar')

In [None]:
# Score every explainer on the runs listed in `false_positives`.
# For each run: explain with our method, score it against the ground-truth
# metrics, then ask every baseline explainer for the same number of features.
ground_truth = set(all_metrics)
scores = []

no_exp_list_fp = []

#TODO: Update here
for run in tqdm(false_positives):

    x_test = test_timeseries.loc[[run], :, :]
    explanation = our_method.explain(x_test)

    hits = set() if explanation is None else ground_truth.intersection(explanation)

    # Score zero if there is no explanation, it is empty (which would divide
    # by zero below), or it contains anything outside the ground truth.
    if explanation is None or len(explanation) == 0 or len(hits) != len(explanation):
        no_exp_list_fp.append(run)
        target_length = 3 #if our method cannot find an explanation, set target to max number of features
        scores.append({
            'run': run,
            'method': 'our_method',
            'recall': 0,
            'precision': 0
        })

    else:
        scores.append({
            'run': run,
            'method': 'our_method',
            'recall': len(hits) / len(ground_truth),
            'precision': len(hits) / len(explanation)
        })
        target_length = len(explanation)

    for e in explainer_list:

        explanation = explainer_list[e].explain(x_test, num_features=target_length)

        try:
            hits = ground_truth.intersection(explanation)
            recall = len(hits) / len(ground_truth)
            precision = len(hits) / len(explanation)
        except (TypeError, ZeroDivisionError):
            # TypeError: the explainer returned None / a non-iterable.
            # ZeroDivisionError: empty explanation or empty ground truth.
            recall = 0
            precision = 0

        scores.append({
            'run': run,
            'method': e,
            'recall': recall,
            'precision': precision
        })

In [None]:
# Tabulate the score dicts currently in `scores` (one row per run and method).
results = pd.DataFrame(scores)

In [None]:
# Display the full score table.
results

In [None]:
# Display only the rows scored for the 'shap' explainer.
results[results['method'] == 'shap']

In [None]:
# Recall per method, assuming `results` holds the scores for the
# `false_positives` runs.
# NOTE(review): the original comment said "Taxonomist", but the active
# load_hpc_data call points at the hpas directory -- confirm the dataset name.
sns.catplot(data=results, x='method', y='recall', kind='bar')
plt.ylim(0, 1)

In [None]:
# Precision per method, assuming `results` holds the scores for the
# `false_positives` runs.
# NOTE(review): the original comment said "Taxonomist", but the active
# load_hpc_data call points at the hpas directory -- confirm the dataset name.
sns.catplot(data=results, x='method', y='precision', kind='bar')

In [None]:
# Score every explainer on the runs listed in `true_positives`.
# For each run: explain with our method, score it against the ground-truth
# metrics, then ask every baseline explainer for the same number of features.
ground_truth = set(all_metrics)
scores = []

no_exp_list_tp = []

#TODO: Update here
for run in tqdm(true_positives):

    x_test = test_timeseries.loc[[run], :, :]
    explanation = our_method.explain(x_test)

    hits = set() if explanation is None else ground_truth.intersection(explanation)

    # Score zero if there is no explanation, it is empty (which would divide
    # by zero below), or it contains anything outside the ground truth.
    if explanation is None or len(explanation) == 0 or len(hits) != len(explanation):
        no_exp_list_tp.append(run)
        target_length = 3 #if our method cannot find an explanation, set target to max number of features
        scores.append({
            'run': run,
            'method': 'our_method',
            'recall': 0,
            'precision': 0
        })

    else:
        scores.append({
            'run': run,
            'method': 'our_method',
            'recall': len(hits) / len(ground_truth),
            'precision': len(hits) / len(explanation)
        })
        target_length = len(explanation)

    for e in explainer_list:

        explanation = explainer_list[e].explain(x_test, num_features=target_length)

        try:
            hits = ground_truth.intersection(explanation)
            recall = len(hits) / len(ground_truth)
            precision = len(hits) / len(explanation)
        except (TypeError, ZeroDivisionError):
            # TypeError: the explainer returned None / a non-iterable.
            # ZeroDivisionError: empty explanation or empty ground truth.
            recall = 0
            precision = 0

        scores.append({
            'run': run,
            'method': e,
            'recall': recall,
            'precision': precision
        })

In [None]:
# Tabulate the score dicts currently in `scores` (one row per run and method).
results = pd.DataFrame(scores)

In [None]:
# Recall per method, assuming `results` holds the scores for the
# `true_positives` runs.
# NOTE(review): the original comment read "Taxonomist FP Data", which looks
# copied from the false-positive cell; the active load_hpc_data call also
# points at the hpas directory -- confirm.
sns.catplot(data=results, x='method', y='recall', kind='bar')
plt.ylim(0, 1)

In [None]:
# Precision per method, assuming `results` holds the scores for the
# `true_positives` runs.
# NOTE(review): the original comment read "Taxonomist FP Data", which looks
# copied from the false-positive cell -- confirm.
sns.catplot(data=results, x='method', y='precision', kind='bar')

In [None]:
# Score every explainer on the runs listed in `false_negatives`.
# For each run: explain with our method, score it against the ground-truth
# metrics, then ask every baseline explainer for the same number of features.
ground_truth = set(all_metrics)
scores = []

no_exp_list_fn = []

#TODO: Update here
for run in tqdm(false_negatives):

    x_test = test_timeseries.loc[[run], :, :]
    explanation = our_method.explain(x_test)

    hits = set() if explanation is None else ground_truth.intersection(explanation)

    # Score zero if there is no explanation, it is empty (which would divide
    # by zero below), or it contains anything outside the ground truth.
    if explanation is None or len(explanation) == 0 or len(hits) != len(explanation):
        no_exp_list_fn.append(run)
        target_length = 3 #if our method cannot find an explanation, set target to max number of features
        scores.append({
            'run': run,
            'method': 'our_method',
            'recall': 0,
            'precision': 0
        })

    else:
        scores.append({
            'run': run,
            'method': 'our_method',
            'recall': len(hits) / len(ground_truth),
            'precision': len(hits) / len(explanation)
        })
        target_length = len(explanation)

    for e in explainer_list:

        explanation = explainer_list[e].explain(x_test, num_features=target_length)

        try:
            hits = ground_truth.intersection(explanation)
            recall = len(hits) / len(ground_truth)
            precision = len(hits) / len(explanation)
        except (TypeError, ZeroDivisionError):
            # TypeError: the explainer returned None / a non-iterable.
            # ZeroDivisionError: empty explanation or empty ground truth.
            recall = 0
            precision = 0

        scores.append({
            'run': run,
            'method': e,
            'recall': recall,
            'precision': precision
        })

In [None]:
# Tabulate the score dicts currently in `scores` (one row per run and method).
results = pd.DataFrame(scores)

In [None]:
# Recall per method, assuming `results` holds the scores for the
# `false_negatives` runs.
# NOTE(review): the original comment read "Taxonomist FP Data", which looks
# copied from the false-positive cell; the active load_hpc_data call also
# points at the hpas directory -- confirm.
sns.catplot(data=results, x='method', y='recall', kind='bar')
plt.ylim(0, 1)

In [None]:
# Precision per method, assuming `results` holds the scores for the
# `false_negatives` runs.
# NOTE(review): the original comment read "Taxonomist FP Data", which looks
# copied from the false-positive cell -- confirm.
sns.catplot(data=results, x='method', y='precision', kind='bar')

In [None]:
# Recall per method for whatever `results` currently holds.
# NOTE(review): no cell in between reassigns `results`, so run top to bottom
# this repeats the previous recall plot; the original comment said
# "Taxonomist Data" -- confirm which dataset/subset was intended.
sns.catplot(data=results, x='method', y='recall', kind='bar')
plt.ylim(0, 1)

In [None]:
# Precision per method for whatever `results` currently holds.
# NOTE(review): no cell in between reassigns `results`, so run top to bottom
# this repeats the previous precision plot; the original comment said
# "Taxonomist Data" -- confirm which dataset/subset was intended.
sns.catplot(data=results, x='method', y='precision', kind='bar')