# Performance of Explainable AI methods in Asset Failure Prediction

In [None]:
import pandas as pd
from tqdm.notebook import tqdm
import re
import numpy as np
import warnings
import itertools
import time
from datetime import datetime

warnings.simplefilter("ignore")

from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import f1_score, confusion_matrix, accuracy_score

# models
from sklearn.svm import SVC, SVR
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.neural_network import MLPClassifier, MLPRegressor
from xgboost import XGBClassifier, XGBRegressor
from interpret.glassbox import ExplainableBoostingClassifier, ExplainableBoostingRegressor

def timeit(method):
    """Decorator that prints the wall-clock duration of every call to `method`.

    All positional and keyword arguments are forwarded unchanged and the
    wrapped function's return value is passed straight through.
    """
    # Local import: `functools` is not among the notebook's top-level imports.
    from functools import wraps

    @wraps(method)  # keep __name__ / __doc__ of the decorated function
    def timed(*args, **kw):
        # perf_counter is monotonic, so the measurement cannot go negative
        # if the system clock is adjusted while `method` runs.
        ts = time.perf_counter()
        result = method(*args, **kw)
        te = time.perf_counter()
        print(f'Time elapsed ({method.__name__}): {round(te-ts, 1)} s.')
        return result
    return timed

## DataFrame preparation

In [None]:
# Load the raw training file: space-separated values, no header row.
# NOTE(review): absolute, machine-specific Windows path — adjust to the local dataset location.
df = pd.read_csv(r"D:\Turbofan dataset\CMaps\train_FD003.txt", sep=" ", header=None)

def prepare_df(df, rul_normal, rul_anomaly, limit_rul=False, dropna=False, special_scale=False,
               n_normal_cycles=50):
    """Turn the raw run-to-failure table into a labelled modelling frame.

    Parameters
    ----------
    df : DataFrame read with ``header=None``; column 0 is the unit id, column 1
        the cycle number. The last two columns are discarded (presumably the
        empty columns produced by trailing separators — confirm against the file),
        after which exactly 26 columns must remain.
    rul_normal : RUL at or above which an observation is labelled 0 in ``RUL_binary``;
        also the cap applied to ``RUL`` when ``limit_rul`` is set.
    rul_anomaly : RUL at or below which an observation is labelled 1 in ``RUL_binary``.
        Observations strictly between the two thresholds get a missing label.
    limit_rul : if True, cap ``RUL`` at ``rul_normal``.
    dropna : if True, drop every row containing a missing value (including rows
        whose ``RUL_binary`` label is missing).
    special_scale : if True, min-max scale the Setting/Measurement columns per unit,
        fitting the scaler on that unit's first ``n_normal_cycles`` cycles only, and
        return only the rows after those cycles.
    n_normal_cycles : number of initial cycles per unit used as the scaling baseline
        (only used when ``special_scale`` is True).

    Returns
    -------
    DataFrame with columns ``unit``, ``cycle``, ``Setting 1..3``,
    ``Measurement 1..21``, ``RUL`` and ``RUL_binary``. The input frame is not modified.
    """
    df = df.rename(columns={0: 'unit', 1: 'cycle'})
    # Explicit copy so the column assignments below never write into a view of the input.
    df = df.iloc[:, :-2].copy()

    # RUL = (last cycle observed for the unit) - (current cycle), computed for all rows
    # at once instead of one .loc lookup per row. Cast to float to keep the dtype the
    # row-wise implementation produced.
    print("Calculating RUL of each observation")
    max_cycle = df.groupby("unit")["cycle"].transform("max")
    df["RUL"] = (max_cycle - df["cycle"]).astype(float)

    df.columns = ["unit", "cycle"] + [f"Setting {i}" for i in range(1, 4)] + [f"Measurement {i}" for i in range(1, 22)] + ["RUL"]
    df["RUL_binary"] = df["RUL"].apply(lambda x: 0 if x >= rul_normal else 1 if x <= rul_anomaly else None)

    if limit_rul:
        df["RUL"] = df["RUL"].clip(upper=rul_normal)

    if dropna:
        df = df.dropna()

    if not special_scale:
        return df

    feature_columns = [i for i in df.columns if "Setting" in i or "Measurement" in i]
    scaled_units = []
    # sort=False keeps units in order of first appearance.
    for _, df_unit in df.groupby("unit", sort=False):
        df_unit = df_unit.copy()
        baseline = df_unit.loc[df_unit["cycle"] <= n_normal_cycles, feature_columns]
        scaler = MinMaxScaler()
        scaler.fit(baseline)
        df_unit[feature_columns] = scaler.transform(df_unit[feature_columns])
        scaled_units.append(df_unit)

    if not scaled_units:
        # Nothing to scale: return an empty frame with the expected columns.
        return df.iloc[0:0]

    # DataFrame.append was removed in pandas 2.0; a single concat is also linear
    # rather than quadratic in the number of units.
    new_df = pd.concat(scaled_units)
    # The baseline cycles were used to fit the scaler, so they are excluded from the result.
    return new_df.loc[new_df["cycle"] > n_normal_cycles]

