In [None]:
import json
import pandas as pd
import numpy as np
import os

import warnings
warnings.filterwarnings('ignore')

from sklearn.metrics import cohen_kappa_score, accuracy_score, balanced_accuracy_score, recall_score, mutual_info_score

from tqdm import tqdm
from rmatrix.classification import RMatrixClassifier
from decision_rules.serialization.utils import JSONSerializer
from decision_rules.classification.ruleset import ClassificationRuleSet
from decision_rules.measures import c2, precision


In [None]:
def mutual_inclusion(R, M):
    """
    Compute the Jaccard similarity between the element sets of R and M:

        |F(R) n F(M)| / |F(R) U F(M)|

    where n is the set intersection and U is the set union. Both inputs are
    converted to sets first, so duplicates and element order are ignored.

    Args:
      R: An iterable of hashable elements.
      M: An iterable of hashable elements.

    Returns:
      A float in [0, 1]: 1.0 when both sets are identical and non-empty,
      0.0 when they share no element. When both inputs are empty the union is
      empty as well and 0.0 is returned instead of dividing by zero.
    """
    set_r = set(R)
    set_m = set(M)
    union = set_r | set_m

    # Guard the degenerate case: two empty inputs have nothing in common.
    if not union:
        return 0.0

    return len(set_r & set_m) / len(union)

def rmatrix_unique_rules(rule, feature_names=None):
    """
    List the distinct left-hand sides of the conditions in a rule's premise.

    The premise is rendered with ``rule.premise.to_string(feature_names)``,
    the resulting text is cut at every " AND ", and for each piece only the
    part before the first " = " is kept.

    Args:
      rule: Object exposing ``premise.to_string(feature_names)``.
      feature_names: Forwarded unchanged to ``to_string``.

    Returns:
      A list with the unique kept parts, in the sorted order produced by
      ``np.unique``.
    """
    premise_text = rule.premise.to_string(feature_names)
    left_hand_sides = [
        part.split(" = ")[0] for part in premise_text.split(" AND ")
    ]
    return list(np.unique(left_hand_sides))

def extract_shap_fi(shap_df_row, shap_df, shap_length, stat):
    """
    Rank the columns of one row of ``shap_df`` by absolute value, largest first.

    Args:
      shap_df_row: Positional index of the row to rank (used with ``iloc``).
      shap_df: DataFrame of numeric values, one column per feature
        (presumably SHAP values, judging by the name -- the function only
        relies on them being numeric).
      shap_length: Sequence of integer cut-off lengths; indexed with
        ``shap_df_row`` when ``stat == "len"``.
      stat: Which part of the ranking to return:
        "all" -- every column name;
        "max" -- the first ``max(shap_length)`` column names;
        "len" -- the first ``shap_length[shap_df_row]`` column names.

    Returns:
      A list of column names ordered by decreasing absolute row value.

    Raises:
      ValueError: If ``stat`` is not one of "all", "max" or "len".
    """
    # Reject unknown modes up front; otherwise no branch would assign the
    # result and the function would fail with an UnboundLocalError at return.
    if stat not in ("all", "max", "len"):
        raise ValueError(
            f"stat must be one of 'all', 'max' or 'len', got {stat!r}"
        )

    row_values = np.abs(shap_df.iloc[shap_df_row, :].values)
    col_names = np.array(shap_df.columns)
    # Negating the values makes the ascending argsort yield a descending order.
    ranked = list(col_names[np.argsort(-row_values)])

    if stat == "all":
        return ranked
    if stat == "max":
        top = np.max(shap_length)
    else:  # stat == "len"
        top = shap_length[shap_df_row]
    return ranked[:top]

# NOTE: this definition comes after `from decision_rules.measures import c2, precision`,
# so from here on the name `precision` refers to this function, not the imported one.
def precision(c) -> float:
    """
    Return ``c.p / (c.p + c.n)``, or 0 when ``c.p + c.n`` is zero.

    Args:
      c: Object with numeric attributes ``p`` and ``n``.
    """
    denominator = c.p + c.n
    return c.p / denominator if denominator != 0 else 0

def coverage(c) -> float:
    """
    Return ``c.p / c.P``, or 0.0 when ``c.P`` is zero.

    Args:
      c: Object with numeric attributes ``p`` and ``P``.

    Returns:
      The ratio as a float. A zero ``c.P`` yields 0.0 instead of a division
      by zero.
    """
    if c.P == 0:
        return 0.0
    return c.p / c.P


