# Analysis of AI4I 2020 Predictive Maintenance Dataset

### 1. Design a decision-tree-based explainable model and provide an explanatory interface

In [None]:
import numpy as np
import pandas as pd
import sklearn
import xgboost
#import matplotlib.pylab as plt
import matplotlib.pyplot as plt

from sklearn import tree, datasets, ensemble, model_selection
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier, plot_tree
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay, classification_report

from omnixai.data.tabular import Tabular
from omnixai.preprocessing.tabular import TabularTransform
from omnixai.explainers.tabular import TabularExplainer
from omnixai.explainers.prediction import PredictionAnalyzer
from omnixai.visualization.dashboard import Dashboard

%matplotlib inline

In [None]:
# Fetch the AI4I 2020 dataset from the UCI repository and preview it.
source_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00601/ai4i2020.csv'
data = pd.read_csv(source_url)
data.head()

In [None]:
# Normalise the column names: spaces become underscores and the square
# brackets around units are stripped
# (e.g. 'Air temperature [K]' -> 'Air_temperature_K').
data.columns = (
    data.columns
    .str.replace(' ', '_')
    .str.replace(r'[\[\]]', '', regex=True)
)

# Quick inspection. Only the last expression of a cell is rendered, so the
# intermediate checks are printed explicitly.
print(data.dtypes)
# Product_ID is ignored from here on.
print(data['Type'].unique())
print(data.isnull().sum())

# Type is categorical (L / M / H). Encode it as integers here; one-hot
# encoding via a column transformer would be an alternative.
# The mapping is applied to the Type column only (not the whole frame) and
# leaves already-encoded values untouched, so re-running the cell is safe.
type_codes = {'L': 1, 'M': 2, 'H': 3}
data['Type'] = data['Type'].map(lambda label: type_codes.get(label, label))

# Describe data
data.describe()

In [None]:
# Build the feature matrix and the target vector for classification.
# NOTE(review): TWF/HDF/PWF/OSF/RNF remain in X. They look like per-mode
# failure flags; if Machine_failure is derived from them they leak the label
# into the features - confirm against the dataset documentation and consider
# dropping them as well.
non_feature_columns = ['UDI', 'Machine_failure', 'Product_ID']
X = data.drop(columns=non_feature_columns).copy()
X.head()

# Target: the Machine_failure column.
y = data['Machine_failure'].copy()
y.head()
y.value_counts()

In [None]:
# Split the data into train and test sets, then fit an unpruned baseline tree.
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
init_dtree = DecisionTreeClassifier(random_state=42)
init_dtree = init_dtree.fit(X_train, y_train)

plt.figure(figsize=(15, 7.5))
# plot_tree matches class_names to the classes in ascending order, so the
# label for class 0 (pass) must come before the label for class 1 (fail).
# feature_names is passed as a plain list because some scikit-learn releases
# reject a pandas Index here.
plot_tree(init_dtree, filled=True, rounded=True, class_names=['pass 0', 'fail 1'], feature_names=list(X.columns))

# accuracy_score expects (y_true, y_pred).
res = init_dtree.predict(X_test)
score = accuracy_score(y_test, res)
print('Decision Tree Accuracy:', score)

In [None]:
# Prune the decision tree because the initial, unrestricted tree is huge.
# The cost-complexity pruning path lists the effective alphas at which
# subtrees of the fitted tree get collapsed.
path = init_dtree.cost_complexity_pruning_path(X_train, y_train)
# Drop the last alpha from the path before refitting.
ccp_alphas = path.ccp_alphas[:-1]

# Fit one tree per candidate alpha. random_state matches the tree that
# produced the path (and the final pruned tree), so tie-breaking between
# equally good splits is consistent across all of them.
pruned_dts = [
    DecisionTreeClassifier(random_state=42, ccp_alpha=ccp_alpha).fit(X_train, y_train)
    for ccp_alpha in ccp_alphas
]

In [None]:
# Score every candidate tree on both splits to see how the pruning strength
# affects train and test accuracy.
train_scores = []
test_scores = []
for candidate in pruned_dts:
    train_scores.append(candidate.score(X_train, y_train))
    test_scores.append(candidate.score(X_test, y_test))