# Build the modelling frame: RUL capped at 130, per-unit scaling enabled, rows with a
# missing RUL_binary label kept (dropna=False).
df = prepare_df(df, rul_normal=130, rul_anomaly=40, limit_rul=True, dropna=False, special_scale=True)

id_column = "unit"
# Features are every "Setting ..." and "Measurement ..." column.
X_columns = [i for i in df.columns if "Setting" in i or "Measurement" in i]
# Target is the (capped) RUL. y_column and rul_column are lists, so the arrays
# selected with them below are 2-D with a single column, whereas id_array is 1-D.
y_column = ["RUL"]
rul_column = ["RUL"]

id_array = df[id_column].values
X_array = df[X_columns].values
y_array = df[y_column].values
rul_array = df[rul_column].values

In [None]:
# The task is treated as classification when the target holds at most two
# distinct values, otherwise as regression.
classification = len(np.unique(y_array)) <= 2

## Hyperparameter tuning / model selection

In [None]:
from sklearn.metrics import r2_score, mean_squared_error
import copy

def cv_train_model(ids, X, y, model, n_folds, verbose=True, classification=True):
    """Cross-validate `model` with folds formed over groups rather than rows.

    The unique values of `ids` are split into `n_folds` shuffled folds
    (fixed seed 42); every row follows its id, so all rows sharing an id
    end up on the same side of each split.

    Parameters
    ----------
    ids : 1-D array with one group id per row of `X`.
    X, y : feature and target arrays, indexable with a boolean row mask.
    model : estimator exposing `fit(X, y)` and `predict(X)`; it is refit on every fold.
    n_folds : number of folds.
    verbose : if True, print the mean score.
    classification : if True the fold score is accuracy, otherwise the
        negative RMSE (so that larger is better in both cases).

    Returns
    -------
    Mean of the per-fold scores.

    Raises
    ------
    ValueError : if `model.fit` fails on a fold (the original error is chained).
    """
    unique_ids = np.unique(ids)
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
    accuracies = []
    for train_idx, test_idx in kf.split(unique_ids):
        # KFold.split yields *positions* into `unique_ids`, not the ids themselves;
        # map them back before matching rows, otherwise ids that are not also valid
        # positions (e.g. the largest id when ids start at 1) never enter any fold.
        bool_train = np.isin(ids, unique_ids[train_idx])
        bool_test = np.isin(ids, unique_ids[test_idx])

        X_train, y_train = X[bool_train], y[bool_train]
        X_test, y_test = X[bool_test], y[bool_test]

        try:
            model.fit(X_train, y_train)
        except Exception as e:
            # Dump the offending shapes / first sample to help diagnose the failure.
            print(X_train.shape, y_train.shape)
            print(X_train[0])
            raise ValueError("Error during model fitting : %s" % e) from e

        y_pred = model.predict(X_test)
        if classification:
            accuracy = accuracy_score(y_test, y_pred)
        else:
            # Negated so that "higher is better" holds for regression as well.
            accuracy = -np.sqrt(mean_squared_error(y_test, y_pred))
        accuracies.append(accuracy)

    mean_acc = np.mean(accuracies)
    if verbose:
        print("Mean accuracy =", round(mean_acc, 3))

    return mean_acc

