# Libraries

In [None]:
import numpy as np
import random
from deap import base, creator, benchmarks
import matplotlib.pyplot as plt
import time
from scipy import stats
from abc import ABC, abstractmethod

# Parameters

In [None]:
# --- Experiment selection ---------------------------------------------------
# Name of the central point function; must be one of
# 'possible_central_point_functions' (checked in the validation cell).
central_point_function_name = "mean"
# Name of the objective function; must be one of 'possible_obj_func'.
obj_func = "ackley"
# Dimension of the objective function.
dim = 20

# --- Evolution settings -----------------------------------------------------
population_size = 100  # Int, the size of the population
num_generations = 50  # Int, the number of generations
num_best = 20  # For EDA: Int, the number of best individuals to select

# Float in [0, 1]. Only used when the central point is trimmed_mean or
# trimmed_median.
proportion_to_cut = 0.1

## Parameter validation

In [None]:
# Names accepted for 'central_point_function_name'.
possible_central_point_functions = [
    'mean', 'static_weight_avg', 'dynamic_weighted_average', 'median',
    'trimmed_mean', 'trimmed_median', 'randomized_weighted_mean',
]
# Fail fast on a misspelled configuration value. ValueError is the idiomatic
# type for "right type, wrong value" and is still caught by `except Exception`.
if central_point_function_name not in possible_central_point_functions:
    raise ValueError(f"Wrong parameter as defining central point, received: {central_point_function_name}, possible: {possible_central_point_functions}")

# Names accepted for 'obj_func'.
possible_obj_func = ['sphere', 'rastrigin', 'rosenbrock', 'ackley', 'griewank', 'schwefel', 'himmelblau']
if obj_func not in possible_obj_func:
    raise ValueError(f"Wrong parameter as defining objective function, received: {obj_func}, possible: {possible_obj_func}")

# Algorithms

## Definition of central point

In [None]:
def mean_point(selected, fitness):
    """Return the per-dimension arithmetic mean of the selected individuals.

    Args:
        selected: 2-D array-like, one individual per row.
        fitness: Unused; accepted so every central point function shares
            the same ``(selected, fitness)`` signature.

    Returns:
        The column-wise mean of ``selected``.
    """
    centre = np.mean(selected, axis=0)
    return centre

def static_weighted_average(selected, fitness):
    """Return a weighted mean whose weights depend only on row position.

    Weights fall linearly from 1 (first row) to 0.5 (last row), so the
    order of ``selected`` determines how much each individual contributes.

    Args:
        selected: 2-D array-like, one individual per row.
        fitness: Unused; accepted for signature compatibility.

    Returns:
        The column-wise weighted average of ``selected``.
    """
    count = len(selected)
    rank_weights = np.linspace(1, 0.5, num=count)
    return np.average(selected, axis=0, weights=rank_weights)

def dynamic_weighted_average(selected, fitness):
    """Return a fitness-weighted mean where better individuals weigh more.

    Fitness is maximised in this file and every objective is negated, so raw
    fitness values are typically <= 0. Dividing them by their sum would give
    the *worst* individuals the largest weights (and mixed signs would give
    negative weights), so negative fitness is first shifted so that the worst
    individual sits at zero.

    Args:
        selected: 2-D array-like, one individual per row.
        fitness: 1-D array-like with one fitness value per row of
            ``selected`` (higher is better).

    Returns:
        The column-wise weighted average of ``selected``. If all (shifted)
        fitness values are zero there is nothing to rank by, and the plain
        mean is returned.
    """
    scores = np.asarray(fitness, dtype=float)

    # Only shift when needed so non-negative fitness keeps the original
    # "weight proportional to fitness" behaviour.
    lowest = scores.min()
    if lowest < 0:
        scores = scores - lowest

    total = scores.sum()
    if total == 0:
        # All individuals are equally good: avoid a 0/0 weight vector.
        return np.mean(selected, axis=0)

    return np.average(selected, axis=0, weights=scores / total)

def median(selected, fitness):
    """Return the per-dimension median of the selected individuals.

    Args:
        selected: 2-D array-like, one individual per row.
        fitness: Unused; accepted for signature compatibility.

    Returns:
        The column-wise median of ``selected``.
    """
    middle = np.median(selected, axis=0)
    return middle

def trimmed_mean(selected, fitness):
    """Return the per-dimension trimmed mean of the selected individuals.

    The fraction trimmed from each end is read from the module-level
    ``proportion_to_cut`` setting.

    Args:
        selected: 2-D array-like, one individual per row.
        fitness: Unused; accepted for signature compatibility.

    Returns:
        The column-wise trimmed mean computed by ``scipy.stats.trim_mean``.
    """
    fraction = proportion_to_cut
    return stats.trim_mean(selected, proportiontocut=fraction, axis=0)

