<a href="https://colab.research.google.com/github/rogerpanel/CV/blob/main/Evasion_XIV_DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

upgraded with DQN

In [None]:
# Incorporating RL - DQN

# pip install tensorflow tensorflow-addons gym numpy

import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Input, Concatenate
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import classification_report, accuracy_score
import tensorflow as tf
from tensorflow.keras.utils import Sequence
import gc
import tensorflow.keras.mixed_precision as mixed_precision
import random


# Preprocessing

In [None]:

# Enable mixed precision training
tf.keras.mixed_precision.set_global_policy('mixed_float16')

# PREPROCESSING

# Function to save dataset
def save_dataset(X_train, X_test, y_train, y_test, scaler, label_encoder, file_path):
    """Persist the train/test split and the fitted preprocessing state in one archive.

    The four data arrays are stored together with ``scaler.mean_``,
    ``scaler.scale_`` and ``label_encoder.classes_`` via ``np.savez``.

    Any exception raised while collecting or writing the arrays is caught and
    reported on stdout; nothing is re-raised and the function returns ``None``.
    """
    try:
        # Keys here are the archive member names that load_dataset reads back.
        archive_members = {
            'X_train': X_train,
            'X_test': X_test,
            'y_train': y_train,
            'y_test': y_test,
            'scaler_mean': scaler.mean_,
            'scaler_scale': scaler.scale_,
            'label_encoder_classes': label_encoder.classes_,
        }
        np.savez(file_path, **archive_members)
        print(f"Dataset saved to {file_path}")
    except Exception as err:
        print(f"Error saving dataset: {err}")

# Function to load dataset
def load_dataset(file_path):
    """Load a split previously written by ``save_dataset``.

    Args:
        file_path: Path of the ``.npz`` archive to read.

    Returns:
        ``(X_train, X_test, y_train, y_test, scaler, label_encoder)``. The
        scaler only has ``mean_`` and ``scale_`` restored and the encoder only
        ``classes_``; no other fitted attributes are reconstructed. If anything
        goes wrong the error is printed and a tuple of six ``None`` values is
        returned instead of raising.
    """
    try:
        # NOTE(review): allow_pickle=True lets a crafted archive execute code
        # while loading. Only open archives this pipeline produced itself.
        # The context manager closes the underlying zip handle once every
        # member has been read into memory.
        with np.load(file_path, allow_pickle=True) as data:
            X_train, X_test = data['X_train'], data['X_test']
            y_train, y_test = data['y_train'], data['y_test']
            scaler_mean = data['scaler_mean']
            scaler_scale = data['scaler_scale']
            encoder_classes = data['label_encoder_classes']

        scaler = StandardScaler()
        scaler.mean_ = scaler_mean
        scaler.scale_ = scaler_scale

        label_encoder = LabelEncoder()
        label_encoder.classes_ = encoder_classes

        print(f"Dataset loaded from {file_path}")
        return X_train, X_test, y_train, y_test, scaler, label_encoder
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return None, None, None, None, None, None

# Function to read data in chunks and handle data types
def read_data_in_chunks(file_path, chunk_size=10000):
    """Yield the CSV at *file_path* as successive DataFrames of at most *chunk_size* rows.

    Every column is read as text (``dtype='unicode'``). ``pd.read_csv`` is
    called with its default header handling, so the first line of the file is
    consumed as column names.

    The reader is used as a context manager so the underlying file handle is
    released as soon as iteration finishes or the generator is closed early.
    """
    with pd.read_csv(file_path, dtype='unicode', chunksize=chunk_size, low_memory=False) as reader:
        yield from reader

# Preprocess dataset
def preprocess_data(file_path):
    """Collect features and labels from the CSV at *file_path*.

    The file is consumed chunk by chunk through ``read_data_in_chunks``. In
    every chunk the last column is taken as the label and all preceding
    columns as features.

    Returns:
        ``(X, y)`` as NumPy arrays built from the accumulated rows and labels.
    """
    feature_rows = []
    labels = []

    for frame in read_data_in_chunks(file_path):
        labels.extend(frame.iloc[:, -1])
        feature_rows.extend(frame.iloc[:, :-1].values)

    return np.array(feature_rows), np.array(labels)

