This project focuses on developing and assessing machine learning models that classify fault types within a system, using a tabular dataset (`dataset.csv`). The data is preprocessed to extract the numerical features and to encode the categorical target label, "Type", for multi-class classification. Four models are implemented:

- Neural Network
- Decision Tree
- Random Forest with Cross Validation
- XGBoost with Grid Search and Cross Validation

The project addresses both predictive performance and interpretability, applying LIME (Local Interpretable Model-agnostic Explanations) to every model to explain individual predictions and SHAP (SHapley Additive exPlanations) to XGBoost to highlight feature importance. Exploratory data analysis (EDA) and visualizations are used to understand fault patterns over time. This comprehensive approach underscores the project's goal of accurately diagnosing system faults while ensuring transparency, making it valuable for applications such as industrial fault detection.

## Importing libraries

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import shap
from xgboost import XGBClassifier
from sklearn.model_selection import GridSearchCV, train_test_split, cross_val_score, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, make_scorer, f1_score
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.tree import DecisionTreeClassifier, plot_tree
import dtreeviz 
from lime.lime_tabular import LimeTabularExplainer
import xgboost as xgb
from xgboost import plot_importance
import lime
import lime.lime_tabular
import matplotlib.dates as mdates

## Import and shuffle the dataset

In [None]:
# Load the dataset, asking pandas to parse the "Datetime" column as dates.
df = pd.read_csv('dataset.csv', parse_dates=["Datetime"])

# Shuffle the dataset.
# A fixed seed keeps the row order identical between notebook runs; without it
# the seeded train/validation/test splits further down would still receive a
# different row order each time and produce different subsets and metrics.
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

## Exploratory Data Analysis

In [None]:
# Count the missing (NaN/None) entries in every column of the dataset
missing_values = df.isna().sum()

# Show the per-column totals
print(missing_values)

In [None]:
# Check total number of duplicate rows (every repeat after the first occurrence)
duplicate_count = df.duplicated().sum()
print(f"Total number of duplicate rows: {duplicate_count}")

# Show what percentage of the data is duplicated.
# Guard against an empty dataset so the ratio is reported as 0 instead of NaN.
total_rows = len(df)
duplicate_percentage = (duplicate_count / total_rows) * 100 if total_rows else 0.0
print(f"Percentage of duplicates: {duplicate_percentage:.2f}%")

# If you want to see the actual duplicate records
if duplicate_count > 0:
    print("\nFirst few duplicate records:")
    duplicates = df[df.duplicated(keep='first')]
    print(duplicates.head())

    # Count duplicates by how many times each record appears.
    # value_counts() on a DataFrame is indexed by *all* columns, so
    # reset_index() returns every original column plus the count; naming only
    # the count column avoids the length mismatch that assigning two column
    # names would raise. dropna=False keeps rows containing NaN, which
    # duplicated() above also treats as comparable.
    print("\nDuplicate counts (showing records that appear more than once):")
    duplicate_counts = df.value_counts(dropna=False).reset_index(name='count')
    duplicate_counts = duplicate_counts[duplicate_counts['count'] > 1]
    print(duplicate_counts.head())

## Separate the input features from the output labels, then split into training, validation and test sets

In [None]:
# Split the dataset into features and labels
X = df.drop('Type', axis=1)
y = df['Type']

# Split into train (70%), validation (15%), and test (15%) sets.
# Stratifying on the label keeps the class proportions approximately equal in
# every subset. With a purely random split a rare fault type can be missing
# from the training labels, and label_encoder.transform() below would then
# fail on the validation/test labels it has never seen.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)

# Identify numerical columns
numerical_cols = X_train.select_dtypes(include=['float64', 'int64']).columns
non_numerical_cols = X_train.select_dtypes(exclude=['float64', 'int64']).columns

# Drop non numerical columns (the same column selection is applied to all three subsets)
X_train = X_train[numerical_cols]
X_val = X_val[numerical_cols]
X_test = X_test[numerical_cols]

# Encode string labels to integers; the encoder is fitted on the training
# labels only and then reused for the validation and test labels
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_val_encoded = label_encoder.transform(y_val)
y_test_encoded = label_encoder.transform(y_test)

# Get the number of classes
num_classes = len(label_encoder.classes_)

# Convert to one-hot encoded format.
# NOTE: y_train / y_val / y_test are rebound here from label Series to one-hot
# arrays; later cells recover the integer labels with np.argmax.
y_train = to_categorical(y_train_encoded, num_classes=num_classes)
y_val = to_categorical(y_val_encoded, num_classes=num_classes)
y_test = to_categorical(y_test_encoded, num_classes=num_classes)

