In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, classification_report
from tensorflow.keras.utils import to_categorical
from tensorflow.keras import models, layers
import tensorflow as tf

In [2]:
# Step 1: Read the train and test splits. `handshake_version` is pinned to str
# so pandas does not have to infer its dtype (avoids the mixed-type warning).
train_data, test_data = (
    pd.read_csv(csv_path, dtype={'handshake_version': str})
    for csv_path in (
        r"D:\VSCODE\venv\train_splits\train.csv",
        r"D:\VSCODE\venv\test_splits\test.csv",
    )
)

In [3]:
# Step 2: Split each frame into its target column ('label') and the remaining feature columns
y_train, y_test = train_data['label'], test_data['label']
X_train = train_data.drop(columns='label')
X_test = test_data.drop(columns='label')


In [4]:
# Step 3: Map class labels to integer ids; the mapping is learned from the training labels only
le_attack = LabelEncoder().fit(y_train)
y_train_encoded = le_attack.transform(y_train)
y_test_encoded = le_attack.transform(y_test)

In [5]:
# Step 4: Partition the feature columns by dtype: int64/float64 columns vs. object columns
numeric_cols = X_train.select_dtypes(['int64', 'float64']).columns
object_cols = X_train.select_dtypes('object').columns

In [6]:
# Step 5: Cast every object column to str in both frames.
# The generator is fully consumed before either assignment happens, so both
# casts are computed from the frames as they were before this statement.
X_train[object_cols], X_test[object_cols] = (
    frame[object_cols].astype(str) for frame in (X_train, X_test)
)

In [7]:
# Step 6: Ordinal-encode the object columns. Categories are learned from the
# training frame; categories not seen during fit are encoded as -1.
encoder = OrdinalEncoder(unknown_value=-1, handle_unknown='use_encoded_value')
encoder.fit(X_train[object_cols])
X_train_object_encoded, X_test_object_encoded = (
    encoder.transform(frame[object_cols]) for frame in (X_train, X_test)
)

In [8]:
# Wrap the encoded arrays in DataFrames that reuse the object-column names and
# the row index of the frame they were derived from.
X_train_object_encoded_df, X_test_object_encoded_df = (
    pd.DataFrame(encoded, index=source.index, columns=object_cols)
    for encoded, source in (
        (X_train_object_encoded, X_train),
        (X_test_object_encoded, X_test),
    )
)

In [9]:
# Step 7: Place the numeric columns and the encoded object columns side by side
X_train_encoded, X_test_encoded = (
    pd.concat([raw[numeric_cols], encoded_df], axis='columns')
    for raw, encoded_df in (
        (X_train, X_train_object_encoded_df),
        (X_test, X_test_object_encoded_df),
    )
)

In [10]:
# Step 8: Fill missing values with the per-column mean learned from the training frame.
# The imputer returns plain arrays, so column names and index are re-attached.
imputer = SimpleImputer(strategy='mean').fit(X_train_encoded)
X_train_encoded, X_test_encoded = (
    pd.DataFrame(imputer.transform(frame), index=frame.index, columns=frame.columns)
    for frame in (X_train_encoded, X_test_encoded)
)

In [11]:
# Step 9: Standardize with statistics fitted on the training frame, then cast to float32
scaler = StandardScaler().fit(X_train_encoded)
X_train_scaled, X_test_scaled = (
    scaler.transform(frame).astype(np.float32)
    for frame in (X_train_encoded, X_test_encoded)
)

In [12]:
# Step 10: Insert a length-1 middle axis so each row becomes a
# (1, n_features) sequence: (rows, n_features) -> (rows, 1, n_features)
n_features = X_train_scaled.shape[1]
X_train_lstm, X_test_lstm = (
    scaled[:, np.newaxis, :] for scaled in (X_train_scaled, X_test_scaled)
)

In [13]:
# Step 11: One-hot encode the integer labels; the width equals the number of
# classes known to the label encoder
num_classes = len(le_attack.classes_)
y_train_lstm, y_test_lstm = (
    to_categorical(encoded, num_classes=num_classes)
    for encoded in (y_train_encoded, y_test_encoded)
)