# Preprocess entire data
def preprocess_full_dataset(file_path, save_path=None):
    """Read, label-encode, split and standardise the dataset at *file_path*.

    Pipeline:
      1. ``preprocess_data`` supplies the raw feature matrix and label vector.
      2. Labels are mapped to integer ids with a ``LabelEncoder`` fitted on
         all labels, so train and test share one id mapping.
      3. The data is split 70/30 with ``random_state=42``.
      4. A ``StandardScaler`` is fitted on the training split only and then
         applied to both splits, so no test-set statistics reach the
         preprocessing step.

    Args:
        file_path: CSV file to read.
        save_path: Optional archive path; when truthy the result is written
            there with ``save_dataset``.

    Returns:
        ``(X_train, X_test, y_train, y_test, scaler, label_encoder)``.
    """
    X, y = preprocess_data(file_path)

    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(y)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    # Fit on the training rows only; the test rows are merely transformed.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    if save_path:
        save_dataset(X_train, X_test, y_train, y_test, scaler, label_encoder, save_path)

    return X_train, X_test, y_train, y_test, scaler, label_encoder

# Load preprocessed dataset if exists, otherwise preprocess and save it
file_path = "/mnt/data/KDDTrain+.txt"
save_path = "/mnt/data/processed_data.npz"

# Try the cached archive first. load_dataset signals failure by returning
# None for every element, so a missing OR unreadable cache both fall through
# to a fresh preprocessing run instead of leaving the variables set to None.
X_train = None
if os.path.exists(save_path):
    X_train, X_test, y_train, y_test, scaler, label_encoder = load_dataset(save_path)

if X_train is None:
    X_train, X_test, y_train, y_test, scaler, label_encoder = preprocess_full_dataset(file_path, save_path)


# Baseline NETWORK IDS MODEL

In [None]:
# Define the neural network model
def build_nn_model(input_dim, num_classes):
    """Build the (uncompiled) feed-forward classifier used as the IDS model.

    Architecture: ``input_dim`` features -> Dense(128) -> Dense(64) ->
    Dense(32), all ReLU, followed by a softmax over ``num_classes``.

    Args:
        input_dim: Number of input features per sample.
        num_classes: Number of output classes.

    Returns:
        A ``Sequential`` model; the caller is responsible for compiling it.
    """
    return Sequential([
        Input(shape=(input_dim,)),
        Dense(128, activation='relu'),
        Dense(64, activation='relu'),
        Dense(32, activation='relu'),
        # This file enables the global 'mixed_float16' policy; the softmax is
        # pinned to float32 so the probabilities fed to the loss are not
        # computed in half precision.
        Dense(num_classes, activation='softmax', dtype='float32'),
    ])

# Train the baseline model
# Layer sizes are derived from the data: one input per feature column and
# one output per distinct label seen in the training split.
input_dim = X_train.shape[1]
num_classes = len(np.unique(y_train))

# Stop a fit once the validation loss has gone three epochs without improving.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

nn_model = build_nn_model(input_dim, num_classes)
nn_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=Adam(learning_rate=0.001),
    metrics=['accuracy'],
)

class DataGenerator(Sequence):
    """Keras ``Sequence`` that serves ``(X, y)`` mini-batches from in-memory arrays.

    Args:
        X: Feature array; must support indexing with an integer index array.
        y: Target array aligned with ``X`` along the first axis.
        batch_size: Maximum number of samples per batch.
        shuffle: When true, the sample order is reshuffled at construction
            and again at the end of every epoch.
    """

    def __init__(self, X, y, batch_size=256, shuffle=True):
        # Let the base class set up whatever state it maintains.
        super().__init__()
        self.X = X
        self.y = y
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indices = np.arange(len(self.X))
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch, counting a final partial batch."""
        # Ceil rather than floor: the trailing samples that do not fill a
        # whole batch are still served, and a dataset smaller than
        # batch_size yields one batch instead of zero.
        return int(np.ceil(len(self.X) / self.batch_size))

    def __getitem__(self, index):
        """Return the ``(X_batch, y_batch)`` pair for batch number *index*."""
        start = index * self.batch_size
        batch_indices = self.indices[start:start + self.batch_size]
        return self.X[batch_indices], self.y[batch_indices]

    def on_epoch_end(self):
        """Reshuffle the sample order in place when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indices)