In [None]:
# Display the dataset dimensions as a (rows, columns) tuple
df.shape

## Neural Network Model

In [None]:
# Build the neural network model: two hidden ReLU layers, each followed by
# dropout, and a softmax output layer.
# NOTE(review): the features are used as selected above; StandardScaler is
# imported in this notebook but not applied in the visible cells - confirm
# whether scaling was intended for this model.
model = Sequential()
model.add(Dense(64, activation='relu', input_shape=(X_train.shape[1],)))
model.add(Dropout(0.5))
model.add(Dense(32, activation='relu'))
model.add(Dropout(0.5))
# One output unit per encoded class. Using num_classes (derived from the label
# encoder) instead of a hard-coded 4 keeps the layer consistent with the
# one-hot targets whenever the number of fault types in the data changes.
model.add(Dense(num_classes, activation='softmax'))

# Compile the model; categorical cross-entropy matches the one-hot encoded targets
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model, monitoring the validation set after every epoch
history = model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_val, y_val), verbose=1)

# Evaluate the model
y_pred = model.predict(X_test)  # Get predictions (one probability per class)
y_pred_classes = np.argmax(y_pred, axis=1)  # most probable class index per row
y_test_classes = np.argmax(y_test, axis=1)  # undo the one-hot encoding

# Print evaluation results
print("Test Accuracy:", accuracy_score(y_test_classes, y_pred_classes))
print(classification_report(y_test_classes, y_pred_classes))

## LIME for Neural Network Model

In [None]:
# Draw one row position from the test set at random; the resulting single-row
# array is the sample that every model in this notebook is asked to explain
random_idx = np.random.randint(len(X_test))
random_sample = X_test.iloc[[random_idx]].to_numpy()

In [None]:
def predict_proba(X):
    """Return the neural network's softmax outputs for the rows of ``X``.

    Thin wrapper around the notebook-level ``model`` so it can be handed to
    LIME as a prediction function; Keras progress output is silenced.
    """
    class_probabilities = model.predict(X, verbose=0)
    return class_probabilities

# Build the LIME explainer from the training feature matrix
explainer = lime.lime_tabular.LimeTabularExplainer(
    training_data=X_train.values,
    feature_names=numerical_cols.tolist(),
    class_names=list(label_encoder.classes_),
    mode='classification',
)

# Ask the network for its prediction on the selected sample and report it
prediction = model.predict(random_sample, verbose=0)
predicted_class = np.argmax(prediction)
print(f"Random Sample Index: {random_idx}")
print(f"Predicted Class: {predicted_class} ({label_encoder.classes_[predicted_class]})")
print(f"Prediction Probabilities: {prediction[0]}")

# Explain the sample with LIME, limited to at most 10 features and to the
# single most probable label
lime_exp = explainer.explain_instance(
    data_row=random_sample.flatten(),
    predict_fn=predict_proba,
    num_features=min(10, X_train.shape[1]),
    top_labels=1,
)
lime_exp_dict = lime_exp.as_list(label=predicted_class)
features, contributions = zip(*lime_exp_dict)

# Horizontal bar chart: green bars for positive contributions, red otherwise
bar_colors = ['green' if x > 0 else 'red' for x in contributions]
plt.barh(features, contributions, color=bar_colors)
plt.xlabel('Contribution to Prediction')
plt.title(f"LIME Feature Contributions for Predicted Class: {label_encoder.classes_[predicted_class]}")
plt.tight_layout()
plt.show()

## Decision Tree

In [None]:
# Revert one-hot encoded y back to integer labels
y_train_int = np.argmax(y_train, axis=1)
y_val_int = np.argmax(y_val, axis=1)
y_test_int = np.argmax(y_test, axis=1)

# Integer label of every class known to the encoder. Passing it as `labels`
# keeps classification_report aligned with target_names even when a class
# happens to be absent from the split being evaluated (otherwise the report
# raises because the two lengths differ).
class_labels = np.arange(len(label_encoder.classes_))

# Decision Tree Model
dt = DecisionTreeClassifier(random_state=42)
dt.fit(X_train, y_train_int)

# Predict on validation set
y_val_pred_dt = dt.predict(X_val)

# Evaluate on validation set
print("Decision Tree Validation Accuracy:", accuracy_score(y_val_int, y_val_pred_dt))
print("Decision Tree Validation Classification Report:")
print(classification_report(y_val_int, y_val_pred_dt, labels=class_labels, target_names=label_encoder.classes_))

