In [None]:
import pandas as pd
import xgboost as xgb
from sklearn.preprocessing import LabelEncoder
import shap
import matplotlib.pyplot as plt
import numpy as np
import traceback

In [None]:
# Human-readable descriptions for the fault source codes.
# Every code has the form F<number><severity>, where the severity suffix is
# 'L' (Low) or 'M' (Med). Keys are generated in the order F1L, F1M, F2L, ...
# NOTE(review): F4* and F5* both map to 'PV Array Mismatch' -- confirm that the
# duplicated description is intended.
FAULT_CODE_MAPPING = {
    f"F{fault_number}{severity_code}": f"{fault_name} ({severity_label})"
    for fault_number, fault_name in enumerate(
        (
            'Inverter Fault',
            'Feedback Sensor Fault',
            'Grid Anomaly',
            'PV Array Mismatch',
            'PV Array Mismatch',
            'MPPT Controller Fault',
            'Boost Converter Controller Fault',
        ),
        start=1,
    )
    for severity_code, severity_label in (('L', 'Low'), ('M', 'Med'))
}

In [None]:
def filter_anomalies_only(input_path='vae_anomaly_test_results_1.csv', output_path='test1_anomalies.csv'):
    """Copy the rows flagged as anomalies from one CSV file into another.

    Reads ``input_path``, keeps the rows whose ``Anomaly`` column equals 1 and
    writes them (without the index) to ``output_path``. Failures are reported
    on stdout and are not re-raised; the function always returns ``None``.

    Args:
        input_path: CSV file to read; must contain an ``Anomaly`` column.
        output_path: CSV file to write the filtered rows to.
    """
    try:
        all_rows = pd.read_csv(input_path)
        is_anomaly = all_rows['Anomaly'] == 1
        flagged_rows = all_rows.loc[is_anomaly].copy()

        # An empty result is still written, so the output file always reflects this run.
        if flagged_rows.empty:
            print(f"Warning: No anomalies found in {input_path}. Output file will be empty.")

        flagged_rows.to_csv(output_path, index=False)
        print(f"Filtered anomalies saved to: {output_path}")
    except FileNotFoundError:
        print(f"Error: Input file not found at {input_path}")
    except KeyError:
        print(f"Error: 'Anomaly' column not found in {input_path}")
    except Exception as err:
        print(f"An error occurred during filtering: {err}")

In [None]:
def load_and_prepare_data(train_path, test_path):
    """Load the training and test CSVs and build the source-classifier inputs.

    Only training rows with ``Anomaly == 1`` are used, and their ``source``
    labels are integer-encoded with a ``LabelEncoder``. The test file is not
    filtered here; all of its rows are returned.

    Args:
        train_path: CSV containing an ``Anomaly`` column, a ``source`` column
            and the feature columns listed in ``feature_cols``.
        test_path: CSV containing the same feature columns.

    Returns:
        Tuple ``(X_train, y_train_encoded, X_test, test_anomalies, label_encoder)``.
        If the test file has no rows, empty placeholders and an unfitted
        ``LabelEncoder`` are returned instead.

    Raises:
        FileNotFoundError: if either CSV path does not exist.
        KeyError: if a required column is missing; the message names the file
            role (training/test) and the missing column(s).
        ValueError: if the training file has no rows with ``Anomaly == 1``.
    """
    feature_cols = ['Vpv', 'Vdc', 'ia', 'ib', 'ic', 'Vabc']  # Use physical features

    try:
        train_df = pd.read_csv(train_path)
        # Check explicitly so the error says which file lacks the column.
        if 'Anomaly' not in train_df.columns:
            raise KeyError("Training file is missing the 'Anomaly' column.")
        train_anomalies = train_df[train_df['Anomaly'] == 1].copy()
        if train_anomalies.empty:
            raise ValueError(f"No anomalies found in the training file {train_path} for training the source classifier.")

        test_anomalies = pd.read_csv(test_path)
        if test_anomalies.empty:
            print(f"Warning: The test anomalies file {test_path} is empty. No predictions or explanations will be generated.")
            return pd.DataFrame(), np.array([]), pd.DataFrame(), pd.DataFrame(), LabelEncoder()

        # Report only the columns that are actually absent, not the whole required list.
        missing_in_train = [col for col in feature_cols if col not in train_anomalies.columns]
        if missing_in_train:
            raise KeyError(f"Training file is missing required feature column(s): {missing_in_train}")
        if 'source' not in train_anomalies.columns:
            raise KeyError("Training file is missing the 'source' column.")
        missing_in_test = [col for col in feature_cols if col not in test_anomalies.columns]
        if missing_in_test:
            raise KeyError(f"Test file is missing required feature column(s): {missing_in_test}")

        X_train = train_anomalies[feature_cols]
        y_train = train_anomalies['source']

        label_encoder = LabelEncoder()
        y_train_encoded = label_encoder.fit_transform(y_train)

        X_test = test_anomalies[feature_cols]

        print(f"Data loaded. Training features shape: {X_train.shape}, Test features shape: {X_test.shape}")
        print(f"Target classes found: {list(label_encoder.classes_)}")

        return X_train, y_train_encoded, X_test, test_anomalies, label_encoder

    except FileNotFoundError as e:
        print(f"Error: Data file not found: {e}")
        raise
    except KeyError as e:
        # str(KeyError) is the quoted repr of its argument; print the raw
        # message so a full sentence is not embedded as if it were a column name.
        detail = e.args[0] if e.args else e
        print(f"Error (missing column): {detail}")
        raise
    except ValueError as e:
        print(f"Error in data value: {e}")
        raise
    except Exception as e:
        print(f"An error occurred during data loading: {e}")
        raise