fig, ax = plt.subplots()
ax.set_xlabel("alpha")
ax.set_ylabel("Accuracy")
ax.set_title("Accuracy vs alpha for train, test sets")
for split_label, split_scores in (('train', train_scores), ('test', test_scores)):
    ax.plot(ccp_alphas, split_scores, marker='o', label=split_label, drawstyle='steps-post')
ax.legend()
plt.show()

In [None]:
# Refit with the pruning strength chosen for the final tree.
final_dt = DecisionTreeClassifier(random_state=42, ccp_alpha=0.0075)
final_dt = final_dt.fit(X_train, y_train)

plt.figure(figsize=(15,7.5))
# plot_tree matches class_names to the classes in ascending order, so the
# label for class 0 (pass) must come before the label for class 1 (fail).
# feature_names is passed as a plain list because some scikit-learn releases
# reject a pandas Index here.
plot_tree(final_dt, filled=True, rounded=True, class_names=['pass 0', 'fail 1'], feature_names=list(X.columns))

In [None]:
# Accuracy of the pruned tree on the held-out test split.
res = final_dt.predict(X_test)
score = accuracy_score(y_true=y_test, y_pred=res)
print('Decision Tree Accuracy:', score)

Limit tree growth (pre-pruning) by grid-searching for the best hyperparameter values

In [None]:
# Candidate hyperparameters for the cross-validated search.
grid = dict(
    criterion=['gini', 'entropy'],
    max_depth=range(1, 4),
    min_samples_split=range(2, 5),
    splitter=['best', 'random'],
    min_samples_leaf=range(1, 5),
)

# 3-fold grid search scored by ROC AUC, using all available cores.
clf = DecisionTreeClassifier(random_state=0)
grid_cv = GridSearchCV(
    estimator=clf,
    param_grid=grid,
    scoring="roc_auc",
    n_jobs=-1,
    cv=3,
    verbose=1,
)
grid_cv.fit(X_train, y_train)

# Keep the winning combination for the next cell and display it.
best_param = grid_cv.best_params_
best_param

In [None]:
# Rebuild the tree from the grid-search winner. best_param holds exactly the
# searched constructor arguments, so it can be unpacked directly.
# random_state matches the estimator used inside the search; without it the
# refit is not reproducible when splitter='random' is selected.
new_dtree = DecisionTreeClassifier(random_state=0, **best_param)

new_dtree.fit(X_train, y_train)

plt.figure(figsize=(15,7.5))
# plot_tree matches class_names to the classes in ascending order (0, then 1).
# feature_names is passed as a plain list because some scikit-learn releases
# reject a pandas Index here.
plot_tree(new_dtree, filled=True, rounded=True, class_names=['0', '1'], feature_names=list(X.columns))

In [None]:
# Accuracy of the grid-searched tree on the held-out test split.
res = new_dtree.predict(X_test)
score = accuracy_score(y_true=y_test, y_pred=res)
print('Decision Tree Accuracy:', score)

In [None]:
# Base gradient-boosted classifier. n_jobs / random_state are the documented
# scikit-learn-style arguments; the legacy nthread / seed names are only
# accepted through **kwargs, which XGBoost does not guarantee to interact
# properly with scikit-learn utilities such as GridSearchCV.
estimator = xgboost.XGBClassifier(n_jobs=4, random_state=42)

# Hyperparameter grid to search.
parameters = {
    'max_depth': range (2, 5, 1),
    'n_estimators': range(60, 220, 40),
    'learning_rate': [0.1, 0.01, 0.05]
}

# Cross-validated grid search scored by ROC AUC.
grid_search = GridSearchCV(estimator=estimator,param_grid=parameters,scoring = 'roc_auc',n_jobs = 10,verbose=True)
grid_search.fit(X_train, y_train)

In [None]:
# Report the winning hyperparameters and evaluate the refitted best model on
# the held-out test split.
print(grid_search.best_params_)

xgtree = grid_search.best_estimator_
res = xgtree.predict(X_test)
# accuracy_score expects (y_true, y_pred).
score = accuracy_score(y_test, res)

