In [None]:
import numpy as np
import gym
import torch
from torch import nn
import torch.nn.functional as F
import copy
from time import time

In [2]:
# Agents in this notebook change only through random mutation of their weights
# (no backward pass is ever run), so gradient tracking is switched off globally.
torch.set_grad_enabled(False)

<torch.autograd.grad_mode.set_grad_enabled at 0x1d8f7c77908>

In [3]:
class Agent(nn.Module):
    '''The brain of the agent: a one-hidden-layer policy network.

    Maps an observation vector to a probability distribution over actions.

    Args:
        obs_dim: size of the observation vector (default 4).
        hidden_dim: width of the hidden layer (default 32).
        n_actions: number of discrete actions (default 2).

    The defaults reproduce the fixed 4 -> 32 -> 2 network this notebook uses
    with CartPole-v0.
    '''
    def __init__(self, obs_dim=4, hidden_dim=32, n_actions=2):
        super().__init__()
        self.fc = nn.Sequential(nn.Linear(obs_dim, hidden_dim),
                                nn.ReLU(),
                                nn.Linear(hidden_dim, n_actions))

    def forward(self, inputs):
        '''Return action probabilities for `inputs`.

        `inputs` may be a batch of shape (batch, obs_dim) or a single
        observation of shape (obs_dim,); the result has the same leading shape
        with `n_actions` as its last dimension.
        '''
        logits = self.fc(inputs)
        # Normalise over the action dimension, which is always the last one.
        # (A fixed dim=1 would fail on an unbatched 1-D observation.)
        return F.softmax(logits, dim=-1)

In [4]:
def initialize_population(pop_size=2):
    '''Create `pop_size` freshly constructed agents and return them as a list'''
    agents = []
    for _ in range(pop_size):
        # Every call to Agent() builds a new, independent agent
        agents.append(Agent())
    return agents

In [5]:
def evaluate_agent(agent, episodes=15, max_episode_length=250):
    '''Run an agent for a given number of episodes and return its mean reward.

    Args:
        agent: callable module mapping a (1, obs_dim) tensor to per-action
            scores; the action with the highest score is played (greedy).
        episodes: number of CartPole-v0 episodes to play.
        max_episode_length: upper bound on the steps taken in one episode.

    Returns:
        Mean of the per-episode total rewards. With `episodes=0` this is the
        mean of an empty array, i.e. NaN.

    The environment is created per call and is always closed before returning,
    even if the agent or the environment raises.
    '''
    env = gym.make("CartPole-v0")
    total_rewards = []

    try:
        agent.eval()
        # Inference only: never build autograd graphs, regardless of the
        # caller's global grad mode.
        with torch.no_grad():
            for _ in range(episodes):
                observation = env.reset()
                # Modify the maximum steps that can be taken in a single episode
                env._max_episode_steps = max_episode_length

                episodic_reward = 0
                # Start episode
                for _ in range(max_episode_length):
                    input_obs = torch.Tensor(observation).unsqueeze(0)
                    action = agent(input_obs).argmax(dim=1).item()
                    observation, reward, done, _info = env.step(action)

                    episodic_reward += reward
                    if done:
                        break

                total_rewards.append(episodic_reward)
    finally:
        # Release the environment; previously it was never closed.
        env.close()

    return np.array(total_rewards).mean()

In [6]:
def evaluate_population(population, episodes=15, max_episode_length=250):
    '''Score every agent in `population` and return the fitness values as a list.

    The i-th entry of the result is `evaluate_agent` applied to the i-th agent
    with the given `episodes` and `max_episode_length`.
    '''
    return [evaluate_agent(member, episodes, max_episode_length)
            for member in population]

In [7]:
def mutate(parent_agent, mutation_power=0.02):
    '''Create a mutated copy of the parent agent.

    The parent is deep-copied and every parameter of the copy receives additive
    gaussian noise scaled by `mutation_power`. The parent itself is not modified.

    Args:
        parent_agent: module to copy.
        mutation_power: scale of the standard-normal noise added to each parameter.

    Returns:
        The mutated copy.
    '''
    child_agent = copy.deepcopy(parent_agent)

    # In-place updates of parameters must not be recorded by autograd.
    with torch.no_grad():
        for param in child_agent.parameters():
            # randn_like creates the noise with the parameter's own device and
            # dtype, so this also works for agents that do not live on the CPU.
            param.add_(torch.randn_like(param) * mutation_power)

    return child_agent

In [8]:
def repopulate(top_agents, pop_size, mutation_power):
    '''Repopulate the population from the top agents by mutation.

    Children are produced by cycling through `top_agents` in order and mutating
    each parent in turn.

    Args:
        top_agents: parents to breed from.
        pop_size: target size of the full population.
        mutation_power: noise scale forwarded to `mutate`.

    Returns:
        A list of `pop_size - 1` mutated children (empty when `pop_size <= 1`).
        One slot is deliberately left free: the caller appends the unmutated
        best agent to reach `pop_size`.

    Raises:
        ValueError: if children are required but `top_agents` is empty
            (previously this case looped forever).
    '''
    parents = list(top_agents)
    n_children = max(pop_size - 1, 0)

    if n_children == 0:
        return []
    if not parents:
        raise ValueError('top_agents must contain at least one agent to repopulate from')

    # Create exactly the children that are returned; no surplus copies are
    # built and then sliced away.
    return [mutate(parents[i % len(parents)], mutation_power)
            for i in range(n_children)]

In [9]:
# Module-level result slot: evolve() rebinds this to a dict holding the best
# 'reward' and the corresponding 'agent' found so far.
TRAINED_AGENT = {}