In [None]:
# Table of selected models; the cells below read its "dataset" and "model" columns.
bb_models = pd.read_csv("results/selected_bb_models.csv")
# Distinct dataset names, in order of first appearance in the table.
datasets = pd.unique(bb_models["dataset"])

# Mutual inclusion - with fi global

In [None]:
class_types = ["_filterFF_precision_global"]

# One DataFrame per (dataset, model) pair is collected here and concatenated a
# single time after the loops. Calling pd.concat inside the loop would re-copy
# every previously stored row on each iteration.
mi_frames = []

for class_type in class_types:

    for sel_dataset in tqdm(datasets):

        models = np.unique(bb_models[bb_models["dataset"] == sel_dataset]["model"])

        # Training split: a 'target' column (if present) is renamed to 'name',
        # extracted as string labels and then removed from the feature frame.
        x_train_df = pd.read_csv(f"../results_all/{sel_dataset}/train.csv")
        x_train_df = x_train_df.rename(columns={'target': 'name'})
        y_train = x_train_df["name"].squeeze().astype(str)
        x_train_df = x_train_df.drop(columns=["name"])

        # Test split gets the same preprocessing; it is not used in the score
        # computation below.
        x_test_df = pd.read_csv(f"../results_all/{sel_dataset}/test.csv")
        x_test_df = x_test_df.rename(columns={'target': 'name'})
        y_test = x_test_df["name"].squeeze().astype(str)
        x_test_df = x_test_df.drop(columns=["name"])

        # Columns whose training values are all 0/1 are cast to strings in both
        # splits (presumably so the ruleset treats them as nominal -- TODO confirm).
        binary_columns = list(x_train_df.columns[x_train_df.isin([0, 1]).all()])
        if len(binary_columns) > 0:
            x_train_df[binary_columns] = x_train_df[binary_columns].astype(str)
            x_test_df[binary_columns] = x_test_df[binary_columns].astype(str)

        feature_names = x_train_df.columns

        for sel_model in models:

            fi_path = f"../results_new/{sel_dataset}/{sel_model}/fi_test.csv"
            file_path = f"../results_all/{sel_dataset}/{sel_model}/ruleset{class_type}.json"

            # Skip models for which either input file is missing, instead of
            # failing on open() when only the ruleset JSON is absent.
            if not (os.path.exists(fi_path) and os.path.exists(file_path)):
                continue

            # Attribute names ordered from most to least important.
            fi = pd.read_csv(fi_path)
            fi_attr = fi.sort_values(by="importance", ascending=False)["attribute"].values

            with open(file_path, 'r') as json_file:
                ruleset_json_read = json.load(json_file)

            classifier = JSONSerializer.deserialize(ruleset_json_read, target_class=ClassificationRuleSet)
            if "c2" in class_type:
                classifier.update(x_train_df, y_train, measure=c2)
            elif "precision" in class_type:
                classifier.update(x_train_df, y_train, measure=precision)

            rules_features = [rmatrix_unique_rules(rule, feature_names) for rule in classifier.rules]
            # A ruleset without rules contributes no rows; np.max below would
            # raise on an empty list.
            if not rules_features:
                continue

            rules_lengths = [len(rule) for rule in rules_features]
            rule_max_length = np.max(rules_lengths)

            # Top-k importance attributes: k = longest rule (shared by all rules)
            # and k = length of each individual rule.
            fi_features_max = fi_attr[:rule_max_length]
            fi_features_len = [fi_attr[:rule_len] for rule_len in rules_lengths]

            mi = [mutual_inclusion(rule_feats, fi_attr) for rule_feats in rules_features]
            mi_max = [mutual_inclusion(rule_feats, fi_features_max) for rule_feats in rules_features]
            mi_len = [
                mutual_inclusion(rule_feats, fi_top)
                for rule_feats, fi_top in zip(rules_features, fi_features_len)
            ]

            mi_df = pd.DataFrame({'mi_all': mi, 'mi_max': mi_max, 'mi_len': mi_len})
            mi_df["dataset"] = sel_dataset
            mi_df["model"] = sel_model
            mi_df["rmatrix"] = "c2" if "c2" in class_type else "precision"

            mi_frames.append(mi_df)

# pd.concat raises on an empty list, so fall back to an empty frame.
results_all = pd.concat(mi_frames) if mi_frames else pd.DataFrame()

