In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, Flatten, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
import os # For saving model plot

# Use matplotlib's bundled seaborn-like dark grid style for every figure below.
plt.style.use('seaborn-v0_8-darkgrid')

# --- 1. Load Data ---
# IMPORTANT: 'FaultFree_Testing.csv' and 'Faulty_Testing.csv' are read with
# relative paths, so they must be in the current working directory.
try:
    df_faulty_free = pd.read_csv('FaultFree_Testing.csv')
    df_faulty_train = pd.read_csv('Faulty_Testing.csv')
except FileNotFoundError as e:
    print(f"Error loading CSV files: {e}")
    print("Please ensure 'FaultFree_Testing.csv' and 'Faulty_Testing.csv' are in the same directory as this script.")
    # `exit()` is an interactive helper injected by the `site` module and
    # reports status 0 (success). Raise SystemExit explicitly with a non-zero
    # status so callers can tell that loading failed.
    raise SystemExit(1) from e

# Every row of the fault-free file belongs to class 0 ('No faults').
df_faulty_free['faultNumber'] = 0

# Rows of the faulty file without a label cannot be used for supervised
# training. Replacing the missing label with a -1 sentinel would create a
# class index outside [0, num_classes), which sparse categorical cross-entropy
# rejects, so such rows are dropped instead.
missing_label_mask = df_faulty_train['faultNumber'].isna()
if missing_label_mask.any():
    print(f"Dropping {int(missing_label_mask.sum())} rows with a missing 'faultNumber' from 'Faulty_Testing.csv'.")
    df_faulty_train = df_faulty_train.loc[~missing_label_mask]

# Stack both sources into a single DataFrame with a fresh 0..N-1 index.
df = pd.concat([df_faulty_free, df_faulty_train]).reset_index(drop=True)

# --- 2. Prepare Data ---
# Separate features (X) and labels (y). Every column except 'faultNumber' is
# used as a feature.
# NOTE(review): this includes any bookkeeping columns present in the CSVs
# (e.g. run or sample indices) - confirm that is intended.
X = df.drop(columns=['faultNumber']).values
y = df['faultNumber'].values

# The number of distinct labels determines the width of the softmax output layer.
num_classes = len(np.unique(y))
print(f"Number of unique fault classes: {num_classes}")

# Split BEFORE scaling so the held-out samples never influence the scaler.
# 80/20 split; random_state makes it reproducible and stratify=y keeps the
# class proportions equal in both partitions.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Standardize features: the mean/variance are estimated on the training
# partition only and then applied unchanged to the test partition. Fitting on
# the full dataset would leak test-set statistics into training.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Conv1D expects (samples, steps, channels): each feature column becomes one
# step along the convolution axis and there is a single channel.
X_train = X_train[..., np.newaxis]
X_test = X_test[..., np.newaxis]

