In [4]:
"""
Dependencies: gym, ale-py

pip install gym[atari,accept-rom-license]

"""

import gym
import random
import numpy as np

from copy import copy, deepcopy

from functools import cmp_to_key

In [5]:
"""
Action space is of size 7:

0 NOOP
1 FIRE
2 UP
3 RIGHT
4 LEFT
5 RIGHTFIRE
6 LEFTFIRE
"""

"""
A class for each chromosome.
"""
class Chromosome:

    """
    This is used to initialize a chromosome

    args is used since Python doesn't support multiple constructors. (self, env) initializes a chromosome with a random
    action sequence and is used during population initialization.

    (self, actions, env) is used during crossover to generate new chromosomes given a crossed action buffer.
    """
    def __init__(self, *args):

        if len(args) > 1:
            self.action_buffer = args[0]
            self.env = args[1]
            self.reward_sum = 0
            self.time_alive = 0
            return

        # Stores actions taken, rewards obtained, total reward for the run, and the number of frames that the agent has been alive for.
        self.action_buffer = []
        self.reward_buffer = []

        self.reward_sum = 0
        self.time_alive = 0

        # Store the provided env as a class variable.
        self.env = args[0]

        _ = self.env.reset()
        done = False

        while(done == False):
    
            action = random.randint(0, 6)

            _, reward, done, _ = self.env.step(action)

            self.action_buffer.append(action)
            self.reward_buffer.append(reward)

            self.time_alive = self.time_alive + 1
            self.reward_sum = self.reward_sum + reward

    """
    This reruns the environment and performs a new run with the updated action buffer to generate new stats.
    """
    def update(self):

        # Stores updated actions and rewards taken.
        new_action_buffer = []
        new_reward_buffer = []

        # This is needed in case we manage to survive for more frames than the action buffer has actions, in which case we do random ones.
        limit = self.time_alive
        counter = 0

        self.reward_sum = 0
        self.time_alive = 0

        _ = self.env.reset()

        done = False

        while(done == False):

            # Determine if action is from supplied buffer or random.
            counter = counter + 1

            if counter <= limit:
                action = self.action_buffer[counter-1]
            else:  
                action = random.randint(0, 6)

            _, reward, done, _ = self.env.step(action)

            new_action_buffer.append(action)
            new_reward_buffer.append(reward)
            self.time_alive = self.time_alive + 1
            self.reward_sum = self.reward_sum + reward

        self.action_buffer = new_action_buffer
        self.reward_buffer = new_reward_buffer

    """
    Since the environment has a certain level of randomness, this function reruns it without saving results
    except the aggregate reward and time alive. Used to gauge how accurate a score is during fitness.
    """
    def simulate(self):

        # This is needed in case we manage to survive for more frames than the action buffer has actions, in which case we do random ones.
        limit = self.time_alive
        counter = 0

        new_reward_sum = 0
        new_time_alive = 0

        _ = self.env.reset()

        done = False

        while(done == False):

            # Determine if action is from supplied buffer or random.
            counter = counter + 1

            if counter <= limit:
                action = self.action_buffer[counter-1]
            else:  
                action = random.randint(0, 6)

            _, reward, done, _ = self.env.step(action)

            new_time_alive = self.time_alive + 1
            new_reward_sum = self.reward_sum + reward

        return new_reward_sum, new_time_alive

    """
    This mutates a chromosome's genes (individual actions taken) based on a supplied probability. If the mutation occurs, the action is replaced with a random one.
    """
    def mutate(self, mutation_rate):
        for i in range(0, len(self.action_buffer)):
            if random.random() <= mutation_rate:
                self.action_buffer[i] = random.randint(0,6)




In [6]:
"""
GA and helper methods
"""

#Comparator for a Chromosome list
def chromosome_comparator(a, b):
    if a.reward_sum > b.reward_sum:
        return -1
    elif a.reward_sum == b.reward_sum:
        if a.time_alive > b.time_alive:
            return -1
        else:
            return 1
    else:
        return 1

# Feed this to sorting function
compare_key = cmp_to_key(chromosome_comparator)

# Generates initial population
def create_initial_population(number, env):
    population = []

    for i in range(0, number):
        population.append(Chromosome(env))

    return population

"""
Applies a fitness function and then ranks the population in descending order of best to worst.

In this case, the primary marker of fitness is the high score. Assuming a tie, then secondary criterion is how long each agent has been alive.

Also returns the best chromosome found.
"""
def selection(population):

    # The first stage is the fitness function.
    optimal_index = 0
    longest_alive = 0
    max_score = 0

    for i, chrom in enumerate(population):

        total_reward = []
        total_time = []

        # Since the game isn't purely deterministic (some variation in how a run plays out), we average the results of a given action list.
        for j in range(0, 5):
            current_reward, current_time = chrom.simulate()
            total_reward.append(current_reward)
            total_time.append(current_time)

        mean_reward = np.mean(np.array(total_reward))
        mean_time = np.mean(np.array(total_time))

        # Update the chromosome's measure of its fitness with the mean reward.
        chrom.reward_sum = mean_reward

        if mean_reward > max_score:
            best = True
        elif (mean_reward == max_score) and mean_time > mean_time:
            best = True
        else:
            best = False

        if best:
            longest_alive = mean_time
            max_score = mean_reward
            optimal_index = i

    # Select best chromosome.
    best_chromosome = population[optimal_index]

    # Sort chromosomes in descending order.
    population.sort(key=compare_key, reverse=True)

    # This was for an AIS test, should remain commented out.

    #for i in range(0, len(population)):
        #population[i] = best_chromosome

    # Select only the best 50% of chromosomes.
    return population[0:int(len(population)/2)], best_chromosome

