In [3]:
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import roc_auc_score, precision_recall_fscore_support, accuracy_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Ensure reproducibility
torch.manual_seed(42)
np.random.seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

# --- 0. Device Configuration ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# --- 1. Data Loading and Preprocessing ---
# Reload the datasets
UNSW_training = pd.read_parquet("UNSW-NB15/UNSW_NB15_training-set.parquet")
UNSW_testing = pd.read_parquet("UNSW-NB15/UNSW_NB15_testing-set.parquet")


# Separate labels and attack_cat from features
labels_train = UNSW_training[['label', 'attack_cat']]
labels_test = UNSW_testing[['label', 'attack_cat']]

# Drop 'label' and 'attack_cat' from the feature sets
UNSW_training_features = UNSW_training.drop(columns=['label', 'attack_cat'])
UNSW_testing_features = UNSW_testing.drop(columns=['label', 'attack_cat'])

# Identify categorical and numerical columns
categorical_cols = UNSW_training_features.select_dtypes(include=['category', 'object']).columns
numerical_cols = UNSW_training_features.select_dtypes(include=np.number).columns

# Preprocessing pipelines for numerical and categorical features
numerical_transformer = StandardScaler()
categorical_transformer = OneHotEncoder(handle_unknown='ignore', sparse_output=False)

# Create a preprocessor using ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_cols),
        ('cat', categorical_transformer, categorical_cols)
    ],
    remainder='passthrough'
)

# Fit and transform the training data
X_train_processed_array = preprocessor.fit_transform(UNSW_training_features)
# Transform the testing data
X_test_processed_array = preprocessor.transform(UNSW_testing_features)

# Get feature names after preprocessing
numerical_feature_names = list(numerical_cols)
categorical_feature_names_ohe = preprocessor.named_transformers_['cat'].get_feature_names_out(categorical_cols)
all_processed_feature_names = numerical_feature_names + list(categorical_feature_names_ohe)

# Convert processed arrays back to DataFrame (optional, but good for inspection)
X_train_processed_df = pd.DataFrame(X_train_processed_array, columns=all_processed_feature_names)
X_test_processed_df = pd.DataFrame(X_test_processed_array, columns=all_processed_feature_names)

# Add back the labels and attack_cat
X_train_processed_df['label'] = labels_train['label'].values
X_train_processed_df['attack_cat'] = labels_train['attack_cat'].values
X_test_processed_df['label'] = labels_test['label'].values
X_test_processed_df['attack_cat'] = labels_test['attack_cat'].values

# Filter for normal data for training the GANs
normal_train_data = X_train_processed_df[X_train_processed_df['label'] == 0].drop(columns=['label', 'attack_cat'])
X_train_normal_np = normal_train_data.values.astype(np.float32)

# Prepare full test data and labels for evaluation
X_test_full_np = X_test_processed_df.drop(columns=['label', 'attack_cat']).values.astype(np.float32)
y_test_full = X_test_processed_df['label'].values

print(f"Shape of normal training data for CBIGAN: {X_train_normal_np.shape}")
print(f"Shape of full test data for evaluation: {X_test_full_np.shape}")
print(f"Shape of test labels for evaluation: {y_test_full.shape}")

# Convert numpy arrays to PyTorch tensors
X_train_normal_tensor = torch.from_numpy(X_train_normal_np).to(device)
X_test_full_tensor = torch.from_numpy(X_test_full_np).to(device)
y_test_full_tensor = torch.from_numpy(y_test_full).to(device)

# Create PyTorch DataLoader
BATCH_SIZE = 256
train_dataset = TensorDataset(X_train_normal_tensor)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)




Using device: cuda
Shape of normal training data for CBIGAN: (56000, 186)
Shape of full test data for evaluation: (82332, 186)
Shape of test labels for evaluation: (82332,)


In [4]:
# --- 2. CBIGAN Model Architecture in PyTorch ---

def init_weights(m):
    """Custom weight initialization for linear layers"""
    classname = m.__class__.__name__
    if classname.find('Linear') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
        if m.bias is not None:
            nn.init.constant_(m.bias.data, 0)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)


