# Important libraries

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVC
from sklearn.preprocessing import LabelEncoder, StandardScaler, OneHotEncoder
from sklearn.metrics import make_scorer, f1_score, accuracy_score, precision_score, recall_score, roc_auc_score, classification_report
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
import time
import random
import kagglehub
import os
import warnings
from time import time

In [None]:
# Suppress DeprecationWarning messages attributed to modules whose name starts
# with 'sklearn' so they do not clutter the notebook output.
warnings.filterwarnings(
    action="ignore",
    category=DeprecationWarning,
    module='sklearn',
)

In [None]:
# CSV file that accumulates the results of every experiment in this notebook
output_csv_path = '/kaggle/working/all_svm_tuning_results.csv'

# Resume from the saved CSV when one is present; otherwise start from an empty frame
if not os.path.exists(output_csv_path):
    print("No existing results file found. Starting fresh.")
    all_results_df = pd.DataFrame()
else:
    print(f"Loading existing results from {output_csv_path}")
    all_results_df = pd.read_csv(output_csv_path)

No existing results file found. Starting fresh.


### Downloading Dataset

In [None]:
# Fetch the dataset; the returned location is searched as a directory tree below
path = kagglehub.dataset_download("wenruliu/adult-income-dataset")

# Locate the expected CSV anywhere beneath the download location (first match wins)
expected_file = 'adult.csv'
csv_file_path = None
for root, _, files in os.walk(path):
    if expected_file in files:
        csv_file_path = os.path.join(root, expected_file)
        break

# Fail with a clear message instead of handing None to pd.read_csv
if csv_file_path is None:
    raise FileNotFoundError(f"'{expected_file}' was not found under '{path}'")

# Treat '?' entries as missing values (NaN) while parsing
df = pd.read_csv(csv_file_path, na_values=['?'])

### Take a subset, drop unused columns, rename the target, and define X and y

In [None]:
# Settings for the working subset and for target-column naming
SUBSET_SIZE = 5000
TARGET_COLUMN = 'income'
NEW_TARGET_NAME = 'Outcome'

# Sample a reproducible subset from a copy, leaving `df` untouched
df_processed = (
    df.copy()
    .sample(n=SUBSET_SIZE, random_state=42)
    .reset_index(drop=True)
)

# Features are everything except the target column
X = df_processed.drop(columns=[TARGET_COLUMN])

# 'fnlwgt' is always dropped; identifier columns are dropped only when present
COLUMNS_TO_DROP = ['fnlwgt'] + [
    id_column for id_column in ('ID', 'policy_id') if id_column in X.columns
]
X = X.drop(columns=COLUMNS_TO_DROP, errors='ignore')

# The target series is renamed to the notebook's label name
y = df_processed[TARGET_COLUMN].rename(NEW_TARGET_NAME)

### Encode categorical columns

In [None]:
# An object-dtype target is converted to integer class codes, keeping the same index
if y.dtype == 'object':
    le = LabelEncoder()
    y = pd.Series(
        le.fit_transform(y),
        index=y.index,
        name=NEW_TARGET_NAME,
    )

# Partition the feature columns by dtype for the preprocessing pipelines
numerical_features = list(X.select_dtypes(include=np.number).columns)
categorical_features = list(X.select_dtypes(include=['object', 'category']).columns)

all_identified_features = [*numerical_features, *categorical_features]

### Split the dataset

In [None]:
# Hold out 20% of the rows as a test set, stratified on the target
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

for split_name, split_data in (
    ("X_train", X_train),
    ("y_train", y_train),
    ("X_test", X_test),
    ("y_test", y_test),
):
    print(f"{split_name} shape:", split_data.shape)


# Numeric columns: median imputation followed by standardisation
numerical_pipeline = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical columns: mode imputation followed by one-hot encoding;
# categories unseen during fit are ignored at transform time
categorical_pipeline = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
])

# Register a transformer only for feature groups that are non-empty
transformers = [
    (step_name, step_pipeline, step_columns)
    for step_name, step_pipeline, step_columns in (
        ('num', numerical_pipeline, numerical_features),
        ('cat', categorical_pipeline, categorical_features),
    )
    if step_columns
]

# Columns not claimed by any transformer are passed through unchanged
preprocessor = ColumnTransformer(transformers=transformers, remainder='passthrough')
preprocessor.fit(X_train)

X_train shape: (4000, 13)
y_train shape: (4000,)
X_test shape: (1000, 13)
y_test shape: (1000,)


### SVM Model Evaluation

In [None]:
def evaluate_classifier(model, X, y):
    """Score a fitted classifier on ``X``/``y`` and return its metrics as a dict.

    The returned dict has the keys ``accuracy``, ``precision``, ``recall``,
    ``f1_score`` and ``roc_auc``. ``roc_auc`` is NaN when the model exposes no
    ``predict_proba``, when ``y`` does not contain exactly two distinct values,
    or when computing it raises.
    """
    predicted_labels = model.predict(X)

    # ROC AUC is best-effort: any failure leaves it as NaN
    auc = np.nan
    try:
        if hasattr(model, 'predict_proba'):
            # Second probability column is used as the score for ROC AUC
            positive_scores = model.predict_proba(X)[:, 1]
            if len(np.unique(y)) == 2:
                auc = roc_auc_score(y, positive_scores)
    except Exception:
        auc = np.nan

    return {
        'accuracy': accuracy_score(y, predicted_labels),
        'precision': precision_score(y, predicted_labels),
        'recall': recall_score(y, predicted_labels),
        'f1_score': f1_score(y, predicted_labels),
        'roc_auc': auc,
    }

### Baseline model without any hyperparameter tuning

In [None]:
print("--- Baseline SVM (Default Parameters) ---")
# A stored 'Baseline' row means this experiment already ran. The column check
# comes first because an empty results frame has no 'Method' column.
baseline_exists = (
    'Method' in all_results_df.columns
    and 'Baseline' in all_results_df['Method'].unique()
)

if baseline_exists:
    print("Baseline results already exist. Skipping.")