# This is the XGBoost model, so label the metric accordingly (it was
# previously printed as a decision-tree accuracy).
print('XGBoost Accuracy:', score)

**Model explanations with OmniXAI (LIME, SHAP, MACE, PDP, ALE)**

In [None]:
# Reload the raw dataset so Type is back to its original labels, then
# normalise the column names (spaces -> underscores, square brackets removed).
data = pd.read_csv('https://archive.ics.uci.edu/ml/machine-learning-databases/00601/ai4i2020.csv')
data.columns = (
    data.columns
    .str.replace(' ', '_')
    .str.replace(r'[\[\]]', '', regex=True)
)
data.head()
data.columns

In [None]:
# Remove the identifier columns; everything else is kept.
identifier_columns = ['UDI', 'Product_ID']
processed_data = data.drop(columns=identifier_columns).copy()
processed_data

In [None]:
# Column names of the processed frame (this list includes the target column).
feature_names = processed_data.columns.tolist()
print(feature_names)

# Wrap the frame in OmniXAI's Tabular container, marking the categorical
# column and the prediction target.
tabular_kwargs = {
    'data': processed_data,
    'categorical_columns': ['Type'],
    'target_column': 'Machine_failure',
}
tabular_data = Tabular(**tabular_kwargs)

tabular_data

In [None]:
# Seed NumPy's global RNG before the unseeded split below.
np.random.seed(1)

# Fit the OmniXAI transform and encode the whole table.
transformer = TabularTransform().fit(tabular_data)
class_names = transformer.class_names
x = transformer.transform(tabular_data)

# The last column of the encoded matrix is used as the label; all preceding
# columns are the features.
encoded_features = x[:, :-1]
encoded_labels = x[:, -1]
train, test, train_labels, test_labels = train_test_split(
    encoded_features, encoded_labels, train_size=0.80
)
print(f'Training data shape: {train.shape}')
print(f'Test data shape:     {test.shape}')

# Invert the transform on both splits for use by the explainers.
train_data = transformer.invert(train)
test_data = transformer.invert(test)

In [None]:
# Fit a depth-limited decision tree on the encoded training split and report
# its accuracy on the encoded test split.
gbtree = DecisionTreeClassifier(max_depth=7)
gbtree.fit(train, train_labels)
test_predictions = gbtree.predict(test)
print(f'Test accuracy: {accuracy_score(test_labels, test_predictions)}')

In [None]:
def preprocess(z):
    """Encode `z` with the fitted `transformer` and return the result."""
    return transformer.transform(z)

In [None]:
# Per-explainer settings.
explainer_params = {
    "lime": {"kernel_width": 3},
    "shap": {"nsamples": 100},
    # NOTE(review): UDI and Product_ID appear to be dropped before the Tabular
    # data is built, so this entry may have no effect - confirm.
    "mace": {"ignored_features": ["UDI", "Product_ID"]},
}

# One explainer bundle covering the local (LIME, SHAP, MACE) and global
# (PDP, ALE) methods for the fitted tree.
explainers = TabularExplainer(
    explainers=["lime", "shap", "mace", "pdp", "ale"],
    mode="classification",
    data=train_data,
    model=gbtree,
    preprocess=preprocess,
    params=explainer_params,
)

# Generate explanations
test_instances = test_data[1653:1680]
local_explanations = explainers.explain(X=test_instances)

# Features for which partial-dependence plots are requested.
pdp_features = [
    'Type', 'Air_temperature_K', 'Process_temperature_K', 'Rotational_speed_rpm',
    'Torque_Nm', 'Tool_wear_min', 'TWF', 'HDF', 'PWF', 'OSF', 'RNF',
]
global_explanations = explainers.explain_global(
    params={"pdp": {"features": pdp_features}}
)
test_instances

In [None]:
# Position (within test_instances) of the instance whose local explanations
# are shown.
index = 1

# Local explanations for the selected instance.
for title, key in (("LIME", "lime"), ("SHAP", "shap"), ("MACE", "mace")):
    print(f"{title} results:")
    local_explanations[key].ipython_plot(index, class_names=class_names)