def grid_search(ids, X, y, model_class, search_space_dict, n_folds):
    """Exhaustively evaluate every parameter combination with grouped cross-validation.

    Parameters
    ----------
    ids, X, y : group ids, features and targets handed to `cv_train_model`.
    model_class : estimator class, instantiated as `model_class(**params)`.
    search_space_dict : mapping of parameter name -> list of candidate values.
    n_folds : number of cross-validation folds.

    Returns
    -------
    (best_params, best_acc) : the parameter dict with the highest mean CV score
    and that score. For regression targets the score is the negative RMSE.
    If the search space is empty, returns ``({}, -inf)``.
    """
    param_names = list(search_space_dict.keys())
    search_space = list(itertools.product(*search_space_dict.values()))

    # Task type depends only on the target, so decide it once. It is derived from
    # the `y` argument (not a notebook global) so the function honours its inputs.
    classification = len(np.unique(y)) <= 2

    best_params = {}
    best_acc = -np.inf  # any real score beats this, including very negative RMSE values
    for params in tqdm(search_space, colour="red"):
        # Fresh dict per combination, so the stored best is never mutated later.
        param_dict = dict(zip(param_names, params))

        model = model_class(**param_dict)
        acc = cv_train_model(ids, X, y, model, n_folds, verbose=False, classification=classification)
        if acc > best_acc:
            best_params = param_dict
            best_acc = acc

            print("Best params so far: %s (acc_score = %.3f)" % (best_params, best_acc))

    return best_params, best_acc


#### XGBoost

In [None]:
# Grid search for XGBoost; the estimator class and the grid depend on the task type.
if classification:
    # classification
    print("Running classification task...")
    search_space_dict = {"n_estimators": [32, 64, 128, 256], 
                         "max_depth": [4, 8, 12],
                         "learning_rate": [0.01, 0.1, 0.2, 0.3, 0.5],
                         "objective": ["binary:logistic", "binary:logitraw", "binary:hinge"],
                         "verbosity": [0]
                         }
    # The trailing 4 is the n_folds argument.
    params, score = grid_search(id_array, X_array, y_array, XGBClassifier, search_space_dict, 4)
    print(f"XGBoost Classifier: \n  params: {params}\n  score: {round(score, 3)}")
    
else:
    # regression
    print("Running regression task...")
    search_space_dict = { "n_estimators": [8, 16, 32, 64, 128], 
                         "max_depth": [6, 9],
                         "learning_rate": [0.01, 0.1, 0.2, 0.3, 0.5],
                         "objective": ["reg:squarederror"],
                         "verbosity": [0]
                     }
    # The trailing 4 is the n_folds argument.
    params, score = grid_search(id_array, X_array, y_array, XGBRegressor, search_space_dict, 4)
    print(f"XGBoost Regressor: \n  params: {params}\n  score: {round(score, 3)}")

#### Random Forest

In [None]:
# Grid search for Random Forest; the estimator class and the grid depend on the task type.
if classification:
    # classification
    print("Running classification task...")
    search_space_dict = {"n_estimators": [8, 16, 32, 64],
                         "max_depth": [2, 4, 8],
                         "criterion": ["gini", "entropy"],
                         "min_samples_leaf": [2, 4, 8]
                         }
    params, score = grid_search(id_array, X_array, y_array, RandomForestClassifier, search_space_dict, 5)
    # Label corrected: this result belongs to the Random Forest, not XGBoost.
    print(f"Random Forest Classifier: \n  params: {params}\n  score: {round(score, 3)}")

else:
    # regression
    print("Running regression task...")
    # NOTE(review): scikit-learn renamed the "mse" criterion to "squared_error"
    # (deprecated in 1.0, removed in 1.2) — confirm against the installed version.
    search_space_dict = {"n_estimators": [8, 16, 32, 64, 128],
                         "max_depth": [6, 9, 12],
                         "criterion": ["mse"],
                         "min_samples_leaf": [4, 8, 16]
                         }
    params, score = grid_search(id_array, X_array, y_array, RandomForestRegressor, search_space_dict, 4)
    print(f"Random Forest Regressor: \n  params: {params}\n  score: {round(score, 3)}")