else:
    # Default-parameter SVC behind the shared preprocessor
    svm_baseline_estimator = SVC(probability=True, random_state=42)
    baseline_pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('svc', svm_baseline_estimator),
    ])
    print(baseline_pipeline.named_steps['svc'].get_params())

    # Time the fit on the training split
    start_time = time()
    baseline_pipeline.fit(X_train, y_train)
    train_time_baseline = time() - start_time
    print(f"\nBaseline model training time: {train_time_baseline:.4f} seconds")

    baseline_metrics = evaluate_classifier(baseline_pipeline, X_test, y_test)
    for metric, value in baseline_metrics.items():
        print(f"{metric}: {value:.4f}")

    print("Saving Baseline results...")
    # One row in the shared results schema; fields that do not apply stay NaN
    baseline_data = {
        'Method': 'Baseline',
        'Run': 1,
        'Seed': np.nan,
        'Best Parameters': str(svm_baseline_estimator.get_params()),
        'Best Training Val F1-Score': np.nan,
        'Tuning Time (s)': train_time_baseline,
        'Test Accuracy': baseline_metrics.get('accuracy', np.nan),
        'Test F1-Score': baseline_metrics.get('f1_score', np.nan),
        'Test Precision': baseline_metrics.get('precision', np.nan),
        'Test Recall': baseline_metrics.get('recall', np.nan),
        'Test ROC AUC': baseline_metrics.get('roc_auc', np.nan),
    }
    baseline_df_row = pd.DataFrame([baseline_data])
    all_results_df = pd.concat([all_results_df, baseline_df_row], ignore_index=True)
    # Persist right away so the result survives an interrupted session
    all_results_df.to_csv(output_csv_path, index=False)
    print("Baseline results saved.")

--- Baseline SVM (Default Parameters) ---
{'C': 1.0, 'break_ties': False, 'cache_size': 200, 'class_weight': None, 'coef0': 0.0, 'decision_function_shape': 'ovr', 'degree': 3, 'gamma': 'scale', 'kernel': 'rbf', 'max_iter': -1, 'probability': True, 'random_state': 42, 'shrinking': True, 'tol': 0.001, 'verbose': False}

Baseline model training time: 2.2975 seconds
accuracy: 0.8700
precision: 0.7865
recall: 0.6292
f1_score: 0.6991
roc_auc: 0.9041
Saving Baseline results...
Baseline results saved.


### Grid Search for parameter tuning

In [None]:
print("\n--- Traditional Tuning: Grid Search for SVM Parameters ---")
# A stored 'GridSearch' row means this experiment already ran. The column check
# comes first because an empty results frame has no 'Method' column.
gs_exists = (
    'Method' in all_results_df.columns
    and 'GridSearch' in all_results_df['Method'].unique()
)

if gs_exists:
    print("GridSearch results already exist. Skipping.")
else:
    svm_gs_estimator = SVC(probability=True, random_state=42)
    gs_pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('svc', svm_gs_estimator),
    ])
    # Grid over C and gamma for the RBF kernel (keys address the 'svc' step)
    param_grid = {
        'svc__C': [0.1, 1, 10],
        'svc__gamma': [0.01, 0.1, 'scale'],
        'svc__kernel': ['rbf'],
    }
    print(param_grid)

    # Shuffled 3-fold stratified CV, scored by F1
    cv_strategy = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
    grid_search = GridSearchCV(
        estimator=gs_pipeline,
        param_grid=param_grid,
        scoring='f1',
        cv=cv_strategy,
        n_jobs=-1,
        verbose=0,
    )

    # Time the whole search
    start_time = time()
    grid_search.fit(X_train, y_train)
    tuning_time_gs = time() - start_time

    best_params_gs = grid_search.best_params_
    best_svm_model_gs = grid_search.best_estimator_
    print(best_params_gs)
    print(f"Best cross-validation score: {grid_search.best_score_:.4f}")

    tuned_metrics_gs = evaluate_classifier(best_svm_model_gs, X_test, y_test)
    for metric, value in tuned_metrics_gs.items():
        print(f"{metric}: {value:.4f}")

    print("Saving GridSearch results...")
    # One row in the shared results schema
    gs_data = {
        'Method': 'GridSearch',
        'Run': 1,
        'Seed': np.nan,
        'Best Parameters': str(best_params_gs),
        'Best Training Val F1-Score': grid_search.best_score_,
        'Tuning Time (s)': tuning_time_gs,
        'Test Accuracy': tuned_metrics_gs.get('accuracy', np.nan),
        'Test F1-Score': tuned_metrics_gs.get('f1_score', np.nan),
        'Test Precision': tuned_metrics_gs.get('precision', np.nan),
        'Test Recall': tuned_metrics_gs.get('recall', np.nan),
        'Test ROC AUC': tuned_metrics_gs.get('roc_auc', np.nan),
    }
    gs_df_row = pd.DataFrame([gs_data])
    all_results_df = pd.concat([all_results_df, gs_df_row], ignore_index=True)
    # Persist right away so the result survives an interrupted session
    all_results_df.to_csv(output_csv_path, index=False)
    print("GridSearch results saved.")


--- Traditional Tuning: Grid Search for SVM Parameters ---
{'svc__C': [0.1, 1, 10], 'svc__gamma': [0.01, 0.1, 'scale'], 'svc__kernel': ['rbf']}
{'svc__C': 10, 'svc__gamma': 0.01, 'svc__kernel': 'rbf'}
Best cross-validation score: 0.6572
accuracy: 0.8660
precision: 0.7650
recall: 0.6375
f1_score: 0.6955
roc_auc: 0.9080
Saving GridSearch results...
GridSearch results saved.


# PSO

In [None]:
# Apply the fitted preprocessor a single time so the optimizer's fitness
# evaluations work on ready-made matrices
X_train_transformed = preprocessor.transform(X_train)
X_test_transformed = preprocessor.transform(X_test)

# Carve a stratified 25% validation split out of the transformed training data;
# the fitness function trains on the remainder and scores on this split
(
    X_train_small_transformed,
    X_val_transformed,
    y_train_small,
    y_val,
) = train_test_split(
    X_train_transformed,
    y_train,
    test_size=0.25,
    random_state=42,
    stratify=y_train,
)

