In [10]:
# One-time environment setup; uncomment and run once on a fresh machine.
# !pip install deap
# !apt-get install -y xvfb x11-utils
# !pip install gym[box2d]==0.17.* pyvirtualdisplay==0.2.* PyOpenGL==3.1.* PyOpenGL-accelerate==3.1.*

In [11]:
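# Headless machines only: start a virtual X display so that the env.render()
# calls made during fitness evaluation have a screen to draw to.
# Uncomment together with the setup commands in the first cell.
# import pyvirtualdisplay

# _display = pyvirtualdisplay.Display(visible=False, size=(1400, 900))
# _ = _display.start()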

In [41]:
import random
from copy import deepcopy
from typing import Callable, Optional, List, Tuple

import gym
import numpy as np
from deap import base, creator, tools
from gym import wrappers
from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Dense, InputLayer

from ga_scheme import eaMuPlusLambda

In [20]:
# DEAP stores every created class as an attribute of the `creator` module and
# emits a RuntimeWarning when a name is created a second time. Drop definitions
# left over from a previous execution so this cell can be re-run cleanly (and
# so that edits to the definitions below actually take effect).
for _stale in ("Individual", "BaseFitness"):
    if hasattr(creator, _stale):
        delattr(creator, _stale)

# Single-objective fitness; the positive weight makes DEAP maximise it.
creator.create("BaseFitness", base.Fitness, weights=(1.0, ))
# An individual is a plain list (one NumPy array per model weight tensor)
# that carries a BaseFitness instance in its `fitness` attribute.
creator.create("Individual", list, fitness=creator.BaseFitness)

In [43]:
class Experiment:
    """Evolve the weights of a Keras policy network with a (mu + lambda) GA.

    An individual is a ``creator.Individual`` (a list) holding one NumPy array
    per entry of ``model.get_weights()``. The code assumes the layout produced
    by bias-enabled ``Dense`` layers: even positions are kernels, odd positions
    are biases. Only kernels are randomised and mutated; biases stay at zero.

    Annotations that mention DEAP / gym / Keras types are written as strings so
    that defining the class does not require the ``creator.create(...)`` cell
    to have been executed first.
    """

    # Per-weight probability of being perturbed by ``mutation``.
    MUTATION_RATE: float = 0.15
    # Standard deviation of the Gaussian noise added by ``mutation``.
    MUTATION_SIGMA: float = 0.2
    # Number of episodes averaged by ``fitness``.
    EPISODES_PER_EVAL: int = 1
    # Hard cap on the number of environment steps per episode.
    MAX_STEPS_PER_EPISODE: int = 200
    # Position of the weight tensor exchanged between parents by ``crossover``.
    CROSSOVER_INDEX: int = 2

    def __init__(self,
            population: int,
            iterations: int,
            model: "Model",
            cross_prob: float = 0.4,
            mut_prob: float = 0.4,
            tournamet_size: int = 3,
            env: "Optional[gym.Env]" = None,
            engine: "Optional[base.Toolbox]" = None):
        """Wire the model, the environment and the DEAP toolbox together.

        Args:
            population: Number of individuals kept per generation (mu).
            iterations: Number of generations to run.
            model: Keras model whose weights are evolved; it must output one
                probability per action.
            cross_prob: Probability of producing an offspring by crossover.
            mut_prob: Probability of producing an offspring by mutation.
            tournamet_size: Tournament size for selection; also used as the
                hall-of-fame capacity in ``run``.
            env: Environment to evaluate on. ``None`` creates a fresh
                ``CartPole-v1`` for this instance (a default evaluated in the
                signature would be shared by every instance).
            engine: Toolbox to register the operators on. ``None`` creates a
                fresh ``base.Toolbox``.
        """
        self.population: int = population
        self.iterations: int = iterations
        self.mut_prob = mut_prob
        self.cross_prob = cross_prob
        self.tournamet_size = tournamet_size

        self.model = model
        # Used only as shape templates when creating new individuals.
        self.params = self.model.get_weights()

        if env is None:
            env = gym.make("CartPole-v1")
        # Monitor records episode statistics/videos; force=True clears the
        # target directory on start.
        self.env = wrappers.Monitor(env, "./gym-results", force=True)
        self.env.reset()

        self.engine = engine if engine is not None else base.Toolbox()
        self.engine.register('map', map)
        self.engine.register("individual",
            tools.initIterate,
            creator.Individual,
            self.factory
        )
        self.engine.register('population',
            tools.initRepeat,
            list,
            self.engine.individual,
            self.population
        )
        self.engine.register('mutate', self.mutation)
        self.engine.register("mate", self.crossover)
        self.engine.register('select',
            tools.selTournament,
            tournsize=tournamet_size
        )
        self.engine.register('evaluate', self.fitness)

    def factory(self) -> "creator.Individual":
        """Create a random individual shaped like the model's weights.

        Even positions (kernels) are drawn from N(0.1, 0.3); odd positions
        (biases) are initialised to zero.
        """
        genome: List[np.ndarray] = []
        for position, template in enumerate(self.params):
            if position % 2 == 0:
                genome.append(np.random.normal(0.1, 0.3, size=template.shape))
            else:
                genome.append(np.zeros(shape=template.shape))
        return creator.Individual(genome)

    def mutation(self, individual: "creator.Individual") -> Tuple["creator.Individual"]:
        """Add Gaussian noise to a random subset of the kernel weights.

        Each kernel element is perturbed independently with probability
        ``MUTATION_RATE`` by noise drawn from N(0, ``MUTATION_SIGMA``); biases
        (odd positions) are left untouched. The individual's list is updated
        in place and returned in a 1-tuple, as DEAP expects from a mutation
        operator.
        """
        for position in range(0, len(individual), 2):
            genes = np.asarray(individual[position])
            hit = np.random.random(genes.shape) < self.MUTATION_RATE
            noise = np.random.normal(0.0, self.MUTATION_SIGMA, size=genes.shape)
            # Element-wise update: only the selected entries receive noise.
            individual[position] = genes + np.where(hit, noise, 0.0)
        return individual,

    def compare(self, ind1: "creator.Individual", ind2: "creator.Individual") -> bool:
        """Return True when both individuals hold identical weight arrays.

        Passed to ``tools.HallOfFame`` as ``similar``: a plain ``==`` between
        lists of NumPy arrays does not produce a usable boolean.
        """
        if len(ind1) != len(ind2):
            return False
        return all(np.array_equal(a, b) for a, b in zip(ind1, ind2))

    def crossover(self, p1: "creator.Individual", p2: "creator.Individual") \
            -> Tuple["creator.Individual", "creator.Individual"]:
        """Produce two children by exchanging one weight tensor.

        Each child is a deep copy of one parent, except that the tensor at
        ``CROSSOVER_INDEX`` comes from the other parent. Parents are not
        modified. If either parent has no tensor at that position the children
        are plain copies.
        """
        c1: List[np.ndarray] = [deepcopy(layer) for layer in p1]
        c2: List[np.ndarray] = [deepcopy(layer) for layer in p2]

        pivot = self.CROSSOVER_INDEX
        if len(c1) > pivot and len(c2) > pivot:
            c1[pivot], c2[pivot] = c2[pivot], c1[pivot]

        return creator.Individual(c1), creator.Individual(c2)

    def fitness(self, individual: "creator.Individual") -> Tuple[float]:
        """Score an individual by the mean episode reward of its policy.

        Loads the individual's arrays into the model, then plays
        ``EPISODES_PER_EVAL`` episodes of at most ``MAX_STEPS_PER_EPISODE``
        steps, sampling each action from the model's output distribution.

        Returns:
            A 1-tuple with the mean total reward (DEAP fitness format).
        """
        self.model.set_weights(individual)
        scores: List[float] = []
        for _ in range(self.EPISODES_PER_EVAL):
            state = self.env.reset()
            score = 0.0
            for _step in range(self.MAX_STEPS_PER_EPISODE):
                # Needs a display; on headless machines start a virtual one.
                self.env.render()
                # Shape the observation as a batch of one sample.
                batch = np.asarray(state).reshape(1, -1)
                act_prob = np.asarray(
                    self.model.predict(batch), dtype=np.float64
                ).reshape(-1)
                # Renormalise so rounding in the network output cannot make
                # np.random.choice reject the distribution.
                act_prob = act_prob / act_prob.sum()
                action = int(np.random.choice(len(act_prob), p=act_prob))
                state, reward, done, _info = self.env.step(action)
                score += reward
                if done:
                    break
            scores.append(score)
        return float(np.mean(scores)),

    def run(self):
        """Run the evolutionary loop.

        Returns:
            ``(log, best)``: the logbook returned by ``eaMuPlusLambda`` and the
            best individual stored in the hall of fame.
        """
        pop = self.engine.population()
        hof = tools.HallOfFame(self.tournamet_size, similar=self.compare)
        stats = tools.Statistics(lambda ind: ind.fitness.values[0])
        stats.register('min', np.min)
        stats.register('max', np.max)
        stats.register('avg', np.mean)
        stats.register('std', np.std)

        pop, log = eaMuPlusLambda(
            pop,
            self.engine,
            mu = self.population,
            lambda_ = int(0.8 * self.population),
            cxpb = self.cross_prob,
            mutpb = self.mut_prob,
            ngen = self.iterations,
            halloffame = hof,
            stats = stats,
            verbose = True
        )

        best = hof[0]
        print("Best fitness = {}".format(best.fitness.values[0]))
        return log, best

In [24]:
def build_model(dim: Tuple[int, ...]):
    """Build the policy network: tanh hidden layers and a softmax output.

    Args:
        dim: Layer sizes ``(n_inputs, hidden_1, ..., hidden_k, n_outputs)``.
            The notebook uses four entries (two hidden layers).

    Returns:
        A compiled Keras ``Sequential`` model. It is compiled only so that it
        is ready for ``predict``; the optimizer and loss are not used by the
        genetic algorithm.

    Raises:
        ValueError: If ``dim`` has fewer than two entries.
    """
    if len(dim) < 2:
        raise ValueError("dim must contain at least an input and an output size")

    model = Sequential()
    # input_shape must be a shape tuple (without the batch axis), not a bare int.
    model.add(InputLayer(input_shape=(dim[0],)))
    for units in dim[1:-1]:
        model.add(Dense(units, activation='tanh'))
    model.add(Dense(dim[-1], activation='softmax'))
    model.compile(optimizer='adam', loss='mse')
    return model

In [None]:
# Seed every random source involved (DEAP selection uses `random`, the
# operators and action sampling use NumPy, the environment has its own RNG)
# so that a Restart & Run All is repeatable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

model: Model = build_model((4, 20, 12, 2))

env = gym.make("CartPole-v1")
env.seed(SEED)

experiment = Experiment(
    population=10,
    iterations=10,
    model=model,
    env=env
)

# Keep the results in variables instead of letting the notebook echo the
# (logbook, weight arrays) tuple; run() already prints the per-generation log.
log, best = experiment.run()