# Distributed CBO -- Benchmark and Comparison

In [1]:
%load_ext autoreload
%autoreload 2

import sys
sys.path.append('../..')

from dataclasses import dataclass
import math
import time
from typing import List

import matplotlib.pyplot as plt
import numpy as np

from cbx.dynamics import CBO, DistributedCBO, CommunicationType
from cbx.objectives import Rastrigin, Ackley, Michalewicz
import cbx.utils.termination as term

# CBO Configuration

In [2]:
conf = {
    'alpha': 40.0,
    'dt': 0.1,
    'sigma': 1.,
    'lamda': 1.0,
    'term_criteria': [term.max_it_term(100)],
    'track_args': {'names': ['update_norm', 'energy', 'x', 'consensus', 'drift']},

    'noise': 'anisotropic',
    'f_dim': '3D'
}

# Define Benchmarking Functions

In [3]:
import pandas as pd

results_df = pd.DataFrame(columns=['fct_name', 'dim', 'async_communication', 'max_particles', 'splt_factor', 'avg_energy', 'success_rate', 'num_func_evaluations', 'execution_time', 'comment'])

In [4]:
from scipy.optimize import minimize

def find_smallest_non_global_minimum(fun, dim, num_starts=1000):
    bounds = (-2, 2)
    global_min = 0  # The global minimum value of the Ackley function
    local_minima = []

    for _ in range(num_starts):
        # Random initial guess
        x0 = np.random.uniform(*bounds, dim)
        res = minimize(fun, x0, method='L-BFGS-B', bounds=[bounds] * dim)
        if res.success and not np.isclose(res.fun, global_min, atol=1e-6):
            local_minima.append(res.fun)
    
    # Find the smallest local minimum greater than the global minimum
    local_minima = np.array(local_minima)
    smallest_non_global = np.min(local_minima[local_minima > global_min])
    return smallest_non_global

In [None]:
find_smallest_non_global_minimum(Rastrigin(), 20, num_starts=10000)

In [6]:
def benchmark_distributed_cbo_splitting_factor(
    f: callable,
    num_runs: int,
    max_splitting_factor: int,
    max_particles: int,
    synchronization_interval: int,
    synchronization_method: str,
    num_steps: int,
    use_async_communication: bool = False,
    axs=None,
    fmin: float = 0,
    success_eps: float = 1
):
    """
    Benchmarks the performance of the distributed Consensus-Based Optimization (CBO) algorithm 
    with varying splitting factors.
    Parameters:
    -----------
    f : callable
        The objective function to be minimized.
    num_runs : int
        The number of runs for each splitting factor.
    max_splitting_factor : int
        The maximum splitting factor to be tested.
    max_particles : int
        The maximum number of particles to be used in the optimization.
    synchronization_interval : int
        The interval at which synchronization occurs.
    synchronization_method : str
        The method used for synchronization.
    num_steps : int
        The number of optimization steps.
    use_async_communication : bool, optional
        Whether to use asynchronous communication (default is False).
    axs : matplotlib.axes.Axes, optional
        The axes on which to plot the results (default is None).
    fmin : float, optional
        The known minimum value of the objective function (default is 0).
    success_eps : float, optional
        The tolerance for considering the optimization successful (default is 1).
    Returns:
    --------
    int
        The splitting factor that resulted in the minimum average energy.
    Notes:
    ------
    This function runs the distributed CBO algorithm for different splitting factors, 
    measures the performance in terms of average energy, execution time, and success rate, 
    and plots the results if `axs` is provided.
    """
    dim = conf['d']
    communication_type = CommunicationType.ASYNC if use_async_communication else CommunicationType.SYNC
    
    energy_means = []
    execution_times = []
    function_evaluations = []
    quantile_10 = []
    quantile_90 = []
    success_rates = {splitting_factor: 0 for splitting_factor in range(1, max_splitting_factor + 1)}
    for splitting_factor in range(1, max_splitting_factor + 1):
        num_particles = math.ceil(max_particles / splitting_factor)
        current_energies = []
        for _ in range(num_runs):
            dyn = DistributedCBO(
                f=f,
                num_agent_batches=splitting_factor,
                synchronization_interval=synchronization_interval,
                synchronization_method=synchronization_method,
                communication_type=communication_type,
                N=num_particles,
                verbose=False,
                max_it=10000,
                **conf
            )

            # Run optimization and measure time
            tick = time.time()
            current_energies.append(
                f(dyn.optimize(num_steps=num_steps, sched='default'))
            )
            tock = time.time()
            execution_times.append(tock - tick)

            # Check if we found true minimum
            if abs(current_energies[-1] - fmin) < success_eps:
                success_rates[splitting_factor] += 1

            function_evaluations.append(dyn.num_f_eval())

        average_energy = np.mean(current_energies)
        average_function_evaluations = np.mean(function_evaluations)
        print(f"Splitting factor: {splitting_factor}, average energy: {average_energy}, success_rate: {success_rates[splitting_factor] / num_runs}, function evaluations: {average_function_evaluations}, execution time: {np.mean(execution_times)}")
        energy_means.append(average_energy)

        quantile_10.append(np.percentile(current_energies, 5))
        quantile_90.append(np.percentile(current_energies, 95))

        results_df.loc[len(results_df)] = [f.__class__.__name__, dim, use_async_communication, max_particles, splitting_factor, average_energy, success_rates[splitting_factor] / num_runs, average_function_evaluations, np.mean(execution_times), 'distributed CBO']

    if axs is not None:
        async_label = 'Async' if use_async_communication else 'Sync'
        axs.plot(list(range(1, max_splitting_factor + 1)), energy_means, label=f'Distributed CBO {async_label}')
        # Plot shaded confidence interval for energy from 10th to 90th percentile
        axs.fill_between(list(range(1, max_splitting_factor + 1)), quantile_10, quantile_90, alpha=0.3, label='90% CI')

    return int(np.argmin(energy_means) + 1)