# Search ranges for the two tuned hyperparameters; the kernel is fixed to rbf elsewhere
PARAM_RANGES_PSO = {
    'C': (0.01, 1000),
    'gamma': (0.0001, 10),
}
DIMENSION_PSO = len(PARAM_RANGES_PSO)  # two dimensions: C and gamma

In [None]:
# Fitness for PSO: validation F1 of an RBF SVC (higher is better)
def svm_fitness_function_pso(params):
    """Return the validation F1 score for the ``[C, gamma]`` pair in ``params``.

    The model is trained on the reduced training split and scored on the
    validation split held at module level. Any failure during training or
    scoring, or a non-finite score, yields -1.0.
    """
    C, gamma = params
    try:
        candidate = SVC(C=C, gamma=gamma, kernel='rbf', random_state=42)
        candidate.fit(X_train_small_transformed, y_train_small)
        validation_f1 = f1_score(y_val, candidate.predict(X_val_transformed))
        # NaN and +/-inf are both rejected by the finiteness test
        return validation_f1 if np.isfinite(validation_f1) else -1.0
    except Exception:
        # Deliberately broad: a failed evaluation is reported as poor fitness
        return -1.0

In [None]:
# Particle Swarm Optimization (PSO) function
def PSO(fitness_func, bounds, X_test, y_test, preprocessor, evaluate_func, n_particles=50, max_iterations=10, early_stop=4, w=0.9, c1=1.5, c2=1.5):
    """Maximize ``fitness_func`` inside box bounds with particle swarm optimization.

    Args:
        fitness_func: Callable taking a 1-D position array and returning a
            score; higher is better.
        bounds: Sequence of ``(low, high)`` pairs, one per dimension.
        X_test, y_test: Data used only for the per-iteration progress report.
        preprocessor: Transformer placed before the SVC in the reporting pipeline.
        evaluate_func: Callable ``(model, X, y) -> dict``; its ``'f1_score'``
            entry is printed as "Test F1".
        n_particles: Swarm size (must be at least 1).
        max_iterations: Maximum number of swarm updates.
        early_stop: Stop after this many consecutive iterations without an
            improvement of the global best.
        w: Inertia weight applied to the previous velocity.
        c1: Weight of the pull towards each particle's personal best.
        c2: Weight of the pull towards the global best.

    Returns:
        Tuple ``(gbest_position, gbest_fitness)``.

    Raises:
        ValueError: If ``n_particles`` is smaller than 1.

    Notes:
        Randomness comes from the global ``np.random`` state, so seed it
        beforehand for reproducible runs. The progress report treats the
        first two coordinates as ``(C, gamma)`` of an RBF SVC and refits on
        the module-level ``X_train``/``y_train``; it never influences the
        search, and any failure in it is reported as NaN.
    """
    dim = len(bounds)
    if n_particles < 1:
        raise ValueError("n_particles must be at least 1")

    min_bounds = np.array([b[0] for b in bounds])
    max_bounds = np.array([b[1] for b in bounds])
    span = max_bounds - min_bounds

    # Positions start uniformly inside the bounds; velocities within +/-10% of the range
    positions = min_bounds + np.random.rand(n_particles, dim) * span
    velocities = span * 0.1 * (np.random.rand(n_particles, dim) * 2 - 1)
    # Velocity cap: 50% of the range per dimension
    max_vel = span * 0.5

    # Personal bests start at the initial positions
    pbest_positions = positions.copy()
    pbest_fitness = np.array([fitness_func(p) for p in positions])

    # Global best is the best personal best
    gbest_index = np.argmax(pbest_fitness)
    gbest_position = pbest_positions[gbest_index].copy()
    gbest_fitness = pbest_fitness[gbest_index]

    no_improve_counter = 0
    test_f1_at_iteration = np.nan

    for iteration in range(max_iterations):
        for i in range(n_particles):
            # Fresh random weights for the cognitive (personal) and social (global) pulls
            r1, r2 = np.random.rand(2, dim)
            cognitive_velocity = c1 * r1 * (pbest_positions[i] - positions[i])
            social_velocity = c2 * r2 * (gbest_position - positions[i])
            velocities[i] = np.clip(
                w * velocities[i] + cognitive_velocity + social_velocity,
                -max_vel,
                max_vel,
            )

            # Move, then clamp back into the search space
            positions[i] = np.clip(positions[i] + velocities[i], min_bounds, max_bounds)

            current_fitness = fitness_func(positions[i])
            if current_fitness > pbest_fitness[i]:
                pbest_fitness[i] = current_fitness
                pbest_positions[i] = positions[i].copy()

        # The global best is refreshed once per iteration, after every particle has moved
        current_gbest_index = np.argmax(pbest_fitness)
        current_gbest_fitness = pbest_fitness[current_gbest_index]
        improved = current_gbest_fitness > gbest_fitness
        if improved:
            gbest_fitness = current_gbest_fitness
            gbest_position = pbest_positions[current_gbest_index].copy()
            no_improve_counter = 0
        else:
            no_improve_counter += 1

        # Progress report only: refit a full pipeline on the whole training set
        # and score it on the test data. The fit is seeded, so an unchanged
        # global best would give the same score; the costly refit is therefore
        # skipped unless this is the first iteration or the best has moved.
        if iteration == 0 or improved:
            try:
                eval_model_pipeline = Pipeline([
                    ('preprocessor', preprocessor),
                    ('svc', SVC(C=gbest_position[0], gamma=gbest_position[1], kernel='rbf', random_state=42, probability=True)),
                ])
                eval_model_pipeline.fit(X_train, y_train)
                test_metrics_at_iteration = evaluate_func(eval_model_pipeline, X_test, y_test)
                test_f1_at_iteration = test_metrics_at_iteration.get('f1_score', np.nan)
            except Exception:
                # Best-effort reporting: a failed evaluation must not abort the search
                test_f1_at_iteration = np.nan

        print(f"Iteration {iteration+1}/{max_iterations}, Best Val Fitness: {gbest_fitness:.4f}, Best Params: {gbest_position.round(4).tolist()}, Test F1: {test_f1_at_iteration:.4f}")

        # Early stopping once the global best has stalled
        if no_improve_counter >= early_stop:
            print(f"Convergence reached. No improvement for {early_stop} iterations. Stopping early.")
            break

    return gbest_position, gbest_fitness

