# Flappy bird circuit (training)

Training is done with a genetic algorithm implemented using the DEAP library. An environment similar to OpenAI's Gym is built on top of the pygame game, and this environment is then used to train the model.

## The model
The model is a simple linear model with 4 parameters (3 weights and a bias). Given three inputs $(x, y, z)$, the model computes $$x \cdot p_{1} + y \cdot p_{2} + z \cdot p_{3} + p_{4}$$
This value is then used to determine whether the bird will jump: if the value is above a certain threshold, the bird jumps. (A threshold of 2.5 is used because the goal is to turn this model into a circuit using an Arduino, which works with voltages in the range 0-5 V.)


## The inputs
* The first input is the distance to the next nearest barrel
* The second input is the height difference between the bird and the center of the gap between the top and bottom barrels
* The third input is the vertical velocity of the bird

#### Importing the main libraries

In [1]:
import pygame
import random
import numpy as np
from deap import creator, base, tools, algorithms

pygame 1.9.4
Hello from the pygame community. https://www.pygame.org/contribute.html




#### The code from game.py without the game loop

In [2]:
pygame.init()
pygame.mixer.init()

# Screen dimensions in pixels; HEIGHT bounds the player vertically and
# WIDTH determines where new barrels are placed (just past the right edge).
WIDTH = 600
HEIGHT = 400
FPS = 60

# Global game variables (read by the Player and Obstacle sprites)
gravity = 0.8 # Added to the bird's vertical speed on every update
jumpImpulse = -12 # Represents the impulse applied when a bird jumps (negative = upwards on screen)
barrelSpeed = 6  # Speed by which the barrel is moving to the left (pixels per update)
barrelWidth = 125 # Size parameter of the vertical opening between the top and bottom barrels
barrelDepth = 80 # Depth of the barrel (along the x axis)
dist_between_barrels = 75 # number of frames between adding new barrels; also the upper bound used when normalizing the distance observation


class Player(pygame.sprite.Sprite):
    '''
    The bird, implemented as a pygame sprite (a 30x30 red square).

    update():
        Advances the bird by one frame with an explicit Euler step: the
        position is moved by the current vertical speed, then gravity is
        added to the speed.

    isDead():
        Returns True if the bird has left the screen through the top or the
        bottom edge. Collisions with barrels are NOT checked here; the
        environment / game loop is responsible for those.

    jump():
        Adds the global jumpImpulse to the bird's vertical speed.
    '''

    def __init__(self):
        super().__init__()
        square = pygame.Surface((30,30))
        square.fill((255,0,0))
        self.image = square
        self.rect = square.get_rect()
        # Start at a fixed horizontal position, vertically at mid-screen.
        self.rect.x = 100
        self.rect.y = HEIGHT/2
        self.speed_y = 0

    def update(self):
        # Order matters: move with the old speed first, then accelerate.
        self.rect.y += self.speed_y
        self.speed_y += gravity

    def isDead(self):
        return self.rect.top < 0 or self.rect.bottom > HEIGHT

    def jump(self):
        self.speed_y += jumpImpulse


class Obstacle(pygame.sprite.Sprite):
    '''
    One half of a barrel pair, implemented as a green pygame sprite.

    __init__(center_position, top=True, bottom_reference=None):
        Builds either the top half (top=True, anchored to the top edge of
        the screen) or the bottom half (top=False, anchored to the bottom
        edge). Both halves start just past the right edge of the screen.
        bottom_reference is stored as given; callers use it to let the top
        obstacle point at its matching bottom obstacle.

        NOTE(review): the top half ends at center_position - barrelWidth/2
        while the bottom half starts at center_position + barrelWidth, so
        the opening is not centred on center_position. Confirm whether this
        asymmetry is intended.

    update():
        Moves the obstacle to the left by its horizontal speed, which is
        taken from the global barrelSpeed at construction time.

    restart(center_position):
        Removes the obstacle from every sprite group. The center_position
        argument is accepted but not used.
    '''

    def __init__(self,center_position,top=True,bottom_reference=None):
        super().__init__()
        self.center_position = center_position
        self.top = top
        self.bottom_reference = bottom_reference

        # Only the height and the vertical anchoring differ between halves.
        if top:
            barrel_height = self.center_position-barrelWidth/2
        else:
            barrel_height = HEIGHT-(self.center_position+barrelWidth)

        self.image = pygame.Surface((barrelDepth,barrel_height))
        self.image.fill((0,255,0))
        self.rect = self.image.get_rect()
        if top:
            self.rect.top = 0
        else:
            self.rect.bottom = HEIGHT
        self.rect.left = WIDTH+1

        self.speed_x = barrelSpeed

    def update(self):
        self.rect.x -= self.speed_x

    def restart(self,center_position):
        self.kill()

#### Creating the environment similar to OpenAI's gym

