In [1]:
# import required libraries for EvoMan
from evoman.environment import Environment
from demo_controller import player_controller

# imports other libs
import numpy as np
import os
import random
import sys
import matplotlib.pyplot as plt
from scipy.stats import norm



pygame 2.6.0 (SDL 2.30.7, Python 3.12.6)
Hello from the pygame community. https://www.pygame.org/contribute.html


## Support functions 

In [2]:
# set up simulation
def simulation(env, genome):
    f,p,e,t = env.play(pcont=genome)
    return f

experiment_name = 'generalist1'

#sets up environment
headless = True
if headless:
    os.environ["SDL_VIDEODRIVER"] = "dummy"


experiment_name = 'debugging ai'
if not os.path.exists(experiment_name):
    os.makedirs(experiment_name)

In [3]:
def get_fitness(population):
    n_hidden_neurons = 10
    total_fitness = []
    # buffer whenever we have a population of 1; eg -> Local Search 
    if population.ndim == 1:
        population = [population]
        
    # get fitness for entire population
    for individual in population:
        ind_fitness = []
        for enemy in enemies:
            env = Environment(experiment_name=experiment_name,
                        # multiplemode="no",
                        enemies=[enemy],
                        playermode="ai",
                        player_controller=player_controller(n_hidden_neurons),  
                        enemymode="static",
                        level=2,
                        speed="fastest",
                        visuals=False)
            # get individual fitness
            ind_fitness.append(simulation(env, individual))
        ind_fitness.append(np.mean(ind_fitness))
        # get generalist fitness
        total_fitness.append(ind_fitness)

    return np.array(total_fitness)

def pop_init(n):
    """K-Tournament initialization"""
    population = []
    genome_size = (20 + 1) * 10 + (10 + 1) * 5
    # generate population from iterativer tournaments  
    for _ in range(n):
        # create initalization tournamentn
        tournament = np.random.uniform(-1, 1, size=(5, genome_size))
        tournament_fitness = get_fitness(tournament)
        print("tournament_fitness", tournament_fitness)
        mean_fitness = [tournament_fitness[i][-1] for i in range(len(tournament_fitness))]
        
        # extract tournament winner and add to population
        best = tournament[np.argmax(mean_fitness)]
        population.append(best)
    
    return np.array(population)

In [4]:
def my_levy(u, c = 1.0, mu = 0.0):
    return mu + c / (2.0 * (norm.ppf(1.0 - u))**2)

def swag_mutation(population, fit_values, m_rate, fitness_threshold=None):
    mutated_population = []

    for i in range(len(population)):
        individual = population[i]
        mutated_individual = individual
        # if fitness is below threshold, mutate all genes
        if fit_values[i] < fitness_threshold:
            for j in range(len(individual)):
                if rng.random() < m_rate:
                    u = rng.random()
                    sign = np.random.choice([-1, 1])
                    mutated_individual[j] = sign*my_levy(u)
        # if fitness is above threshold, mutate only one gene
        elif fit_values[i] >= fitness_threshold or fitness_threshold == None:
            j = random.randint(0, len(individual)-1)
            u = rng.random()
            sign = np.random.choice([-1, 1])
            mutated_individual[j] = sign*my_levy(u)

        # append mutated individual to the mutated population
        mutated_population.append(mutated_individual)

    return np.array(mutated_population)

In [10]:
def adaptive_ensemble_mutation(population, p_mutation, nr_gen, p_gaussian):
    """mutation operator that applies gaussian mutation and cauchy distribution adaptively"""
    #initialize variables
    print("test first line for p gaussian", p_gaussian)
    gaussian_rows = int(p_gaussian*population.shape[0])
    gaussian_indx = np.random.choice(population.shape[0],gaussian_rows,replace=False)
    gaussian_pop = population[gaussian_indx]
    cauchy_pop = np.array([individual for individual in population.tolist() if individual not in gaussian_pop.tolist()])
    success_counter = 0

    print("test in adaptive ensemble")

    #mutation per operator_subgroup
    for index, g_ind in  enumerate(gaussian_pop):
        for gene in range(len(g_ind)):
            if p_mutation > random.uniform(0,1):
                gaussian_pop[index][gene]+=np.random.normal(0,1) 
    for index, c_ind in enumerate(cauchy_pop):
        for gene in range(len(c_ind)):
            if p_mutation > random.uniform(0,1):                     #still have to debug
                cauchy_pop[index][gene]+=1+(np.arctan(np.random.standard_cauchy())/np.pi)

    print("test 2 in adaptive ensemble")
    #form new population and evaluate top percent 
    population = np.concatenate((gaussian_pop,cauchy_pop))
    if nr_gen > 10:
        print("inside if loop")
        fitness_list = get_fitness(population)
        fitness_pop=fitness_list[-1]
        sorted_fitness_ind = sorted(range(len(fitness_pop)), key=lambda x:fitness_pop[x], reverse=True)
        elites = population[sorted_fitness_ind[:int(0.2*population.shape[0])]]
        for element in elites:
            if element in gaussian_pop:
                success_counter+=1
        p_gaussian = success_counter/len(elites)

    print("p_gaussian=", p_gaussian)
    # p_gaussian=0.3
    return population, p_gaussian

