In [1]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import joblib
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split

from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel

from tqdm.auto import tqdm
import time

In [2]:
# Create output directory structure
OUTPUT_DIR = './intrusion_detection_model'
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(f"{OUTPUT_DIR}/models", exist_ok=True)
os.makedirs(f"{OUTPUT_DIR}/plots", exist_ok=True)

In [3]:
print("TensorFlow version:", tf.__version__)
print("Setup complete!")

TensorFlow version: 2.19.0
Setup complete!


In [4]:
# Step 1: Loading and exploring the dataset
train_path = 'UNSW_NB15/UNSW_NB15_training-set.csv'
test_path = 'UNSW_NB15/UNSW_NB15_testing-set.csv'

print("Loading datasets...")
start_time = time.time()
df_train = pd.read_csv(train_path)
df_test = pd.read_csv(test_path)
print(f"Data loaded in {time.time() - start_time:.2f} seconds")

Loading datasets...
Data loaded in 0.88 seconds


In [5]:
df = pd.concat([df_train, df_test], ignore_index=True)
print("Dataset Shape:", df.shape)
print("Target column values (attack_cat):", df['attack_cat'].unique())
print("Target column values (label):", df['label'].unique())  # 0: Normal, 1: Attack

Dataset Shape: (257673, 45)
Target column values (attack_cat): ['Normal' 'Reconnaissance' 'Backdoor' 'DoS' 'Exploits' 'Analysis'
 'Fuzzers' 'Worms' 'Shellcode' 'Generic']
Target column values (label): [0 1]


In [6]:
# Step 2: Preprocess the data
print("\nPreprocessing data...")
df.dropna(inplace=True)

# Explicitly drop the ID column to prevent it from being used in modeling
print("Explicitly dropping ID column...")
df.drop(columns=['id'], inplace=True, errors='ignore')

# Store categorical columns for one-hot encoding instead of dropping them
categorical_cols = ['proto', 'service', 'state']

# Make a copy of categorical features before transformation for later use
cat_data = df[categorical_cols].copy()

# Encode 'attack_cat' (multiclass target) & prepare binary target
label_encoder = LabelEncoder()
df['attack_cat_encoded'] = label_encoder.fit_transform(df['attack_cat'])
df['label'] = df['label'].astype(int)  # Ensure binary label is int

# Store the class mappings for later use
attack_cat_mapping = dict(zip(label_encoder.classes_, range(len(label_encoder.classes_))))
print("\nAttack Category Mapping:")
for cat, code in attack_cat_mapping.items():
    print(f"{cat}: {code}")


Preprocessing data...
Explicitly dropping ID column...

Attack Category Mapping:
Analysis: 0
Backdoor: 1
DoS: 2
Exploits: 3
Fuzzers: 4
Generic: 5
Normal: 6
Reconnaissance: 7
Shellcode: 8
Worms: 9


In [7]:
for category in df['attack_cat'].unique():
    count = df['attack_cat'].value_counts().get(category, 0)
    print(f"{category}: {count}")

Normal: 93000
Reconnaissance: 13987
Backdoor: 2329
DoS: 16353
Exploits: 44525
Analysis: 2677
Fuzzers: 24246
Worms: 174
Shellcode: 1511
Generic: 58871


In [8]:
# Split the data by class

attack_data = df[df['label'] == 1]

min_classes = ['Worms', 'Shellcode', 'Backdoor', 'Analysis']
min_att_data = attack_data[attack_data['attack_cat'].isin(min_classes)]
maj_att_data = attack_data[~attack_data['attack_cat'].isin(min_classes)]
print(f"\nminority attack class samples: {len(min_att_data)}")
print(f"majority Attack class samples: {len(maj_att_data)}")

# Print class imbalance ratio
imbalance_ratio = len(maj_att_data) / len(min_att_data) * 4/6
print(f"Class imbalance ratio (attack:normal): {imbalance_ratio:.2f}")


minority attack class samples: 6691
majority Attack class samples: 157982
Class imbalance ratio (attack:normal): 15.74


In [9]:
from sklearn.preprocessing import MinMaxScaler

# One-hot encode categorical features
print("\nApplying one-hot encoding to categorical features...")
df_encoded = pd.get_dummies(df, columns=categorical_cols)

# Keep 'attack_cat_encoded' as the conditional label
# Drop only columns that are not needed for CGAN input
columns_to_drop = ['attack_cat', 'label']  # Keep 'attack_cat_encoded'
df_encoded.drop(columns=columns_to_drop, inplace=True)