In [None]:
# Write the collected mutual-inclusion rows to CSV; index=False leaves the row index out of the file.
results_all.to_csv("results/mutual_inclusion_global.csv", index=False)

# Mutual inclusion - without fi

In [None]:
class_types = ["_filterFF_precision_approx"]

# One DataFrame per (dataset, model) pair is collected here and concatenated a
# single time after the loops. Calling pd.concat inside the loop would re-copy
# every previously stored row on each iteration.
mi_frames = []

for class_type in class_types:

    for sel_dataset in tqdm(datasets):

        models = np.unique(bb_models[bb_models["dataset"] == sel_dataset]["model"])

        # Training split: a 'target' column (if present) is renamed to 'name',
        # extracted as string labels and then removed from the feature frame.
        x_train_df = pd.read_csv(f"../results_all/{sel_dataset}/train.csv")
        x_train_df = x_train_df.rename(columns={'target': 'name'})
        y_train = x_train_df["name"].squeeze().astype(str)
        x_train_df = x_train_df.drop(columns=["name"])

        # Test split gets the same preprocessing; it is not used in the score
        # computation below.
        x_test_df = pd.read_csv(f"../results_all/{sel_dataset}/test.csv")
        x_test_df = x_test_df.rename(columns={'target': 'name'})
        y_test = x_test_df["name"].squeeze().astype(str)
        x_test_df = x_test_df.drop(columns=["name"])

        # Columns whose training values are all 0/1 are cast to strings in both
        # splits (presumably so the ruleset treats them as nominal -- TODO confirm).
        binary_columns = list(x_train_df.columns[x_train_df.isin([0, 1]).all()])
        if len(binary_columns) > 0:
            x_train_df[binary_columns] = x_train_df[binary_columns].astype(str)
            x_test_df[binary_columns] = x_test_df[binary_columns].astype(str)

        feature_names = x_train_df.columns

        for sel_model in models:

            fi_path = f"../results_new/{sel_dataset}/{sel_model}/fi_test.csv"
            file_path = f"../results_all/{sel_dataset}/{sel_model}/ruleset{class_type}.json"

            # Skip models for which either input file is missing.
            if not (os.path.exists(fi_path) and os.path.exists(file_path)):
                continue

            # Attribute names ordered from most to least important.
            fi = pd.read_csv(fi_path)
            fi_attr = fi.sort_values(by="importance", ascending=False)["attribute"].values

            with open(file_path, 'r') as json_file:
                ruleset_json_read = json.load(json_file)

            classifier = JSONSerializer.deserialize(ruleset_json_read, target_class=ClassificationRuleSet)
            if "c2" in class_type:
                classifier.update(x_train_df, y_train, measure=c2)
            elif "precision" in class_type:
                classifier.update(x_train_df, y_train, measure=precision)

            rules_features = [rmatrix_unique_rules(rule, feature_names) for rule in classifier.rules]
            # A ruleset without rules contributes no rows; np.max below would
            # raise on an empty list.
            if not rules_features:
                continue

            rules_lengths = [len(rule) for rule in rules_features]
            rule_max_length = np.max(rules_lengths)

            # Top-k importance attributes: k = longest rule (shared by all rules)
            # and k = length of each individual rule.
            fi_features_max = fi_attr[:rule_max_length]
            fi_features_len = [fi_attr[:rule_len] for rule_len in rules_lengths]

            mi = [mutual_inclusion(rule_feats, fi_attr) for rule_feats in rules_features]
            mi_max = [mutual_inclusion(rule_feats, fi_features_max) for rule_feats in rules_features]
            mi_len = [
                mutual_inclusion(rule_feats, fi_top)
                for rule_feats, fi_top in zip(rules_features, fi_features_len)
            ]

            mi_df = pd.DataFrame({'mi_all': mi, 'mi_max': mi_max, 'mi_len': mi_len})
            mi_df["dataset"] = sel_dataset
            mi_df["model"] = sel_model
            mi_df["rmatrix"] = "c2" if "c2" in class_type else "precision"

            mi_frames.append(mi_df)

# pd.concat raises on an empty list, so fall back to an empty frame.
results_all = pd.concat(mi_frames) if mi_frames else pd.DataFrame()

In [None]:
# Write the collected mutual-inclusion rows to CSV; index=False leaves the row index out of the file.
results_all.to_csv("results/mutual_inclusion_without_fi.csv", index=False)