# Global explanations (not tied to a single instance).
for title, key in (("PDP", "pdp"), ("ALE", "ale")):
    print(f"{title} results:")
    global_explanations[key].ipython_plot(class_names=class_names)

In [None]:
# Analyse the tree's predictions on the test split.
analyzer_settings = {
    "mode": "classification",
    "test_data": test_data,
    "test_targets": test_labels,
    "model": gbtree,
    "preprocess": preprocess,
}
analyzer = PredictionAnalyzer(**analyzer_settings)

prediction_explanations = analyzer.explain()

In [None]:
# Render every prediction-analysis result under its own heading.
for metric_name in prediction_explanations:
    print(f"{metric_name}:")
    prediction_explanations[metric_name].ipython_plot(class_names=class_names)

In [None]:
# Collect everything computed above into a single interactive dashboard.
dashboard_inputs = {
    "instances": test_instances,
    "local_explanations": local_explanations,
    "global_explanations": global_explanations,
    "prediction_explanations": prediction_explanations,
    "class_names": class_names,
}
dashboard = Dashboard(**dashboard_inputs)

# Launch the dashboard for visualization.
dashboard.show()

### 2. Compute the fairness of the model

In [None]:
import dalex as dx
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from xgboost import XGBClassifier
import plotly


# Reload the raw dataset and normalise the column names
# (spaces -> underscores, square brackets removed).
data = pd.read_csv('https://archive.ics.uci.edu/ml/machine-learning-databases/00601/ai4i2020.csv')
data.columns = (
    data.columns
    .str.replace(' ', '_')
    .str.replace(r'[\[\]]', '', regex=True)
)
data.head()

# Remove the identifier columns.
processed_data = data.drop(columns=['UDI', 'Product_ID']).copy()
processed_data.columns

# Features are everything except the target; the target is Machine_failure.
X = processed_data.drop(columns=['Machine_failure']).copy()
y = processed_data['Machine_failure']

# Column groups: Type is one-hot encoded, the rest are passed through as-is.
categorical_features = ['Type']
numerical_features = [
    'Air_temperature_K', 'Process_temperature_K', 'Rotational_speed_rpm',
    'Torque_Nm', 'Tool_wear_min', 'TWF', 'HDF', 'PWF', 'OSF', 'RNF',
]

categorical_transformer = Pipeline(steps=[('onehot', OneHotEncoder(handle_unknown='ignore'))])
preprocessor = ColumnTransformer(
    transformers=[
        ('cat', categorical_transformer, categorical_features),
        ('num', 'passthrough', numerical_features),
    ]
)

# Preprocessing + XGBoost classifier in a single pipeline, fitted on the full
# data set.
xgb_classifier = XGBClassifier(learning_rate= 0.1, max_depth= 3, n_estimators= 140)
clf = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', xgb_classifier),
])

clf.fit(X, y)

# Wrap the fitted pipeline in a dalex explainer (same data it was fitted on).
exp = dx.Explainer(clf, X, y)

exp.model_performance().result

# Look at how the failing rows are distributed over product types.
failure_mask = data['Machine_failure'] > 0
failures = data.loc[failure_mask]
failures['Type'].value_counts()

# Fairness is assessed with respect to the product Type.
protected = data['Type']
privileged  = 'L' # since most failures are from L type

fobject = exp.model_fairness(protected = protected, privileged=privileged)

# Run the fairness check with epsilon 0.8, then show the raw result table.
fobject.fairness_check(epsilon = 0.8)

fobject.result

# Default fairness plot followed by the per-metric scores.
fobject.plot()

fobject.plot(type = 'metric_scores')

# Preprocessing for the comparison models: standard-scale the numeric columns
# and one-hot encode Type.
numeric_transformer = Pipeline(steps=[('scaler', StandardScaler())])
categorical_transformer = Pipeline(steps=[('onehot', OneHotEncoder(handle_unknown='ignore'))])

preprocessor = ColumnTransformer(
    transformers=[
        ('cat', categorical_transformer, categorical_features),
        ('num', numeric_transformer, numerical_features),
    ]
)