#### Support Vector Machine

In [None]:
# Grid search for Support Vector Machines; the estimator class and the grid depend on the task type.
if classification:
    # classification
    print("Running classification task...")
    search_space_dict = {'C': [10, 100],
                         'gamma': [1, 0.1, 0.01],
                         'kernel': ['rbf', 'poly']}
    params, score = grid_search(id_array, X_array, y_array, SVC, search_space_dict, 4)
    # Label corrected: this result belongs to the SVC, not XGBoost.
    print(f"SVC: \n  params: {params}\n  score: {round(score, 3)}")

else:
    # regression
    print("Running regression task...")
    # Only the gamma='scale' grid is searched. An earlier grid with explicit gamma
    # values was assigned and immediately overwritten, so it never ran; that dead
    # assignment has been removed.
    search_space_dict = {'C': [1, 10, 100, 200],
                         'gamma': ['scale'],
                         'kernel': ['rbf', 'poly', 'sigmoid']}
    params, score = grid_search(id_array, X_array, y_array, SVR, search_space_dict, 4)
    print(f"SVR: \n  params: {params}\n  score: {round(score, 3)}")

#### Multi Layer Perceptron

In [None]:
# Grid search for the Multi-Layer Perceptron; both task types share the same grid.
if classification:
    # classification
    print("Running classification task...")
    search_space_dict = {"activation": ["tanh", "relu", "logistic"],
                         "hidden_layer_sizes": [(60,), (60, 30), (60, 30, 20), (30, ), (30, 20), (30, 20, 15)],
                         "batch_size": [16, 32],
                         "max_iter": [25]
                         }
    params, score = grid_search(id_array, X_array, y_array, MLPClassifier, search_space_dict, 4)
    # Label corrected: this result belongs to the MLP, not XGBoost.
    print(f"MLP Classifier: \n  params: {params}\n  score: {round(score, 3)}")

else:
    # regression
    print("Running regression task...")
    search_space_dict = {"activation": ["tanh", "relu", "logistic"],
                         "hidden_layer_sizes": [(60,), (60, 30), (60, 30, 20), (30, ), (30, 20), (30, 20, 15)],
                         "batch_size": [16, 32],
                         "max_iter": [25]
                         }
    params, score = grid_search(id_array, X_array, y_array, MLPRegressor, search_space_dict, 4)
    print(f"MLP Regressor: \n  params: {params}\n  score: {round(score, 3)}")

#### Explainable Boosting Machine

In [None]:
# Grid search for the Explainable Boosting Machine; the estimator class and the grid
# depend on the task type.
if classification:
    # classification
    print("Running classification task...")
    search_space_dict = {"max_bins": [64, 128, 256],
                         "min_samples_leaf": [2, 4, 8],
                         "max_leaves": [2, 3, 4],
                         "learning_rate": [0.001, 0.01, 0.1],
                         }
    params, score = grid_search(id_array, X_array, y_array, ExplainableBoostingClassifier, search_space_dict, 5)
    # Label corrected: this result belongs to the EBM, not XGBoost.
    print(f"EBM Classifier: \n  params: {params}\n  score: {round(score, 3)}")

else:
    # regression
    print("Running regression task...")
    search_space_dict = {"max_bins": [64, 128, 256],
                         "min_samples_leaf": [2, 4, 8],
                         "learning_rate": [0.001, 0.01, 0.1],
                         }
    params, score = grid_search(id_array, X_array, y_array, ExplainableBoostingRegressor, search_space_dict, 5)
    # Label corrected: this result belongs to the EBM, not XGBoost.
    print(f"EBM Regressor: \n  params: {params}\n  score: {round(score, 3)}")
## Refit with best params

In [None]:
# Hold out 20% of the *units* (not rows): the split is made over the unique unit ids,
# and each row then follows its unit.
train_ids, test_ids = train_test_split(np.unique(id_array), test_size=0.2, random_state=42)