print(f"X_train shape: {X_train.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"X_test shape: {X_test.shape}")
print(f"y_test shape: {y_test.shape}")

# --- 3. Define and Compile CNN Model ---
def build_cnn_model(input_shape, num_classes):
    """
    Build and compile a 1D convolutional classifier.

    The network stacks three Conv1D -> MaxPooling1D -> Dropout stages
    (64, 128 and 256 filters), flattens the result, applies one hidden Dense
    layer and ends in a softmax layer with one unit per class. Convolution
    and hidden Dense kernels carry an L2 penalty.

    Args:
        input_shape (tuple): Shape of one input sample, passed to `Input`.
        num_classes (int): Number of units in the softmax output layer.

    Returns:
        tf.keras.models.Model: The model, already compiled with Adam,
        sparse categorical cross-entropy and the accuracy metric.
    """
    weight_decay = 0.0001          # L2 strength shared by all regularized kernels
    conv_filters = (64, 128, 256)  # filters per convolutional stage, in order

    inputs = Input(shape=input_shape)

    # Convolutional feature extractor: identical stages that differ only in
    # the number of filters. 'same' padding keeps the length through the
    # convolution; each pooling step halves it (rounding up).
    features = inputs
    for n_filters in conv_filters:
        features = Conv1D(
            n_filters,
            kernel_size=3,
            activation='relu',
            padding='same',
            kernel_regularizer=l2(weight_decay),
        )(features)
        features = MaxPooling1D(pool_size=2, padding='same')(features)
        features = Dropout(0.1)(features)

    # Classifier head: flatten, one regularized hidden layer, then softmax.
    features = Flatten()(features)
    features = Dense(128, activation='relu',
                     kernel_regularizer=l2(weight_decay))(features)
    features = Dropout(0.2)(features)
    probabilities = Dense(num_classes, activation='softmax')(features)

    model = Model(inputs=inputs, outputs=probabilities)

    # The sparse variant of the loss is used because the labels are plain
    # integers rather than one-hot vectors.
    model.compile(
        optimizer=Adam(learning_rate=0.0005),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

# Build the model; the input length is taken from the training data and the
# output width from the number of classes found in the labels.
model = build_cnn_model(input_shape=(X_train.shape[1], 1), num_classes=num_classes)

# Print the layer-by-layer summary.
model.summary()

# Plot the model graph and save it (requires graphviz and pydot).
# plot_model can return without raising when its dependencies are missing
# (it only prints an install hint), so "no exception" does not prove that the
# image was written. Remember the file's previous modification time and report
# success only if the file exists and was (re)written by this call.
plot_path = 'cnn_model_architecture.png'
previous_mtime = os.path.getmtime(plot_path) if os.path.isfile(plot_path) else None
try:
    tf.keras.utils.plot_model(model, to_file=plot_path, show_shapes=True, show_layer_names=True)
except ImportError:
    print("Warning: pydot and graphviz are required to plot model architecture. Skipping plot.")
except Exception as e:
    # Plotting is optional; report the problem and continue with training.
    print(f"Error plotting model: {e}")
else:
    plot_written = os.path.isfile(plot_path) and os.path.getmtime(plot_path) != previous_mtime
    if plot_written:
        print(f"Model architecture plot saved to '{plot_path}'")
    else:
        print(f"Warning: plot_model did not write '{plot_path}' (pydot/graphviz may be missing). Skipping plot.")

# --- 4. Callbacks for Training ---
# Both callbacks watch the same quantity: validation accuracy, which should be
# maximized, and both announce their actions on the console.
monitor_settings = dict(monitor='val_accuracy', mode='max', verbose=1)

# Stop training once validation accuracy has not improved for 15 consecutive
# epochs, and roll the model back to the weights of its best epoch.
early_stopping = EarlyStopping(
    patience=15,
    restore_best_weights=True,
    **monitor_settings
)

# Halve the learning rate after 7 epochs without improvement, never going
# below 1e-5, so the optimizer can take finer steps once progress stalls.
reduce_lr = ReduceLROnPlateau(
    factor=0.5,
    patience=7,
    min_lr=0.00001,
    **monitor_settings
)

# --- 5. Train the Model ---
# The callbacks monitor 'val_accuracy' to stop training, restore the best
# weights and lower the learning rate. Feeding them the test set would tune
# the model on the very data used for the final report, so the validation
# data is carved out of the training data instead and (X_test, y_test) stays
# untouched until evaluation.
# Keras takes the validation fraction from the END of the arrays before any
# per-epoch shuffling; X_train/y_train come out of train_test_split already
# shuffled, so that tail is a random sample.
print("\n--- Model Training Started ---")
history = model.fit(
    X_train, y_train,
    epochs=100,            # Upper bound; EarlyStopping usually ends training sooner.
    batch_size=256,
    validation_split=0.1,  # Last 10% of the training data is held out for validation.
    callbacks=[early_stopping, reduce_lr],
    verbose=1              # One progress line per epoch.
)
print("--- Model Training Complete ---")

# --- 6. Evaluate Model ---
print("\n--- Model Evaluation ---")
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
print(f"Final Test Accuracy: {test_acc * 100:.2f}%")

# Predicted class = index of the largest softmax probability in each row.
# NOTE(review): these indices are compared directly with the fault numbers,
# which assumes the labels are exactly 0..num_classes-1 - confirm.
y_pred_probs = model.predict(X_test, verbose=0)
y_pred = y_pred_probs.argmax(axis=1)

# Integer ground truth, plus the sorted set of classes present in it. That set
# fixes the row/column order of every report and matrix below.
y_true = y_test.astype(int)
unique_labels = np.unique(y_true)

# --- 7. Performance Metrics ---
# Per-class precision, recall, F1 and support. zero_division=0 reports 0
# (without a warning) for classes that were never predicted.
print("\n--- Classification Report ---")
print(classification_report(y_true, y_pred, labels=unique_labels, zero_division=0))

# Confusion matrix: rows are actual classes, columns are predicted classes.
print("\n--- Confusion Matrix Analysis ---")
conf_matrix = confusion_matrix(y_true, y_pred, labels=unique_labels)

correct_per_class = conf_matrix.diagonal()   # correctly classified samples
actual_per_class = conf_matrix.sum(axis=1)   # samples that truly belong to each class

# Class-wise accuracy is the recall of each class: correct / actual.
class_wise_accuracy = correct_per_class / actual_per_class
print("\nClass-Wise Accuracy (Recall per Fault):")
for label, acc in zip(unique_labels, class_wise_accuracy):
    print(f"Fault {label}: {acc:.3f}")

# Overall accuracy derived from the matrix: all correct samples over all
# samples counted in the matrix.
overall_average_accuracy = correct_per_class.sum() / conf_matrix.sum()
print(f"\nOverall Average Accuracy (from Confusion Matrix): {overall_average_accuracy:.3f}")

# Fault Detection Rate (FDR) is the same quantity as per-class recall; it is
# printed under its own heading as a separate deliverable.
FDR = class_wise_accuracy
print("\nFault Detection Rate (FDR) for Each Fault:")
for label, fdr_val in zip(unique_labels, FDR):
    print(f"Fault {label}: {fdr_val:.3f}")

# Overall FDR: correct samples over the total number of actual samples.
overall_FDR = correct_per_class.sum() / actual_per_class.sum()
print(f"\nOverall FDR (Overall Recall): {overall_FDR:.3f}")

# --- 8. Visualization of Results ---

# Light-to-dark green colormap used by the heatmaps.
cmap_sns = sns.light_palette("green", as_cmap=True)

# Bar chart of the per-class accuracy (recall), one bar per fault number.
fig_bar, ax_bar = plt.subplots(figsize=(12, 6))
sns.barplot(x=unique_labels, y=class_wise_accuracy, palette='viridis', ax=ax_bar)
ax_bar.set_xlabel("Fault Number")
ax_bar.set_ylabel("Accuracy (Recall)")
ax_bar.set_title("Class-Wise Accuracy (Recall) for Each Fault")
plt.setp(ax_bar.get_xticklabels(), rotation=45)
ax_bar.set_ylim(0, 1.05)  # a little headroom above 1.0 so full bars are not cut off
ax_bar.grid(axis='y', linestyle='--', alpha=0.7)
fig_bar.tight_layout()
plt.show()

# Heatmap of the raw confusion-matrix counts (rows: actual, columns: predicted).
fig_counts, ax_counts = plt.subplots(figsize=(10, 8))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt="d",
    cmap=cmap_sns,
    xticklabels=unique_labels,
    yticklabels=unique_labels,
    cbar=True,
    linewidths=.5,
    linecolor='black',
    ax=ax_counts,
)
ax_counts.set_xlabel('Predicted Class')
ax_counts.set_ylabel('Actual Class')
ax_counts.set_title("Confusion Matrix (Counts)")
fig_counts.tight_layout()
plt.show()

# Recall Matrix Heatmap (Row-normalized Confusion Matrix)
# Entry [i, j] is the proportion of actual class i that was predicted as class j.
# Every row sums to 1, except rows without any actual sample, which stay all-zero.
# np.divide with `where=` skips the zero denominators entirely, so no
# divide-by-zero RuntimeWarning is raised (np.where would still evaluate the
# division for every cell before masking it).
row_totals = conf_matrix.sum(axis=1, keepdims=True)
recall_matrix = np.divide(conf_matrix, row_totals,
                          out=np.zeros(conf_matrix.shape, dtype=float),
                          where=row_totals != 0)
plt.figure(figsize=(10, 8))
sns.heatmap(recall_matrix, annot=True, cmap=cmap_sns, fmt=".2f",
            xticklabels=unique_labels, yticklabels=unique_labels,
            cbar=True, linewidths=.5, linecolor='black')
plt.xlabel('Predicted Class')
plt.ylabel('Actual Class')
plt.title("Recall Matrix (Normalized by Actual Class)")
plt.tight_layout()
plt.show()

# Precision Matrix Heatmap (Column-normalized Confusion Matrix)
# Entry [i, j] is the proportion of predictions for class j that were actually class i.
# Every column sums to 1, except columns of classes that were never predicted,
# which stay all-zero.
column_totals = conf_matrix.sum(axis=0, keepdims=True)
precision_matrix = np.divide(conf_matrix, column_totals,
                             out=np.zeros(conf_matrix.shape, dtype=float),
                             where=column_totals != 0)
plt.figure(figsize=(10, 8))
sns.heatmap(precision_matrix, annot=True, cmap=cmap_sns, fmt=".2f",
            xticklabels=unique_labels, yticklabels=unique_labels,
            cbar=True, linewidths=.5, linecolor='black')
plt.xlabel('Predicted Class')
plt.ylabel('Actual Class')
plt.title("Precision Matrix (Normalized by Predicted Class)")
plt.tight_layout()
plt.show()

# --- 9. Plot Training History (Accuracy and Loss) ---
# Two side-by-side panels built from the per-epoch values recorded by fit():
# accuracy on the left, loss on the right, training vs. validation in each.
plt.figure(figsize=(14, 6))

# Plot Accuracy
plt.subplot(1, 2, 1) # 1 row, 2 columns, first plot
plt.plot(history.history['accuracy'], label='Training Accuracy', color='blue')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy', color='orange')
plt.title('Model Accuracy Over Epochs', fontsize=14)
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Accuracy', fontsize=12)
plt.legend(fontsize=10)
plt.grid(True)
# Derive the y-range from the recorded values instead of a fixed window, so no
# epoch is clipped regardless of how well or badly the run went. A small
# margin keeps the curves off the frame; the range is clamped to [0, 1].
accuracy_values = list(history.history['accuracy']) + list(history.history['val_accuracy'])
accuracy_low, accuracy_high = min(accuracy_values), max(accuracy_values)
accuracy_margin = max(0.01, 0.05 * (accuracy_high - accuracy_low))
plt.ylim(max(0.0, accuracy_low - accuracy_margin), min(1.0, accuracy_high + accuracy_margin))

# Plot Loss
plt.subplot(1, 2, 2) # 1 row, 2 columns, second plot
plt.plot(history.history['loss'], label='Training Loss', color='blue')
plt.plot(history.history['val_loss'], label='Validation Loss', color='orange')
plt.title('Model Loss Over Epochs', fontsize=14)
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Loss', fontsize=12)
plt.legend(fontsize=10)
plt.grid(True)
plt.tight_layout() # Fit both panels and their labels inside the figure
plt.show()


Number of unique fault classes: 21
X_train shape: (585627, 54, 1)
y_train shape: (585627,)
X_test shape: (146407, 54, 1)
y_test shape: (146407,)


You must install pydot (`pip install pydot`) for `plot_model` to work.
Model architecture plot saved to 'cnn_model_architecture.png'

--- Model Training Started ---
Epoch 1/100
[1m2288/2288[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 35ms/step - accuracy: 0.7706 - loss: 0.9387 - val_accuracy: 0.8314 - val_loss: 0.5616 - learning_rate: 5.0000e-04
Epoch 2/100
[1m2288/2288[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 34ms/step - accuracy: 0.8297 - loss: 0.5710 - val_accuracy: 0.8460 - val_loss: 0.5071 - learning_rate: 5.0000e-04
Epoch 3/100
[1m2288/2288[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m80s[0m 35ms/step - accuracy: 0.8428 - loss: 0.5227 - val_accuracy: 0.8529 - val_loss: 0.4812 - learning_rate: 5.0000e-04
Epoch 4/100
[1m2288/2288[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 35ms/step - accuracy: 0.8492 - loss: 0.4977 - val_accuracy: 0.8551 - val_loss: 0.4735 - learning_rate: 5.0000e-04
Epoch 5/100
[1m2288/2288[0m [32m━━━━━━━━━━━━━━━━━━━━