In [None]:
# Search-space bounds handed to PSO, ordered as (C, gamma)
param_bounds_pso = [(0.01, 1000), (0.0001, 10)]

# Seed both RNGs once up front; every run re-seeds with its own run index below
global_random_state = 42
random.seed(global_random_state)
np.random.seed(global_random_state)

# Evaluated once before the loop: a results frame without a 'Method' column cannot be filtered
method_col_exists = 'Method' in all_results_df.columns

for run in range(30):
    run_seed = run
    random.seed(run_seed)
    np.random.seed(run_seed)

    # A stored PSO row carrying this seed means the run was completed earlier
    run_exists = method_col_exists and not all_results_df[
        (all_results_df['Method'] == 'PSO') & (all_results_df['Seed'] == run_seed)
    ].empty

    if run_exists:
        print(f"--- Run {run+1}/30 for PSO already found. Skipping. ---")
        continue
    print(f"--- Running PSO Run {run+1}/30 ---")

    # Time the optimizer call
    start_time = time()
    best_params_pso_run, best_fitness_pso_run = PSO(
        fitness_func=svm_fitness_function_pso,
        bounds=param_bounds_pso,
        X_test=X_test,
        y_test=y_test,
        preprocessor=preprocessor,
        evaluate_func=evaluate_classifier,
        n_particles=50,
        max_iterations=10,
        early_stop=4,
        w=0.9,
        c1=1.5,
        c2=1.5,
    )
    tuning_time_pso_run = time() - start_time

    # Failure path, kept for robustness: record a row whose scores are all NaN
    if best_params_pso_run is None:
        print(f"Run {run+1}: Algorithm failed to find valid parameters.")
        pso_run_data = {
            'Method': 'PSO',
            'Run': run + 1,
            'Seed': run_seed,
            'Best Parameters': None,
            'Best Training Val F1-Score': np.nan,
            'Tuning Time (s)': tuning_time_pso_run,
            **{
                failed_column: np.nan
                for failed_column in (
                    'Test Accuracy',
                    'Test F1-Score',
                    'Test Precision',
                    'Test Recall',
                    'Test ROC AUC',
                )
            },
        }
        pso_df_row = pd.DataFrame([pso_run_data])
        all_results_df = pd.concat([all_results_df, pso_df_row], ignore_index=True)
        all_results_df.to_csv(output_csv_path, index=False)

        print(f"Run {run+1}: Algorithm failed, Time: {tuning_time_pso_run:.2f}s. Saved failure info to CSV.")
        continue

    # Success path: refit on the full, untransformed training set with the tuned values
    C_best, gamma_best = best_params_pso_run
    final_svm_params_pso_run = {'C': C_best, 'gamma': gamma_best, 'kernel': 'rbf'}

    final_svm_model_pso_run = Pipeline([
        ('preprocessor', preprocessor),
        ('svc', SVC(random_state=42, probability=True, **final_svm_params_pso_run)),
    ])
    final_svm_model_pso_run.fit(X_train, y_train)

    # Final scoring of this run's model on the test set
    final_test_metrics_pso_run = evaluate_classifier(final_svm_model_pso_run, X_test, y_test)

    pso_run_data = {
        'Method': 'PSO',
        'Run': run + 1,
        'Seed': run_seed,
        'Best Parameters': str(best_params_pso_run.tolist()),
        'Best Training Val F1-Score': best_fitness_pso_run,
        'Tuning Time (s)': tuning_time_pso_run,
        **{
            result_column: final_test_metrics_pso_run.get(metric_key, np.nan)
            for result_column, metric_key in (
                ('Test Accuracy', 'accuracy'),
                ('Test F1-Score', 'f1_score'),
                ('Test Precision', 'precision'),
                ('Test Recall', 'recall'),
                ('Test ROC AUC', 'roc_auc'),
            )
        },
    }
    pso_df_row = pd.DataFrame([pso_run_data])
    all_results_df = pd.concat([all_results_df, pso_df_row], ignore_index=True)
    # Write after every run so progress survives an interrupted session
    all_results_df.to_csv(output_csv_path, index=False)

    print(f"Run {run+1} Final: Best Params: {best_params_pso_run.round(4).tolist()}, Best Val F1: {best_fitness_pso_run:.4f}, Time: {tuning_time_pso_run:.2f}s, FINAL Test F1: {final_test_metrics_pso_run.get('f1_score', np.nan):.4f}. Saved to CSV.")

print(f"\n--- Completed PSO Runs - Current Results ({output_csv_path}) ---")
if 'Method' not in all_results_df.columns:
    print("Results DataFrame is currently empty.")
else:
    print(all_results_df['Method'].value_counts())

--- Running PSO Run 1/30 ---
Iteration 1/10, Best Val Fitness: 0.6636, Best Params: [749.6095, 0.0001], Test F1: 0.6667
Iteration 2/10, Best Val Fitness: 0.6636, Best Params: [800.4175, 0.0001], Test F1: 0.6621
Iteration 3/10, Best Val Fitness: 0.6636, Best Params: [800.4175, 0.0001], Test F1: 0.6621
Iteration 4/10, Best Val Fitness: 0.6651, Best Params: [803.2613, 0.0001], Test F1: 0.6621
Iteration 5/10, Best Val Fitness: 0.6651, Best Params: [803.2613, 0.0001], Test F1: 0.6621
Iteration 6/10, Best Val Fitness: 0.6651, Best Params: [803.2613, 0.0001], Test F1: 0.6621
Iteration 7/10, Best Val Fitness: 0.6651, Best Params: [803.2613, 0.0001], Test F1: 0.6621
Iteration 8/10, Best Val Fitness: 0.6651, Best Params: [803.2613, 0.0001], Test F1: 0.6621
Convergence reached. No improvement for 4 iterations. Stopping early.
Run 1 Final: Best Params: [803.2613, 0.0001], Best Val F1: 0.6651, Time: 235.66s, FINAL Test F1: 0.6621. Saved to CSV.
--- Running PSO Run 2/30 ---
Iteration 1/10, Best Val 

