In [2]:
import numpy as np
import gym
import torch
from torch import nn
import torch.nn.functional as F
import copy
from time import time

In [3]:
# Training here is done by mutation and selection, not backpropagation,
# so gradient tracking is switched off globally for the whole notebook.
torch.set_grad_enabled(False)

<torch.autograd.grad_mode.set_grad_enabled at 0x7f813030f5b0>

## Neuroevolution Setup

In [4]:
class Agent(nn.Module):
    '''Policy network: maps a 4-value observation to probabilities over 2 actions.'''

    def __init__(self):
        super().__init__()
        # A single hidden layer of 32 ReLU units sits between input and action logits.
        layers = [
            nn.Linear(4, 32),
            nn.ReLU(),
            nn.Linear(32, 2),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, inputs):
        '''Return the softmax of the network output, normalised along dim 1.'''
        logits = self.fc(inputs)
        return F.softmax(logits, dim=1)

In [5]:
def initialize_population(pop_size=2):
    '''Build and return a list of `pop_size` freshly constructed agents.'''
    agents = []
    for _ in range(pop_size):
        # Each Agent() call creates an independent, newly initialised network.
        agents.append(Agent())
    return agents

In [6]:
def evaluate_agent(agent, episodes=15, max_episode_length=250):
    '''
    Estimate an agent's fitness as its mean reward over several CartPole episodes.

    The agent is not judged on a single run: it plays `episodes` episodes and the
    mean of the episodic rewards is used as its reward. For example, with three
    agents per generation and episodes=5, agent1's final reward is the mean of
    the rewards from its five runs (agent1-1 ... agent1-5).

    `max_episode_length` caps the number of steps per episode. According to the
    original notes every step (one left/right move of the cart) yields +1 reward,
    so an episode in which the pole never falls scores `max_episode_length`.

    Args:
        agent: callable module; given a (1, n) observation tensor it returns
            per-action scores, and the argmax along dim 1 is taken as the action.
        episodes: number of episodes to average over.
        max_episode_length: maximum number of steps played per episode.

    Returns:
        Mean of the per-episode total rewards (NumPy scalar).
    '''
    env = gym.make("CartPole-v0")
    agent.eval()

    total_rewards = []

    # The environment is created per call, so always release it, even when the
    # agent or the environment raises part-way through an episode.
    try:
        for ep in range(episodes):
            observation = env.reset()
            # Modify the maximum steps that can be taken in a single episode
            env._max_episode_steps = max_episode_length

            episodic_reward = 0
            # Start episode
            for step in range(max_episode_length):
                input_obs = torch.Tensor(observation).unsqueeze(0)
                # Greedy policy: pick the action with the highest score.
                action = agent(input_obs).argmax(dim=1).item()
                observation, reward, done, info = env.step(action)

                episodic_reward += reward
                if done:
                    break

            total_rewards.append(episodic_reward)
    finally:
        env.close()

    return np.array(total_rewards).mean()

In [7]:
def evaluate_population(population, episodes=15, max_episode_length=250):
    '''Score every agent and return the fitness values in population order.'''
    return [
        evaluate_agent(member, episodes, max_episode_length)
        for member in population
    ]

In [8]:
'''
처음에 topology를 고정했기 때문에 crossover는 하지 않고 바로 mutation

mutation방법: 부모 agent의 parameter에 정규분포에서 뽑은 랜덤한 수 x mutation_power를 더해줌
'''

def mutate(parent_agent, mutation_power=0.02):
    '''Return a mutated deep copy of `parent_agent`; the parent itself is not modified.

    The network topology is fixed, so there is no crossover step: every parameter
    tensor of the copy simply receives additive noise drawn from a standard
    normal distribution and scaled by `mutation_power`.
    '''
    offspring = copy.deepcopy(parent_agent)

    for weights in offspring.parameters():
        noise = torch.randn(weights.shape)
        weights.data = weights.data + noise * mutation_power

    return offspring

In [9]:
def repopulate(top_agents, pop_size, mutation_power):
    '''Create the next generation's children by mutating the top agents.

    Parents are used round-robin in the order given. The result holds
    `pop_size - 1` children (never fewer than zero): one slot is left free,
    which `evolve` fills by appending the unmutated best agent.

    Args:
        top_agents: iterable of parent agents to mutate.
        pop_size: target population size including the free slot.
        mutation_power: noise scale forwarded to `mutate`.

    Returns:
        List of `max(pop_size - 1, 0)` mutated children.

    Raises:
        ValueError: if children are required but `top_agents` is empty
            (previously this case looped forever).
    '''
    parents = list(top_agents)
    n_children = max(pop_size - 1, 0)

    if n_children == 0:
        return []
    if not parents:
        raise ValueError('top_agents must contain at least one agent to repopulate from')

    # Produce exactly the number of children that is kept, instead of
    # over-generating whole rounds of deep copies and slicing the surplus away.
    return [
        mutate(parents[i % len(parents)], mutation_power)
        for i in range(n_children)
    ]

## Train

In [10]:
# Module-level slot for the training result; evolve() rebinds it to its
# best-so-far dict with the keys 'reward' and 'agent'.
TRAINED_AGENT = {}

