### Moving Forward to Layer 2 Implementation  

The implementation of Layer 1 (Anomaly Detection & Feature Extraction) is now complete. However, for further modifications and validation, we require more dataset variations or additional traffic patterns. Constructing these datasets will take some time.  

In the meantime, we are proceeding with the **implementation of Layer 2 (Attack Classification & Adaptive Learning)**.  

### Key Next Steps:
- **Dataset Construction:** Layer 2 is trained on the anomalous samples detected by Layer 1, so the Layer 1 code will be integrated into the Layer 2 pipeline to produce its dataset.  
- **Feature Extraction:** Extracting CNN-enhanced features from Layer 1 to improve classification performance.  
- **Attack Classification Model:** Implementing a CNN-BiLSTM model with Knowledge Distillation for efficient and scalable attack classification.  

In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, regularizers
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
import joblib
import matplotlib.pyplot as plt

In [2]:
# Loading and preprocessing dataset
def preprocess_data(file_path, test_size=0.2, random_state=42,
                    label_col='Attack_label', iqr_factor=3,
                    scaler_path='robust_scaler.pkl'):
    """
    Load and preprocess dataset for ANIDS.

    Steps, in the order they are applied:
    - Reads the CSV and drops the label column (it is not returned)
    - Removes rows with any feature outside [Q1 - k*IQR, Q3 + k*IQR]
    - Splits the remaining rows into training & validation sets
    - Fits a RobustScaler on the training split only and applies it to both
      splits, so validation statistics do not leak into the scaler
    - Saves the fitted scaler with joblib

    Args:
        file_path: Path of the CSV file to read.
        test_size: Passed to train_test_split (share of rows held out).
        random_state: Seed passed to train_test_split.
        label_col: Name of the label column to drop.
        iqr_factor: Multiplier k applied to the IQR for the outlier fences.
        scaler_path: File the fitted scaler is written to.

    Returns:
        [X_train, X_val] as scaled NumPy arrays.

    Raises:
        ValueError: If outlier removal leaves no rows to train on.
    """
    df = pd.read_csv(file_path)
    X = df.drop([label_col], axis=1)

    # Outlier removal using IQR: drop a row if ANY of its features is outside the fences
    Q1, Q3 = X.quantile(0.25), X.quantile(0.75)
    IQR = Q3 - Q1
    outside = (X < (Q1 - iqr_factor * IQR)) | (X > (Q3 + iqr_factor * IQR))
    X = X[~outside.any(axis=1)]
    if X.empty:
        raise ValueError(
            f"No rows left after IQR outlier removal (iqr_factor={iqr_factor})"
        )

    # Split BEFORE scaling so the scaler never sees validation rows
    X_train, X_val = train_test_split(X, test_size=test_size, random_state=random_state)

    # Scaling
    scaler = RobustScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)
    joblib.dump(scaler, scaler_path)

    # A list, matching what train_test_split itself returns
    return [X_train_scaled, X_val_scaled]

# Load dataset
# NOTE(review): absolute, user-specific path - this cell only runs where that exact file exists.
X_train, X_val = preprocess_data("/Users/siddhantgond/Desktop/6THSEM/Project_Elective/Adaptive-Network-Intrusion-Detection-System/Implementaiton/training_dataset.csv")


In [14]:
# Define Layer 1 of the Adaptive NIDS
class AdaptiveNIDSLayer1:
    """Layer 1 of the Adaptive NIDS: a Conv1D-encoder / LSTM-decoder autoencoder.

    The model is trained to reconstruct its own input. Rows whose mean squared
    reconstruction error exceeds a threshold are reported as anomalies, and the
    pooled convolutional activations ("gap_layer") can be exported as features.
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim: Number of features per sample.
            latent_dim: Width of the bottleneck (encoded) vector.
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build and compile the autoencoder (Adam, learning rate 1e-4, MSE loss)."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(inputs)
        # View the feature vector as a sequence with a single channel for Conv1D
        x = layers.Reshape((-1, 1))(x)

        # Feature extraction via two stacked Conv1D layers (no skip connections)
        x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
        x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
        # Named so extract_features() can look this layer up
        x = layers.GlobalAveragePooling1D(name="gap_layer")(x)

        # Latent Representation
        x = layers.Dense(64, activation='mish', kernel_regularizer=regularizers.l1(0.0005))(x)
        x = layers.Dropout(0.3)(x)
        encoded = layers.Dense(self.latent_dim, activation='linear')(x)

        # Decoder: repeat the code once per feature, then emit one value per step
        x = layers.RepeatVector(self.input_dim)(encoded)
        x = layers.LSTM(self.latent_dim * 2, return_sequences=True, recurrent_dropout=0.25)(x)
        decoded = layers.TimeDistributed(layers.Dense(1, activation='linear'))(x)
        decoded = layers.Flatten()(decoded)

        autoencoder = keras.Model(inputs=inputs, outputs=decoded)
        autoencoder.compile(optimizer=keras.optimizers.Adam(1e-4), loss='mse')
        return autoencoder

    def train(self, X_train, X_val, epochs=50, batch_size=64):
        """Fit the autoencoder to reconstruct X_train, validating on X_val.

        Returns:
            The History object returned by Keras fit() (per-epoch losses).
        """
        return self.model.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, X_val),
        )

    def detect_anomalies(self, X_data, threshold=0.02):
        """Return (rows, positions) of samples whose reconstruction MSE > threshold.

        Empty input is returned unchanged with an empty index array.
        """
        if len(X_data) == 0:
            # Skip predict(): there is nothing to score in an empty batch
            return X_data, np.array([], dtype=np.intp)
        reconstructed = self.model.predict(X_data)
        errors = np.mean(np.square(X_data - reconstructed), axis=1)
        is_anomaly = errors > threshold
        return X_data[is_anomaly], np.where(is_anomaly)[0]

    def extract_features(self, X_anomalies):
        """Return the "gap_layer" activations of the autoencoder for X_anomalies.

        For empty input an empty (0, n_channels) array is returned.
        """
        gap_output = self.model.get_layer("gap_layer").output  # Use the named layer
        if len(X_anomalies) == 0:
            # No anomalies were flagged: return an empty feature matrix instead of predicting
            return np.empty((0, gap_output.shape[-1]), dtype=np.float32)
        # Create a feature extractor model that stops at the named gap_layer
        feature_extractor = keras.Model(inputs=self.model.input, outputs=gap_output)
        return feature_extractor.predict(X_anomalies)

# Train Layer 1: build the autoencoder for X_train's feature count and fit it
# with this cell's train() defaults (50 epochs, batch size 64).
layer1 = AdaptiveNIDSLayer1(input_dim=X_train.shape[1])
layer1.train(X_train, X_val)

# Detect anomalies: validation rows whose reconstruction MSE exceeds the
# default threshold (0.02), plus their row positions within X_val.
anomalies, anomaly_indices = layer1.detect_anomalies(X_val)

# Extract the pooled Conv1D ("gap_layer") activations for the flagged rows.
extracted_features = layer1.extract_features(anomalies)

Epoch 1/50
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 23ms/step - loss: 0.2358 - val_loss: 0.2136
Epoch 2/50
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.2126 - val_loss: 0.1941
Epoch 3/50
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 22ms/step - loss: 0.1943 - val_loss: 0.1774
Epoch 4/50
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 24ms/step - loss: 0.1811 - val_loss: 0.1633
Epoch 5/50
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.1642 - val_loss: 0.1505
Epoch 6/50
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 20ms/step - loss: 0.1511 - val_loss: 0.1400
Epoch 7/50
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 20ms/step - loss: 0.1416 - val_loss: 0.1305
Epoch 8/50
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.1356 - val_loss: 0.1227
Epoch 9/50
[1m189/189[0m [32m

In [4]:
# Define Layer 1 of the Adaptive NIDS
class AdaptiveNIDSLayer1:
    """Layer 1 of the Adaptive NIDS: a Conv1D-encoder / LSTM-decoder autoencoder.

    The model is trained to reconstruct its own input. Rows whose mean squared
    reconstruction error exceeds a threshold are reported as anomalies, and the
    pooled convolutional activations ("gap_layer") can be exported as features.
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim: Number of features per sample.
            latent_dim: Width of the bottleneck (encoded) vector.
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build and compile the autoencoder (Adam, learning rate 1e-4, MSE loss)."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(inputs)
        # View the feature vector as a sequence with a single channel for Conv1D
        x = layers.Reshape((-1, 1))(x)

        # Feature extraction via two stacked Conv1D layers (no skip connections)
        x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
        x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
        # Named so extract_features() can look this layer up
        x = layers.GlobalAveragePooling1D(name="gap_layer")(x)

        # Latent Representation
        x = layers.Dense(64, activation='mish', kernel_regularizer=regularizers.l1(0.0005))(x)
        x = layers.Dropout(0.3)(x)
        encoded = layers.Dense(self.latent_dim, activation='linear')(x)

        # Decoder: repeat the code once per feature, then emit one value per step
        x = layers.RepeatVector(self.input_dim)(encoded)
        x = layers.LSTM(self.latent_dim * 2, return_sequences=True, recurrent_dropout=0.25)(x)
        decoded = layers.TimeDistributed(layers.Dense(1, activation='linear'))(x)
        decoded = layers.Flatten()(decoded)

        autoencoder = keras.Model(inputs=inputs, outputs=decoded)
        autoencoder.compile(optimizer=keras.optimizers.Adam(1e-4), loss='mse')
        return autoencoder

    def train(self, X_train, X_val, epochs=40, batch_size=128):
        """Fit the autoencoder to reconstruct X_train, validating on X_val.

        Returns:
            The History object returned by Keras fit() (per-epoch losses).
        """
        return self.model.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, X_val),
        )

    def detect_anomalies(self, X_data, threshold=0.02):
        """Return (rows, positions) of samples whose reconstruction MSE > threshold.

        Empty input is returned unchanged with an empty index array.
        """
        if len(X_data) == 0:
            # Skip predict(): there is nothing to score in an empty batch
            return X_data, np.array([], dtype=np.intp)
        reconstructed = self.model.predict(X_data)
        errors = np.mean(np.square(X_data - reconstructed), axis=1)
        is_anomaly = errors > threshold
        return X_data[is_anomaly], np.where(is_anomaly)[0]

    def extract_features(self, X_anomalies):
        """Return the "gap_layer" activations of the autoencoder for X_anomalies.

        For empty input an empty (0, n_channels) array is returned.
        """
        gap_output = self.model.get_layer("gap_layer").output  # Use the named layer
        if len(X_anomalies) == 0:
            # No anomalies were flagged: return an empty feature matrix instead of predicting
            return np.empty((0, gap_output.shape[-1]), dtype=np.float32)
        # Create a feature extractor model that stops at the named gap_layer
        feature_extractor = keras.Model(inputs=self.model.input, outputs=gap_output)
        return feature_extractor.predict(X_anomalies)