# Batch providers for the baseline classifier. The held-out split is served
# in a fixed order and doubles as the validation set during fitting.
train_generator = DataGenerator(X_train, y_train, batch_size=256)
test_generator = DataGenerator(X_test, y_test, batch_size=256, shuffle=False)

nn_model.fit(
    train_generator,
    validation_data=test_generator,
    epochs=50,
    callbacks=[early_stopping],
)

# Score the baseline on the held-out split: the predicted class is the index
# of the largest softmax output.
baseline_probabilities = nn_model.predict(X_test)
y_pred = np.argmax(baseline_probabilities, axis=1)
baseline_report = classification_report(
    y_test,
    y_pred,
    labels=np.unique(y_train),
    target_names=label_encoder.classes_,
)
print("Baseline Neural Network Model")
print(baseline_report)
print(f"Accuracy: {accuracy_score(y_test, y_pred)}")



# AUTOENCODER IDS MODEL

In [None]:


# Define the autoencoder model
def build_autoencoder(input_dim):
    """Build the (uncompiled) dense autoencoder.

    The network is symmetric around a 16-unit bottleneck:
    ``input_dim -> 128 -> 64 -> 32 -> 16 -> 32 -> 64 -> 128 -> input_dim``.
    Hidden layers use ReLU; the reconstruction layer is linear.

    Args:
        input_dim: Number of features per sample (also the output width).

    Returns:
        A functional ``Model`` mapping an input vector to its reconstruction.
    """
    input_layer = Input(shape=(input_dim,))

    hidden = input_layer
    for units in (128, 64, 32, 16, 32, 64, 128):
        hidden = Dense(units, activation='relu')(hidden)

    # This file enables the global 'mixed_float16' policy; the reconstruction
    # is pinned to float32 so the loss is not computed on half-precision
    # outputs.
    output_layer = Dense(input_dim, activation='linear', dtype='float32')(hidden)
    return Model(inputs=input_layer, outputs=output_layer)

autoencoder = build_autoencoder(input_dim)
autoencoder.compile(optimizer=Adam(learning_rate=0.001), loss='mse')

# Train the autoencoder model.
# The reconstruction target is the input itself, so these generators pair X
# with X. The classifier generators (X paired with class ids) would make the
# MSE loss regress the labels instead of reconstructing the features.
autoencoder_train_generator = DataGenerator(X_train, X_train, batch_size=256)
autoencoder_val_generator = DataGenerator(X_test, X_test, batch_size=256, shuffle=False)
autoencoder.fit(autoencoder_train_generator, epochs=50, callbacks=[early_stopping],
                validation_data=autoencoder_val_generator)

# Use the autoencoder to transform the original data
X_train_autoencoded = autoencoder.predict(X_train)
X_test_autoencoded = autoencoder.predict(X_test)

