In [None]:
import pandas as pd
import numpy as np
import os
import pickle

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OrdinalEncoder
from sklearn.metrics import balanced_accuracy_score, cohen_kappa_score
from sklearn.compose import ColumnTransformer, make_column_selector, make_column_transformer
from sklearn.impute import SimpleImputer

from sklearn.datasets import load_iris
from alibi.explainers import AnchorTabular

from rmatrix.classification import RMatrixClassifier
from xgboost import XGBClassifier
from tqdm import tqdm

import seaborn as sns

In [None]:
from itables import show

def show_dataframe(
    df: pd.DataFrame,
    length_control: bool = True,
    filtering: bool = True,
    pagination: bool = True,
    show_index: bool = False,
    **kwargs
):
    """Show dataframe as an interactive table using itables library. It is customizable
    and allows to filter, sort and paginate the data. Additional options to show method
    can be passed using kwargs. When dom option is passed as a kwarg, it will override
    config created via length_control, filtering and pagination arguments.

    Args:
        df (pd.DataFrame): Data to be displayed.
        length_control (bool, optional): Show length control. Defaults to True.
        filtering (bool, optional): Show filtering. Defaults to True.
        pagination (bool, optional): Show pagination. Defaults to True.
        show_index (bool, optional): Show index. Defaults to False."""
    pl_language_options = {
        "info": "Strona _PAGE_ z _PAGES_",
        "search": "Wyszukaj:",
        "paginate": {
            "first": "Pierwsza",
            "last": "Ostatnia",
            "next": "Następna",
            "previous": "Poprzednia",
        },
        "lengthMenu": "Pokaż _MENU_ wierszy",
    }

    if show_index and isinstance(df.index, pd.RangeIndex) and df.index.start == 0:
        df = df.reset_index(drop=True)
        df.index = df.index + 1

    # it is assumed that paging is controlled by pagination argument
    if "paging" in kwargs:
        kwargs.pop("paging")

    # modify here default "display nowrap" of show function to "display"
    kwargs["classes"] = kwargs["classes"] if "classes" in kwargs else "display"

    if "dom" in kwargs:
        dom_config = kwargs["dom"]
        kwargs.pop("dom")
    else:
        dom_config = "tr" if not filtering else "trf"
        dom_config += "l" if length_control else ""
        dom_config += "p" if pagination else ""

    show(
        df,
        language=pl_language_options,
        dom=dom_config,
        showIndex=show_index,
        paging=pagination,
        **kwargs,
    )


In [None]:
def precision(c) -> float:  # pylint: disable=missing-function-docstring
    if (c.p + c.n) == 0:
        return 0
    return c.p / (c.p + c.n)

def coverage(c) -> float:  # pylint: disable=missing-function-docstring
    return c.p / c.P

In [None]:
min_cov = 3
data_name = "breast-w"
fi = True
file_name = f"{data_name}_mincov{min_cov}"
generator = RMatrixClassifier(mincov=min_cov, induction_measuer="precision", filter_duplicates=False, filtration=False, cuts_only_between_classes=True, prune=True)

# Data

In [None]:
data = pd.read_csv(f"../data_csv/{data_name}.csv")
data_x = data.drop(columns=["class"])
data_y = data[["class"]]

ord_tr = OrdinalEncoder()
y = ord_tr.fit_transform(data_y)
y = np.array([int(obs[0]) for obs in y])

class_names = [list(x) for x in ord_tr.categories_][0]
feature_names = list(data_x.columns)

In [None]:
show_dataframe(data.head())

In [None]:
obj_imputer = SimpleImputer(strategy="constant", fill_value="missing")
num_imputer = SimpleImputer(strategy="constant", fill_value=0)

data_x_imp_final = []
data_x_enc_final = []
cat_i = []
cat_cat = []
cat_i_bool = []

for i in range(len(feature_names)):
    if data_x[feature_names[i]].dtype == "O":
        data_imp = obj_imputer.fit_transform(data_x[feature_names[i]].values.reshape(-1, 1))
        data_imp_flat = [x[0] for x in data_imp]
        data_x_imp_final.append(data_imp_flat)
        ord = OrdinalEncoder()
        data_enc = ord.fit_transform(np.array(data_imp_flat).reshape(-1, 1))
        cat_cat.append([list(x) for x in ord.categories_])
        cat_i.append(i)
        cat_i_bool.append(False)
        data_enc_flat = [x[0] for x in data_enc]
        data_x_enc_final.append(data_enc_flat)
    else:
        data_imp = num_imputer.fit_transform(data_x[feature_names[i]].values.reshape(-1, 1))
        data_imp_flat = [x[0] for x in data_imp]
        data_x_imp_final.append(data_imp_flat)
        data_x_enc_final.append(data_imp_flat)
        cat_i_bool.append(True)