# Row masks selecting the observations of the train / test units.
bool_train = np.isin(id_array, train_ids)
bool_test = np.isin(id_array, test_ids)

ids_train, X_train, y_train, rul_train = id_array[bool_train], X_array[bool_train], y_array[bool_train], rul_array[bool_train]
ids_test, X_test, y_test, rul_test = id_array[bool_test], X_array[bool_test], y_array[bool_test], rul_array[bool_test]

In [None]:
def get_fitted_model(model_class, model_params, model_name, X_train, y_train, save=False):
    """Build `model_class(**model_params)`, fit it and report the elapsed time.

    `model_name` is only used in the printed message. `save` is accepted
    but has no effect. Returns the fitted estimator.
    """
    started = time.time()
    fitted = model_class(**model_params)
    fitted.fit(X_train, y_train)
    elapsed = time.time() - started
    print("%s training finished in %.1f seconds." % (model_name, elapsed))

    return fitted

# Refit every model on the training units with fixed hyperparameters.
if classification:
    # NOTE: INVALID HYPERPARAMETERS -> THESE WERE USED FOR REGRESSION, RECALCULATE IF NEEDED
    # (e.g. the XGBoost objective below is a regression objective and the Random Forest
    # criterion is 'mse', while classifier classes are being instantiated).
    xgboost_params = {'n_estimators': 64, 'max_depth': 9, 'learning_rate': 0.1, 'objective': 'reg:squarederror', 'verbosity': 0}
    # xgboost_params  = {'n_estimators': 252, 'max_depth': 12, 'learning_rate': 0.5, 'objective': 'reg:squarederror', 'verbosity': 0}
    xgboost_model = get_fitted_model(XGBClassifier, xgboost_params, "XGBoost", X_train, y_train)

    rf_params = {'n_estimators': 128, 'max_depth': 12, 'criterion': 'mse', 'min_samples_leaf': 4}
    rf_model = get_fitted_model(RandomForestClassifier, rf_params, "Random Forest", X_train, y_train)

    svm_params = {'C': 100, 'gamma': 1, 'kernel': 'poly'}
    svm_model = get_fitted_model(SVC, svm_params, "Support Vector Machine", X_train, y_train)

    mlp_params = {'activation': 'relu', 'hidden_layer_sizes': (30, 20, 15), 'batch_size': 16, 'max_iter': 25}
    mlp_model = get_fitted_model(MLPClassifier, mlp_params, "Multi-Layer Perceptron", X_train, y_train)

    # interactions=0 is passed to the EBM constructor together with the feature names.
    ebm_params = {"feature_names": X_columns, "interactions": 0}
    ebm = get_fitted_model(ExplainableBoostingClassifier, ebm_params, "Explainable Boosting Machine", X_train, y_train)

else:
    xgboost_params = {'n_estimators': 64, 'max_depth': 6, 'learning_rate': 0.1, 'objective': 'reg:squarederror', 'verbosity': 0}
    xgboost_model = get_fitted_model(XGBRegressor, xgboost_params, "XGBoost Reg", X_train, y_train)

    # NOTE(review): scikit-learn renamed the 'mse' criterion to 'squared_error'
    # (deprecated in 1.0, removed in 1.2) — confirm against the installed version.
    rf_params = {'n_estimators': 64, 'max_depth': 12, 'criterion': 'mse', 'min_samples_leaf': 4, }
    rf_model = get_fitted_model(RandomForestRegressor, rf_params, "Random Forest", X_train, y_train)

    # svm_params = {'C': 100, 'gamma': 1.0, 'kernel': 'poly',}
    svm_params = {'C': 10, 'gamma': 0.1, 'kernel': 'rbf'}
    svm_model = get_fitted_model(SVR, svm_params, "Support Vector Machine", X_train, y_train)

    mlp_params = {'activation': 'relu', 'hidden_layer_sizes': (60, 30, 20), 'batch_size': 16, 'max_iter': 25}
    mlp_model = get_fitted_model(MLPRegressor, mlp_params, "Multi-Layer Perceptron", X_train, y_train)

    # interactions=0 is passed to the EBM constructor together with the feature names.
    ebm_params = {"feature_names": X_columns, "interactions": 0, }
    ebm = get_fitted_model(ExplainableBoostingRegressor, ebm_params, "Explainable Boosting Machine", X_train, y_train)