def get_max_range(population):
    max_range =- 1
    for index,individual in enumerate(population):
        for other in population[index+1:]:
            range = abs(np.linalg.norm(individual-other))
            if range > max_range:
                max_range = range
    return max_range


# fitness sharing
def shared_fitness_function(population, fitness_values, current_gen, max_gen):
    """function that penalizes fitness if individual is too similar"""
    k = 5 / max_gen
    s_fit_values=[]
    max_range = get_max_range(population)
    initial_threshold = max_range * 0.2
    final_threshold = max_range * 0.03
    threshold = final_threshold + (initial_threshold-final_threshold)*np.exp(-k*current_gen)
    
   #loop over individual in population and the possible pairs
    for value in range(population.shape[0]):
        s_fit_pen =  0
        for other in range(population.shape[0]):
            if other != value:
                
                #calculates euclidean distance between individual and candidate for niche
                euc = np.linalg.norm(population[value]-population[other])
                if euc < threshold:
                    
                    #sums penalisation
                    s_fit_pen += (1-(euc / threshold))
        
        #calculates new value
        if s_fit_pen > 0:
            s_fit_value = fitness_values[value] / s_fit_pen
            s_fit_values.append(s_fit_value)
        else:
            s_fit_values.append(fitness_values[value])
    return np.array(s_fit_values)

def get_diversity(population):
    """calculates how diverse each individual is"""
    mean_ind = np.mean(population,axis=0)
    diversity_list = []
    for individual in population:
        diversity = abs(np.linalg.norm(individual-mean_ind))
        diversity_list.append(diversity)
    return diversity_list

def two_archives_survival(parents, children, p_fitness,current_gen, max_gen):
    """function that performs survivor selection proportional to p_fitness based on fitness and otherwise based on diversity"""
    
    survivors=[]
    ind = 0
    final_fit=[]
    #concatenates parents and children into one array, calculates fitness and diversity
    total_pop = np.concatenate((parents,children))
    diversity_p = get_diversity(total_pop)
    total_pop_fit = get_fitness(total_pop)
    total_shared = np.array(shared_fitness_function(total_pop,total_pop_fit,current_gen,max_gen))
    
    #sorts the fitnesses and diversity and returns indeces
    fitness_sorted = np.argsort(total_shared[:, -1])[::-1]
    diversity_sorted = np.argsort(diversity_p)[::-1]
    #loop that chooses survivors proportionally to p_fitness from fitness or else from diversity
    while len(survivors)<parents.shape[0]:
        if np.random.uniform(0,1) < p_fitness and total_pop[fitness_sorted[ind]].tolist() not in [s.tolist() for s in survivors]:
            survivors.append(total_pop[fitness_sorted[ind]])
            final_fit.append(fitness_sorted[ind])
        elif total_pop[diversity_sorted[ind]].tolist() not in [s.tolist() for s in survivors]:
                survivors.append(total_pop[diversity_sorted[ind]])
                final_fit.append(fitness_sorted[ind])
        else:
            if np.random.uniform(0,1)<0.5:
                survivors.append(total_pop[fitness_sorted[ind]])
                final_fit.append(fitness_sorted[ind])
            else:
                survivors.append(total_pop[diversity_sorted[ind]])   
                final_fit.append(fitness_sorted[ind])
        ind+=1
    
    return np.array(survivors), final_fit


