In [1]:
#pip install skope-rules

In [2]:
#pip install py-ciu==0.1.1

In [3]:
import pandas as pd
import numpy as np
import sklearn
import sklearn.cluster
import warnings
warnings.filterwarnings('ignore')
import matplotlib.pyplot as plt
import random
from ciu import determine_ciu
import six
import sys
import os
sys.modules['sklearn.externals.six'] = six
from skrules import SkopeRules
import openml
from sklearn.impute import SimpleImputer
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import LabelEncoder
from scipy.sparse import csr_matrix


In [4]:
## New proposed separability

#calculate the centroid (mean vector) of the feature values. 
#This represents the average position of each class in the feature space
def calculate_centroids(X, labels):
    unique_labels = np.unique(labels)
    centroids = {label: X[labels == label].mean(axis=0) for label in unique_labels}
    return centroids

#Calculate the average variance within each class. 
#This measures how spread out each class is around its centroid.
def calculate_within_class_variance(X, labels, centroids):
    unique_labels = np.unique(labels)
    variances = {label: ((X[labels == label] - centroids[label])**2).mean() for label in unique_labels}
    total_variance = np.mean(list(variances.values()))
    return total_variance

#Calculate the distance (e.g., Euclidean distance) between the centroids of each pair of classes. 
#This measures how far apart the classes are from each other.
def calculate_between_class_separation(centroids):
    unique_labels = list(centroids.keys())
    separations = []
    for i in range(len(unique_labels)):
        for j in range(i+1, len(unique_labels)):
            separation = np.linalg.norm(centroids[unique_labels[i]] - centroids[unique_labels[j]])
            separations.append(separation)
    avg_separation = np.mean(separations)
    return avg_separation

def calculate_separability(X, labels):
    centroids = calculate_centroids(X, labels)
    within_class_var = calculate_within_class_variance(X, labels, centroids)
    between_class_sep = calculate_between_class_separation(centroids)
    
    if within_class_var == 0:  
        return np.inf
    
    separability_score = between_class_sep / within_class_var
    return separability_score 

In [5]:
def calc_identity(exp1, exp2):
    dis = np.array([np.array_equal(exp1[i], exp2[i]) for i in range(len(exp1))])
    total = dis.shape[0]
    true = np.sum(dis)
    score = (total - true) / total
    return score * 100, true, total

def calc_stability(exp, labels):
    total = labels.shape[0]
    label_values = np.unique(labels)
    n_clusters = label_values.shape[0]
    init = np.array([[np.average(exp[np.where(labels == i)], axis = 0)] for i in label_values]).squeeze()
    ct = sklearn.cluster.KMeans(n_clusters = n_clusters, random_state=1, n_init=10, init = init)
    ct.fit(exp)
    error = np.sum(np.abs(labels-ct.labels_))
    if error/total > 0.5:
        error = total-error
    return error, total

def enc_exp(exp, feature_num):
    enc_exp = np.zeros((len(exp),feature_num))
    for i in range(len(exp)):
        for j in range(len(exp[i])):
            enc_exp[i][int(exp[i,j,0])] = exp[i,j,1]
    return enc_exp

In [6]:
def permute(x, x_dash):
    x = x.copy()
    x_dash = x_dash.copy()
    x_rand = np.random.random(x.shape[0])
    x_new = [x[i] if x_rand[i] > 0.5 else x_dash[i] for i in range(len(x))]
    x_dash_new = [x_dash[i] if x_rand[i] > 0.5 else x[i] for i in range(len(x))]
    return x_new, x_dash_new

def calc_trust_score(test_x, exp, m, feat_list, model):
    total_recalls = []
    for i in range(len(test_x)):
        feat_score = np.zeros((len(feat_list)))
        for _ in range(m):
            x = test_x[i].copy()
            x_dash = test_x[np.random.randint(0,len(test_x))].copy()
            x_perm, x_dash_perm = permute(x, x_dash)
            for j in range(len(feat_list)):
                z = np.concatenate((x_perm[:j+1], x_dash_perm[j+1:]))
                z_dash = np.concatenate((x_dash_perm[:j], x_perm[j:]))
                p_z = model.predict_proba(np.array(z).reshape(1, -1))
                p_z_dash = model.predict_proba(z_dash.reshape(1,-1))
                feat_score[j] = feat_score[j] + np.linalg.norm(p_z-p_z_dash)
        feat_score = feat_score/m
        gold_feat_fs = np.argpartition(feat_score, -6)[-6:]
        recall = len(set(exp[i][:6, 0]).intersection(set(gold_feat_fs)))/6
        total_recalls.append(recall)
    return np.mean(total_recalls)

In [7]:
datasets_folder = "datasets"

folder_names = []
attribute_names_list = []
categorical_indicator_list = []
X_list = []
y_list = []

