# In [None]:  (notebook cell marker left over from the export)
#Performance at different noise levels in SACGAN-GP for IEEE14-bus system and IEEE118-bus system
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Load and analyze the combined dataset to identify preprocessing needs
import pandas as pd
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Flatten, Concatenate, Embedding
from tensorflow.keras import Model, Sequential
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Layer, Softmax

# Load the combined dataset with attacked and non-attacked data
import io

revision_dataset_path = '/content/14bus_fdia_gaussian_noise_1pct.csv'
df1 = pd.read_csv(revision_dataset_path)

# Display basic information about the dataset to understand its structure.
# DataFrame.info() writes its report to a buffer and returns None, so the
# text is captured explicitly; otherwise df_info would always be None.
_info_buffer = io.StringIO()
df1.info(buf=_info_buffer)
df_info = _info_buffer.getvalue()
df_head = df1.head()

# A bare `df_info, df_head` expression shows nothing outside the last line of
# a notebook cell, so print both explicitly.
print(df_info)
print(df_head)
import tensorflow as tf
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# -----------------------
# 1. Extract Features & Labels
# -----------------------
feature_columns = [col for col in df1.columns if 'voltage' in col or 'angle' in col]
if not feature_columns:
    # Fail early with a clear message instead of letting the scaler choke on
    # an empty frame.
    raise ValueError("No 'voltage' or 'angle' feature columns found in the dataset.")
X = df1[feature_columns]
y_raw = df1['attack']  # Keep the raw float values if needed elsewhere

# Binarize the 'attack' label for classification tasks
# Assuming values > 0.5 are 'attacked' (1) and <= 0.5 are 'non-attacked' (0)
y_binary = (y_raw > 0.5).astype(int)

# -----------------------
# 2. Decide the train/test partition BEFORE fitting the scaler
# -----------------------
# Splitting row positions (stratified on the binary label) yields the same
# 80/20 partition a direct split of the data would, while making the training
# rows known before any statistics are computed.
row_positions = np.arange(len(df1))
train_rows, test_rows = train_test_split(
    row_positions, test_size=0.2, random_state=42, stratify=y_binary)

# -----------------------
# 3. Normalize Features
# -----------------------
# Fit min/max on the training rows only, so test-set statistics do not leak
# into preprocessing. Test values can therefore fall slightly outside [0, 1].
scaler = MinMaxScaler()
scaler.fit(X.iloc[train_rows])
X_normalized = scaler.transform(X)

# ----------------------- FIRST SPLIT (for GAN-Attack data, not directly used in main GAN training) --------
X_train_first_split = X_normalized[train_rows]
X_temp_test = X_normalized[test_rows]
y_train_first_split = y_binary.iloc[train_rows]
y_temp_test = y_binary.iloc[test_rows]

# ----------------------- SECOND SPLIT (from Temp-Test => Final-Test + GAN-Attack) -------------------------
# Stratify based on y_temp_test to ensure class distribution in these subsets
X_test_final, X_gan_attack, y_test_final, y_gan_attack = train_test_split(
    X_temp_test, y_temp_test, test_size=0.2, random_state=42, stratify=y_temp_test)

# ----------------------- Convert to Tensors (for first split, if needed) -----------------------------------
X_train_tensor_first_split = tf.convert_to_tensor(X_train_first_split, dtype=tf.float32)
X_test_final_tensor = tf.convert_to_tensor(X_test_final, dtype=tf.float32)
X_gan_attack_tensor = tf.convert_to_tensor(X_gan_attack, dtype=tf.float32)

y_train_tensor_first_split = tf.convert_to_tensor(y_train_first_split.to_numpy(), dtype=tf.float32)
y_test_final_tensor = tf.convert_to_tensor(y_test_final.to_numpy(), dtype=tf.float32)
y_gan_attack_tensor = tf.convert_to_tensor(y_gan_attack.to_numpy(), dtype=tf.float32)

# ----------------------- Print Shapes (for the first split) -----------------------------------------------
print("TRAIN (first split for GAN-Attack generation):", X_train_tensor_first_split.shape, y_train_tensor_first_split.shape)
print("FINAL TEST (from second split):", X_test_final_tensor.shape, y_test_final_tensor.shape)
print("GAN ATTACK DATA (from second split):", X_gan_attack_tensor.shape, y_gan_attack_tensor.shape)

