In [1]:
import numpy as np
import matplotlib.pyplot as plt
import time
import pandas as pd

# Parameters to run the experiment
FUNCTIONS = ['sphere', 'rosenbrock', 'ackley', 'schwefel', 'rastrigin']
N_RUNS = 30           # Numbers of independent runs per function
MAX_ITERATIONS = 500  # Maximum number of iterations per run
DIMENSIONS = 2        # Problem dimensionality
TOLERANCE = 1e-3      # Threshold to consider the test a "Success"
MAX_FEVALS = 50000    # Maximum number of function evaluations
KNOWN_OPTIMUM = 0.0   # Known optimum value for the test functions

# Parameters to run the optimization algorithm
NUM_PARTICLES = 200   # Number of particles
ALPHA_START = 0.01    # Initial exploration parameter
ALPHA_END = 50.0      # Final exploration parameter
LAMBDA = 1.0          # Drift
SIGMA = 0.5           # Diffusion
DELTA_T = 0.1         # Time step

results = []

In [2]:
def get_bounds(func_name):
    if func_name == 'schwefel':
        return -500, 500
    elif func_name == 'rastrigin':
        return -5.12, 5.12
    else:
        return -10, 10

def objective_function(x, function_name):
    if function_name == 'sphere':
        return np.sum(x**2)
    elif function_name == 'rosenbrock':
        return sum(100.0 * (x[1:] - x[:-1]**2.0)**2.0 + (1 - x[:-1])**2.0)
    elif function_name == 'ackley':
        first_sum = np.sum(x**2)
        second_sum = np.sum(np.cos(2 * np.pi * x))
        n = len(x)
        return -20 * np.exp(-0.2 * np.sqrt(first_sum / n)) - np.exp(second_sum / n) + 20 + np.e
    elif function_name == 'schwefel':
        return 418.9829 * len(x) - np.sum(x * np.sin(np.sqrt(np.abs(x))))
    elif function_name == 'rastrigin':
        return 10 * len(x) + np.sum(x**2 - 10 * np.cos(2 * np.pi * x))
    else:
        raise ValueError("Unknown function name")

In [3]:
def cbo_optimization(func_name, run_id=1, dims=DIMENSIONS, plotting=False):
    # Initialization
    lower_bound, upper_bound = get_bounds(func_name)
    particles = np.random.uniform(lower_bound, upper_bound, (NUM_PARTICLES, dims))
    global_best_cost = float('inf')
    global_best_position = particles[0].copy()

    epsilon = 1e-3 # Convergence threshold
    start_time = time.process_time() # METRICS: Start CPU time measurement
    n_fevals = 0
    n_fevals_to_success = None
    success = False
    initial_particles = particles.copy() if plotting else None

    for iteration in range(MAX_ITERATIONS):
    
        # Cost evaluation and weights computation
        costs = np.array([objective_function(p, func_name) for p in particles])
        n_fevals += NUM_PARTICLES # METRICS: Update number of function evaluations

        # Update global best
        current_best_index = np.argmin(costs)
        if costs[current_best_index] < global_best_cost: 
            global_best_cost = costs[current_best_index]
            global_best_position = particles[current_best_index].copy() 

        # METRICS: Check for success
        if not success and abs(global_best_cost - KNOWN_OPTIMUM) < TOLERANCE:
            success = True
            n_fevals_to_success = n_fevals
        
        # Update alpha for exploration-exploitation trade-off
        progress = iteration / MAX_ITERATIONS
        alpha = ALPHA_START + (ALPHA_END - ALPHA_START) * (progress ** 2)

        # Update weights based on costs
        v_min = np.min(costs)
        v_max = np.max(costs)
        denom = v_max - v_min if v_max - v_min > 1e-10 else 1.0
        norm_costs = (costs - v_min) / denom
        weights = np.exp(-alpha * norm_costs)

        # Normalize weights
        sum_weights = np.sum(weights)
        if sum_weights > 0:
            weights /= sum_weights
        else:
            # Case all weights are zero (can happen with large alpha)
            weights = np.ones(NUM_PARTICLES) / NUM_PARTICLES

        # Calculate consensus point
        consensus_point = np.sum(weights[:, np.newaxis] * particles, axis=0)

        diffusion_vector = particles - consensus_point # Diffusion vector for each particle

        drift = -LAMBDA * diffusion_vector * DELTA_T # Drift towards consensus point

        dist_scalar = np.linalg.norm(diffusion_vector, axis=1, keepdims=True) # Distance from consensus point for each particle

        zi = np.random.randn(NUM_PARTICLES, dims) # Standard normal random variables
        diffusion = SIGMA * dist_scalar * np.sqrt(DELTA_T) * zi # Diffusion component

        particles += drift + diffusion # Update particle positions
        particles = np.clip(particles, lower_bound, upper_bound) # Keep particles within bounds

        if plotting:
            weighted_swarm_radius = np.sum(weights * dist_scalar.flatten())
        
            if weighted_swarm_radius < epsilon and iteration > 20:
                print(f"Converged after {iteration} iterations.")
                print(f"Weighted swarm radius: {weighted_swarm_radius:.6f} < epsilon ({epsilon})")
                break

    total_time = time.process_time() - start_time # METRICS: Total CPU time
    final_error = abs(global_best_cost - KNOWN_OPTIMUM) # METRICS: Final error

    return {
        "Function": func_name,
        "Run_ID": run_id,
        "Success": success,
        "Fevals_to_Success": n_fevals_to_success,
        "Final_Error": final_error,
        "CPU_Time": total_time,
        "Global_Best_Cost": global_best_cost,
        "Global_Best_Pos": global_best_position,
        "Final_Particles": particles,
        "Initial_Particles": initial_particles
    }