# ICA

### Note: after finishing this section, we realized that ICA is not a nature-inspired algorithm, so it is not covered in our discussion.

In [None]:
# Recompute the transformed matrices so this section does not depend on the
# PSO section's variables (same preprocessor, same inputs)
X_train_transformed = preprocessor.transform(X_train)
X_test_transformed = preprocessor.transform(X_test)

# Recreate the stratified 25% validation split with the same random_state as before
(
    X_train_small_transformed,
    X_val_transformed,
    y_train_small,
    y_val,
) = train_test_split(
    X_train_transformed,
    y_train,
    test_size=0.25,
    random_state=42,
    stratify=y_train,
)

# Search ranges for the two tuned hyperparameters; the kernel is fixed to rbf elsewhere
PARAM_RANGES_ICA = {
    'C': (0.01, 1000),
    'gamma': (0.0001, 10),
}
DIMENSION_ICA = len(PARAM_RANGES_ICA)  # two dimensions: C and gamma

In [None]:
# Decoder for ICA: turns a 2-element country position into SVC keyword values
def decode_country_position_ica(country_position):
    """Map a ``(C, gamma)`` position to an SVC parameter dict.

    Both values are clamped to the ranges in ``PARAM_RANGES_ICA``; the kernel
    is always ``'rbf'``.
    """
    raw_c, raw_gamma = country_position
    c_low, c_high = PARAM_RANGES_ICA['C'][0], PARAM_RANGES_ICA['C'][1]
    gamma_low, gamma_high = PARAM_RANGES_ICA['gamma'][0], PARAM_RANGES_ICA['gamma'][1]
    return {
        'C': np.clip(raw_c, c_low, c_high),
        'gamma': np.clip(raw_gamma, gamma_low, gamma_high),
        'kernel': 'rbf',
    }

In [None]:
# Cost for ICA: negated validation F1 on one train/validation split (lower is better)
def ica_cost_function_single_split(country_position, X_train_split, y_train_split, X_val_split, y_val_split):
    """Return the ICA cost of ``country_position``.

    The position is decoded into SVC parameters, the model is fitted on the
    training split and scored by F1 on the validation split. The cost is the
    negated F1 score; a non-finite score or any exception during fitting or
    scoring yields the failure cost 1.0.
    """
    decoded = decode_country_position_ica(country_position)
    try:
        classifier = SVC(
            C=decoded['C'],
            gamma=decoded['gamma'],
            kernel=decoded['kernel'],
            random_state=42,
        )
        classifier.fit(X_train_split, y_train_split)
        validation_f1 = f1_score(y_val_split, classifier.predict(X_val_split))
        # NaN and +/-inf are both rejected by the finiteness test
        return -validation_f1 if np.isfinite(validation_f1) else 1.0
    except Exception:
        # Deliberately broad: a failed evaluation is reported as the failure cost
        return 1.0

