#### Copyright Raymond Soto Jr. D.Eng(c).
#### From Edge to Enterprise: 
#### Federated Learning Threat Classification with Heterogeneous Devices in Converged Energy Sector Networks
#### Revised July 9th, 2025

# Load Modules

In [None]:
# Load Program Modules
import pandas as pd
import numpy as np
import seaborn as sns

import matplotlib.pyplot as plt
import matplotlib.ticker as mtick


from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

# scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import ConfusionMatrixDisplay, classification_report, confusion_matrix, accuracy_score
from sklearn.metrics import accuracy_score, classification_report, roc_curve, auc
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import GridSearchCV
from sklearn.inspection import permutation_importance
from sklearn.model_selection import StratifiedKFold

from collections import Counter

import pickle
import time
from datetime import datetime

# TensorFlow Keras
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint


# Load Data

In [None]:
# The raw traffic CSVs ship without a header row; the column names live in the
# separate feature-description file, in its 'Name' column.
unswnb15_features = pd.read_csv('UNSWB15_CSV_Files/UNSW-NB15_features.csv')
feature_header = unswnb15_features['Name'].to_list()

# Options shared by all four traffic partitions: no header row in the file,
# apply the names read above, and parse each file in one pass
# (low_memory=False) to silence the mixed-dtype warning.
csv_options = dict(header=None, names=feature_header, low_memory=False)
df1 = pd.read_csv('UNSWB15_CSV_Files/UNSW-NB15_1.csv', **csv_options)
df2 = pd.read_csv('UNSWB15_CSV_Files/UNSW-NB15_2.csv', **csv_options)
df3 = pd.read_csv('UNSWB15_CSV_Files/UNSW-NB15_3.csv', **csv_options)
df4 = pd.read_csv('UNSWB15_CSV_Files/UNSW-NB15_4.csv', **csv_options)

traffic_parts = (df1, df2, df3, df4)

# Shape of each partition.
for part in traffic_parts:
    print(part.shape)

# Class balance of each partition, in percent. The 'label' column is assumed
# to hold 1 for malicious and 0 for normal traffic.
label_col = 'label'
for part in traffic_parts:
    print(part[label_col].value_counts(normalize=True) * 100)

#### Combine Data

In [None]:
# Stack df1, df2, df3 and df4 row-wise into a single frame; ignore_index
# renumbers the rows 0..n-1 so the index is unique across partitions.
df_combined = pd.concat((df1, df2, df3, df4), ignore_index=True)
print("Combined DataFrame shape:", df_combined.shape)

# Feature Engineering

#### Map Protocol

In [4]:
# Ordered lookup table for map_protocol: each entry pairs a category label
# with the protocol names that belong to it. Labels are kept exactly as the
# rest of the pipeline sees them (they become one-hot column names).
_PROTOCOL_GROUPS = (
    ("transport", ("tcp", "udp", "sctp", "udt", "mux", "iso-tp4", "tp++",
                   "ddp", "xtp", "vmtp", "mtp", "crudp")),
    ('control', ("icmp", "igmp", "rsvp", "ptp")),
    ("Routing", ("ospf", "egp", "igp", "idrp", "ipv6-route", "gre",
                 "nsfnet-igp", "eigrp", "isis", "vrrp")),
    ("Internet", ("ip", "ipv6", "ipv6-frag", "ipv6-opts", "iso-ip", "ipnip",
                  "ggp", "ipip", "ipx-n-ip")),
    ("Multicast", ("pim", "rtp", "gmtp", "micp", "pgm")),
    ("Tunneling", ("etherip", "l2tp", "encap")),
    ("Link", ("arp", "stp", "ax.25", "fc", "ib")),
    ("Security", ("esp", "ipcomp", "secure-vmtp")),
)


def map_protocol(proto):
    """Return the coarse protocol group for a single protocol name.

    Args:
        proto: Protocol name as it appears in the 'proto' column. Matching is
            exact and case-sensitive.

    Returns:
        One of "transport", "control", "Routing", "Internet", "Multicast",
        "Tunneling", "Link" or "Security"; "other" for any value that is not
        listed in a group.
    """
    for category, members in _PROTOCOL_GROUPS:
        if proto in members:
            return category
    return "other"

# Add the grouped protocol feature to the combined set, one value per row.
df_combined['protocol_category'] = df_combined['proto'].apply(map_protocol)
# The raw 'proto' column is no longer needed: only 'protocol_category' is
# one-hot encoded later on.
del df_combined['proto']

#### Map Connection State