class Encoder(nn.Module):
    """Encoder E: Maps data x to latent representation z_x"""
    def __init__(self, input_dim, latent_dim):
        super(Encoder, self).__init__()
        self.main = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.2),
            nn.Linear(256, 128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.2),
            nn.Linear(128, latent_dim),
            nn.Tanh() # Latent vectors typically in [-1, 1]
        )
        self.apply(init_weights)

    def forward(self, x):
        return self.main(x)


class Decoder(nn.Module):
    """Decoder G: Maps latent representation z to data x_z"""
    def __init__(self, latent_dim, output_dim):
        super(Decoder, self).__init__()
        self.main = nn.Sequential(
            nn.Linear(latent_dim, 128),
            nn.ReLU(True),
            nn.Dropout(0.2),
            nn.Linear(128, 256),
            nn.ReLU(True),
            nn.Dropout(0.2),
            nn.Linear(256, output_dim) # Output linear as data is standardized
        )
        self.apply(init_weights)

    def forward(self, z):
        return self.main(z)


class Discriminator(nn.Module):
    """
    Discriminator D: Takes (data, latent) pairs and outputs a score.
    For CBIGAN (and BiGAN), it distinguishes (x, E(x)) from (G(z), z).
    """
    def __init__(self, input_dim, latent_dim):
        super(Discriminator, self).__init__()
        # Discriminator takes concatenated (data, latent)
        self.main = nn.Sequential(
            nn.Linear(input_dim + latent_dim, 512), # Input is concatenated x and z
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.3),
            nn.Linear(256, 1) # Output is raw score for WGAN, no sigmoid
        )
        self.apply(init_weights)

    def forward(self, x, z):
        # Concatenate data and latent vector
        x_z = torch.cat([x, z], 1)
        return self.main(x_z)



In [None]:
# --- 3. CBIGAN Training ---

input_dim = X_train_normal_np.shape[1]
latent_dim = 128 # Consistent with GANomaly for now

# Instantiate models and move to device
encoder_model = Encoder(input_dim, latent_dim).to(device)
decoder_model = Decoder(latent_dim, input_dim).to(device)
discriminator_model = Discriminator(input_dim, latent_dim).to(device)

# Optimizers
lr_d = 0.0002
lr_g = 0.0002
beta1 = 0.5
beta2 = 0.999 # Adam betas
optimizer_d = optim.Adam(discriminator_model.parameters(), lr=lr_d, betas=(beta1, beta2))
optimizer_g = optim.Adam(list(encoder_model.parameters()) + list(decoder_model.parameters()), lr=lr_g, betas=(beta1, beta2))

# Loss weights for Generator
lambda_recon = 10.0 # Weight for reconstruction/consistency loss (often lower than GANomaly)
lambda_gp = 10.0 # Weight for gradient penalty

EPOCHS = 100 # Number of training epochs
CRITIC_ITERS = 5 # Number of discriminator updates per generator update

print("\nStarting CBIGAN training...")
history_cbigan = {'d_loss': [], 'g_loss': [], 'gp_loss': [], 'consistency_loss': []}

# For gradient penalty
def calculate_gradient_penalty(discriminator, real_samples, fake_samples, real_latent, fake_latent):
    """Calculates the gradient penalty loss for WGAN-GP."""
    alpha = torch.rand(real_samples.size(0), 1).to(device)
    
    # Expand alpha specifically for data samples (x)
    alpha_x = alpha.expand_as(real_samples)
    interpolated_x = (alpha_x * real_samples + ((1 - alpha_x) * fake_samples)).requires_grad_(True)

    # Expand alpha specifically for latent vectors (z)
    alpha_z = alpha.expand_as(real_latent)
    interpolated_z = (alpha_z * real_latent + ((1 - alpha_z) * fake_latent)).requires_grad_(True)

    interpolated_score = discriminator(interpolated_x, interpolated_z)
    
    # Calculate gradients of D's output w.r.t. interpolated samples (x and z)
    gradients = torch.autograd.grad(
        outputs=interpolated_score,
        inputs=(interpolated_x, interpolated_z),
        grad_outputs=torch.ones_like(interpolated_score),
        create_graph=True,
        retain_graph=True,
    )
    # The gradients are returned as a tuple (grad_x, grad_z)
    # We need to flatten both and concatenate them for the norm calculation
    grad_x_flattened = gradients[0].view(gradients[0].size(0), -1)
    grad_z_flattened = gradients[1].view(gradients[1].size(0), -1)
    
    # Concatenate gradients of x and z to compute the norm of the combined gradient
    combined_gradients = torch.cat([grad_x_flattened, grad_z_flattened], dim=1)

    gradient_penalty = ((combined_gradients.norm(2, dim=1) - 1) ** 2).mean()
    return gradient_penalty