# Build a new IDS model using the autoencoded data
nn_model_autoencoded = build_nn_model(input_dim, num_classes)
nn_model_autoencoded.compile(optimizer=Adam(learning_rate=0.001), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
train_generator_autoencoded = DataGenerator(X_train_autoencoded, y_train, batch_size=256)
test_generator_autoencoded = DataGenerator(X_test_autoencoded, y_test, batch_size=256, shuffle=False)
nn_model_autoencoded.fit(train_generator_autoencoded, epochs=50, callbacks=[early_stopping], validation_data=test_generator_autoencoded)



In [None]:
# Score the classifier trained on reconstructions against the reconstructed
# held-out split; the predicted class is the index of the largest output.
autoencoded_probabilities = nn_model_autoencoded.predict(X_test_autoencoded)
y_pred_autoencoded = np.argmax(autoencoded_probabilities, axis=1)
autoencoded_report = classification_report(
    y_test,
    y_pred_autoencoded,
    labels=np.unique(y_train),
    target_names=label_encoder.classes_,
)
print("Neural Network Model with Autoencoder - Autoencoded Test Data")
print(autoencoded_report)
print(f"Accuracy: {accuracy_score(y_test, y_pred_autoencoded)}")


# RL DQN AGENT

In [None]:


import gym
from collections import deque
import random
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

class DQNAgent:
    """Epsilon-greedy Deep Q-Network agent with a bounded replay memory.

    Args:
        state_size: Length of the state vector fed to the Q-network.
        action_size: Number of discrete actions (width of the Q output).
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # oldest transitions are evicted first
        self.gamma = 0.95  # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the Q-network: two 24-unit ReLU layers and a linear head."""
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        # The Q-value head is pinned to float32: this file enables the global
        # 'mixed_float16' policy and half-precision outputs would feed the
        # regression targets and the MSE loss.
        model.add(Dense(self.action_size, activation='linear', dtype='float32'))
        model.compile(loss='mse',
                      optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Return an action for *state* using the epsilon-greedy rule.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the largest predicted Q-value.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])  # returns action

    def replay(self, batch_size):
        """Run one training step on *batch_size* transitions sampled from memory.

        All sampled transitions are stacked and processed together: one
        forward pass for the next states, one for the current states and a
        single ``fit`` call on the whole mini-batch. The target for the taken
        action is ``reward`` for terminal transitions and
        ``reward + gamma * max_a Q(next_state, a)`` otherwise; the other
        actions keep their current predictions so they add no error.

        Does nothing (and leaves ``epsilon`` untouched) while the memory
        holds fewer than *batch_size* transitions.
        """
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # vstack turns stored (1, state_size) rows into (batch, state_size).
        states = np.vstack(states)
        next_states = np.vstack(next_states)
        actions = np.asarray(actions, dtype=np.intp)
        rewards = np.asarray(rewards, dtype=np.float32)
        dones = np.asarray(dones, dtype=bool)

        next_q = self.model.predict(next_states, verbose=0)
        targets = np.array(self.model.predict(states, verbose=0), dtype=np.float32)

        # Terminal transitions get no bootstrapped future value.
        future = np.where(dones, 0.0, np.amax(next_q, axis=1))
        targets[np.arange(batch_size), actions] = rewards + self.gamma * future

        self.model.fit(states, targets, batch_size=batch_size, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        """Load Q-network weights from the file *name*."""
        self.model.load_weights(name)

    def save(self, name):
        """Write the Q-network weights to the file *name*."""
        self.model.save_weights(name)

# Instantiate the DQNAgent
state_size = input_dim  # This should match the input feature dimension of the IDS
action_size = num_classes  # This should match the number of unique classes in the target
agent = DQNAgent(state_size, action_size)

# Train the DQN agent
EPISODES = 1000
STEPS_PER_EPISODE = 500  # Limit the number of steps in each episode
batch_size = 32
num_train = len(X_train)

for episode in range(EPISODES):
    episode_reward = 0
    state = np.reshape(X_train[0], [1, state_size])  # Initialize with the first sample
    for step in range(STEPS_PER_EPISODE):
        # The state shown at this step is sample `idx`; wrap around so short
        # datasets do not run off the end of X_train / y_train.
        idx = step % num_train
        action = agent.act(state)
        next_state = np.reshape(X_train[(step + 1) % num_train], [1, state_size])
        # y_train holds integer class ids, so the action is compared with the
        # label directly (argmax of a scalar would always be 0).
        reward = 1 if y_train[idx] == action else -1
        episode_reward += reward
        done = step == STEPS_PER_EPISODE - 1
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            print(f"episode: {episode}/{EPISODES}, score: {episode_reward}, e: {agent.epsilon:.2}")
            break
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)

# Save the trained agent
# NOTE(review): some Keras releases require weight files to end in
# `.weights.h5` - confirm against the installed version.
agent.save("dqn_agent.h5")