# Normalize feature columns (excluding 'attack_cat_encoded')
# as 'attack_cat_encoded' will be used as label in cgan
print("Normalizing features...")
feature_cols = df_encoded.columns[df_encoded.columns != 'attack_cat_encoded']
scaler = MinMaxScaler()
df_encoded[feature_cols] = scaler.fit_transform(df_encoded[feature_cols])



Applying one-hot encoding to categorical features...
Normalizing features...


In [10]:
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix


df_ids_original = df_encoded.copy()
X_orig = df_ids_original.drop(columns=['attack_cat_encoded']).values
y_orig = df_ids_original['attack_cat_encoded'].values

X_train_orig, X_test_orig, y_train_orig, y_test_orig = train_test_split(
                                                        X_orig, y_orig, test_size=0.3,
                                                        random_state=42, stratify=y_orig)
print("train test splitting done")

train test splitting done


In [11]:
# --- Random Forest ---
print("\nTraining Random Forest on original data...")
rf_orig = RandomForestClassifier(n_estimators=100, random_state=42)
rf_orig.fit(X_train_orig, y_train_orig)
y_pred_rf_orig = rf_orig.predict(X_test_orig)
print("done")


Training Random Forest on original data...
done


In [12]:
from sklearn.metrics import accuracy_score
# Calculate accuracy
accuracy_rf_orig = accuracy_score(y_test_orig, y_pred_rf_orig)
print(f"Accuracy of Random Forest on original data: {accuracy_rf_orig:.4f}")


Accuracy of Random Forest on original data: 0.8257


In [13]:
# --- XGBoost ---
print("Training XGBoost on original data...")
xgb_orig = XGBClassifier(eval_metric='mlogloss')
xgb_orig.fit(X_train_orig, y_train_orig)
y_pred_xgb_orig = xgb_orig.predict(X_test_orig)
print("done")

Training XGBoost on original data...
done


In [14]:
# Calculate accuracy
accuracy_rf_orig = accuracy_score(y_test_orig, y_pred_xgb_orig)
print(f"Accuracy of XGB on original data: {accuracy_rf_orig:.4f}")

Accuracy of XGB on original data: 0.8331


In [15]:
from tensorflow.keras import models, optimizers

