In [None]:
Python notebook:
```python
# -*- coding: utf-8 -*-

# # 0. Setup & Imports
#
# Install dependencies and import necessary libraries.

In [None]:
# Install necessary libraries if not already installed
# (Uncomment and run in environments like Google Colab)
# !pip install pyswarms scikit-learn pandas numpy matplotlib seaborn scipy tqdm requests -q
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import requests
from io import StringIO
import time
import warnings
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score, roc_auc_score, confusion_matrix, roc_curve, classification_report, f1_score, precision_score, recall_score
from tqdm.notebook import tqdm # Use tqdm for progress bars
import sys # Import sys to check Python version for get_feature_names_out
import os # Import os to check file existence
warnings.filterwarnings('ignore') # Suppress warnings

# # 1. Data Acquisition
#
# Load the NSL-KDD dataset from the specified GitHub repository.

In [None]:
# Source URLs for the NSL-KDD train/test splits and the feature-name listing.
train_url    = "https://raw.githubusercontent.com/defcom17/NSL_KDD/master/KDDTrain%2B.txt"
test_url     = "https://raw.githubusercontent.com/defcom17/NSL_KDD/master/KDDTest%2B.txt"
features_url = "https://raw.githubusercontent.com/defcom17/NSL_KDD/master/Field%20Names.csv"

# Seconds to wait on each HTTP request. Without a timeout `requests.get` can
# block forever on a stalled connection and freeze the whole notebook run.
REQUEST_TIMEOUT = 30

# Column names used when the feature listing cannot be downloaded or parsed.
# The last two entries are the label and difficulty columns of the data files.
FALLBACK_COLUMN_NAMES = [
    'duration','protocol_type','service','flag','src_bytes','dst_bytes','land',
    'wrong_fragment','urgent','hot','num_failed_logins','logged_in','num_compromised',
    'root_shell','su_attempted','num_root','num_file_creations','num_shells',
    'num_access_files','num_outbound_cmds','is_host_login','is_guest_login',
    'count','srv_count','serror_rate','srv_serror_rate','rerror_rate',
    'srv_rerror_rate','same_srv_rate','diff_srv_rate','srv_diff_host_rate',
    'dst_host_count','dst_host_srv_count','dst_host_same_srv_rate',
    'dst_host_diff_srv_rate','dst_host_same_src_port_rate',
    'dst_host_srv_diff_host_rate','dst_host_serror_rate',
    'dst_host_srv_serror_rate','dst_host_rerror_rate','dst_host_srv_rerror_rate',
    'attack_type','difficulty_level']

# Define column names based on the Field Names.csv structure.
# The CSV has 'feature_name,type'; only the first column (the names) is needed.
column_names = []
try:
    print("Attempting to fetch feature names...")
    features_response = requests.get(features_url, timeout=REQUEST_TIMEOUT)
    features_response.raise_for_status()  # Check for request errors
    features_df = pd.read_csv(StringIO(features_response.text), header=None)
    column_names = features_df[0].tolist()
    # Add the target and difficulty columns which are not in Field Names.csv
    column_names.extend(['attack_type', 'difficulty_level'])
    print("Feature names fetched successfully.")
except (requests.exceptions.RequestException,
        pd.errors.EmptyDataError, pd.errors.ParserError) as e:
    # Network failures (including timeouts) and unparseable bodies both fall
    # back to the hard-coded list instead of aborting the cell.
    print(f"Error fetching feature names: {e}")
    column_names = list(FALLBACK_COLUMN_NAMES)
    print("Using fallback feature names.")

# Fetch datasets
df_train = None
df_test = None
data_loaded = False
try:
    print(f"\nFetching training data from {train_url}...")
    data_response = requests.get(train_url, timeout=REQUEST_TIMEOUT)
    data_response.raise_for_status()  # Check for request errors
    df_train = pd.read_csv(StringIO(data_response.text), header=None, names=column_names)
    print("Training data loaded successfully.")

    print(f"Fetching testing data from {test_url}...")
    test_response = requests.get(test_url, timeout=REQUEST_TIMEOUT)
    test_response.raise_for_status()  # Check for request errors
    df_test = pd.read_csv(StringIO(test_response.text), header=None, names=column_names)
    print("Testing data loaded successfully.")
    data_loaded = True

    print("\nTraining data shape:", df_train.shape)
    print("Testing data shape:", df_test.shape)

    # Drop the 'difficulty_level' column as it's not a feature for detection
    if 'difficulty_level' in df_train.columns:
        df_train = df_train.drop('difficulty_level', axis=1)
    if 'difficulty_level' in df_test.columns:
        df_test = df_test.drop('difficulty_level', axis=1)

    # 'attack_type' holds specific attack names plus 'normal'; collapse it into
    # a binary target: 1 for any attack, 0 for normal traffic.
    print("\nUnique attack types in training data:")
    print(df_train['attack_type'].unique())
    df_train['is_attack'] = (df_train['attack_type'] != 'normal').astype(int)
    df_test['is_attack'] = (df_test['attack_type'] != 'normal').astype(int)

    # Drop the original 'attack_type' column
    df_train = df_train.drop('attack_type', axis=1)
    df_test = df_test.drop('attack_type', axis=1)
    print("\nTraining data shape after dropping columns:", df_train.shape)
    print("Testing data shape after dropping columns:", df_test.shape)

    # Display the first few rows
    print("\nTraining data head:")
    print(df_train.head())
except requests.exceptions.RequestException as e:
    print(f"Error fetching data: {e}")
    print("Please ensure you have an internet connection and the URLs are correct.")
    print("Cannot proceed without data files.")
    df_train = None
    df_test = None
    data_loaded = False
except Exception as e:
    # Deliberate catch-all so later cells can still run their "no data" paths.
    print(f"An unexpected error occurred during data loading or initial processing: {e}")
    df_train = None
    df_test = None
    data_loaded = False

# Initialize empty data structures if data loading failed, to allow code structure to be viewed
if not data_loaded:
    print("\nData loading failed. Initializing empty data structures for notebook structure demonstration.")
    # Use the column_names defined earlier, excluding the dropped ones
    feature_cols = [col for col in column_names if col not in ['attack_type', 'difficulty_level']]
    X_train_raw, y_train = pd.DataFrame(columns=feature_cols), pd.Series(dtype='int')
    X_test_raw, y_test = pd.DataFrame(columns=feature_cols), pd.Series(dtype='int')
    X_train_p, X_test_p = np.array([]).reshape(0, len(feature_cols)), np.array([]).reshape(0, len(feature_cols))
    X_tr, X_val, y_tr, y_val = np.array([]).reshape(0, len(feature_cols)), np.array([]).reshape(0, len(feature_cols)), pd.Series(dtype='int'), pd.Series(dtype='int')
    feat_names = feature_cols  # Use original names as fallback
```

# # 2. Data Preprocessing
#
# This step involves:
# - Handling missing values (NSL-KDD is relatively clean, but good practice to check).
# - Encoding categorical features (`protocol_type`, `service`, `flag`) using **One-Hot Encoding**.
# - Scaling numerical features using **MinMaxScaler**.
# - Splitting the training data into training (`X_tr`, `y_tr`) and validation (`X_val`, `y_val`) sets for use in the feature selection fitness function. The original test set (`X_test_p`, `y_test`) is kept separate for final evaluation.