# Test the DQN agent
# Evaluation is greedy and batched: the Q-network is queried directly so the
# exploration rate cannot inject random actions into the reported accuracy.
dqn_test_actions = np.argmax(agent.model.predict(X_test, verbose=0), axis=1)
correct = int(np.sum(dqn_test_actions == y_test))
total = len(X_test)

accuracy = correct / total
print(f"DQN Agent Accuracy: {accuracy:.2%}")



# ADVERSARIAL ATTACK

In [None]:

# FGSM Attack
def fgsm_attack(model, X, y, epsilon=0.1, clip_min=-1, clip_max=1):
    """Craft adversarial examples with the Fast Gradient Sign Method.

    Each sample is moved one step of size *epsilon* in the direction of the
    sign of the loss gradient with respect to the input.

    Args:
        model: Callable Keras model whose output is passed to
            ``sparse_categorical_crossentropy`` together with *y*.
        X: Array-like batch of inputs.
        y: Integer class labels aligned with *X*.
        epsilon: Step size of the perturbation.
        clip_min, clip_max: Bounds the perturbed samples are clipped to.
            Pass ``None`` for either to skip clipping.

    Returns:
        The adversarial batch as a float32 NumPy array.
    """
    # GradientTape can only watch tensors, so the array is converted first.
    X_adv = tf.convert_to_tensor(X, dtype=tf.float32)
    with tf.GradientTape() as tape:
        tape.watch(X_adv)
        # Cast guards the loss against half-precision model outputs.
        prediction = tf.cast(model(X_adv), tf.float32)
        loss = tf.keras.losses.sparse_categorical_crossentropy(y, prediction)
    gradient = tape.gradient(loss, X_adv)
    X_adv = X_adv + epsilon * tf.sign(gradient)
    if clip_min is not None and clip_max is not None:
        X_adv = tf.clip_by_value(X_adv, clip_min, clip_max)
    return X_adv.numpy()

# PGD Attack
def pgd_attack(model, X, y, epsilon=0.1, alpha=0.01, num_iterations=40, clip_min=-1, clip_max=1):
    """Craft adversarial examples with Projected Gradient Descent.

    Starting from *X*, the batch is repeatedly moved by *alpha* along the sign
    of the loss gradient and projected back into the L-infinity ball of radius
    *epsilon* around the original samples.

    Args:
        model: Callable Keras model whose output is passed to
            ``sparse_categorical_crossentropy`` together with *y*.
        X: Array-like batch of inputs.
        y: Integer class labels aligned with *X*.
        epsilon: Maximum per-feature deviation from the original sample.
        alpha: Step size of each iteration.
        num_iterations: Number of gradient steps.
        clip_min, clip_max: Bounds the samples are clipped to after every
            step. Pass ``None`` for either to skip this clipping.

    Returns:
        The adversarial batch as a float32 NumPy array.
    """
    # One dtype throughout: GradientTape needs a tensor, and the projection
    # bounds must match the dtype of the iterate.
    X_orig = tf.convert_to_tensor(X, dtype=tf.float32)
    lower = X_orig - epsilon
    upper = X_orig + epsilon
    apply_range_clip = clip_min is not None and clip_max is not None

    X_adv = tf.identity(X_orig)
    for _ in range(num_iterations):
        with tf.GradientTape() as tape:
            tape.watch(X_adv)
            # Cast guards the loss against half-precision model outputs.
            prediction = tf.cast(model(X_adv), tf.float32)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y, prediction)
        gradient = tape.gradient(loss, X_adv)
        X_adv = X_adv + alpha * tf.sign(gradient)
        # Project back into the epsilon-ball around the clean samples.
        X_adv = tf.clip_by_value(X_adv, lower, upper)
        if apply_range_clip:
            X_adv = tf.clip_by_value(X_adv, clip_min, clip_max)
    return X_adv.numpy()


# For GAN examples

In [None]:

# Define the generator model for the GAN
def build_generator(input_dim, output_dim):
    """Build the (uncompiled) GAN generator.

    Maps a noise vector of length *input_dim* through ReLU layers of 128,
    256 and 512 units to a tanh output of length *output_dim*.
    """
    net = Sequential()
    net.add(Dense(128, input_dim=input_dim, activation='relu'))
    for width in (256, 512):
        net.add(Dense(width, activation='relu'))
    net.add(Dense(output_dim, activation='tanh'))
    return net

# Define the discriminator model for the GAN
def build_discriminator(input_dim):
    """Build and compile the GAN discriminator.

    A sample of length *input_dim* passes through ReLU layers of 512 and 256
    units to a single sigmoid unit. The model is compiled with binary
    cross-entropy, Adam (learning rate 0.0002) and an accuracy metric.
    """
    model = Sequential([
        Dense(512, input_dim=input_dim, activation='relu'),
        Dense(256, activation='relu'),
        # This file enables the global 'mixed_float16' policy; the
        # probability fed to the loss is pinned to float32.
        Dense(1, activation='sigmoid', dtype='float32')
    ])
    model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.0002), metrics=['accuracy'])
    return model

# Compile GAN
def compile_gan(generator, discriminator):
    """Stack *generator* and *discriminator* into one compiled model.

    The discriminator's ``trainable`` flag is switched off before the stack
    is assembled and compiled with binary cross-entropy and Adam (learning
    rate 0.0002). The flag is left off on the passed-in discriminator object.
    """
    discriminator.trainable = False

    stacked = Sequential()
    stacked.add(generator)
    stacked.add(discriminator)

    gan_optimizer = Adam(learning_rate=0.0002)
    stacked.compile(loss='binary_crossentropy', optimizer=gan_optimizer)
    return stacked

# Train the GAN
def train_gan(generator, discriminator, gan, data, epochs=10000, batch_size=64):
    """Alternate discriminator and generator updates for *epochs* steps.

    Each step:
      1. draws *batch_size* real rows from *data* (with replacement) and
         generates the same number of fake rows from Gaussian noise;
      2. trains the discriminator on the real batch (target 1) and on the
         fake batch (target 0);
      3. trains the generator through *gan* on fresh noise with target 1.

    Every 1000th step the discriminator loss (mean of the real and fake batch
    losses) and the generator loss are printed.

    Args:
        generator: Model mapping noise to samples; its ``input_shape[1]`` is
            used as the noise dimension.
        discriminator: Compiled discriminator model.
        gan: Compiled stacked model used for the generator update.
        data: Array of real samples, indexed along the first axis.
        epochs: Number of training steps.
        batch_size: Number of real and of fake samples per step.

    Returns:
        The *generator* passed in, after training.
    """
    # Loop invariants: the noise dimension and the constant target vectors.
    latent_dim = generator.input_shape[1]
    real_targets = np.ones((batch_size, 1))
    fake_targets = np.zeros((batch_size, 1))

    for epoch in range(epochs):
        idx = np.random.randint(0, data.shape[0], batch_size)
        real_data = data[idx]
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        # verbose=0: no per-call progress output from thousands of predicts.
        generated_data = generator.predict(noise, verbose=0)

        d_loss_real = discriminator.train_on_batch(real_data, real_targets)
        d_loss_fake = discriminator.train_on_batch(generated_data, fake_targets)
        # Report the discriminator over both halves of its update, not just
        # the real batch.
        d_loss = 0.5 * np.add(np.atleast_1d(d_loss_real), np.atleast_1d(d_loss_fake))

        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        g_loss = gan.train_on_batch(noise, real_targets)

        if epoch % 1000 == 0:
            print(f"{epoch} [D loss: {d_loss[0]}] [G loss: {g_loss}]")
    return generator


# Prepare data for GAN training
# Generator noise and generated samples both have the feature width of the
# IDS input, so the generated rows can be fed straight to the classifiers.
generator = build_generator(input_dim=input_dim, output_dim=input_dim)
discriminator = build_discriminator(input_dim=input_dim)
gan = compile_gan(generator=generator, discriminator=discriminator)