for epoch in range(EPOCHS):
    for i, (real_data_batch,) in enumerate(train_loader):
        real_data = real_data_batch.view(real_data_batch.size(0), -1).to(device)
        batch_size = real_data.size(0)

        # --- Train Discriminator ---
        for _ in range(CRITIC_ITERS):
            optimizer_d.zero_grad()

            # 1. Real pair (x, E(x))
            z_encoded_real = encoder_model(real_data).detach() # Detach E(x)
            d_real = discriminator_model(real_data, z_encoded_real)

            # 2. Fake pair (G(z), z)
            z_random = torch.randn(batch_size, latent_dim).to(device) # Random latent vector
            fake_data_from_z = decoder_model(z_random).detach() # Detach G(z)
            d_fake = discriminator_model(fake_data_from_z, z_random)

            # WGAN Loss for D: D(fake) - D(real)
            d_loss = d_fake.mean() - d_real.mean()

            # Gradient Penalty
            gp = calculate_gradient_penalty(discriminator_model, real_data, fake_data_from_z, z_encoded_real, z_random)
            d_loss_total = d_loss + lambda_gp * gp
            d_loss_total.backward()
            optimizer_d.step()
        
        # --- Train Generator (Encoder and Decoder) ---
        optimizer_g.zero_grad()


        # Term 1: Maximize D's output for (x, E(x))
        # Feed real_data through encoder
        z_encoded_real_g = encoder_model(real_data)
        d_real_g = discriminator_model(real_data, z_encoded_real_g)

        # Term 2: Minimize D's output for (G(z), z)
        # Generate fake data from random latent
        z_random_g = torch.randn(batch_size, latent_dim).to(device)
        fake_data_from_z_g = decoder_model(z_random_g)
        d_fake_g = discriminator_model(fake_data_from_z_g, z_random_g)

        # Adversarial loss for Generator
        # G wants d_real_g to be high and d_fake_g to be low (making the generator components look real to D)
        # So, the loss is -(d_real_g.mean() - d_fake_g.mean())
        g_loss_adv = -d_real_g.mean() + d_fake_g.mean()

        # Consistency Loss: E(D(z)) should be close to z
        # Take the generated data from z_random_g and encode it
        z_reconstructed_from_fake_x = encoder_model(fake_data_from_z_g)
        consistency_loss = (z_reconstructed_from_fake_x - z_random_g).abs().mean() # L1 loss for consistency

        g_loss_total = g_loss_adv + lambda_recon * consistency_loss
        g_loss_total.backward()
        optimizer_g.step()

        if i % 50 == 0: # Print more frequently to see training progress
            print(f"Epoch {epoch+1}/{EPOCHS}, Batch {i}/{len(train_loader)}: "
                  f"D Loss: {d_loss.item():.4f}, GP: {gp.item():.4f}, "
                  f"G Adv Loss: {g_loss_adv.item():.4f}, Consistency Loss: {consistency_loss.item():.4f}")

    # Store metrics for plotting
    history_cbigan['d_loss'].append(d_loss.item())
    history_cbigan['gp_loss'].append(gp.item())
    history_cbigan['g_loss'].append(g_loss_adv.item()) # Store adversarial part of G loss
    history_cbigan['consistency_loss'].append(consistency_loss.item())

    print(f"Epoch {epoch+1} completed. D Loss: {history_cbigan['d_loss'][-1]:.4f}, G Loss: {history_cbigan['g_loss'][-1]:.4f}")

print("\nCBIGAN training finished.")


# --- 4. Anomaly Detection and Evaluation for CBIGAN ---

