In [None]:
import os
import numpy as np
import pandas as pd

# from unidecode import unidecode

import pickle # saving and loading trained model
from os import path

# importing required libraries for normalizing data
from sklearn import preprocessing
from sklearn.preprocessing import StandardScaler

# importing library for plotting
import matplotlib.pyplot as plt
import seaborn as sn

from sklearn import metrics
from sklearn.metrics import accuracy_score # for calculating accuracy of model
from sklearn.model_selection import train_test_split # for splitting the dataset for training and testing
from sklearn.metrics import classification_report # for generating a classification report of model

from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix

from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve, auc

import tensorflow as tf
from tensorflow import keras

import tensorflow  as keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, Activation
from tensorflow.keras import optimizers
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.models import load_model

In [None]:
# Read the selected sample CSV into a DataFrame.
# The notebook is meant to be run once per dataset; to switch corpus, replace
# the file name below with one of the alternatives:
#   'cleansample_ciciot23.csv'
#   'cleansample_insdn.csv'
dataset_csv = 'cleansample_cicids2017.csv'
data_clean = pd.read_csv(dataset_csv)

In [None]:
# Class balance: how many rows carry each value of the 'Label' column.
label_counts = data_clean['Label'].value_counts()
print(label_counts)

In [None]:
# Separate the frame into the feature table and the target vector.
data = data_clean.drop('Label', axis=1)   # every column except the target
labels = data_clean['Label'].values       # raw values of the target column

In [None]:
from sklearn.preprocessing import MinMaxScaler

# Hold out 20% of the rows for testing; the fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42)

# Fit the scaler on the training rows only and reuse the fitted mapping for the
# test rows, so no test-set statistics leak into the transform.
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

_features = X_train.shape[1]  # number of feature columns

# Shape report. 'X.shape' is the full feature table before splitting
# (it previously printed X_train.shape under the 'X.shape' label).
print('X.shape = ', data.shape)
print('Y.shape = ', labels.shape)
print('X_train.shape = ', X_train.shape)
print('y_train.shape = ', y_train.shape)
print('X_test.shape = ', X_test.shape)
print('y_test.shape = ', y_test.shape)

In [None]:
import joblib

# Persist the scaled feature matrices and their label vectors to disk,
# one joblib file per array.
for split_array, target_file in (
    (X_train, 'X_train.joblib'),
    (y_train, 'Y_train.joblib'),
    (X_test, 'X_test.joblib'),
    (y_test, 'Y_test.joblib'),
):
    joblib.dump(split_array, target_file)

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import ModelCheckpoint

# Define the model architecture
def build_ae(input_dim, latent_dim=16, l2_strength=0.0001, learning_rate=0.001,
             output_activation='relu'):
    """Build a compiled dense autoencoder and an encoder that shares its layers.

    Layer widths are 128 -> 64 -> 32 -> 16 -> latent_dim -> 32 -> 64 -> 128 -> input_dim.
    Only the first two encoder layers carry an L2 kernel penalty.

    Args:
        input_dim: Number of input features (width of each sample).
        latent_dim: Width of the bottleneck layer. Defaults to 16.
        l2_strength: L2 kernel-regularization factor for the first two encoder
            layers. Defaults to 0.0001.
        learning_rate: Adam learning rate used when compiling. Defaults to 0.001.
        output_activation: Activation of the reconstruction layer. The default
            'relu' clamps reconstructions to be non-negative; pass None for a
            linear output or e.g. 'sigmoid' for outputs bounded to [0, 1].

    Returns:
        (autoencoder, encoder): ``autoencoder`` maps inputs to reconstructions
        and is compiled with Adam and MSE loss. ``encoder`` maps inputs to the
        bottleneck activations; it is built from the same layer objects, so it
        reflects whatever weights the autoencoder currently holds. The encoder
        is not compiled.
    """
    # Encoder
    input_layer = layers.Input(shape=(input_dim,))
    x = layers.Dense(128, activation='relu',
                     kernel_regularizer=tf.keras.regularizers.l2(l2_strength))(input_layer)
    x = layers.Dense(64, activation='relu',
                     kernel_regularizer=tf.keras.regularizers.l2(l2_strength))(x)
    x = layers.Dense(32, activation='relu')(x)
    x = layers.Dense(16, activation='relu')(x)

    # Bottleneck (latent space)
    encoded = layers.Dense(latent_dim, activation='relu')(x)

    # Decoder mirrors the encoder widths
    x = layers.Dense(32, activation='relu')(encoded)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dense(128, activation='relu')(x)
    # Reconstruction layer: the activation is configurable (ReLU by default).
    decoded = layers.Dense(input_dim, activation=output_activation)(x)

    # Full autoencoder: input -> reconstruction
    autoencoder = models.Model(inputs=input_layer, outputs=decoded)
    autoencoder.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                        loss='mse')

    # Encoder only: input -> latent code
    encoder = models.Model(inputs=input_layer, outputs=encoded)

    return autoencoder, encoder