print("Done.")

#### Model metrics

In [None]:
from sklearn.metrics import classification_report, mean_squared_error, r2_score
import matplotlib.pyplot as plt

# Evaluate every fitted model on the held-out units.
for name, model in zip(["xgboost", "rf", "svm", "mlp", "ebm"], [xgboost_model, rf_model, svm_model, mlp_model, ebm]):

    y_pred = model.predict(X_test)

    if classification:
        print(name)
        # scikit-learn metrics take (y_true, y_pred); with the arguments swapped the
        # report's precision and recall columns would be exchanged.
        print(classification_report(y_test, y_pred, digits=3))
        print("-------------------------------------------------------------")

    else:
        rmse = np.sqrt(mean_squared_error(y_test, y_pred))
        # R2 is not symmetric in its arguments, so the ground truth must come first.
        print("%s: R2 = %.3f, RMSE=%.1f" % (name, r2_score(y_test, y_pred), rmse))

        # Predicted vs. actual target for a visual check of the fit.
        plt.figure()
        plt.title(name)
        plt.scatter(y_test, y_pred, s=3, alpha=0.3)
        plt.show()

## XAI models

In [None]:
from interpret.blackbox import ShapKernel, LimeTabular
from interpret import show

In [None]:
# Limit the number of units used for explanations: draw n_explain_ids distinct
# test units with a fixed seed.
np.random.seed(42)
n_explain_ids = 10
ids_explain = np.random.choice(np.unique(ids_test), size=n_explain_ids, replace=False)

# Row mask over the test set selecting the observations of the chosen units.
bool_explain = np.isin(ids_test, ids_explain)
# From here on ids_explain holds one id per selected row, not the list of chosen units.
ids_explain = ids_test[bool_explain]
X_explain = X_test[bool_explain]
y_explain = y_test[bool_explain]
rul_explain = rul_test[bool_explain]
df_explain = pd.DataFrame(data=X_explain, columns=X_columns)         
# Random sample of 10 rows of the explanation data.
df_explain_sample = df_explain.sample(10)

In [None]:
# Build the data used for explanations -> only failure samples.
# Random sample of 50 rows of df_explain, later passed as the data argument of the
# black-box explainers.
df_explain_sample = df_explain.sample(50)

# We are only explaining data of malfunctioning equipment: rows whose target is
# below 130, the value used as the rul_normal cap when the frame was prepared.
# NOTE(review): y_explain comes from a single-column 2-D array, so this mask is
# presumably 2-D as well — which would explain the reshape of ids_explain below; confirm.
only_class_1 = y_explain < 130
df_explain_1 = df_explain[only_class_1]
y_explain_1 = y_explain[only_class_1]
rul_explain_1 = rul_explain[only_class_1]
ids_explain_1 = ids_explain.reshape((-1, 1))[only_class_1]

In [None]:
def extract_data_from_explanation(explanation):
    """Flatten one local explanation into a ranked per-feature table.

    Parameters
    ----------
    explanation : mapping with the keys ``"names"``, ``"values"`` and ``"scores"``
        (equal-length sequences, one entry per feature) and ``"perf"``, itself a
        mapping with ``"actual"`` and ``"predicted"``.

    Returns
    -------
    DataFrame with one row per feature and the columns ``Feature``, ``Value``,
    ``Explain_Score``, ``output_actual``, ``output_predicted``,
    ``abs_explain_score`` and ``Rank``. Rows are sorted by descending absolute
    score, and ``Rank`` is 1 for the feature with the largest absolute score.
    """
    data = pd.DataFrame(data={
        "Feature": explanation["names"],
        "Value": explanation["values"],
        "Explain_Score": explanation["scores"],
    })
    # The model output is the same for every feature of this sample, so it is broadcast.
    data["output_actual"] = explanation["perf"]["actual"]
    data["output_predicted"] = explanation["perf"]["predicted"]
    # Rank by magnitude: a strongly negative contribution matters as much as a positive one.
    data["abs_explain_score"] = data["Explain_Score"].abs()
    data = data.sort_values(by="abs_explain_score", ascending=False)
    data["Rank"] = np.arange(len(data)) + 1

    return data