# Train Layer 1: build the autoencoder for X_train's feature count and fit it
# with this cell's train() defaults (40 epochs, batch size 128).
layer1 = AdaptiveNIDSLayer1(input_dim=X_train.shape[1])
layer1.train(X_train, X_val)

# Detect anomalies: validation rows whose reconstruction MSE exceeds the
# default threshold (0.02), plus their row positions within X_val.
anomalies, anomaly_indices = layer1.detect_anomalies(X_val)

# Extract the pooled Conv1D ("gap_layer") activations for the flagged rows.
extracted_features_epoch_40_BS_128 = layer1.extract_features(anomalies)

Epoch 1/40
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 30ms/step - loss: 0.2402 - val_loss: 0.2264
Epoch 2/40
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - loss: 0.2306 - val_loss: 0.2157
Epoch 3/40
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 33ms/step - loss: 0.2177 - val_loss: 0.2054
Epoch 4/40
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - loss: 0.2091 - val_loss: 0.1950
Epoch 5/40
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - loss: 0.1992 - val_loss: 0.1866
Epoch 6/40
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - loss: 0.1953 - val_loss: 0.1785
Epoch 7/40
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - loss: 0.1836 - val_loss: 0.1709
Epoch 8/40
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 30ms/step - loss: 0.1778 - val_loss: 0.1636
Epoch 9/40
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━

