In [4]:
from torch import nn
import torch
import gym
from collections import deque
import itertools
import numpy as np
import random

In [5]:
GAMMA = 0.99                # discount rate
BATCH_SIZE = 32             # quanti elementi dal replay buffer
BUFFER_SIZE = 50_000        # dimensione del replay buffer, superato questo valore, i vecchi elementi vengono sovrascritti
MIN_REPLAY_SIZE = 1_000     # quanti elementi sono necessari prima di iniziare la discesa del gradiente
EPSILON_START = 1.0 
EPSILON_END = 0.02
EPSILON_DECAY = 10_000      # quanti episodi per arrivare da EPSILON_START a EPSILON_END
TARGET_UPDATE_FREQ = 1_000  # ogni quanti episodi aggiorno Q con Q^
LEARNING_RATE = 5e-4
TARGET_SAVE_FREQ = TARGET_UPDATE_FREQ*25
MODELS_DIR = './saved_models'

In [6]:
class Network(nn.Module):
    def __init__(self,env):
        super().__init__()

        # ottengo le dimensioni dello spazio di osservazione
        in_features = int(np.prod(env.observation_space.shape))

        self.net = nn.Sequential(
            nn.Linear(in_features, 64),
            nn.Tanh(),
            # questo algoritmo può essere usato solo
            # se le azioni appartengono a uno spazio
            # finito
            nn.Linear(64, env.action_space.n),
        )
    def forward(self, x):
        return self.net(x)
    def act(self,obs):
        obs = obs[0] if type(obs) is tuple else obs
        obs_t = torch.as_tensor(obs, dtype=torch.float32)
        # perchè è un batch di un solo elemento
        q_values = self.forward(obs_t.unsqueeze(0))
        max_q_index = torch.argmax(q_values, dim=1)[0]
        action = max_q_index.detach().item()
        # a valute between [0,num_actions-1]
        return action

In [None]:
env = gym.make('CartPole-v0')

replay_buffer = deque(maxlen=BUFFER_SIZE)

# tieni traccia dei reward per ogni episodio
reward_buffer = deque([0.0],maxlen=100)
episode_reward = 0.0


In [None]:
online_net = Network(env)
target_net = Network(env)
target_net.load_state_dict(online_net.state_dict())

optimizer = torch.optim.Adam(online_net.parameters(), lr=LEARNING_RATE)

In [None]:
# iniziatlize Replay Buffer
obs = env.reset()

In [None]:
for _ in range(MIN_REPLAY_SIZE):
    action = env.action_space.sample()

    new_obs, rew, done, _,_ = env.step(action)
    
    transition = (obs,action,rew,done,new_obs)
    
    replay_buffer.append(transition)
    obs = new_obs

    if done:
        obs = env.reset()


In [None]:
# training loop

obs = env.reset()

for step in itertools.count():
    epsilon = np.interp(step,[0,EPSILON_DECAY],[EPSILON_START,EPSILON_END])
    rnd_sample = random.random()

    if rnd_sample < epsilon:
        action = env.action_space.sample()
    else:
        
        action = online_net.act(obs if type(obs) is not tuple else obs[0])

    new_obs, rew, done, _,_ = env.step(action)

    transition = (obs,action,rew,done,new_obs)
    replay_buffer.append(transition)
    obs = new_obs

    episode_reward += rew

    if done:
        obs = env.reset()

        reward_buffer.append(episode_reward)
        episode_reward = 0.0


    # start gradient step 
    transitions = random.sample(replay_buffer,BATCH_SIZE)
    # si usa il numpy perch torch è più veloce sui numpy
    
    obses = np.asarray([t[0] if type(t[0]) is not tuple else t[0][0] for t in transitions])
    actions = np.asarray([t[1] for t in transitions])
    rewards = np.asarray([t[2] for t in transitions])
    dones = np.asarray([t[3] for t in transitions])
    new_obses = np.asarray([t[4] for t in transitions])


    obses_t = torch.as_tensor(obses, dtype=torch.float32)
    # qui è unsqueeze(-1) perchè è un batch effettivo e quindi dobbiamo
    # fare il resize alla dimensione del batch
    actions_t = torch.as_tensor(actions, dtype=torch.int64).unsqueeze(-1)
    rewards_t = torch.as_tensor(rewards, dtype=torch.float32).unsqueeze(-1)
    dones_t = torch.as_tensor(dones, dtype=torch.float32).unsqueeze(-1)
    new_obses_t = torch.as_tensor(new_obses, dtype=torch.float32)

    # compute targets

    target_q_values = target_net(new_obses_t)
    # ogni predizione restituisce il q value per ogni azione, prendiamo solo
    # quello per l'azione scelta
    max_target_q_values = target_q_values.max(dim=1,keepdim=True)[0]

    targets = rewards_t + GAMMA * (1-dones_t) * max_target_q_values

    # compute loss

    q_values = online_net(obses_t)
    # in questo caso dobbiamo prendere il q value per l'azione scelta
    # che potrebbe essere diversa da quella massima per il passo random
    action_q_values = torch.gather(input = q_values, dim=1, index = actions_t)

    loss = nn.functional.smooth_l1_loss(action_q_values,targets)

    # gradient descent

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # update target network
    if step % TARGET_UPDATE_FREQ == 0:
        target_net.load_state_dict(online_net.state_dict())
        
    # checkpointing
    if step % TARGET_SAVE_FREQ == 0:
        print("Saving target net")
        torch.save(target_net.state_dict(), MODELS_DIR+"/cart_pole_target_net.pt")
    
    # Logging
    if step % 1000 == 0:
        print()
        print('Step', step)
        print('Avg Rew',np.mean(reward_buffer))


    


In [None]:
env.close()

In [7]:
env = gym.make('CartPole-v0',render_mode='human')
online_net = Network(env)
online_net.load_state_dict(torch.load(MODELS_DIR+"/cart_pole_target_net.pt"))
obs = env.reset()\

done = False
while True:
    done = False
    env.reset()
    while not done:
        action = online_net.act(obs)
        obs, rew, done, _,_ = env.step(action)
        env.render()
    


  logger.warn(
  if not isinstance(terminated, (bool, np.bool8)):


KeyboardInterrupt: 