def trimmed_median(selected, fitness):
    """Return the per-dimension median after trimming both tails.

    Each column is sorted independently, then ``int(proportion_to_cut * n)``
    rows are dropped from each end before taking the median. The fraction
    comes from the module-level ``proportion_to_cut`` setting.

    Args:
        selected: 2-D array-like, one individual per row.
        fitness: Unused; accepted for signature compatibility.

    Returns:
        The column-wise median of the trimmed, column-sorted values.
    """
    ordered = np.sort(selected, axis=0)
    total = len(ordered)
    cut_off = int(proportion_to_cut * total)
    # Use an explicit end index: with cut_off == 0 the slice [0:-0] is
    # [0:0], i.e. empty, which would make the median NaN.
    trimmed = ordered[cut_off:total - cut_off]
    return np.median(trimmed, axis=0)

def randomized_weighted_mean(selected, fitness):
    """Return a weighted mean using freshly drawn random weights.

    One uniform random weight is drawn per individual from NumPy's global
    random state, and the weights are normalised to sum to one.

    Args:
        selected: 2-D array-like, one individual per row.
        fitness: Unused; accepted for signature compatibility.

    Returns:
        The column-wise randomly weighted average of ``selected``.
    """
    draws = np.random.rand(len(selected))
    normalised = draws / sum(draws)
    return np.average(selected, axis=0, weights=normalised)

In [None]:
# Maps each accepted 'central_point_function_name' value to the function that
# implements it. Every function takes (selected, fitness) and returns the
# central point used when estimating the next generation's distribution.
# Note: the key "static_weight_avg" is shorter than the function's name.
central_point_mapper = {
    "mean": mean_point,
    "static_weight_avg": static_weighted_average,
    "dynamic_weighted_average": dynamic_weighted_average,
    "median": median,
    "trimmed_mean": trimmed_mean,
    "trimmed_median": trimmed_median,
    "randomized_weighted_mean": randomized_weighted_mean
}

In [None]:
# Show the accepted central point names (iterating a dict yields its keys).
print(list(central_point_mapper))

In [None]:
def central_point_function():
    """Return the central point callable chosen in the parameters cell.

    Looks up the module-level ``central_point_function_name`` in
    ``central_point_mapper``.

    Raises:
        KeyError: If the configured name is not a key of the mapper.
    """
    chosen_name = central_point_function_name
    return central_point_mapper[chosen_name]

## Definition of the objective function

In [None]:
def get_obj_function_and_bounds(name, dimension=2):
    """Return the objective function and per-dimension bounds for ``name``.

    Each objective wraps a ``deap.benchmarks`` function, takes element ``[0]``
    of its return value and negates it, so that larger values are better.

    Args:
        name: Key of the benchmark, e.g. ``"ackley"``.
        dimension: Number of dimensions; the same ``[low, high]`` pair is
            used for every dimension.

    Returns:
        ``(func, bounds)`` where ``func`` maps an individual to its (negated)
        benchmark value and ``bounds`` is a list of ``dimension`` independent
        ``[low, high]`` lists.

    Raises:
        KeyError: If ``name`` is not a known benchmark.
        ValueError: If a 2-D-only benchmark is requested with
            ``dimension != 2``.
    """
    functions = {
        "sphere": (lambda x: -benchmarks.sphere(x)[0], [0, 10]),
        "rastrigin": (lambda x: -benchmarks.rastrigin(x)[0], [-5.12, 5.12]),
        "rosenbrock": (lambda x: -benchmarks.rosenbrock(x)[0], [-2.048, 2.048]),
        "ackley": (lambda x: -benchmarks.ackley(x)[0], [-32.768, 32.768]),
        "griewank": (lambda x: -benchmarks.griewank(x)[0], [-600, 600]),
        "schwefel": (lambda x: -benchmarks.schwefel(x)[0], [-500, 500]),
        "himmelblau": (lambda x: -benchmarks.himmelblau(x)[0], [-6, 6]),
    }

    try:
        func, single_dim_bounds = functions[name]
    except KeyError:
        # Same exception type as the plain lookup, but with a useful message.
        raise KeyError(
            f"Unknown objective function {name!r}, possible: {sorted(functions)}"
        ) from None

    # Inherently 2D functions cannot be expanded to other dimensions.
    # ("booth" has no entry above, so only "himmelblau" can reach this check.)
    if name in ["himmelblau", "booth"] and dimension != 2:
        raise ValueError(f"{name} function is typically a 2D function.")

    # Build a fresh [low, high] list per dimension. `[pair] * dimension` would
    # repeat the *same* list object, so editing one dimension's bounds would
    # silently change all of them.
    bounds = [list(single_dim_bounds) for _ in range(dimension)]
    return func, bounds

# Algorithms definition

In [None]:
class EvolutionInterface(ABC):
    """Abstract base for evolutionary optimisers defined in this file.

    Stores the shared configuration and requires subclasses to implement
    ``run``.
    """

    def __init__(self, objective_function, bounds, central_point_function, population_size, num_generations):
        """Store the optimiser configuration.

        Args:
            objective_function: Callable mapping an individual to its fitness.
            bounds: Sequence with one ``(low, high)`` pair per dimension.
            central_point_function: Callable ``(selected, fitness)`` returning
                the central point of the selected individuals.
            population_size: Int, the size of the population.
            num_generations: Int, the number of generations.
        """
        self.objective_function = objective_function
        self.bounds = bounds
        self.central_point_function = central_point_function
        self.population_size = population_size
        self.num_generations = num_generations
        # The length of bounds determines the number of dimensions.
        self.dimension = len(bounds)

    @abstractmethod
    def run(self):
        """Run the algorithm; must be implemented by subclasses."""