# Size the network from the feature count of the (already scaled) training matrix.
input_dim = X_train.shape[1]
autoencoder, encoder = build_ae(input_dim)
autoencoder.summary()

# Checkpoint callback: whenever the monitored validation loss reaches a new
# minimum, the model is written to disk, so only the best epoch is kept.
checkpoint = ModelCheckpoint(
    filepath='best_AE_model.h5',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

In [None]:
import time

# The autoencoder is trained on label-0 (normal) rows only, so that rows of the
# other class reconstruct poorly and stand out by their reconstruction error.
normal_mask = (y_train == 0)
X_train_normal = X_train[normal_mask]
if len(X_train_normal) < 2:
    raise ValueError('Need at least two label-0 training rows to train and validate the autoencoder.')

# Model selection (the checkpoint's val_loss) must not see the test set, and it
# should measure reconstruction of normal rows only. Hold out 10% of the normal
# training rows for validation instead of validating on X_test.
X_fit, X_val = train_test_split(X_train_normal, test_size=0.1, random_state=42)

# Time the optimisation itself (monotonic clock; weight reloading is excluded).
start_time = time.perf_counter()
history = autoencoder.fit(X_fit, X_fit,
                          epochs=50,
                          batch_size=256,
                          shuffle=True,
                          validation_data=(X_val, X_val),
                          callbacks=[checkpoint])
end_time = time.perf_counter()

# Restore the weights of the epoch with the lowest validation loss.
autoencoder.load_weights('best_AE_model.h5')

training_time = end_time - start_time

print(f"Training time: {training_time} seconds")

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import matplotlib.pyplot as plt

# Plot the per-epoch training and validation loss recorded by fit().
# Labels are attached to each line (rather than passed positionally to legend)
# so they cannot get out of sync with the curves; 'val_loss' is the loss on the
# validation data given to fit(), hence the 'Validation' label.
fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='Train')
ax.plot(history.history['val_loss'], label='Validation')
ax.set_title('Model loss')
ax.set_ylabel('Loss')
ax.set_xlabel('Epoch')
ax.legend(loc='upper right')
plt.show()


In [None]:
import numpy as np
from sklearn.metrics import (accuracy_score, confusion_matrix, classification_report,
                             roc_auc_score, precision_recall_curve, auc, f1_score)

import time

# Start the timer
start_time = time.perf_counter()

# Reconstruct the test rows and score each row by its mean squared
# reconstruction error (higher = more anomalous).
predictions = autoencoder.predict(X_test)
mse = np.mean(np.power(X_test - predictions, 2), axis=1)

# Ranking metrics depend only on the scores, not on any threshold, so they are
# computed once instead of inside the sweep.
roc_auc = roc_auc_score(y_test, mse)
precision, recall, _ = precision_recall_curve(y_test, mse)
pr_auc = auc(recall, precision)

# Candidate thresholds: the 1st..99th percentiles of the test-set errors.
percentiles = np.arange(1, 100, 1)
thresholds = np.percentile(mse, percentiles)

# Best-so-far bookkeeping for the sweep
best_threshold = 0
best_percentile = None
best_macro_f1 = 0
best_accuracy = 0
best_conf_matrix = None
best_class_report = None
best_predicted_labels = None

# NOTE: the threshold is selected using y_test, so the metrics reported below
# are optimistic; tune on a separate labelled validation set for an unbiased figure.
for p, threshold in zip(percentiles, thresholds):
    # Classify as anomaly if mse > threshold
    predicted_labels = (mse > threshold).astype(int)
    macro_f1 = f1_score(y_test, predicted_labels, average='macro')

    if macro_f1 > best_macro_f1:
        best_macro_f1 = macro_f1
        best_threshold = threshold
        best_percentile = p  # remember which percentile won (the loop variable ends at 99)
        best_predicted_labels = predicted_labels