"""
Performs crossover on a population and returns the new population.

Crossover is done by selecting the actions that led to the highest rewards for each given action, reward pair for both parents.

Population is split into pairs of parents and crossover is performed, yielding 4 children in each instance.
"""
def crossover(population):
    crossed_population = []
    for i in range(0, len(population) - 1, 2):

        father = population[i]

        # Edge case if there's only 1 chromosome left, in which case it pairs with a random one.
        if i + 1 > len(population) - 1:
            mother = population[random.randint(0, len(population)-1)]
            
        else:
            mother = population[i+1]

        limit = 0

        if father.time_alive > mother.time_alive:
            new_actions = deepcopy(father.action_buffer)
            limit = mother.time_alive - 1
            
        else:
            new_actions = deepcopy(mother.action_buffer)
            limit - father.time_alive - 1

        for j in range(0, len(new_actions)):

            # We can't permute actions once we've reached a point where we've exhausted the buffer of one parent.
            if j > limit:
                break

            if mother.reward_buffer[j] > father.reward_buffer[j]:
                new_actions[j] = mother.action_buffer[j]

            elif father.reward_buffer[j] > mother.reward_buffer[j]:
                new_actions[j] = father.action_buffer[j]

        # They each have 4 kids, replenishing the population.
        crossed_population.append(Chromosome(new_actions, father.env))
        crossed_population.append(Chromosome(new_actions, father.env))
        crossed_population.append(Chromosome(new_actions, father.env))
        crossed_population.append(Chromosome(new_actions, father.env))

    return crossed_population

"""
Mutate the population. Rate represents chance of mutation.
"""
def mutate_population(population, rate=0.01):
    for _, chrom in enumerate(population): 
        chrom.mutate(rate)

"""
Rerun the environment for all chromosomes and produce updated runs.
"""
def update_population(population):
    for i, chrom in enumerate(population):     
         chrom.update()

In [8]:
"""
GA loop
"""

env = gym.make('ALE/Assault-v5')

n_iter = 20
population_size = 50
mutation_rate = 0.01

best_alive = 0
best_score = 0
best_chromo = None

population = create_initial_population(population_size, env)

for i in range(0, n_iter):

    # Run the fitness function and determine if the best chromosome is better than the best one we've stored for all runs.
    selected_population, best_chromo_candidate = selection(population)
    longest_alive = best_chromo_candidate.time_alive
    max_score = best_chromo_candidate.reward_sum

    if best_chromo_candidate.reward_sum > best_score:
            best = True
    elif (best_chromo_candidate.reward_sum == best_score) and best_chromo_candidate.time_alive > best_alive:
            best = True
    else:
        if i == 0:
                best = True
        else:
                best = False

    if best:
           best_chromo = deepcopy(best_chromo_candidate)
           best_alive = best_chromo_candidate.time_alive
           best_score = best_chromo_candidate.reward_sum
           

    # Print stats.
    print(f"Best Chromosome for Run {i + 1} -> Longest Alive: {longest_alive}    High Score: {max_score}")

    # Cross, Mutate, Update, and prepare the new population for the next run.
    crossed_population = crossover(selected_population)
    mutate_population(crossed_population, mutation_rate)
    update_population(crossed_population)

    population = crossed_population

print(f"Best Chromosome -> Longest Alive: {best_alive}    High Score: {best_score}")

Best Chromosome for Run 1 -> Longest Alive: 737    High Score: 378.0
Best Chromosome for Run 2 -> Longest Alive: 816    High Score: 525.0
Best Chromosome for Run 3 -> Longest Alive: 800    High Score: 441.0
Best Chromosome for Run 4 -> Longest Alive: 688    High Score: 378.0
Best Chromosome for Run 5 -> Longest Alive: 696    High Score: 420.0
Best Chromosome for Run 6 -> Longest Alive: 704    High Score: 336.0
Best Chromosome for Run 7 -> Longest Alive: 720    High Score: 399.0
Best Chromosome for Run 8 -> Longest Alive: 703    High Score: 420.0
Best Chromosome for Run 9 -> Longest Alive: 687    High Score: 399.0
Best Chromosome for Run 10 -> Longest Alive: 752    High Score: 399.0
Best Chromosome for Run 11 -> Longest Alive: 719    High Score: 504.0
Best Chromosome for Run 12 -> Longest Alive: 768    High Score: 441.0
Best Chromosome for Run 13 -> Longest Alive: 615    High Score: 399.0
Best Chromosome for Run 14 -> Longest Alive: 735    High Score: 399.0
Best Chromosome for Run 15 ->

In [9]:
"""
This displays the game to evaluate our best chromosome.
"""
test_env = gym.make('ALE/Assault-v5', render_mode="human")

done = False
_ = test_env.reset()

time_alive = 0
reward_sum = 0

action_list = best_chromo.action_buffer
limit = best_chromo.time_alive 
counter = 0

while(done == False):

    # Determine if action is from supplied buffer or random.
    counter = counter + 1

    if counter <= limit:
        action = action_list[counter-1]
    else:  
        action = random.randint(0, 6)
    
    _, reward, done, info = test_env.step(action)

    reward_sum = reward_sum + reward
    time_alive = time_alive + 1

print(f"Longest Alive: {time_alive}    High Score: {reward_sum}")

Longest Alive: 495    High Score: 168.0
