<a href="https://colab.research.google.com/github/nourhan-transformerML/Advanced-Intelligent-Fault-Diagnosis-of-Power-Transformers-Using-Machine-Learning-on-DGA-Data/blob/main/code_25_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# Transformer Fault Diagnosis with SHAP and CatBoost
# Author: [Your Name]
# Date: [Current Date]

# Install required packages
try:
    from catboost import CatBoostClassifier
    CATBOOST_AVAILABLE = True
except ImportError:
    print("Installing required packages...")
    !pip install catboost lightgbm xgboost imbalanced-learn python-docx shap -q
    from catboost import CatBoostClassifier
    CATBOOST_AVAILABLE = True

# Import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.preprocessing import RobustScaler, LabelEncoder
from sklearn.metrics import (accuracy_score, classification_report, confusion_matrix,
                           f1_score, precision_score, recall_score, roc_auc_score,
                           roc_curve, auc)
from sklearn.ensemble import (GradientBoostingClassifier,
                            ExtraTreesClassifier, VotingClassifier, StackingClassifier)
from sklearn.linear_model import LogisticRegression
from lightgbm import LGBMClassifier
from sklearn.neural_network import MLPClassifier
from docx import Document
from docx.shared import Inches, Pt, RGBColor
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from imblearn.over_sampling import SMOTE, RandomOverSampler
from collections import Counter
import shap
import warnings
import os
warnings.filterwarnings('ignore')

# ======================
# CUSTOM ROC PLOT FUNCTION
# ======================
def plot_roc_curve(y_true, y_proba, title='ROC Curves'):
    """Custom function to plot ROC curves"""
    plt.figure(figsize=(10, 8))

    # Binary classification
    if y_proba.ndim == 1 or y_proba.shape[1] == 2:
        fpr, tpr, _ = roc_curve(y_true, y_proba if y_proba.ndim == 1 else y_proba[:, 1])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f'ROC curve (AUC = {roc_auc:.2f})')
    # Multiclass classification
    else:
        n_classes = y_proba.shape[1]
        for i in range(n_classes):
            fpr, tpr, _ = roc_curve(y_true == i, y_proba[:, i])
            roc_auc = auc(fpr, tpr)
            plt.plot(fpr, tpr, label=f'Class {i} (AUC = {roc_auc:.2f})')

    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(title)
    plt.legend(loc="lower right")
    plt.tight_layout()
    return plt

# ======================
# ENHANCED CONFUSION MATRIX PLOT
# ======================
def plot_confusion_matrix(y_true, y_pred, class_names, title='Confusion Matrix'):
    """Custom function to plot confusion matrix with proper axes"""
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title(title)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.xticks(rotation=45, ha='right')
    plt.yticks(rotation=0)
    plt.tight_layout()
    return plt

# ======================
# GROUPED MODEL COMPARISON BAR CHART
# ======================
def plot_grouped_model_comparison(results_df):
    """Create grouped bar chart comparing all metrics for each model"""
    # Prepare data for plotting
    metrics_df = results_df.melt(id_vars=['Model'],
                                value_vars=['Accuracy', 'Precision', 'Recall', 'F1-Score'],
                                var_name='Metric',
                                value_name='Score')

    plt.figure(figsize=(14, 8))
    ax = sns.barplot(x='Model', y='Score', hue='Metric', data=metrics_df)
    plt.title('Model Performance Comparison')
    plt.xticks(rotation=45, ha='right')
    plt.ylim(0, 1.05)
    plt.legend(loc='upper right', bbox_to_anchor=(1.15, 1))

    # Add values on top of bars
    for p in ax.patches:
        ax.annotate(f"{p.get_height():.3f}",
                    (p.get_x() + p.get_width() / 2., p.get_height()),
                    ha='center', va='center',
                    xytext=(0, 10),
                    textcoords='offset points',
                    fontsize=8)

    plt.tight_layout()
    return plt