In [None]:
class EDA(EvolutionInterface):
    """Estimation of Distribution Algorithm with a pluggable central point.

    Each generation evaluates the population, keeps the ``num_best``
    individuals, fits an independent normal distribution per dimension
    (centre from ``central_point_function``, spread from the standard
    deviation of the selected individuals) and samples a new population.
    """

    def __init__(self, objective_function, bounds, central_point_function, population_size, num_generations, num_best):
        """Store the configuration.

        Args:
            objective_function: Callable mapping an individual to its fitness
                (higher is better).
            bounds: Sequence with one ``(low, high)`` pair per dimension.
            central_point_function: Callable ``(selected, fitness)`` returning
                the centre of the sampling distribution.
            population_size: Int, the size of the population.
            num_generations: Int, the number of generations.
            num_best: Int, the number of best individuals to select.
        """
        # Zero-argument super() binds to this instance. The previous
        # `super(EDA).__init__(...)` called the initializer of an unbound
        # super object instead of EvolutionInterface.__init__, which raised
        # TypeError and never set the inherited attributes.
        super().__init__(objective_function, bounds, central_point_function, population_size, num_generations)
        self.num_best = num_best

    def initialize_population(self):
        """Generate an initial population within given bounds for each dimension."""
        return np.array([[np.random.uniform(low, high) for low, high in self.bounds] for _ in range(self.population_size)])

    def evaluate_population(self, population):
        """Evaluate a population using the objective function."""
        return np.array([self.objective_function(ind) for ind in population])

    def select_best_individuals(self, population, fitness):
        """Select the best individuals based on their fitness.

        Returns:
            ``(selected, selected_fitness)``: the ``num_best`` rows of
            ``population`` with the highest fitness, ordered best first, and
            their fitness values in the same order.
        """
        # Negate so argsort's ascending order yields highest fitness first.
        indices = np.argsort(-fitness)
        selected = population[indices][:self.num_best]
        selected_fitness = fitness[indices][:self.num_best]
        return selected, selected_fitness

    def estimate_distribution(self, selected, fitness):
        """Use the central point function to calculate distribution parameters.

        Returns:
            ``(mean, std)``: the central point of ``selected`` and the
            per-dimension standard deviation of ``selected``.
        """
        mean = self.central_point_function(selected, fitness)
        std = np.std(selected, axis=0)
        return mean, std

    def sample_new_population(self, distribution_params):
        """Sample a new population based on the distribution parameters.

        Individuals are drawn from independent normals per dimension.
        NOTE: samples are not clipped to ``self.bounds``.
        """
        mean, std = distribution_params
        return np.array([np.random.normal(mean, std) for _ in range(self.population_size)])

    def run(self):
        """Run the Estimation of Distribution Algorithm.

        Returns:
            ``(population, fitness_history)``: the population sampled after
            the last generation (not evaluated) and the list of the best
            fitness seen in each generation.
        """
        population = self.initialize_population()
        fitness_history = []

        for generation in range(self.num_generations):
            fitness = self.evaluate_population(population)
            fitness_history.append(np.max(fitness))
            selected, selected_fitness = self.select_best_individuals(population, fitness)
            distribution_params = self.estimate_distribution(selected, selected_fitness)
            population = self.sample_new_population(distribution_params)
            # The selected set contains the generation's best individual, so
            # this is the same value that was appended to fitness_history.
            print(f"Generation {generation}: Max fitness = {np.max(selected_fitness)}")

        return population, fitness_history

# Experiments

In [None]:
# Resolve the configured objective function and its per-dimension bounds.
objective_function, bounds = get_obj_function_and_bounds(obj_func, dim)

# Create EDA instance with custom central point function
eda = EDA(objective_function, bounds, central_point_function(), population_size, num_generations, num_best)
# Time only the optimisation run itself. end_time is assigned only if run()
# returns, which the analysis cell relies on.
start_time = time.perf_counter()
final_population, fitness_history = eda.run()
end_time = time.perf_counter()

# Analyze

In [None]:
# Report the wall-clock duration of the run. If the experiment cell was
# interrupted, end_time (or start_time) was never assigned, which surfaces as
# NameError; catch only that so unrelated errors are not hidden.
try:
    print("Time of the evaluation:", end_time - start_time)
except NameError:
    print("Cannot calculate time of execution because execution was stopped by force and it was not completed so there is no ending time")

# Plot the best fitness recorded in each generation.
plt.plot(fitness_history)
plt.title('Fitness over Generations')
plt.xlabel('Generation')
plt.ylabel('Max Fitness')
plt.show()