In [5]:
# Ordered lookup table for map_connection_state: (group label, member states).
_CONNECTION_STATE_GROUPS = (
    # States that indicate an established connection
    ("established", ("CON", "ACC", "REQ")),
    # States that indicate termination of the connection
    ("terminated", ("FIN", "RST", "CLO")),
    # States that may indicate the handshake phase (if applicable)
    ("handshaking", ("SYN", "SYN-ACK")),
)


def map_connection_state(state):
    """Return the broad connection-state group for a single state code.

    Args:
        state: State code as it appears in the 'state' column. Matching is
            exact and case-sensitive.

    Returns:
        "established", "terminated" or "handshaking"; "other" for any value
        that is not listed in a group.
    """
    for group, members in _CONNECTION_STATE_GROUPS:
        if state in members:
            return group
    return "other"

# Add the grouped connection-state feature to the combined set, one value per row.
df_combined['state_category'] = df_combined['state'].apply(map_connection_state)
# The raw 'state' column is no longer needed: only 'state_category' is
# one-hot encoded later on.
del df_combined['state']

# Data Preprocessing

#### Feature Reduction: Drop Columns

In [None]:
# Columns removed from the combined set before modelling. 'attack_cat' is
# dropped along with them, which leaves 'label' as the only target column.
columns_to_drop = [
    'srcip', 'sport', 'dstip', 'service', 'stime', 'ltime',
    'is_sm_ips_ports', 'ct_ftp_cmd', 'ct_flw_http_mthd', 'is_ftp_login',
    'attack_cat',
]
df_combined.drop(columns=columns_to_drop, inplace=True)

# Report the resulting shape.
print(f'DF Shape Combined Set {df_combined.shape}')

#### Drop Rows with Hex or Non-Numeric dsport Values and Convert to int

In [None]:
# Work on the text form of dsport so that hex strings such as '0x...' and any
# other non-decimal values can be detected regardless of the column's dtype.
dsport_text = df_combined['dsport'].astype(str)
mask_hex = dsport_text.str.lower().str.startswith('0x')   # hex-looking ports
mask_digits = dsport_text.str.fullmatch(r'\d+')           # plain decimal ports

# Keep only rows that are not hex AND consist purely of digits.
df_combined = df_combined.loc[~mask_hex & mask_digits].copy()

# Every surviving value is a digit string (or already numeric), so the
# integer cast cannot fail on the rows that remain.
df_combined['dsport'] = df_combined['dsport'].astype(int)

# Quick sanity check
print("Remaining dsport dtype:", df_combined['dsport'].dtype)
print("Any hex rows left?", df_combined['dsport'].astype(str).str.lower().str.startswith('0x').any())

#### Convert to Boolean

In [8]:
# Cast the target column to boolean dtype.
df_combined['label'] = df_combined['label'].astype('bool')

#### Categorical Encoding

In [None]:
# One-hot encode the two grouped categorical features.
cols_to_encode = ['protocol_category', 'state_category']

# Dense boolean output; categories unseen at fit time are encoded as all-False
# at transform time instead of raising.
encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=False, dtype=bool)

# Learn the categories from the combined set and encode it (NumPy array out).
encoded_array_combined = encoder.fit_transform(df_combined[cols_to_encode])
# Column names generated by the encoder for the new indicator columns.
encoded_columns_combined = encoder.get_feature_names_out(cols_to_encode)

# Wrap the array in a DataFrame that reuses df_combined's index so the
# column-wise concat below lines rows up correctly.
encoded_df_combined = pd.DataFrame(
    data=encoded_array_combined,
    index=df_combined.index,
    columns=encoded_columns_combined,
)

# Replace the original categorical columns with their indicator columns.
df_encoded_combined = pd.concat(
    [df_combined.drop(columns=cols_to_encode), encoded_df_combined],
    axis=1,
)

# Print to confirm
print(f'DF Combined Shape Combined Set {df_encoded_combined.shape}')

#### Split Dataset

In [None]:
# Hold out a stratified 20% test set before scaling, resampling, or training.
strat_col = 'label'

# random_state pins the shuffle so the held-out set (and every metric reported
# on it) is reproducible across runs, matching the seeded SMOTE and
# StratifiedKFold steps used elsewhere in this file.
# NOTE: this rebinds df_encoded_combined to the 80% training portion, so
# re-running this cell on its own splits the already-reduced data again.
df_encoded_combined, df_encoded_4 = train_test_split(
    df_encoded_combined,
    test_size=0.20,
    stratify=df_encoded_combined[strat_col],
    random_state=42
)