In [6]:
def gaussian_perturb(genome, k):
    """Make a random perturbation to explore local space"""
    for _ in range(k):
        locus = np.random.randint(0, len(genome)) # select a random location to perturbate the genome 
        noise = np.random.normal(0, 0.5) # random noise form a N(0,1/2)
        # modify current solution by searching new solution in the neibourhood
        genome[locus] =+ noise
    
    return genome

def simulated_annealing(initial_solution, initial_fit, temp_start, temp_min, cooling_rate):
    """Local search inspired by the Annealiation process"""
    current_solution, best_solution = initial_solution, initial_solution
    current_fitness, best_fitness = initial_fit, initial_fit
    temperature = temp_start

    while temperature > temp_min:
        # generate a new neighbour solution through gaussina perturbation
        new_solution = gaussian_perturb(current_solution, 2)
        new_fitness = get_fitness(new_solution)

        # get the difference between average fitness
        delta = new_fitness[0, -1] - current_fitness[0, -1]

        # check if new solution is better than current solution
        if delta > 0:
            current_solution = new_solution
        else:
            # if new solution is worse accept it with a probability that decreases with cooling
            # encourages exploration early on 
            accept_prob = np.exp(-delta / temperature)
            if np.random.rand() < accept_prob:
                current_solution = new_solution

        # update best solution
        if new_fitness[0, -1] > best_fitness[0, -1]:
            best_solution = new_solution 
            best_fitness = new_fitness
        
        # decrease temperature
        temperature = temperature * cooling_rate
    
    return best_solution, best_fitness


def vns_local_search(population, fitness_values, max_k):
    """
    Variable Neighborhood Search using Gaussian perturbation
    and Simulated Annealing.
    """
    MAX_ITER = 6
    size_fit = fitness_values.shape[1]
    # create new population and their fitness
    improved_population = []
    imporved_fitness =[]

    for i in range(len(population)):
        # set current best solutions for ith solution
        best_solution = population[i]
        best_fitness = fitness_values[i]
        iter = 0 

        while ((best_fitness[-1] - fitness_values[i][-1]) < 5) and (iter < MAX_ITER):
            current_solution = best_solution
            current_fitness = best_fitness
            k = 1

            while k <= max_k:
                # generate a neighbor solution 
                neighbor_solution = gaussian_perturb(current_solution, k)
                neighbor_fit = get_fitness(neighbor_solution)

                # imporve neighbor solution and evaluate 
                improved_neighbor, improved_fit = simulated_annealing(neighbor_solution, neighbor_fit, 100, 1 ,0.95) 

                # if imporved solution is better update current solution
                if improved_fit[0, -1] > current_fitness[-1]:
                    current_solution = improved_neighbor
                    current_fitness = improved_fit.reshape(size_fit)

                    # if imporved_solution is better than best update best
                    if improved_fit[0, -1] > best_fitness[ -1]:
                        best_solution = improved_neighbor
                        best_fitness = improved_fit.reshape(size_fit)
                        # reset to the first neighborbood 
                        k = 1 
                else:
                    # move to the next neighborhood 
                    k += 1
            # increate iteration of outer loop -> termination criteria 
            iter += 1
        
        # append newly found best solution for ith solution
        improved_population.append(best_solution)
        imporved_fitness.append(best_fitness)
    
    return np.array(improved_population), np.array(imporved_fitness)

def survival_selection():
    pass

def multi_adaprive_mutation(population, fit_values, m_rate):
    """Multi adaptive mutatoion operator"""
    mutated_pop = []
    gen_proport = gen /max_gen # generation proportionality 
    for i in range(len(population[0])):

        solution = np.copy(population[i])

        # choose number of genes to mutate proportional to generation
        k = 10 - int(9*(gen + max_gen*0.5)/max_gen)

        # iterate over k genes
        for _ in range(k):
            # check if mutation occurs 
            if m_rate > np.random.rand():
                # select random gene
                locus = np.random.randint(0, len(solution))
                #choose sigma proportional to fitness of individual 
                # higher fit = lower sigma, lower fit = higher sigma
                sigma = 2 - 1.2*(fit_values[i]/100)

                #choose distribution proportional to generation
                # if earlier in the generation more likely to explore 
                if gen_proport < np.random.rand():
                    noise = np.random.laplace(0, sigma)
                # if later on the generations more likely to exploit 
                else:
                    noise = np.random.normal(0, sigma)
                
                # perturb gene
                solution[locus] += noise
        
        # add mutated solution to mutated population
        mutated_pop.appedn(solution)
    
    return np.array(mutated_pop)

