In [None]:
import json
import os
import pickle

import numpy as np
import pandas as pd
import sklearn
import sklearn.ensemble
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

import dice_ml
import lime
import lime.lime_tabular
import rulematrix
from rulematrix.surrogate import rule_surrogate

In [None]:
# Load the breast-cancer dataset bundled with scikit-learn.
cancer_dataset = load_breast_cancer()

# Hold out 20% of the rows for testing. random_state=42 pins the split so the
# hard-coded test row indexes used further down keep pointing at the same rows.
# train_test_split is imported explicitly instead of being reached through
# `sklearn.model_selection`, which this notebook never imports itself.
train, test, labels_train, labels_test = train_test_split(
    cancer_dataset.data,
    cancer_dataset.target,
    train_size=0.80,
    random_state=42,
)

In [3]:
# Pickle cache files for the two models that the notebook explains side by side.
model_path, model_path_2 = "rf.pkl", "rf2.pkl"

def get_model(model_path):
    """Return the model cached at `model_path`, training and caching it if absent.

    Args:
        model_path: Path of the pickle file that stores the fitted model.

    Returns:
        The unpickled object when `model_path` exists. Otherwise a
        RandomForestClassifier with 500 trees, freshly fitted on the
        module-level `train` / `labels_train` and pickled to `model_path`.
    """
    if os.path.exists(model_path):
        # NOTE: pickle.load can execute arbitrary code stored in the file; only
        # point this at model files written by this notebook.
        with open(model_path, 'rb') as model_file:
            return pickle.load(model_file)

    # No random_state is passed, so every cache file gets its own forest.
    rf = sklearn.ensemble.RandomForestClassifier(n_estimators=500)
    rf.fit(train, labels_train)

    # `with` closes the handle so the pickle is fully flushed before reuse.
    with open(model_path, "wb") as model_file:
        pickle.dump(rf, model_file)
    return rf

In [4]:
# hard examples in the test set
def find_hard_examples(rf, test, threshold=0.3):
    """Find rows on which the two class probabilities are close to each other.

    Args:
        rf: Object exposing `predict_proba(test)`; each returned row is
            indexed at positions 0 and 1.
        test: Samples passed straight to `rf.predict_proba`.
        threshold: A row counts as "hard" when the absolute difference between
            its two probabilities is below this value.

    Returns:
        A list of `(row_index, probabilities)` tuples, which is also printed.
    """
    # list.append takes a single argument, so each hit is stored as one tuple.
    hard_examples = [
        (index, prob)
        for index, prob in enumerate(rf.predict_proba(test))
        if abs(prob[0] - prob[1]) < threshold
    ]
    print(hard_examples)
    return hard_examples

# 0 [0.372 0.628]
# 32 [0.622 0.378]
# 51 [0.43 0.57]
# 62 [0.52 0.48]
# 68 [0.538 0.462]
# 98 [0.632 0.368]
# 106 [0.354 0.646]

# [TODO: the code below will be wrapped into the package]

In [5]:
# Row of `test` that gets explained, and the class index that is reported
# (used as the predict_proba column and as the index into target_names).
test_idx = 78
target_idx = 0

# LIME explainer built from the training data.
lime_explainer = lime.lime_tabular.LimeTabularExplainer(
    train,
    feature_names=cancer_dataset.feature_names,
    class_names=cancer_dataset.target_names,
    discretize_continuous=True,
)

def get_lime_exp(lime_explainer, i, model_path, label=1):
    """Explain test row `i` with LIME and return its feature ranking and scores.

    Args:
        lime_explainer: Explainer exposing `explain_instance`.
        i: Row index into the module-level `test` array.
        model_path: Pickle path handed to `get_model`.
        label: Class index whose explanation is requested and extracted.
            Defaults to 1, the label this function has always used.
            NOTE(review): the notebook sets `target_idx = 0` for the other
            outputs; confirm which class the attribution should refer to.

    Returns:
        A `(var_indexes, scores)` tuple. `var_indexes` holds the explained
        feature indexes sorted by ascending weight. `scores` has one entry per
        feature of the row: its weight, or 0 when the feature is not part of
        the explanation.
    """
    test_item = test[i]
    model = get_model(model_path)
    lime_exp = lime_explainer.explain_instance(
        test_item, model.predict_proba, labels=(label,)
    )

    # sorted() returns a new list, leaving the explanation object's own list
    # in its original order.
    weighted_features = sorted(lime_exp.as_map()[label], key=lambda pair: pair[1])
    var_indexes = [feature_idx for feature_idx, _ in weighted_features]

    scores = [0] * len(test_item)
    for feature_idx, weight in weighted_features:
        scores[feature_idx] = weight

    return var_indexes, scores

# Explain the same test row once per model.
lime1_var_indexes, lime1_score = get_lime_exp(
    lime_explainer, i=test_idx, model_path=model_path
)
lime2_var_indexes, lime2_score = get_lime_exp(
    lime_explainer, i=test_idx, model_path=model_path_2
)

# lime_exp is an explanation object
# https://github.com/marcotcr/lime/blob/fd7eb2e6f760619c29fca0187c07b82157601b32/lime/explanation.py#L73C10-L73C10