# === The actual split used for the GAN training/evaluation ===
# Same partition as the first split (identical test_size, random_state and
# stratification), so it is reused rather than recomputed. Copies keep the two
# sets of names independent of each other.
X_train = X_train_first_split.copy()
X_test = X_temp_test.copy()
y_train = y_train_first_split.copy()
y_test = y_temp_test.copy()

print("\n--- GAN Training/Evaluation Data Split ---")
print("X_train (for GAN training):")
print(X_train)
print("X_test (for GAN evaluation):")
print(X_test)
print("y_train (for GAN training):")
print(y_train)
print("y_test (for GAN evaluation):")
print(y_test)

# Convert NumPy arrays to Pandas DataFrames for easier saving
X_train_df1 = pd.DataFrame(X_train)
X_test_df1 = pd.DataFrame(X_test)
y_train_df1 = pd.DataFrame(y_train)  # binary labels
y_test_df1 = pd.DataFrame(y_test)    # binary labels

# Save to Excel files in the local /content/ directory
X_train_df1.to_excel('/content/X_train5.xlsx', index=False)
X_test_df1.to_excel('/content/X_test5.xlsx', index=False)
y_train_df1.to_excel('/content/y_train5.xlsx', index=False)
y_test_df1.to_excel('/content/y_test5.xlsx', index=False)

# Convert data into TensorFlow tensors
X_train_tensor = tf.convert_to_tensor(X_train, dtype=tf.float32)
X_test_tensor = tf.convert_to_tensor(X_test, dtype=tf.float32)
y_train_tensor = tf.convert_to_tensor(y_train.values, dtype=tf.float32)  # 0/1 labels
y_test_tensor = tf.convert_to_tensor(y_test.values, dtype=tf.float32)    # 0/1 labels

print("\nShapes of processed tensors (for GAN training/evaluation):")
print("X_train:", X_train_tensor.shape)
print("X_test:", X_test_tensor.shape)
print("y_train:", y_train_tensor.shape)
print("y_test:", y_test_tensor.shape)


class SelfAttention(Layer):
    """Dot-product self-attention block with a residual connection.

    The input is projected to a query and a key of width ``units // 8`` and a
    value of width ``units``. The output is ``softmax(Q @ K^T) @ V + inputs``,
    so the last dimension of ``inputs`` must match ``units`` (or broadcast
    against it) for the residual addition to be valid.

    NOTE(review): for rank-2 inputs of shape (batch, features), which is how
    ``build_cgan_generator`` in this file calls the layer, ``Q @ K^T`` has
    shape (batch, batch). Each sample therefore attends over the *other
    samples in the batch*, not over positions within itself. Confirm this is
    intended.

    Args:
        units: Width of the value projection; must be at least 8 so the
            query/key projections have a positive width.
        **kwargs: Standard Keras ``Layer`` keyword arguments (``name``,
            ``dtype``, ...).

    Raises:
        ValueError: If ``units`` is smaller than 8.
    """

    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        if units < 8:
            raise ValueError(
                f"units must be >= 8 so that units // 8 is positive, got {units}")
        self.units = units
        self.query = Dense(units // 8)
        self.key = Dense(units // 8)
        self.value = Dense(units)

    def call(self, inputs):
        """Apply attention to ``inputs`` and add the residual."""
        query = self.query(inputs)
        key = self.key(inputs)
        value = self.value(inputs)

        scores = tf.matmul(query, key, transpose_b=True)
        # Softmax over the last axis. A stateless op avoids constructing a
        # new Keras Softmax layer on every forward pass.
        attention_weights = tf.nn.softmax(scores, axis=-1)
        attention_output = tf.matmul(attention_weights, value)
        return attention_output + inputs  # Residual connection

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"units": self.units})
        return config