class CGAN:
    def __init__(self, noise_dim, num_classes, feature_dim, lr=0.0002):
        self.noise_dim = noise_dim
        self.num_classes = num_classes
        self.feature_dim = feature_dim
        self.lr = lr

        # Build models
        self.generator = self.build_generator()
        self.discriminator = self.build_discriminator()

        # Optimizers
        self.gen_optimizer = optimizers.Adam(self.lr, beta_1=0.5)
        self.disc_optimizer = optimizers.Adam(self.lr, beta_1=0.5)

        # Loss
        self.cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

    def build_generator(self):
        noise_input = layers.Input(shape=(self.noise_dim,))
        label_input = layers.Input(shape=(1,), dtype='int32')
        label_embedding = layers.Embedding(self.num_classes, self.noise_dim)(label_input)
        label_embedding = layers.Flatten()(label_embedding)

        x = layers.multiply([noise_input, label_embedding])
        x = layers.Dense(128, activation='relu')(x)
        x = layers.Dense(256, activation='relu')(x)
        x = layers.Dense(self.feature_dim, activation='sigmoid')(x)

        return models.Model([noise_input, label_input], x, name="Generator")

    def build_discriminator(self):
        data_input = layers.Input(shape=(self.feature_dim,))
        label_input = layers.Input(shape=(1,), dtype='int32')
        label_embedding = layers.Embedding(self.num_classes, self.feature_dim)(label_input)
        label_embedding = layers.Flatten()(label_embedding)

        x = layers.multiply([data_input, label_embedding])
        x = layers.Dense(256, activation='relu')(x)
        x = layers.Dense(128, activation='relu')(x)
        x = layers.Dense(1)(x)

        return models.Model([data_input, label_input], x, name="Discriminator")

    def generator_loss(self, fake_output):
        return self.cross_entropy(tf.ones_like(fake_output), fake_output)

    def discriminator_loss(self, real_output, fake_output):
        real_loss = self.cross_entropy(tf.ones_like(real_output), real_output)
        fake_loss = self.cross_entropy(tf.zeros_like(fake_output), fake_output)
        return real_loss + fake_loss

    def train(self, X_real, y_real, batch_size=64, epochs=200):
        dataset = tf.data.Dataset.from_tensor_slices((X_real, y_real)).shuffle(buffer_size=1024).batch(batch_size)

        print("training started : \n")
        for epoch in range(epochs):
            for real_batch, label_batch in dataset:
                # Generate noise and fake labels
                noise = tf.random.normal([real_batch.shape[0], self.noise_dim])
                generated_data = self.generator([noise, label_batch], training=True)

                with tf.GradientTape(persistent=True) as tape:
                    # Discriminator loss
                    real_output = self.discriminator([real_batch, label_batch], training=True)
                    fake_output = self.discriminator([generated_data, label_batch], training=True)
                    d_loss = self.discriminator_loss(real_output, fake_output)

                    # Generator loss
                    noise = tf.random.normal([real_batch.shape[0], self.noise_dim])
                    generated_data = self.generator([noise, label_batch], training=True)
                    fake_output = self.discriminator([generated_data, label_batch], training=True)
                    g_loss = self.generator_loss(fake_output)

                # Backpropagation
                gradients_of_generator = tape.gradient(g_loss, self.generator.trainable_variables)
                gradients_of_discriminator = tape.gradient(d_loss, self.discriminator.trainable_variables)

                self.gen_optimizer.apply_gradients(zip(gradients_of_generator, self.generator.trainable_variables))
                self.disc_optimizer.apply_gradients(zip(gradients_of_discriminator, self.discriminator.trainable_variables))

            # Epoch Summary
            if (epoch + 1) % 10 == 0 or epoch == 0:
                print(f"Epoch {epoch+1}/{epochs} | D Loss: {d_loss:.4f} | G Loss: {g_loss:.4f}")

    def generate_samples(self, num_samples, class_label):
        noise = tf.random.normal([num_samples, self.noise_dim])
        labels = tf.convert_to_tensor([class_label] * num_samples)
        synthetic_data = self.generator([noise, labels], training=False)
        return synthetic_data.numpy()
    def save(self, save_dir):
        os.makedirs(save_dir, exist_ok=True)
        self.generator.save(os.path.join(save_dir, "generator.h5"))
        self.discriminator.save(os.path.join(save_dir, "discriminator.h5"))
        print(f"✅ CGAN models saved to: {save_dir}")

    def load(self, save_dir):
        self.generator = tf.keras.models.load_model(os.path.join(save_dir, "generator.h5"))
        self.discriminator = tf.keras.models.load_model(os.path.join(save_dir, "discriminator.h5"))
        print(f"✅ CGAN models loaded from: {save_dir}")


In [16]:
# Filter for selected minority classes
minority_classes = ['Worms', 'Shellcode', 'Backdoor','Analysis']
minority_codes = [attack_cat_mapping[cls] for cls in minority_classes]
gan_data = df_encoded[df['attack_cat_encoded'].isin(minority_codes)].copy()

# Extract features (X) and conditional labels (y) for CGAN
X_gan = gan_data.drop(columns=['attack_cat_encoded']).values
y_gan = gan_data['attack_cat_encoded'].values

print(f"\nSamples selected for CGAN training (classes: {minority_classes}): {X_gan.shape[0]}")
print(f"Feature vector size: {X_gan.shape[1]}")
print(f"Unique class labels in GAN data: {np.unique(y_gan)}")


Samples selected for CGAN training (classes: ['Worms', 'Shellcode', 'Backdoor', 'Analysis']): 6691
Feature vector size: 196
Unique class labels in GAN data: [0 1 8 9]


In [17]:
# Initialize CGAN
cgan = CGAN(
    noise_dim=100,
    num_classes=len(attack_cat_mapping),
    feature_dim=X_gan.shape[1],
    lr=0.0002
)
# Train CGAN
cgan.train(X_real=X_gan, y_real=y_gan, batch_size=64, epochs=300)
print("training done")

training started : 