In [6]:
# dice explainer


# dice requires different data formats
# Training frame: one column per feature plus the label column DiCE treats as the outcome.
dice_train_data = pd.DataFrame(
    train, columns=cancer_dataset.feature_names
).assign(target=labels_train)

# Test frame: features only.
dice_test_data = pd.DataFrame(test, columns=cancer_dataset.feature_names)

# Every feature is declared continuous.
d = dice_ml.Data(
    dataframe=dice_train_data,
    continuous_features=cancer_dataset.feature_names.tolist(),
    outcome_name='target',
)

def get_dice_exp(model_path, d, i, min_delta=5e-3):
    """Generate DiCE counterfactuals for test row `i` and report what they change.

    Args:
        model_path: Path of the pickled model handed to `dice_ml.Model`.
        d: The `dice_ml.Data` object passed to `dice_ml.Dice`.
        i: Row position in the module-level `dice_test_data` frame.
        min_delta: Changes whose absolute size is below this value are
            treated as "no change" and set to 0.

    Returns:
        A `(var_indexes, deltas)` tuple. `deltas` has one row per
        counterfactual holding counterfactual-minus-original for every feature
        column, rounded to 4 decimals. `var_indexes[k]` lists the feature
        positions whose delta in row `k` is non-zero.
    """
    m = dice_ml.Model(model_path=model_path, backend='sklearn')
    dice_explainer = dice_ml.Dice(d, m)
    dice_exp = dice_explainer.generate_counterfactuals(
        dice_test_data[i:i+1],
        total_CFs=4,
    )

    # Serialise and parse the explanation once instead of once per field.
    exp_json = json.loads(dice_exp.to_json())
    cfs = exp_json['cfs_list'][0]
    test_item = exp_json['test_data'][0]

    cf_delta = np.around(np.array(cfs) - np.array(test_item), decimals=4)
    # Keep only the feature columns. NOTE(review): the JSON rows appear to
    # carry the outcome as a trailing column; if they do not, this slice is a
    # no-op. Without it an outcome change would be reported as a feature.
    cf_delta = cf_delta[:, :dice_test_data.shape[1]]
    # Compare magnitudes: a plain `< min_delta` would also wipe out every
    # negative delta, hiding counterfactuals that decrease a feature.
    cf_delta[np.abs(cf_delta) < min_delta] = 0

    var_indexes = [np.flatnonzero(row).tolist() for row in cf_delta]

    return var_indexes, cf_delta.tolist()

# Counterfactuals for the same test row, once per model.
dice1_var_indexes, dice1_score = get_dice_exp(model_path=model_path, d=d, i=test_idx)
dice2_var_indexes, dice2_score = get_dice_exp(model_path=model_path_2, d=d, i=test_idx)
    
# Visualize counterfactual explanation

# dice_exp.visualize_as_dataframe(show_only_changes=True)
# dice_exp is another explanation object
# https://interpret.ml/DiCE/_modules/dice_ml/counterfactual_explanations.html#CounterfactualExplanations.visualize_as_dataframe

100%|██████████| 1/1 [00:01<00:00,  1.05s/it]
100%|██████████| 1/1 [00:00<00:00,  1.10it/s]


In [7]:
def get_var_indexs(index_lists):
    '''
    `index_lists` is a list of lists of variable indexes.
    Return the unique indexes, ranked by the number of times they appear
    across all the lists (most frequent first). Indexes that appear equally
    often keep the order in which they were first seen.
    '''
    counts = {}
    for index_list in index_lists:
        for var_index in index_list:
            counts[var_index] = counts.get(var_index, 0) + 1

    # dicts iterate in insertion order and sorted() stays stable with
    # reverse=True, so ties resolve by first appearance rather than by the
    # iteration order of a set.
    return sorted(counts, key=counts.get, reverse=True)

def reorder(array, indexes):
    '''
    Pick the elements of `array` at the positions listed in `indexes`,
    in that order, and return them as a new list.
    '''
    picked = []
    for position in indexes:
        picked.append(array[position])
    return picked

In [8]:
# Pool the feature indexes reported by both LIME runs and by every DiCE
# counterfactual, then rank them by how often they occur.
var_indexes = get_var_indexs(
    [lime1_var_indexes, lime2_var_indexes, *dice1_var_indexes, *dice2_var_indexes]
)

# [min, max] of every feature, taken over the full dataset.
feature_ranges = [
    [column.min(), column.max()]
    for column in cancer_dataset.data.T
]
# Every feature is labelled continuous.
feature_types = ['continuous' for _ in range(cancer_dataset.data.shape[1])]

