# Neuroevolution

By Juan-Pablo Silva (https://github.com/juanpablos) jpsilva@dcc.uchile.cl, Alexandre Bergel (abergel@dcc.uchile.cl) and Alejandro Infante (ainfante@dcc.uchile.cl).

## Install packages and set environments

We will be using the [OpenAI Gym learning environment](https://github.com/openai/gym) as our base for running games, and the [NEAT Python](https://neat-python.readthedocs.io/en/latest/) package, a Python implementation of the NEAT (NeuroEvolution of Augmenting Topologies) algorithm.

Because Google Colab has neither rendering hardware nor a physical screen, we have to work around this by creating a virtual screen and rendering the game to a video. The original code for this was written by William Xu and is described in the following blog post: https://star-ai.github.io/Rendering-OpenAi-Gym-in-Colaboratory/.

Install system packages.

In [3]:
!apt-get update > /dev/null 2>&1
!apt-get install cmake > /dev/null 2>&1
!apt-get install pkg-config lua5.1 build-essential git > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

Install python packages.

In [4]:
!pip install gym==0.13.1 pyvirtualdisplay > /dev/null 2>&1
!pip install tqdm gym-retro > /dev/null 2>&1
!pip install -U git+https://github.com/frenchie4111/dumbrain.git > /dev/null 2>&1
!pip install --upgrade setuptools 2>&1
!pip install ez_setup > /dev/null 2>&1
!pip install box2d-py > /dev/null 2>&1
!pip install neat-python cloudpickle opencv-python > /dev/null 2>&1
!pip install gym[all] > /dev/null 2>&1



In [5]:
# The gym package to import our games
import gym
from gym import logger as gymlogger
from gym.wrappers import Monitor
gymlogger.set_level(40) #error only

# The python neat package
import neat

# Plotting
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

# Utilities and rendering
import numpy as np
import random
import math
import time
import pickle
import glob
import io
import base64
from IPython.display import HTML
from IPython import display as ipythondisplay

## The NEAT package

The Python NEAT package provides a general framework for applying genetic algorithms to neural networks. To use it, we need to import the `neat` package and define a `population`, a `fitness function` and a `configuration file`. The first two are defined in Python. The configuration file is a separate file containing the parameters that customize the networks and how they mutate over time. We will not discuss it in this session, and we provide all the configuration files you need. For more details, please read the [documentation](https://neat-python.readthedocs.io/en/latest/config_file.html) on this topic.
Feel free to modify the configuration files we provide to see what each option changes in the process.

Download the configuration files.

In [6]:
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/config-bipedal-walker > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/config-cart-pole > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/config-lunar > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/config-lunar-rec > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/config-mountain > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/config-mountain-rec > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/config-xor > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/config-and > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/winner_bipedal_pre.pkl > /dev/null 2>&1

### Learning with NEAT
When we use the NEAT algorithm, we are using one AI to create another AI. We are not training on a dataset, nor teaching the network how it should behave. Based on evaluations and feedback (the fitness), the NEAT algorithm optimizes both the parameters and the topology (how many layers and neurons there are, and which neurons are connected to which) of a population of neural networks to solve a particular problem.
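
To make the evaluate, select and mutate cycle concrete, here is a deliberately stripped-down sketch. It is *not* NEAT: the topology is fixed to a single step neuron (two weights and a bias), and there is no crossover, no speciation and no topology growth. The helpers `predict`, `fitness` and `evolve` are hypothetical names used only for this illustration. Each generation scores every candidate on the AND gate, keeps the better half, and refills the population with mutated copies.

```python
import random

# The AND gate as (inputs, expected output) pairs
AND_CASES = [((0.0, 0.0), 0.0), ((0.0, 1.0), 0.0), ((1.0, 0.0), 0.0), ((1.0, 1.0), 1.0)]


def predict(weights, x):
    """A single neuron with a step activation; weights = [w1, w2, bias]."""
    w1, w2, bias = weights
    return 1.0 if w1 * x[0] + w2 * x[1] + bias > 0 else 0.0


def fitness(weights):
    """0 is perfect; every wrong answer subtracts 1 (the squared error of a 0/1 output)."""
    return -sum((predict(weights, x) - y) ** 2 for x, y in AND_CASES)


def evolve(generations=50, pop_size=20, sigma=0.5, seed=0):
    rng = random.Random(seed)
    population = [[rng.uniform(-1, 1) for _ in range(3)] for _ in range(pop_size)]
    best_per_generation = []
    for _ in range(generations):
        # Evaluate: sort so the fittest individuals come first
        population.sort(key=fitness, reverse=True)
        best_per_generation.append(fitness(population[0]))
        # Select: the better half survives unchanged (elitism)
        parents = population[: pop_size // 2]
        # Mutate: fill the rest with Gaussian-perturbed copies of random survivors
        children = [[w + rng.gauss(0, sigma) for w in rng.choice(parents)]
                    for _ in range(pop_size - len(parents))]
        population = parents + children
    population.sort(key=fitness, reverse=True)
    return population[0], best_per_generation


best, history = evolve()
print("best weights:", best, "fitness:", fitness(best))
```

Because the surviving half is copied unchanged, the best fitness can never decrease from one generation to the next; the `elitism` option in NEAT's `[DefaultReproduction]` configuration section plays a similar role.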

 Let's take a look at how we would implement the AND logic gate that we used in Day 1.

First let's define a function to let us visualize how the fitness evolved, and some cool statistics about the generation process.

In [7]:
def plot_neat_info(statistics, ylog=False, view=False, filename='neat.png'):
    # general stats
    generation = range(len(statistics.most_fit_genomes))
    best_fitness = [c.fitness for c in statistics.most_fit_genomes]
    avg_fitness = np.array(statistics.get_fitness_mean())
    stdev_fitness = np.array(statistics.get_fitness_stdev())

    # species
    species_sizes = statistics.get_species_sizes()
    num_generations = len(species_sizes)
    curves = np.array(species_sizes).T

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 5))

    ax1.plot(generation, avg_fitness, 'b-', label="average")
    ax1.plot(generation, avg_fitness - stdev_fitness, 'g-.', label="-std")
    ax1.plot(generation, avg_fitness + stdev_fitness, 'g-.', label="+std")
    ax1.plot(generation, best_fitness, 'r-', label="best")
    ax1.set_title("Population's average and best fitness")
    ax1.set_xlabel("Generations")
    ax1.set_ylabel("Fitness")
    ax1.grid()
    ax1.legend(loc="best")

    ax2.stackplot(range(num_generations), *curves)
    ax2.set_title("Speciation")
    ax2.set_ylabel("Size per Species")
    ax2.set_xlabel("Generations")

    if ylog:
        ax1.set_yscale('symlog')

    fig.suptitle('NEAT training stats')

    plt.savefig(filename)
    if view:
        plt.show()

    plt.close()

Here we define the AND logic gate inputs and the fitness function.

In [8]:
# Let's define a function for it to be more flexible
def run_and():
    # the inputs and outputs of the logic gate
    and_inputs = [[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]
    and_outputs = [[0.0], [0.0], [0.0], [1.0]]

    # This would be our fitness function
    # what we are evaluating here is how good our generated networks
    # are representing the output values.
    # In this case a perfect score would be 0, as in 0 errors.
    # For each mistake the network makes, we will be subtracting
    # the square error of the output (just like on a traditional MSE loss).
    # The worst score possible would be -4.
    def eval_genomes(genomes, config):
        # The neat package gives us a list of all individuals
        # so we have to test them all
        for genome_id, genome in genomes:
            # Each individual represents a network structure and weights
            # so we just need to create a neural network out of them
            net = neat.nn.FeedForwardNetwork.create(genome, config)
            # Let's make each network start with a fitness of 0.
            # This is an arbitrary decision. We could have used a starting
            # fitness of 4.0, and make the best score 4, and the min 0.
            genome.fitness = 0.
            # Testing the individual performance
            for x, y in zip(and_inputs, and_outputs):
                # Get an output from the network
                output = net.activate(x)
                # Subtract the squared error from the fitness
                genome.fitness -= (output[0] - y[0]) ** 2

    def run(config_file):
        # Load the configuration files.
        # Generally you won't need to touch this
        config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                             neat.DefaultSpeciesSet, neat.DefaultStagnation,
                             config_file)

        # Create the population.
        pop = neat.Population(config)

        # Make NEAT print some nice stat values as it trains
        stats = neat.StatisticsReporter()
        pop.add_reporter(stats)
        # comment this line if the output is too annoying
        # pop.add_reporter(neat.StdOutReporter(True))


        # Run for up to 100 generations.
        # If the solution is found before that, stop.
        winner = pop.run(eval_genomes, 100)

        # Print the output values of the best individual
        # and plot some training information
        print('Output:')
        winner_net = neat.nn.FeedForwardNetwork.create(winner, config)
        for xi, xo in zip(and_inputs, and_outputs):
            output = winner_net.activate(xi)
            print("\tinput {!r}, expected output {!r}, got {!r}".format(xi, xo, output))

        # plot training stats
        plot_neat_info(stats, ylog=False, view=True, filename='stats_and.png')

    # the name of the configuration file
    run('config-and')
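
The scoring rule in `eval_genomes` can be checked by hand. The sketch below copies it into a standalone function that accepts any callable shaped like `net.activate` (a list of inputs in, a list of outputs out) and feeds it three hand-written stand-in networks. The stand-ins are hypothetical and only illustrate the range of the fitness: 0 for a perfect network and -4 for one that is maximally wrong on every case.

```python
AND_INPUTS = [[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]
AND_OUTPUTS = [[0.0], [0.0], [0.0], [1.0]]


def and_fitness(activate):
    """Same rule as eval_genomes: start at 0 and subtract the squared error per case."""
    fitness = 0.0
    for x, y in zip(AND_INPUTS, AND_OUTPUTS):
        output = activate(x)
        fitness -= (output[0] - y[0]) ** 2
    return fitness


def perfect(x):
    return [x[0] * x[1]]          # exact AND on 0/1 inputs


def inverted(x):
    return [1.0 - x[0] * x[1]]    # NAND: wrong on every case


def constant(x):
    return [0.5]                  # always undecided


print(and_fitness(perfect))   # 0.0
print(and_fitness(inverted))  # -4.0
print(and_fitness(constant))  # -1.0
```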

In [9]:
# Run the AND logic gate example
run_and()

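Depending on the installed version of neat-python, this cell may stop with `RuntimeError: Missing required configuration item in [NEAT] section: 'no_fitness_termination'`. In that case, add `no_fitness_termination = False` to the `[NEAT]` section of `config-and`, as the error message suggests, and run the cell again. The other configuration files may need the same line.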
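
If the AND run stops because `no_fitness_termination` is missing, the key can be added to the downloaded files programmatically instead of by hand. This is a minimal sketch, assuming the files use the standard `[NEAT]` section header; `ensure_neat_option` is a hypothetical helper, and the value `False` follows the suggestion printed in the error message.

```python
import configparser


def ensure_neat_option(path, key="no_fitness_termination", value="False"):
    """Insert `key = value` right after the [NEAT] header if the key is missing.

    Returns True if the file was modified, False if the key was already set.
    """
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(path)
    if parser.has_option("NEAT", key):
        return False

    with open(path) as f:
        lines = f.readlines()
    for i, line in enumerate(lines):
        if line.strip() == "[NEAT]":
            if not line.endswith("\n"):
                lines[i] = line + "\n"  # header was the last line of the file
            lines.insert(i + 1, f"{key} = {value}\n")
            break
    else:
        raise ValueError(f"no [NEAT] section found in {path}")

    with open(path, "w") as f:
        f.writelines(lines)
    return True
```

For example, `for cfg in glob.glob('config-*'): ensure_neat_option(cfg)` would patch every configuration file downloaded earlier; files that already define the key are left untouched.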

### Exercise: Implement XOR logic gate
Based on the AND example above, implement the XOR logic gate and see how the generated networks perform.

In [None]:
# Let's define a function for it to be more flexible
def run_xor():
    # Something must go here...
    # Remember you need to provide examples
    #########################

    def eval_genomes(genomes, config):
        # This is the FITNESS
        # You have to complete this function
        #########################
        pass

    def run(config_file):
        # Load the configuration files.
        # Generally you won't need to touch this
        config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                             neat.DefaultSpeciesSet, neat.DefaultStagnation,
                             config_file)

        # Create the population.
        pop = neat.Population(config)

        # Make NEAT print some nice stat values as it trains
        stats = neat.StatisticsReporter()
        pop.add_reporter(neat.StdOutReporter(True))
        pop.add_reporter(stats)

        # Run for up to 100 generations.
        # If the solution is found before that, stop.
        winner = pop.run(eval_genomes, 100)

        # Print the output values of the best individual
        # and plot some training information
        print('Output:')
        winner_net = neat.nn.FeedForwardNetwork.create(winner, config)
        #####################
        # COMPLETE HERE
        for xi, xo in zip(..., ...):
            output = winner_net.activate(xi)
            print("\tinput {!r}, expected output {!r}, got {!r}".format(xi, xo, output))


        print(stats)
        # plot training stats
        plot_neat_info(stats, ylog=False, view=True, filename='stats_xor.png')

    # the name of the configuration file
    run('config-xor')

In [None]:
# Run the XOR logic gate exercise
run_xor()
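
If you get stuck, here is one possible shape for the missing pieces, written as a standalone sketch so it runs without NEAT. The data follow the same layout as `and_inputs`/`and_outputs`, and `xor_fitness` uses the same negative squared-error rule, so the body of your `eval_genomes` can follow the AND example line by line. The hand-wired network is hypothetical, with weights picked by hand: two hidden neurons compute OR and NAND, and an output neuron combines them with AND. It illustrates why XOR is harder than AND: XOR is not linearly separable, so with a monotonic activation such as the sigmoid, a network without hidden nodes cannot classify all four cases correctly.

```python
import math

# XOR truth table, in the same layout as and_inputs / and_outputs
xor_inputs = [[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]]
xor_outputs = [[0.0], [1.0], [1.0], [0.0]]


def xor_fitness(activate):
    """Same rule as the AND example: start at 0 and subtract each squared error."""
    fitness = 0.0
    for x, y in zip(xor_inputs, xor_outputs):
        output = activate(x)
        fitness -= (output[0] - y[0]) ** 2
    return fitness


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def hand_wired_xor(x):
    """Hypothetical hand-tuned network: XOR(a, b) = AND(OR(a, b), NAND(a, b))."""
    h_or = sigmoid(20 * x[0] + 20 * x[1] - 10)
    h_nand = sigmoid(-20 * x[0] - 20 * x[1] + 30)
    return [sigmoid(20 * h_or + 20 * h_nand - 30)]


for xi, xo in zip(xor_inputs, xor_outputs):
    print("input {!r}, expected output {!r}, got {!r}".format(xi, xo, hand_wired_xor(xi)))
print("fitness:", xor_fitness(hand_wired_xor))
```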

## The Gym Environment

Although it is generally used to train and compare reinforcement learning algorithms, the OpenAI Gym environment can also be driven by a policy generated with NEAT.

First, we need to set up the workaround for Colab not having a screen.

In [None]:
######################
# ---- don't touch this -----
from pyvirtualdisplay import Display
display = Display(visible=0, size=(300, 300))
display.start()

"""
Utility functions to enable video recording of gym environment and displaying it.
To enable video, just do "env = wrap_env(env)"
"""

def show_video():
    mp4list = glob.glob('video/*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        with io.open(mp4, 'rb') as f:
            video = f.read()
        encoded = base64.b64encode(video)
        ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                    loop controls style="height: 400px;">
                    <source src="data:video/mp4;base64,{0}" type="video/mp4" />
                 </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")


def wrap_env(env):
    env = Monitor(env, './video', force=True)
    return env
# ---- don't touch this -----
######################

### Cart Pole example

The Cart Pole environment consists of balancing a pole on top of a cart, only by pushing the cart to the right or to the left. Intuitively, the pole will start falling to one side, and the cart should compensate by moving toward that side. This first example shows a simple heuristic based on the pole velocity (`observation[3]`): push the cart to the left when the pole is moving to the left faster than a threshold, push it to the right when the pole is moving to the right faster than the threshold, and otherwise keep repeating the previous action.

In [None]:
# Example of the Cart-Pole environment. Balancing a pole of length 1.0
# Observation:
#        Type: Box(4)
#        Num	Observation                 Min         Max
#        0	Cart Position             -4.8            4.8
#        1	Cart Velocity             -Inf            Inf
#        2	Pole Angle                 -24 deg        24 deg
#        3	Pole Velocity At Tip      -Inf            Inf
#
#    Actions:
#        Type: Discrete(2)
#        Num	Action
#        0	Push cart to the left
#        1	Push cart to the right

env = wrap_env(gym.make("CartPole-v1"))
observation = env.reset()
action = 1  # Default action: push the cart to the right
for t in range(1000):
    env.render()
    if observation[3] < -0.3:  # The pole is moving to the left
        action = 0  # Push the cart to the left
    elif observation[3] > 0.3:  # The pole is moving to the right
        action = 1  # Push the cart to the right
    # Otherwise, keep repeating the previous action

    observation, reward, done, info = env.step(action)  # Execute action and update observed state
    if done:
        break

env.close()
show_video()
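
The decision rule in the cell above can be pulled out into a small pure function, which makes the threshold and the "keep the previous action" behavior explicit and easy to test without rendering anything. `cartpole_heuristic` is a hypothetical name; it takes `observation[3]` (the pole velocity) and the previous action, and returns the same action the loop above would choose.

```python
def cartpole_heuristic(pole_velocity, previous_action, threshold=0.3):
    """Return 0 (push left) or 1 (push right) for the CartPole heuristic.

    Inside the dead band [-threshold, threshold] the previous action is repeated,
    so the cart does not switch direction on every small wobble.
    """
    if pole_velocity < -threshold:
        return 0  # pole moving to the left: push the cart to the left
    if pole_velocity > threshold:
        return 1  # pole moving to the right: push the cart to the right
    return previous_action


# Replaying a few pole velocities, starting from the default action 1
action = 1
for velocity in [0.1, -0.5, 0.2, 0.4, -0.1]:
    action = cartpole_heuristic(velocity, action)
    print(velocity, "->", action)
```

Inside the environment loop, `action = cartpole_heuristic(observation[3], action)` would replace the two `if` statements.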

### Exercise: Implement simple heuristic for MountainCar

Another environment in Gym is Mountain Car: a car in a valley trying to climb a hill. Its engine is not strong enough to drive straight up, so it has to gain momentum by first moving in the opposite direction. Here we have 2 things to look at (observations): the position and the velocity of the car. We also have 3 possible actions: push left, no push, push right.

The following cell shows a random agent trying, and failing, to solve the task.

In [None]:
# Example of the MountainCar environment taking random actions
# Observation:
#        Type: Box(2)
#        Num	Observation                 Min         Max
#        0	Cart Position             -1.2            0.6
#        1	Cart Velocity             -0.07           0.07
#
#    Actions:
#        Type: Discrete(3)
#        Num	Action
#        0	Push cart to the left
#        1	Do nothing
#        2	Push cart to the right
env = wrap_env(gym.make("MountainCar-v0"))
env.reset()
for t in range(20000):
    env.render()
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
    if done:
        break

env.close()
show_video()

**Exercise: Implement simple heuristic for MountainCar**

In this exercise you need to implement a simple heuristic based on the current velocity of the car. The heuristic consists of pushing the car in the direction it is already moving, to increase its momentum. Implement it in the code below where the comments indicate, then run the cell to see how your code performs.

In [None]:
# Exercise of the MountainCar environment
# Observation:
#        Type: Box(2)
#        Num	Observation                 Min         Max
#        0	Cart Position             -1.2            0.6
#        1	Cart Velocity             -0.07           0.07
#
#    Actions:
#        Type: Discrete(3)
#        Num	Action
#        0	Push cart to the left
#        1	Do nothing
#        2	Push cart to the right
env = wrap_env(gym.make("MountainCar-v0"))
observation = env.reset()
for t in range(1000):
    env.render()
    ##### Write your code here #####
    action = 0
    ### Answer
    if observation[1] < 0:
        action = 0
    if observation[1] > 0:
        action = 2
    ##### End of your code #####
    observation, reward, done, info = env.step(action)
    # observation -> (position, velocity)
    if done:
        break

env.close()
show_video()
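
To see why pushing along the velocity works, and why simply pushing right does not, the dynamics can be simulated without rendering. The sketch below re-implements the MountainCar-v0 update rule, assuming the constants from gym's `mountain_car.py` source (force 0.001, gravity 0.0025, speed limit 0.07, goal at position 0.5); treat them as assumptions rather than a guaranteed match for every gym version. `steps_to_goal` also applies the 200-step time limit that `gym.make("MountainCar-v0")` enforces, and starts from rest at -0.45, inside gym's random start range of -0.6 to -0.4. All function names are hypothetical.

```python
import math

# Assumed MountainCar-v0 constants, taken from gym's mountain_car.py source
MIN_POSITION, MAX_POSITION = -1.2, 0.6
MAX_SPEED = 0.07
GOAL_POSITION = 0.5
FORCE, GRAVITY = 0.001, 0.0025


def mountain_car_step(position, velocity, action):
    """One update of the dynamics; action is 0 (push left), 1 (no push) or 2 (push right)."""
    velocity += (action - 1) * FORCE + math.cos(3 * position) * (-GRAVITY)
    velocity = max(-MAX_SPEED, min(MAX_SPEED, velocity))
    position += velocity
    position = max(MIN_POSITION, min(MAX_POSITION, position))
    if position == MIN_POSITION and velocity < 0:
        velocity = 0.0  # the car stops dead against the left wall
    return position, velocity


def velocity_heuristic(position, velocity):
    """The exercise answer: push in the direction of motion (push left when stopped)."""
    return 2 if velocity > 0 else 0


def always_right(position, velocity):
    """Naive policy: always push toward the goal."""
    return 2


def steps_to_goal(policy, start_position=-0.45, max_steps=200):
    """Steps needed to reach the flag, or None if the time limit runs out first."""
    position, velocity = start_position, 0.0
    for step in range(1, max_steps + 1):
        position, velocity = mountain_car_step(position, velocity, policy(position, velocity))
        if position >= GOAL_POSITION:
            return step
    return None


print("velocity heuristic:", steps_to_goal(velocity_heuristic))
print("always push right: ", steps_to_goal(always_right))
```

Each push along the direction of motion adds energy, so every swing climbs higher than the last until the car clears the hill. A constant push to the right only tilts the valley, and the car keeps oscillating without ever reaching the flag.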

### Bipedal Walker environment
The Bipedal Walker environment is about teaching a two-legged robot how to walk over rough terrain. In this case there are 24 variables we can observe, ranging from the hip, knee and leg angles to the position and velocity. Likewise, there are 4 continuous actions: the torques applied to the two hips and the two knees.

As you can imagine, this problem is more difficult than the previous ones. Run the cell below to see how the walker behaves when it takes random actions.

In [None]:
# Example of the BipedalWalker environment taking random actions
env = wrap_env(gym.make("BipedalWalker-v2"))
env.reset()
for t in range(1000):
    env.render()
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
    if done:
        break

env.close()
show_video()

## NEAT and Gym
Here comes the fun part. Let's use the NEAT algorithm to try to generate neural networks capable of solving some of these games. These training processes take time, so be patient and wait for the fun results!

### General Gym NEAT solver
The following class presents a general approach to solving any problem from the Gym environment. You need to supply a game name (see the [Gym documentation](https://gym.openai.com/envs/) for some options) and a configuration file for that environment. The configuration files do not differ much from game to game, but you must set the number of network inputs and outputs to match the corresponding environment.
In case the game uses a multi action setup, for example pressing multiple buttons at once, you must set the `multi_output` flag to `True`.
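
A common source of errors is a configuration file whose `num_inputs`/`num_outputs` do not match the game. Below is a small sketch for checking this before training. It assumes neat-python's `[DefaultGenome]` section (the class builds `neat.Config` with `neat.DefaultGenome`) and gym's space attributes: `observation_space.shape`, `action_space.n` for discrete actions and `action_space.shape` for `Box` actions. `genome_io` and `expected_io` are hypothetical helpers. For discrete games it expects one output per action, matching the `argmax` in `eval_network`; with `multi_output=True` it expects one output per action dimension.

```python
import configparser


def genome_io(config_file):
    """Read (num_inputs, num_outputs) from a neat-python configuration file."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(config_file)
    genome = parser["DefaultGenome"]
    return genome.getint("num_inputs"), genome.getint("num_outputs")


def expected_io(env, multi_output=False):
    """Inputs = observation size; outputs = one per discrete action, or the Box size."""
    n_inputs = env.observation_space.shape[0]
    if multi_output:
        n_outputs = env.action_space.shape[0]  # e.g. 4 torques for BipedalWalker
    else:
        n_outputs = env.action_space.n  # e.g. 2 actions for CartPole
    return n_inputs, n_outputs
```

For example, `genome_io('config-cart-pole') == expected_io(gym.make('CartPole-v1'))` evaluates to `True` only if the file matches the environment.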

In [None]:
class GymNEAT:
    def __init__(self, game_name, config_file, verbose=True, winner_out="winner.pkl",
                load_checkpoint=None, use_recurrent=False, multi_output=False):
        self.game_name = game_name
        self.env = None
        self.config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                                  neat.DefaultSpeciesSet, neat.DefaultStagnation,
                                  config_file)
        if load_checkpoint:
            self.pop = neat.Checkpointer.restore_checkpoint(load_checkpoint)
        else:
            self.pop = neat.Population(self.config)

        self.stats = neat.StatisticsReporter()
        self.pop.add_reporter(self.stats)
        self.pop.add_reporter(neat.StdOutReporter(verbose))
        # Checkpoint every 25 generations or 900 seconds.
        self.pop.add_reporter(neat.Checkpointer(25, 900, filename_prefix=self.game_name+"-"))

        self.winner_file = winner_out
        self.use_recurrent = use_recurrent
        self.box_output = multi_output

    def eval_fitness(self, genomes, config):
        for genome_id, genome in genomes:
            genome.fitness = self.eval_single_genome(genome, config)

    def eval_single_genome(self, genome, config):
        if self.use_recurrent:
            net = neat.nn.recurrent.RecurrentNetwork.create(genome, config)
        else:
            net = neat.nn.FeedForwardNetwork.create(genome, config)
        total_reward = 0.0

        for i in range(self.episodes):
            observation = self.env.reset()
            if self.use_recurrent:
                # Clear the hidden state so one episode does not leak into the next
                net.reset()

            action = self.eval_network(net, observation)
            done = False
            while not done:
                observation, reward, done, info = self.env.step(action)
                action = self.eval_network(net, observation)
                total_reward += reward

        return total_reward / self.episodes

    def eval_network(self, net, observation):
        action = net.activate(observation)
        if not self.box_output:
            action = np.argmax(action)
        return action

    def plot_stats(self):
        plot_neat_info(self.stats, ylog=False, view=True,
                       filename='neat-{}.png'.format(self.game_name))

    def run_test(self):
        with open(self.winner_file, 'rb') as f:
            best = pickle.load(f)

        if self.use_recurrent:
            winner_net = neat.nn.recurrent.RecurrentNetwork.create(best, self.config)
        else:
            winner_net = neat.nn.FeedForwardNetwork.create(best, self.config)

        env = wrap_env(gym.make(self.game_name))

        done = False
        observation = env.reset()
        action = self.eval_network(winner_net, observation)
        while not done:
            env.render()
            observation, reward, done, info = env.step(action)
            action = self.eval_network(winner_net, observation)
            if done:
                break

        env.close()
        show_video()

    def run(self, iterations, episodes=10):
        self.env = gym.make(self.game_name)
        self.episodes = episodes
        winner = self.pop.run(self.eval_fitness, iterations)
        self.env.close()
        with open(self.winner_file, 'wb') as win_file:
            pickle.dump(winner, win_file)


To use the class, just create a `GymNEAT` object with the corresponding parameters, then call the `run` method to start training. The `run` method takes the maximum number of generations and the number of episodes each individual is tested on. Testing over several episodes is common practice in the game-learning community, because an individual could achieve the task by luck once and then never again. Forcing it to succeed several times decreases the chance of its score being just luck.
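
The effect of testing on several episodes can be illustrated with a toy model. The sketch below assumes, purely for illustration, that an individual's reward in one episode is its true skill plus Gaussian noise ("luck"); real Gym rewards are not Gaussian, and `episode_reward` and `fitness_spread` are hypothetical helpers. It measures how much the estimated fitness varies between repeated evaluations when averaging over 1 versus 10 episodes, as `eval_single_genome` does.

```python
import random
import statistics


def episode_reward(rng, skill=100.0, luck=50.0):
    """Toy reward model: a fixed skill level plus random luck in each episode."""
    return rng.gauss(skill, luck)


def fitness_spread(episodes, trials=1000, seed=0):
    """Standard deviation of the averaged fitness across repeated evaluations."""
    rng = random.Random(seed)
    estimates = [
        sum(episode_reward(rng) for _ in range(episodes)) / episodes
        for _ in range(trials)
    ]
    return statistics.stdev(estimates)


for episodes in (1, 10):
    print(episodes, "episode(s): spread ~", round(fitness_spread(episodes), 1))
```

With independent noise, the spread of the average shrinks roughly with the square root of the number of episodes (about 50 versus about 16 in this toy model), at the cost of 10 times more simulation per individual.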

Run the following cell to see how our NEAT algorithm can create a neural network that can balance a pole on top of a cart.

In [None]:
# create the GymNEAT object
cart_pole = GymNEAT(game_name="CartPole-v1", config_file="config-cart-pole",
                    verbose=True, winner_out="winner_cart.pkl")
# Run the algorithm
cart_pole.run(100, 10)
# Create a video out of the best individual!
cart_pole.run_test()
# See some nerdy statistics about the algorithm process!
cart_pole.plot_stats()

Here we are trying to solve the task of getting on top of a hill.

In [None]:
mount_car = GymNEAT(game_name="MountainCar-v0", config_file="config-mountain",
                    verbose=True, winner_out="winner_mountain.pkl")
mount_car.run(100, 10)
mount_car.run_test()
mount_car.plot_stats()

Let's try with a more sophisticated neural network: a recurrent neural network.

In [None]:
mount_car_rec = GymNEAT(game_name="MountainCar-v0", config_file="config-mountain-rec",
                        verbose=True, winner_out="winner_mountain_rec.pkl", use_recurrent=True)
mount_car_rec.run(100, 10)
mount_car_rec.run_test()
mount_car_rec.plot_stats()

Can we generate a neural network that can walk? Are we saying that this AI is teaching itself how to walk?!

We provide an almost finished training run. Run the following cell to see a neural network that can _almost_ walk pretty well. The last generation takes around 5 minutes to finish. We also provide some other checkpoints from the training history; you can run them to see how the networks improved over time.

In [None]:
bipedal = GymNEAT(game_name="BipedalWalker-v2", config_file="config-bipedal-walker",
                  verbose=True, winner_out="winner_bipedal_pre.pkl", multi_output=True)
#bipedal.run(100, 10)
bipedal.run_test()
#bipedal.plot_stats()

These levels are really hard. Here the goal is to land safely on the moon.

In [None]:
lunar_rec = GymNEAT(game_name="LunarLanderContinuous-v2", config_file="config-lunar-rec",
                    verbose=True, winner_out="winner_lunar_rec.pkl", use_recurrent=True, multi_output=True)
lunar_rec.run(100, 10)
lunar_rec.run_test()
lunar_rec.plot_stats()

## Solutions

Some of these examples can take hours to train and obtain good results. Here are some trained examples you can use to see the best performance.

Download these files:

In [None]:
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/BipedalWalker-v2-20 > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/BipedalWalker-v2-57 > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/BipedalWalker-v2-98 > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/LunarLander-v2-99 > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/LunarLander-v2-174 > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/LunarLander-v2-349 > /dev/null 2>&1
!wget https://raw.githubusercontent.com/juanpablos/config-files-neat/master/winner_mountain_rec_pre.pkl > /dev/null 2>&1

For the BipedalWalker, we can load the checkpoints saved at generations 20, 57 and 98. Here we show the best individual of each of those generations.

In [None]:
bipedal_solutions = []
for saved_iteration in [20, 57, 98]:
    bipedal = GymNEAT(game_name="BipedalWalker-v2", config_file="config-bipedal-walker",
                      verbose=True, winner_out=f"winner_bipedal_{saved_iteration}.pkl",
                      load_checkpoint=f"BipedalWalker-v2-{saved_iteration}", multi_output=True)
    bipedal.run(1, 1)
    bipedal_solutions.append(bipedal)

for bipedal in bipedal_solutions:
    bipedal.run_test()

To load the serialized winner model for the Bipedal, run this cell.

In [None]:
bipedal_solution = GymNEAT(game_name="BipedalWalker-v2", config_file="config-bipedal-walker",
                  verbose=True, winner_out="winner_bipedal_pre.pkl", multi_output=True)
bipedal_solution.run_test()

LunarLander is a really hard problem for NEAT, so even after 349 generations, the best network still fails miserably.

In [None]:
lunar_rec = GymNEAT(game_name="LunarLanderContinuous-v2", config_file="config-lunar-rec",
                    load_checkpoint="LunarLander-v2-349",
                    verbose=True, winner_out="winner_lunar_rec.pkl", use_recurrent=True, multi_output=True)
lunar_rec.run(1, 1)
lunar_rec.run_test()

MountainCar is not a hard problem. Here we load a serialized model that can solve the MountainCar environment.

In [None]:
mount_car_rec = GymNEAT(game_name="MountainCar-v0", config_file="config-mountain-rec",
                        verbose=True, winner_out="winner_mountain_rec_pre.pkl", use_recurrent=True)
mount_car_rec.run_test()

## Extras
In case you want to learn more and try other, cooler games, we invite you to test the NEAT algorithm and the class provided above with, for example, the [Space Invaders](https://gym.openai.com/envs/SpaceInvaders-ram-v0/) Gym environment. You can also follow [Lucas Thompson's](https://www.youtube.com/playlist?list=PLTWFMbPFsvz3CeozHfeuJIXWAJMkPtAdS) tutorial on using NEAT to play Sonic. Here is a link to his code: https://gitlab.com/lucasrthompson/Sonic-Bot-In-OpenAI-and-NEAT/tree/master. Keep in mind that these types of games require much more time to find a solution, on the order of several hours or even days.
Here is another [attempt](https://medium.freecodecamp.org/how-to-use-ai-to-play-sonic-the-hedgehog-its-neat-9d862a2aef98) at running Sonic.

Thank you!! :)