In [7]:
def benchmark_distributed_cbo_synchronization_interval(
    f: callable,
    num_runs: int,
    synchronization_intervals: List[int],
    max_particles: int,
    splitting_factor: int,
    synchronization_method: str,
    num_steps: int,
    use_async_communication: bool = False,
    axs=None,
    fmin=0,
    success_eps: float = 1
):
    """
    Benchmarks the performance of the Distributed Consensus-Based Optimization (CBO) algorithm
    with varying synchronization intervals.

    Parameters:
    f (callable): The objective function to be minimized.
    num_runs (int): The number of runs to perform for each synchronization interval.
    synchronization_intervals (List[int]): A list of synchronization intervals to test.
    max_particles (int): The maximum number of particles to use in the optimization.
    splitting_factor (int): The factor by which the number of particles is split into batches.
    synchronization_method (str): The method used for synchronization (e.g., 'average', 'best').
    num_steps (int): The number of optimization steps to perform in each run.
    use_async_communication (bool, optional): Whether to use asynchronous communication. Defaults to False.
    axs (matplotlib.axes.Axes, optional): The axes on which to plot the results. Defaults to None.
    fmin (float, optional): The known minimum value of the objective function. Defaults to 0.
    success_eps (float, optional): The tolerance for considering an optimization run successful. Defaults to 1.

    Returns:
    None

    This function runs the Distributed CBO algorithm for each synchronization interval specified,
    measures the performance in terms of average energy, success rate, function evaluations, and execution time,
    and optionally plots the results on the provided axes.
    """
    communication_type = CommunicationType.ASYNC if use_async_communication else CommunicationType.SYNC

    energy_means = []
    execution_times = []
    function_evaluations = []
    quantile_10 = []
    quantile_90 = []
    success_rates = {synchronization_interval: 0 for synchronization_interval in synchronization_intervals}
    for synchronization_interval in synchronization_intervals:
        num_particles = math.ceil(max_particles / splitting_factor)
        current_energies = []
        for _ in range(num_runs):
            dyn = DistributedCBO(
                f=f,
                num_agent_batches=splitting_factor,
                synchronization_interval=synchronization_interval,
                synchronization_method=synchronization_method,
                communication_type=communication_type,
                N=num_particles,
                verbose=False,
                max_it=10000,
                **conf
            )

            # Run optimization and measure time
            tick = time.time()
            current_energies.append(
                f(dyn.optimize(num_steps=num_steps, sched='default'))
            )
            tock = time.time()
            execution_times.append(tock - tick)

            # Check if we found true minimum
            if abs(current_energies[-1] - fmin) < success_eps:
                success_rates[synchronization_interval] += 1

            function_evaluations.append(dyn.num_f_eval())

        average_energy = np.median(current_energies)
        average_function_evaluations = np.mean(function_evaluations)
        print(f"Synchronization interval: {synchronization_interval}, average energy: {average_energy}, success_rate: {success_rates[synchronization_interval] / num_runs}, function evaluations: {average_function_evaluations}, execution time: {np.mean(execution_times)}")
        energy_means.append(average_energy)

        quantile_10.append(np.percentile(current_energies, 5))
        quantile_90.append(np.percentile(current_energies, 95))

    if axs is not None:
        async_label = 'Async' if use_async_communication else 'Sync'
        axs.set_xscale('log')
        axs.plot(synchronization_intervals, energy_means, label=f'Distributed CBO {async_label}')
        # axs.set_xticks(synchronization_intervals)
        # Plot shaded confidence interval for energy from 10th to 90th percentile
        # axs.fill_between(synchronization_intervals, quantile_10, quantile_90, alpha=0.3, label='90% CI')