def build_cgan_generator(noise_dim, output_dim, num_classes=2,
                         hidden_units=256, output_activation="tanh"):
    """Build the conditional generator.

    The integer class label is embedded into a ``noise_dim``-wide vector,
    concatenated with the noise vector, and passed through
    Dense(hidden_units, relu) -> SelfAttention(hidden_units) ->
    Dense(output_dim, output_activation).

    Args:
        noise_dim: Length of the noise vector (also used as embedding width).
        output_dim: Number of features in a generated sample.
        num_classes: Number of distinct label values the embedding accepts.
        hidden_units: Width of the hidden Dense/attention stage. The default
            of 256 matches the previously hard-coded value.
        output_activation: Activation of the output layer. The default
            ``"tanh"`` preserves the previous behaviour and yields values in
            [-1, 1]. Real features in this file are scaled with a
            ``MinMaxScaler`` using its default range of [0, 1], so pass
            ``"sigmoid"`` to make generated samples share that range.

    Returns:
        A Keras ``Model`` mapping ``[noise, label]`` to a generated sample.
    """
    noise_input = Input(shape=(noise_dim,))
    label_input = Input(shape=(1,), dtype='int32')

    # (batch, 1) label -> (batch, 1, noise_dim) -> (batch, noise_dim)
    label_embedding = Embedding(num_classes, noise_dim)(label_input)
    label_embedding = Flatten()(label_embedding)

    combined_input = Concatenate()([noise_input, label_embedding])

    x = Dense(hidden_units, activation="relu")(combined_input)
    x = SelfAttention(hidden_units)(x)
    x = Dense(output_dim, activation=output_activation)(x)
    return Model([noise_input, label_input], x)

def build_cgan_discriminator(input_dim, num_classes=2):
    """Build the conditional critic.

    The label is embedded into an ``input_dim``-wide vector, concatenated
    with the sample, and passed through two ReLU Dense layers (256 then 128
    units) followed by a single linear output unit.

    Args:
        input_dim: Number of features in a sample.
        num_classes: Number of distinct label values the embedding accepts.

    Returns:
        A Keras ``Model`` mapping ``[sample, label]`` to one raw score.
    """
    sample_in = Input(shape=(input_dim,))
    label_in = Input(shape=(1,), dtype='int32')

    # Embed the label and drop the length-1 sequence axis.
    embedded_label = Flatten()(Embedding(num_classes, input_dim)(label_in))

    hidden = Concatenate()([sample_in, embedded_label])
    for width in (256, 128):
        hidden = Dense(width, activation="relu")(hidden)

    # No activation: the critic emits an unbounded score (WGAN-GP).
    score = Dense(1, activation=None)(hidden)

    return Model([sample_in, label_in], score)

def gradient_penalty(real, fake, label, discriminator):
    """Compute the WGAN-GP gradient penalty on real/fake interpolates.

    A per-sample uniform mixing coefficient blends ``real`` and ``fake``; the
    penalty is the mean squared deviation from 1 of the L2 norm of the
    critic's gradient with respect to the blended samples.

    Args:
        real: Batch of real samples; indexed as (batch, features) by the
            ``axis=1`` reduction below.
        fake: Batch of generated samples with the same shape as ``real``.
        label: Conditioning labels passed to the critic with the interpolates.
        discriminator: Callable critic taking ``[samples, labels]``.

    Returns:
        Scalar tensor holding the penalty.
    """
    # Dynamic batch size: the static `real.shape[0]` is None when the batch
    # dimension is unknown (e.g. inside a traced tf.function).
    batch = tf.shape(real)[0]
    alpha = tf.random.uniform([batch, 1], 0.0, 1.0)
    interpolated = alpha * real + (1.0 - alpha) * fake

    with tf.GradientTape() as tape:
        # `interpolated` is a plain tensor, so it must be watched explicitly.
        tape.watch(interpolated)
        pred = discriminator([interpolated, label])

    gradients = tape.gradient(pred, interpolated)
    # The epsilon keeps the derivative of sqrt finite when a gradient row is
    # exactly zero; without it the critic update can produce NaNs.
    slopes = tf.sqrt(tf.reduce_sum(tf.square(gradients), axis=1) + 1e-12)
    return tf.reduce_mean((slopes - 1.0) ** 2)
# GAN hyperparameters
noise_dim, batch_size, epochs = 100, 64, 100
output_dim = X_train.shape[1]  # one generated value per input feature
lambda_gp = 1  # weight of the gradient-penalty term in the critic loss

# Build the two networks (generator first, then critic)
generator = build_cgan_generator(noise_dim, output_dim)
discriminator = build_cgan_discriminator(output_dim)

# One Adam optimizer per network, both at the same learning rate
generator_optimizer = Adam(learning_rate=1e-5)
discriminator_optimizer = Adam(learning_rate=1e-5)