# ======================
# PERFORMANCE BY FAULT TYPE CHART
# ======================
def plot_fault_type_performance(metrics_df):
    """Create bar chart for performance by fault type"""
    plt.figure(figsize=(12, 8))
    metrics_df.set_index('Fault Type').plot(kind='bar', rot=45)
    plt.title('Performance by Fault Type')
    plt.ylabel('Score')
    plt.ylim(0, 1.1)
    plt.legend(loc='upper right')
    plt.tight_layout()
    return plt

# ======================
# MODEL TRAINING AND EVALUATION
# ======================
def main():
    # Parameters
    random_state = 42
    test_size = 0.25
    n_splits = 5
    top_features = 5

    # 1. Load and preprocess data
    print("Loading data...")
    try:
        df = pd.read_csv("data set.csv")

        if df.empty:
            raise ValueError("Dataset is empty")
        if "fault type" not in df.columns:
            raise ValueError("'fault type' column not found")

        # Handle missing values
        df.fillna(df.median(numeric_only=True), inplace=True)

        # Store original fault type names
        fault_types = df["fault type"].unique()

        # Encode target
        label_encoder = LabelEncoder()
        df["fault type"] = label_encoder.fit_transform(df["fault type"])

        print("\nClass distribution:")
        print(Counter(df["fault type"]))

    except Exception as e:
        print(f"Data loading error: {str(e)}")
        return

    # 2. Prepare data
    print("\nPreparing data...")
    X = df.drop("fault type", axis=1)
    y = df["fault type"]
    feature_names = X.columns.tolist()

    # Scale features
    scaler = RobustScaler()
    X_scaled = scaler.fit_transform(X)
    X_scaled = pd.DataFrame(X_scaled, columns=feature_names)

    # Handle imbalance
    min_samples = min(Counter(y).values())
    n_neighbors = min(5, min_samples - 1) if min_samples > 1 else 1

    try:
        smote = SMOTE(k_neighbors=n_neighbors, random_state=random_state)
        X_res, y_res = smote.fit_resample(X_scaled, y)
        print(f"Used SMOTE with k_neighbors={n_neighbors}")
    except:
        print("Using RandomOverSampler")
        ros = RandomOverSampler(random_state=random_state)
        X_res, y_res = ros.fit_resample(X_scaled, y)

    # Train-test split
    X_train, X_test, y_train, y_test = train_test_split(
        X_res, y_res,
        test_size=test_size,
        random_state=random_state,
        stratify=y_res
    )

    # 3. Initialize models with exact parameters from the report
    print("\nInitializing models...")
    models = {
        "LightGBM": LGBMClassifier(
            n_estimators=200,
            max_depth=-1,
            learning_rate=0.05,
            random_state=random_state,
            class_weight='balanced'
        ),
        "Gradient Boosting": GradientBoostingClassifier(
            n_estimators=200,
            max_depth=5,
            learning_rate=0.05,
            random_state=random_state
        ),
        "Extra Trees": ExtraTreesClassifier(
            n_estimators=200,
            max_depth=10,
            random_state=random_state,
            class_weight='balanced'
        ),
        "MLP (Multi-Layer Perceptron)": MLPClassifier(
            hidden_layer_sizes=(100, 50),
            max_iter=300,
            random_state=random_state,
            early_stopping=True
        ),
        "StackingClassifier (LGBM+ET)": StackingClassifier(
            estimators=[
                ('lgbm', LGBMClassifier(n_estimators=200, max_depth=-1, learning_rate=0.05,
                                      random_state=random_state, class_weight='balanced')),
                ('et', ExtraTreesClassifier(n_estimators=200, max_depth=10, random_state=random_state, class_weight='balanced'))
            ],
            final_estimator=LogisticRegression(max_iter=1000, random_state=random_state),
            stack_method='auto'
        ),
        "WeightedEnsemble (LGBM+GB)": VotingClassifier(
            estimators=[
                ('lgbm', LGBMClassifier(n_estimators=200, max_depth=-1, learning_rate=0.05,
                                      random_state=random_state, class_weight='balanced')),
                ('gb', GradientBoostingClassifier(n_estimators=200, max_depth=5, learning_rate=0.05, random_state=random_state))
            ],
            voting='soft',
            weights=[2, 1]
        )
    }

    if CATBOOST_AVAILABLE:
        models["CatBoost"] = CatBoostClassifier(
            iterations=200,
            depth=6,
            learning_rate=0.05,
            random_state=random_state,
            verbose=0
        )

    # 4. Train and evaluate models
    print("\nEvaluating models...")
    results = []
    best_model = {'name': None, 'model': None, 'accuracy': 0, 'class_report': None}

    for name, model in models.items():
        try:
            print(f"\nTraining {name}...")

            # Cross-validation
            cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
            cv_scores = cross_val_score(model, X_train, y_train, cv=cv, scoring='accuracy')

            # Train and predict
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            y_proba = model.predict_proba(X_test) if hasattr(model, "predict_proba") else None

            # Calculate metrics
            accuracy = accuracy_score(y_test, y_pred)
            precision = precision_score(y_test, y_pred, average='macro')
            recall = recall_score(y_test, y_pred, average='macro')
            f1 = f1_score(y_test, y_pred, average='macro')
            roc_auc = roc_auc_score(y_test, y_proba, multi_class='ovr') if y_proba is not None else None

            # Classification report for per-class metrics
            class_report = classification_report(y_test, y_pred, output_dict=True)

            # Store results
            results.append({
                "Model": name,
                "Accuracy": accuracy,
                "Precision": precision,
                "Recall": recall,
                "F1-Score": f1,
                "ROC AUC": roc_auc if roc_auc is not None else 'N/A',
                "CV Mean": np.mean(cv_scores),
                "CV Std": np.std(cv_scores),
                "Class Report": class_report
            })

            # Update best model
            if accuracy > best_model['accuracy']:
                best_model = {
                    'name': name,
                    'model': model,
                    'accuracy': accuracy,
                    'predictions': y_pred,
                    'probabilities': y_proba,
                    'class_report': class_report
                }

        except Exception as e:
            print(f"Error with {name}: {str(e)}")
            continue

    # 5. Generate report with all visualizations
    print("\nGenerating report...")
    try:
        doc = Document()

        # Add title
        title = doc.add_heading('Transformer Fault Diagnosis Report', 0)
        title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER

        # Add summary
        doc.add_heading('Executive Summary', level=1)
        doc.add_paragraph(
            f"The best performing model was {best_model['name']} with\naccuracy: {best_model['accuracy']:.4f}"
        )

        # Model comparison
        doc.add_heading('Model Comparison', level=1)
        results_df = pd.DataFrame(results)

        # Create table
        table = doc.add_table(rows=1, cols=7)
        table.style = 'Light Shading Accent 1'
        hdr_cells = table.rows[0].cells
        headers = ['Model', 'Accuracy', 'Precision', 'Recall', 'F1-Score', 'ROC AUC', 'CV Score']
        for i, header in enumerate(headers):
            hdr_cells[i].text = header

        for _, row in results_df.iterrows():
            row_cells = table.add_row().cells
            row_cells[0].text = row['Model']
            row_cells[1].text = f"{row['Accuracy']:.4f}"
            row_cells[2].text = f"{row['Precision']:.4f}"
            row_cells[3].text = f"{row['Recall']:.4f}"
            row_cells[4].text = f"{row['F1-Score']:.4f}"
            row_cells[5].text = f"{row['ROC AUC']:.4f}" if row['ROC AUC'] != 'N/A' else 'N/A'
            row_cells[6].text = f"{row['CV Mean']:.4f} ± {row['CV Std']:.4f}"

        # Add model comparison chart
        doc.add_heading('Model Performance Comparison', level=2)
        plt = plot_grouped_model_comparison(results_df)
        plt.savefig("model_comparison.png", bbox_inches='tight', dpi=300)
        plt.close()
        doc.add_picture("model_comparison.png", width=Inches(7.0))

        # Best model analysis
        doc.add_heading(f'Best Model Analysis: {best_model["name"]}', level=1)

        # Confusion matrix
        doc.add_heading('Confusion Matrix', level=2)
        fault_names = label_encoder.inverse_transform(range(len(label_encoder.classes_)))
        plt = plot_confusion_matrix(y_test, best_model['predictions'],
                                 fault_names,
                                 title='Confusion Matrix')
        plt.savefig("confusion_matrix.png", bbox_inches='tight', dpi=300)
        plt.close()
        doc.add_picture("confusion_matrix.png", width=Inches(6.0))

        # ROC curve
        if best_model['probabilities'] is not None:
            doc.add_heading('ROC Curve (Best Model)', level=2)
            plt = plot_roc_curve(y_test, best_model['probabilities'], title='ROC Curve')
            plt.savefig("roc_curve.png", bbox_inches='tight', dpi=300)
            plt.close()
            doc.add_picture("roc_curve.png", width=Inches(5.0))

        # Performance by fault type
        doc.add_heading('Performance by Fault Type', level=2)

        # Prepare fault type metrics
        fault_metrics = []
        for i, fault_name in enumerate(fault_names):
            class_metrics = best_model['class_report'][str(i)]
            fault_metrics.append({
                'Fault Type': fault_name,
                'Precision': class_metrics['precision'],
                'Recall': class_metrics['recall'],
                'F1-Score': class_metrics['f1-score'],
                'Accuracy': class_metrics['precision'],  # Using precision as proxy for accuracy
                'Support': class_metrics['support']
            })

        fault_metrics_df = pd.DataFrame(fault_metrics)

        # Plot fault type performance
        plt = plot_fault_type_performance(fault_metrics_df)
        plt.savefig("fault_type_performance.png", bbox_inches='tight', dpi=300)
        plt.close()
        doc.add_picture("fault_type_performance.png", width=Inches(6.0))

        # Detailed metrics by fault type
        doc.add_heading('Detailed Metrics by Fault Type', level=3)
        fault_table = doc.add_table(rows=1, cols=6)
        fault_table.style = 'Light Shading Accent 2'
        hdr_cells = fault_table.rows[0].cells
        headers = ['Fault Type', 'Precision', 'Recall', 'F1-Score', 'Accuracy', 'Support']
        for i, header in enumerate(headers):
            hdr_cells[i].text = header

        for _, row in fault_metrics_df.iterrows():
            row_cells = fault_table.add_row().cells
            row_cells[0].text = str(row['Fault Type'])
            row_cells[1].text = f"{row['Precision']:.4f}"
            row_cells[2].text = f"{row['Recall']:.4f}"
            row_cells[3].text = f"{row['F1-Score']:.4f}"
            row_cells[4].text = f"{row['Accuracy']:.4f}"
            row_cells[5].text = str(int(row['Support']))

        # Analysis paragraph
        doc.add_paragraph(
            "\nThe above chart and table show how the model performs for each specific\n"
            "fault type. This helps identify which fault types are easier or harder\n"
            "for the model to diagnose correctly."
        )

        # Identify best and worst performing fault types
        best_fault = fault_metrics_df.loc[fault_metrics_df['F1-Score'].idxmax()]
        worst_fault = fault_metrics_df.loc[fault_metrics_df['F1-Score'].idxmin()]

        doc.add_paragraph(
            f"The model performs best on '{best_fault['Fault Type']}' faults with F1-score of {best_fault['F1-Score']:.3f}, "
            f"and has the most difficulty with '{worst_fault['Fault Type']}' faults with F1-score of {worst_fault['F1-Score']:.3f}."
        )

        # Save report
        report_path = "fault_diagnosis_report.docx"
        doc.save(report_path)
        print(f"\nReport generated: {report_path}")

    except Exception as e:
        print(f"Report generation error: {str(e)}")

    print("\n=== Analysis Complete ===")
    print(f"Best model: {best_model['name']}")
    print(f"Accuracy: {best_model['accuracy']:.4f}")

if __name__ == "__main__":
    main()

[1;30;43mStreaming output truncated to the last 5000 lines.[0m

Training WeightedEnsemble (LGBM+GB)...
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.000187 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 1739
[LightGBM] [Info] Number of data points in the train set: 1128, number of used features: 7
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [Info] Start training from score -2.397895
[LightGBM] [In

<Figure size 1200x800 with 0 Axes>