In [11]:
def evolve(generations=10, max_time=60,
           pop_size=100,
           topK=20,
           episodes=15,
           max_episode_length=250,
           mutation_power=0.02):
    '''
    Run the evolutionary loop and publish the best agent in `TRAINED_AGENT`.

    Each generation the population is evaluated, the `topK` highest-reward
    agents are selected as parents, and the next population is built from
    mutated copies of those parents plus the unmutated best agent of the
    generation.

    Args:
        generations: number of generations to run.
        max_time: accepted for backward compatibility but not enforced; the
            loop always runs for exactly `generations` generations.
        pop_size: number of agents per generation.
        topK: number of parent agents used to breed the next generation. For
            example, with pop_size=100 and topK=20 the 20 highest-reward
            agents of the 100 parents produce the 100 agents of the next
            generation.
        episodes: episodes averaged per agent evaluation.
        max_episode_length: step cap per episode.
        mutation_power: noise scale used when mutating parents.

    Side effects:
        Prints per-generation statistics and, after every completed
        generation, rebinds the global `TRAINED_AGENT` to a dict with the keys
        'reward' and 'agent' describing the best agent seen so far.
    '''

    global TRAINED_AGENT

    population = initialize_population(pop_size)
    global_best = {}

    for g in range(generations):

        # Evaluate the population
        pop_fitness = evaluate_population(population, episodes, max_episode_length)
        mean_pop_reward = np.array(pop_fitness).mean()

        # Rank the agents in descending order
        topK_idx = np.argsort(pop_fitness)[::-1][:topK]
        topK_agents = [population[i] for i in topK_idx]

        # Get Best Agent
        best_agent = topK_agents[0]
        best_reward = pop_fitness[topK_idx[0]]

        # Check with global best: the first generation always seeds it, and
        # later generations replace it on a tie or an improvement.
        if not global_best or best_reward >= global_best['reward']:
            global_best['reward'] = best_reward
            global_best['agent'] = best_agent

        print('Generation', g)
        print('Mean Reward of Population', mean_pop_reward)
        print('Best Agent Reward (mean)', best_reward)
        print('Global Best Reward (mean)', global_best['reward'], '\n')

        # Mutate and Repopulate
        new_population = repopulate(topK_agents, pop_size, mutation_power)
        # take the best agent of generation forward without cloning as well
        new_population.append(best_agent)

        population = new_population

        # Publish after every generation so an interrupted run keeps its progress.
        TRAINED_AGENT = global_best

In [12]:
# Small training run: 20 generations of 20 agents, breeding each new
# generation from the 10 best; every agent is scored over 15 episodes
# capped at 200 steps.
evolve(generations=20,
       pop_size=20, 
       topK=10, 
       episodes=15, 
       max_episode_length=200, 
       mutation_power=0.02)

  logger.warn(
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  if hasattr(numpy, type_name):


Generation 0
Mean Reward of Population 12.003333333333334
Best Agent Reward (mean) 45.333333333333336
Global Best Reward (mean) 45.333333333333336 

Generation 1
Mean Reward of Population 15.453333333333333
Best Agent Reward (mean) 46.4
Global Best Reward (mean) 46.4 

Generation 2
Mean Reward of Population 25.369999999999997
Best Agent Reward (mean) 49.8
Global Best Reward (mean) 49.8 

Generation 3
Mean Reward of Population 38.39
Best Agent Reward (mean) 81.86666666666666
Global Best Reward (mean) 81.86666666666666 

Generation 4
Mean Reward of Population 52.416666666666664
Best Agent Reward (mean) 112.66666666666667
Global Best Reward (mean) 112.66666666666667 

Generation 5
Mean Reward of Population 55.126666666666665
Best Agent Reward (mean) 99.13333333333334
Global Best Reward (mean) 112.66666666666667 

Generation 6
Mean Reward of Population 61.006666666666675
Best Agent Reward (mean) 104.33333333333333
Global Best Reward (mean) 112.66666666666667 

Generation 7
Mean Reward of P

## Test the Trained Agent

In [13]:
def play_agent(agent, episodes=5, max_episode_length=200, render=False):
    '''Play a trained agent on CartPole and print summary statistics.

    Args:
        agent: callable module; given a (1, n) observation tensor it returns
            per-action scores, and the argmax along dim 1 is taken as the action.
        episodes: number of episodes to play.
        max_episode_length: maximum number of steps played per episode.
        render: when True, `env.render()` is called before every step.

    Prints the mean reward across all episodes and the best single-episode
    reward. Nothing is returned.
    '''
    env = gym.make("CartPole-v0")

    agent.eval()

    total_rewards = []

    # Close the environment even if playing is interrupted or raises, so a
    # render window is not left behind.
    try:
        for ep in range(episodes):
            observation = env.reset()
            env._max_episode_steps = max_episode_length

            episodic_reward = 0

            for step in range(max_episode_length):
                if render:
                    env.render()

                input_obs = torch.Tensor(observation).unsqueeze(0)
                # Greedy policy: pick the action with the highest score.
                action = agent(input_obs).argmax(dim=1).item()
                observation, reward, done, info = env.step(action)

                episodic_reward += reward
                if done:
                    break

            total_rewards.append(episodic_reward)
    finally:
        env.close()

    print('Mean Rewards across all episodes', np.array(total_rewards).mean())
    print('Best Reward in any single episode', max(total_rewards))

In [1]:
# Needs play_agent and a populated TRAINED_AGENT in the current kernel: run the
# definition and training cells above first (the recorded NameError below came
# from executing this cell before them).
play_agent(TRAINED_AGENT['agent'], episodes=100, max_episode_length=200, render=True)

NameError: name 'play_agent' is not defined

In [None]:
# Save the best agent's parameters (its state_dict) to 'model-200.pth'.
torch.save(TRAINED_AGENT['agent'].state_dict(), 'model-200.pth')