In [9]:
import random
import numpy as np
import torch
from torch import nn


torch.manual_seed(10)
random.seed(10)
np.random.seed(10)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [10]:
device

device(type='cpu')

In [4]:
class ReplayBuffer:
    def __init__(self, buffer_size, batch_size, seed):
        """
        Replay memory permite que o agente registre experiências e aprenda 
        com elas.
        
        Parametros
        ---------
        buffer_size (int): tamanho máximo da memória interna
        batch_size (int): tamanho do batch que será amostrado durante o treino
        seed (int): random seed
        """

        self.batch_size = batch_size
        self.seed = random.seed(seed)
        self.memory = deque(maxlen=buffer_size)
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
    
    def add(self, state, action, reward, next_state, done):
        """Adicionar experiência"""
        experience = self.experience(state, action, reward, next_state, done)
        self.memory.append(experience)
                
    def sample(self):
        """ 
        Amostrar aleatoriamente e retornar a tupla (estado, ação, recompensa, 
        próximo_estado, concluído) como torch tensors
        """
        experiences = random.sample(self.memory, k=self.batch_size)
        
        # Converter em torch tensors
        states = torch.from_numpy(np.vstack([experience.state for experience in experiences if experience is not None])).float().to(device)
        actions = torch.from_numpy(np.vstack([experience.action for experience in experiences if experience is not None])).long().to(device)        
        rewards = torch.from_numpy(np.vstack([experience.reward for experience in experiences if experience is not None])).float().to(device)        
        next_states = torch.from_numpy(np.vstack([experience.next_state for experience in experiences if experience is not None])).float().to(device)  
        
        # Converter done de boolean para int
        dones = torch.from_numpy(np.vstack([experience.done for experience in experiences if experience is not None]).astype(np.uint8)).float().to(device)        
        
        return (states, actions, rewards, next_states, dones)
        
    def __len__(self):
        return len(self.memory)

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class QNetwork(nn.Module):
    def __init__(self, state_size, action_size, seed):
        """
        Construa uma rede neural fully connected
        
        Parameters
        ----------
        state_size (int): Dimensão do estado
        action_size (int): Dimensão da ação
        seed (int): random seed
        """
        super(QNetwork, self).__init__()
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, 32)
        self.fc2 = nn.Linear(32, 64)
        self.fc3 = nn.Linear(64, action_size)  
        
    def forward(self, x):
        """Forward pass"""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        
        return x        

In [6]:
class DQNAgent:
    def __init__(self, state_size, action_size, seed):
        """
        O Agente DQN interage com o ambiente, 
        armazena a experiência e aprende com ela
        
        Parametros
        ----------
        state_size (int): Dimensão do estado
        action_size (int): Dimensão da ação
        seed (int): random seed
        """

        self.state_size = state_size
        self.action_size = action_size
        self.seed = random.seed(seed)

        # Inicializar redes Q 
        self.q_network = QNetwork(state_size, action_size, seed).to(device)
        self.fixed_network = QNetwork(state_size, action_size, seed).to(device)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=LR)

        # Inicializar memória
        self.memory = ReplayBuffer(BUFFER_SIZE, BATCH_SIZE, seed)
        self.timestep = 0


    # HW defina a função de loss
    def criterion(self, predicted, target):
        """
        Calcula o erro quadrático médio entre o valor predito e o valor alvo

        Parametros
        ----------
        predicted (torch.tensor): valor predito
        target (torch.tensor): valor alvo

        -> torch.Te
        """
        loss = nn.MSELoss()
        predicted = predicted
        target = target
        return loss(predicted, target)
        
    def step(self, state, action, reward, next_state, done):
        """
        Atualiza o conhecimento do Agente
        
        Parametros
        ----------
        state (array_like): Estado atual do ambiente
        action (int): Ação realizada no estado atual
        reward (float): Recompensa recebida após a ação
        next_state (array_like): Próximo estado retornado pelo ambiente após a ação
        done (bool): se o episódio terminou após a ação
        """

        # Salva transição no replay buffer
        self.memory.add(state, action, reward, next_state, done)
        self.timestep += 1

        # Realiza o aprendizado do agente a cada UPDATE_EVERY steps
        if self.timestep % UPDATE_EVERY == 0:
            if len(self.memory) > BATCH_SIZE:
                sampled_experiences = self.memory.sample()
                loss = self.learn(sampled_experiences)

                return loss
        return None

    def learn(self, experiences):
        """
        Aprende com a experiência treinando a q_network
        
        Parametros
        ----------
        experiences (array_like): Lista de experiências amostradas da memória do agente
        """
        states, actions, rewards, next_states, dones = experiences

        # Obtenha a ação com valor máximo de Q
        action_values = self.fixed_network(next_states).detach()

        # Notas
        # tensor.max(1)[0] retorna os valores, tensor.max(1)[1] retorna os índices
        # operação unsqueeze --> np.reshape
        # Aqui nós fazemos torch.Size([64]) -> torch.Size([64, 1])
        # O valor obtido será correspondente a max_a Q(S', a)
        max_action_values = action_values.max(1)[0].unsqueeze(1)
        
        
        # HW Defina o Q_target (Q*) com valores de ação com desconto para todas
        # as transições não finais (done == 0). Para as transições finais (done == 1)
        # Q(S', a) é 0, portanto utilize a recompensa diretamente.
        # Q*(S, A) <- r + γ max_a Q(S', a)
        Q_target = max_action_values * GAMMA + rewards

        # Calcula Q(s_t, a) - o modelo calcula Q (s_t), então selecionamos as 
        # colunas de ações tomadas. Estas são as ações que teriam sido tomadas 
        # para cada estado do batch de acordo com a rede Q
        Q_predicted = self.q_network(states).gather(1, actions)
        
        # HW Calcula a loss
        loss = self.criterion(Q_predicted, Q_target)
        # HW zerar os gradientes
        self.optimizer.zero_grad()
        # HW calcula os novos gradientes (backward pass)
        loss.backward()
        # HW atualiza os pesos com o otimizador
        self.optimizer.step()
        # Atualizar pesos da rede Q fixa
        self.update_fixed_network(self.q_network, self.fixed_network)

        return loss.detach().cpu().numpy()
        
    def update_fixed_network(self, q_network, fixed_network):
        """
        Atualize a rede fixa copiando os pesos da rede Q usando o parâmetro TAU
        
        Parametros
        ----------
        q_network (PyTorch model): Q network
        fixed_network (PyTorch model): target network fixa
        """
        for source_parameters, target_parameters in zip(q_network.parameters(), fixed_network.parameters()):
            target_parameters.data.copy_(TAU * source_parameters.data + (1.0 - TAU) * target_parameters.data)
        
        
    def act(self, state, eps=0.0):
        """
        Escolha a ação
        
        Parametros
        ----------
        state (array_like): estado atual do ambiente
        eps (float): epsilon para seleção epsilon-greedy de ação
        """
        rnd = random.random()

        if rnd < eps:
            return np.random.randint(self.action_size)
        else:
            # Seleciona a melhor ação com probabilidade 1 - eps
            state = torch.from_numpy(state).float().unsqueeze(0).to(device)
            # coloque a rede em modo de avaliação
            self.q_network.eval()
            with torch.no_grad():
                action_values = self.q_network(state)

            # Voltar ao modo de treino
            self.q_network.train()
            action = np.argmax(action_values.cpu().data.numpy())
            return action    
        
    def checkpoint(self, filename):
        torch.save(self.q_network.state_dict(), filename)

    def load_model(self, filename):
        self.q_network.load_state_dict(torch.load(filename))
        self.fixed_network.load_state_dict(torch.load(filename))