# Visualize the Decision Tree.
# feature_names is converted to a plain list: numerical_cols is a pandas
# Index, which some scikit-learn releases reject during parameter validation.
plt.figure(figsize=(20, 10))
plot_tree(dt, feature_names=list(numerical_cols), class_names=list(label_encoder.classes_), filled=True, rounded=True)
plt.title("Decision Tree for Fault Type Classification")
plt.show()

# Predict on test set
y_test_pred_dt = dt.predict(X_test)

# Evaluate on test set
print("Decision Tree Test Accuracy:", accuracy_score(y_test_int, y_test_pred_dt))
print("Decision Tree Test Classification Report:")
print(classification_report(y_test_int, y_test_pred_dt, labels=class_labels, target_names=label_encoder.classes_))

In [None]:
# Wrap the fitted decision tree, together with its training data and the
# feature/class names, in a dtreeviz model and render it
viz_model = dtreeviz.model(
    dt,
    X_train=X_train,
    y_train=y_train_int,
    feature_names=numerical_cols,
    class_names=list(label_encoder.classes_),
    target_name="Fault Type",
)
viz_model.view()

## LIME for decision tree

In [None]:
def explain_prediction(model, random_sample, X_train, label_encoder):
    """Print a model's prediction for one sample and plot its LIME explanation.

    Args:
        model: Fitted classifier providing ``predict`` and ``predict_proba``.
        random_sample: Indexable batch holding the sample; the whole object is
            passed to ``model.predict`` and ``random_sample[0]`` is the row
            that gets explained.
        X_train: Training features; its ``.values`` and ``.columns`` are used
            to build the LIME explainer.
        label_encoder: Object whose ``classes_`` translates the predicted
            integer label into a class name.
    """
    # The predicted integer label doubles as the index into classes_
    predicted_idx = model.predict(random_sample)[0]
    predicted_class = label_encoder.classes_[predicted_idx]
    print(f"The model predicted: {predicted_class}")

    lime_explainer = lime.lime_tabular.LimeTabularExplainer(
        X_train.values,
        feature_names=X_train.columns,
        class_names=list(label_encoder.classes_),
        discretize_continuous=True,
    )

    # Restrict the explanation to the label the model actually predicted
    explanation = lime_explainer.explain_instance(
        random_sample[0],
        model.predict_proba,
        labels=[predicted_idx],
    )
    explanation.as_pyplot_figure(label=predicted_idx)
    plt.title(f"LIME Explanation for Predicted Class: {predicted_class}")
    plt.show()

In [None]:
# Explain the decision tree's prediction for the shared random test sample
explain_prediction(dt, random_sample, X_train, label_encoder)

## Random Forest with cross validation

In [None]:
# Imported here so this cell is self-contained; multi-metric cross-validation
# lets both scores come from a single pass over the folds.
from sklearn.model_selection import cross_validate

# Combine the datasets for cross-validation
# (X is rebound from the feature DataFrame to a stacked NumPy array)
X = np.vstack((X_train, X_val, X_test))
y_int = np.hstack((y_train_int, y_val_int, y_test_int))

# Set up Stratified K-Fold Cross-Validation
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Initialize the Random Forest Model
rf = RandomForestClassifier(n_estimators=100, random_state=42)

# Perform Cross-Validation for Accuracy and F1-Score (Macro) in one run.
# Both metrics are computed on the same fitted fold models, so the forest is
# trained 5 times instead of 10; the folds and the model seed are fixed, so the
# scores match what two separate cross_val_score calls would return.
f1_scorer = make_scorer(f1_score, average='macro')
cv_results = cross_validate(rf, X, y_int, cv=cv, scoring={'accuracy': 'accuracy', 'f1_macro': f1_scorer})

cv_scores = cv_results['test_accuracy']
print("Cross-Validation Accuracy Scores:", cv_scores)
print("Mean CV Accuracy:", np.mean(cv_scores))
print("Standard Deviation of CV Accuracy:", np.std(cv_scores))

cv_f1_scores = cv_results['test_f1_macro']
print("Cross-Validation F1-Scores:", cv_f1_scores)
print("Mean CV F1-Score:", np.mean(cv_f1_scores))

# Train the Model on the Training Set
rf.fit(X_train, y_train_int)

# Integer label of every class known to the encoder; passed as `labels` so the
# reports stay aligned with target_names even if a class is missing from a split
class_labels = np.arange(len(label_encoder.classes_))