Epoch 1/300 | D Loss: 1.0046 | G Loss: 0.9632
Epoch 10/300 | D Loss: 0.4786 | G Loss: 1.7295
Epoch 20/300 | D Loss: 0.0657 | G Loss: 3.4465
Epoch 30/300 | D Loss: 0.3489 | G Loss: 2.3746
Epoch 40/300 | D Loss: 0.1160 | G Loss: 4.1543
Epoch 50/300 | D Loss: 0.0459 | G Loss: 5.9137
Epoch 60/300 | D Loss: 0.0055 | G Loss: 6.3847
Epoch 70/300 | D Loss: 0.1969 | G Loss: 2.1667
Epoch 80/300 | D Loss: 0.0356 | G Loss: 3.7837
Epoch 90/300 | D Loss: 0.0844 | G Loss: 4.9143
Epoch 100/300 | D Loss: 0.0099 | G Loss: 6.0849
Epoch 110/300 | D Loss: 0.0069 | G Loss: 5.3787
Epoch 120/300 | D Loss: 0.0797 | G Loss: 4.6612
Epoch 130/300 | D Loss: 0.0077 | G Loss: 5.5488
Epoch 140/300 | D Loss: 0.0028 | G Loss: 6.5191
Epoch 150/300 | D Loss: 0.0019 | G Loss: 6.7708
Epoch 160/300 | D Loss: 0.1540 | G Loss: 7.4478
Epoch 170/300 | D Loss: 0.0021 | G Loss: 7.5059
Epoch 180/300 | D Loss: 0.3875 | G Loss: 2.0460
Epoch 190/300 | D Loss: 0.2121 | G Loss: 3.8494
Epoch 200/300 | D Loss: 0.0180

In [18]:
# Number of synthetic samples per minority class
num_synthetic = 3000
X_synth_list = []
y_synth_list = []

# Generate synthetic samples for each minority class
for cls_name in ['Worms', 'Shellcode', 'Backdoor','Analysis']:
    class_code = attack_cat_mapping[cls_name]
    X_synth = cgan.generate_samples(num_synthetic, class_code)
    y_synth = np.array([class_code] * num_synthetic)
    
    X_synth_list.append(X_synth)
    y_synth_list.append(y_synth)

# Combine all synthetic data
X_synth_total = np.vstack(X_synth_list)
y_synth_total = np.hstack(y_synth_list)


In [19]:
X_augmented = np.vstack([X_orig, X_synth_total])
y_augmented = np.hstack([y_orig, y_synth_total])

# Train/test split (same proportion)
X_train_aug, X_test_aug, y_train_aug, y_test_aug = train_test_split(
                                                   X_augmented, y_augmented, test_size=0.3,
                                                    random_state=42, stratify=y_augmented)
print("train test splitting for augmented done")

train test splitting for augmented done


In [21]:
def train_and_evaluate(X_train, X_test, y_train, y_test, label):
    print(f"\n📊 Training on: {label}")
    
    # Random Forest
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(X_train, y_train)
    y_pred_rf = rf.predict(X_test)
    
    # XGBoost
    xgb = XGBClassifier(eval_metric='mlogloss', random_state=42)
    xgb.fit(X_train, y_train)
    y_pred_xgb = xgb.predict(X_test)

    # Evaluate
    for model_name, y_pred in zip(["Random Forest", "XGBoost"], [y_pred_rf, y_pred_xgb]):
        acc = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred, average='weighted')
        # cm = confusion_matrix(y_test, y_pred)
        
        print(f"\n🔍 {model_name} on {label} data")
        print(f"Accuracy: {acc:.4f} | F1-score: {f1:.4f}")
        # print("Confusion Matrix:")
        # print(cm)

# Evaluate on original and augmented datasets
train_and_evaluate(X_train_orig, X_test_orig, y_train_orig, y_test_orig, label="Original (Imbalanced)")
train_and_evaluate(X_train_aug, X_test_aug, y_train_aug, y_test_aug, label="GAN-Augmented")



📊 Training on: Original (Imbalanced)

🔍 Random Forest on Original (Imbalanced) data
Accuracy: 0.8257 | F1-score: 0.8173

🔍 XGBoost on Original (Imbalanced) data
Accuracy: 0.8331 | F1-score: 0.8162

📊 Training on: GAN-Augmented

🔍 Random Forest on GAN-Augmented data
Accuracy: 0.8327 | F1-score: 0.8289

🔍 XGBoost on GAN-Augmented data
Accuracy: 0.8417 | F1-score: 0.8302


In [23]:
GAN_SAVE_PATH = os.path.join(OUTPUT_DIR, 'models', 'cgan')
os.makedirs(GAN_SAVE_PATH, exist_ok=True)

# Save Generator
cgan.generator.save(os.path.join(GAN_SAVE_PATH, 'generator.keras'))

# Save Discriminator
cgan.discriminator.save(os.path.join(GAN_SAVE_PATH, 'discriminator.keras'))