data_x_imp = pd.DataFrame(data_x_imp_final).T
data_x_imp.columns = feature_names
data_x_enc = pd.DataFrame(data_x_enc_final).T
data_x_enc.columns = feature_names
data_x_enc

In [None]:
for col in np.array(feature_names)[cat_i]:
    data_x_enc[col] = data_x_enc[col].astype("category")

In [None]:
X_train, X_test, y_train, y_test = train_test_split(data_x_enc, y, test_size=0.2, random_state=42)

np.random.seed(0)
clf = XGBClassifier(enable_categorical=True)
clf.fit(X_train, y_train)

In [None]:
fi_df = pd.DataFrame({'vars': feature_names, 'fi': clf.feature_importances_})
fi_vars = fi_df.sort_values("fi", ascending=False)["vars"].values
fi_df.to_csv(f"../results_anchor/{file_name}_fi.csv", index=False)

In [None]:
y_train_preds = clf.predict(X_train)
y_test_preds = clf.predict(X_test)

In [None]:
print(f"BACC train: {balanced_accuracy_score(y_train, y_train_preds)}, BACC test: {balanced_accuracy_score(y_test, y_test_preds)}")
print(f"Kappa train: {cohen_kappa_score(y_train, y_train_preds)}, Kappa test: {cohen_kappa_score(y_test, y_test_preds)}")

# Anchor

In [None]:
categories_map = [x[0] for x in cat_cat]
categories_anchor = dict(zip(cat_i, categories_map))
categories_anchor

In [None]:
predict_fn = lambda x: clf.predict_proba(x)
explainer = AnchorTabular(predict_fn, feature_names, categories_anchor, seed=42)
explainer.fit(X_train.to_numpy())

In [None]:
def anchor_explainations(data_to_anchor):  

    rules = []
    rules_len = []
    prec = []
    cov = []

    for idx in tqdm(range(len(data_to_anchor))):

        explanation = explainer.explain(data_to_anchor[idx], threshold=0.95)
        rules.append(' AND '.join(explanation.anchor))
        rules_len.append(len(explanation.anchor))
        prec.append(explanation.precision)
        cov.append(explanation.coverage)
        
    anchor_exp_dict = {'anchor_rule': rules, 'anchor_rule_len': rules_len, 'anchor_precision': prec, 'anchor_coverage': cov}    
    return pd.DataFrame(anchor_exp_dict)
    
anchor_train = anchor_explainations(X_train.to_numpy())
anchor_test = anchor_explainations(X_test.to_numpy())

In [None]:
from collections import namedtuple

NCov = namedtuple('NCov', ['p', 'n', 'P'])

def correct_precision_coverage(test, train):

    cov_all_list = []
    prec_all_list = []

    for i in range(len(test)):      
        rule_from_anchor = test["anchor_rule"].values[i]
        decision = test["prediction"].values[i]
        conditions_list = rule_from_anchor.split(" AND ")
        filtered_train = train.copy()
        for condition in conditions_list:
            key_value = condition.split(" = ")
            filtered_train = filtered_train[filtered_train[key_value[0]] == key_value[1]]
        p = len(filtered_train[filtered_train["prediction"]==decision]) + 1
        n = len(filtered_train[filtered_train["prediction"]!=decision])
        P = len(train[train["prediction"]==decision]) + 1
        new_cov_loc = NCov(p, n, P)
        cov_all_list.append(coverage(new_cov_loc))
        prec_all_list.append(precision(new_cov_loc))

    return prec_all_list, cov_all_list

# RMatrix

In [None]:
X_train_org = data_x_imp.iloc[X_train.index,:].reset_index(drop=True)
y_train_pred_org = pd.Series(ord_tr.inverse_transform(y_train_preds.reshape(-1, 1)).flatten(), name="prediction")
y_train_org = pd.Series(ord_tr.inverse_transform(y_train.reshape(-1, 1)).flatten(), name="class")
X_test_org = data_x_imp.iloc[X_test.index,:].reset_index(drop=True)
X_test_org.columns = feature_names
y_test_pred_org = pd.Series(ord_tr.inverse_transform(y_test_preds.reshape(-1, 1)).flatten(), name="prediction")
y_test_org = pd.Series(ord_tr.inverse_transform(y_test.reshape(-1, 1)).flatten(), name="class")

In [None]:
for col in np.array(feature_names)[cat_i_bool]:
    X_train_org[col] = X_train_org[col].astype("float64")
    X_test_org[col] = X_test_org[col].astype("float64")