In [3]:
class env():
    '''
    Minimal game environment with an interface similar to OpenAI's gym.

    step(action):
        Inputs:
            action: The action to execute (either 1 for jump or 0 for not jump)

        Actions:
            Executes one game frame: applies the action, updates all sprites,
            discards barrels that have left the screen, checks whether the
            player is still alive and, every dist_between_barrels frames of
            score, adds a new pair of barrels.
        Returns:
            observations: The three inputs to be given to the model to decide the next action
            game_state: True if the player is alive, False otherwise
            score: The number of frames the player has been alive

    get_observations():
        Returns:
            A list with the three input variables used in deciding an action:
                - Horizontal distance to the next nearest barrel
                  (200 when there is no barrel ahead of the player)
                - Height difference between the barrel's center_position and the player
                  (0 when there is no barrel ahead of the player)
                - Vertical velocity of the player

    state():
        Returns:
            False if the player has collided with an edge of the game or with a barrel
            True otherwise
    '''

    def __init__(self):
        self.all_sprites = pygame.sprite.Group()
        self.obstacles_top = pygame.sprite.Group()
        self.obstacles_bottom = pygame.sprite.Group()

        self.player = Player()
        self.all_sprites.add(self.player)
        # Starts at -1 so that the first barrel is spawned on the second
        # frame (the first frame on which score % interval == 0).
        self.score = -1

        # Mirrors the game_state returned by the most recent step().
        self.alive = True

    def step(self,action):
        if action == 1:
            self.player.jump()

        self.all_sprites.update()
        self._remove_offscreen_obstacles()

        game_state = self.state()
        self.alive = game_state

        observations = self.get_observations()

        # Spawn a new barrel pair at the interval configured by the global
        # dist_between_barrels, so the environment and the observation
        # normalization agree on a single setting.
        if self.score%dist_between_barrels == 0:
            center_position = random.randrange(barrelWidth,HEIGHT-barrelWidth)
            obs_bottom = Obstacle(center_position,top=False)
            obs_top = Obstacle(center_position,top=True,bottom_reference = obs_bottom)

            self.all_sprites.add(obs_top)
            self.all_sprites.add(obs_bottom)
            self.obstacles_top.add(obs_top)
            self.obstacles_bottom.add(obs_bottom)
        if game_state:
            self.score += 1

        return observations,game_state,self.score

    def _remove_offscreen_obstacles(self):
        # Barrels entirely left of the screen can neither collide with the
        # player nor be chosen as the next barrel, so drop them from every
        # group instead of updating and scanning them for the rest of the
        # episode.
        passed = self.obstacles_top.sprites() + self.obstacles_bottom.sprites()
        for obstacle in passed:
            if obstacle.rect.right < 0:
                obstacle.kill()

    def get_observations(self):
        player_rect = self.player.rect
        dist_to_nearest_barrel = 200
        height_diff_nearest_barrel = 0

        # Barrels whose right edge is still ahead of the player's left edge.
        ahead = [sprite for sprite in self.obstacles_top.sprites()
                 if sprite.rect.right - player_rect.x > 0]
        if ahead:
            # Pick the closest one explicitly rather than relying on the
            # iteration order of the sprite group.
            nearest = min(ahead, key=lambda sprite: sprite.rect.right)
            dist_to_nearest_barrel = nearest.rect.right - player_rect.x
            height_diff_nearest_barrel = nearest.center_position - player_rect.y

        return [dist_to_nearest_barrel,height_diff_nearest_barrel,self.player.speed_y]

    def state(self):
        if self.player.isDead():
            return False
        hits_top = pygame.sprite.spritecollide(self.player,self.obstacles_top,False)
        hits_bottom = pygame.sprite.spritecollide(self.player,self.obstacles_bottom,False)
        return not (hits_top or hits_bottom)

In [4]:
'''
Given a value that is in the range (min_1,max_1) return that value if the range was mapped linearly to (min_2,max_2)
'''
def convert_value_to_range(value, min_1, max_1, min_2, max_2):
    span_1 = max_1 - min_1
    span_2 = max_2 - min_2

    scaled_value = (value - min_1) / (span_1)

    return min_2 + (scaled_value * span_2)

def normalize_observations(observations):
    '''
    Use convert_value_to_range to map the three observations towards the
    range 0-5 (to be used with an arduino 0-5 Volts).

    Source ranges assumed for the mapping:
        observations[0] (distance to barrel): 0 .. dist_between_barrels
        observations[1] (height difference):  0 .. HEIGHT
        observations[2] (vertical velocity):  -5 .. 5
    Inputs outside these ranges are extrapolated, so the result can fall
    outside 0-5.

    Returns a new list; the list passed in is left untouched so that the
    same observations can safely be normalized more than once.
    '''
    normalized = list(observations)
    normalized[0] = convert_value_to_range(observations[0],0,dist_between_barrels,0,5)
    normalized[1] = convert_value_to_range(observations[1],0,HEIGHT,0,5)
    normalized[2] = convert_value_to_range(observations[2],-5,5,0,5)
    return normalized