# Detailed reports are only needed for the winning threshold.
if best_predicted_labels is not None:
    best_accuracy = accuracy_score(y_test, best_predicted_labels)
    best_conf_matrix = confusion_matrix(y_test, best_predicted_labels)
    best_class_report = classification_report(y_test, best_predicted_labels)
best_roc_auc = roc_auc
best_pr_auc = pr_auc

# Output the best results
print(f'Optimal Percentile Threshold: {best_threshold} (percentile: {best_percentile}%)')
print(f'Best Macro F1-Score: {best_macro_f1}')
print(f'Accuracy at Best Macro F1-Score: {best_accuracy}')
print('Confusion Matrix at Best Macro F1-Score:')
print(best_conf_matrix)
print('\nClassification Report at Best Macro F1-Score:')
print(best_class_report)
print(f'ROC-AUC at Best Macro F1-Score: {best_roc_auc}')
print(f'PR-AUC at Best Macro F1-Score: {best_pr_auc}')

# End the timer
end_time = time.perf_counter()

# Elapsed evaluation time, kept in its own variable so the training time
# measured earlier is not overwritten.
test_time = end_time - start_time

print(f"Test time: {test_time} seconds")

In [None]:
# PrecisionRecallDisplay is not imported anywhere else in the notebook;
# without this import the cell fails with NameError.
from sklearn.metrics import PrecisionRecallDisplay

# Plot the Precision-Recall curve from the precision/recall arrays produced by
# precision_recall_curve in the evaluation cell.
PrecisionRecallDisplay(precision=precision, recall=recall).plot()
plt.title('Precision-Recall curve')
plt.show()

In [None]:
from sklearn.metrics import roc_curve, RocCurveDisplay, roc_auc_score

# mse holds the per-row reconstruction error and y_test the true labels,
# both taken from the evaluation cell.

# Compute ROC curve and ROC area
fpr, tpr, _ = roc_curve(y_test, mse)
roc_auc = roc_auc_score(y_test, mse)

# Create the axes explicitly and hand them to the display. Calling
# plt.figure() and then .plot() without `ax` makes the display open its own
# figure, leaving a blank one behind and ignoring the requested size.
fig, ax = plt.subplots(figsize=(8, 6))
RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc).plot(ax=ax, name='Autoencoder')
ax.set_title(f'Receiver Operating Characteristic (ROC) Curve\nROC-AUC: {roc_auc:.2f}')
plt.show()

In [None]:
# Latent Space Visualization with PCA
# Use the encoder returned by build_ae: `Model`, `input_layer` and `encoded`
# do not exist at notebook scope (the latter two are locals of build_ae), so
# rebuilding the encoder here raised NameError. The returned encoder shares its
# layers with the trained autoencoder.
encoded_X_test = encoder.predict(X_test)

# Standardise the latent codes before PCA. A separate name is used so the
# fitted MinMaxScaler stored in `scaler` is not overwritten.
latent_scaler = StandardScaler()
encoded_X_test_normalized = latent_scaler.fit_transform(encoded_X_test)

# Project the latent codes onto their first two principal components
pca = PCA(n_components=2)
encoded_X_test_pca = pca.fit_transform(encoded_X_test_normalized)

# Scatter plot coloured by the true label
plt.figure(figsize=(8, 6))
scatter = plt.scatter(encoded_X_test_pca[:, 0], encoded_X_test_pca[:, 1], c=y_test, cmap='viridis')
plt.colorbar(scatter)
plt.title('Latent Space Visualization using PCA')
plt.xlabel('PCA Component 1')
plt.ylabel('PCA Component 2')
plt.show()

In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

# Step 1: Encode the test rows into the latent space with the autoencoder's
# encoder (the model built in this notebook is a plain autoencoder, not a VAE).
encoded_X_test = encoder.predict(X_test)

# Step 2: Apply t-SNE to the latent representations (fixed seed for repeatability)
tsne = TSNE(n_components=2, random_state=42)
X_tsne = tsne.fit_transform(encoded_X_test)

# Step 3: Plot the t-SNE embedding, coloured by the true label
plt.figure(figsize=(8, 6))
scatter = plt.scatter(X_tsne[:, 0], X_tsne[:, 1], c=y_test, cmap='viridis', s=10)
plt.colorbar(scatter)
plt.title('t-SNE Visualization of Autoencoder Latent Space')
plt.xlabel('t-SNE Component 1')
plt.ylabel('t-SNE Component 2')
plt.show()