In [None]:
# testing mutation
population = np.random.uniform(-1, 1, size=())

## Evo Strategies

In [7]:
def evo_strategies(pop_size, m_rate, max_gen, max_k, mutation_operator, survival_selection ,enemies, **kwargs):
    """Evolution strategy algorithm made for testing different mutation operators"""
    enemies = enemies 
    if kwargs.get("p_gaussian"): p_gaussian = kwargs.get("p_gaussian") 

    # initialize population
    population = pop_init(pop_size)
    fit_values = get_fitness(population)

    # get best individuals
    idx = np.argmax(x[-1] for x in fit_values)
    best_ind = [population[idx]]
    best_fit = [fit_values[idx][-1]]

    # start evos trategies
    for gen in range(max_gen):
        # mutate population and evaluate
        if mutation_operator == adaptive_ensemble_mutation:
           mutated, p_gaussian = adaptive_ensemble_mutation(population, m_rate, gen + 1, p_gaussian=p_gaussian)
           fit_mutated = get_fitness(mutated)
        else:    
            mutated = mutation_operator(population, fit_values, m_rate, **kwargs)
            fit_mutated = get_fitness(mutated)

        # memetic algorithm -> life improvement of individuals 
        improved_mutated, fit_imporved = vns_local_search(mutated, fit_mutated, max_k)
        # survival selection 
        population, fit_values = survival_selection(population, fit_values, improved_mutated, fit_imporved, **kwargs)
        
        # record best solution 
        idx = np.argmax(x[-1] for x in fit_values)
        gen_best = population[idx]
        gen_best_fitness = fit_values[idx][-1]
        # check if gen best is highest 
        if gen_best_fitness > best_fit[-1]:
            best_ind.append(gen_best)
            best_fit.append(gen_best_fitness)
        else:
            best_ind.append(best_ind[-1])
            best_fit.append(best_fit[-1])

        # termination condition
        if best_fit[-1] > 95:
            break
    
    return best_ind, best_fit 

# Training 

In [None]:
# Testing the first operator 
enemies = [1, 2, 3]
solutions, fitness = evo_strategies(50, 0.1, 300, 5, adaptive_ensemble_mutation, two_archives_survival, enemies, p_gaussian = 0.3)

## Hyperparameter tuning
Hyperparameter tuning using Bayesian Optimization. The Tunning will be done 3-4 parameters at a time and for each of the mutation operators.

#### Adaptive_ensemble_Mutation + _rate + pop_size + max_gen

In [20]:
#module for optimization
from bayes_opt import BayesianOptimization, UtilityFunction
# module for logging data 
from bayes_opt.logger import JSONLogger
from bayes_opt.event import Events
# module for retriving datat 
from bayes_opt.util import load_logs

# logger listes to Events.OPTIMIZATION_STEP
# logger will not look back at previous probede points `

# bounded parameter regions 
pbounds = {"pop_size" : (50, 250), "m_rate" : (0.01, 0.5), "max_gen": (50, 150), "p_gaussian": (0.01, 0.5)}

# define wrapped funciton
def evo_strategies_wrapper(pop_size, m_rate, max_gen, p_gaussian):
    max_k = 5
    mutation_operator = adaptive_ensemble_mutation
    survival_selection = two_archives_survival
    enemies = [4, 5, 6, 7]
    pop_size = int(round(pop_size))
    max_gen = int(round(max_gen))

    return evo_strategies(pop_size, m_rate, max_gen, max_k=max_k, mutation_operator=mutation_operator,
                         survival_selection=survival_selection, enemies=enemies, p_gaussian=p_gaussian)

# create instance of optimizer 
optimizer1 = BayesianOptimization(
    f = evo_strategies_wrapper,
    pbounds = pbounds,
    random_state = 1
)

# create UtilityFunction object for aqu. function
utility = UtilityFunction(kind = "ei", xi= 0.02)

# set gaussian process parameter
optimizer1.set_gp_params(alpha = 1e-6)

# create logger 
logger = JSONLogger(path = "./tunning1.log")
optimizer1.subscribe(Events.OPTIMIZATION_STEP, logger)