In [None]:
def rmatrix_explainations(x, y):  

    rules = []
    rules_len = []
    prec = []
    cov_list = []

    for idx in tqdm(range(len(x))):

        if fi:
            rule, cov = generator.local_explanation(example_X=x.iloc[idx:idx+1], example_y=y.iloc[idx:idx+1], X=X_train_org, y=y_train_org, attributes=fi_vars)
        else:
            rule, cov = generator.local_explanation(example_X=x.iloc[idx:idx+1], example_y=y.iloc[idx:idx+1], X=X_train_org, y=y_train_org, attributes=[])
        rules.append(rule.premise.to_string(feature_names))
        rules_len.append(len(rule.premise.to_string(feature_names).split("AND")))
        prec.append(precision(cov))
        cov_list.append(coverage(cov))
        
    rmatrix_exp_dict = {'rmatrix_rule': rules, 'rmatrix_rule_len': rules_len, 'rmatrix_precision': prec, 'rmatrix_coverage': cov_list}
        
    return pd.DataFrame(rmatrix_exp_dict)

rmatrix_train = rmatrix_explainations(X_train_org, y_train_pred_org)
rmatrix_test = rmatrix_explainations(X_test_org, y_test_pred_org)

# Summary

## Train

In [None]:
def rmatrix_unique_rules(rule_str):
    features = []
    conditions = rule_str.split(" AND ")
    for condition in conditions:
        feature_value = condition.split(" = ")
        features.append(feature_value[0])
    
    return len(np.unique(features))

rmatrix_train["rmatrix_rule_len_unique"] = rmatrix_train["rmatrix_rule"].apply(rmatrix_unique_rules)
rmatrix_test["rmatrix_rule_len_unique"] = rmatrix_test["rmatrix_rule"].apply(rmatrix_unique_rules)
rmatrix_train = rmatrix_train[['rmatrix_rule', 'rmatrix_rule_len', 'rmatrix_rule_len_unique', 'rmatrix_precision', 'rmatrix_coverage']]
rmatrix_test = rmatrix_test[['rmatrix_rule', 'rmatrix_rule_len', 'rmatrix_rule_len_unique', 'rmatrix_precision', 'rmatrix_coverage']]

In [None]:
train_all = pd.concat([X_train_org, y_train_org, y_train_pred_org, rmatrix_train, anchor_train], axis=1)

In [None]:
rule_len = train_all.groupby(["rmatrix_rule_len", "anchor_rule_len"])["rmatrix_rule_len", "anchor_rule_len"].size().reset_index(name="counts").sort_values("counts", ascending=False)
show_dataframe(rule_len)

Statystyki opisowe długości reguł, precision oraz coverage. Dla anchor przeliczono miary precision i coverage zgodnie ze wzorami z rmatrix (przyrostek correct).

In [None]:
from collections import namedtuple

NCov = namedtuple('NCov', ['p', 'n', 'P'])

def correct_precision_coverage(test, train):

    cov_all_list = []
    prec_all_list = []

    for i in range(len(test)):      
        rule_from_anchor = test["anchor_rule"].values[i]
        decision = test["prediction"].values[i]
        conditions_list = rule_from_anchor.split(" AND ")
        filtered_train = train.copy()
        for condition in conditions_list:
            key_value = condition.split(" ")
            if key_value[1] == "=":
                if key_value[0] in np.array(feature_names)[cat_i_bool]:
                    filtered_train = filtered_train[filtered_train[key_value[0]] == float(key_value[2])]
                else:
                    filtered_train = filtered_train[filtered_train[key_value[0]] == key_value[2]]
            elif len(key_value) > 3:
                filtered_train = filtered_train.query(condition)
            else:
                if key_value[0] in np.array(feature_names)[cat_i_bool]:
                    filtered_train = filtered_train.query(condition)
                else:
                    filtered_train = filtered_train.query(f"{key_value[0]}{key_value[1]}'{key_value[0]}'")
        p = len(filtered_train[filtered_train["prediction"]==decision]) + 1
        n = len(filtered_train[filtered_train["prediction"]!=decision])
        P = len(train[train["prediction"]==decision]) + 1
        new_cov_loc = NCov(p, n, P)
        cov_all_list.append(coverage(new_cov_loc))
        prec_all_list.append(precision(new_cov_loc))

    return prec_all_list, cov_all_list

In [None]:
anchor_precision_correct, anchor_coverage_correct = correct_precision_coverage(train_all, train_all)
train_all["anchor_precision_correct"] = anchor_precision_correct
train_all["anchor_coverage_correct"] = anchor_coverage_correct
train_all.to_csv(f"results_anchor/{file_name}_train.csv", index=False, sep=";")
train_all.describe()

Porównanie miar precision dla przypadków ze zbioru treningowego.

In [None]:
sns.scatterplot(data=train_all, x="rmatrix_precision", y="anchor_precision")

Porównanie miar precision dla przypadków ze zbioru treningowego - korekta miary w anchor.