for folder_name in os.listdir(datasets_folder):
    folder_path = os.path.join(datasets_folder, folder_name)
    
    if os.path.isdir(folder_path):
        attribute_names_path = os.path.join(folder_path, "attribute_names.csv")
        categorical_indicator_path = os.path.join(folder_path, "categorical_indicator.csv")
        X_path = os.path.join(folder_path, "X.csv")
        y_path = os.path.join(folder_path, "y.csv")
        
        attribute_names_df = pd.read_csv(attribute_names_path)
        categorical_indicator_df = pd.read_csv(categorical_indicator_path)
        X_df = pd.read_csv(X_path)
        y_df = pd.read_csv(y_path)

        unique_classes = y_df.iloc[:, 0].unique()
        sampled_indices = []
        for cls in unique_classes:
            cls_indices = y_df[y_df.iloc[:, 0] == cls].index
            sampled_indices.append(np.random.choice(cls_indices, 1)[0])

        sampled_indices = np.array(sampled_indices)

        needed_samples = 100 - len(sampled_indices)
        seed_value = 42 
        np.random.seed(seed_value)

        if needed_samples > 0:
            additional_indices = np.random.choice(y_df.index, needed_samples, replace=False)
            sampled_indices = np.concatenate([sampled_indices, additional_indices])
        
        
        X_list.append(X_df.loc[sampled_indices])
        y_list.append(y_df.loc[sampled_indices])

        folder_names.append(folder_name)
        attribute_names_list.append(attribute_names_df)
        categorical_indicator_list.append(categorical_indicator_df)
        
        
def convert_to_numeric_and_impute(X_list, y_list):
    imputer = SimpleImputer(strategy='mean')
    label_encoder = LabelEncoder()

    def process_X_dataframe(df):
        for column in df.columns:
            if isinstance(df[column].iloc[0], csr_matrix):
                df[column] = df[column].apply(lambda x: x.toarray()[0,0] if x.shape[1] == 1 else x.toarray())

            df[column] = pd.to_numeric(df[column], errors='coerce')

            if df[column].dtype == 'object':
                df[column] = df[column].fillna('Missing')
                df[column] = label_encoder.fit_transform(df[column])
            else:
                if df[column].notna().any():
                    df[column] = imputer.fit_transform(df[[column]]).ravel()
                else:
                    df[column] = df[column].fillna(0)
        return df

    def process_y_dataframe(df):
        if df.dtypes[0] == 'object' or not np.issubdtype(df.dtypes[0], np.number):
            df_encoded = df.apply(lambda x: label_encoder.fit_transform(x))
            df_encoded = df_encoded.rename(columns={df_encoded.columns[0]: 'class'})
            return df_encoded
        
        else:
            return df
            print('aaaah')


        
    X_list = [process_X_dataframe(df) for df in X_list]
    y_list = [process_y_dataframe(df) for df in y_list]

    return X_list, y_list

X_list, y_list = convert_to_numeric_and_impute(X_list, y_list)

In [8]:
import pandas as pd
import time
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from ciu import determine_ciu

def exp_fn_ciu(xtest, model, X_train):
    exp1 = []
    for i in range(len(xtest)):
        exp = determine_ciu(xtest.iloc[i:i+1], model.predict_proba, X_train.to_dict('list'), samples=1000, prediction_index=1)
        exp_list = [[feat_list.index(i), exp.ci[i]] for i in exp.ci]
        exp1.append(exp_list)
    return np.array(exp1)

def permute(x, x_dash):
    x = x.copy()
    x_dash = x_dash.copy()
    x_rand = np.random.random(x.shape[0])
    x_new = [x[i] if x_rand[i] > 0.5 else x_dash[i] for i in range(len(x))]
    x_dash_new = [x_dash[i] if x_rand[i] > 0.5 else x[i] for i in range(len(x))]
    return x_new, x_dash_new

def calc_trust_score(test_x, exp, m, feat_list, model):
    total_recalls = []
    for i in range(len(test_x)):
        feat_score = np.zeros((len(feat_list)))
        for _ in range(m):
            x = test_x[i].copy()
            x_dash = test_x[np.random.randint(0,len(test_x))].copy()
            x_perm, x_dash_perm = permute(x, x_dash)
            for j in range(len(feat_list)):
                z = np.concatenate((x_perm[:j+1], x_dash_perm[j+1:]))
                z_dash = np.concatenate((x_dash_perm[:j], x_perm[j:]))
                z = np.array(z).reshape(1, -1)
                z_dash = np.array(z_dash).reshape(1, -1)
                
                p_z = model.predict_proba(z)
                p_z_dash = model.predict_proba(z_dash)
                feat_score[j] = feat_score[j] + np.linalg.norm(p_z-p_z_dash)
        feat_score = feat_score/m
        gold_feat_fs = np.argpartition(feat_score, -6)[-6:]
        recall = len(set(exp[i][:6, 0]).intersection(set(gold_feat_fs)))/6
        total_recalls.append(recall)
    return np.mean(total_recalls)