def cbigan_anomaly_score(encoder, decoder, discriminator, data_tensor, latent_dim=128):
    encoder.eval()
    decoder.eval()
    discriminator.eval()

    with torch.no_grad():
        # Encode the input data x -> z_x
        z_x = encoder(data_tensor)

        # Reconstruct x from z_x -> x_recon
        x_recon = decoder(z_x)

        # Rconsistency check
        z_x_recon = encoder(x_recon)

        # MAE
        recon_error = torch.mean(torch.abs(data_tensor - x_recon), dim=1)

        # MAE between z_x and z_x_recon
        latent_consistency_error = torch.mean(torch.abs(z_x - z_x_recon), dim=1)

        # 3. Discriminator Score for (x, E(x)) pair
        # Higher D(x, E(x)) means more "normal" to D.
        # So for anomaly score, we want (1 - D_score) if D outputs [0,1] or just -D_score if linear.
        # Since WGAN-GP D output is linear, we use -D_score, meaning lower D score implies more anomalous.
        d_score = discriminator(data_tensor, z_x).squeeze()
    
        anomaly_score_val = -d_score # Negative discriminator score, higher means more anomalous

    return anomaly_score_val.cpu().numpy()


print("\nCalculating anomaly scores for test data using CBIGAN...")
anomaly_scores_cbigan = cbigan_anomaly_score(encoder_model, decoder_model, discriminator_model, X_test_full_tensor)

# Determine a threshold for anomaly detection
print("\nCalculating anomaly scores for normal training data to determine threshold for CBIGAN...")
normal_train_anomaly_scores_cbigan = cbigan_anomaly_score(encoder_model, decoder_model, discriminator_model, X_train_normal_tensor)

threshold_cbigan = np.percentile(normal_train_anomaly_scores_cbigan, 95)
print(f"Calculated anomaly threshold (95th percentile of normal training scores for CBIGAN): {threshold_cbigan:.4f}")

# Predict anomalies based on the threshold
y_pred_anomaly_cbigan = (anomaly_scores_cbigan > threshold_cbigan).astype(int)





Starting CBIGAN training...
Epoch 1/100, Batch 0/218: D Loss: -0.0055, GP: 0.8854, G Adv Loss: -0.0058, Consistency Loss: 0.7934
Epoch 1/100, Batch 50/218: D Loss: -53.2252, GP: 1.3851, G Adv Loss: -55.1455, Consistency Loss: 0.8020
Epoch 1/100, Batch 100/218: D Loss: -2780.1335, GP: 129.3901, G Adv Loss: -2741.6548, Consistency Loss: 0.8424
Epoch 1/100, Batch 150/218: D Loss: -37759.2539, GP: 1790.3772, G Adv Loss: -37314.5625, Consistency Loss: 1.1699
Epoch 1/100, Batch 200/218: D Loss: -232621.4375, GP: 11460.8828, G Adv Loss: -229934.8281, Consistency Loss: 1.1562
Epoch 1 completed. D Loss: -389645.5938, G Loss: -380980.5312
Epoch 2/100, Batch 0/218: D Loss: -394171.4688, GP: 20041.6289, G Adv Loss: -399034.9062, Consistency Loss: 1.1585
Epoch 2/100, Batch 50/218: D Loss: -1411978.1250, GP: 69466.9062, G Adv Loss: -1438163.7500, Consistency Loss: 1.1681
Epoch 2/100, Batch 100/218: D Loss: -4073746.7500, GP: 200902.8438, G Adv Loss: -4059223.2500, Consistency Loss: 1.1723
Epoch 2/1

In [None]:
# --- Save Trained CBIGAN Models ---
torch.save(encoder_model.state_dict(), 'cbigan_encoder.pth')
torch.save(decoder_model.state_dict(), 'cbigan_decoder.pth')
torch.save(discriminator_model.state_dict(), 'cbigan_discriminator.pth')

print("CBIGAN models saved successfully.")


In [None]:
from sklearn.metrics import (
    confusion_matrix,
    roc_curve,
    precision_recall_curve,
    auc,
    roc_auc_score,
    precision_recall_fscore_support,
    accuracy_score,
)
import matplotlib.pyplot as plt
import seaborn as sns

# Ensure y_test_labels is numpy array
y_test_np = y_test_labels.cpu().numpy()

# --- Confusion Matrix ---
cm = confusion_matrix(y_test_np, y_pred_anomaly_cbigan)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Purples',
            xticklabels=['Normal', 'Attack'], yticklabels=['Normal', 'Attack'])