In [None]:
sns.scatterplot(data=train_all, x="rmatrix_precision", y="anchor_precision_correct")

Porównanie miar coverage dla przypadków ze zbioru treningowego.

In [None]:
sns.scatterplot(data=train_all, x="rmatrix_coverage", y="anchor_coverage")

Porównanie miar coverage dla przypadków ze zbioru treningowego - korekta miary w anchor.

In [None]:
sns.scatterplot(data=train_all, x="rmatrix_coverage", y="anchor_coverage_correct")

In [None]:
train_all["anchor_rule_all"] = train_all["anchor_rule"] + " THEN " + train_all["prediction"]
train_all["rmatrix_rule_all"] = train_all["rmatrix_rule"] + " THEN " + train_all["prediction"]

Liczba unikalnych reguł anchor

In [None]:
len(train_all.groupby(["anchor_rule_all"])["anchor_rule_all"].size().reset_index(name="counts").sort_values("counts", ascending=False))

Najczęściej występujące lokalne objaśnienia wygenerowane przez anchor w zbiorze treningowym

In [None]:
show_dataframe(train_all.groupby(["anchor_rule_all"])["anchor_rule_all"].size().reset_index(name="counts").sort_values("counts", ascending=False))

Liczba unikalnych reguł RMatrix

In [None]:
len(train_all.groupby(["rmatrix_rule_all"])["rmatrix_rule_all"].size().reset_index(name="counts").sort_values("counts", ascending=False))

Najczęściej występujące lokalne objaśnienia wygenerowane przez rmatrix w zbiorze treningowym

In [None]:
show_dataframe(train_all.groupby(["rmatrix_rule_all"])["rmatrix_rule_all"].size().reset_index(name="counts").sort_values("counts", ascending=False))

## Test

In [None]:
test_all = pd.concat([X_test_org, y_test_org, y_test_pred_org, rmatrix_test, anchor_test], axis=1)

Porównanie długości reguł rmatrix i anchor oraz liczba przypadków w zbiorze testowym

In [None]:
rule_len = test_all.groupby(["rmatrix_rule_len", "anchor_rule_len"])["rmatrix_rule_len", "anchor_rule_len"].size().reset_index(name="counts").sort_values("counts", ascending=False)
show_dataframe(rule_len)

Statystyki opisowe długości reguł, precision oraz coverage. Dla anchor przeliczono miary precision i coverage zgodnie ze wzorami z rmatrix (przyrostek correct).

In [None]:
anchor_precision_correct, anchor_coverage_correct = correct_precision_coverage(test_all, train_all)
test_all["anchor_precision_correct"] = anchor_precision_correct
test_all["anchor_coverage_correct"] = anchor_coverage_correct
test_all.to_csv(f"results_anchor/{file_name}_test.csv", index=False, sep=";")
test_all.describe()

Porównanie miar precision dla przypadków ze zbioru testowego.

In [None]:
sns.scatterplot(data=test_all, x="rmatrix_precision", y="anchor_precision")

Porównanie miar precision dla przypadków ze zbioru testowego - korekta miary w anchor.

In [None]:
sns.scatterplot(data=test_all, x="rmatrix_precision", y="anchor_precision_correct")

Porównanie miar coverage dla przypadków ze zbioru testowego.

In [None]:
sns.scatterplot(data=test_all, x="rmatrix_coverage", y="anchor_coverage")

Porównanie miar coverage dla przypadków ze zbioru testowego - korekta miary w anchor.

In [None]:
sns.scatterplot(data=test_all, x="rmatrix_coverage", y="anchor_coverage_correct")

In [None]:
test_all["anchor_rule_all"] = test_all["anchor_rule"] + " THEN " + test_all["prediction"]
test_all["rmatrix_rule_all"] = test_all["rmatrix_rule"] + " THEN " + test_all["prediction"]

Liczba unikalnych reguł anchor

In [None]:
len(test_all.groupby(["anchor_rule_all"])["anchor_rule_all"].size().reset_index(name="counts").sort_values("counts", ascending=False))

Najczęściej występujące lokalne objaśnienia wygenerowane przez anchor w zbiorze testowym

In [None]:
show_dataframe(test_all.groupby(["anchor_rule_all"])["anchor_rule_all"].size().reset_index(name="counts").sort_values("counts", ascending=False))

Liczba unikalnych reguł RMatrix

In [None]:
len(test_all.groupby(["rmatrix_rule_all"])["rmatrix_rule_all"].size().reset_index(name="counts").sort_values("counts", ascending=False))

Najczęściej występujące lokalne objaśnienia wygenerowane przez rmatrix w zbiorze testowym

In [None]:
show_dataframe(test_all.groupby(["rmatrix_rule_all"])["rmatrix_rule_all"].size().reset_index(name="counts").sort_values("counts", ascending=False))