In [None]:
# Standard ICA optimizer function
def ICA(cost_func, bounds, X_train_split, y_train_split, X_val_split, y_val_split, n_countries=50, n_imperialists=10, max_decades=5, assimilation_coeff=2, revolution_prob=0.02, revolution_rate=0.1, imperialist_assimilation_coeff=0.5, elimination_threshold=0.05, early_stop=5):
    dim = len(bounds)
    min_bounds = np.array([b[0] for b in bounds])
    max_bounds = np.array([b[1] for b in bounds])

    # Initialize countries
    countries = min_bounds + np.random.rand(n_countries, dim) * (max_bounds - min_bounds)

    # Calculate initial costs
    costs = np.array([cost_func(c, X_train_split, y_train_split, X_val_split, y_val_split) for c in countries])

    # Check for all infinite costs which can happen if fitness always fails
    if np.all(costs == 1.0): # Assuming 1.0 is failure cost
        print("Warning: All initial countries failed evaluation in ICA.")
        return None, -np.inf # Indicate failure


    # Sort countries by cost (ascending)
    sorted_idx = np.argsort(costs)
    countries = countries[sorted_idx]
    costs = costs[sorted_idx]

    # Select imperialists and colonies
    actual_n_imperialists = min(n_imperialists, len(countries)) # Handle cases where n_countries < n_imperialists
    if actual_n_imperialists == 0: return None, -np.inf # Cannot proceed
    imperialists = [{'position': countries[i], 'cost': costs[i]} for i in range(actual_n_imperialists)]
    colonies = [{'position': countries[i], 'cost': costs[i]} for i in range(actual_n_imperialists, n_countries)]

    # Assign colonies to imperialists based on power (inverse cost)
    if actual_n_imperialists > 0:
        imperialist_cost_vals = np.array([imp['cost'] for imp in imperialists])
        max_cost_init = np.max(imperialist_cost_vals) if len(imperialist_cost_vals)>0 else 0
        min_cost_init = np.min(imperialist_cost_vals) if len(imperialist_cost_vals)>0 else 0

        if max_cost_init == min_cost_init or len(imperialist_cost_vals) <= 1:
             imperialist_power_init = np.ones(actual_n_imperialists)
        else:
             imperialist_power_init = max_cost_init - imperialist_cost_vals

        total_power_init = np.sum(imperialist_power_init)
        if total_power_init == 0 or actual_n_imperialists == 0:
             imperialist_probs_init = np.ones(actual_n_imperialists) / actual_n_imperialists if actual_n_imperialists > 0 else []
        else:
             imperialist_probs_init = imperialist_power_init / total_power_init

        # Ensure probabilities sum to 1 (handle potential floating point issues)
        if len(imperialist_probs_init)>0:
           imperialist_probs_init = imperialist_probs_init / np.sum(imperialist_probs_init)

        assigned_imperialist_indices = []
        if len(colonies) > 0 and len(imperialist_probs_init) > 0:
            assigned_imperialist_indices = np.random.choice(np.arange(actual_n_imperialists), size=len(colonies), p=imperialist_probs_init)

        # Create empires
        empires_list = []
        for i in range(actual_n_imperialists):
            imperialist = imperialists[i]
            assigned_colonies = [colonies[j] for j in range(len(colonies)) if len(assigned_imperialist_indices)>j and assigned_imperialist_indices[j] == i]
            empires_list.append({'imperialist': imperialist, 'colonies': assigned_colonies})
    else:
         empires_list = [] # No imperialists possible


    # Initial global best cost and position
    if empires_list:
        best_cost = empires_list[0]['imperialist']['cost']
        best_country_position = empires_list[0]['imperialist']['position'].copy()
    else:
        # Handle case where no empires could be formed
        print("Warning: No empires formed in ICA initialization.")
        best_cost = np.inf
        best_country_position = None # Or some default array

    no_improve_counter = 0
    best_cost_history = [best_cost]

    # ICA Main Loop (Decades)
    for iteration in range(max_decades):
        if not empires_list:
            print("Terminating ICA early: No empires left.")
            break # Exit if no empires left

        # Assimilation & Revolution Check
        # Iterate through empires and colonies
        empire_indices_to_remove = []
        for emp_idx, empire in enumerate(empires_list):
            imp = empire['imperialist']
            colonies = empire['colonies']
            i = 0
            while i < len(colonies):
                colony = colonies[i]
                colony_pos = colony['position']

                # Assimilation movement towards *its* imperialist
                direction = imp['position'] - colony_pos
                movement = assimilation_coeff * np.random.rand(dim) * direction
                new_pos = colony_pos + movement

                # Apply Revolution probability
                if np.random.rand() < revolution_prob:
                    revolution_change = np.array([np.random.uniform(-revolution_rate * (bounds[k][1] - bounds[k][0]), revolution_rate * (bounds[k][1] - bounds[k][0])) for k in range(dim)])
                    new_pos = colony_pos + revolution_change # Revolution changes from original colony position

                # Clip to bounds after movement/revolution
                for k in range(dim):
                    new_pos[k] = np.clip(new_pos[k], bounds[k][0], bounds[k][1])

                colony['position'] = new_pos
                colony['cost'] = cost_func(new_pos, X_train_split, y_train_split, X_val_split, y_val_split)

                # Revolution Check (Colony challenging Imperialist)
                if colony['cost'] < imp['cost']: # If colony is better (lower cost)
                    # Swap roles
                    # print("  Revolution: Colony became new imperialist!") # Optional debug print
                    old_imperialist = imp.copy()
                    empire['imperialist'] = colony.copy() # New imperialist is the former colony
                    empire['colonies'].pop(i) # Remove former colony from list
                    empire['colonies'].append(old_imperialist) # Add old imperialist as a colony

                    # Imperialist Assimilation (move the old imperialist towards the new one)
                    # Check if old_imperialist exists and has position before assimilation attempt
                    if 'position' in old_imperialist:
                        new_imp_pos = empire['imperialist']['position']
                        old_imp_pos = old_imperialist['position']
                        direction_rev = new_imp_pos - old_imp_pos
                        movement_rev = imperialist_assimilation_coeff * direction_rev * np.random.rand(dim)
                        new_old_imp_pos = old_imp_pos + movement_rev
                        for k in range(dim):
                            new_old_imp_pos[k] = np.clip(new_old_imp_pos[k], bounds[k][0], bounds[k][1])
                        old_imperialist['position'] = new_old_imp_pos
                        old_imperialist['cost'] = cost_func(old_imperialist['position'], X_train_split, y_train_split, X_val_split, y_val_split)

                    # Update imp variable to point to the new imperialist for subsequent checks
                    imp = empire['imperialist']
                    # No increment 'i' here, the list size changed, next loop iteration handles next element

                # Check global best after each colony update/revolution check
                if colony['cost'] < best_cost:
                    best_cost = colony['cost']
                    best_country_position = colony['position'].copy()
                    # Reset no_improve_counter here as well if global best improves
                    no_improve_counter = 0


                # Increment index only if no swap happened (otherwise the list changed)
                if 'cost' in imp and colony['cost'] >= imp['cost']: # Ensure imp['cost'] exists
                    i += 1


        # Imperialist Competition
        if len(empires_list) <= 1: break # Break if only one or zero empires left

        # Recalculate total empire cost and power/probabilities
        # Handle potential empty colony lists gracefully
        total_empire_cost = np.array([
            e['imperialist'].get('cost', np.inf) + sum(c.get('cost', np.inf) for c in e.get('colonies', []))
            for e in empires_list
        ])

        if len(total_empire_cost) == 0: break # Should not happen if len(empires_list)>1 but safety check

        max_total_cost = np.max(total_empire_cost)
        min_total_cost = np.min(total_empire_cost)

        if max_total_cost == min_total_cost or len(empires_list) <= 1:
             empire_selection_power = np.ones(len(empires_list))
        else:
             empire_selection_power = max_total_cost - total_empire_cost # Higher cost -> Lower power

        total_selection_power_sum = np.sum(empire_selection_power)

        if total_selection_power_sum > 0 and len(empires_list) > 1:
             # Assign probabilities inversely proportional to cost (higher power = higher prob)
             empire_probs = empire_selection_power / total_selection_power_sum
             empire_probs = empire_probs / np.sum(empire_probs) # Ensure sums to 1

             weakest_empire_idx = np.argmax(total_empire_cost) # Empire with highest total cost is weakest

             if empires_list[weakest_empire_idx]['colonies']:
                  # Find the weakest colony within the weakest empire (optional sophistication, here just random)
                  # weakest_colony_idx = np.argmax([c.get('cost', -np.inf) for c in empires_list[weakest_empire_idx]['colonies']])
                  # colony_to_take = empires_list[weakest_empire_idx]['colonies'].pop(weakest_colony_idx)
                  colony_to_take_idx = np.random.randint(0, len(empires_list[weakest_empire_idx]['colonies']))
                  colony_to_take = empires_list[weakest_empire_idx]['colonies'].pop(colony_to_take_idx)

                  # Assign the colony to another empire based on probability
                  if len(empire_probs) == len(empires_list): # Safety check
                     winning_empire_idx = np.random.choice(np.arange(len(empires_list)), p=empire_probs)
                     # Ensure winner is not the loser (if only 2 empires, winner must be the other one)
                     if winning_empire_idx == weakest_empire_idx and len(empires_list) > 1:
                          winning_empire_idx = (weakest_empire_idx + 1) % len(empires_list)

                     winning_empire = empires_list[winning_empire_idx]
                     winning_empire['colonies'].append(colony_to_take)
                  else:
                     # Fallback: return the colony if probability array mismatch
                      empires_list[weakest_empire_idx]['colonies'].append(colony_to_take)

        # Eliminate weak empires (those that lost all colonies)
        empires_list = [e for e in empires_list if e['colonies'] or e['imperialist'].get('cost', np.inf) < np.inf] # Keep if has colonies OR if imperialist is valid

        # More robust elimination: Only eliminate if imperialist is also very poor relative to others (or use threshold)
        # Or simply eliminate if colonies list is empty
        empires_list = [e for e in empires_list if e['colonies']]

        # Update global best from remaining imperialists
        current_best_imp_cost = np.inf
        current_best_imp_pos = None
        if empires_list:
            costs_imps = [e['imperialist'].get('cost', np.inf) for e in empires_list]
            if costs_imps: # Check if list is not empty
                min_imp_cost_idx = np.argmin(costs_imps)
                current_best_imp_cost = costs_imps[min_imp_cost_idx]
                current_best_imp_pos = empires_list[min_imp_cost_idx]['imperialist'].get('position')

                if current_best_imp_cost < best_cost:
                     best_cost = current_best_imp_cost
                     if current_best_imp_pos is not None: best_country_position = current_best_imp_pos.copy()
                     no_improve_counter = 0 # Reset counter

        # Check termination criteria after elimination
        if len(empires_list) <= 1:
             print("Terminating ICA: One or zero empires left.")
             break

        # Check for convergence (no improvement in global best cost) AFTER all updates in the iteration
        if best_cost < best_cost_history[-1]:
            # Improvement occurred (potentially earlier in colony updates or here via imperialist check)
            # Counter was likely already reset if improvement happened earlier
             pass # Counter already handled
        elif best_cost == np.inf and best_cost_history[-1] == np.inf:
            # Handles case where cost stays at initial failure value
             no_improve_counter += 1
        elif best_cost >= best_cost_history[-1]:
             no_improve_counter += 1

        best_cost_history.append(best_cost)

        # Print per-iteration progress (using the current best cost/position found up to this point)
        # Check if best_country_position is valid before trying to round/list
        if best_country_position is not None:
            params_list = best_country_position.round(4).tolist()
        else:
            params_list = "N/A"

        print(f"Decade {iteration+1}/{max_decades}, Best Val Cost: {best_cost:.4f}, Best Params: {params_list}, Empires: {len(empires_list)}")


        if no_improve_counter >= early_stop:
            print(f"Convergence reached. No improvement for {early_stop} decades. Stopping early.")
            break

    # Return the final best country position and its fitness (negative of the best cost)
    if best_cost == np.inf or best_country_position is None:
        print("ICA finished without finding a valid solution.")
        return None, -np.inf
    return best_country_position, -best_cost # Return fitness (higher is better)