In [None]:
def train_classifier(X_train, y_train, model_filename='xgb_classifier_model.json'):
    """Fit an XGBoost multi-class classifier and optionally save it to disk.

    Args:
        X_train: DataFrame of training features.
        y_train: Sequence of encoded class labels, one per row of ``X_train``.
        model_filename: Path the fitted model is saved to via ``save_model``.
            Pass ``None`` (or an empty string) to skip saving.

    Returns:
        The fitted ``xgb.XGBClassifier``, or ``None`` when either input is
        empty (an error message is printed in that case).
    """
    if X_train.empty or len(y_train) == 0:
        print("Error: Cannot train model with empty data.")
        return None

    print("Training XGBoost classifier...")
    # random_state is fixed so repeated runs build the same model.
    model = xgb.XGBClassifier(objective='multi:softprob',
                              eval_metric='mlogloss',
                              random_state=42)
    model.fit(X_train, y_train)
    print("Training complete.")

    if model_filename:
        model.save_model(model_filename)
        print(f"Model saved to {model_filename}")

    return model

In [None]:
def predict_sources(model, X_test, label_encoder):
    """Predict the fault source for every test row.

    Args:
        model: Fitted classifier exposing ``predict``; may be ``None``.
        X_test: DataFrame of test features.
        label_encoder: Encoder whose ``inverse_transform`` maps the encoded
            predictions back to source labels.

    Returns:
        ``(encoded_predictions, source_labels)``, or ``(None, None)`` when the
        model is missing or ``X_test`` is empty.
    """
    can_predict = model is not None and not X_test.empty
    if not can_predict:
        print("Skipping prediction due to missing model or empty test data.")
        return None, None

    print("Making predictions on test anomalies...")
    encoded_predictions = model.predict(X_test)
    source_labels = label_encoder.inverse_transform(encoded_predictions)
    print("Prediction complete.")
    return encoded_predictions, source_labels

In [None]:
def _select_class_shap_values(shap_values, expected_value, class_idx, row_idx):
    """Return ``(feature_contributions, base_value)`` for one row and one class.

    Handles the three layouts indexed by the caller: a per-class list of 2-D
    arrays, a 3-D array indexed ``[row, feature, class]``, or a single 2-D
    array indexed ``[row, feature]``.
    """
    if isinstance(shap_values, list):
        row_values = shap_values[class_idx][row_idx, :]
    elif shap_values.ndim == 3:
        row_values = shap_values[row_idx, :, class_idx]
    else:
        row_values = shap_values[row_idx, :]

    if isinstance(expected_value, (list, np.ndarray)):
        base_value = expected_value[class_idx]
    else:
        base_value = expected_value
    return row_values, base_value