In [None]:
if data_loaded:
    # Separate the binary label from the predictor columns.
    y_train = df_train['is_attack']
    X_train_raw = df_train.drop(columns='is_attack')
    y_test = df_test['is_attack']
    X_test_raw = df_test.drop(columns='is_attack')

    # Partition the predictors into categorical and numeric groups, keeping
    # only categorical names that are actually present in the frame.
    all_features = X_train_raw.columns.tolist()
    cat_feats = [name for name in ('protocol_type', 'service', 'flag') if name in all_features]
    num_feats = [name for name in all_features if name not in cat_feats]
    print(f"\nCategorical features: {cat_feats}")
    print(f"Numerical features: {num_feats}")

    # Numeric columns: median imputation followed by scaling to [0, 1].
    num_pipe = Pipeline(steps=[
        ('impute', SimpleImputer(strategy='median')),
        ('scale', MinMaxScaler()),
    ])
    # Categorical columns: constant-fill imputation followed by one-hot encoding;
    # categories unseen during fit are ignored at transform time.
    cat_pipe = Pipeline(steps=[
        ('impute', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore')),
    ])

    # Route each column group through its own pipeline; any column not listed
    # is passed through untouched (none are expected here).
    pre = ColumnTransformer(
        transformers=[
            ('num', num_pipe, num_feats),
            ('cat', cat_pipe, cat_feats),
        ],
        remainder='passthrough',
    )

    # Fit on the training data only so no test-set statistics leak in.
    X_train_p = pre.fit_transform(X_train_raw)
    X_test_p = pre.transform(X_test_raw)

    # Recover the output feature names.
    try:
        # Available on newer scikit-learn releases.
        feat_names = pre.get_feature_names_out()
    except AttributeError:
        # Older releases: rebuild the names manually from the fitted encoder.
        onehot_step = pre.named_transformers_['cat'].named_steps['onehot']
        cat_names = onehot_step.get_feature_names(cat_feats)
        feat_names = [*num_feats, *cat_names]
        if pre.remainder == 'passthrough':
            passthrough_cols = [
                col for col in X_train_raw.columns
                if not (col in num_feats or col in cat_feats)
            ]
            feat_names += passthrough_cols

    # Stratified internal train/validation split of the processed training
    # data, used by the feature-selection fitness function.
    X_tr, X_val, y_tr, y_val = train_test_split(
        X_train_p,
        y_train,
        test_size=0.25,
        stratify=y_train,
        random_state=42,
    )
    print("Processed shapes: X_train_processed:", X_train_p.shape, "X_test_processed:", X_test_p.shape)
    print("Feature selection splits: X_tr:", X_tr.shape, "X_val:", X_val.shape)
    print(f"Number of features after preprocessing: {len(feat_names)}")
else:
    print("\nSkipping preprocessing as data was not loaded.")
    # Make sure every downstream name exists even though there is no data.
    X_train_raw = pd.DataFrame()
    y_train = pd.Series(dtype='int')
    X_test_raw = pd.DataFrame()
    y_test = pd.Series(dtype='int')
    X_train_p = np.array([]).reshape(0, 0)
    X_test_p = np.array([]).reshape(0, 0)
    X_tr = np.array([]).reshape(0, 0)
    X_val = np.array([]).reshape(0, 0)
    y_tr = pd.Series(dtype='int')
    y_val = pd.Series(dtype='int')
    feat_names = []
```

# # 3. Define Fitness Function
#
# Define a single fitness function that will be used by all swarm intelligence algorithms (individual and hybrid) for feature selection.
#
# - It takes a binary feature mask as input.
# - It trains a **fast classifier** (Logistic Regression) on the training split (`X_tr`, `y_tr`) using only the features indicated by the mask.
# - It evaluates the classifier's **accuracy** on the validation split (`X_val`, `y_val`).
# - It calculates fitness as `Accuracy - alpha * (Number of Selected Features / Total Features)`. This function **maximizes** fitness.
# - It handles edge cases like no features being selected or errors during training/evaluation.

In [None]:
if data_loaded and X_tr.shape[0] > 0:
# Fitness function shared by every swarm-based feature selector
def feature_fitness(mask, X_train_subset, y_train_subset, X_val_subset, y_val_subset, alpha=0.05):
    """
    Score a candidate feature subset (higher is better).

    A Logistic Regression model is trained on the masked training columns and
    scored on the masked validation columns:

        fitness = accuracy - alpha * (selected features / total features)

    Parameters:
        mask: 1-D binary array; entries equal to 1 mark selected columns.
        X_train_subset, y_train_subset: training matrix and labels.
        X_val_subset, y_val_subset: validation matrix and labels.
        alpha: weight of the feature-count penalty.

    Returns:
        The fitness value, or -1.0 when no feature is selected, when there is
        too little data, when the score is NaN, or when training/scoring raises.
    """
    chosen = np.where(mask == 1)[0]
    n_total = len(mask)
    n_chosen = len(chosen)

    # An empty subset cannot be trained on: report the worst fitness.
    if n_chosen == 0:
        return -1.0

    # Restrict both splits to the selected columns.
    train_view = X_train_subset[:, chosen]
    val_view = X_val_subset[:, chosen]

    try:
        # Need at least two samples and one column to fit anything.
        if train_view.shape[0] < 2 or train_view.shape[1] == 0:
            return -1.0

        # Small, regularised, fast model: it is refit for every candidate mask.
        model = LogisticRegression(max_iter=200, solver='liblinear', random_state=42, C=0.1)
        model.fit(train_view, y_train_subset)

        # Validation accuracy minus the penalty for using many features.
        score = model.score(val_view, y_val_subset) - alpha * (n_chosen / n_total)
        return -1.0 if np.isnan(score) else score
    except Exception:
        # Best-effort by design: any training/scoring failure counts as the
        # worst fitness so the surrounding search can keep going.
        return -1.0
print("\nFeature selection fitness function defined.")
else:
print("\nSkipping fitness function definition as data was not loaded or split.")
# Define a dummy function to avoid errors if called
def feature_fitness(mask, X_train_subset, y_train_subset, X_val_subset, y_val_subset, alpha=0.05):
    """Placeholder used when no data is available: logs a notice and returns -1.0."""
    notice = "Fitness function called but data not loaded."
    print(notice)
    worst_fitness = -1.0
    return worst_fitness
```

# # 4. Implement Individual Swarm FS Algorithms
#
# Implement the individual ACO, PSO, ABC, and MWPA algorithms for binary feature selection. These will serve as benchmarks to compare against the hybrid approach.
#
# - Each function will take the data splits (`X_tr`, `y_tr`, `X_val`, `y_val`) and algorithm parameters.
# - They will use the `feature_fitness` function defined in Step 3.
# - They will return the best selected feature mask (binary vector), the fitness history, and computational time.
#
# **Note:** The implementations below are adapted from your provided snippets and use the corrected fitness function. MWPA implementation is based on the structure provided.

In [None]:
import numpy as np
import time
# Only define the swarm selectors when a non-empty training split exists.
# `X_tr.shape` is a tuple, so test its row count (index 0) rather than the tuple.
if data_loaded and X_tr.shape[0] > 0:
    # Number of columns in the training split (index 1 of the shape tuple).
    n_features_fs = X_tr.shape[1]
# Helper function to convert continuous position to binary mask
# Used by PSO, ABC, MWPA
def _binarize(pos, threshold=0.5):
    """
    Convert a continuous position vector into a 0/1 feature mask.

    A steep sigmoid centred on `threshold` maps each coordinate into (0, 1);
    a coordinate is selected when that output exceeds 0.5, i.e. exactly when
    the coordinate is strictly greater than `threshold`.

    Parameters:
        pos: array-like of continuous positions.
        threshold: position value above which a feature is selected.

    Returns:
        Integer numpy array with the same shape as `pos`, containing 0s and 1s.
    """
    pos = np.asarray(pos, dtype=float)
    # Slope of 15 gives a sharp transition around the threshold.
    sigmoid_output = 1 / (1 + np.exp(-15 * (pos - threshold)))
    # The sigmoid is already centred on `threshold`, so its midpoint (0.5) is
    # the correct cut-off; comparing against `threshold` again would shift the
    # decision boundary whenever threshold != 0.5.
    return (sigmoid_output > 0.5).astype(int)
# --- Ant Colony Optimization (ACO) for Feature Selection ---
def aco_fs(X_train_subset, y_train_subset, X_val_subset, y_val_subset,
           n_agents=30, max_iter=50, evap_rate=0.1, alpha_fitness=0.05):
    """
    Ant Colony Optimization for binary feature selection.

    Each ant samples a binary mask from the pheromone levels, masks are scored
    with `feature_fitness`, and the best mask found so far reinforces the
    pheromone of its features (elitist deposit).

    Parameters:
        X_train_subset, y_train_subset: training matrix and labels.
        X_val_subset, y_val_subset: validation matrix and labels.
        n_agents: number of ants per iteration.
        max_iter: number of iterations.
        evap_rate: fraction of pheromone removed each iteration.
        alpha_fitness: feature-count penalty weight passed to `feature_fitness`.

    Returns:
        (best_mask, fitness_history, comp_time): the best binary mask found,
        the best fitness after each iteration, and the elapsed seconds.
    """
    # Number of candidate features = number of columns (shape is a tuple).
    n_feat = X_train_subset.shape[1]

    # Pheromone trail on each feature (higher pheromone -> more likely selected)
    pher = np.ones(n_feat) * 0.1
    best_mask = np.zeros(n_feat, dtype=int)
    best_fit = -np.inf
    fitness_history = []

    start_time = time.time()
    for _ in range(max_iter):
        # NOTE(review): normalising by the pheromone *sum* makes the per-feature
        # inclusion probabilities add up to 1, so an ant selects about one
        # feature on average. Confirm this is intended; normalising by the
        # maximum pheromone would give denser masks.
        # The epsilon avoids division by zero if all pheromones are zero.
        prob = (pher + 1e-9) / (np.sum(pher) + 1e-9)

        ants_masks = []
        fits = []
        for _ant in range(n_agents):
            # Independently include each feature with its probability.
            mask = (np.random.rand(n_feat) < prob).astype(int)
            ants_masks.append(mask)
            fits.append(feature_fitness(mask, X_train_subset, y_train_subset,
                                        X_val_subset, y_val_subset, alpha=alpha_fitness))

        # Keep the best ant of this iteration if it beats the global best.
        current_fits = np.array(fits)
        best_current_idx = int(np.argmax(current_fits))
        if current_fits[best_current_idx] > best_fit:
            best_fit = current_fits[best_current_idx]
            best_mask = ants_masks[best_current_idx].copy()

        # Pheromone evaporation
        pher *= (1 - evap_rate)

        # Elitist deposit: reinforce the features of the best mask so far,
        # proportionally to its fitness.
        if best_fit > -np.inf:
            pher[best_mask == 1] += best_fit

        # A negative fitness would push pheromones below zero; keep a floor.
        pher = np.maximum(pher, 1e-2)
        fitness_history.append(best_fit)

    comp_time = time.time() - start_time
    return best_mask, fitness_history, comp_time
# --- Particle Swarm Optimization (PSO) for Feature Selection ---
# This is a binary PSO adaptation using continuous positions and binarization
def pso_fs(X_train_subset, y_train_subset, X_val_subset, y_val_subset,
           n_agents=30, max_iter=50, w=0.7, c1=1.5, c2=1.5, alpha_fitness=0.05):
    """
    Binary Particle Swarm Optimization for feature selection.

    Particles move in the continuous box [0, 1]^n_feat; each position is
    turned into a binary mask with `_binarize` and scored with
    `feature_fitness`.

    Parameters:
        X_train_subset, y_train_subset: training matrix and labels.
        X_val_subset, y_val_subset: validation matrix and labels.
        n_agents: number of particles.
        max_iter: number of iterations.
        w: initial inertia weight (decays linearly to 0 over the run).
        c1, c2: cognitive and social acceleration coefficients.
        alpha_fitness: feature-count penalty weight passed to `feature_fitness`.

    Returns:
        (final_mask, fitness_history, comp_time): the mask of the global best
        position, the global best fitness after initialisation and after each
        iteration (max_iter + 1 entries), and the elapsed seconds.
    """
    # Number of candidate features = number of columns (shape is a tuple).
    n_feat = X_train_subset.shape[1]

    def evaluate(position):
        """Fitness of the binary mask derived from a continuous position."""
        return feature_fitness(_binarize(position), X_train_subset, y_train_subset,
                               X_val_subset, y_val_subset, alpha=alpha_fitness)

    # Continuous positions and velocities
    pos = np.random.rand(n_agents, n_feat)
    vel = np.zeros_like(pos)

    # Personal and global bests
    pbest_pos = pos.copy()
    pbest_fit = np.full(n_agents, -np.inf)
    gbest_pos = np.zeros(n_feat)
    gbest_fit = -np.inf
    fitness_history = []

    start_time = time.time()

    # Score the initial swarm to seed the personal and global bests.
    for i in range(n_agents):
        fit_i = evaluate(pos[i])
        pbest_fit[i] = fit_i
        if fit_i > gbest_fit:
            gbest_fit = fit_i
            gbest_pos = pos[i].copy()
    fitness_history.append(gbest_fit)

    for t in range(max_iter):
        # Inertia weight decays linearly from w to 0.
        current_w = w * (max_iter - t) / max_iter
        for i in range(n_agents):
            r1, r2 = np.random.rand(n_feat), np.random.rand(n_feat)
            # Standard PSO velocity and position update
            vel[i] = (current_w * vel[i]
                      + c1 * r1 * (pbest_pos[i] - pos[i])
                      + c2 * r2 * (gbest_pos - pos[i]))
            # Keep the particle inside the unit box.
            pos[i] = np.clip(pos[i] + vel[i], 0, 1)

            fit_i = evaluate(pos[i])
            if fit_i > pbest_fit[i]:
                pbest_fit[i] = fit_i
                pbest_pos[i] = pos[i].copy()
            if fit_i > gbest_fit:
                gbest_fit = fit_i
                gbest_pos = pos[i].copy()
        fitness_history.append(gbest_fit)

    comp_time = time.time() - start_time
    # The returned mask is the binarized global best position.
    final_mask = _binarize(gbest_pos)
    return final_mask, fitness_history, comp_time
# --- Artificial Bee Colony (ABC) for Feature Selection ---
# This is a continuous ABC adaptation for positions
def abc_fs(X_train_subset, y_train_subset, X_val_subset, y_val_subset,
           n_agents=30, max_iter=50, limit=5, alpha_fitness=0.05):
    """
    Artificial Bee Colony for binary feature selection.

    Food sources are continuous positions in [0, 1]^n_feat; each is turned
    into a binary mask with `_binarize` and scored with `feature_fitness`.
    Half of the agents are employed bees (one per food source); the rest are
    onlookers.

    Parameters:
        X_train_subset, y_train_subset: training matrix and labels.
        X_val_subset, y_val_subset: validation matrix and labels.
        n_agents: total number of bees (employed + onlooker).
        max_iter: number of iterations.
        limit: failed improvement attempts after which a food source is
            abandoned and re-initialised by a scout.
        alpha_fitness: feature-count penalty weight passed to `feature_fitness`.

    Returns:
        (best_mask, fitness_history, comp_time): the best mask seen during the
        whole run, the best fitness after initialisation and after each
        iteration (max_iter + 1 entries), and the elapsed seconds.
    """
    # Number of candidate features = number of columns (shape is a tuple).
    n_feat = X_train_subset.shape[1]
    n_employed = n_agents // 2
    n_onlooker = n_agents - n_employed

    foods = np.random.rand(n_employed, n_feat)
    fitnesses = np.full(n_employed, -np.inf)
    trials = np.zeros(n_employed, dtype=int)  # failed attempts per food source
    best_mask = np.zeros(n_feat, dtype=int)
    best_fit = -np.inf
    fitness_history = []

    def evaluate(position):
        """Fitness of the binary mask derived from a continuous position."""
        return feature_fitness(_binarize(position), X_train_subset, y_train_subset,
                               X_val_subset, y_val_subset, alpha=alpha_fitness)

    def random_other(idx):
        """Index of a random food source different from `idx`."""
        return np.random.choice([j for j in range(n_employed) if j != idx])

    def search_around(source_idx, neighbor_idx):
        """Try a perturbed copy of a food source and keep it only if better."""
        phi = np.random.uniform(-1, 1, n_feat)
        candidate_pos = np.clip(
            foods[source_idx] + phi * (foods[source_idx] - foods[neighbor_idx]), 0, 1)
        candidate_fit = evaluate(candidate_pos)
        if candidate_fit > fitnesses[source_idx]:
            foods[source_idx] = candidate_pos
            fitnesses[source_idx] = candidate_fit
            trials[source_idx] = 0
        else:
            trials[source_idx] += 1

    def remember_best():
        """Copy the current best food source into the run-wide best if better."""
        nonlocal best_fit, best_mask
        top = int(np.argmax(fitnesses))
        if fitnesses[top] > best_fit:
            best_fit = fitnesses[top]
            best_mask = _binarize(foods[top]).copy()

    start_time = time.time()

    # Evaluate initial food sources
    for i in range(n_employed):
        fitnesses[i] = evaluate(foods[i])
    remember_best()
    fitness_history.append(best_fit)

    for _ in range(max_iter):
        # --- Employed bees phase ---
        for i in range(n_employed):
            # With a single source there is no neighbour: compare to itself.
            neighbor_idx = random_other(i) if n_employed > 1 else i
            search_around(i, neighbor_idx)
        remember_best()

        # --- Onlooker bees phase ---
        if n_employed > 1:
            # Shift fitnesses so they are strictly positive, then normalise
            # into selection probabilities (better sources are picked more).
            positive_fitnesses = fitnesses - np.min(fitnesses) + 1e-9
            selection_probs = positive_fitnesses / np.sum(positive_fitnesses)
            for selected_food_idx in np.random.choice(
                    n_employed, size=n_onlooker, p=selection_probs):
                search_around(selected_food_idx, random_other(selected_food_idx))
            remember_best()

        # --- Scout bees phase ---
        # np.where returns a tuple of index arrays; take the first (and only)
        # one so the loop visits each exhausted source individually.
        for i in np.where(trials >= limit)[0]:
            foods[i] = np.random.rand(n_feat)
            fitnesses[i] = evaluate(foods[i])
            trials[i] = 0
        remember_best()

        fitness_history.append(best_fit)

    comp_time = time.time() - start_time
    # Return the remembered best mask: scouts may have replaced the food source
    # that produced it, so the current best source can be worse than best_fit.
    return best_mask, fitness_history, comp_time
# --- Modified Wolf Predation Algorithm (MWPA) for Feature Selection ---
# This is a continuous MWPA adaptation for positions
def mwpa_fs(X_train_subset, y_train_subset, X_val_subset, y_val_subset,
            n_agents=30, max_iter=50, alpha_fitness=0.05):
    """
    Modified Wolf Predation Algorithm (MWPA) for binary feature selection.

    Wolves are continuous positions in [0, 1]^n_feat; each is turned into a
    binary mask with `_binarize` and scored with `feature_fitness`. Every
    iteration all wolves move relative to the alpha (best position so far).

    Parameters:
        X_train_subset, y_train_subset: training matrix and labels.
        X_val_subset, y_val_subset: validation matrix and labels.
        n_agents: number of wolves.
        max_iter: number of iterations.
        alpha_fitness: feature-count penalty weight passed to `feature_fitness`.

    Returns:
        (final_mask, fitness_history, comp_time): the mask of the alpha
        position, the alpha fitness after initialisation and after each
        iteration (max_iter + 1 entries), and the elapsed seconds.
    """
    # Number of candidate features = number of columns (shape is a tuple).
    n_feat = X_train_subset.shape[1]

    def evaluate(position):
        """Fitness of the binary mask derived from a continuous position."""
        return feature_fitness(_binarize(position), X_train_subset, y_train_subset,
                               X_val_subset, y_val_subset, alpha=alpha_fitness)

    # Initialize wolf pack (positions are continuous)
    wolves = np.random.rand(n_agents, n_feat)
    fits = np.full(n_agents, -np.inf)

    # Alpha wolf = best position seen so far
    alpha_pos = np.zeros(n_feat)
    alpha_fit = -np.inf
    fitness_history = []

    start_time = time.time()

    # Score the initial pack to find the first alpha.
    for i in range(n_agents):
        fits[i] = evaluate(wolves[i])
        if fits[i] > alpha_fit:
            alpha_fit = fits[i]
            alpha_pos = wolves[i].copy()
    fitness_history.append(alpha_fit)

    for t in range(max_iter):
        # Every wolf moves relative to the same alpha within one iteration.
        for i in range(n_agents):
            r1, r2 = np.random.rand(), np.random.rand()
            # Hunting coefficient: random factor with a bound shrinking 2 -> 0
            A = 2 * (1 - t / max_iter) * r1
            # Distance to alpha (prey)
            D = np.abs(2 * r2 * alpha_pos - wolves[i])
            # Predation step, clipped back into the unit box
            wolves[i] = np.clip(alpha_pos - A * (D ** 1.5), 0, 1)
            fits[i] = evaluate(wolves[i])

        # Promote the best wolf immediately after evaluation so the final
        # iteration's evaluations are not discarded and the history reflects
        # the newest results.
        leader = int(np.argmax(fits))
        if fits[leader] > alpha_fit:
            alpha_fit = fits[leader]
            alpha_pos = wolves[leader].copy()
        fitness_history.append(alpha_fit)

    comp_time = time.time() - start_time
    # The alpha only ever improves, so its mask is the best mask of the run.
    final_mask = _binarize(alpha_pos)
    return final_mask, fitness_history, comp_time
else:
print("\nSkipping individual swarm algorithm definitions as data was not loaded or split.")
# Define dummy functions to avoid errors if called
def aco_fs(*args, **kwargs):
    """Placeholder ACO selector: ignores its arguments and returns an empty result."""
    empty_mask, empty_history, elapsed = np.array([]), [], 0
    return empty_mask, empty_history, elapsed


def pso_fs(*args, **kwargs):
    """Placeholder PSO selector: ignores its arguments and returns an empty result."""
    empty_mask, empty_history, elapsed = np.array([]), [], 0
    return empty_mask, empty_history, elapsed


def abc_fs(*args, **kwargs):
    """Placeholder ABC selector: ignores its arguments and returns an empty result."""
    empty_mask, empty_history, elapsed = np.array([]), [], 0
    return empty_mask, empty_history, elapsed


def mwpa_fs(*args, **kwargs):
    """Placeholder MWPA selector: ignores its arguments and returns an empty result."""
    empty_mask, empty_history, elapsed = np.array([]), [], 0
    return empty_mask, empty_history, elapsed
```

# # 5. Implement Plausible Hybrid Swarm Intelligence for Feature Selection
#
# Implement a plausible hybrid ACO+PSO+ABC+MWPA algorithm for binary feature selection. This implementation attempts to combine the search mechanisms within a single iterative loop.
#
# - This class `HybridSwarmFeatureSelector` manages the population and the iteration process.
# - It uses the `feature_fitness` function.
# - The `update_agents` method implements the hybridization logic. A common strategy is to apply different update rules (or combinations) probabilistically to agents in each iteration. This implementation uses a probabilistic approach where each agent applies one of the component-inspired updates (ACO, PSO+MWPA, ABC).
# - It returns the best selected feature mask, fitness history, and computational time.
#
# **Note:** This is a *plausible interpretation* of how these algorithms could be hybridized for binary feature selection. Your specific novel algorithm might differ. You should replace the logic inside `initialize_agents` and `update_agents` with your precise implementation if needed.

In [None]:
import numpy as np
import time
if data_loaded and X_tr.shape[0] > 0:
n_features_hybrid = X_tr.shape[1]
# --- Plausible Hybrid Swarm Intelligence Algorithm for Feature Selection ---
class HybridSwarmFeatureSelector:
def __init__(self, n_features, n_agents=50, max_iter=50, alpha_fitness=0.05,
pso_w=0.7, pso_c1=1.5, pso_c2=1.5, aco_evap=0.1, aco_deposit=0.1,
abc_limit=5, mwpa_beta=1.5, hybrid_probs=[0.33, 0.33, 0.34]): # Probabilities for [ACO, PSO+MWPA, ABC]
"""
Initializes the hybrid swarm intelligence feature selector.
Uses a probabilistic approach to apply component-inspired updates.
"""
self.n_features = n_features
self.n_agents = n_agents
self.max_iter = max_iter
self.alpha_fitness = alpha_fitness # Weight for feature penalty in fitness function
# Algorithm parameters
self.pso_w = pso_w
self.pso_c1 = pso_c1
self.pso_c2 = pso_c2
self.aco_evap = aco_evap
self.aco_deposit = aco_deposit
self.abc_limit = abc_limit
self.mwpa_beta = mwpa_beta
self.hybrid_probs = hybrid_probs # Probabilities for selecting update type
# Agent state (continuous positions for PSO/ABC/MWPA base)
self.agents_pos = np.random.rand(self.n_agents, self.n_features)
self.velocities = np.zeros_like(self.agents_pos) # For PSO part
self.pbest_pos = self.agents_pos.copy()
self.pbest_fit = np.full(self.n_agents, -np.inf)
# Global best (shared)
self.gbest_pos = np.zeros(self.n_features)
self.gbest_fit = -np.inf
self.best_feature_mask = np.zeros(self.n_features, dtype=int) # Overall best mask
# ACO components (pheromone trail on each feature)
self.pheromones = np.ones(self.n_features) * 0.1
# ABC components (trial counts for scout phase)
self.trials = np.zeros(self.n_agents, dtype=int)
# MWPA components (alpha wolf is the global best)
# Hunting coefficient A will be calculated per iteration/agent
self.fitness_history = [] # To track convergence
self.computational_time = 0 # To track time
# Store data splits for fitness evaluation
self.X_train_val = None
self.y_train_val = None
self.X_val = None
self.y_val = None
print("HybridSwarmFeatureSelector initialized.")
def initialize_agents(self):
"""
Initialize the population of agents (continuous positions) and algorithm-specific components.
"""
# Initialize agents as random continuous vectors in [0, 1]
self.agents_pos = np.random.rand(self.n_agents, self.n_features)
self.velocities = np.zeros_like(self.agents_pos) # For PSO part
self.pbest_pos = self.agents_pos.copy()
self.pbest_fit = np.full(self.n_agents, -np.inf)
# Initialize pheromones
self.pheromones = np.ones(self.n_features) * 0.1
# Initialize trial counts for ABC
self.trials = np.zeros(self.n_agents, dtype=int)
# Evaluate initial population to set initial personal and global bests
initial_fitnesses = [feature_fitness(_binarize(pos), self.X_train_val, self.y_train_val, self.X_val, self.y_val, self.alpha_fitness) for pos in self.agents_pos]
# Find initial best (MAXIMIZING fitness)
if max(initial_fitnesses) > -np.inf:
best_initial_idx = np.argmax(initial_fitnesses)
self.gbest_fit = initial_fitnesses[best_initial_idx]
self.gbest_pos = self.agents_pos[best_initial_idx].copy()
self.best_fitness = self.gbest_fit
self.best_feature_mask = _binarize(self.gbest_pos).copy()
else:
# If all initial solutions are invalid
self.gbest_fit = -np.inf
self.gbest_pos = np.random.rand(self.n_features) # Still need a starting point for gbest_pos update
self.best_fitness = -np.inf
self.best_feature_mask = np.zeros(self.n_features, dtype=int)
self.pbest_fit = np.array(initial_fitnesses)
def update_agents(self, iteration):
    """
    Perform one iteration of the hybrid swarm intelligence algorithm for feature selection.

    For every agent one of three component-inspired moves is drawn with the
    probabilities in ``self.hybrid_probs`` (order: ACO, PSO+MWPA, ABC). The moved
    position is clipped to [0, 1], binarized and scored. Personal bests, ABC trial
    counters, the global best, the pheromone trail and the ABC scout phase are
    then updated.

    The fitness of each agent's pre-move position is reused from the previous call
    instead of being recomputed, which removes one ``feature_fitness`` evaluation
    per agent per iteration. The stored values are trusted only when they were
    recorded for the very array object currently held in ``self.agents_pos``;
    otherwise the pre-move position is evaluated as before.
    NOTE(review): the reuse assumes ``feature_fitness`` is deterministic for a
    given mask -- confirm against its definition.

    Args:
        iteration: zero-based iteration index; drives the linear decay of the
            PSO inertia weight and of the MWPA hunting coefficient A.
    """
    def evaluate(position):
        # Fitness is always computed on the binary mask derived from a continuous position.
        return feature_fitness(_binarize(position), self.X_train_val, self.y_train_val,
                               self.X_val, self.y_val, self.alpha_fitness)

    # Fitness values remembered from the previous call, valid only for the same positions array.
    remembered_fit = getattr(self, '_agent_fit', None)
    can_reuse = (
        remembered_fit is not None
        and getattr(self, '_agent_fit_positions', None) is self.agents_pos
        and len(remembered_fit) == self.n_agents
    )

    new_agents_pos = self.agents_pos.copy()
    new_velocities = self.velocities.copy()
    current_fitnesses = np.full(self.n_agents, -np.inf)  # Fitnesses of the updated agents

    # Inertia weight and hunting coefficient A both decay linearly with the iteration
    current_w = self.pso_w * (self.max_iter - iteration) / self.max_iter
    current_A = 2 * (1 - iteration / self.max_iter)

    for i in range(self.n_agents):
        # Probabilistically choose which algorithm's update to apply
        choice = np.random.choice(['ACO', 'PSO+MWPA', 'ABC'], p=self.hybrid_probs)
        current_pos = self.agents_pos[i]
        current_vel = self.velocities[i]
        pbest_pos_i = self.pbest_pos[i]
        gbest_pos_i = self.gbest_pos  # Global best plays the alpha wolf / prey role

        if choice == 'PSO+MWPA':
            # --- PSO + MWPA velocity update ---
            r1, r2 = np.random.rand(self.n_features), np.random.rand(self.n_features)
            r3 = np.random.rand()  # Scalar used in the MWPA distance term
            pso_vel_term = (self.pso_c1 * r1 * (pbest_pos_i - current_pos)
                            + self.pso_c2 * r2 * (gbest_pos_i - current_pos))
            # Distance to alpha (gbest); D is non-negative so the fractional power is safe
            D = np.abs(2 * r3 * gbest_pos_i - current_pos)
            # A * D**beta, given a random sign/magnitude per dimension
            mwpa_vel_term = current_A * (D**self.mwpa_beta) * np.random.uniform(-1, 1, self.n_features)
            new_velocities[i] = current_w * current_vel + pso_vel_term + mwpa_vel_term
            new_agents_pos[i] = current_pos + new_velocities[i]
        elif choice == 'ABC':
            # --- ABC neighbourhood search ---
            if self.n_agents > 1:
                neighbor_idx = np.random.choice([j for j in range(self.n_agents) if j != i])
            else:
                neighbor_idx = i  # Single agent: the difference term is zero (no move)
            phi = np.random.uniform(-1, 1, self.n_features)
            new_agents_pos[i] = current_pos + phi * (current_pos - self.agents_pos[neighbor_idx])
            new_velocities[i] = np.zeros_like(current_vel)  # Reset velocity after ABC move
        elif choice == 'ACO':
            # --- ACO-inspired update in continuous space ---
            # Normalized pheromones weight both a small random step and a pull towards gbest
            prob = (self.pheromones + 1e-9) / (np.sum(self.pheromones) + 1e-9)
            biased_random_step = np.random.rand(self.n_features) * prob * 0.1
            move_towards_gbest = np.random.rand(self.n_features) * (gbest_pos_i - current_pos) * prob * 0.1
            new_agents_pos[i] = current_pos + biased_random_step + move_towards_gbest
            new_velocities[i] = np.zeros_like(current_vel)  # Reset velocity after ACO move

        # Keep positions inside [0, 1] and score the resulting binary mask
        new_agents_pos[i] = np.clip(new_agents_pos[i], 0, 1)
        current_fitnesses[i] = evaluate(new_agents_pos[i])

        # --- Personal best (PSO/Hybrid) ---
        if current_fitnesses[i] > self.pbest_fit[i]:
            self.pbest_fit[i] = current_fitnesses[i]
            self.pbest_pos[i] = new_agents_pos[i].copy()

        # --- Trial count (ABC): reset on strict improvement over the pre-move position ---
        previous_fitness = remembered_fit[i] if can_reuse else evaluate(current_pos)
        if current_fitnesses[i] > previous_fitness:
            self.trials[i] = 0
        else:
            self.trials[i] += 1

    # --- Apply global updates (after all agents are updated) ---
    self.agents_pos = new_agents_pos
    self.velocities = new_velocities
    # Remember the fitness of the positions just installed for the next call
    self._agent_fit = current_fitnesses
    self._agent_fit_positions = self.agents_pos

    # Global best (MAXIMUM fitness)
    best_current_idx = np.argmax(current_fitnesses)
    if current_fitnesses[best_current_idx] > self.gbest_fit:
        self.gbest_fit = current_fitnesses[best_current_idx]
        self.gbest_pos = self.agents_pos[best_current_idx].copy()
        self.best_fitness = self.gbest_fit
        self.best_feature_mask = _binarize(self.gbest_pos).copy()

    # --- ACO pheromone update: evaporate, then deposit on the best mask's features ---
    self.pheromones *= (1 - self.aco_evap)
    if self.best_fitness > -np.inf:
        self.pheromones[self.best_feature_mask == 1] += self.aco_deposit * self.best_fitness
    self.pheromones = np.maximum(self.pheromones, 1e-2)  # Keep pheromones slightly positive

    # --- ABC scout phase: re-seed agents whose trial counter reached the limit ---
    scout_indices = np.where(self.trials >= self.limit)[0]
    for i in scout_indices:
        self.agents_pos[i] = np.random.rand(self.n_features)
        self.velocities[i] = np.zeros(self.n_features)
        self.trials[i] = 0
        new_fit = evaluate(self.agents_pos[i])
        self._agent_fit[i] = new_fit  # Position changed in place, so refresh its remembered fitness
        self.pbest_fit[i] = new_fit  # The new position becomes the personal best
        self.pbest_pos[i] = self.agents_pos[i].copy()
        if new_fit > self.gbest_fit:
            self.gbest_fit = new_fit
            self.gbest_pos = self.agents_pos[i].copy()
            self.best_fitness = self.gbest_fit
            self.best_feature_mask = _binarize(self.gbest_pos).copy()
def run(self, X_train_val, y_train_val, X_val, y_val):
    """
    Execute the hybrid feature-selection search on the given data splits.

    The four splits are stored on the instance so the fitness evaluations can
    read them. Returns a tuple ``(best_feature_mask, fitness_history,
    computational_time)``; the timer covers the iteration loop only, not the
    initial population evaluation.
    """
    self.X_train_val, self.y_train_val = X_train_val, y_train_val
    self.X_val, self.y_val = X_val, y_val

    # Nothing to select from: report an empty result immediately
    if self.n_features == 0:
        print("\nNo features available for selection. Skipping feature selection.")
        self.best_feature_mask = np.zeros(0, dtype=int)
        self.best_fitness = -np.inf
        self.fitness_history = []
        self.computational_time = 0
        return self.best_feature_mask, self.fitness_history, self.computational_time

    self.initialize_agents()

    # The history starts with the initial best only when one valid solution exists
    self.fitness_history = [self.best_fitness] if self.best_fitness > -np.inf else []

    start_time = time.time()
    iteration = 0
    while iteration < self.max_iter:
        self.update_agents(iteration)  # Iteration index drives the decaying coefficients
        self.fitness_history.append(self.best_fitness)  # May repeat the previous value
        iteration += 1
    self.computational_time = time.time() - start_time

    return self.best_feature_mask, self.fitness_history, self.computational_time
# --- End of Plausible Hybrid Swarm Intelligence Algorithm for Feature Selection ---
else:
print("\nSkipping hybrid swarm algorithm definition as data was not loaded or split.")
# Define a dummy class to avoid errors if instantiated
class HybridSwarmFeatureSelector:
    """Placeholder selector used when the data was not loaded; accepts any arguments."""

    def __init__(self, *args, **kwargs):
        # Intentionally ignores every argument
        pass

    def run(self, *args, **kwargs):
        """Report that no data is available and return an empty (mask, history, time) result."""
        print("Hybrid selector called but data not loaded.")
        empty_mask, empty_history, elapsed = np.array([]), [], 0
        return empty_mask, empty_history, elapsed
```

# # 6. Run Feature Selection Experiments
#
# Run each of the implemented feature selection methods (Individual ACO, PSO, ABC, MWPA, and the Hybrid) on the training/validation data splits.
#
# Collect the results: the best feature mask found, the convergence history (best fitness per iteration), and the computational time for each method.

In [None]:
# Results per feature-selection method: mask, feature count, convergence history, time, final fitness
feature_selection_results = {}
# X_tr.shape is a tuple, so the row count (shape[0]) is what has to be positive
if data_loaded and X_tr.shape[0] > 0:
    print("\n--- Running Feature Selection Experiments ---")
    # Map method names to their corresponding functions/classes
    fs_methods = {
        "ACO": aco_fs,
        "PSO": pso_fs,
        "ABC": abc_fs,
        "MWPA": mwpa_fs,
        "Hybrid": HybridSwarmFeatureSelector  # Class; instantiated inside the loop
    }
    # Parameters shared by all runs (can be adjusted)
    run_params = {
        "n_agents": 30,
        "max_iter": 50,  # Reduced iterations for faster example run. Increase for better results.
        "alpha_fitness": 0.05  # Feature penalty weight
        # Add specific parameters for individual/hybrid methods here if needed
        # Example for Hybrid: 'pso_w': 0.8, 'aco_evap': 0.05, 'hybrid_probs': [0.4, 0.3, 0.3]
    }
    # Loop through each method and run it
    for name, method in tqdm(fs_methods.items(), desc="Running FS Methods"):
        print(f"\nRunning FS method: {name}")
        try:
            if name == "Hybrid":
                # The selector needs the number of feature columns, i.e. shape[1]
                selector = method(n_features=X_tr.shape[1], **run_params)
                selected_mask, history, comp_time = selector.run(X_tr, y_tr, X_val, y_val)
            else:
                # Individual methods are plain functions
                selected_mask, history, comp_time = method(X_tr, y_tr, X_val, y_val, **run_params)
            # Explicit length check so a list or an ndarray history both work
            has_history = history is not None and len(history) > 0
            feature_selection_results[name] = {
                'selected_mask': selected_mask,
                'num_features': np.sum(selected_mask) if selected_mask is not None else 0,
                'fitness_history': history,
                'time': comp_time,
                'best_fitness': history[-1] if has_history else -np.inf  # Final fitness
            }
            print(f"{name} completed. Selected Features: {feature_selection_results[name]['num_features']}, Final Fitness: {feature_selection_results[name]['best_fitness']:.4f}, Time: {feature_selection_results[name]['time']:.2f}s")
        except Exception as e:
            # Record the failure so one broken method does not abort the remaining ones
            print(f"Error running {name}: {e}")
            feature_selection_results[name] = {
                'selected_mask': None,
                'num_features': 0,
                'fitness_history': [],
                'time': 0,
                'best_fitness': -np.inf,
                'error': str(e)
            }
            print(f"{name} failed.")
    print("\n--- Feature Selection Experiments Complete ---")
    # Summarize feature selection results
    print("\nFeature Selection Summary:")
    for name, results in feature_selection_results.items():
        if 'error' in results:
            print(f"  {name}: ERROR - {results['error']}")
        else:
            print(f"  {name}: Selected Features = {results['num_features']}, Final Fitness = {results['best_fitness']:.4f}, Time = {results['time']:.2f}s")
    # Plot convergence curves for all methods that produced a history
    plt.figure(figsize=(12, 8))
    for name, results in feature_selection_results.items():
        fitness_history = results['fitness_history']
        if fitness_history is not None and len(fitness_history) > 0:
            plt.plot(fitness_history, label=f'{name} (Final Fit: {results["best_fitness"]:.4f})')
    plt.xlabel("Iteration")
    plt.ylabel("Best Fitness")
    plt.title("Feature Selection Convergence Comparison")
    plt.legend()
    plt.grid(True)
    plt.show()
else:
    print("\nSkipping feature selection experiments as data was not loaded or split.")
    feature_selection_results = {}
```

# # 7. Model Training
#
# Train final classifiers (Random Forest, SVM, Neural Networks) using the features selected by **each** feature selection method (Individual ACO, PSO, ABC, MWPA, and Hybrid).
#
# We will use the combined training and validation set (`X_tr` + `X_val`, `y_tr` + `y_val`) for training the final models. Hyperparameter tuning will be done using cross-validation on this combined set.

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
import time
# Outputs of this cell, keyed first by feature-selection method and then by classifier name
final_models = {}
final_model_training_time = {}
final_model_best_params = {}

# Train only when feature-selection results exist and the test set is non-empty
if feature_selection_results and X_test_p.shape[0] > 0:
    print("\n--- Starting Final Model Training ---")

    # Candidate classifiers (probability=True lets the SVM expose predict_proba)
    classifiers = {
        "Random Forest": RandomForestClassifier(random_state=42),
        "SVM": SVC(probability=True, random_state=42),
        "Neural Network": MLPClassifier(random_state=42, max_iter=500),
    }

    # Deliberately small search spaces to keep the example run short
    param_distributions = {
        "Random Forest": {
            'n_estimators': [50, 100],
            'max_depth': [10, 20, None],
            'min_samples_split': [2, 5],
        },
        "SVM": {
            'C': [0.1, 1],
            'gamma': ['scale', 0.1],
            'kernel': ['rbf'],
        },
        "Neural Network": {
            'hidden_layer_sizes': [(50,), (100,)],
            'activation': ['relu'],
            'solver': ['adam'],
            'alpha': [0.0001, 0.001],
        },
    }

    # Final models are fitted on the train + validation splits stacked together
    X_train_val_combined = np.vstack((X_tr, X_val))
    y_train_val_combined = pd.concat([y_tr, y_val])

    for fs_method_name, fs_results in tqdm(feature_selection_results.items(), desc="Training Models per FS Method"):
        selected_mask = fs_results.get('selected_mask')

        # A failed method or an empty mask leaves nothing to train on
        if selected_mask is None or np.sum(selected_mask) == 0:
            print(f"\nSkipping model training for {fs_method_name} as no features were selected.")
            final_models[fs_method_name] = dict.fromkeys(classifiers, "No Features Selected")
            final_model_training_time[fs_method_name] = dict.fromkeys(classifiers, 0)
            final_model_best_params[fs_method_name] = {clf_name: {} for clf_name in classifiers}
            continue

        num_selected_features = np.sum(selected_mask)
        print(f"\nTraining models using features selected by {fs_method_name} ({num_selected_features} features)...")

        # Keep only the columns flagged by the mask
        X_train_val_selected = X_train_val_combined[:, np.where(selected_mask == 1)[0]]

        # Cross-validation needs at least two rows and one column
        if not (X_train_val_selected.shape[0] >= 2 and X_train_val_selected.shape[1] != 0):
            print(f"  Skipping training for {fs_method_name}: Insufficient data or features ({X_train_val_selected.shape}).")
            final_models[fs_method_name] = dict.fromkeys(classifiers, "Insufficient Data/Features")
            final_model_training_time[fs_method_name] = dict.fromkeys(classifiers, 0)
            final_model_best_params[fs_method_name] = {clf_name: {} for clf_name in classifiers}
            continue

        final_models[fs_method_name] = {}
        final_model_training_time[fs_method_name] = {}
        final_model_best_params[fs_method_name] = {}

        for clf_name, clf_model in tqdm(classifiers.items(), desc=f"  Training Classifiers for {fs_method_name}", leave=False):
            start_time = time.time()
            # Randomized hyperparameter search with 3-fold CV on the selected columns;
            # error_score='raise' surfaces parameter problems instead of hiding them
            random_search = RandomizedSearchCV(
                clf_model,
                param_distributions=param_distributions[clf_name],
                scoring='accuracy',
                cv=3,
                n_iter=3,  # Number of sampled parameter combinations; raise for better tuning
                n_jobs=-1,  # Use all available cores
                random_state=42,
                error_score='raise',
                verbose=0,
            )
            try:
                random_search.fit(X_train_val_selected, y_train_val_combined)
                end_time = time.time()
                training_time = end_time - start_time
                best_model = random_search.best_estimator_
                best_params_found = random_search.best_params_
                final_models[fs_method_name][clf_name] = best_model
                final_model_training_time[fs_method_name][clf_name] = training_time
                final_model_best_params[fs_method_name][clf_name] = best_params_found
            except Exception as e:
                # A status string marks the failure; the evaluation cell skips string entries
                print(f"\n    Error training {clf_name} for {fs_method_name}: {e}")
                print("    Skipping evaluation for this model.")
                final_models[fs_method_name][clf_name] = "Training Failed"
                final_model_training_time[fs_method_name][clf_name] = 0
                final_model_best_params[fs_method_name][clf_name] = {}

    print("\n--- Final Model Training Complete ---")
else:
    print("\nSkipping final model training as feature selection results are not available or test data is missing.")
    final_models = {}
    final_model_training_time = {}
    final_model_best_params = {}
```

# # 8. Model Evaluation
#
# Evaluate the trained final models on the held-out test set (`X_test_p`, `y_test`) using the features selected by each method.
#
# Metrics include: Accuracy, Precision, Recall, F1-score, ROC AUC, Confusion Matrix, and False Positive Rate (FPR).

In [None]:
print("\n--- Starting Final Model Evaluation on Test Set ---")
# Per FS method and classifier: either a metrics dict or a status string explaining the skip
final_evaluation_results = {}
# Proceed only if final models were trained and the test set has rows (shape is a tuple, so index it)
if final_models and X_test_p.shape[0] > 0:
    for fs_method_name, clf_models in tqdm(final_models.items(), desc="Evaluating Models per FS Method"):
        selected_mask = feature_selection_results.get(fs_method_name, {}).get('selected_mask')
        if selected_mask is None or np.sum(selected_mask) == 0:
            final_evaluation_results[fs_method_name] = {clf_name: "No Features Selected" for clf_name in clf_models.keys()}
            continue
        # np.where returns a tuple of index arrays; [0] keeps the selection 2-D,
        # the same way the training cell selects its columns
        X_test_selected = X_test_p[:, np.where(selected_mask == 1)[0]]
        # Check that the selected test data has both rows and columns
        if X_test_selected.shape[0] == 0 or X_test_selected.shape[1] == 0:
            print(f"  Skipping evaluation for {fs_method_name}: Insufficient test data or features ({X_test_selected.shape}).")
            final_evaluation_results[fs_method_name] = {clf_name: "Insufficient Test Data/Features" for clf_name in clf_models.keys()}
            continue
        final_evaluation_results[fs_method_name] = {}
        for clf_name, model in tqdm(clf_models.items(), desc=f"  Evaluating Classifiers for {fs_method_name}", leave=False):
            if isinstance(model, str):
                # A string is a status marker left by the training cell; pass it through
                final_evaluation_results[fs_method_name][clf_name] = model
                continue
            try:
                y_pred = model.predict(X_test_selected)
                # Scores for ROC AUC: probabilities when available, else decision scores
                y_prob = None
                roc_auc = "N/A"
                try:
                    if hasattr(model, "predict_proba"):
                        y_prob = model.predict_proba(X_test_selected)[:, 1]
                        roc_auc = roc_auc_score(y_test, y_prob)
                    elif hasattr(model, "decision_function"):
                        y_prob = model.decision_function(X_test_selected)
                        roc_auc = roc_auc_score(y_test, y_prob)
                except Exception as e:
                    # ROC AUC is optional; keep the other metrics when it cannot be computed
                    y_prob = None
                    roc_auc = "Error"
                accuracy = accuracy_score(y_test, y_pred)
                report = classification_report(y_test, y_pred, output_dict=True, zero_division=0)
                cm = confusion_matrix(y_test, y_pred)
                # FPR = FP / (FP + TN), with cm = [[TN, FP], [FN, TP]]
                if cm.shape == (2, 2):
                    TN, FP, FN, TP = cm.ravel()
                    # 0.0 (not int 0) so the float check in the summary keeps the value
                    fpr = FP / (FP + TN) if (FP + TN) != 0 else 0.0
                else:
                    # Not a binary confusion matrix (e.g. a single class present)
                    fpr = "N/A"
                final_evaluation_results[fs_method_name][clf_name] = {
                    'accuracy': accuracy,
                    'report': report,
                    'confusion_matrix': cm,
                    'roc_auc': roc_auc,
                    'fpr': fpr,
                    'num_features': np.sum(selected_mask)
                }
                # Plot ROC curve if scores are available and ROC AUC was calculated
                if y_prob is not None and isinstance(roc_auc, float):
                    try:
                        fpr_curve, tpr_curve, _ = roc_curve(y_test, y_prob)
                        plt.figure(figsize=(8, 6))
                        plt.plot(fpr_curve, tpr_curve, label=f'{clf_name} (AUC = {roc_auc:.4f})')
                        plt.plot([0, 1], [0, 1], 'k--')  # Random guess line (diagonal)
                        plt.xlim([0.0, 1.0])
                        plt.ylim([0.0, 1.05])
                        plt.xlabel('False Positive Rate')
                        plt.ylabel('True Positive Rate')
                        plt.title(f'ROC Curve - {fs_method_name} + {clf_name}')
                        plt.legend(loc="lower right")
                        plt.grid(True)
                        plt.show()
                    except Exception as e:
                        print(f"    Could not plot ROC curve for {clf_name}: {e}")
            except Exception as e:
                print(f"\n    Error during evaluation of {clf_name} for {fs_method_name}: {e}")
                final_evaluation_results[fs_method_name][clf_name] = "Evaluation Failed"
    print("\n--- Final Model Evaluation Complete ---")
    # Summarize results across all FS methods and classifiers
    print("\n--- Summary of Test Set Results ---")
    summary_data = []
    for fs_method_name, clf_results in final_evaluation_results.items():
        for clf_name, metrics in clf_results.items():
            if isinstance(metrics, str):
                # Skipped or failed model: keep the row, leave the metrics empty
                summary_data.append({
                    'FS Method': fs_method_name,
                    'Classifier': clf_name,
                    'Status': metrics,
                    'Num Features': feature_selection_results.get(fs_method_name, {}).get('num_features', 0),
                    'Accuracy': np.nan, 'FPR': np.nan, 'ROC AUC': np.nan,
                    'Precision (Attack)': np.nan, 'Recall (Attack)': np.nan, 'F1-Score (Attack)': np.nan,
                    'FS Time (s)': feature_selection_results.get(fs_method_name, {}).get('time', np.nan),
                    'Train Time (s)': final_model_training_time.get(fs_method_name, {}).get(clf_name, np.nan)
                })
            else:
                # NOTE(review): the report is read under key '1', which presumes the
                # attack class is labelled 1 in y_test -- confirm against the label encoding.
                summary_data.append({
                    'FS Method': fs_method_name,
                    'Classifier': clf_name,
                    'Status': 'Success',
                    'Num Features': metrics.get('num_features', np.nan),
                    'Accuracy': metrics.get('accuracy', np.nan),
                    'FPR': metrics.get('fpr', np.nan) if isinstance(metrics.get('fpr'), float) else np.nan,
                    'ROC AUC': metrics.get('roc_auc', np.nan) if isinstance(metrics.get('roc_auc'), float) else np.nan,
                    'Precision (Attack)': metrics['report'].get('1', {}).get('precision', np.nan),
                    'Recall (Attack)': metrics['report'].get('1', {}).get('recall', np.nan),
                    'F1-Score (Attack)': metrics['report'].get('1', {}).get('f1-score', np.nan),
                    'FS Time (s)': feature_selection_results.get(fs_method_name, {}).get('time', np.nan),
                    'Train Time (s)': final_model_training_time.get(fs_method_name, {}).get(clf_name, np.nan)
                })
    summary_df = pd.DataFrame(summary_data)
    print(summary_df.to_string())  # Print full dataframe
else:
    print("\nSkipping final model evaluation as models were not trained or test data is not available.")
    summary_df = pd.DataFrame()  # Ensure summary_df exists
```

# # 9. Implement Plausible Hybrid Swarm Optimizer for Benchmarks
#
# Implement a plausible hybrid ACO+PSO+ABC+MWPA algorithm for continuous optimization benchmark functions.
#
# - This class `HybridSwarmOptimizer` manages the population and the iteration process for continuous variables.
# - It minimizes the objective function.
# - The `update_agents` method implements the hybridization logic, attempting to combine the update rules for continuous space. This implementation uses a probabilistic approach where each agent applies one of the component-inspired updates (ACO, PSO+MWPA, ABC).
#
# **Note:** This is a *plausible interpretation* of how these algorithms could be hybridized for continuous optimization, inspired by the abstract's equation. Your specific novel algorithm might differ. You should replace the logic inside `initialize_agents` and `update_agents` with your precise implementation if needed.

In [None]:
import scipy.optimize
import time
# Define benchmark functions (to be minimized)
def sphere(x):
    """Sphere benchmark: the sum of squared coordinates; minimum 0 at the origin."""
    coords = np.asarray(x)  # Accept lists/tuples as well as arrays
    squared = np.square(coords)
    return np.sum(squared)
def rastrigin(x):
    """Rastrigin benchmark: 10*n + sum(x_i^2 - 10*cos(2*pi*x_i)); minimum 0 at the origin."""
    coords = np.asarray(x)  # Accept lists/tuples as well as arrays
    n = len(coords)
    quadratic = coords**2
    ripple = 10 * np.cos(2 * np.pi * coords)
    return 10 * n + np.sum(quadratic - ripple)
def rosenbrock(x):
    """
    Rosenbrock benchmark: sum(100*(x_{i+1} - x_i^2)^2 + (x_i - 1)^2); minimum 0 at x = [1, ..., 1].

    Inputs with fewer than two coordinates have no consecutive pair: a single
    coordinate is scored as (x - 1)^2 and an empty input returns infinity.
    """
    coords = np.asarray(x)  # Accept lists/tuples as well as arrays
    size = len(coords)
    if size >= 2:
        leading, trailing = coords[:-1], coords[1:]
        return np.sum(100.0 * (trailing - leading**2.0)**2.0 + (leading - 1)**2.0)
    if size == 1:
        return np.sum((coords - 1)**2)
    return np.inf
# Search-space bounds for the benchmark functions: one (lower, upper) pair per dimension.
# Sphere and Rastrigin use [-5.12, 5.12]; Rosenbrock uses [-2.048, 2.048]
# ([-5, 10] is another range often used for Rosenbrock).
n_dimensions_benchmark = 10  # Example number of dimensions for benchmark functions
bounds_sphere = [(-5.12, 5.12) for _ in range(n_dimensions_benchmark)]
bounds_rastrigin = [(-5.12, 5.12) for _ in range(n_dimensions_benchmark)]
bounds_rosenbrock = [(-2.048, 2.048) for _ in range(n_dimensions_benchmark)]
# --- Plausible Hybrid Swarm Algorithm for Minimization ---
class HybridSwarmOptimizer:
def __init__(self, objective_func, bounds, n_agents=50, max_iter=100,
pso_w=0.7, pso_c1=1.5, pso_c2=1.5, aco_evap=0.1, aco_deposit=0.1,
abc_limit=5, mwpa_beta=1.5, hybrid_probs=[0.33, 0.33, 0.34]): # Probabilities for [ACO, PSO+MWPA, ABC]
"""
Initializes the hybrid swarm intelligence optimizer for continuous problems.
Uses a probabilistic approach to apply component-inspired updates.
"""
self.objective_func = objective_func
self.bounds = np.array(bounds) # Convert bounds to numpy array
self.n_agents = n_agents
self.max_iter = max_iter
self.n_dimensions = len(bounds)
# Algorithm parameters
self.pso_w = pso_w
self.pso_c1 = pso_c1
self.pso_c2 = pso_c2
self.aco_evap = aco_evap
self.aco_deposit = aco_deposit
self.abc_limit = abc_limit
self.mwpa_beta = mwpa_beta
self.hybrid_probs = hybrid_probs # Probabilities for selecting update type
# Agent state (continuous positions)
self.agents_pos = np.random.uniform(self.bounds[:, 0], self.bounds[:, 1], size=(self.n_agents, self.n_dimensions))
self.velocities = np.zeros_like(self.agents_pos) # For PSO part
self.pbest_pos = self.agents_pos.copy()
self.pbest_value = np.full(self.n_agents, np.inf)
# Global best (shared)
self.gbest_pos = np.random.uniform(self.bounds[:, 0], self.bounds[:, 1], size=self.n_dimensions) # Initialize randomly
self.gbest_value = self.objective_func(self.gbest_pos) # Evaluate initial best
self.best_position = self.gbest_pos.copy() # Overall best position
self.best_value = self.gbest_value # Overall best value
# ACO components (pheromone trail - adapted for continuous space, e.g., on regions or best paths)
# A simple adaptation: pheromones on dimensions, influencing step size/direction
self.pheromones = np.ones(self.n_dimensions) * 0.1
# ABC components (trial counts for scout phase)
self.trials = np.zeros(self.n_agents, dtype=int)
# MWPA components (alpha wolf is the global best)
# Hunting coefficient A will be calculated per iteration/agent
self.history = [self.best_value] # To track convergence
self.computational_time = 0 # To track time
print(f"HybridSwarmOptimizer initialized for {objective_func.__name__}.")
def initialize_agents(self):
"""
Initialize the population of agents (positions) and algorithm-specific components.
Agents should be initialized within the specified bounds.
"""
# Initialize agents randomly within bounds
self.agents_pos = np.random.uniform(
self.bounds[:, 0],
self.bounds[:, 1],
size=(self.n_agents, self.n_dimensions)
)
self.velocities = np.zeros_like(self.agents_pos) # For PSO part
self.pbest_pos = self.agents_pos.copy()
self.pbest_value = np.full(self.n_agents, np.inf)
# Initialize pheromones
self.pheromones = np.ones(self.n_dimensions) * 0.1
# Initialize trial counts for ABC
self.trials = np.zeros(self.n_agents, dtype=int)
# Evaluate initial population to set initial personal and global bests
initial_values = [self.objective_func(pos) for pos in self.agents_pos]
# Find initial best (MINIMIZING value)
best_initial_idx = np.argmin(initial_values)
self.gbest_value = initial_values[best_initial_idx]
self.gbest_pos = self.agents_pos[best_initial_idx].copy()
self.best_value = self.gbest_value
self.best_position = self.gbest_pos.copy()
self.pbest_value = np.array(initial_values)
def update_agents(self, iteration):
    """
    Perform one iteration of the hybrid swarm intelligence algorithm for minimization.

    Each agent independently draws one of three component-inspired moves
    ('ACO', 'PSO+MWPA' or 'ABC') with probabilities ``self.hybrid_probs``,
    is clipped back into ``self.bounds`` and is re-evaluated with
    ``self.objective_func``. Afterwards the personal bests, the ABC trial
    counters, the global/overall best, the pheromone vector and the ABC
    scout phase are updated.

    Parameters
    ----------
    iteration : int
        Zero-based iteration index. It drives the linear decay of the PSO
        inertia weight and of the MWPA hunting coefficient ``A``.

    Notes
    -----
    The objective value of the position each agent currently occupies is
    kept in ``self._position_values`` (a ``(positions_array, values)``
    pair), so the "did this move improve the agent?" test for the ABC
    trial counter does not have to re-evaluate the previous position on
    every iteration. The cache is only trusted when it refers to the very
    array object currently stored in ``self.agents_pos``; otherwise (first
    call, or after ``initialize_agents`` built a new swarm) the values are
    recomputed once.
    """
    lower, upper = self.bounds[:, 0], self.bounds[:, 1]

    # Values of the positions the agents occupy *before* this update.
    cached = getattr(self, '_position_values', None)
    if cached is not None and cached[0] is self.agents_pos:
        previous_values = cached[1]
    else:
        previous_values = np.full(self.n_agents, np.inf)
        for i in range(self.n_agents):
            previous_values[i] = self.objective_func(self.agents_pos[i])

    new_agents_pos = self.agents_pos.copy()
    new_velocities = self.velocities.copy()
    current_values = np.full(self.n_agents, np.inf)  # Values of the updated agents

    # Inertia weight: linear decay from pso_w towards 0
    current_w = self.pso_w * (self.max_iter - iteration) / self.max_iter
    # MWPA hunting coefficient A: linear decay from 2 towards 0
    current_A = 2 * (1 - iteration / self.max_iter)

    for i in range(self.n_agents):
        # Probabilistically choose which algorithm's update to apply
        choice = np.random.choice(['ACO', 'PSO+MWPA', 'ABC'], p=self.hybrid_probs)
        current_pos = self.agents_pos[i]
        current_vel = self.velocities[i]
        pbest_pos_i = self.pbest_pos[i]
        gbest_pos_i = self.gbest_pos  # Global best is the alpha wolf/prey

        if choice == 'PSO+MWPA':
            # --- PSO + MWPA velocity update ---
            r1, r2 = np.random.rand(self.n_dimensions), np.random.rand(self.n_dimensions)
            r3 = np.random.rand()  # Random scalar for the MWPA distance term
            # Cognitive + social PSO terms
            pso_vel_term = (
                self.pso_c1 * r1 * (pbest_pos_i - current_pos)
                + self.pso_c2 * r2 * (gbest_pos_i - current_pos)
            )
            # MWPA term: A * D**beta with a random sign/scale per dimension,
            # where D is the distance to a randomly scaled alpha (gbest)
            D = np.abs(2 * r3 * gbest_pos_i - current_pos)
            mwpa_vel_term = current_A * (D ** self.mwpa_beta) * np.random.uniform(-1, 1, self.n_dimensions)
            new_velocities[i] = current_w * current_vel + pso_vel_term + mwpa_vel_term
            new_agents_pos[i] = current_pos + new_velocities[i]
        elif choice == 'ABC':
            # --- ABC search update: perturb relative to a random other agent ---
            if self.n_agents > 1:
                neighbor_idx = np.random.choice([j for j in range(self.n_agents) if j != i])
            else:
                neighbor_idx = i  # Only one agent: difference is zero, no move
            phi = np.random.uniform(-1, 1, self.n_dimensions)
            new_agents_pos[i] = current_pos + phi * (current_pos - self.agents_pos[neighbor_idx])
            new_velocities[i] = np.zeros_like(current_vel)  # Reset velocity after ABC move
        elif choice == 'ACO':
            # --- ACO-inspired update for continuous space ---
            # Per-dimension weights proportional to the pheromone levels
            prob = (self.pheromones + 1e-9) / (np.sum(self.pheromones) + 1e-9)
            # Small random step, larger in dimensions with more pheromone
            biased_random_step = np.random.rand(self.n_dimensions) * prob * (upper - lower) * 0.01
            # Small pull towards gbest, also weighted by pheromone
            move_towards_gbest = np.random.rand(self.n_dimensions) * (gbest_pos_i - current_pos) * prob * 0.01
            new_agents_pos[i] = current_pos + biased_random_step + move_towards_gbest
            new_velocities[i] = np.zeros_like(current_vel)  # Reset velocity after ACO move

        # Keep the agent inside the search space, then evaluate it once
        new_agents_pos[i] = np.clip(new_agents_pos[i], lower, upper)
        current_values[i] = self.objective_func(new_agents_pos[i])

        # --- Update personal best (minimizing) ---
        if current_values[i] < self.pbest_value[i]:
            self.pbest_value[i] = current_values[i]
            self.pbest_pos[i] = new_agents_pos[i].copy()

        # --- Update ABC trial count ---
        # Reset when the move improved on the agent's previous position,
        # otherwise count one more failed trial.
        if current_values[i] < previous_values[i]:
            self.trials[i] = 0
        else:
            self.trials[i] += 1

    # --- Apply global updates (after all agents are updated) ---
    self.agents_pos = new_agents_pos
    self.velocities = new_velocities

    # Update global best (minimizing)
    best_current_idx = np.argmin(current_values)
    if current_values[best_current_idx] < self.gbest_value:
        self.gbest_value = current_values[best_current_idx]
        self.gbest_pos = self.agents_pos[best_current_idx].copy()
        self.best_value = self.gbest_value
        self.best_position = self.gbest_pos.copy()

    # --- ACO pheromone update ---
    # Evaporation
    self.pheromones *= (1 - self.aco_evap)
    # Deposit: larger where |gbest| is small in that dimension. This is a very
    # simplified heuristic that suits functions whose optimum is at the origin
    # (e.g. Sphere/Rastrigin); it is not tailored to other optima.
    deposit_amount_per_dim = self.aco_deposit / (np.abs(self.gbest_pos) + 1e-9)  # Avoid division by zero
    deposit_amount_per_dim = np.clip(deposit_amount_per_dim, 0, 1)
    self.pheromones += deposit_amount_per_dim
    self.pheromones = np.maximum(self.pheromones, 1e-2)  # Keep pheromones slightly positive

    # --- ABC scout phase ---
    # Agents whose trial count reached the limit (trials >= limit) are replaced
    scout_indices = np.where(self.trials >= self.limit)[0]
    for i in scout_indices:
        self.agents_pos[i] = np.random.uniform(lower, upper, size=self.n_dimensions)
        self.velocities[i] = np.zeros(self.n_dimensions)
        self.trials[i] = 0
        new_value = self.objective_func(self.agents_pos[i])
        current_values[i] = new_value  # Keep the cached value in sync with the new position
        # The scout's new position replaces its personal best outright
        self.pbest_value[i] = new_value
        self.pbest_pos[i] = self.agents_pos[i].copy()
        if new_value < self.gbest_value:
            self.gbest_value = new_value
            self.gbest_pos = self.agents_pos[i].copy()
            self.best_value = self.gbest_value
            self.best_position = self.gbest_pos.copy()

    # Remember the values of the positions now held, for the next iteration
    self._position_values = (self.agents_pos, current_values)
def run(self):
    """
    Run the hybrid optimizer (minimization) from a freshly initialized swarm.

    Returns
    -------
    tuple
        ``(best_position, best_value, history, computational_time)`` where
        ``history`` holds the best value after initialization followed by
        the best value after each of the ``max_iter`` iterations, and
        ``computational_time`` is the wall-clock duration of the iteration
        loop in seconds (initialization is not timed).
    """
    # Nothing to optimize: report an empty result without touching the swarm.
    if self.n_dimensions == 0:
        print(f"\nNo dimensions defined for {self.objective_func.__name__}. Skipping benchmark.")
        self.best_position = np.array([])
        self.best_value = np.inf
        self.history = []
        self.computational_time = 0
        return self.best_position, self.best_value, self.history, self.computational_time

    self.initialize_agents()
    # The convergence trace starts with the best value of the initial swarm.
    self.history = [self.best_value]

    started_at = time.time()
    step = 0
    while step < self.max_iter:
        # The iteration index feeds the decaying inertia / hunting coefficients.
        self.update_agents(step)
        # Recorded even when the best value did not change this iteration.
        self.history.append(self.best_value)
        step += 1
    self.computational_time = time.time() - started_at

    return self.best_position, self.best_value, self.history, self.computational_time
# --- End of Plausible Hybrid Swarm Algorithm for Minimization ---
else:
print("\nSkipping hybrid swarm optimizer definition as data was not loaded.")
# Define a dummy class to avoid errors if instantiated
class HybridSwarmOptimizer:
    """Inert stand-in so later cells can still instantiate and run the optimizer."""

    def __init__(self, *args, **kwargs):
        """Accept and ignore any constructor arguments."""

    def run(self, *args, **kwargs):
        """Report that no real optimizer exists and return an empty result tuple."""
        print("Hybrid optimizer called but data not loaded.")
        empty_position = np.array([])
        no_history = []
        return empty_position, np.inf, no_history, 0
```

# # 10. Benchmark Function Analysis
#
# Evaluate the performance of the hybrid swarm intelligence algorithm on standard optimization benchmark functions like Sphere, Rastrigin, and Rosenbrock. This helps assess its global search and convergence capabilities independently of the feature selection problem.
#
# **Concept:**
# - Define the benchmark functions (mathematical functions with known global minima).
# - Run the hybrid algorithm (`HybridSwarmOptimizer`) on each benchmark function and track the convergence towards the known minimum.
# - Measure computational efficiency.
# - **For comparison (Optional, requires implementing individual continuous algorithms):** Run individual continuous ACO, PSO, ABC, and MWPA algorithms on the same benchmarks.

In [None]:
# Benchmark functions and search-space bounds are defined in Step 9:
#   sphere, rastrigin, rosenbrock
#   bounds_sphere, bounds_rastrigin, bounds_rosenbrock, n_dimensions_benchmark
# NOTE(review): the objective names below follow the Step 9 comment above;
# confirm they match the actual definitions.
benchmark_results = {}

# Each benchmark pairs its objective callable with its bounds. Iterating over
# the bounds dictionary alone would hand the bounds array to the optimizer as
# if it were the objective function.
benchmark_problems = {
    'Sphere': (sphere, bounds_sphere),
    'Rastrigin': (rastrigin, bounds_rastrigin),
    'Rosenbrock': (rosenbrock, bounds_rosenbrock),
}
# Name -> bounds mapping, kept under its original name for later cells.
benchmark_bounds = {name: bounds for name, (_, bounds) in benchmark_problems.items()}

if n_dimensions_benchmark > 0:
    print("\n--- Running Benchmark Function Analysis (Hybrid) ---")
    # Parameters for benchmark runs (can be adjusted)
    bench_run_params = {
        "n_agents": 30,
        "max_iter": 100,  # More iterations for optimization benchmarks
        # Add specific parameters for the hybrid optimizer here if needed
    }

    for name, (func, bounds) in tqdm(benchmark_problems.items(), desc="Running Benchmarks"):
        if len(bounds) == 0:
            print(f"  Skipping {name}: Bounds not defined.")
            benchmark_results[name] = {'best': np.inf, 'hist': [], 'time': 0, 'status': 'No Bounds'}
            continue
        print(f"\nRunning Hybrid Optimizer on {name}...")
        try:
            optimizer = HybridSwarmOptimizer(func, bounds, **bench_run_params)
            sol, val, hist, dt = optimizer.run()
            benchmark_results[name] = {'best': val, 'hist': hist, 'time': dt, 'status': 'Success'}
            print(f"{name} completed. Best Value: {val:.4e}, Time: {dt:.2f}s")
        except Exception as e:
            # Best effort: record the failure and continue with the next benchmark.
            print(f"Error running Hybrid Optimizer on {name}: {e}")
            benchmark_results[name] = {'best': np.inf, 'hist': [], 'time': 0, 'status': f'Error: {e}'}
            print(f"{name} failed.")
    print("\n--- Benchmark Function Analysis Complete ---")

    # Plot benchmark convergence (only runs that produced a history)
    runs_with_history = {name: res for name, res in benchmark_results.items() if res['hist']}
    if runs_with_history:
        fig, ax = plt.subplots(figsize=(12, 8))
        for name, res in runs_with_history.items():
            ax.plot(res['hist'], label=f'{name} (Best: {res["best"]:.4e})')
        ax.set_xlabel("Iteration")
        ax.set_ylabel("Best Objective Function Value")
        ax.set_title("Hybrid Swarm Intelligence Convergence on Benchmark Functions")
        ax.set_yscale('log')  # Use log scale for better visualization of convergence
        ax.legend()
        ax.grid(True)
        plt.show()
    else:
        print("No benchmark convergence histories to plot.")

    # Summarize benchmark results
    print("\nBenchmark Summary (Hybrid):")
    bench_summary_data = [
        {
            'Function': name,
            'Status': res.get('status', 'Success'),
            'Best Value': res['best'],
            'Time (s)': res['time'],
        }
        for name, res in benchmark_results.items()
    ]
    bench_summary_df = pd.DataFrame(bench_summary_data)
    print(bench_summary_df.to_string())
else:
    print("\nSkipping benchmark function analysis as dimensions are not defined.")
    benchmark_results = {}
    bench_summary_df = pd.DataFrame()
```

# # 11. Component-wise Analysis
#
# Study the impact of different components and parameters of the hybrid algorithm, as suggested by the abstract (inertia weight tuning, predatory impact, crossover strategy).
#
# **Concept:**
# - Run your custom hybrid algorithm implementations (from Steps 5 and 9) multiple times with different configurations.
# - Configurations could involve:
#     - Varying key parameters (e.g., PSO inertia weight strategy, MWPA parameters, ACO parameters, ABC parameters).
#     - Testing different hybridization strategies (if you designed multiple ways to combine the algorithms).
# - Collect results for each configuration:
#     - For Feature Selection: Final classification metrics (Accuracy, FPR, etc. from Step 8 using a fixed classifier), number of selected features, convergence history, computational time.
#     - For Benchmark Functions: Final best objective value, convergence history, computational time.
# - Compare the results to understand the contribution and sensitivity of different components and parameters.
#
# **Implementation Notes:**
# - This requires implementing the logic within your `HybridSwarmFeatureSelector` and `HybridSwarmOptimizer` classes to allow different components/strategies to be enabled or parameterized via the `**kwargs` in their `__init__` and potentially parameters passed to `update_agents`.
# - Then, write code to loop through different configurations, run the algorithm, collect results, and compare them.
# - You would also ideally implement individual continuous versions of ACO, PSO, ABC, and MWPA to compare against the hybrid on benchmark functions.

In [None]:
# --- Component-wise Analysis (Requires Running Configurable Hybrid) ---
# This cell only prints guidance; the experiment loops are still to be written.
print("\n".join([
    "\n--- Component-wise Analysis ---",
    "This section requires running experiments with variations of the hybrid algorithm.",
    "You need to modify the HybridSwarmFeatureSelector and HybridSwarmOptimizer classes",
    "to accept parameters that control component behavior and hybridization strategy.",
    "Then, define different configurations and run the algorithms.",
]))

# Outline (kept commented out) of how configurations could be defined and run.
#
# Feature-selection experiments:
# analysis_configs_fs = [
#     {'name': 'Hybrid (Default)', 'n_agents': 30, 'max_iter': 50, 'hybrid_probs': [0.33, 0.33, 0.34]},
#     {'name': 'Hybrid (More PSO+MWPA)', 'n_agents': 30, 'max_iter': 50, 'hybrid_probs': [0.1, 0.6, 0.3]},
#     {'name': 'Hybrid (Adaptive Inertia)', 'n_agents': 30, 'max_iter': 50, 'pso_w_strategy': 'adaptive'}, # Requires implementing 'pso_w_strategy' in HybridSwarmFeatureSelector
#     # Add configurations to test MWPA parameters, ACO parameters, ABC parameters, etc.
# ]
# analysis_results_fs = {}
# if data_loaded and X_tr.shape[0] > 0 and X_test_p.shape[0] > 0:
#     print("\nStarting Component-wise Analysis (Feature Selection)...")
#     # Loop through analysis_configs_fs and run HybridSwarmFeatureSelector for each,
#     # then evaluate the resulting feature masks with a fixed classifier (e.g., the best RF from Step 7)
#     # and compare metrics like accuracy, FPR, num_features, and time.
#     print("Placeholder: Implement the loop and evaluation for FS component analysis.")
# else:
#     print("\nSkipping Component-wise Analysis (Feature Selection) as data is missing.")
#
# Benchmark-function experiments:
# analysis_configs_bench = [
#     {'name': 'Hybrid (Default)', 'n_agents': 30, 'max_iter': 100, 'hybrid_probs': [0.33, 0.33, 0.34]},
#     {'name': 'Hybrid (More Predation)', 'n_agents': 30, 'max_iter': 100, 'mwpa_beta': 2.0}, # Requires implementing 'mwpa_beta' in HybridSwarmOptimizer
#     {'name': 'PSO Only (for comparison)', 'n_agents': 30, 'max_iter': 100, 'hybrid_probs': [0, 1, 0]}, # Example: effectively run PSO+MWPA only
#     # Add configurations to test other parameters or compare to individual algorithms
# ]
# analysis_results_bench = {}
# if n_dimensions_benchmark > 0:
#     print("\nStarting Component-wise Analysis (Benchmark Functions)...")
#     # Loop through analysis_configs_bench and run HybridSwarmOptimizer for each on a benchmark function (e.g., Sphere),
#     # then compare metrics like best value found, convergence history, and time.
#     print("Placeholder: Implement the loop and evaluation for Benchmark component analysis.")
# else:
#     print("\nSkipping Component-wise Analysis (Benchmark Functions) as dimensions are not defined.")

print("\nComponent-wise Analysis requires implementing the configurable hybrid algorithm and running experiments here.")
```

# # 12. Presentation and Demonstration
#
# Use this Jupyter Notebook and visualizations to present the methodology, results, and analysis, following the structure outlined in the plan and abstract.
#
# - Structure the notebook logically (as done here).
# - Use markdown cells for explanations of each step, the algorithms, the hybrid strategy (once implemented), and the results.
# - Use code cells for implementation and execution.
# - Include plots for:
#     - Hybrid algorithm convergence (feature selection fitness over iterations) - Done in Step 6.
#     - Hybrid algorithm convergence (benchmark function value over iterations) - Done in Step 10.
#     - ROC curves for the final classifiers - Done in Step 8.
#     - Comparison of evaluation metrics (Accuracy, FPR, F1-score, etc.) across different classifiers and different feature selection methods.
#     - (If implemented) Comparison plots from the component-wise analysis (e.g., convergence curves for different configurations, bar plots of final metrics/values).
# - Document the full workflow from data acquisition to final evaluation and analysis.

In [None]:
# This notebook itself serves as the presentation structure.
# Ensure all previous sections are clear, well-commented, and include relevant visualizations.

# Example: Visualizing comparison of final evaluation metrics from Step 8
if not summary_df.empty:
    print("\n--- Visualizing Final Model Evaluation Results ---")
    # Only successful runs carry meaningful metrics
    success_df = summary_df[summary_df['Status'] == 'Success'].copy()
    if not success_df.empty:
        # One grouped bar chart per metric: FS Method on the x-axis, one bar per Classifier
        metrics_to_plot = ['Accuracy', 'FPR', 'F1-Score (Attack)', 'Num Features', 'FS Time (s)', 'Train Time (s)']
        for metric in metrics_to_plot:
            if metric not in success_df.columns:
                print(f"Warning: Metric '{metric}' not found in summary data.")
                continue
            fig, ax = plt.subplots(figsize=(14, 7))
            sns.barplot(data=success_df, x='FS Method', y=metric, hue='Classifier', palette='viridis', ax=ax)
            ax.set_title(f'{metric} Comparison by Feature Selection Method and Classifier')
            ax.set_ylabel(metric)
            plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
            ax.legend(title='Classifier', bbox_to_anchor=(1.05, 1), loc='upper left')
            ax.grid(axis='y')
            fig.tight_layout()
            plt.show()
    else:
        print("No successful evaluation results to plot.")

# Example: Visualizing comparison of benchmark results from Step 10
if benchmark_results and not bench_summary_df.empty:
    print("\n--- Visualizing Benchmark Optimization Results (Hybrid) ---")
    # Only successful runs carry meaningful values
    bench_success_df = bench_summary_df[bench_summary_df['Status'] == 'Success'].copy()
    if not bench_success_df.empty:
        # Index by function name so each bar is labelled with its benchmark
        # instead of the DataFrame's integer row number.
        if 'Function' in bench_success_df.columns:
            bench_by_function = bench_success_df.set_index('Function')
        else:
            bench_by_function = bench_success_df

        # (column, bar colour, title, y-label, use log scale)
        bench_plot_specs = [
            ('Best Value', 'skyblue', "Best Value Found per Benchmark Function (Hybrid)", "Best Value (Log Scale)", True),
            ('Time (s)', 'lightgreen', "Computational Time per Benchmark Function (Hybrid)", "Time (seconds)", False),
        ]
        for column, colour, title, ylabel, use_log in bench_plot_specs:
            # Explicit figure/axes so each chart is drawn on its own canvas
            fig, ax = plt.subplots(figsize=(8, 5))
            bench_by_function[column].plot(kind='bar', color=colour, ax=ax)
            ax.set_title(title)
            ax.set_ylabel(ylabel)
            if use_log:
                ax.set_yscale('log')
            plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
            ax.grid(axis='y')
            fig.tight_layout()
            plt.show()
    else:
        print("No successful benchmark results to plot.")

# Ensure all steps from data loading to final evaluation are documented in markdown.
# Explain the hybrid algorithm concept and your specific implementation choices (when implemented).
```

# # 13. Tools and Libraries
#
# The following libraries were used or are intended for use in this project:
#
# - **Pandas:** Data loading and manipulation.
# - **NumPy:** Numerical operations, array handling.
# - **Scikit-learn:** Data preprocessing, model training (Random Forest, SVM, MLP), evaluation metrics, train/test split, hyperparameter tuning.
# - **Matplotlib & Seaborn:** Data visualization.
# - **SciPy:** Scientific computing, used here for benchmark functions.
# - **Requests & StringIO:** Fetching data from URLs.
# - **Tqdm:** Progress bars.
# - **PySwarms:** (Optional/Potential) A library for Particle Swarm Optimization, installed as `pyswarms`. It could serve as a starting point for the PSO component or as a comparison baseline. It is not used directly by the individual/hybrid code in this notebook, but it is listed in the original `pip install` command.

In [None]:
# The imports themselves live at the beginning of the relevant sections.
# To prepare an environment, install:
# pip install pandas numpy scikit-learn matplotlib seaborn scipy requests tqdm
print("\nRequired Libraries:")
print("\n".join(
    f"- {library}"
    for library in (
        "pandas",
        "numpy",
        "scikit-learn",
        "matplotlib",
        "seaborn",
        "scipy",
        "requests",
        "io (built-in)",
        "time (built-in)",
        "sys (built-in)",
        "tqdm",
    )
))
# print("- pyswarm (Optional, for PSO component or comparison)")

# Closing reminders about what is still left to implement.
print("\n".join([
    "\nNotebook structure complete.",
    "Remember to replace the plausible hybrid implementations in Step 5 and Step 9",
    "with your specific novel algorithm if it differs.",
    "Step 11 requires implementing the configurable hybrid algorithm and running experiments.",
]))
```