In [None]:
# ICA hyper-parameter search for an RBF SVM, repeated over several seeded runs.
#
# For every run this cell:
#   1. seeds Python's and NumPy's global RNGs with the run index,
#   2. skips the run if a row with Method == 'ICA' and the same Seed is already
#      present in all_results_df (resume support),
#   3. calls ICA(...) to search (C, gamma) inside param_bounds_ica,
#   4. on success, refits the full pipeline on X_train / y_train and evaluates it
#      once on X_test / y_test,
#   5. appends one row to all_results_df and rewrites output_csv_path, so an
#      interrupted session loses at most the run in progress.

# Search bounds, one (low, high) pair per dimension: first C, then gamma.
param_bounds_ica = [(0.01, 1000), (0.0001, 10)]

# Number of independent ICA runs; each run uses its own index as the seed.
N_ICA_RUNS = 30

# Set global seed for overall reproducibility (re-seeded per run below)
global_random_state = 42
random.seed(global_random_state)
np.random.seed(global_random_state)

# The resume check reads both 'Method' and 'Seed'; only attempt it when both
# columns exist so an empty or differently-shaped results frame cannot raise
# a KeyError.
method_col_exists = 'Method' in all_results_df.columns
seed_col_exists = 'Seed' in all_results_df.columns

for run in range(N_ICA_RUNS):
    run_seed = run
    random.seed(run_seed)
    np.random.seed(run_seed)

    # Check if this specific ICA run already exists based on seed
    run_exists = False
    if method_col_exists and seed_col_exists:
        already_done = (all_results_df['Method'] == 'ICA') & (all_results_df['Seed'] == run_seed)
        run_exists = bool(already_done.any())

    if run_exists:
        print(f"--- Run {run+1}/{N_ICA_RUNS} for ICA already found. Skipping. ---")
        continue  # Skip to the next iteration of the loop

    print(f"--- Running ICA Run {run+1}/{N_ICA_RUNS} ---")

    start_time = time()

    # ICA returns (best_position, fitness) where fitness is the negated best
    # cost, or (None, -inf) when no valid solution was found.
    best_params_ica_run, best_fitness_ica_run = ICA(
        cost_func=ica_cost_function_single_split,
        bounds=param_bounds_ica,
        X_train_split=X_train_small_transformed,
        y_train_split=y_train_small,
        X_val_split=X_val_transformed,
        y_val_split=y_val,
        n_countries=50,
        n_imperialists=10,
        max_decades=10,
        early_stop=5,
        assimilation_coeff=2,
        revolution_prob=0.02,
        revolution_rate=0.1,
        imperialist_assimilation_coeff=0.5,
        elimination_threshold=0.05
    )
    tuning_time_ica_run = time() - start_time

    run_succeeded = best_params_ica_run is not None

    # Start from the "failed run" row; a successful run overwrites the
    # placeholders below. Building one dict keeps the CSV columns identical
    # (and identically ordered) for both outcomes.
    ica_run_data = {
        'Method': 'ICA',
        'Run': run + 1,
        'Seed': run_seed,
        'Best Parameters': None,
        'Best Training Val F1-Score': np.nan,
        'Tuning Time (s)': tuning_time_ica_run,
        'Test Accuracy': np.nan,
        'Test F1-Score': np.nan,
        'Test Precision': np.nan,
        'Test Recall': np.nan,
        'Test ROC AUC': np.nan
    }

    if run_succeeded:
        C_best, gamma_best = best_params_ica_run
        final_svm_params_ica_run = {'C': C_best, 'gamma': gamma_best, 'kernel': 'rbf'}

        final_svm_model_ica_run = Pipeline([
            ('preprocessor', preprocessor),
            ('svc', SVC(random_state=42, probability=True, **final_svm_params_ica_run))
        ])

        # Fit the final model on the entire original training data
        final_svm_model_ica_run.fit(X_train, y_train)

        # Evaluate on the test set ONE FINAL TIME
        final_test_metrics_ica_run = evaluate_classifier(final_svm_model_ica_run, X_test, y_test)

        ica_run_data.update({
            'Best Parameters': str(best_params_ica_run.tolist()),  # Save params as string
            'Best Training Val F1-Score': best_fitness_ica_run,  # Validation score from tuning
            'Test Accuracy': final_test_metrics_ica_run.get('accuracy', np.nan),
            'Test F1-Score': final_test_metrics_ica_run.get('f1_score', np.nan),
            'Test Precision': final_test_metrics_ica_run.get('precision', np.nan),
            'Test Recall': final_test_metrics_ica_run.get('recall', np.nan),
            'Test ROC AUC': final_test_metrics_ica_run.get('roc_auc', np.nan)
        })
    else:
        print(f"Run {run+1}: Algorithm failed to find valid parameters.")

    # Persist after every run (successful or not) so progress survives a crash.
    ica_df_row = pd.DataFrame([ica_run_data])
    all_results_df = pd.concat([all_results_df, ica_df_row], ignore_index=True)
    all_results_df.to_csv(output_csv_path, index=False)

    if run_succeeded:
        print(f"Run {run+1} Final: Best Params: {best_params_ica_run.round(4).tolist()}, Best Val F1: {best_fitness_ica_run:.4f}, Time: {tuning_time_ica_run:.2f}s, FINAL Test F1: {final_test_metrics_ica_run.get('f1_score', np.nan):.4f}. Saved to CSV.")
    else:
        print(f"Run {run+1}: Algorithm failed, Time: {tuning_time_ica_run:.2f}s. Saved failure info to CSV.")