# Predict and Evaluate on Validation Set
y_val_pred_rf = rf.predict(X_val)
val_accuracy_rf = accuracy_score(y_val_int, y_val_pred_rf)
print("Random Forest Validation Accuracy:", val_accuracy_rf)
print("Random Forest Validation Classification Report:")
print(classification_report(y_val_int, y_val_pred_rf, labels=class_labels, target_names=label_encoder.classes_))

# Predict and Evaluate on Test Set
y_test_pred_rf = rf.predict(X_test)
test_accuracy_rf = accuracy_score(y_test_int, y_test_pred_rf)
print("Random Forest Test Accuracy:", test_accuracy_rf)
print("Random Forest Test Classification Report:")
print(classification_report(y_test_int, y_test_pred_rf, labels=class_labels, target_names=label_encoder.classes_))

# Feature Importance: bars are drawn in descending order of importance and the
# y-axis is inverted so the most important feature appears at the top
importances = rf.feature_importances_
feature_names = numerical_cols
sorted_idx = np.argsort(importances)[::-1]
plt.barh(feature_names[sorted_idx], importances[sorted_idx])
plt.xlabel("Feature Importance")
plt.title("Random Forest Feature Importances")
plt.gca().invert_yaxis()
plt.show()

## LIME for random forest

In [None]:
# Explain the random forest's prediction for the shared random test sample
explain_prediction(rf, random_sample, X_train, label_encoder)

## XGBoost with Grid Search and Cross Validation

In [None]:
# Initialize the XGBoost classifier.
# Named xgb_clf so it does not shadow the `xgboost as xgb` module imported at
# the top of the notebook.
xgb_clf = XGBClassifier(eval_metric='mlogloss')     # Multi-class log loss

# Define the hyperparameter grid
param_grid = {
    'n_estimators': [50, 100, 150],      # Number of trees
    'max_depth': [5, 10, 15, 20],        # Maximum tree depth
    'learning_rate': [0.1, 0.2],         # Step size shrinkage
    'subsample': [0.8, 1.0],             # Fraction of samples per tree
    'colsample_bytree': [0.8, 1.0],      # Fraction of features per tree
}

# Set up Grid Search with Stratified K-Fold
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
grid_search = GridSearchCV(estimator=xgb_clf, param_grid=param_grid, cv=cv, scoring='accuracy', n_jobs=-1, verbose=1)

# Fit Grid Search on training data only.
# Searching over the combined train+validation+test data would let the test
# rows influence the chosen hyperparameters and make the held-out evaluation
# below optimistic.
grid_search.fit(X_train, y_train_int)

# Display the best parameters and score
print("Best Parameters:", grid_search.best_params_)
print("Best Cross-Validation Accuracy:", grid_search.best_score_)

# GridSearchCV refits the best configuration on everything passed to fit()
# (refit=True is the default), so best_estimator_ is already trained on the
# full training set and needs no further fit.
best_xgb = grid_search.best_estimator_

# Predict and Evaluate on Test Set, mirroring the other models in this notebook
y_test_pred_xgb = best_xgb.predict(X_test)
print("XGBoost Test Accuracy:", accuracy_score(y_test_int, y_test_pred_xgb))
print("XGBoost Test Classification Report:")
print(classification_report(
    y_test_int,
    y_test_pred_xgb,
    labels=np.arange(len(label_encoder.classes_)),  # keep rows aligned with target_names
    target_names=label_encoder.classes_,
))

# Feature importance: bars are drawn in descending order of importance and the
# y-axis is inverted so the most important feature appears at the top
importances = best_xgb.feature_importances_
feature_names = numerical_cols
sorted_idx = np.argsort(importances)[::-1]
plt.barh(feature_names[sorted_idx], importances[sorted_idx])
plt.xlabel("Feature Importance")
plt.title("XGBoost Feature Importances")
plt.gca().invert_yaxis()
plt.show()



In [None]:
# Explain model predictions
# Build a SHAP explainer for the tuned XGBoost model and compute SHAP values
# for every row of the training features. Note that `explainer` rebinds the
# name used for the LIME explainer in the neural-network cell.
explainer = shap.Explainer(best_xgb)
shap_values = explainer(X_train)
# NOTE(review): best_xgb is a multi-class classifier, so shap_values presumably
# carries a per-class dimension - confirm that summary_plot renders it as
# intended with the installed shap version.
shap.summary_plot(shap_values, X_train)

## LIME for XGBoost

In [None]:
# Explain the tuned XGBoost model's prediction for the shared random test sample
explain_prediction(best_xgb, random_sample, X_train, label_encoder)