# Confirm: both portions should show the same proportion of True labels.
print(f"Train shape: {df_encoded_combined.shape},  Test shape: {df_encoded_4.shape}")
print("Train True proportion:", df_encoded_combined[strat_col].mean())
print("Test  True proportion:", df_encoded_4[strat_col].mean())

# Training, Validation, & Testing Split 

#### Split Training and Validation

In [None]:
# Separate the training portion into predictors and target.
X_train0 = df_encoded_combined.drop(columns=['label'])  # every column except the target
y_train0 = df_encoded_combined['label'].astype(bool)    # boolean target

# Report the feature-matrix shape and the class balance (in percent).
print(X_train0.shape)

print(y_train0.value_counts(normalize=True) * 100)

#### Split Testing

In [None]:
# Separate the held-out test portion into predictors and target.
X2 = df_encoded_4.drop(columns=['label'])  # every column except the target
y2 = df_encoded_4['label'].astype(bool)    # boolean target

# Report the class balance (in percent) and the feature-matrix shape.
print(y2.value_counts(normalize=True) * 100)
print(X2.shape)

#### Feature Standardization

In [13]:
# Numeric columns of the training features; the boolean one-hot columns are
# not selected by the number filter and are therefore left unscaled.
numeric_cols = X_train0.select_dtypes(include='number').columns

# Learn mean/std from the TRAINING data only, then standardize it in place.
scaler = StandardScaler().fit(X_train0[numeric_cols])
X_train0[numeric_cols] = scaler.transform(X_train0[numeric_cols])

In [15]:
# Standardize the test features with the statistics learned on the training
# set; the scaler is deliberately not refit here.
X2[numeric_cols] = scaler.transform(X2.loc[:, numeric_cols])

#### Confirm Standardization

In [None]:
# Training features: the scaler was fit on these columns, so their means
# should be ~0 and their standard deviations ~1.
means0, stds0 = X_train0[numeric_cols].mean(), X_train0[numeric_cols].std()
# Show the first 5 means and stds
print("Means:\n", means0.head(), "\n")
print("Stds:\n",  stds0.head(), "\n")

# Test features: scaled with the training statistics, so values close to
# (but not exactly) 0 and 1 are expected.
means2, stds2 = X2[numeric_cols].mean(), X2[numeric_cols].std()
# Show the first 5 means and stds
print("Means:\n", means2.head(), "\n")
print("Stds:\n",  stds2.head(), "\n")

# Handling Class Imbalance

In [None]:
# Oversample the minority class with SMOTE. Only the training data is
# resampled; the held-out test set is left untouched.
# NOTE(review): the cross-validation folds used later are drawn from this
# resampled data, so validation folds will contain synthetic samples;
# confirm that this is intended.
smote = SMOTE(random_state=42)
resampled_features, resampled_labels = smote.fit_resample(X_train0, y_train0)
X_train_smote, y_train_smote = resampled_features, resampled_labels

# Class counts before and after resampling.
print("Before SMOTE:", y_train0.value_counts())
print("After SMOTE:", y_train_smote.value_counts())

# Model Training

#### Convert to Numpy

In [18]:
# tf.data.Dataset.from_tensor_slices is fed NumPy arrays below, so convert the
# resampled features and labels to float32 arrays once, up front.
X_train_smote_np = X_train_smote.to_numpy(dtype='float32')
y_train_smote_np = y_train_smote.to_numpy(dtype='float32')

### StratifiedKFold, Early Stop: Val_Recall, Model Checkpoint, Best Model Saved

#### Multi-Layer Perceptron (MLP) Training

In [None]:
# Wall-clock start for the timing summary printed at the end of this cell.
start_time = time.time()

# --- Define your model-creation function ---
def create_mlp_model(input_dim=47):
    """Build and compile the MLP binary classifier.

    Args:
        input_dim: Number of input features per sample. Defaults to 47, the
            width that was previously hard-coded here, so existing calls
            without arguments build the same network.

    Returns:
        A compiled Keras Sequential model: Dense(128, relu) -> Dense(64, relu)
        -> Dense(1, sigmoid), optimised with Adam (learning rate 0.001) on
        binary cross-entropy.
    """
    model = models.Sequential([
        layers.Input(shape=(input_dim,)),
        layers.Dense(128, activation='relu'),
        layers.Dense(64, activation='relu'),
        layers.Dense(1, activation='sigmoid')
    ])
    optimizer = optimizers.Adam(learning_rate=0.001)
    # Precision and Recall are named explicitly so the history keys
    # ('precision', 'recall', 'val_recall', ...) read by the callbacks and
    # plots in this file stay the same for every freshly built model.
    model.compile(optimizer=optimizer,
                  loss='binary_crossentropy',
                  metrics=[
                      'binary_accuracy',
                      tf.keras.metrics.Precision(name='precision'),
                      tf.keras.metrics.Recall(name='recall')
                  ])
    return model