def plot_cbo_state(initial_data, final_data, best_pos, best_cost, func_name, title):
    """
    Plots the initial and final state of the particles on a 2D contour map.
    """
    lower_bound, upper_bound = get_bounds(func_name)
    range_width = upper_bound - lower_bound
    density = 10 # Points per unit range
    min_points = 100
    max_points = 400 
    num_points = int(min(max(min_points, range_width * density), max_points))
    
    # Create grid for contour
    x = np.linspace(lower_bound, upper_bound, num_points)
    y = np.linspace(lower_bound, upper_bound, num_points)
    X, Y = np.meshgrid(x, y)

    Z = np.array([[objective_function(np.array([i, j, 0]), func_name) for i in x] for j in y])

    def get_contour(ax, particle_data, title, show_best=False):
        # Background contour plot of the objective function
        contour = ax.contourf(X, Y, Z, levels=50, cmap='viridis')
        
        # Draw all particles
        ax.scatter(particle_data[:, 0], particle_data[:, 1], c='white', edgecolors='black', s=30, alpha=0.7, label='Particles')
        
        # If requested, highlight the Global Best (only in the final plot)
        if show_best:
            ax.scatter(best_pos[0], best_pos[1], 
                    c='red', marker='*', s=300, label='Global Best') # s=300 is the size of the star
            ax.legend()
            
        ax.set_title(title)
        ax.set_xlabel("X")
        ax.set_ylabel("Y")
        return contour

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 7))

    get_contour(ax1, initial_data, "Initial Swarm Distribution")

    contour = get_contour(ax2, final_data, f"Best Cost: {best_cost:.4f}", show_best=True)

    fig.colorbar(contour, ax=[ax1, ax2], label='Objective Function Cost')

    plt.show()

def run_experiments():
    """
    Runs the full benchmark suite on all functions (N_RUNS times each).
    Saves results to CSV.
    """
    print(f"Starting Massive Experiment: {N_RUNS} runs per function, {DIMENSIONS} dimensions.")
    all_results = []
    
    for func in FUNCTIONS:
        print(f"Processing {func}...")
        for run in range(1, N_RUNS + 1):
            # Run CBO without storing history
            res = cbo_optimization(func, run_id=run, dims=DIMENSIONS, plotting=False)
            
            # Keep only scalar metrics for DataFrame
            metric_data = {k: v for k, v in res.items() if k not in ['Final_Particles', 'Initial_Particles', 'Global_Best_Pos']}
            all_results.append(metric_data)
            
    df = pd.DataFrame(all_results)
    filename = "cbo_results.csv"
    df.to_csv(filename, index=False)
    print(f"\nExperiment completed. Data saved to {filename}")
    print("\nSummary of Success Rates:")
    print(df.groupby('Function')['Success'].mean())
    return df

def run_single_execution():
    """
    Interactive mode: Asks user for function name, runs once in 2D, and plots results.
    """
    func_name = input(f"Enter function to test {FUNCTIONS}: ").lower().strip()
    
    if func_name not in FUNCTIONS:
        print("Invalid function name.")
        return

    print(f"Running single execution on {func_name} (2D for visualization)...")
    
    # Force 2D for plotting logic
    res = cbo_optimization(func_name, run_id=1, dims=2, plotting=True)
    
    print(f"Run Completed in {res['CPU_Time']:.4f}s")
    print(f"Best Cost: {res['Global_Best_Cost']}")
    print(f"Success: {res['Success']}")
    
    plot_cbo_state(
        initial_data=res['Initial_Particles'], 
        final_data=res['Final_Particles'], 
        best_pos=res['Global_Best_Pos'], 
        best_cost=res['Global_Best_Cost'], 
        func_name=func_name, 
        title="CBO Optimization"
    )

In [None]:
# Uncomment the first line to run a single execution with plotting or the second line to run the full experiment

#run_single_execution()

#df_results = run_experiments()

Starting Massive Experiment: 30 runs per function, 10 dimensions.
Processing sphere...
Processing rosenbrock...