In [8]:
def benchmark_standard_cbo(
    f: callable,
    num_runs: int,
    num_particles: int,
    axs=None,
    fmin=0,
    success_eps: float = 1
):
    """
    Benchmarks the standard Consensus-Based Optimization (CBO) algorithm.

    Parameters:
    f (callable): The objective function to be minimized.
    num_runs (int): The number of runs to perform.
    num_particles (int): The number of particles in the CBO algorithm.
    axs (matplotlib.axes.Axes, optional): The axes on which to plot the results. Default is None.
    fmin (float, optional): The known minimum value of the objective function. Default is 0.
    success_eps (float, optional): The tolerance for considering a run successful. Default is 1.

    Returns:
    None

    The function prints the average energy, success rate, average function evaluations, and average execution time.
    It also appends the results to a global DataFrame `results_df` and optionally plots the mean energy and confidence interval.
    """
    dim = conf['d']

    energies = []
    execution_times = []
    function_evaluations = []
    quantile_10 = []
    quantile_90 = []
    success_rate = 0
    for _ in range(num_runs):
        dyn = CBO(
            f,
            N=num_particles,
            verbosity=0,
            max_it=10000,
            batch_args=None,
            M=1,
            **conf
        )

        # Run optimization and measure time
        tick = time.time()
        energies.append(
            f(dyn.optimize())
        )
        tock = time.time()
        execution_times.append(tock - tick)

        if abs(energies[-1] - fmin) < success_eps:
            success_rate += 1

        function_evaluations.append(dyn.num_f_eval)

    average_energy = np.median(energies)
    average_function_evaluations = np.mean(function_evaluations)
    print(f"Average energy: {average_energy}, success_rate: {success_rate / num_runs}, function evaluations: {average_function_evaluations}, execution time: {np.mean(execution_times)}")
    quantile_10.append(np.percentile(energies, 5))
    quantile_90.append(np.percentile(energies, 95))

    results_df.loc[len(results_df)] = [f.__class__.__name__, dim, False, num_particles, 1, average_energy, success_rate / num_runs, average_function_evaluations, np.mean(execution_times), 'standard CBO']

    # Plot mean of undistributed energies as a horizontal line
    if axs is not None:
        axs.axhline(y=average_energy, color='r', linestyle='--', label='Standard CBO')
        # Plot shaded confidence interval for energy from 10th to 90th percentile
        # axs.fill_between([0, 1], quantile_10, quantile_90, alpha=0.3, label='90% CI')


# Benchmark

In [9]:
# Benchmarking params
NUM_RUNS = 100
MAX_SPLITTING_FACTOR = 10
MAX_PARTICLES = 100
SYNCHRONIZATION_INTERVAL = 10
SYNCHRONIZATION_METHOD = 'weighted_mean'

In [10]:
success_epsilons = {}