# --- Define Callbacks ---
# Stop a fold once validation recall has not improved for 20 epochs and roll
# the model back to its best epoch. mode='max' is stated explicitly, matching
# the ModelCheckpoint below: with the default mode='auto', Keras versions that
# infer the direction from the monitor's name only treat accuracy/AUC-style
# names as "higher is better", so 'val_recall' would be minimised instead.
early_stop = EarlyStopping(
    monitor='val_recall',
    mode='max',
    patience=20,
    verbose=1,
    restore_best_weights=True
)

# --- Cross Validation Setup ---
num_folds = 5
skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=42)

fold_histories = []        # one Keras history dict per fold (used for plotting)
best_model_overall = None
# Start below any attainable recall so that a model is always retained, even
# if every fold reports a validation recall of 0.
best_val_recall = float('-inf')

# NOTE(review): the folds are drawn from the SMOTE-resampled arrays, so each
# validation fold contains synthetic samples; validation metrics may therefore
# be optimistic. Confirm that this is intended.
for fold_no, (train_index, val_index) in enumerate(
        skf.split(X_train_smote_np, y_train_smote_np), start=1):
    print(f'--- Fold {fold_no} ---')

    # Split data into training and validation for this fold
    X_train_fold, X_val_fold = X_train_smote_np[train_index], X_train_smote_np[val_index]
    y_train_fold, y_val_fold = y_train_smote_np[train_index], y_train_smote_np[val_index]

    # Create tf.data.Dataset objects for training and validation.
    # Only the training stream is shuffled (buffer covers the whole fold).
    train_dataset_fold = tf.data.Dataset.from_tensor_slices((X_train_fold, y_train_fold))
    train_dataset_fold = train_dataset_fold.shuffle(buffer_size=len(X_train_fold)).batch(64).prefetch(tf.data.AUTOTUNE)

    val_dataset_fold = tf.data.Dataset.from_tensor_slices((X_val_fold, y_val_fold))
    val_dataset_fold = val_dataset_fold.batch(64).prefetch(tf.data.AUTOTUNE)

    # Create a new, untrained instance of the model for each fold
    model = create_mlp_model()

    # Per-fold checkpoint: keep the epoch with the highest validation recall
    checkpoint_filepath = f'best_MLP_model_fold_{fold_no}.h5'
    checkpoint = ModelCheckpoint(
        filepath=checkpoint_filepath,
        monitor='val_recall',
        mode='max',
        save_best_only=True,
        verbose=1
    )

    # Train the model
    history = model.fit(
        train_dataset_fold,
        epochs=100,
        validation_data=val_dataset_fold,
        callbacks=[early_stop, checkpoint],
        verbose=1
    )

    fold_histories.append(history.history)

    # Evaluate the model on the validation fold
    # (scores[0] is the loss, scores[1] the first compiled metric: binary accuracy)
    scores = model.evaluate(val_dataset_fold, verbose=0)
    print(f"Fold {fold_no} - Loss: {scores[0]:.4f} - Accuracy: {scores[1]:.4f}")

    # Keep the model from the fold that reached the highest validation recall
    current_best_recall = max(history.history['val_recall'])

    if current_best_recall > best_val_recall:
        best_val_recall = current_best_recall
        best_model_overall = model

# --- Save the Best Overall Model ---
# The path is defined once so the saved file and the printed message agree.
best_model_path = 'best_MLP_model_overall.h5'
if best_model_overall is not None:
    best_model_overall.save(best_model_path)
    print(f'Best overall model saved as {best_model_path}')

# --- Timing ---
current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("Current date and time:", current_datetime)
elapsed_time = time.time() - start_time
print("Total elapsed time: {:.2f} seconds".format(elapsed_time))

#### MLP Plot

In [None]:
# --- Plotting Metrics for Each Fold ---
# (history key, display name) for each panel, in plotting order. The matching
# validation curve is stored under the same key prefixed with 'val_'.
metric_panels = [
    ('loss', 'Loss'),
    ('binary_accuracy', 'Accuracy'),
    ('precision', 'Precision'),
    ('recall', 'Recall'),
]