plt.title('CBIGAN Confusion Matrix')
plt.ylabel('Actual Label')
plt.xlabel('Predicted Label')
plt.show()

# --- ROC & PR Curves ---
fpr_cbigan, tpr_cbigan, _ = roc_curve(y_test_np, anomaly_scores_cbigan)
roc_auc_cbigan = auc(fpr_cbigan, tpr_cbigan)

precision_vals, recall_vals, _ = precision_recall_curve(y_test_np, anomaly_scores_cbigan)
pr_auc_cbigan = auc(recall_vals, precision_vals)

plt.figure(figsize=(14, 6))

# ROC Curve
plt.subplot(1, 2, 1)
plt.plot(fpr_cbigan, tpr_cbigan, color='darkorange', lw=2, label=f'AUC = {roc_auc_cbigan:.2f}')
plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
plt.title('CBIGAN ROC Curve')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend()
plt.grid(True)

# PR Curve
plt.subplot(1, 2, 2)
plt.plot(recall_vals, precision_vals, color='blue', lw=2, label=f'AUC = {pr_auc_cbigan:.2f}')
plt.title('CBIGAN Precision-Recall Curve')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

# --- Additional Metrics ---
y_pred = y_pred_anomaly_cbigan
precision, recall, f1, _ = precision_recall_fscore_support(y_test_np, y_pred, average='binary', zero_division=0)
accuracy = accuracy_score(y_test_np, y_pred)

print(f"\n--- CBIGAN Evaluation Metrics ---")
print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")
print(f"ROC AUC:   {roc_auc_cbigan:.4f}")
print(f"PR AUC:    {pr_auc_cbigan:.4f}")
print(f"Threshold: {threshold_cbigan:.4f}")


In [None]:
# Evaluate performance
print("\n--- CBIGAN Performance Evaluation ---")
auc_roc_cbigan = roc_auc_score(y_test_full, anomaly_scores_cbigan) # AUC-ROC with raw anomaly scores
print(f"AUC-ROC Score: {auc_roc_cbigan:.4f}")

precision_cbigan, recall_cbigan, f1_cbigan, _ = precision_recall_fscore_support(y_test_full, y_pred_anomaly_cbigan, average='binary', zero_division=0)
accuracy_cbigan = accuracy_score(y_test_full, y_pred_anomaly_cbigan)
conf_matrix_cbigan = confusion_matrix(y_test_full, y_pred_anomaly_cbigan)

print(f"Accuracy: {accuracy_cbigan:.4f}")
print(f"Precision: {precision_cbigan:.4f}")
print(f"Recall: {recall_cbigan:.4f}")
print(f"F1-Score: {f1_cbigan:.4f}")
print("\nConfusion Matrix:")
print(conf_matrix_cbigan)

# Plotting training losses for CBIGAN
plt.figure(figsize=(12, 6))
plt.plot(history_cbigan['d_loss'], label='Discriminator Loss (WGAN)')
plt.plot(history_cbigan['g_loss'], label='Generator Adversarial Loss')
plt.plot(history_cbigan['gp_loss'], label='Gradient Penalty Loss')
plt.plot(history_cbigan['consistency_loss'], label='Consistency Loss')
plt.title('CBIGAN Training Losses')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

# Plotting anomaly score distributions for CBIGAN
plt.figure(figsize=(12, 6))
sns.histplot(normal_train_anomaly_scores_cbigan, color='blue', label='Normal Training Data Anomaly Scores', kde=True, stat='density', alpha=0.6)
sns.histplot(anomaly_scores_cbigan[y_test_full == 0], color='green', label='Normal Test Data Anomaly Scores', kde=True, stat='density', alpha=0.6)
sns.histplot(anomaly_scores_cbigan[y_test_full == 1], color='red', label='Attack Test Data Anomaly Scores', kde=True, stat='density', alpha=0.6)
plt.axvline(threshold_cbigan, color='purple', linestyle='--', label=f'Threshold ({threshold_cbigan:.4f})')
plt.title('Distribution of Anomaly Scores (CBIGAN)')
plt.xlabel('Anomaly Score')
plt.ylabel('Density')
plt.legend()
plt.grid(True)
plt.show()