In [7]:
BUFFER_SIZE = int(1e5)  # Tamanho do Replay Buffer
BATCH_SIZE = 64         # Número de experiências para amostrar da memória
GAMMA = 0.99            # Fator de desconto
TAU = 1e-3              # Parâmetro de atualização suave para atualização de rede Q fixa
LR = 1e-4               # Taxa de aprendizagem
UPDATE_EVERY = 4        # Com que frequência atualizar a rede Q 

MAX_EPISODES = 3000  # Número máximo de episódios para jogar
MAX_STEPS = 1000     # passos máximos permitidos em um único episódio / jogo
ENV_SOLVED = 200     # Pontuação MAX em que consideramos o ambiente ser resolvido
PRINT_EVERY = 100    # Com que freqüência imprimir o progresso

# Epsilon 
EPS_START = 1.0      # Valor padrão / inicial de eps
EPS_DECAY = 0.999    # Taxa de decaimento do épsilon
EPS_MIN = 0.01       # Épsilon mínimo

In [8]:
EPS_DECAY_RATES = [0.9, 0.99, 0.999, 0.9999]
plt.figure(figsize=(10,6))

fig = go.Figure([])

_eps_list = []
for decay_rate in EPS_DECAY_RATES:
    test_eps = EPS_START
    eps_list = []
    for _ in range(MAX_EPISODES):
        test_eps = max(test_eps * decay_rate, EPS_MIN)
        eps_list.append(test_eps)          
    
    _eps_list.append(eps_list)


fig = go.Figure([
    go.Scatter(
        y=_eps_list[0],
        x=[ep for ep in range(1, len(eps_list) + 1)],
        mode="lines",
        name=EPS_DECAY_RATES[0]
    ),
    go.Scatter(
        y=_eps_list[1],
        x=[ep for ep in range(1, len(eps_list) + 1)],
        mode="lines",
        name=EPS_DECAY_RATES[1]
    ),
    go.Scatter(
        y=_eps_list[2],
        x=[ep for ep in range(1, len(eps_list) + 1)],
        mode="lines",
        name=EPS_DECAY_RATES[2]
    ),
    go.Scatter(
        y=_eps_list[3],
        x=[ep for ep in range(1, len(eps_list) + 1)],
        mode="lines",
        name=EPS_DECAY_RATES[3]
    ),
])
fig.update_layout(
    title="Efeito de várias taxas de decaimento",
    xaxis = dict(title="# Episódios"),
    yaxis = dict(title="Epsilon")
)
fig.show()

NameError: name 'plt' is not defined