for i, history in enumerate(fold_histories, 1):
    epochs = range(1, len(history['loss']) + 1)
    # One figure per fold; four panels on a 2x3 grid (last two cells unused).
    plt.figure(figsize=(15, 10))

    for panel_no, (metric_key, metric_label) in enumerate(metric_panels, 1):
        plt.subplot(2, 3, panel_no)
        plt.plot(epochs, history[metric_key], label=f'Train {metric_label}')
        plt.plot(epochs, history[f'val_{metric_key}'], label=f'Val {metric_label}')
        plt.title(f'MLP Fold {i} {metric_label}')
        plt.xlabel('Epoch')
        plt.ylabel(metric_label)
        plt.legend()

    plt.tight_layout()
    plt.show()

#### Convolutional Neural Network (CNN) Training

In [None]:
# Wall-clock start for the timing summary printed at the end of this cell.
start_time = time.time()

# --- Define your model-creation function using CNN layers ---
def create_cnn_model(input_dim=47):
    """Build and compile the 1-D CNN binary classifier.

    Args:
        input_dim: Number of input features per sample. Defaults to 47, the
            width that was previously hard-coded here, so existing calls
            without arguments build the same network.

    Returns:
        A compiled Keras Sequential model: two Conv1D + MaxPooling1D blocks
        (128 then 64 filters, kernel size 3), a Dense(128, relu) layer and a
        sigmoid output, optimised with Adam (learning rate 0.001) on binary
        cross-entropy.
    """
    model = models.Sequential([
        # Flat feature vector in; reshaped to (input_dim, 1) so Conv1D can
        # slide along the feature axis with a single channel.
        layers.Input(shape=(input_dim,)),
        layers.Reshape((input_dim, 1)),
        # First convolutional block: 128 filters, kernel size 3, ReLU activation.
        layers.Conv1D(filters=128, kernel_size=3, activation='relu', padding='same'),
        layers.MaxPooling1D(pool_size=2),
        # Second convolutional block: 64 filters.
        layers.Conv1D(filters=64, kernel_size=3, activation='relu', padding='same'),
        layers.MaxPooling1D(pool_size=2),
        # Flatten the output and pass through dense layers.
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dense(1, activation='sigmoid')
    ])
    optimizer = optimizers.Adam(learning_rate=0.001)
    # Precision and Recall are named explicitly so the history keys
    # ('precision', 'recall', 'val_recall', ...) read by the callbacks and
    # plots in this file stay the same for every freshly built model.
    model.compile(optimizer=optimizer,
                  loss='binary_crossentropy',
                  metrics=[
                      'binary_accuracy',
                      tf.keras.metrics.Precision(name='precision'),
                      tf.keras.metrics.Recall(name='recall')
                  ])
    return model

# --- Define Callbacks ---
# Stop a fold once validation recall has not improved for 20 epochs and roll
# the model back to its best epoch. mode='max' is stated explicitly, matching
# the ModelCheckpoint below: with the default mode='auto', Keras versions that
# infer the direction from the monitor's name only treat accuracy/AUC-style
# names as "higher is better", so 'val_recall' would be minimised instead.
early_stop = EarlyStopping(
    monitor='val_recall',
    mode='max',
    patience=20,
    verbose=1,
    restore_best_weights=True
)

# --- Cross Validation Setup ---
num_folds = 5
skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=42)

fold_histories = []        # one Keras history dict per fold (used for plotting)
best_model_overall = None
# Start below any attainable recall so that a model is always retained, even
# if every fold reports a validation recall of 0.
best_val_recall = float('-inf')