In [14]:

# Step 12: Define the initial LSTM model with higher dropout to prevent overfitting.
# The input shape is declared with an explicit Input layer instead of an
# `input_shape=` argument on the LSTM layer, which Keras warns against for
# Sequential models.
model_lstm = models.Sequential([
    layers.Input(shape=(1, n_features)),               # one timestep of n_features values
    layers.LSTM(64, return_sequences=False),           # emit only the final hidden state
    layers.Dropout(0.5),
    layers.Dense(32, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_classes, activation='softmax'),   # one probability per class
])

  super().__init__(**kwargs)


In [15]:

# Step 13: Configure training: Adam optimizer, categorical cross-entropy on the
# one-hot targets, accuracy reported under the key 'acc'
model_lstm.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['acc'],
)

In [16]:

# Step 14: Train the initial model.
# epochs is 5 to match the training log recorded below this cell ("Epoch 1/5" ...
# "Epoch 5/5") and the 5-epoch budget used for model_adv, so both models are
# trained for the same number of epochs.
# NOTE(review): the previous value was 20; confirm which budget is intended.
history_lstm = model_lstm.fit(
    X_train_lstm,
    y_train_lstm,
    epochs=5,
    batch_size=128,
    validation_data=(X_test_lstm, y_test_lstm),  # test split is only monitored per epoch
)