# Two additional classifiers fitted on the same data.
forest_steps = [
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(random_state=123, max_depth=5)),
]
clf_forest = Pipeline(steps=forest_steps).fit(X,y)

logreg_steps = [
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression(random_state=123)),
]
clf_logreg = Pipeline(steps=logreg_steps).fit(X,y)

# create Explainer objects
exp_forest  = dx.Explainer(clf_forest, X,y, verbose = False)
exp_logreg  = dx.Explainer(clf_logreg, X,y, verbose = False)
# create fairness explanations
fobject_forest = exp_forest.model_fairness(protected, privileged)
fobject_logreg = exp_logreg.model_fairness(protected, privileged)

# Compare the XGBoost fairness object against the two other models.
fobject.plot(objects=[fobject_forest, fobject_logreg])
fobject.plot(objects=[fobject_forest, fobject_logreg], type = "metric_scores")

fobject.parity_loss
for plot_type in ("radar", "heatmap", "performance_and_fairness"):
    fobject.plot(objects=[fobject_forest, fobject_logreg], type = plot_type)

### 3. Perform a what-if analysis using Ceteris Paribus profiles on the given dataset

In [None]:
import dalex as dx
import pandas as pd
import numpy as np

from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score

import warnings
warnings.filterwarnings('ignore')

# Load the dataset from the same UCI source as the earlier sections and use
# the header row that comes with the file. Passing `names=` without
# `header=0` makes pandas parse the header line as the first data record
# (every column then becomes strings), and a hand-typed positional name list
# silently mislabels columns if its order differs from the file's.
data = pd.read_csv('https://archive.ics.uci.edu/ml/machine-learning-databases/00601/ai4i2020.csv')

# Fail early, with a clear message, if the expected columns are not present.
expected_columns = {"UDI", "Product ID", "Type",
                    "Air temperature [K]", "Process temperature [K]", "Rotational speed [rpm]", "Torque [Nm]",
                    "Tool wear [min]", "TWF", "HDF", "PWF", "OSF", "RNF", "Machine failure"}
missing_columns = expected_columns - set(data.columns)
assert not missing_columns, f"missing expected columns: {sorted(missing_columns)}"

# Features are everything except the identifiers and the target.
X = data.drop(['UDI','Product ID','Machine failure'], axis=1)
y = data["Machine failure"]

# Hold out 10% of the rows for testing.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.10, random_state=50)

# Numeric columns: fill gaps with the median, then standard-scale.
numerical_features = ["Air temperature [K]", "Process temperature [K]", "Rotational speed [rpm]", "Torque [Nm]", "Tool wear [min]", "TWF", "HDF", "PWF", "OSF", "RNF"]
numerical_transformer = Pipeline(
    steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ]
)

# Categorical column: fill gaps with the literal 'missing', then one-hot encode.
categorical_features = ['Type']
categorical_transformer = Pipeline(
    steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ]
)

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

# max_features=0.6 makes the tree consider a random subset of features at each
# split, so the estimator is seeded (same seed as the train/test split) to
# keep the fitted tree and every downstream number reproducible.
classifier = DecisionTreeClassifier(max_depth=4, criterion='entropy', max_features=0.6, splitter='best', random_state=50)

# Preprocessing and classifier combined into one pipeline.
clf = Pipeline(steps=[('preprocessor', preprocessor),
                      ('classifier', classifier)])

# Fit the pipeline on the training split and predict the held-out rows.
clf.fit(X_train, y_train)
predictions = clf.predict(X_test)

# The dalex explainer is built on the training split.
exp = dx.Explainer(clf, X_train, y_train)

exp.predict(X_test)[:10]

# Ceteris Paribus (what-if) profile for one test observation.
observation = X_test.iloc[70]
cp = exp.predict_profile(observation, label='testing')
cp.plot()

# Accuracy of the pipeline on the held-out test split.
test_accuracy = accuracy_score(y_test, predictions)
print(test_accuracy)

# Performance summary and ROC curve, computed from the explainer's data
# (the training split).
mp = exp.model_performance(model_type = 'classification')
mp.result
mp.plot(geom="roc")