# initial search 
optimizer1.maximize(
    init_points = 7, # number of random explorations before bayes_opt
    n_iter = 15, # number of bayes_opt iterations
)

# print out the data from the initial run to check if bounds need update 
for i, param in enumerate(optimizer1.res):
    print(f"Iteration {i}: \n\t {param}")

# get best parameter
print(optimizer1.max)


MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.

MESSAGE: Pygame initialized for simulation.
tournament_fitness [[-5.37063803 -6.08677473 -4.62497281 -5.75257264 -5.45873955]
 [21.49061166 87.6

KeyboardInterrupt: 

In [None]:
# updating bounds after initial search 
optimizer1.set_bounds(new_bounds={})

# run optimizer again with new bounds set 
optimizer1.maximize()

# get best parameter
print(optimizer1.max)


# loading logged data into new instnace of optimizer 
new_optimizer = BayesianOptimization(
    f = evo_strategies,
    pbounds = pbounds,
    verbose = 2,
    random_state = 7,
)
print(len(new_optimizer.space))
# load previous data 
load_logs(new_optimizer, logs = ["./logs/log"])
print("New optimizer is not aware of {} points".format(len(new_optimizer.space)))

#### Swag_mutation + m_rate + max_k

In [None]:

# bounded parameter regions 
pbounds = {"max_k" : (50, 250), "m_rate" : (0.01, 50)}

# define wrapped funciton
def evo_strategies_wrapper(max_k, m_rate, ):
    pop_size = 50 
    max_gen = 50
    mutation_operator = swag_mutation
    survival_selection = two_archives_survival
    enemies = [4, 5, 6, 7]
    p_gaussian = 0.3
    return evo_strategies(pop_size, m_rate, max_gen, max_k=max_k, mutation_operator=mutation_operator,
                         survival_selection=survival_selection, enemies=enemies, p_gaussian=p_gaussian)

# create instance of optimizer 
optimizer2 = BayesianOptimization(
    f = evo_strategies_wrapper,
    pbounds = pbounds,
    random_state = 1
)

# create UtilityFunction object for aqu. function
utility = UtilityFunction(kind = "ei", xi= 0.02)

# set gaussian process parameter
optimizer1.set_gp_params(alpha = 1e-6)

# create logger 
logger = JSONLogger(path = "./tunning2.log")
optimizer1.subscribe(Events.OPTIMIZATION_STEP, logger)

# initial search 
optimizer1.maximize(
    init_points = 7, # number of random explorations before bayes_opt
    n_iter = 15, # number of bayes_opt iterations
)

# print out the data from the initial run to check if bounds need update 
for i, param in enumerate(optimizer2.res):
    print(f"Iteration {i}: \n\t {param}")

# get best parameter
print(optimizer1.max)

#### Multi_adaptive_mutation + m_rate

In [None]:
multi_adaprive_mutation

# bounded parameter regions 
pbounds = { "m_rate" : (0.01, 50)}

# define wrapped funciton
def evo_strategies_wrapper(m_rate):
    pop_size = 50 
    max_gen = 50
    max_k = 3
    mutation_operator = multi_adaprive_mutation
    survival_selection = two_archives_survival
    enemies = [4, 5, 6, 7]
    p_gaussian = 0.3
    return evo_strategies(pop_size, m_rate, max_gen, max_k, mutation_operator=mutation_operator,
                         survival_selection=survival_selection, enemies=enemies, p_gaussian=p_gaussian)

# create instance of optimizer 
optimizer3 = BayesianOptimization(
    f = evo_strategies_wrapper,
    pbounds = pbounds,
    random_state = 1
)

# create UtilityFunction object for aqu. function
utility = UtilityFunction(kind = "ei", xi= 0.02)

# set gaussian process parameter
optimizer3.set_gp_params(alpha = 1e-6)

# create logger 
logger = JSONLogger(path = "./tunning3.log")
optimizer1.subscribe(Events.OPTIMIZATION_STEP, logger)

# initial search 
optimizer3.maximize(
    init_points = 7, # number of random explorations before bayes_opt
    n_iter = 15, # number of bayes_opt iterations
)

# print out the data from the initial run to check if bounds need update 
for i, param in enumerate(optimizer3.res):
    print(f"Iteration {i}: \n\t {param}")

# get best parameter
print(optimizer1.max)