In [10]:
def evolve(generations=10, max_time=60,
           pop_size=100,
           topK=20,
           episodes=15,
           max_episode_length=250,
           mutation_power=0.02):
    '''Run the evolutionary search and publish the best agent in TRAINED_AGENT.

    Each generation the population is scored, the `topK` fittest agents become
    parents, and the next population is built from their mutated children plus
    the unmutated best agent of the generation.

    Args:
        generations: number of generations to run.
        max_time: time budget in seconds; only referenced by the optional
            time-based loop described in the comment below, so it has no
            effect in the generation-based loop that is active.
        pop_size: population size passed to initialisation and repopulation.
        topK: number of fittest agents kept as parents.
        episodes: episodes used to score each agent.
        max_episode_length: step limit per episode.
        mutation_power: noise scale used when mutating parents.

    Returns:
        None. After every completed generation the global TRAINED_AGENT is
        bound to a dict with keys 'reward' and 'agent' for the best agent seen.
    '''
    global TRAINED_AGENT

    champion = {}
    population = initialize_population(pop_size)

    t1 = time()
    # To train against a time budget instead of a generation count, replace the
    # `for` header below with `while ((time() - t1) <= max_time):`, initialise
    # `g = 0` before the loop and increment it at the end of each iteration.
    for g in range(generations):

        # Score the current population
        pop_fitness = evaluate_population(population, episodes, max_episode_length)
        mean_pop_reward = np.array(pop_fitness).mean()

        # Indices of the topK fittest agents, best first
        ranked_idx = np.argsort(pop_fitness)[::-1][:topK]
        elite = [population[i] for i in ranked_idx]

        # The generation's winner is the first entry of the ranking
        winner_idx = ranked_idx[0]
        best_agent, best_reward = population[winner_idx], pop_fitness[winner_idx]

        # Generation 0 always seeds the record; afterwards a generation's
        # winner replaces it when it matches or beats the stored reward
        if g == 0 or best_reward >= champion['reward']:
            champion['reward'] = best_reward
            champion['agent'] = best_agent

        print('Generation', g)
        print('Mean Reward of Population', mean_pop_reward)
        print('Best Agent Reward (mean)', best_reward)
        print('Global Best Reward (mean)', champion['reward'], '\n')

        # Next generation: mutated children of the elite, plus the unmutated
        # winner carried forward as-is (not cloned)
        population = repopulate(elite, pop_size, mutation_power)
        population.append(best_agent)

        TRAINED_AGENT = champion

In [11]:
# Train for 20 generations with a population of 20, breeding from the 10
# fittest agents; each agent is scored over 15 episodes of at most 200 steps.
evolve(generations=20,
       pop_size=20, 
       topK=10, 
       episodes=15, 
       max_episode_length=200, 
       mutation_power=0.02)

Generation 0
Mean Reward of Population 15.926666666666668
Best Agent Reward (mean) 85.4
Global Best Reward (mean) 85.4 

Generation 1
Mean Reward of Population 31.750000000000007
Best Agent Reward (mean) 200.0
Global Best Reward (mean) 200.0 

Generation 2
Mean Reward of Population 38.81333333333334
Best Agent Reward (mean) 200.0
Global Best Reward (mean) 200.0 

Generation 3
Mean Reward of Population 76.23666666666668
Best Agent Reward (mean) 200.0
Global Best Reward (mean) 200.0 

Generation 4
Mean Reward of Population 102.73333333333335
Best Agent Reward (mean) 200.0
Global Best Reward (mean) 200.0 

Generation 5
Mean Reward of Population 134.02333333333334
Best Agent Reward (mean) 200.0
Global Best Reward (mean) 200.0 

Generation 6
Mean Reward of Population 141.35666666666663
Best Agent Reward (mean) 200.0
Global Best Reward (mean) 200.0 

Generation 7
Mean Reward of Population 163.77333333333334
Best Agent Reward (mean) 200.0
Global Best Reward (mean) 200.0 

Generation 8
Mean Re

In [14]:
def play_agent(agent, episodes=5, max_episode_length=200, render=False):
    '''Play an agent on CartPole-v0 and print a summary of its rewards.

    Args:
        agent: callable module mapping a (1, obs_dim) tensor to per-action
            scores; the action with the highest score is played (greedy).
        episodes: number of episodes to play.
        max_episode_length: upper bound on the steps taken in one episode.
        render: when True, the environment is rendered before every step.

    Returns:
        None. The mean reward over all episodes and the best single-episode
        reward are printed.

    The environment is always closed, including when the agent, rendering or
    stepping raises, so a render window is not left open after an error.
    '''
    env = gym.make("CartPole-v0")
    total_rewards = []

    try:
        agent.eval()
        # Inference only: never build autograd graphs, regardless of the
        # caller's global grad mode.
        with torch.no_grad():
            for _ in range(episodes):
                observation = env.reset()
                # Modify the maximum steps that can be taken in a single episode
                env._max_episode_steps = max_episode_length

                episodic_reward = 0

                for _ in range(max_episode_length):
                    if render:
                        env.render()

                    input_obs = torch.Tensor(observation).unsqueeze(0)
                    action = agent(input_obs).argmax(dim=1).item()
                    observation, reward, done, _info = env.step(action)

                    episodic_reward += reward
                    if done:
                        break

                total_rewards.append(episodic_reward)
    finally:
        env.close()

    print('Mean Rewards across all episodes', np.array(total_rewards).mean())
    print('Best Reward in any single episode', max(total_rewards))

In [15]:
# Replay the best agent stored by evolve() for 100 episodes without rendering.
play_agent(TRAINED_AGENT['agent'], episodes=100, max_episode_length=200, render=False)

Mean Rewards across all episodes 200.0
Best Reward in any single episode 200.0