print(f"\n--- Completed ICA Runs - Current Results ({output_csv_path}) ---")
if 'Method' in all_results_df.columns:
    print(all_results_df['Method'].value_counts())
else:
    print("Results DataFrame is currently empty.")

--- Running ICA Run 1/30 ---
Decade 1/10, Best Val Cost: -0.6636, Best Params: [810.1969, 0.0001], Empires: 7
Decade 2/10, Best Val Cost: -0.6651, Best Params: [805.0023, 0.0001], Empires: 7
Decade 3/10, Best Val Cost: -0.6651, Best Params: [805.0023, 0.0001], Empires: 6
Decade 4/10, Best Val Cost: -0.6651, Best Params: [805.0023, 0.0001], Empires: 6
Decade 5/10, Best Val Cost: -0.6651, Best Params: [805.0023, 0.0001], Empires: 5
Decade 6/10, Best Val Cost: -0.6849, Best Params: [594.0067, 0.0008], Empires: 5
Decade 7/10, Best Val Cost: -0.6849, Best Params: [594.0067, 0.0008], Empires: 5
Decade 8/10, Best Val Cost: -0.6849, Best Params: [594.0067, 0.0008], Empires: 4
Decade 9/10, Best Val Cost: -0.6865, Best Params: [593.966, 0.0008], Empires: 4
Decade 10/10, Best Val Cost: -0.6865, Best Params: [593.966, 0.0008], Empires: 4
Run 1 Final: Best Params: [593.966, 0.0008], Best Val F1: 0.6865, Time: 229.74s, FINAL Test F1: 0.6742. Saved to CSV.
--- Running ICA Run 2/30 ---
Decade 1/10, Be

In [None]:
# Final wrap-up: report where the accumulated results live and, when the
# results frame is usable, show per-method row counts plus its first and
# last rows.
print("\n--- All Experiments Attempted ---")
print(f"Final results are saved in: {output_csv_path}")
print("\nFinal Results Summary:")

summary_available = ('Method' in all_results_df.columns) and (not all_results_df.empty)

if not summary_available:
    print("Result DataFrame is empty or missing 'Method' column.")
else:
    print(all_results_df['Method'].value_counts())
    # Same two previews as before, emitted in the same order (head, then tail).
    for section_title, preview_rows in (
        ("\nDataFrame Head:", all_results_df.head()),
        ("\nDataFrame Tail:", all_results_df.tail()),
    ):
        print(section_title)
        print(preview_rows)


--- All Experiments Attempted ---
Final results are saved in: /kaggle/working/all_svm_tuning_results.csv

Final Results Summary:
Method
ICA           30
PSO           30
GridSearch     1
Baseline       1
Name: count, dtype: int64

DataFrame Head:
       Method  Run  Seed                                    Best Parameters  \
0    Baseline    1   NaN  {'C': 1.0, 'break_ties': False, 'cache_size': ...   
1  GridSearch    1   NaN  {'svc__C': 10, 'svc__gamma': 0.01, 'svc__kerne...   
2         PSO    1   0.0                        [803.2613406429795, 0.0001]   
3         PSO    2   1.0                        [804.4786400692332, 0.0001]   
4         PSO    3   2.0                        [797.4494960415254, 0.0001]   

   Best Training Val F1-Score  Tuning Time (s)  Test Accuracy  Test F1-Score  \
0                         NaN         2.297505          0.870       0.699074   
1                    0.657246        17.876401          0.866       0.695455   
2                    0.665127       2

  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