# NOTE(review): the folds are drawn from the SMOTE-resampled arrays, so each
# validation fold contains synthetic samples; validation metrics may therefore
# be optimistic. Confirm that this is intended.
for fold_no, (train_index, val_index) in enumerate(
        skf.split(X_train_smote_np, y_train_smote_np), start=1):
    print(f'--- Fold {fold_no} ---')

    # Split data into training and validation for this fold
    X_train_fold, X_val_fold = X_train_smote_np[train_index], X_train_smote_np[val_index]
    y_train_fold, y_val_fold = y_train_smote_np[train_index], y_train_smote_np[val_index]

    # Create tf.data.Dataset objects for training and validation.
    # Only the training stream is shuffled (buffer covers the whole fold).
    train_dataset_fold = tf.data.Dataset.from_tensor_slices((X_train_fold, y_train_fold))
    train_dataset_fold = train_dataset_fold.shuffle(buffer_size=len(X_train_fold)).batch(64).prefetch(tf.data.AUTOTUNE)

    val_dataset_fold = tf.data.Dataset.from_tensor_slices((X_val_fold, y_val_fold))
    val_dataset_fold = val_dataset_fold.batch(64).prefetch(tf.data.AUTOTUNE)

    # Create a new, untrained instance of the CNN-based model for each fold
    model = create_cnn_model()

    # Per-fold checkpoint: keep the epoch with the highest validation recall
    checkpoint_filepath = f'best_CNN_model_fold_{fold_no}.h5'
    checkpoint = ModelCheckpoint(
        filepath=checkpoint_filepath,
        monitor='val_recall',
        mode='max',
        save_best_only=True,
        verbose=1
    )

    # Train the model
    history = model.fit(
        train_dataset_fold,
        epochs=100,
        validation_data=val_dataset_fold,
        callbacks=[early_stop, checkpoint],
        verbose=1
    )

    fold_histories.append(history.history)

    # Evaluate the model on the validation fold
    # (scores[0] is the loss, scores[1] the first compiled metric: binary accuracy)
    scores = model.evaluate(val_dataset_fold, verbose=0)
    print(f"Fold {fold_no} - Loss: {scores[0]:.4f} - Accuracy: {scores[1]:.4f}")

    # Keep the model from the fold that reached the highest validation recall
    current_best_recall = max(history.history['val_recall'])

    if current_best_recall > best_val_recall:
        best_val_recall = current_best_recall
        best_model_overall = model

# --- Save the Best Overall Model ---
# The path is defined once so the saved file and the printed message agree.
best_model_path = 'best_CNN_model_overall.h5'
if best_model_overall is not None:
    best_model_overall.save(best_model_path)
    print(f"Best overall model saved as {best_model_path}")

# --- Timing ---
current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("Current date and time:", current_datetime)
elapsed_time = time.time() - start_time
print("Total elapsed time: {:.2f} seconds".format(elapsed_time))

#### CNN Plot

In [None]:
# --- Plotting Metrics for Each Fold ---
# (history key, display name) for each panel, in plotting order. The matching
# validation curve is stored under the same key prefixed with 'val_'.
metric_panels = [
    ('loss', 'Loss'),
    ('binary_accuracy', 'Accuracy'),
    ('precision', 'Precision'),
    ('recall', 'Recall'),
]

for i, history in enumerate(fold_histories, 1):
    epochs = range(1, len(history['loss']) + 1)
    # One figure per fold; four panels on a 2x3 grid (last two cells unused).
    plt.figure(figsize=(15, 10))

    for panel_no, (metric_key, metric_label) in enumerate(metric_panels, 1):
        plt.subplot(2, 3, panel_no)
        plt.plot(epochs, history[metric_key], label=f'Train {metric_label}')
        plt.plot(epochs, history[f'val_{metric_key}'], label=f'Val {metric_label}')
        plt.title(f'CNN Fold {i} {metric_label}')
        plt.xlabel('Epoch')
        plt.ylabel(metric_label)
        plt.legend()

    plt.tight_layout()
    plt.show()

#### Recurrent Neural Network (RNN) Training

In [None]:
# Wall-clock start for the timing summary printed at the end of this cell.
start_time = time.time()

# --- Define your model-creation function using RNN layers ---
def create_rnn_model(input_dim=47):
    """Build and compile the stacked SimpleRNN binary classifier.

    Args:
        input_dim: Number of input features per sample. Defaults to 47, the
            width that was previously hard-coded here, so existing calls
            without arguments build the same network.

    Returns:
        A compiled Keras Sequential model: SimpleRNN(128) -> SimpleRNN(64) ->
        Dense(1, sigmoid), optimised with Adam (learning rate 0.0001) on
        binary cross-entropy.
    """
    model = models.Sequential([
        # Flat feature vector in; reshaped to (input_dim, 1) so the RNN layers
        # see it as a sequence of input_dim steps with one value per step.
        layers.Input(shape=(input_dim,)),
        layers.Reshape((input_dim, 1)),
        # First SimpleRNN layer with 128 units returning sequences for stacking.
        layers.SimpleRNN(128, activation='relu', return_sequences=True),
        # Second SimpleRNN layer with 64 units; does not return sequences.
        layers.SimpleRNN(64, activation='relu', return_sequences=False),
        # Final Dense layer for binary classification.
        layers.Dense(1, activation='sigmoid')
    ])
    optimizer = optimizers.Adam(learning_rate=0.0001)
    # Precision and Recall are named explicitly so the history keys
    # ('precision', 'recall', 'val_recall', ...) read by the callbacks and
    # plots in this file stay the same for every freshly built model.
    model.compile(optimizer=optimizer,
                  loss='binary_crossentropy',
                  metrics=[
                      'binary_accuracy',
                      tf.keras.metrics.Precision(name='precision'),
                      tf.keras.metrics.Recall(name='recall')
                  ])
    return model