Epoch 1/5
[1m16934/16934[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 4ms/step - acc: 0.9806 - loss: 0.0514 - val_acc: 0.9995 - val_loss: 0.0013
Epoch 2/5
[1m16934/16934[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m64s[0m 4ms/step - acc: 0.9980 - loss: 0.0066 - val_acc: 0.9994 - val_loss: 0.0013
Epoch 3/5
[1m16934/16934[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m70s[0m 4ms/step - acc: 0.9986 - loss: 0.0045 - val_acc: 0.9998 - val_loss: 4.5741e-04
Epoch 4/5
[1m16934/16934[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m65s[0m 4ms/step - acc: 0.9990 - loss: 0.0034 - val_acc: 0.9997 - val_loss: 8.0200e-04
Epoch 5/5
[1m16934/16934[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m70s[0m 4ms/step - acc: 0.9991 - loss: 0.0031 - val_acc: 0.9998 - val_loss: 5.4011e-04


In [17]:
# Step 15: Score the initial model on the unperturbed train and test inputs.
# evaluate() yields the loss followed by the compiled 'acc' metric.
train_loss, train_acc = model_lstm.evaluate(x=X_train_lstm, y=y_train_lstm, verbose=0)
test_loss, test_acc = model_lstm.evaluate(x=X_test_lstm, y=y_test_lstm, verbose=0)
print(
    f"Initial Model - Training Accuracy: {train_acc:.4f}",
    f"Initial Model - Test Accuracy: {test_acc:.4f}",
    sep="\n",
)

Initial Model - Training Accuracy: 0.9997
Initial Model - Test Accuracy: 0.9998


In [18]:
# Step 16: Materialize inputs and one-hot targets as float32 TensorFlow constants for PGD
X_train_tf, X_test_tf, y_train_tf, y_test_tf = (
    tf.constant(array, dtype=tf.float32)
    for array in (X_train_lstm, X_test_lstm, y_train_lstm, y_test_lstm)
)

In [19]:
# Step 17: PGD hyper-parameters
epsilon = 5e-2          # bound on the per-element perturbation
alpha = 5e-3            # step size applied at each iteration
num_iterations = 10     # how many PGD steps to take
batch_size = 10_000     # samples perturbed per call; tune to available memory

In [20]:
# Step 18: Define PGD function for a single batch
def apply_pgd_batch(batch_x, batch_y, original_x, model, epsilon, alpha, num_iterations):
    """Craft adversarial inputs for one batch with iterative signed-gradient steps.

    Starting from ``batch_x``, every iteration moves the inputs by
    ``alpha * sign(d loss / d input)`` and then clips the accumulated
    perturbation so each element stays within ``epsilon`` of ``original_x``.

    Args:
        batch_x: Starting inputs; a tensor or anything ``tf.convert_to_tensor``
            accepts (e.g. a NumPy array).
        batch_y: Targets passed as ``y_true`` to categorical cross-entropy.
        original_x: Reference inputs the perturbation is measured against.
        model: Keras model; it is called with ``training=False``.
        epsilon: Maximum absolute perturbation per element.
        alpha: Step size of a single iteration.
        num_iterations: Number of gradient steps to perform.

    Returns:
        Tensor shaped like ``batch_x`` holding the perturbed inputs. With
        ``num_iterations == 0`` this is ``batch_x`` converted to a tensor.
    """
    # tape.watch() only accepts tensors, so array-like inputs are converted up front.
    adv_x = tf.convert_to_tensor(batch_x)
    batch_y = tf.convert_to_tensor(batch_y)
    original_x = tf.convert_to_tensor(original_x)

    for _ in range(num_iterations):
        with tf.GradientTape() as tape:
            # adv_x is not a Variable, so it must be watched explicitly.
            tape.watch(adv_x)
            # Inference mode keeps Dropout layers inactive while crafting gradients.
            predictions = model(adv_x, training=False)
            loss = tf.keras.losses.categorical_crossentropy(batch_y, predictions)
        gradient = tape.gradient(loss, adv_x)
        # Ascend the loss, then clip the total perturbation to [-epsilon, epsilon]
        # around the reference inputs.
        adv_x = adv_x + alpha * tf.sign(gradient)
        adv_x = original_x + tf.clip_by_value(adv_x - original_x, -epsilon, epsilon)
    return adv_x


In [21]:
# Step 19: Run PGD against model_lstm over the training tensors, one
# batch_size-sized slice at a time, and stitch the results back together.
# Slicing past the end is clamped, so the final slice may be shorter.
num_train_samples = X_train_tf.shape[0]
adv_X_train_lstm = tf.concat(
    [
        apply_pgd_batch(
            X_train_tf[lo:lo + batch_size],   # starting point
            y_train_tf[lo:lo + batch_size],
            X_train_tf[lo:lo + batch_size],   # reference for the epsilon clip
            model_lstm, epsilon, alpha, num_iterations,
        )
        for lo in range(0, num_train_samples, batch_size)
    ],
    axis=0,
)


In [22]:
# Step 20: Run PGD against model_lstm over the test tensors, one
# batch_size-sized slice at a time, and stitch the results back together.
# Slicing past the end is clamped, so the final slice may be shorter.
num_test_samples = X_test_tf.shape[0]
adv_X_test_lstm = tf.concat(
    [
        apply_pgd_batch(
            X_test_tf[lo:lo + batch_size],   # starting point
            y_test_tf[lo:lo + batch_size],
            X_test_tf[lo:lo + batch_size],   # reference for the epsilon clip
            model_lstm, epsilon, alpha, num_iterations,
        )
        for lo in range(0, num_test_samples, batch_size)
    ],
    axis=0,
)

In [23]:
# Step 21: Convert adversarial examples back to numpy arrays.
# np.asarray accepts both TensorFlow eager tensors and ndarrays, so this cell
# can be re-run safely; calling `.numpy()` a second time would fail because the
# names are rebound to ndarrays, which have no such method.
adv_X_train_lstm = np.asarray(adv_X_train_lstm)
adv_X_test_lstm = np.asarray(adv_X_test_lstm)

In [24]:
# Step 22: Score the initial model on the PGD-perturbed test inputs
adv_test_loss, adv_test_acc = model_lstm.evaluate(
    x=adv_X_test_lstm,
    y=y_test_lstm,
    verbose=0,
)
print(f"Initial Model - Adversarial Test Accuracy: {adv_test_acc:.4f}")

Initial Model - Adversarial Test Accuracy: 0.8457


In [25]:
# Step 23: Define a new model for adversarial training with the same architecture.
# The input shape is declared with an explicit Input layer instead of an
# `input_shape=` argument on the LSTM layer, which Keras warns against for
# Sequential models.
model_adv = models.Sequential([
    layers.Input(shape=(1, n_features)),               # one timestep of n_features values
    layers.LSTM(64, return_sequences=False),           # emit only the final hidden state
    layers.Dropout(0.5),
    layers.Dense(32, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_classes, activation='softmax'),   # one probability per class
])

In [26]:
# Step 24: Configure training for the adversarial model: Adam optimizer,
# categorical cross-entropy, accuracy reported under the key 'acc'
model_adv.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['acc'],
)

In [27]:
# Step 25: Fit model_adv on the PGD-perturbed training inputs paired with the
# original one-hot labels; the unperturbed test split is monitored after each epoch
history_adv = model_adv.fit(
    x=adv_X_train_lstm,
    y=y_train_lstm,
    validation_data=(X_test_lstm, y_test_lstm),
    batch_size=128,
    epochs=5,
)

Epoch 1/5
[1m16934/16934[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m104s[0m 5ms/step - acc: 0.9923 - loss: 0.0214 - val_acc: 0.9778 - val_loss: 0.1084
Epoch 2/5
[1m16934/16934[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m72s[0m 4ms/step - acc: 0.9997 - loss: 9.0602e-04 - val_acc: 0.9761 - val_loss: 0.1203
Epoch 3/5
[1m16934/16934[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m74s[0m 4ms/step - acc: 0.9998 - loss: 5.0764e-04 - val_acc: 0.9774 - val_loss: 0.1625
Epoch 4/5
[1m16934/16934[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m79s[0m 4ms/step - acc: 0.9999 - loss: 4.2874e-04 - val_acc: 0.9653 - val_loss: 0.2692
Epoch 5/5
[1m16934/16934[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m76s[0m 4ms/step - acc: 0.9999 - loss: 3.3557e-04 - val_acc: 0.9778 - val_loss: 0.1291


In [28]:
# Step 26: Score the adversarially trained model on the unperturbed train and test inputs.
# evaluate() yields the loss followed by the compiled 'acc' metric.
train_loss_adv, train_acc_adv = model_adv.evaluate(x=X_train_lstm, y=y_train_lstm, verbose=0)
test_loss_adv, test_acc_adv = model_adv.evaluate(x=X_test_lstm, y=y_test_lstm, verbose=0)
print(
    f"Adversarially Trained Model - Training Accuracy: {train_acc_adv:.4f}",
    f"Adversarially Trained Model - Test Accuracy: {test_acc_adv:.4f}",
    sep="\n",
)

Adversarially Trained Model - Training Accuracy: 0.9613
Adversarially Trained Model - Test Accuracy: 0.9778


In [29]:
# Step 27: Evaluate the adversarially trained model on adversarial test data.
# adv_X_test_lstm was crafted against model_lstm, so this first number measures
# robustness to examples transferred from the initial model only.
adv_test_loss_adv, adv_test_acc_adv = model_adv.evaluate(adv_X_test_lstm, y_test_lstm, verbose=0)
print(f"Adversarially Trained Model - Adversarial Test Accuracy: {adv_test_acc_adv:.4f}")

# White-box check: craft fresh PGD examples against model_adv itself, using the
# same epsilon / alpha / num_iterations, and score the model on those as well.
whitebox_adv_X_test = tf.concat(
    [
        apply_pgd_batch(
            X_test_tf[lo:lo + batch_size],   # starting point
            y_test_tf[lo:lo + batch_size],
            X_test_tf[lo:lo + batch_size],   # reference for the epsilon clip
            model_adv, epsilon, alpha, num_iterations,
        )
        for lo in range(0, X_test_tf.shape[0], batch_size)
    ],
    axis=0,
).numpy()
whitebox_loss_adv, whitebox_acc_adv = model_adv.evaluate(whitebox_adv_X_test, y_test_lstm, verbose=0)
print(f"Adversarially Trained Model - White-box Adversarial Test Accuracy: {whitebox_acc_adv:.4f}")

Adversarially Trained Model - Adversarial Test Accuracy: 1.0000