In [5]:
# Define Layer 1 of the Adaptive NIDS
class AdaptiveNIDSLayer1:
    """Layer 1 of the Adaptive NIDS: a Conv1D-encoder / LSTM-decoder autoencoder.

    The model is trained to reconstruct its own input. Rows whose mean squared
    reconstruction error exceeds a threshold are reported as anomalies, and the
    pooled convolutional activations ("gap_layer") can be exported as features.
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim: Number of features per sample.
            latent_dim: Width of the bottleneck (encoded) vector.
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build and compile the autoencoder (Adam, learning rate 1e-4, MSE loss)."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(inputs)
        # View the feature vector as a sequence with a single channel for Conv1D
        x = layers.Reshape((-1, 1))(x)

        # Feature extraction via two stacked Conv1D layers (no skip connections)
        x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
        x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
        # Named so extract_features() can look this layer up
        x = layers.GlobalAveragePooling1D(name="gap_layer")(x)

        # Latent Representation
        x = layers.Dense(64, activation='mish', kernel_regularizer=regularizers.l1(0.0005))(x)
        x = layers.Dropout(0.3)(x)
        encoded = layers.Dense(self.latent_dim, activation='linear')(x)

        # Decoder: repeat the code once per feature, then emit one value per step
        x = layers.RepeatVector(self.input_dim)(encoded)
        x = layers.LSTM(self.latent_dim * 2, return_sequences=True, recurrent_dropout=0.25)(x)
        decoded = layers.TimeDistributed(layers.Dense(1, activation='linear'))(x)
        decoded = layers.Flatten()(decoded)

        autoencoder = keras.Model(inputs=inputs, outputs=decoded)
        autoencoder.compile(optimizer=keras.optimizers.Adam(1e-4), loss='mse')
        return autoencoder

    def train(self, X_train, X_val, epochs=50, batch_size=128):
        """Fit the autoencoder to reconstruct X_train, validating on X_val.

        Returns:
            The History object returned by Keras fit() (per-epoch losses).
        """
        return self.model.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, X_val),
        )

    def detect_anomalies(self, X_data, threshold=0.02):
        """Return (rows, positions) of samples whose reconstruction MSE > threshold.

        Empty input is returned unchanged with an empty index array.
        """
        if len(X_data) == 0:
            # Skip predict(): there is nothing to score in an empty batch
            return X_data, np.array([], dtype=np.intp)
        reconstructed = self.model.predict(X_data)
        errors = np.mean(np.square(X_data - reconstructed), axis=1)
        is_anomaly = errors > threshold
        return X_data[is_anomaly], np.where(is_anomaly)[0]

    def extract_features(self, X_anomalies):
        """Return the "gap_layer" activations of the autoencoder for X_anomalies.

        For empty input an empty (0, n_channels) array is returned.
        """
        gap_output = self.model.get_layer("gap_layer").output  # Use the named layer
        if len(X_anomalies) == 0:
            # No anomalies were flagged: return an empty feature matrix instead of predicting
            return np.empty((0, gap_output.shape[-1]), dtype=np.float32)
        # Create a feature extractor model that stops at the named gap_layer
        feature_extractor = keras.Model(inputs=self.model.input, outputs=gap_output)
        return feature_extractor.predict(X_anomalies)

# Train Layer 1: build the autoencoder for X_train's feature count and fit it
# with this cell's train() defaults (50 epochs, batch size 128).
layer1 = AdaptiveNIDSLayer1(input_dim=X_train.shape[1])
layer1.train(X_train, X_val)

# Detect anomalies: validation rows whose reconstruction MSE exceeds the
# default threshold (0.02), plus their row positions within X_val.
anomalies, anomaly_indices = layer1.detect_anomalies(X_val)

# Extract the pooled Conv1D ("gap_layer") activations for the flagged rows.
extracted_features_epoch_50_BS_128 = layer1.extract_features(anomalies)

Epoch 1/50
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 29ms/step - loss: 0.2342 - val_loss: 0.2222
Epoch 2/50
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 26ms/step - loss: 0.2260 - val_loss: 0.2118
Epoch 3/50
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - loss: 0.2164 - val_loss: 0.2028
Epoch 4/50
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - loss: 0.2075 - val_loss: 0.1935
Epoch 5/50
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - loss: 0.1955 - val_loss: 0.1837
Epoch 6/50
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - loss: 0.1875 - val_loss: 0.1762
Epoch 7/50
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - loss: 0.1777 - val_loss: 0.1689
Epoch 8/50
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - loss: 0.1718 - val_loss: 0.1621
Epoch 9/50
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━

In [6]:
 
# Define Layer 1 of the Adaptive NIDS
class AdaptiveNIDSLayer1:
    """Layer 1 of the Adaptive NIDS: a Conv1D-encoder / LSTM-decoder autoencoder.

    The model is trained to reconstruct its own input. Rows whose mean squared
    reconstruction error exceeds a threshold are reported as anomalies, and the
    pooled convolutional activations ("gap_layer") can be exported as features.
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim: Number of features per sample.
            latent_dim: Width of the bottleneck (encoded) vector.
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build and compile the autoencoder (Adam, learning rate 1e-4, MSE loss)."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(inputs)
        # View the feature vector as a sequence with a single channel for Conv1D
        x = layers.Reshape((-1, 1))(x)

        # Feature extraction via two stacked Conv1D layers (no skip connections)
        x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
        x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
        # Named so extract_features() can look this layer up
        x = layers.GlobalAveragePooling1D(name="gap_layer")(x)

        # Latent Representation
        x = layers.Dense(64, activation='mish', kernel_regularizer=regularizers.l1(0.0005))(x)
        x = layers.Dropout(0.3)(x)
        encoded = layers.Dense(self.latent_dim, activation='linear')(x)

        # Decoder: repeat the code once per feature, then emit one value per step
        x = layers.RepeatVector(self.input_dim)(encoded)
        x = layers.LSTM(self.latent_dim * 2, return_sequences=True, recurrent_dropout=0.25)(x)
        decoded = layers.TimeDistributed(layers.Dense(1, activation='linear'))(x)
        decoded = layers.Flatten()(decoded)

        autoencoder = keras.Model(inputs=inputs, outputs=decoded)
        autoencoder.compile(optimizer=keras.optimizers.Adam(1e-4), loss='mse')
        return autoencoder

    def train(self, X_train, X_val, epochs=50, batch_size=32):
        """Fit the autoencoder to reconstruct X_train, validating on X_val.

        Returns:
            The History object returned by Keras fit() (per-epoch losses).
        """
        return self.model.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, X_val),
        )

    def detect_anomalies(self, X_data, threshold=0.02):
        """Return (rows, positions) of samples whose reconstruction MSE > threshold.

        Empty input is returned unchanged with an empty index array.
        """
        if len(X_data) == 0:
            # Skip predict(): there is nothing to score in an empty batch
            return X_data, np.array([], dtype=np.intp)
        reconstructed = self.model.predict(X_data)
        errors = np.mean(np.square(X_data - reconstructed), axis=1)
        is_anomaly = errors > threshold
        return X_data[is_anomaly], np.where(is_anomaly)[0]

    def extract_features(self, X_anomalies):
        """Return the "gap_layer" activations of the autoencoder for X_anomalies.

        For empty input an empty (0, n_channels) array is returned.
        """
        gap_output = self.model.get_layer("gap_layer").output  # Use the named layer
        if len(X_anomalies) == 0:
            # No anomalies were flagged: return an empty feature matrix instead of predicting
            return np.empty((0, gap_output.shape[-1]), dtype=np.float32)
        # Create a feature extractor model that stops at the named gap_layer
        feature_extractor = keras.Model(inputs=self.model.input, outputs=gap_output)
        return feature_extractor.predict(X_anomalies)

# Train Layer 1: build the autoencoder for X_train's feature count and fit it
# with this cell's train() defaults (50 epochs, batch size 32).
layer1 = AdaptiveNIDSLayer1(input_dim=X_train.shape[1])
layer1.train(X_train, X_val)

# Detect anomalies: validation rows whose reconstruction MSE exceeds the
# default threshold (0.02), plus their row positions within X_val.
anomalies, anomaly_indices = layer1.detect_anomalies(X_val)

# Extract the pooled Conv1D ("gap_layer") activations for the flagged rows.
extracted_features_epoch_50_BS_32 = layer1.extract_features(anomalies)