# --- Define Callbacks ---
# Stop a fold once validation recall has not improved for 20 epochs and roll
# the model back to its best epoch. mode='max' is stated explicitly, matching
# the ModelCheckpoint below: with the default mode='auto', Keras versions that
# infer the direction from the monitor's name only treat accuracy/AUC-style
# names as "higher is better", so 'val_recall' would be minimised instead.
early_stop = EarlyStopping(
    monitor='val_recall',
    mode='max',
    patience=20,
    verbose=1,
    restore_best_weights=True
)

# --- Cross Validation Setup ---
num_folds = 5
skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=42)

fold_histories = []        # one Keras history dict per fold (used for plotting)
best_model_overall = None
# Start below any attainable recall so that a model is always retained, even
# if every fold reports a validation recall of 0.
best_val_recall = float('-inf')

# NOTE(review): the folds are drawn from the SMOTE-resampled arrays, so each
# validation fold contains synthetic samples; validation metrics may therefore
# be optimistic. Confirm that this is intended.
for fold_no, (train_index, val_index) in enumerate(
        skf.split(X_train_smote_np, y_train_smote_np), start=1):
    print(f'--- Fold {fold_no} ---')

    # Split data into training and validation for this fold
    X_train_fold, X_val_fold = X_train_smote_np[train_index], X_train_smote_np[val_index]
    y_train_fold, y_val_fold = y_train_smote_np[train_index], y_train_smote_np[val_index]

    # Create tf.data.Dataset objects for training and validation.
    # Only the training stream is shuffled (buffer covers the whole fold).
    train_dataset_fold = tf.data.Dataset.from_tensor_slices((X_train_fold, y_train_fold))
    train_dataset_fold = train_dataset_fold.shuffle(buffer_size=len(X_train_fold)).batch(64).prefetch(tf.data.AUTOTUNE)

    val_dataset_fold = tf.data.Dataset.from_tensor_slices((X_val_fold, y_val_fold))
    val_dataset_fold = val_dataset_fold.batch(64).prefetch(tf.data.AUTOTUNE)

    # Create a new, untrained instance of the RNN-based model for each fold
    model = create_rnn_model()

    # Per-fold checkpoint: keep the epoch with the highest validation recall.
    # Using HDF5 format (.h5) to avoid unsupported "options" arguments in native Keras format.
    checkpoint_filepath = f'best_RNN_model_fold_{fold_no}.h5'
    checkpoint = ModelCheckpoint(
        filepath=checkpoint_filepath,
        monitor='val_recall',
        mode='max',
        save_best_only=True,
        verbose=1
    )

    # Train the model
    history = model.fit(
        train_dataset_fold,
        epochs=100,
        validation_data=val_dataset_fold,
        callbacks=[early_stop, checkpoint],
        verbose=1
    )

    fold_histories.append(history.history)

    # Evaluate the model on the validation fold
    # (scores[0] is the loss, scores[1] the first compiled metric: binary accuracy)
    scores = model.evaluate(val_dataset_fold, verbose=0)
    print(f"Fold {fold_no} - Loss: {scores[0]:.4f} - Accuracy: {scores[1]:.4f}")

    # Keep the model from the fold that reached the highest validation recall
    current_best_recall = max(history.history['val_recall'])

    if current_best_recall > best_val_recall:
        best_val_recall = current_best_recall
        best_model_overall = model

# --- Save the Best Overall Model ---
# The path is defined once so the saved file and the printed message agree.
best_model_path = 'best_RNN_model_overall.h5'
if best_model_overall is not None:
    best_model_overall.save(best_model_path)
    print(f"Best overall model saved as {best_model_path}")

# --- Timing ---
current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("Current date and time:", current_datetime)
elapsed_time = time.time() - start_time
print("Total elapsed time: {:.2f} seconds".format(elapsed_time))

#### RNN Plot

In [None]:
# --- Plotting Metrics for Each Fold ---
# (history key, display name) for each panel, in plotting order. The matching
# validation curve is stored under the same key prefixed with 'val_'.
metric_panels = [
    ('loss', 'Loss'),
    ('binary_accuracy', 'Accuracy'),
    ('precision', 'Precision'),
    ('recall', 'Recall'),
]