In [11]:
def run_experiment(
    f: callable,
    d: int,
    n: int,
):
    """
    Runs a series of experiments to benchmark standard CBO and distributed CBO 
    (both synchronous and asynchronous communication) on a given function.
    Parameters:
    -----------
    f : callable
        The function to be optimized.
    d : int
        The dimensionality of the function.
    n : int
        The number of particles to be used in the optimization.
    Returns:
    --------
    None
    """
    fig, axs = plt.subplots(1, 2, figsize=(15, 5))

    if (f.__class__.__name__, d) not in success_epsilons:
        success_eps = find_smallest_non_global_minimum(f, d)
        success_epsilons[(f.__class__.__name__, d)] = success_eps

    success_eps = success_epsilons[(f.__class__.__name__, d)]

    conf['d'] = d
    # Baseline (standard CBO)
    print("Standard CBO:")
    print("=============")
    benchmark_standard_cbo(
        f=f,
        num_runs=NUM_RUNS,
        num_particles=n,
        axs=axs[0],
        success_eps=success_eps
    )
    print()

    # Synchronous
    print("Distributed CBO (synchronous communication):")
    print("=============================================")
    best_splitting_factor_sync = benchmark_distributed_cbo_splitting_factor(
        f=f,
        use_async_communication=False,
        num_runs=NUM_RUNS,
        max_splitting_factor=MAX_SPLITTING_FACTOR,
        max_particles=n,
        synchronization_interval=SYNCHRONIZATION_INTERVAL,
        synchronization_method=SYNCHRONIZATION_METHOD,
        num_steps=100,
        axs=axs[0],
        success_eps=success_eps
    )
    print()

    # Asynchronous
    print("Distributed CBO (asynchronous communication):")
    print("==============================================")
    best_splitting_factor_async = benchmark_distributed_cbo_splitting_factor(
        f=f,
        use_async_communication=True,
        num_runs=NUM_RUNS,
        max_splitting_factor=MAX_SPLITTING_FACTOR,
        max_particles=n,
        synchronization_interval=SYNCHRONIZATION_INTERVAL,
        synchronization_method=SYNCHRONIZATION_METHOD,
        num_steps=100,
        axs=axs[0],
        success_eps=success_eps
    )
    print()

    function_label = f.__class__.__name__ + f" dim={d}"
    axs[0].set_xlabel(f"Splitting factor")
    axs[0].set_ylabel(f"Mean energy")
    axs[0].set_title(f"{function_label}\nsync interval: {SYNCHRONIZATION_INTERVAL}, sync method: {SYNCHRONIZATION_METHOD}, particles: {n}")
    axs[0].legend()

    ############################################################################

    # Baseline (standard CBO)
    print("Standard CBO:")
    print("=============")
    benchmark_standard_cbo(
        f=f,
        num_runs=NUM_RUNS,
        num_particles=n,
        axs=axs[1],
        success_eps=success_eps
    )
    print()

    # Synchronous
    print("Distributed CBO (synchronous communication):")
    print("=============================================")
    benchmark_distributed_cbo_synchronization_interval(
        f=f,
        use_async_communication=False,
        num_runs=NUM_RUNS,
        splitting_factor=best_splitting_factor_sync,
        max_particles=n,
        synchronization_intervals=[1, 10, 20, 30, 40, 50, 200],
        synchronization_method=SYNCHRONIZATION_METHOD,
        num_steps=100,
        axs=axs[1],
        success_eps=success_eps
    )
    print()

    # Asynchronous
    print("Distributed CBO (asynchronous communication):")
    print("==============================================")
    benchmark_distributed_cbo_synchronization_interval(
        f=f,
        use_async_communication=True,
        num_runs=NUM_RUNS,
        splitting_factor=best_splitting_factor_async,
        max_particles=n,
        synchronization_intervals=[1, 10, 20, 30, 40, 50, 200],
        synchronization_method=SYNCHRONIZATION_METHOD,
        num_steps=100,
        axs=axs[1],
        success_eps=success_eps
    )
    print()

    function_label = f.__class__.__name__ + f" dim={d}"
    axs[1].set_xlabel(f"Synchronization interval")
    axs[1].set_ylabel(f"Mean energy")
    axs[1].set_title(f"{function_label}\nsplitting factor (sync, async): {best_splitting_factor_sync, best_splitting_factor_async}, sync method: {SYNCHRONIZATION_METHOD}, particles: {n}")
    axs[1].legend()
    
    fig.show()

## Ackley, Rastrigin (d = 5, 10, 20; N = 50, 100, 200, 1000)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=5, n=50)

print("Rastrigin")
run_experiment(Rastrigin(), d=5, n=50)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=10, n=50)

print("Rastrigin")
run_experiment(Rastrigin(), d=10, n=50)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=20, n=50)

print("Rastrigin")
run_experiment(Rastrigin(), d=20, n=50)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=5, n=100)

print("Rastrigin")
run_experiment(Rastrigin(), d=5, n=100)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=10, n=100)

print("Rastrigin")
run_experiment(Rastrigin(), d=10, n=100)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=20, n=100)

print("Rastrigin")
run_experiment(Rastrigin(), d=20, n=100)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=5, n=200)

print("Rastrigin")
run_experiment(Rastrigin(), d=5, n=200)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=10, n=200)

print("Rastrigin")
run_experiment(Rastrigin(), d=10, n=200)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=20, n=200)

print("Rastrigin")
run_experiment(Rastrigin(), d=20, n=200)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=5, n=1000)

print("Rastrigin")
run_experiment(Rastrigin(), d=5, n=1000)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=10, n=1000)

print("Rastrigin")
run_experiment(Rastrigin(), d=10, n=1000)

In [None]:
print("Ackley")
run_experiment(Ackley(), d=20, n=1000)

print("Rastrigin")
run_experiment(Rastrigin(), d=20, n=1000)

In [None]:
results_df