def generate_local_explanations(explainer, df, y, ids, rul):
    """Explain every row of `df` and stack the per-row feature tables.

    Parameters
    ----------
    explainer : object exposing ``explain_local(df, y)`` whose result offers
        ``data(i)`` for the i-th explained row.
    df, y : samples and targets handed to ``explain_local``.
    ids, rul : per-row unit id and RUL, iterated in lockstep with the rows;
        ``rul.shape[0]`` is used as the progress-bar total.

    Returns
    -------
    DataFrame with the rows produced by `extract_data_from_explanation` for each
    sample, plus ``unit`` and ``RUL`` columns, indexed 0..n-1. An empty DataFrame
    is returned when there is nothing to explain.
    """
    local_explanations = explainer.explain_local(df, y)

    frames = []
    for i, (_id, _rul) in tqdm(enumerate(zip(ids, rul)), total=rul.shape[0]):
        data_explain = extract_data_from_explanation(local_explanations.data(i))
        data_explain["unit"] = _id
        data_explain["RUL"] = _rul
        frames.append(data_explain)

    if not frames:
        # pd.concat raises on an empty list; keep returning an empty frame instead.
        return pd.DataFrame()

    # DataFrame.append was removed in pandas 2.0, and appending inside the loop
    # re-copied the accumulated frame every iteration; concatenate once instead.
    return pd.concat(frames, ignore_index=True)


### Explanations

#### Blackbox models

In [None]:
# Black-box models to explain, keyed by the short name used in the output file names.
blackbox_models = {
    "xgb": xgboost_model, 
    "rf": rf_model, 
   "svm": svm_model, 
   "mlp": mlp_model,
}

for name, model in blackbox_models.items():
    # Both explainers receive the model's predict function and the 50-row sample.
    shap_kernel = ShapKernel(model.predict, df_explain_sample)
    # NOTE(review): mode='regression' is passed regardless of the `classification` flag — confirm.
    lime_tabular = LimeTabular(model.predict, df_explain_sample, n_jobs=6, explain_kwargs={"num_features": len(X_columns), "num_samples": 1500}, 
                              mode='regression')
    blackbox_explainers = {
        "shap": shap_kernel, 
        "lime": lime_tabular
    } 
    
    for exp_name, explainer in blackbox_explainers.items():
        time_start = time.time()
        # Month-day / hour-minute stamp; it also becomes part of the output file name.
        timestamp = datetime.today().strftime("%m%d-%H%M")
        print("Explaining %s with %s... (%s)" % (name, exp_name, timestamp))
        
        explanations = generate_local_explanations(explainer, df_explain_1, y_explain_1, ids_explain_1, rul_explain_1)
        time_end = time.time()
        # Written relative to the working directory.
        # NOTE(review): assumes a "results" folder already exists — confirm.
        explanations.to_csv(rf"results/{name}_{exp_name}_explanations_{timestamp}.csv")
        print("Explanations finished in %.1f seconds." % (time_end - time_start))
    

### Explanations for EBM

In [None]:
# The fitted EBM is passed directly as the explainer, so generate_local_explanations
# calls ebm.explain_local itself rather than a SHAP/LIME wrapper.
time_start = time.time()
print("Explaining ebm")
timestamp = datetime.today().strftime("%m%d-%H%M")
explanations = generate_local_explanations(ebm, df_explain_1, y_explain_1, ids_explain_1, rul_explain_1)
time_end = time.time()
# NOTE(review): assumes a "results" folder already exists in the working directory — confirm.
explanations.to_csv(rf"results/ebm_explanations_{timestamp}.csv")
print("Explanations finished in %.1f seconds." % (time_end - time_start))