def explain_predictions_with_shap(model, X_test, test_df_with_predictions, label_encoder,
                                  output_path='shap_explanations_per_anomaly.csv',
                                  save_force_plots=False):
    """Explain each prediction with SHAP and save the top features to a CSV.

    For every row of ``X_test`` the SHAP contributions towards the row's
    predicted class are ranked by absolute value, and the three strongest
    features are recorded together with the predicted source code and its
    description from ``FAULT_CODE_MAPPING``.

    Args:
        model: Fitted tree model accepted by ``shap.TreeExplainer``; may be ``None``.
        X_test: DataFrame of test features.
        test_df_with_predictions: DataFrame holding a ``Predicted_Source``
            column with one label per row of ``X_test``.
        label_encoder: Fitted encoder used to map the predicted labels to class indices.
        output_path: CSV file the per-anomaly explanations are written to.
        save_force_plots: When true, also save one SHAP force plot per anomaly
            as ``shap_force_plot_anomaly_<index>.html``.

    Returns:
        None. Errors are printed (with a traceback) and not re-raised.
    """
    if model is None or X_test.empty:
        print("Skipping SHAP explanations due to missing model or empty test data.")
        return

    print("\n--- Generating SHAP Explanations ---")

    # Validate the input before the expensive SHAP computation, not after it.
    if 'Predicted_Source' not in test_df_with_predictions.columns:
        print("Error: 'Predicted_Source' column not found in the prediction DataFrame. Cannot generate force plots or explanations.")
        return

    try:
        y_pred_encoded = label_encoder.transform(test_df_with_predictions['Predicted_Source'])

        explainer = shap.TreeExplainer(model)
        print("Calculating SHAP values (this may take a moment)...")
        shap_values = explainer.shap_values(X_test)
        print("SHAP values calculated.")

        expected_value = explainer.expected_value

        shap_details_list = []  # One record per anomaly, written to CSV after the loop

        for instance_idx in range(len(X_test)):
            predicted_class_idx = y_pred_encoded[instance_idx]
            predicted_class_name = label_encoder.classes_[predicted_class_idx]
            predicted_fault_description = FAULT_CODE_MAPPING.get(predicted_class_name, 'Unknown Fault')

            print(f"\nExplaining Anomaly {instance_idx} (Predicted Source: {predicted_class_name})")

            # A failure on one instance is reported and must not abort the remaining ones.
            try:
                class_shap_values, base_value = _select_class_shap_values(
                    shap_values, expected_value, predicted_class_idx, instance_idx
                )

                # Rank features by the magnitude of their contribution and keep the top three.
                feature_contributions = dict(zip(X_test.columns, class_shap_values))
                top_ranked = sorted(
                    feature_contributions.items(), key=lambda item: abs(item[1]), reverse=True
                )[:3]

                shap_details = {
                    'Anomaly_Index': instance_idx,
                    'Predicted_Source_Code': predicted_class_name,
                    'Predicted_Fault_Description': predicted_fault_description,
                }
                # Two passes keep the column order: all TopFeature_* first, then all TopContribution_*.
                for rank, (feature, _) in enumerate(top_ranked, start=1):
                    shap_details[f"TopFeature_{rank}"] = feature
                for rank, (_, contribution) in enumerate(top_ranked, start=1):
                    shap_details[f"TopContribution_{rank}"] = contribution
                shap_details_list.append(shap_details)

                if save_force_plots:
                    force_plot = shap.force_plot(base_value, class_shap_values, X_test.iloc[instance_idx, :])
                    force_plot_path = f"shap_force_plot_anomaly_{instance_idx}.html"
                    shap.save_html(force_plot_path, force_plot)
                    print(f"Force plot saved for anomaly {instance_idx} to {force_plot_path}")

            except Exception as plot_error:
                print(f"Error explaining instance {instance_idx}: {plot_error}")
                traceback.print_exc()

        # After looping, save all SHAP explanations to CSV
        shap_df = pd.DataFrame(shap_details_list)
        shap_df.to_csv(output_path, index=False)
        print(f"\nSaved detailed SHAP explanations to {output_path}")

        print("\n--- SHAP Explanations Complete ---")

    except Exception as e:
        print(f"\n--- An unexpected error occurred during SHAP explanation generation ---")
        traceback.print_exc()
        print(f"Error message: {e}")

In [None]:
def main():
    """Run the whole pipeline: filter, load, train, predict, explain.

    Steps:
      1. Extract the anomalous rows of the default test results file.
      2. Load the training and test data.
      3. Train the source classifier and predict a source for each test anomaly.
      4. Attach the predictions to the test rows and generate SHAP explanations.

    Any exception raised in steps 2-4 is reported on stdout and not re-raised.
    """
    filter_anomalies_only()

    try:
        prepared = load_and_prepare_data('vae_anomaly_test_results.csv', 'test1_anomalies.csv')
        train_features, train_labels, test_features, test_rows, encoder = prepared

        if train_features.empty or test_features.empty:
            print("Cannot proceed with empty datasets.")
            return

        classifier = train_classifier(train_features, train_labels)

        _, source_names = predict_sources(classifier, test_features, encoder)
        if source_names is None:
            # Prediction was skipped, so there is nothing to explain.
            return

        annotated_rows = test_rows.assign(
            Predicted_Source=source_names,
            Predicted_Source_Code=encoder.transform(source_names),
        )
        explain_predictions_with_shap(classifier, test_features, annotated_rows, encoder)

    except Exception as err:
        print(f"Error during the pipeline execution: {err}")

In [None]:
# Entry point: run the full pipeline only when this code is executed directly
# (``__name__`` is "__main__"), not when it is imported by other code.
if __name__ == "__main__":
    main()