In [9]:
def interpret_ciu_as_prediction(ciu_result, threshold=0.5):
    # Assuming ciu_result is a list of tuples (feature, importance)
    # And that a higher cumulative importance suggests a particular class (e.g., class 1)
    cumulative_importance = sum(importance for feature, importance in ciu_result)
    return 1 if cumulative_importance > threshold else 0


def calculate_fidelity_score(X_test, model, ciu_results):
    model_predictions = model.predict(X_test)
    ciu_predictions = [interpret_ciu_as_prediction(ciu_result) for ciu_result in ciu_results]
    correct_predictions = sum(ciu_pred == model_pred for ciu_pred, model_pred in zip(ciu_predictions, model_predictions))
    fidelity_score = correct_predictions / len(X_test)
    return fidelity_score


In [None]:
ciu_identity_scores = []
ciu_lime_separability_scores = []
ciu_speed_scores = []
ciu_fidelity_scores = []

df_interp = pd.DataFrame(columns=["Dataset", "Fidelity", "Identity", "Separability", "Speed"])


for i in range(len(X_list)):
    X, y = X_list[i], y_list[i].squeeze()  # Ensure y is a 1D array
    
    # Convert X and y to numeric if not already
    
    #Calculate overall separability
    ciu_lime_separability = calculate_separability(X, y)
    ciu_lime_separability_scores.append(ciu_lime_separability)
    
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=555)
    feat_list = X_train.columns.tolist()
    model = RandomForestClassifier()
    model.fit(X_train, y_train)

    # Evaluate the model
    print(f"Dataset {folder_names[i]} - Accuracy: {accuracy_score(y_test, model.predict(X_test))}")
    
    #Applying bulk CIU
    start_time = time.time()
    bulk_ciu_result = exp_fn_ciu(X_test, model, X_train)
    end_time = time.time()
    ciu_speed = end_time - start_time
    ciu_speed_scores.append(ciu_speed)
    bulk_ciu_result2 = exp_fn_ciu(X_test, model, X_train)
    
    #Calculating identity score
    ciu_identity = calc_identity(bulk_ciu_result, bulk_ciu_result2)
    ciu_identity_scores.append(ciu_identity[0])
    bulk_ciu_result = exp_fn_ciu(X_test, model, X_train)
    
    # Calculating fidelity scores
    ciu_fidelity = calculate_fidelity_score(X_test, model, bulk_ciu_result)
    ciu_fidelity_scores.append(ciu_fidelity)
    
    
    results = {
        "Dataset": i,
        "Fidelity": ciu_fidelity,
        "Identity": ciu_identity,
        "Separability": ciu_lime_separability,
        "Speed": ciu_speed
    }
    
    df_interp = df_interp.append(results, ignore_index=True)
    
    print(f"Identity score is {ciu_identity[0]}")
    print(f"Separability score is {ciu_lime_separability}")

    print(f"Speed: {round(ciu_speed, 2)}")
    print(f"fidelity: {ciu_fidelity}")

Dataset 307 - Accuracy: 0.55
Identity score is 0.0
Separability score is 8.673951444138323
Speed: 2.39
fidelity: 0.15
Dataset 1067 - Accuracy: 0.85
Identity score is 40.0
Separability score is 0.0003532835925087861
Speed: 4.37
fidelity: 0.15
Dataset 50 - Accuracy: 0.65
Identity score is 100.0
Separability score is inf
Speed: 1.34
fidelity: 0.0
Dataset 32 - Accuracy: 0.8
Identity score is 0.0
Separability score is 0.3330128424502104
Speed: 3.27
fidelity: 0.0
Dataset 1466 - Accuracy: 0.95
Identity score is 0.0
Separability score is 0.017689787072479875
Speed: 10.42
fidelity: 0.3
Dataset 1459 - Accuracy: 0.25
Identity score is 5.0
Separability score is 0.1937284183100717
Speed: 1.3
fidelity: 0.2
Dataset 1050 - Accuracy: 0.9
Identity score is 80.0
Separability score is 5.317486448639185e-06
Speed: 10.94
fidelity: 0.0
Dataset 1068 - Accuracy: 0.9
Identity score is 60.0
Separability score is 0.00011024498207074123
Speed: 4.63
fidelity: 0.0
Dataset 1467 - Accuracy: 0.9
Identity score is 10.0


In [None]:
df_interp

In [None]:
#df_t = pd.concat([
#    pd.Series(ciu_lime_separability_scores, name='Separability_scores'),
#    pd.Series(ciu_identity_scores, name='CIU_identity_scores'),
#    pd.Series(ciu_speed_scores, name='CIU_speed_scores')
#], axis=1)

In [None]:
df_interp.to_csv('records_ciu.csv')