# History buffers filled in by the training loop
epoch_list, d_loss_list, g_loss_list, accuracy_list = [], [], [], []

# Training loop
n_critic = 5  # critic updates per generator update
# Record metrics about ten times per run. The previous fixed interval of 100
# combined with epochs = 100 meant only epoch 0 was ever logged.
log_interval = max(1, epochs // 10)

for epoch in range(epochs):
    # ---- Critic updates ----
    for _ in range(n_critic):
        # Draw a batch of real samples with their (binary) labels
        idx = np.random.randint(0, X_train.shape[0], batch_size)
        real_data = tf.convert_to_tensor(X_train[idx], dtype=tf.float32)
        real_labels = tf.convert_to_tensor(y_train.iloc[idx].values, dtype=tf.int32)

        # Generate the fake batch under the SAME labels as the real batch.
        # The gradient penalty scores real/fake interpolates under one label
        # per row, so both ends of each interpolate must share a condition.
        noise = tf.convert_to_tensor(
            np.random.normal(0, 1, (batch_size, noise_dim)), dtype=tf.float32)
        generated_data = generator([noise, real_labels])

        with tf.GradientTape() as tape:
            d_loss_real = tf.reduce_mean(discriminator([real_data, real_labels]))
            d_loss_fake = tf.reduce_mean(discriminator([generated_data, real_labels]))
            gp = gradient_penalty(real_data, generated_data, real_labels, discriminator)
            d_loss = d_loss_fake - d_loss_real + lambda_gp * gp

        gradients = tape.gradient(d_loss, discriminator.trainable_variables)
        discriminator_optimizer.apply_gradients(zip(gradients, discriminator.trainable_variables))

    # ---- Generator update ----
    noise = tf.convert_to_tensor(
        np.random.normal(0, 1, (batch_size, noise_dim)), dtype=tf.float32)
    # The generator is trained on the attacked class (label 1) only
    valid_labels = tf.convert_to_tensor(np.ones((batch_size,), dtype=int), dtype=tf.int32)

    with tf.GradientTape() as tape:
        g_loss = -tf.reduce_mean(discriminator([generator([noise, valid_labels]), valid_labels]))

    gradients = tape.gradient(g_loss, generator.trainable_variables)
    generator_optimizer.apply_gradients(zip(gradients, generator.trainable_variables))

    # ---- Periodic logging ----
    if epoch % log_interval == 0:
        # Score freshly generated attack samples with the critic
        generated_samples = generator([noise, valid_labels])
        predictions = discriminator([generated_samples, valid_labels])

        # Classify as attack if the critic output is below 0.5.
        # NOTE(review): the critic's final layer has no activation, so its
        # output is an unbounded raw score and 0.5 is a heuristic cut-off,
        # not a probability threshold. Confirm this metric is meaningful.
        predicted_labels = tf.cast(tf.reshape(predictions, [-1]) < 0.5, tf.int32)

        # Every generated sample carries label 1 (valid_labels)
        acc = accuracy_score(valid_labels.numpy(), predicted_labels.numpy())

        # Print once (the loss line used to be printed twice) and store
        print(f"Epoch: {epoch}, D Loss: {d_loss.numpy():.4f}, G Loss: {g_loss.numpy():.4f}, Accuracy: {acc:.4f}")

        epoch_list.append(epoch)
        d_loss_list.append(d_loss.numpy())
        g_loss_list.append(g_loss.numpy())
        accuracy_list.append(acc)


def _label_free_attack_scores(critic, features):
    """Score samples for 'attack-likeness' without using their true labels.

    Each sample is scored by the critic twice, once conditioned on label 1
    and once on label 0, and the difference is returned. Higher values mean
    the critic prefers the attack condition for that sample.

    Args:
        critic: Keras model taking ``[features, labels]``.
        features: Array-like of shape (n_samples, n_features).

    Returns:
        1-D NumPy array with one score per sample.
    """
    features = np.asarray(features, dtype="float32")
    n_samples = features.shape[0]
    as_attack = np.ones((n_samples, 1), dtype="int32")
    as_normal = np.zeros((n_samples, 1), dtype="int32")
    attack_view = critic.predict([features, as_attack], verbose=0)
    normal_view = critic.predict([features, as_normal], verbose=0)
    return (attack_view - normal_view).ravel()


# Generate anomaly scores for the test set.
# The ground-truth labels must NOT be fed to the critic here: passing y_test
# as the conditioning input hands the detector the answer it is supposed to
# predict, which invalidates every metric computed from the scores.
discriminator_scores = _label_free_attack_scores(discriminator, X_test)

# Calibrate the threshold on TRAINING data (75th percentile of the scores of
# non-attacked training samples) so that no test label is used for tuning.
train_scores = _label_free_attack_scores(discriminator, X_train)
normal_train_scores = train_scores[np.asarray(y_train) == 0]
if normal_train_scores.size == 0:
    raise ValueError("No non-attacked training samples available to calibrate the threshold.")
threshold = np.percentile(normal_train_scores, 75)
predicted_labels = (discriminator_scores > threshold).astype(int)

# Calculate performance metrics (zero_division=0 avoids a warning/NaN when a
# class is never predicted)
accuracy = accuracy_score(y_test, predicted_labels)
precision = precision_score(y_test, predicted_labels, zero_division=0)
recall = recall_score(y_test, predicted_labels, zero_division=0)
f1 = f1_score(y_test, predicted_labels, zero_division=0)

print(f"Detection Accuracy: {accuracy:.4f}")
print(f"Detection Precision: {precision:.4f}")
print(f"Detection Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# ===============================================
# 🔍 FDIA Detection Evaluation Code
# Confusion Matrix + ROC Curve + Precision–Recall
# ===============================================

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score, confusion_matrix, ConfusionMatrixDisplay

# -----------------------------------------------
# 1. Get anomaly scores from the discriminator
# -----------------------------------------------
# Convert test data to tensors
y_test_tensor = tf.convert_to_tensor(y_test.values, dtype=tf.int32)
X_test_tensor = tf.convert_to_tensor(X_test, dtype=tf.float32)

# Score every test sample under BOTH conditions instead of under its true
# label. Feeding y_test to the critic would leak the ground truth into the
# score and make the ROC/PR results meaningless.
n_test = tf.shape(X_test_tensor)[0]
attack_condition = tf.ones((n_test, 1), dtype=tf.int32)
normal_condition = tf.zeros((n_test, 1), dtype=tf.int32)

# Score difference (higher => critic prefers the attack condition)
scores = (
    discriminator([X_test_tensor, attack_condition])
    - discriminator([X_test_tensor, normal_condition])
).numpy().flatten()

# Convert continuous scores to binary predictions: the top 25% of scores are
# flagged as attacks (≈ adaptive decision boundary, no labels involved)
threshold = np.percentile(scores, 75)
y_pred = (scores > threshold).astype(int)

# -----------------------------------------------
# 2. Confusion Matrix
# -----------------------------------------------
# labels=[0, 1] fixes the row/column order even if a class is never predicted
cm = confusion_matrix(y_test, y_pred, labels=[0, 1])
ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Normal", "Attack"]).plot(cmap="Blues")
plt.title("Confusion Matrix - FDIA Detection")
plt.show()

# -----------------------------------------------
# 3. ROC Curve + AUC
# -----------------------------------------------
# `scores` are continuous, so the curve sweeps every possible threshold
fpr, tpr, _ = roc_curve(y_test, scores)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, linewidth=2, label=f"AUC = {roc_auc:.4f}")
plt.plot([0, 1], [0, 1], 'k--')  # chance-level diagonal for reference
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate (Recall)")
plt.title("ROC Curve - FDIA Detection")
plt.legend()
plt.grid()
plt.show()

print(f"\nROC AUC Score: {roc_auc:.4f}")

# -----------------------------------------------
# 4. Precision–Recall Curve
# -----------------------------------------------
# NOTE: `precision` and `recall` are rebound here to per-threshold arrays;
# any scalar values previously stored under these names are overwritten.
precision, recall, _ = precision_recall_curve(y_test, scores)
avg_precision = average_precision_score(y_test, scores)

plt.figure()
plt.plot(recall, precision, linewidth=2, label=f"AP = {avg_precision:.4f}")
plt.xlabel("Recall")
plt.ylabel("Precision")
# Title repaired: the en dash was mis-encoded and rendered as garbage
plt.title("Precision–Recall Curve - FDIA Detection (Imbalanced Data)")
plt.legend()
plt.grid()
plt.show()

print(f"\nAverage Precision Score: {avg_precision:.4f}")