In [9]:
def get_pd_scores(model_path,  feature_ranges, feature_types, var_indexes, test_idx, step_num=10, target=None):
    """Probe the model by sweeping one feature at a time on a single test row.

    For every index in `var_indexes`, copies of `test[test_idx]` are built in
    which only that feature is replaced, and the model's predicted probability
    for the `target` class is recorded for each copy.

    Args:
        model_path: Pickle path handed to `get_model`.
        feature_ranges: Per-feature `[min, max]` for continuous features, or
            the iterable of category values otherwise.
        feature_types: Per-feature type string; `"continuous"` selects the
            numeric sweep, anything else iterates over `feature_ranges`.
        var_indexes: Feature indexes to sweep, in output order.
        test_idx: Row of the module-level `test` array used as the base row.
        step_num: Number of sweep points for a continuous feature.
        target: Column of `predict_proba` to report. Defaults to the
            module-level `target_idx`.

    Returns:
        One list of probabilities per entry of `var_indexes`; a feature with
        no sweep values yields an empty list.
    """
    if target is None:
        target = target_idx

    base_example = test[test_idx]
    model = get_model(model_path)

    results = []
    for var_index in var_indexes:
        feature_range = feature_ranges[var_index]
        if feature_types[var_index] == "continuous":
            low, high = feature_range[0], feature_range[1]
            # NOTE(review): the sweep starts at `low` and stops one step short
            # of `high` (step / step_num never reaches 1) — confirm whether the
            # upper bound is meant to be included.
            sweep_values = [low + (high - low) * step / step_num for step in range(step_num)]
        else:
            sweep_values = list(feature_range)

        PD_examples = []
        for value in sweep_values:
            copy_example = base_example.copy()
            copy_example[var_index] = value
            PD_examples.append(copy_example.tolist())

        if not PD_examples:
            # Nothing to predict on; predict_proba would reject an empty batch.
            results.append([])
            continue

        PD_probs = model.predict_proba(PD_examples)
        results.append(PD_probs[:, target].tolist())
    return results

In [10]:
# Optional per-feature flags; each is None when the dataset has no such key.
is_continuous, is_categorical, is_integer = (
    cancer_dataset.get(flag_name, None)
    for flag_name in ('is_continuous', 'is_categorical', 'is_integer')
)

def get_rule(model_path):
    """Fit a rulematrix surrogate to the model stored at `model_path`.

    The surrogate is built by `rule_surrogate` around the model's `predict`
    function and fitted on the module-level `train` data. Its `student`
    attribute is printed.

    Args:
        model_path: Pickle path handed to `get_model`.

    Returns:
        The fitted surrogate object, so callers can inspect it further.
    """
    model = get_model(model_path)
    surrogate = rule_surrogate(
        model.predict,
        train_x=train,
        is_continuous=is_continuous,
        is_categorical=is_categorical,
        rlargs={'feature_names': cancer_dataset.feature_names.tolist(), 'verbose': 2},
        sampling_rate=2.0,
        seed=None,
    )
    surrogate.fit(train)
    print(surrogate.student)
    return surrogate

In [11]:
def _ranked(values):
    """Return per-feature `values` rearranged into the `var_indexes` display order."""
    return reorder(values, var_indexes)


# Everything the front end needs for one explained test row.
vis_data = {
    # Name of the class whose probability is reported.
    "dependent_var": cancer_dataset.target_names[target_idx],

    # The raw test row, in the dataset's original feature order.
    "input_data": {
        "type": "tabular",
        "value": test[test_idx].tolist(),
        "headers": cancer_dataset.feature_names.tolist(),
    },

    # Per-feature metadata, reordered by var_indexes.
    "independent_vars": {
        "names": _ranked(cancer_dataset.feature_names.tolist()),
        "types": _ranked(feature_types),  # 'continuous' or 'categorical'
        "values": _ranked(test[test_idx].tolist()),
        "ranges": _ranked(feature_ranges),
    },

    "explanations": {
        # LIME weights, one entry per model.
        "attribution": [
            {"name": name, "score": _ranked(score)}
            for name, score in (("lime", lime1_score), ("lime2", lime2_score))
        ],
        # DiCE deltas, one row per counterfactual, one entry per model.
        "cf": [
            {"name": name, "delta": [_ranked(cf) for cf in deltas]}
            for name, deltas in (("dice", dice1_score), ("dice2", dice2_score))
        ],
        # Single-feature sweeps, one entry per model.
        "pd": [
            {
                "name": name,
                "score": get_pd_scores(path, feature_ranges, feature_types, var_indexes, test_idx, step_num=10),
            }
            for name, path in (("rf1", model_path), ("rf2", model_path_2))
        ],
    },
}

# Persist the payload as a JSON asset.
with open('../src/asset/vis_data_test.json', 'w') as out_file:
    json.dump(vis_data, out_file)

In [12]:
import pathlib
import anywidget
import traitlets

class myWidget(anywidget.AnyWidget):
    """Notebook widget that exposes `vis_data` to the bundled front end."""

    # Explanation payload; tagged sync=True so it is shared with the front end.
    exp = traitlets.Dict(vis_data).tag(sync=True)
    # Front-end bundle (JS module and stylesheet), one directory up from here.
    _esm = pathlib.Path("..") / "bundle" / "widget.js"
    _css = pathlib.Path("..") / "bundle" / "widget.css"


# Instantiate the widget and display it as the cell output.
w = myWidget()
w

myWidget(exp={'dependent_var': 'malignant', 'input_data': {'type': 'tabular', 'value': [10.75, 14.97, 68.26, 3…