def take_decision(observations,parameters):
    '''
    The model that combines the parameters and observations to return a result.

    Computes the linear model
        o1*p1 + o2*p2 + o3*p3 + p4
    on the normalized observations, where p1..p3 are the weights
    (parameters[:3]) and p4 is the bias (parameters[3]).

    Returns:
        1 (jump) if the model output is above the 2.5 threshold, 0 otherwise
    '''
    # Normalize a copy so the caller's observation list is never modified.
    normalized = normalize_observations(list(observations))
    weights = parameters[:3]
    bias = parameters[3]

    # The bias is part of the model and must contribute to the output.
    answer = sum(value*weight for value,weight in zip(normalized,weights)) + bias

    return 1 if answer > 2.5 else 0

In [5]:
def run_episode(parameters):
    '''
    Play a single game with the model defined by parameters.

    The game runs until the player dies or until max_frames_per_episode
    frames have been played, whichever comes first.

    Returns:
        The score reported by the environment on the last frame played
        (0 if no frame was played at all)
    '''
    game_env = env()
    observations = game_env.get_observations()
    final_score = 0
    for _frame in range(max_frames_per_episode):
        jump = take_decision(observations,parameters)
        observations,still_alive,final_score = game_env.step(jump)
        if not still_alive:
            return final_score
    return final_score

def run_episodes(n,parameters):
    '''
    Play n independent episodes with the same parameters and return the
    mean of their final scores.
    '''
    scores = [run_episode(parameters) for _ in range(n)]
    return sum(scores)/n

def eval_function(parameters):
    '''
    Fitness function used in the genetic algorithm.

    Returns a one-element tuple holding the mean score over sampling_number
    episodes.
    '''
    mean_score = run_episodes(sampling_number,parameters)
    return (mean_score,)

#### Training parameters

In [6]:
sampling_number = 3 # Number of episodes averaged per evaluation (the value n passed to run_episodes)
max_frames_per_episode = 20000 # The max number of frames to run an episode before it is forced to terminate

In [7]:
'''
Code used from the deap documentation 
'''

# Single-objective maximisation: the one fitness value is weighted by +1.0.
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()

# An individual is a list of 4 floats (the model parameters), each drawn
# from random.random; a population is a list of such individuals.
toolbox.register("attr_float", random.random)
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_float, n=4)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# Genetic operators: two-point crossover, Gaussian mutation and tournament
# selection; fitness is computed by eval_function.
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=1, indpb=0.2)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("evaluate", eval_function)

population = toolbox.population(n=500)

NGEN = 20    # Maximum number of generations
CXPB = 0.8   # Probability of mating a pair of offspring
MUTPB = 0.05 # Probability of mutating an offspring

# Best individual seen so far across generations, and the score it obtained
# when it was re-run at the end of its generation.
best_player = None
best_player_score = 0
for gen in range(NGEN):
    # Select the next generation individuals
    # NOTE(review): the initial population is not evaluated before this
    # first selection, so in generation 1 the tournament runs on individuals
    # that have no fitness values yet. DEAP's documented examples evaluate
    # the population before entering the loop - confirm whether that step
    # was left out on purpose.
    offspring = toolbox.select(population, len(population))
    # Clone the selected individuals
    offspring = list(map(toolbox.clone, offspring))
    
    # Apply crossover on the offspring
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < CXPB:
            toolbox.mate(child1, child2)
            # Invalidate the fitness of modified individuals so they are
            # re-evaluated below
            del child1.fitness.values
            del child2.fitness.values

    # Apply mutation on the offspring
    for mutant in offspring:
        if random.random() < MUTPB:
            toolbox.mutate(mutant)
            del mutant.fitness.values

    # Evaluate the individuals with an invalid fitness
    invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit

    # The population is entirely replaced by the offspring
    population[:] = offspring
    
    # Re-run the generation's best individual on fresh episodes; this new
    # score (not the stored fitness) decides whether it becomes best_player.
    top_player = tools.selBest(population,k=1)[0]
    top_player_score = run_episodes(sampling_number,top_player)
    if top_player_score >= best_player_score:
        best_player = top_player
        best_player_score = top_player_score
    # Stop early once the best score equals max_frames_per_episode-1; in
    # that case the per-generation summary line below is not printed.
    if best_player_score == max_frames_per_episode-1:
        print("Reached maximum fitness = {}".format(best_player_score))
        break
    print("Generation number {}, best has fitness {}".format(gen+1,top_player_score))
# The ten fittest individuals of the final population
top10 = tools.selBest(population, k=10)

Generation number 1, best has fitness 68.33333333333333
Generation number 2, best has fitness 129.0
Reached maximum fitness = 19999.0