# Train the GAN on the training feature matrix (X_train is passed as-is; no
# filtering by class happens here).
generator = train_gan(generator=generator, discriminator=discriminator, gan=gan, data=X_train)

# Generate one synthetic sample per test row from standard-normal noise.
gan_noise_shape = (len(X_test), input_dim)
noise = np.random.normal(loc=0, scale=1, size=gan_noise_shape)
X_test_gan_adv = generator.predict(noise)


# Evaluation on baseline model and Robust model

In [None]:

# Function to evaluate a model on adversarial examples
def evaluate_model(model, X, y, attack_fn, **kwargs):
    """Attack *model* with *attack_fn* and report its accuracy on the result.

    Args:
        model: Keras model under evaluation; also the model the attack is
            crafted against.
        X: Clean input batch.
        y: Integer class labels aligned with *X*.
        attack_fn: Callable ``attack_fn(model, X, y, **kwargs)`` returning the
            adversarial batch.
        **kwargs: Forwarded unchanged to *attack_fn*.

    Returns:
        Accuracy of *model* on the adversarial batch. The accuracy and a
        per-class classification report are also printed.
    """
    X_adv = attack_fn(model, X, y, **kwargs)
    y_pred = np.argmax(model.predict(X_adv), axis=1)
    accuracy = accuracy_score(y, y_pred)
    print(f"Accuracy on adversarial examples: {accuracy:.2%}")
    # Pass the full set of class ids explicitly: without `labels` the report
    # infers classes from y / y_pred and raises when that count differs from
    # the number of target names.
    class_ids = np.arange(len(label_encoder.classes_))
    print(classification_report(y, y_pred, labels=class_ids, target_names=label_encoder.classes_))
    return accuracy

# Evaluate the baseline model on FGSM adversarial examples
print("Evaluating Baseline Model on FGSM Adversarial Examples")
evaluate_model(nn_model, X_test, y_test, fgsm_attack, epsilon=0.1)

# Evaluate the baseline model on PGD adversarial examples
print("Evaluating Baseline Model on PGD Adversarial Examples")
evaluate_model(nn_model, X_test, y_test, pgd_attack, epsilon=0.1, alpha=0.01, num_iterations=40)

# Evaluate the robust adversarial model (RL DQN) on FGSM adversarial examples
print("Evaluating RL DQN Model on FGSM Adversarial Examples")
evaluate_model(agent.model, X_test, y_test, fgsm_attack, epsilon=0.1)

# Evaluate the robust adversarial model (RL DQN) on PGD adversarial examples
print("Evaluating RL DQN Model on PGD Adversarial Examples")
evaluate_model(agent.model, X_test, y_test, pgd_attack, epsilon=0.1, alpha=0.01, num_iterations=40)

# Every report below names all encoded classes. Passing their ids explicitly
# keeps `labels` and `target_names` the same length even when a class is
# absent from y_test or from the predictions.
gan_report_class_ids = np.arange(len(label_encoder.classes_))

# NOTE(review): X_test_gan_adv is generated from random noise, so its rows
# are not derived from the X_test rows that y_test labels - confirm that
# scoring these predictions against y_test is the intended comparison.

# Evaluate the baseline model on GAN adversarial examples
y_pred_gan_adv = np.argmax(nn_model.predict(X_test_gan_adv), axis=1)
print("Baseline Model - GAN Adversarial Examples")
print(classification_report(y_test, y_pred_gan_adv, labels=gan_report_class_ids, target_names=label_encoder.classes_))
print(f"Accuracy: {accuracy_score(y_test, y_pred_gan_adv)}")

# Evaluate the robust adversarial model (RL DQN) on GAN adversarial examples
y_pred_gan_adv_dqn = np.argmax(agent.model.predict(X_test_gan_adv), axis=1)
print("RL DQN Model - GAN Adversarial Examples")
print(classification_report(y_test, y_pred_gan_adv_dqn, labels=gan_report_class_ids, target_names=label_encoder.classes_))
print(f"Accuracy: {accuracy_score(y_test, y_pred_gan_adv_dqn)}")