Epoch 1/50
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 16ms/step - loss: 0.2297 - val_loss: 0.1953
Epoch 2/50
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 16ms/step - loss: 0.1912 - val_loss: 0.1648
Epoch 3/50
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 16ms/step - loss: 0.1650 - val_loss: 0.1415
Epoch 4/50
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 18ms/step - loss: 0.1433 - val_loss: 0.1239
Epoch 5/50
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 20ms/step - loss: 0.1268 - val_loss: 0.1115
Epoch 6/50
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 16ms/step - loss: 0.1136 - val_loss: 0.1043
Epoch 7/50
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 16ms/step - loss: 0.1132 - val_loss: 0.1017
Epoch 8/50
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 16ms/step - loss: 0.1059 - val_loss: 0.1012
Epoch 9/50
[1m378/378[0m [32m

In [7]:
 
# Define Layer 1 of the Adaptive NIDS
class AdaptiveNIDSLayer1:
    """Layer 1 of the Adaptive NIDS: a Conv1D-encoder / LSTM-decoder autoencoder.

    The model is trained to reconstruct its own input. Rows whose mean squared
    reconstruction error exceeds a threshold are reported as anomalies, and the
    pooled convolutional activations ("gap_layer") can be exported as features.
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim: Number of features per sample.
            latent_dim: Width of the bottleneck (encoded) vector.
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build and compile the autoencoder (Adam, learning rate 1e-4, MSE loss)."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(inputs)
        # View the feature vector as a sequence with a single channel for Conv1D
        x = layers.Reshape((-1, 1))(x)

        # Feature extraction via two stacked Conv1D layers (no skip connections)
        x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
        x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
        # Named so extract_features() can look this layer up
        x = layers.GlobalAveragePooling1D(name="gap_layer")(x)

        # Latent Representation
        x = layers.Dense(64, activation='mish', kernel_regularizer=regularizers.l1(0.0005))(x)
        x = layers.Dropout(0.3)(x)
        encoded = layers.Dense(self.latent_dim, activation='linear')(x)

        # Decoder: repeat the code once per feature, then emit one value per step
        x = layers.RepeatVector(self.input_dim)(encoded)
        x = layers.LSTM(self.latent_dim * 2, return_sequences=True, recurrent_dropout=0.25)(x)
        decoded = layers.TimeDistributed(layers.Dense(1, activation='linear'))(x)
        decoded = layers.Flatten()(decoded)

        autoencoder = keras.Model(inputs=inputs, outputs=decoded)
        autoencoder.compile(optimizer=keras.optimizers.Adam(1e-4), loss='mse')
        return autoencoder

    def train(self, X_train, X_val, epochs=25, batch_size=64):
        """Fit the autoencoder to reconstruct X_train, validating on X_val.

        Returns:
            The History object returned by Keras fit() (per-epoch losses).
        """
        return self.model.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, X_val),
        )

    def detect_anomalies(self, X_data, threshold=0.02):
        """Return (rows, positions) of samples whose reconstruction MSE > threshold.

        Empty input is returned unchanged with an empty index array.
        """
        if len(X_data) == 0:
            # Skip predict(): there is nothing to score in an empty batch
            return X_data, np.array([], dtype=np.intp)
        reconstructed = self.model.predict(X_data)
        errors = np.mean(np.square(X_data - reconstructed), axis=1)
        is_anomaly = errors > threshold
        return X_data[is_anomaly], np.where(is_anomaly)[0]

    def extract_features(self, X_anomalies):
        """Return the "gap_layer" activations of the autoencoder for X_anomalies.

        For empty input an empty (0, n_channels) array is returned.
        """
        gap_output = self.model.get_layer("gap_layer").output  # Use the named layer
        if len(X_anomalies) == 0:
            # No anomalies were flagged: return an empty feature matrix instead of predicting
            return np.empty((0, gap_output.shape[-1]), dtype=np.float32)
        # Create a feature extractor model that stops at the named gap_layer
        feature_extractor = keras.Model(inputs=self.model.input, outputs=gap_output)
        return feature_extractor.predict(X_anomalies)

# Train Layer 1: build the autoencoder for X_train's feature count and fit it
# with this cell's train() defaults (25 epochs, batch size 64).
layer1 = AdaptiveNIDSLayer1(input_dim=X_train.shape[1])
layer1.train(X_train, X_val)

# Detect anomalies: validation rows whose reconstruction MSE exceeds the
# default threshold (0.02), plus their row positions within X_val.
anomalies, anomaly_indices = layer1.detect_anomalies(X_val)

# Extract the pooled Conv1D ("gap_layer") activations for the flagged rows.
extracted_features_epoch_25_BS_64 = layer1.extract_features(anomalies)

Epoch 1/25
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 22ms/step - loss: 0.2420 - val_loss: 0.2174
Epoch 2/25
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 21ms/step - loss: 0.2194 - val_loss: 0.1970
Epoch 3/25
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 20ms/step - loss: 0.2006 - val_loss: 0.1804
Epoch 4/25
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 20ms/step - loss: 0.1844 - val_loss: 0.1661
Epoch 5/25
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.1667 - val_loss: 0.1533
Epoch 6/25
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.1572 - val_loss: 0.1421
Epoch 7/25
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.1482 - val_loss: 0.1322
Epoch 8/25
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.1355 - val_loss: 0.1237
Epoch 9/25
[1m189/189[0m [32m

In [9]:
 
# Define Layer 1 of the Adaptive NIDS
class AdaptiveNIDSLayer1:
    """Layer 1 of the Adaptive NIDS: a Conv1D-encoder / LSTM-decoder autoencoder.

    The model is trained to reconstruct its own input. Rows whose mean squared
    reconstruction error exceeds a threshold are reported as anomalies, and the
    pooled convolutional activations ("gap_layer") can be exported as features.
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim: Number of features per sample.
            latent_dim: Width of the bottleneck (encoded) vector.
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build and compile the autoencoder (Adam, learning rate 1e-4, MSE loss)."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(inputs)
        # View the feature vector as a sequence with a single channel for Conv1D
        x = layers.Reshape((-1, 1))(x)

        # Feature extraction via two stacked Conv1D layers (no skip connections)
        x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
        x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
        # Named so extract_features() can look this layer up
        x = layers.GlobalAveragePooling1D(name="gap_layer")(x)

        # Latent Representation
        x = layers.Dense(64, activation='mish', kernel_regularizer=regularizers.l1(0.0005))(x)
        x = layers.Dropout(0.3)(x)
        encoded = layers.Dense(self.latent_dim, activation='linear')(x)

        # Decoder: repeat the code once per feature, then emit one value per step
        x = layers.RepeatVector(self.input_dim)(encoded)
        x = layers.LSTM(self.latent_dim * 2, return_sequences=True, recurrent_dropout=0.25)(x)
        decoded = layers.TimeDistributed(layers.Dense(1, activation='linear'))(x)
        decoded = layers.Flatten()(decoded)

        autoencoder = keras.Model(inputs=inputs, outputs=decoded)
        autoencoder.compile(optimizer=keras.optimizers.Adam(1e-4), loss='mse')
        return autoencoder

    def train(self, X_train, X_val, epochs=25, batch_size=32):
        """Fit the autoencoder to reconstruct X_train, validating on X_val.

        Returns:
            The History object returned by Keras fit() (per-epoch losses).
        """
        return self.model.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, X_val),
        )

    def detect_anomalies(self, X_data, threshold=0.02):
        """Return (rows, positions) of samples whose reconstruction MSE > threshold.

        Empty input is returned unchanged with an empty index array.
        """
        if len(X_data) == 0:
            # Skip predict(): there is nothing to score in an empty batch
            return X_data, np.array([], dtype=np.intp)
        reconstructed = self.model.predict(X_data)
        errors = np.mean(np.square(X_data - reconstructed), axis=1)
        is_anomaly = errors > threshold
        return X_data[is_anomaly], np.where(is_anomaly)[0]

    def extract_features(self, X_anomalies):
        """Return the "gap_layer" activations of the autoencoder for X_anomalies.

        For empty input an empty (0, n_channels) array is returned.
        """
        gap_output = self.model.get_layer("gap_layer").output  # Use the named layer
        if len(X_anomalies) == 0:
            # No anomalies were flagged: return an empty feature matrix instead of predicting
            return np.empty((0, gap_output.shape[-1]), dtype=np.float32)
        # Create a feature extractor model that stops at the named gap_layer
        feature_extractor = keras.Model(inputs=self.model.input, outputs=gap_output)
        return feature_extractor.predict(X_anomalies)

# Train Layer 1: build the autoencoder for X_train's feature count and fit it
# with this cell's train() defaults (25 epochs, batch size 32).
layer1 = AdaptiveNIDSLayer1(input_dim=X_train.shape[1])
layer1.train(X_train, X_val)

# Detect anomalies: validation rows whose reconstruction MSE exceeds the
# default threshold (0.02), plus their row positions within X_val.
anomalies, anomaly_indices = layer1.detect_anomalies(X_val)

# Extract the pooled Conv1D ("gap_layer") activations for the flagged rows.
extracted_features_epoch_25_BS_32 = layer1.extract_features(anomalies)

Epoch 1/25
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 16ms/step - loss: 0.2324 - val_loss: 0.1961
Epoch 2/25
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.1940 - val_loss: 0.1642
Epoch 3/25
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.1629 - val_loss: 0.1407
Epoch 4/25
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.1409 - val_loss: 0.1229
Epoch 5/25
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.1255 - val_loss: 0.1110
Epoch 6/25
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.1170 - val_loss: 0.1042
Epoch 7/25
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.1102 - val_loss: 0.1022
Epoch 8/25
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 18ms/step - loss: 0.1059 - val_loss: 0.1015
Epoch 9/25
[1m378/378[0m [32m

In [10]:
 
# Define Layer 1 of the Adaptive NIDS
class AdaptiveNIDSLayer1:
    """Layer 1 of the Adaptive NIDS: a Conv1D-encoder / LSTM-decoder autoencoder.

    The model is trained to reconstruct its own input. Rows whose mean squared
    reconstruction error exceeds a threshold are reported as anomalies, and the
    pooled convolutional activations ("gap_layer") can be exported as features.
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim: Number of features per sample.
            latent_dim: Width of the bottleneck (encoded) vector.
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build and compile the autoencoder (Adam, learning rate 1e-4, MSE loss)."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(inputs)
        # View the feature vector as a sequence with a single channel for Conv1D
        x = layers.Reshape((-1, 1))(x)

        # Feature extraction via two stacked Conv1D layers (no skip connections)
        x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
        x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
        # Named so extract_features() can look this layer up
        x = layers.GlobalAveragePooling1D(name="gap_layer")(x)

        # Latent Representation
        x = layers.Dense(64, activation='mish', kernel_regularizer=regularizers.l1(0.0005))(x)
        x = layers.Dropout(0.3)(x)
        encoded = layers.Dense(self.latent_dim, activation='linear')(x)

        # Decoder: repeat the code once per feature, then emit one value per step
        x = layers.RepeatVector(self.input_dim)(encoded)
        x = layers.LSTM(self.latent_dim * 2, return_sequences=True, recurrent_dropout=0.25)(x)
        decoded = layers.TimeDistributed(layers.Dense(1, activation='linear'))(x)
        decoded = layers.Flatten()(decoded)

        autoencoder = keras.Model(inputs=inputs, outputs=decoded)
        autoencoder.compile(optimizer=keras.optimizers.Adam(1e-4), loss='mse')
        return autoencoder

    def train(self, X_train, X_val, epochs=25, batch_size=128):
        """Fit the autoencoder to reconstruct X_train, validating on X_val.

        Returns:
            The History object returned by Keras fit() (per-epoch losses).
        """
        return self.model.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, X_val),
        )

    def detect_anomalies(self, X_data, threshold=0.02):
        """Return (rows, positions) of samples whose reconstruction MSE > threshold.

        Empty input is returned unchanged with an empty index array.
        """
        if len(X_data) == 0:
            # Skip predict(): there is nothing to score in an empty batch
            return X_data, np.array([], dtype=np.intp)
        reconstructed = self.model.predict(X_data)
        errors = np.mean(np.square(X_data - reconstructed), axis=1)
        is_anomaly = errors > threshold
        return X_data[is_anomaly], np.where(is_anomaly)[0]

    def extract_features(self, X_anomalies):
        """Return the "gap_layer" activations of the autoencoder for X_anomalies.

        For empty input an empty (0, n_channels) array is returned.
        """
        gap_output = self.model.get_layer("gap_layer").output  # Use the named layer
        if len(X_anomalies) == 0:
            # No anomalies were flagged: return an empty feature matrix instead of predicting
            return np.empty((0, gap_output.shape[-1]), dtype=np.float32)
        # Create a feature extractor model that stops at the named gap_layer
        feature_extractor = keras.Model(inputs=self.model.input, outputs=gap_output)
        return feature_extractor.predict(X_anomalies)

# Train Layer 1: build the autoencoder for X_train's feature count and fit it
# with this cell's train() defaults (25 epochs, batch size 128).
layer1 = AdaptiveNIDSLayer1(input_dim=X_train.shape[1])
layer1.train(X_train, X_val)

# Detect anomalies: validation rows whose reconstruction MSE exceeds the
# default threshold (0.02), plus their row positions within X_val.
anomalies, anomaly_indices = layer1.detect_anomalies(X_val)

# Extract the pooled Conv1D ("gap_layer") activations for the flagged rows.
# Stored under the BS_128 name: this run uses batch size 128, and the previous
# name (..._BS_32) overwrote the features of the 25-epoch / batch-size-32 run.
extracted_features_epoch_25_BS_128 = layer1.extract_features(anomalies)

Epoch 1/25
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 28ms/step - loss: 0.2412 - val_loss: 0.2260
Epoch 2/25
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - loss: 0.2299 - val_loss: 0.2158
Epoch 3/25
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 26ms/step - loss: 0.2210 - val_loss: 0.2058
Epoch 4/25
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 29ms/step - loss: 0.2074 - val_loss: 0.1958
Epoch 5/25
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 34ms/step - loss: 0.1988 - val_loss: 0.1868
Epoch 6/25
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 29ms/step - loss: 0.1916 - val_loss: 0.1790
Epoch 7/25
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - loss: 0.1825 - val_loss: 0.1714
Epoch 8/25
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 29ms/step - loss: 0.1774 - val_loss: 0.1643
Epoch 9/25
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━

In [11]:
 
# Define Layer 1 of the Adaptive NIDS
class AdaptiveNIDSLayer1:
    """Layer 1 of the Adaptive NIDS: a Conv1D-encoder / LSTM-decoder autoencoder.

    The model is trained to reconstruct its own input. Rows whose mean squared
    reconstruction error exceeds a threshold are reported as anomalies, and the
    pooled convolutional activations ("gap_layer") can be exported as features.
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim: Number of features per sample.
            latent_dim: Width of the bottleneck (encoded) vector.
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build and compile the autoencoder (Adam, learning rate 1e-4, MSE loss)."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(inputs)
        # View the feature vector as a sequence with a single channel for Conv1D
        x = layers.Reshape((-1, 1))(x)

        # Feature extraction via two stacked Conv1D layers (no skip connections)
        x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
        x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
        # Named so extract_features() can look this layer up
        x = layers.GlobalAveragePooling1D(name="gap_layer")(x)

        # Latent Representation
        x = layers.Dense(64, activation='mish', kernel_regularizer=regularizers.l1(0.0005))(x)
        x = layers.Dropout(0.3)(x)
        encoded = layers.Dense(self.latent_dim, activation='linear')(x)

        # Decoder: repeat the code once per feature, then emit one value per step
        x = layers.RepeatVector(self.input_dim)(encoded)
        x = layers.LSTM(self.latent_dim * 2, return_sequences=True, recurrent_dropout=0.25)(x)
        decoded = layers.TimeDistributed(layers.Dense(1, activation='linear'))(x)
        decoded = layers.Flatten()(decoded)

        autoencoder = keras.Model(inputs=inputs, outputs=decoded)
        autoencoder.compile(optimizer=keras.optimizers.Adam(1e-4), loss='mse')
        return autoencoder

    def train(self, X_train, X_val, epochs=30, batch_size=32):
        """Fit the autoencoder to reconstruct X_train, validating on X_val.

        Returns:
            The History object returned by Keras fit() (per-epoch losses).
        """
        return self.model.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, X_val),
        )

    def detect_anomalies(self, X_data, threshold=0.02):
        """Return (rows, positions) of samples whose reconstruction MSE > threshold.

        Empty input is returned unchanged with an empty index array.
        """
        if len(X_data) == 0:
            # Skip predict(): there is nothing to score in an empty batch
            return X_data, np.array([], dtype=np.intp)
        reconstructed = self.model.predict(X_data)
        errors = np.mean(np.square(X_data - reconstructed), axis=1)
        is_anomaly = errors > threshold
        return X_data[is_anomaly], np.where(is_anomaly)[0]

    def extract_features(self, X_anomalies):
        """Return the "gap_layer" activations of the autoencoder for X_anomalies.

        For empty input an empty (0, n_channels) array is returned.
        """
        gap_output = self.model.get_layer("gap_layer").output  # Use the named layer
        if len(X_anomalies) == 0:
            # No anomalies were flagged: return an empty feature matrix instead of predicting
            return np.empty((0, gap_output.shape[-1]), dtype=np.float32)
        # Create a feature extractor model that stops at the named gap_layer
        feature_extractor = keras.Model(inputs=self.model.input, outputs=gap_output)
        return feature_extractor.predict(X_anomalies)

# Train Layer 1: build the autoencoder for X_train's feature count and fit it
# with this cell's train() defaults (30 epochs, batch size 32).
layer1 = AdaptiveNIDSLayer1(input_dim=X_train.shape[1])
layer1.train(X_train, X_val)

# Detect anomalies: validation rows whose reconstruction MSE exceeds the
# default threshold (0.02), plus their row positions within X_val.
anomalies, anomaly_indices = layer1.detect_anomalies(X_val)

# Extract the pooled Conv1D ("gap_layer") activations for the flagged rows.
extracted_features_epoch_30_BS_32 = layer1.extract_features(anomalies)

Epoch 1/30
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 16ms/step - loss: 0.2309 - val_loss: 0.1966
Epoch 2/30
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 16ms/step - loss: 0.1919 - val_loss: 0.1663
Epoch 3/30
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.1658 - val_loss: 0.1424
Epoch 4/30
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.1432 - val_loss: 0.1242
Epoch 5/30
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 16ms/step - loss: 0.1308 - val_loss: 0.1112
Epoch 6/30
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.1177 - val_loss: 0.1041
Epoch 7/30
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 16ms/step - loss: 0.1077 - val_loss: 0.1020
Epoch 8/30
[1m378/378[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 15ms/step - loss: 0.1108 - val_loss: 0.1011
Epoch 9/30
[1m378/378[0m [32m

In [12]:
 
# Define Layer 1 of the Adaptive NIDS
class AdaptiveNIDSLayer1:
    """Layer 1 of the Adaptive NIDS: a Conv1D-encoder / LSTM-decoder autoencoder.

    The model is trained to reconstruct its own input. Rows whose mean squared
    reconstruction error exceeds a threshold are reported as anomalies, and the
    pooled convolutional activations ("gap_layer") can be exported as features.
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim: Number of features per sample.
            latent_dim: Width of the bottleneck (encoded) vector.
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build and compile the autoencoder (Adam, learning rate 1e-4, MSE loss)."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(inputs)
        # View the feature vector as a sequence with a single channel for Conv1D
        x = layers.Reshape((-1, 1))(x)

        # Feature extraction via two stacked Conv1D layers (no skip connections)
        x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
        x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
        # Named so extract_features() can look this layer up
        x = layers.GlobalAveragePooling1D(name="gap_layer")(x)

        # Latent Representation
        x = layers.Dense(64, activation='mish', kernel_regularizer=regularizers.l1(0.0005))(x)
        x = layers.Dropout(0.3)(x)
        encoded = layers.Dense(self.latent_dim, activation='linear')(x)

        # Decoder: repeat the code once per feature, then emit one value per step
        x = layers.RepeatVector(self.input_dim)(encoded)
        x = layers.LSTM(self.latent_dim * 2, return_sequences=True, recurrent_dropout=0.25)(x)
        decoded = layers.TimeDistributed(layers.Dense(1, activation='linear'))(x)
        decoded = layers.Flatten()(decoded)

        autoencoder = keras.Model(inputs=inputs, outputs=decoded)
        autoencoder.compile(optimizer=keras.optimizers.Adam(1e-4), loss='mse')
        return autoencoder

    def train(self, X_train, X_val, epochs=30, batch_size=64):
        """Fit the autoencoder to reconstruct X_train, validating on X_val.

        Returns:
            The History object returned by Keras fit() (per-epoch losses).
        """
        return self.model.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_val, X_val),
        )

    def detect_anomalies(self, X_data, threshold=0.02):
        """Return (rows, positions) of samples whose reconstruction MSE > threshold.

        Empty input is returned unchanged with an empty index array.
        """
        if len(X_data) == 0:
            # Skip predict(): there is nothing to score in an empty batch
            return X_data, np.array([], dtype=np.intp)
        reconstructed = self.model.predict(X_data)
        errors = np.mean(np.square(X_data - reconstructed), axis=1)
        is_anomaly = errors > threshold
        return X_data[is_anomaly], np.where(is_anomaly)[0]

    def extract_features(self, X_anomalies):
        """Return the "gap_layer" activations of the autoencoder for X_anomalies.

        For empty input an empty (0, n_channels) array is returned.
        """
        gap_output = self.model.get_layer("gap_layer").output  # Use the named layer
        if len(X_anomalies) == 0:
            # No anomalies were flagged: return an empty feature matrix instead of predicting
            return np.empty((0, gap_output.shape[-1]), dtype=np.float32)
        # Create a feature extractor model that stops at the named gap_layer
        feature_extractor = keras.Model(inputs=self.model.input, outputs=gap_output)
        return feature_extractor.predict(X_anomalies)

# Train Layer 1: build the autoencoder for X_train's feature count and fit it
# with this cell's train() defaults (30 epochs, batch size 64).
layer1 = AdaptiveNIDSLayer1(input_dim=X_train.shape[1])
layer1.train(X_train, X_val)

# Detect anomalies: validation rows whose reconstruction MSE exceeds the
# default threshold (0.02), plus their row positions within X_val.
anomalies, anomaly_indices = layer1.detect_anomalies(X_val)

# Extract the pooled Conv1D ("gap_layer") activations for the flagged rows.
extracted_features_epoch_30_BS_64 = layer1.extract_features(anomalies)

Epoch 1/30
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 21ms/step - loss: 0.2334 - val_loss: 0.2129
Epoch 2/30
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.2114 - val_loss: 0.1940
Epoch 3/30
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.1925 - val_loss: 0.1767
Epoch 4/30
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.1790 - val_loss: 0.1623
Epoch 5/30
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 19ms/step - loss: 0.1627 - val_loss: 0.1496
Epoch 6/30
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 20ms/step - loss: 0.1568 - val_loss: 0.1388
Epoch 7/30
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 20ms/step - loss: 0.1372 - val_loss: 0.1294
Epoch 8/30
[1m189/189[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 20ms/step - loss: 0.1318 - val_loss: 0.1215
Epoch 9/30
[1m189/189[0m [32m

In [13]:
 
# Define Layer 1 of the Adaptive NIDS
class AdaptiveNIDSLayer1:
    """
    Layer 1 of the Adaptive NIDS: an autoencoder used for anomaly detection
    (via reconstruction error) and for feature extraction (via the pooled
    convolutional activations).
    """

    def __init__(self, input_dim, latent_dim=16):
        """
        Args:
            input_dim (int): Number of features per sample.
            latent_dim (int): Size of the bottleneck representation.
        """
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.model = self._build_autoencoder()
        # (model the extractor was built from, extractor) -- see extract_features.
        self._extractor_cache = (None, None)

    def _build_autoencoder(self):
        """
        Builds and compiles the autoencoder.

        Encoder: BatchNorm -> reshape to (input_dim, 1) -> two stacked Conv1D
        layers -> global average pooling ("gap_layer") -> Dense(64) ->
        Dropout -> Dense(latent_dim).
        Decoder: the latent vector is repeated input_dim times, passed
        through an LSTM and projected back to one value per feature.
        """
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.BatchNormalization()(inputs)
        # Treat the feature vector as a length-input_dim sequence with 1 channel.
        x = layers.Reshape((-1, 1))(x)

        # Feature extraction via stacked Conv1D layers (no skip connections).
        x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
        x = layers.Conv1D(32, 3, activation='relu', padding='same')(x)
        # Named so extract_features can look this layer up.
        x = layers.GlobalAveragePooling1D(name="gap_layer")(x)

        # Latent Representation
        x = layers.Dense(64, activation='mish', kernel_regularizer=regularizers.l1(0.0005))(x)
        x = layers.Dropout(0.3)(x)
        encoded = layers.Dense(self.latent_dim, activation='linear')(x)

        # Decoder
        x = layers.RepeatVector(self.input_dim)(encoded)
        x = layers.LSTM(self.latent_dim * 2, return_sequences=True, recurrent_dropout=0.25)(x)
        decoded = layers.TimeDistributed(layers.Dense(1, activation='linear'))(x)
        decoded = layers.Flatten()(decoded)

        autoencoder = keras.Model(inputs=inputs, outputs=decoded)
        autoencoder.compile(optimizer=keras.optimizers.Adam(1e-4), loss='mse')
        return autoencoder

    def train(self, X_train, X_val, epochs=30):
        """
        Fits the autoencoder to reconstruct its own input.

        Args:
            X_train: Training samples (used as both input and target).
            X_val: Validation samples (used as both input and target).
            epochs (int): Number of training epochs.
        """
        self.model.fit(X_train, X_train, epochs=epochs, batch_size=128, validation_data=(X_val, X_val))

    def detect_anomalies(self, X_data, threshold=0.02):
        """
        Flags the rows of X_data that the autoencoder reconstructs poorly.

        Args:
            X_data: 2D feature matrix.
            threshold (float): Rows whose mean squared reconstruction error
                is strictly greater than this value are flagged.

        Returns:
            Tuple of (flagged rows of X_data, positions of those rows
            within X_data).
        """
        reconstructed = self.model.predict(X_data)
        errors = np.mean(np.square(X_data - reconstructed), axis=1)
        # Evaluate the comparison once and reuse it for rows and positions.
        is_anomalous = errors > threshold
        return X_data[is_anomalous], np.where(is_anomalous)[0]

    def extract_features(self, X_anomalies):
        """
        Returns the "gap_layer" activations for the given samples.

        The sub-model (autoencoder input -> "gap_layer") shares its layers
        with ``self.model``, so it always reflects the current weights. It is
        built once and reused, and rebuilt only if ``self.model`` has been
        replaced by a different model object.

        Args:
            X_anomalies: Samples to encode, in the format the autoencoder
                accepts as input.

        Returns:
            The output of ``predict`` on the sub-model.
        """
        built_from, feature_extractor = getattr(self, "_extractor_cache", (None, None))
        if built_from is not self.model:
            feature_extractor = keras.Model(
                inputs=self.model.input,
                outputs=self.model.get_layer("gap_layer").output,
            )
            self._extractor_cache = (self.model, feature_extractor)
        return feature_extractor.predict(X_anomalies)

# Build the Layer 1 autoencoder sized to the number of feature columns and
# fit it to reconstruct X_train (this version of the class fits with batch_size=128).
layer1 = AdaptiveNIDSLayer1(input_dim=X_train.shape[1])
layer1.train(X_train, X_val)

# Flag validation rows whose reconstruction error exceeds the default
# threshold (0.02). anomaly_indices are row positions *within X_val*.
anomalies, anomaly_indices = layer1.detect_anomalies(X_val)

# Encode the flagged rows with the "gap_layer" activations; the variable name
# records the run configuration (30 epochs, batch size 128).
extracted_features_epoch_30_BS_128 = layer1.extract_features(anomalies)

Epoch 1/30
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 28ms/step - loss: 0.2367 - val_loss: 0.2254
Epoch 2/30
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 29ms/step - loss: 0.2306 - val_loss: 0.2146
Epoch 3/30
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 29ms/step - loss: 0.2184 - val_loss: 0.2050
Epoch 4/30
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 26ms/step - loss: 0.2059 - val_loss: 0.1948
Epoch 5/30
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - loss: 0.1973 - val_loss: 0.1858
Epoch 6/30
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 26ms/step - loss: 0.1947 - val_loss: 0.1782
Epoch 7/30
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - loss: 0.1817 - val_loss: 0.1707
Epoch 8/30
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - loss: 0.1767 - val_loss: 0.1637
Epoch 9/30
[1m95/95[0m [32m━━━━━━━━━━━━━━━━━━

In [15]:
# Preparing dataset for Layer 2
def attach_attack_labels(original_df, anomaly_indices):
    """
    Picks rows of original_df by position and splits them into features
    and labels.

    Args:
        original_df (pd.DataFrame): Table containing an 'Attack_label' column.
        anomaly_indices: Integer row positions (used with ``iloc``), so they
            must refer to the row order of original_df itself.

    Returns:
        Tuple of (selected rows without 'Attack_label', the 'Attack_label'
        column of the selected rows).
    """
    selected_rows = original_df.iloc[anomaly_indices]
    feature_frame = selected_rows.drop(columns=['Attack_label'])
    label_series = selected_rows['Attack_label']
    return feature_frame, label_series

# Re-read the raw CSV so the 'Attack_label' column (dropped during
# preprocessing) is available, then take the rows at anomaly_indices.
# NOTE(review): anomaly_indices come from detect_anomalies(X_val), i.e. they
# are positions inside X_val. preprocess_data removes IQR outlier rows and
# then calls train_test_split (which shuffles by default), so X_val positions
# do not correspond to row positions in this unfiltered DataFrame. The rows
# and labels selected here are therefore likely not the detected anomalies;
# preprocess_data would need to carry the original row ids through the split
# for this lookup to be valid -- confirm and fix upstream.
# NOTE(review): X_layer2 holds the raw CSV values, not the RobustScaler output
# that Layer 1 was trained on -- confirm this is intended.
original_df = pd.read_csv("/Users/siddhantgond/Desktop/6THSEM/Project_Elective/Adaptive-Network-Intrusion-Detection-System/Implementaiton/training_dataset.csv")
X_layer2, y_layer2 = attach_attack_labels(original_df, anomaly_indices)

In [16]:
# Define Layer 2 of the Adaptive NIDS
class AdaptiveNIDSLayer2:
    """
    Layer 2 of the Adaptive NIDS: a sequence classifier for attack types.

    The network is Conv1D -> BatchNorm -> bidirectional GRU -> Dense ->
    Dropout -> softmax. (The project notes call this "CNN-BiLSTM"; the
    recurrent layer actually used here is a GRU.)
    """

    def __init__(self, input_dim, num_classes, seq_length=10):
        """
        Stores the configuration and builds the compiled model.

        Args:
            input_dim (int): Number of input features per time step.
            num_classes (int): Number of attack classes (softmax width).
            seq_length (int): Number of time steps in each input sequence.
        """
        self.input_dim = input_dim
        self.num_classes = num_classes
        self.seq_length = seq_length
        self.model = self._build_model()

    def _build_model(self):
        """
        Assembles and compiles the classifier.

        Returns:
            A compiled keras.Model taking (seq_length, input_dim) sequences
            and emitting num_classes softmax probabilities.
        """
        sequence_input = layers.Input(shape=(self.seq_length, self.input_dim))

        # Convolution along the time axis, then normalisation.
        conv_features = layers.Conv1D(64, kernel_size=3, activation='relu', padding='same')(sequence_input)
        conv_features = layers.BatchNormalization()(conv_features)

        # Bidirectional GRU; only the final state is kept (return_sequences=False).
        sequence_summary = layers.Bidirectional(layers.GRU(48, return_sequences=False))(conv_features)

        # Dense head with light dropout.
        head = layers.Dense(64, activation='relu')(sequence_summary)
        head = layers.Dropout(0.2)(head)

        # Class probabilities.
        class_probs = layers.Dense(self.num_classes, activation='softmax')(head)

        classifier = keras.Model(inputs=sequence_input, outputs=class_probs)
        # Sparse loss: targets are integer class ids, not one-hot vectors.
        classifier.compile(
            optimizer=keras.optimizers.Adam(1e-3),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy'],
        )
        return classifier

    def train(self, X_train, y_train, epochs=50, batch_size=64):
        """
        Fits the classifier, holding out 20% of the data for validation.

        Args:
            X_train (np.array): Input sequences.
            y_train (np.array): Attack labels.
            epochs (int): Number of training epochs.
            batch_size (int): Batch size.
        """
        self.model.fit(
            X_train,
            y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=0.2,
        )


def create_sequences(data, labels, seq_length=10):
    """
    Converts feature data into overlapping time-series windows for Layer 2.

    Window ``k`` covers rows ``k`` to ``k + seq_length - 1`` and is labelled
    with the label of its last row. Every full window is produced, including
    the one that ends on the final row.

    Args:
        data (array-like): Feature matrix whose first axis indexes rows
            (a DataFrame is accepted and converted to a NumPy array).
        labels (pd.Series or array-like): One label per row of ``data``.
        seq_length (int): Number of time steps per sequence.

    Returns:
        Tuple of (sequences, seq_labels). ``sequences`` has shape
        ``(n_windows, seq_length, *data.shape[1:])`` with
        ``n_windows = max(len(data) - seq_length + 1, 0)``; ``seq_labels``
        has shape ``(n_windows,)``. Both are empty (but correctly shaped)
        when ``data`` has fewer than ``seq_length`` rows.

    Raises:
        ValueError: If ``seq_length`` is not positive or ``data`` and
            ``labels`` have different lengths.
    """
    if seq_length < 1:
        raise ValueError("seq_length must be a positive integer")

    # Convert once up front: positional slicing is then unambiguous and far
    # cheaper than slicing a DataFrame for every window.
    data = np.asarray(data)
    labels = np.asarray(labels)
    if len(data) != len(labels):
        raise ValueError(
            f"data and labels must have the same length, got {len(data)} and {len(labels)}"
        )

    # +1 so that the window ending on the last row is included.
    n_windows = len(data) - seq_length + 1
    if n_windows <= 0:
        empty_sequences = np.empty((0, seq_length) + data.shape[1:], dtype=data.dtype)
        return empty_sequences, np.empty((0,), dtype=labels.dtype)

    sequences = np.stack([data[start:start + seq_length] for start in range(n_windows)])
    # Label of the last row of each window; copied so the result does not
    # alias the caller's label array.
    seq_labels = labels[seq_length - 1:].copy()
    return sequences, seq_labels

# Build fixed-length windows over consecutive rows of X_layer2 (in their
# current row order); each window takes the label of its last row.
# NOTE(review): nothing here establishes that adjacent rows of X_layer2 are
# adjacent in time -- confirm the row order is meaningful as a sequence.
seq_length = 10
X_layer2_reshaped, y_layer2_adjusted = create_sequences(X_layer2, y_layer2, seq_length=seq_length)

# Train Layer 2. input_dim is the per-time-step feature count (last axis of
# the windowed array).
# NOTE(review): num_classes=5 is hard-coded. The model is compiled with
# sparse_categorical_crossentropy, which expects integer class ids below
# num_classes -- assumes Attack_label is encoded as 0..4; confirm.
layer2 = AdaptiveNIDSLayer2(input_dim=X_layer2_reshaped.shape[2], num_classes=5, seq_length=seq_length)
layer2.train(X_layer2_reshaped, y_layer2_adjusted)

Epoch 1/50
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 16ms/step - accuracy: 0.6626 - loss: 0.9693 - val_accuracy: 0.8563 - val_loss: 0.2665
Epoch 2/50
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 0.9461 - loss: 0.1434 - val_accuracy: 0.9235 - val_loss: 0.2001
Epoch 3/50
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - accuracy: 0.9795 - loss: 0.0575 - val_accuracy: 0.9384 - val_loss: 0.1957
Epoch 4/50
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - accuracy: 0.9889 - loss: 0.0384 - val_accuracy: 0.9384 - val_loss: 0.2310
Epoch 5/50
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - accuracy: 0.9790 - loss: 0.0490 - val_accuracy: 0.9515 - val_loss: 0.1724
Epoch 6/50
[1m34/34[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - accuracy: 0.9860 - loss: 0.0361 - val_accuracy: 0.9757 - val_loss: 0.0587
Epoch 7/50
[1m34/34[0m [32m━━━━━

In [36]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

def knowledge_distillation(teacher_model, student_model, X_train, y_train, temperature=3.0, alpha=0.5, epochs=50, from_logits=False):
    """
    Implements knowledge distillation by training the student model with soft targets from the teacher.

    The teacher is run once over ``X_train``. Its temperature-softened
    outputs are packed next to the integer labels into a single target array
    (column 0 = label, remaining columns = teacher soft targets), so every
    sample keeps its own teacher target no matter how ``fit`` batches or
    shuffles the data.

    Args:
        teacher_model: Pre-trained teacher model.
        student_model: Student model to be trained (trained in place).
        X_train (np.array): Training features.
        y_train (np.array): Training labels; integer class ids, or one-hot
            rows (collapsed with argmax).
        temperature (float): Softmax temperature for distillation.
        alpha (float): Weight of the soft (teacher) loss; the hard (label)
            loss is weighted by ``1 - alpha``.
        epochs (int): Number of training epochs.
        from_logits (bool): False (default) when both models output class
            probabilities, e.g. a softmax final layer; True when both output
            raw logits.

    Returns:
        The trained student model. After training it is re-compiled with
        plain sparse categorical cross-entropy and accuracy, so it can be
        evaluated against ordinary integer labels.

    Raises:
        ValueError: If ``temperature`` is not positive, or the teacher
            outputs do not line up with the labels / student output width.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    # Labels as a flat vector of class ids (one-hot input is collapsed).
    y_train = np.asarray(y_train)
    if y_train.ndim > 1 and y_train.shape[1] > 1:
        y_train = np.argmax(y_train, axis=1)
    y_train = y_train.reshape(-1)

    def soften(outputs):
        """Temperature-scaled class distribution for model outputs."""
        outputs = tf.cast(outputs, tf.float32)
        if not from_logits:
            # log(p) equals the logits up to a per-row constant, which softmax
            # ignores, so this is temperature scaling of the underlying logits.
            outputs = tf.math.log(tf.clip_by_value(outputs, 1e-7, 1.0))
        return tf.nn.softmax(outputs / temperature, axis=-1)

    # Step 1: Compute the teacher's soft targets once for the whole dataset
    print("Getting teacher predictions...")
    teacher_soft = soften(teacher_model.predict(X_train)).numpy()

    if teacher_soft.ndim != 2 or teacher_soft.shape[0] != y_train.shape[0]:
        raise ValueError("teacher predictions and y_train must describe the same samples")
    num_classes = student_model.output_shape[-1]
    if num_classes is not None and teacher_soft.shape[1] != num_classes:
        raise ValueError("teacher and student must predict the same number of classes")

    # Column 0: hard label (class ids are exactly representable in float32);
    # columns 1..: teacher soft targets for that same sample.
    packed_targets = np.concatenate(
        [y_train.astype("float32")[:, None], teacher_soft.astype("float32")],
        axis=1,
    )

    kl_divergence = tf.keras.losses.KLDivergence()

    # Step 2: Define custom loss function for distillation
    def distillation_loss(y_true, y_pred):
        """
        Computes the knowledge distillation loss:
        - Hard loss: Student's predictions vs. true labels (Sparse Categorical Crossentropy)
        - Soft loss: Student's softened predictions vs. Teacher's soft targets (KL Divergence)
        """
        hard_labels = tf.cast(y_true[:, 0], tf.int32)
        soft_targets = y_true[:, 1:]

        hard_loss = tf.keras.losses.sparse_categorical_crossentropy(
            hard_labels, y_pred, from_logits=from_logits
        )
        soft_loss = kl_divergence(soft_targets, soften(y_pred))

        # The T^2 factor keeps the soft-loss gradients on the same scale as
        # the hard-loss gradients when the temperature changes.
        return (1 - alpha) * hard_loss + alpha * soft_loss * (temperature ** 2)

    def accuracy(y_true, y_pred):
        """Accuracy against the hard labels stored in column 0 of the packed targets."""
        predicted = tf.argmax(y_pred, axis=-1, output_type=tf.int32)
        return tf.cast(tf.equal(predicted, tf.cast(y_true[:, 0], tf.int32)), tf.float32)

    # Step 3: Compile the student model with distillation loss
    student_model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3),
                          loss=distillation_loss,
                          metrics=[accuracy])

    # Step 4: Train the student model using distillation
    print("Training the student model with knowledge distillation...")
    student_model.fit(X_train, packed_targets, epochs=epochs, batch_size=64, verbose=1)

    # Step 5: Restore a standard loss so callers can evaluate / fine-tune the
    # student with ordinary integer labels (weights are not affected).
    student_model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3),
                          loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=from_logits),
                          metrics=['accuracy'])

    return student_model