for i, history in enumerate(fold_histories, 1):
    epochs = range(1, len(history['loss']) + 1)
    # One figure per fold; four panels on a 2x3 grid (last two cells unused).
    plt.figure(figsize=(15, 10))

    for panel_no, (metric_key, metric_label) in enumerate(metric_panels, 1):
        plt.subplot(2, 3, panel_no)
        plt.plot(epochs, history[metric_key], label=f'Train {metric_label}')
        plt.plot(epochs, history[f'val_{metric_key}'], label=f'Val {metric_label}')
        plt.title(f'RNN Fold {i} {metric_label}')
        plt.xlabel('Epoch')
        plt.ylabel(metric_label)
        plt.legend()

    plt.tight_layout()
    plt.show()

# Model Testing

#### MLP

In [None]:
# Restore the best MLP written by the training cell.
model = tf.keras.models.load_model('best_MLP_model_overall.h5')
print("Model loaded successfully.")

# Held-out test data as NumPy arrays: float32 features, int32 (0/1) labels.
X2_np = X2.to_numpy(dtype=np.float32)
y2_np = y2.to_numpy(dtype=np.int32)

# Sigmoid outputs are probabilities; anything above 0.5 is classed as positive.
y_pred_prob = model.predict(X2_np)
y_pred = (y_pred_prob > 0.5).astype(int)

# Per-class precision / recall / F1 on the test set.
print("Classification Report:")
print(classification_report(y2_np, y_pred))

# Confusion matrix (rows: actual class, columns: predicted class).
cm = confusion_matrix(y2_np, y_pred)
print("Confusion Matrix:")
print(cm)

# Render the confusion matrix as an annotated heatmap.
predicted_tick_labels = ["Predicted Negative", "Predicted Positive"]
actual_tick_labels = ["Actual Negative", "Actual Positive"]
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=predicted_tick_labels,
    yticklabels=actual_tick_labels,
)
plt.title("Confusion Matrix for MLP Testing")
plt.xlabel("Predicted Label")
plt.ylabel("Actual Label")
plt.show()

#### CNN

In [None]:
# Restore the best CNN written by the training cell.
model = tf.keras.models.load_model('best_CNN_model_overall.h5')
print("Model loaded successfully.")

# Held-out test data as NumPy arrays: float32 features, int32 (0/1) labels.
X2_np = X2.to_numpy(dtype=np.float32)
y2_np = y2.to_numpy(dtype=np.int32)

# Sigmoid outputs are probabilities; anything above 0.5 is classed as positive.
y_pred_prob = model.predict(X2_np)
y_pred = (y_pred_prob > 0.5).astype(int)

# Per-class precision / recall / F1 on the test set.
print("Classification Report:")
print(classification_report(y2_np, y_pred))

# Confusion matrix (rows: actual class, columns: predicted class).
cm = confusion_matrix(y2_np, y_pred)
print("Confusion Matrix:")
print(cm)

# Render the confusion matrix as an annotated heatmap.
predicted_tick_labels = ["Predicted Negative", "Predicted Positive"]
actual_tick_labels = ["Actual Negative", "Actual Positive"]
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=predicted_tick_labels,
    yticklabels=actual_tick_labels,
)
plt.title("Confusion Matrix for CNN Testing")
plt.xlabel("Predicted Label")
plt.ylabel("Actual Label")
plt.show()

#### RNN

In [None]:
# Restore the best RNN written by the training cell.
model = tf.keras.models.load_model('best_RNN_model_overall.h5')
print("Model loaded successfully.")

# Held-out test data as NumPy arrays: float32 features, int32 (0/1) labels.
X2_np = X2.to_numpy(dtype=np.float32)
y2_np = y2.to_numpy(dtype=np.int32)

# Sigmoid outputs are probabilities; anything above 0.5 is classed as positive.
y_pred_prob = model.predict(X2_np)
y_pred = (y_pred_prob > 0.5).astype(int)

# Per-class precision / recall / F1 on the test set.
print("Classification Report:")
print(classification_report(y2_np, y_pred))

# Confusion matrix (rows: actual class, columns: predicted class).
cm = confusion_matrix(y2_np, y_pred)
print("Confusion Matrix:")
print(cm)

# Render the confusion matrix as an annotated heatmap.
predicted_tick_labels = ["Predicted Negative", "Predicted Positive"]
actual_tick_labels = ["Actual Negative", "Actual Positive"]
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=predicted_tick_labels,
    yticklabels=actual_tick_labels,
)
plt.title("Confusion Matrix for RNN Testing")
plt.xlabel("Predicted Label")
plt.ylabel("Actual Label")
plt.show()