In [None]:
# Install the Python packages this notebook installs explicitly.
# NOTE(review): `deap` is imported further down but is not installed here —
# presumably it is already available in the runtime image; confirm.
!pip install pygame
!pip install pygad

# Fetch the EvoMan framework into ./tmp, copy its contents into
# /kaggle/working, then delete the temporary clone.
# NOTE(review): the clone target is relative while the copy/remove paths are
# absolute, so this assumes the working directory is /kaggle/working.
!git clone https://github.com/karinemiras/evoman_framework.git tmp
!cp -r /kaggle/working/tmp/* /kaggle/working/
!rm -R /kaggle/working/tmp

In [None]:
import sys, os
import numpy as np
import pandas as pd

import random

from evoman.environment import Environment
from evoman.controller import Controller
from pygad.kerasga import model_weights_as_vector,model_weights_as_matrix
from deap import base, creator, tools, algorithms
import multiprocessing

In [None]:
def sigmoid_activation(x):
    """Logistic function ``1 / (1 + exp(-x))``.

    ``x`` is passed straight to ``np.exp``, so scalars and arrays are both
    accepted and arrays are processed element-wise.
    """
    exp_of_negated = np.exp(-x)
    return 1. / (1. + exp_of_negated)


class player_controller(Controller):
    """Feed-forward network that maps sensor readings to five binary actions.

    The genome (``controller``) is a flat vector. With a hidden layer its
    layout is::

        [bias1 (n_hidden) | weights1 (n_inputs * n_hidden) |
         bias2 (5)        | weights2 (n_hidden * 5)]

    Without a hidden layer it is ``[bias (5) | weights (n_inputs * 5)]``.
    """

    # Number of network outputs: left, right, jump, shoot, release.
    _N_ACTIONS = 5

    def __init__(self, _n_hidden):
        # Stored as a one-element list; only index 0 is ever read.
        self.n_hidden = [_n_hidden]

    def set(self, controller, n_inputs):
        """Unpack the flat genome into the layer parameters used by ``control``.

        Does nothing when there is no hidden layer; in that case ``control``
        slices the genome itself on every call.

        Args:
            controller: flat array of weights and biases (must support
                slicing and ``.reshape``).
            n_inputs: number of sensor values fed to the network.
        """
        n_hidden = self.n_hidden[0]
        if n_hidden <= 0:
            return

        n_out = self._N_ACTIONS

        # Layer 1: biases first, then the input-to-hidden weights.
        self.bias1 = controller[:n_hidden].reshape(1, n_hidden)
        weights1_end = n_inputs * n_hidden + n_hidden
        self.weights1 = controller[n_hidden:weights1_end].reshape((n_inputs, n_hidden))

        # Layer 2: biases first, then the hidden-to-output weights.
        self.bias2 = controller[weights1_end:weights1_end + n_out].reshape(1, n_out)
        self.weights2 = controller[weights1_end + n_out:].reshape((n_hidden, n_out))

    def control(self, inputs, controller):
        """Return ``[left, right, jump, shoot, release]`` as 0/1 flags.

        Args:
            inputs: array of sensor readings.
            controller: flat genome; only read here when there is no hidden
                layer (otherwise the parameters stored by ``set`` are used).
        """
        # Min-max scale the sensors to [0, 1]. When every reading is equal the
        # range is zero; use all-zero inputs instead of dividing by zero,
        # which would push NaN through the network.
        lowest = min(inputs)
        value_range = float(max(inputs) - lowest)
        if value_range == 0:
            inputs = inputs - lowest
        else:
            inputs = (inputs - lowest) / value_range

        n_out = self._N_ACTIONS
        if self.n_hidden[0] > 0:
            hidden = sigmoid_activation(inputs.dot(self.weights1) + self.bias1)
            # Each entry of the activated second layer is one action.
            output = sigmoid_activation(hidden.dot(self.weights2) + self.bias2)[0]
        else:
            bias = controller[:n_out].reshape(1, n_out)
            weights = controller[n_out:].reshape((len(inputs), n_out))
            output = sigmoid_activation(inputs.dot(weights) + bias)[0]

        # An action fires when its activation exceeds 0.5.
        return [1 if activation > 0.5 else 0 for activation in output[:n_out]]

In [None]:
def parameter_count(hidden_neurons, n_inputs=20, n_outputs=5):
    """Return the genome length (weights + biases) of the controller network.

    Args:
        hidden_neurons: size of the single hidden layer; ``0`` (or less)
            means the inputs are wired directly to the outputs.
        n_inputs: number of network inputs (default 20, the value that was
            previously hard-coded).
        n_outputs: number of network outputs (default 5, the value that was
            previously hard-coded).

    Returns:
        Total number of trainable parameters as an ``int``.
    """
    if hidden_neurons > 0:
        n_weights = (n_inputs * hidden_neurons) + (hidden_neurons * n_outputs)
        n_biases = hidden_neurons + n_outputs
        return n_weights + n_biases
    # No hidden layer: one weight per input/output pair plus the output biases.
    return (n_inputs * n_outputs) + n_outputs
    
def fitness_function(solution):
    """Evaluate one genome and return its fitness as a 1-tuple.

    ``solution`` is converted to a NumPy array and handed to the module-level
    ``env``; the first element of what ``env.play`` returns is used as the
    fitness value. The value is wrapped in a tuple because it is assigned to
    ``ind.fitness.values`` by the callers.
    """
    genome = np.array(solution)
    play_result = env.play(genome)
    return (play_result[0],)

def cx_function(parent1,parent2,alpha):
    """Blend two parents gene by gene into a new ``creator.Individual``.

    Each child gene is ``alpha * parent1 + (1 - alpha) * parent2``.
    """
    genes_first = np.array(parent1)
    genes_second = np.array(parent2)
    blended = (alpha*genes_first)+((1-alpha)*genes_second)
    return creator.Individual(blended)

In [None]:
def steady_state(pop_size,n_offspring,tournsize_parent,cxpb,alpha,mutpb,sigma,indpb,tournsize_survival,ngen,verbose):
    """Run a (mu + lambda)-style GA: children compete with their parents.

    Relies on the module-level ``toolbox`` (``population``, ``evaluate``,
    ``map``, ``clone``) and ``stats`` objects.

    Args:
        pop_size: number of individuals kept after each survival selection.
        n_offspring: offspring multiplier; every generation produces
            ``n_offspring * pop_size`` children.
        tournsize_parent: tournament size for parent selection.
        cxpb: probability that a parent pair is recombined with
            ``cx_function``; otherwise the child is a copy of one parent
            picked at random.
        alpha: blend weight for ``cx_function``, or ``"random"`` to draw a
            fresh uniform weight for every pair.
        mutpb: probability that a child undergoes Gaussian mutation.
        sigma: standard deviation of the Gaussian mutation.
        indpb: per-gene mutation probability, or ``"random"`` to draw a fresh
            uniform probability for every mutated child.
        tournsize_survival: tournament size for survival selection.
        ngen: number of generations.
        verbose: when ``1``, print the logbook line of every generation.

    Returns:
        ``(hof, logbook)``: the size-1 hall of fame and the per-generation
        statistics.
    """
    pop = toolbox.population(n=pop_size)
    hof = tools.HallOfFame(1)

    fitnesses = toolbox.map(toolbox.evaluate, pop)
    for ind, fit in zip(pop, fitnesses):
        ind.fitness.values = fit
    hof.update(pop)

    logbook = tools.Logbook()
    logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

    record = stats.compile(pop) if stats else {}
    logbook.record(gen=0, nevals=len(pop), **record)
    if verbose==1:
        print(logbook.stream)

    for g in range(ngen):

        # Select parents: two per child.
        parents = tools.selTournament(individuals=pop,k=2*n_offspring*pop_size,tournsize=tournsize_parent)

        # Generate offspring with crossover
        offspring = []
        for parent1, parent2 in zip(parents[::2], parents[1::2]):
            if np.random.uniform() < cxpb:
                blend = np.random.uniform() if alpha=="random" else alpha
                child = cx_function(parent1, parent2, blend)
            else:
                # Copy the chosen parent. Appending the parent object itself
                # would let the in-place mutation below silently rewrite a
                # member of the current population (and every other alias of
                # it among the offspring).
                child = toolbox.clone(random.choice([parent1, parent2]))
            offspring.append(child)

        # Apply mutation on the offspring (mutGaussian mutates in place)
        for mutant in offspring:
            if np.random.uniform() < mutpb:
                gene_pb = np.random.uniform() if indpb=="random" else indpb
                tools.mutGaussian(mutant,mu=0,sigma=sigma,indpb=gene_pb)

        # Evaluate the offspring
        fitnesses = toolbox.map(toolbox.evaluate, offspring)
        for ind, fit in zip(offspring, fitnesses):
            ind.fitness.values = fit

        # Add offspring to the population
        pop = pop + offspring
        hof.update(pop)
        # Survival Selection
        pop = tools.selTournament(individuals=pop,k=pop_size,tournsize=tournsize_survival)

        record = stats.compile(pop) if stats else {}
        # nevals is the number of fitness evaluations made this generation.
        logbook.record(gen=g+1, nevals=len(offspring), **record)
        if verbose==1:
            print(logbook.stream)

    return hof, logbook

In [None]:
def generational_model(pop_size,n_offspring,tournsize_parent,cxpb,alpha,mutpb,sigma,indpb,tournsize_survival,ngen,verbose):
    """Run a (mu, lambda)-style GA: each generation is replaced by its children.

    Relies on the module-level ``toolbox`` (``population``, ``evaluate``,
    ``map``, ``clone``) and ``stats`` objects.

    Args:
        pop_size: number of children kept as the next population.
        n_offspring: offspring multiplier; every generation produces
            ``n_offspring * pop_size`` children.
        tournsize_parent: tournament size for parent selection.
        cxpb: probability that a parent pair is recombined with
            ``cx_function``; otherwise the child is a copy of one parent
            picked at random.
        alpha: blend weight for ``cx_function``, or ``"random"`` to draw a
            fresh uniform weight for every pair.
        mutpb: probability that a child undergoes Gaussian mutation.
        sigma: standard deviation of the Gaussian mutation.
        indpb: per-gene mutation probability, or ``"random"`` to draw a fresh
            uniform probability for every mutated child.
        tournsize_survival: tournament size used to pick the survivors among
            the children.
        ngen: number of generations.
        verbose: when ``1``, print the logbook line of every generation.

    Returns:
        ``(hof, logbook)``: the size-1 hall of fame and the per-generation
        statistics.
    """
    pop = toolbox.population(n=pop_size)
    hof = tools.HallOfFame(1)

    fitnesses = toolbox.map(toolbox.evaluate, pop)
    for ind, fit in zip(pop, fitnesses):
        ind.fitness.values = fit
    hof.update(pop)

    logbook = tools.Logbook()
    logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

    record = stats.compile(pop) if stats else {}
    logbook.record(gen=0, nevals=len(pop), **record)
    if verbose==1:
        print(logbook.stream)

    for g in range(ngen):

        # Select parents: two per child.
        parents = tools.selTournament(individuals=pop,k=n_offspring*2*pop_size,tournsize=tournsize_parent)

        # Generate offspring with crossover
        offspring = []
        for parent1, parent2 in zip(parents[::2], parents[1::2]):
            if np.random.uniform() < cxpb:
                blend = np.random.uniform() if alpha=="random" else alpha
                child = cx_function(parent1, parent2, blend)
            else:
                # Copy the chosen parent; otherwise several offspring slots
                # could alias one object and the in-place mutation below
                # would hit all of them at once.
                child = toolbox.clone(random.choice([parent1, parent2]))
            offspring.append(child)

        # Apply mutation on the offspring (mutGaussian mutates in place)
        for mutant in offspring:
            if np.random.uniform() < mutpb:
                gene_pb = np.random.uniform() if indpb=="random" else indpb
                tools.mutGaussian(mutant,mu=0,sigma=sigma,indpb=gene_pb)

        # Evaluate the offspring
        fitnesses = toolbox.map(toolbox.evaluate, offspring)
        for ind, fit in zip(offspring, fitnesses):
            ind.fitness.values = fit

        # Record the best child before survival selection: the tournament is
        # stochastic and may drop it from the next population.
        hof.update(offspring)

        # Survival selection among the children only
        pop = tools.selTournament(individuals=offspring,k=pop_size,tournsize=tournsize_survival)

        record = stats.compile(pop) if stats else {}
        # nevals is the number of fitness evaluations made this generation.
        logbook.record(gen=g+1, nevals=len(offspring), **record)
        if verbose==1:
            print(logbook.stream)

    return hof, logbook

In [None]:
def simulation(enemy_no,algorithm,n_simulation,hidden_neurons,params):
    """Run ``n_simulation`` independent GA runs and collect their logbooks.

    Builds the EvoMan environment and the DEAP toolbox/statistics, publishes
    them as the module-level ``env``, ``toolbox`` and ``stats`` (the GA
    functions and ``fitness_function`` read them from there), then runs the
    requested algorithm.

    Args:
        enemy_no: passed to ``Environment`` as ``enemies``.
        algorithm: ``"steady_state"`` or ``"generational_model"``.
        n_simulation: number of independent runs.
        hidden_neurons: hidden-layer size of the controller network.
        params: keyword arguments forwarded to the GA function.

    Returns:
        ``pandas.DataFrame`` with one row per generation per run: the logbook
        columns plus ``iteration`` and ``enemy``.

    Raises:
        ValueError: if ``algorithm`` is not one of the supported names
            (previously this silently produced an empty frame).
    """
    global env,toolbox,stats

    # Resolve the runner lazily so only the requested function must exist.
    if algorithm=="steady_state":
        run_once = steady_state
    elif algorithm=="generational_model":
        run_once = generational_model
    else:
        raise ValueError(
            "unknown algorithm %r; expected 'steady_state' or 'generational_model'" % (algorithm,)
        )

    individual_size = parameter_count(hidden_neurons)

    # Run pygame without a display.
    os.environ["SDL_VIDEODRIVER"] = "dummy"

    env = Environment(playermode="ai",
                      player_controller=player_controller(hidden_neurons),
                      speed="fastest",
                      enemymode="static",
                      enemies=enemy_no,
                      level=2,
                      visuals=False,
                      logs="off")

    # creator.create defines classes on the shared `creator` module; define
    # them only once so repeated calls (e.g. one per Optuna trial) do not
    # redefine them every time.
    if not hasattr(creator, "FitnessMax"):
        creator.create("FitnessMax", base.Fitness, weights=(1.0,))
    if not hasattr(creator, "Individual"):
        creator.create("Individual", list, fitness=creator.FitnessMax)

    toolbox = base.Toolbox()
    toolbox.register("weight_bin", np.random.uniform,-1,1)
    toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.weight_bin, n=individual_size)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", fitness_function)

    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("Mean", np.mean)
    stats.register("Max", np.max)
    stats.register("Std", np.std)

    # The pool is created only after `env` has been assigned.
    # NOTE(review): presumably the workers see `env` through process fork —
    # confirm on platforms that use the spawn start method.
    pool = multiprocessing.Pool()
    toolbox.register("map", pool.map)

    results = []
    try:
        for iteration in range(n_simulation):
            hof,logs = run_once(**params)
            for entry in logs:
                entry['iteration'] = iteration
                entry['enemy'] = enemy_no
            results.extend(logs)
    finally:
        # Release the worker processes; without this every call leaks a pool.
        pool.close()
        pool.join()
        # Keep the global toolbox usable (serially) after the pool is gone.
        toolbox.register("map", map)

    return pd.DataFrame(results)

In [None]:
import optuna

def objective(trial):
    """Optuna objective: score one sampled GA configuration.

    Samples the GA hyper-parameters from ``trial``, runs three steady-state
    simulations against enemy "6" with a 10-neuron hidden layer, and returns
    the mean of the logged ``Max`` column over all generations and runs.
    """
    # Sampled hyper-parameters (same order as they are requested from Optuna).
    search_space = {}
    search_space['pop_size'] = trial.suggest_int("pop_size", 100, 100, step=100)
    search_space['n_offspring'] = trial.suggest_int("n_offspring", 1, 3, step=1)
    search_space['tournsize_parent'] = trial.suggest_int("tournsize_parent", 1, 3, step=1)
    search_space['cxpb'] = trial.suggest_float("cxpb", 0.5, 1., step=0.1)
    search_space['alpha'] = trial.suggest_categorical("alpha", ["random", 0.5])
    search_space['mutpb'] = trial.suggest_float("mutpb", 0.1, 0.5, step=0.1)
    search_space['sigma'] = trial.suggest_float("sigma", 0.5, 2.5, step=0.5)
    search_space['indpb'] = trial.suggest_categorical("indpb", ["random", 0.01, 0.1, 0.2])
    search_space['tournsize_survival'] = trial.suggest_int("tournsize_survival", 1, 3, step=1)

    # Settings that are fixed for every trial.
    search_space['ngen'] = 20
    search_space['verbose'] = 1

    run_log = simulation("6", "steady_state", 3, 10, search_space)

    return np.mean(run_log['Max'])

In [None]:
# Hyper-parameter search for the steady-state GA: `objective` is maximised
# over 100 trials.
study = optuna.create_study(direction="maximize", study_name="steady_state")

study.optimize(objective, n_trials=100)

In [None]:
#Trial 10 finished with value: 78.97733046879364 and parameters: {'pop_size': 100, 'n_offspring': 3, 'tournsize_parent': 2, 'cxpb': 0.5, 'alpha': 0.5, 'mutpb': 0.30000000000000004, 'sigma': 0.5, 'indpb': 0.01, 'tournsize_survival': 3}

In [None]:
%%time
# Steady-state experiment: three runs against enemy "6" with a 10-neuron
# hidden layer.
enemy_no = "6"
algorithm = "steady_state"
hidden_neurons = 10
n_simulation = 3

# GA hyper-parameters forwarded to the algorithm as keyword arguments.
params = dict(
    pop_size=100,
    n_offspring=2,
    tournsize_parent=2,
    cxpb=1,
    alpha="random",
    mutpb=0.2,
    sigma=1,
    indpb="random",
    tournsize_survival=2,
    ngen=30,
    verbose=1,
)

logs_pd = simulation(enemy_no, algorithm, n_simulation, hidden_neurons, params)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Learning curves: per-generation population mean and maximum fitness,
# averaged over the simulation runs, with a 95% confidence band.
# Each curve gets its label from its own lineplot call, so the legend does
# not depend on the order in which matplotlib enumerates lines and bands.
fig, ax = plt.subplots()
sns.lineplot(data=logs_pd, x='gen', y='Mean', estimator='mean', errorbar=('ci', 95), label="mean", ax=ax)
sns.lineplot(data=logs_pd, x='gen', y='Max', estimator='mean', errorbar=('ci', 95), label="max", ax=ax)
ax.set(title="Fitness per generation (band: 95% CI over runs)",
       xlabel="generation",
       ylabel="fitness")
ax.legend(loc="lower right");

In [None]:
# Average of the logged statistics across runs, per generation and enemy.
(
    logs_pd
    .groupby(['gen', 'enemy'])[['Mean', 'Max', 'Std']]
    .mean()
)

In [None]:
# Spread of the logged Mean/Max across runs, per generation and enemy.
(
    logs_pd
    .groupby(['gen', 'enemy'])[['Mean', 'Max']]
    .std()
)

In [None]:
# Generational-model experiment: ten runs against enemy "6".
n_simulation = 10
enemy_no = "6"
algorithm = "generational_model"

# simulation() also requires the hidden-layer size and the GA
# hyper-parameters; reuse `hidden_neurons` and `params` from the
# steady-state experiment cell so both algorithms share one configuration.
logs_pd = simulation(enemy_no, algorithm, n_simulation, hidden_neurons, params)

In [None]:
# Manual setup of the globals (`env`, `toolbox`, `stats`) that the GA
# functions read, for calling them directly instead of through simulation().
headless = True
if headless:
    # Run pygame without a display.
    os.environ["SDL_VIDEODRIVER"] = "dummy"

# Genome length for the chosen network size; it was previously undefined at
# this scope, so the toolbox registration below raised NameError.
individual_size = parameter_count(hidden_neurons)

env = Environment(playermode="ai",
                  player_controller=player_controller(hidden_neurons),
                  speed="fastest",
                  enemymode="static",
                  enemies=enemy_no,
                  level=2,
                  visuals=False,
                  logs="off")

# Define the DEAP classes only once; they live on the shared `creator` module.
if not hasattr(creator, "FitnessMax"):
    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "Individual"):
    creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
toolbox.register("weight_bin", np.random.uniform,-1,1)
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.weight_bin, n=individual_size)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", fitness_function)
# The pool is left open on purpose: the cells below evaluate through it.
pool = multiprocessing.Pool()
toolbox.register("map", pool.map)

stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("Mean", np.mean)
stats.register("Max", np.max)
stats.register("Std", np.std)

In [None]:
pop_size=10
hof_size=1  # not forwarded: steady_state() always keeps a hall of fame of size 1
cxpb=1
mutpb=0.2
ngen=10

# steady_state() takes eleven hyper-parameters, so the old five-argument
# positional call raised TypeError. Start from the `params` of the
# steady-state experiment cell and override the values set in this cell.
# NOTE(review): confirm the inherited values (n_offspring, alpha, sigma,
# indpb, tournament sizes, verbose) are the ones intended for this run.
run_params = {**params, "pop_size": pop_size, "cxpb": cxpb, "mutpb": mutpb, "ngen": ngen}

results = []
for iteration in range(n_simulation):
    hof,logs = steady_state(**run_params)
    for entry in logs:
        entry['iteration'] = iteration
        entry['enemy'] = enemy_no
    results.extend(logs)

In [None]:
pop_size=200
hof_size=1  # not forwarded: steady_state() always keeps a hall of fame of size 1
cxpb=1
mutpb=0.2
ngen=30

# steady_state() takes eleven hyper-parameters, so the old five-argument
# positional call raised TypeError. Start from the `params` of the
# steady-state experiment cell and override the values set in this cell.
# NOTE(review): confirm the inherited values (n_offspring, alpha, sigma,
# indpb, tournament sizes, verbose) are the ones intended for this run.
run_params = {**params, "pop_size": pop_size, "cxpb": cxpb, "mutpb": mutpb, "ngen": ngen}

hof,logs = steady_state(**run_params)
for entry in logs:
    entry['iteration'] = 0
    entry['enemy'] = enemy_no

In [None]:
pop_size=200
hof_size=1  # not forwarded: generational_model() always keeps a hall of fame of size 1
cxpb=1
mutpb=0.2
ngen=25
n_offspring = 500

# generational_model() takes eleven hyper-parameters, so the old
# six-argument positional call raised TypeError. Start from the `params` of
# the steady-state experiment cell and override the values set in this cell.
# The function produces n_offspring * pop_size children per generation, so
# the count above is converted to a whole multiplier: 500 // 200 = 2, i.e.
# 400 children.
# NOTE(review): confirm the rounding and the inherited values (alpha, sigma,
# indpb, tournament sizes, verbose) are what this run intends.
run_params = {**params,
              "pop_size": pop_size,
              "n_offspring": max(1, n_offspring // pop_size),
              "cxpb": cxpb,
              "mutpb": mutpb,
              "ngen": ngen}

hof,logs = generational_model(**run_params)

In [None]:
# Disabled experiment, kept as an inert string literal (nothing below runs):
# an alternative `individual` generator that would draw the initial genome
# from a Keras HeUniform initializer instead of uniform(-1, 1).
# NOTE(review): it references `tf`, which is not imported in the cells shown
# here — import TensorFlow before re-enabling.
"""
def generate_individual(ind_class, size):
    initializer = tf.keras.initializers.HeUniform()
    individual = initializer(shape=(size,))
    return ind_class(individual.numpy().tolist())

toolbox.register('individual' ,generate_individual, creator.Individual, size=individual_size)
"""