## One-Step Actor-Critic Algorithm

https://gymnasium.farama.org/environments/classic_control/

### Imports

In [2]:
import torch
import numpy as np
import random
import numpy as np
import gymnasium

### Seteamos los devices

In [3]:
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f"Running on {DEVICE}")
print("Cuda Available:", torch.cuda.is_available())

Running on cuda:0
Cuda Available: True


### Seteo de seeds
Siempre es buena práctica hacer el seteo de seeds para la reproducibilidad de los experimentos

In [4]:
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
torch.backends.cudnn.deterministic = True

### Creamos el ambiente y probamos algunas de sus funciones.

En este caso elegimos el CartPole pero pueden cambiarlo en la variable *ENV_NAME*.
El ambiente CartPole tiene la ventaja de que las recompensas son positivas y es mas fácil propagar estas hacia los estados iniciales. Mountain Car tiene como recompensa -1 por cada paso que damos y esta limitado a 200 pasos.

In [5]:
ENVS = ["MountainCar-v0", "CartPole-v1"]
ENV_NAME = ENVS[1]

env = gymnasium.make(ENV_NAME, render_mode="rgb_array")

print("Actions #",env.action_space)
print(env.observation_space.shape)
env.reset()
next_state, reward, terminated, truncated, info = env.step(action=0)

print(f"{next_state.shape},\n {reward},\n {terminated},\n {info}")

Actions # Discrete(2)
(4,)
(4,),
 1.0,
 False,
 {}


### Seteamos los hyperparámetros

In [6]:
def process_state(obs, device):
    return torch.tensor(obs, device=device).unsqueeze(0)

#Hiperparámetros de entrenamiento del agente DQN
TOTAL_STEPS = 1000000
EPISODES = 1500
STEPS = 200

EPSILON_INI = 1
EPSILON_MIN = 0.1
EPSILON_DECAY = 40000
EPISODE_BLOCK = 20
EPSILON_TIME = 100000

BATCH_SIZE = 128
BUFFER_SIZE = 10000

GAMMA = 0.999
LEARNING_RATE = 1e-4

### Creamos el ambiente que vamos a estar usando para el entrenamiento

In [7]:
env = gymnasium.make(ENV_NAME)
input_dim = env.observation_space.shape[0]
output_dim = env.action_space.n

print(f"Input dim: {input_dim}, Output dim: {output_dim}")

Input dim: 4, Output dim: 2


### Repasamos el pesudo código del agente One-Step Actor-Critic
![](pseudocode.png)

### Definimos nuestras redes Actor y Critic

In [8]:
import torch.nn as nn
import torch.nn.functional as F

class ActorCriticGeneric_Model(nn.Module):
  def __init__(self, input_dim, output_dim):
    super().__init__()
    self.fc1 = nn.Linear(in_features=input_dim, out_features=32)
    self.output = nn.Linear(in_features=32, out_features=output_dim)

  def forward(self, env_input):
    result = F.relu(self.fc1(env_input))
    return self.output(result)

In [9]:
actor_net = ActorCriticGeneric_Model(input_dim, output_dim).to(DEVICE)
critic_net = ActorCriticGeneric_Model(input_dim, 1).to(DEVICE)

In [24]:
from abc import ABC, abstractmethod
from tqdm import tqdm

class OneStepActorCriticAgent():
    def __init__(self, gym_env, obs_processing_func, gamma, episode_block, actor_net, critic_net):
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        
        #Guardo las redes en el estado del agente
        self.actor_net = actor_net
        self.critic_net = critic_net


        # Funcion phi para procesar los estados.
        self.state_processing_function = obs_processing_func

        self.env = gym_env
        # Hyperparameters
        self.gamma = gamma

        self.episode_block = episode_block

        self.total_steps = 0

        # Asignar una función de costo para el Actor y Critic (y enviarlas al dispositivo adecuado)
        # self.loss_function = ?

        # Asignar optimizadores (Adam)
        # self.optimizerActor = ?
        # self.optimizerCritic = ?
    
    def train(self, number_episodes = 50000, max_steps_episode = 10000, max_steps=1000000):
      rewards = []
      total_steps = 0

      for ep in tqdm(range(number_episodes), unit=' episodes'):
        if total_steps > max_steps:
            break
        
        # Observar estado inicial como indica el algoritmo

        current_episode_reward = 0.0

        for s in range(max_steps):

            # Seleccionar accion usando la política del actor.

            # Ejecutar la accion, observar resultado y procesarlo como indica el algoritmo.

            current_episode_reward += reward
            total_steps += 1

            # Calcular la ventaja (delta)
            
            # Actualizar el modelo del Critic
            
            # Actualizar el modelo del Actor

            # Actualizar el estado

            done = False
            if done: 
                break
        
        rewards.append(current_episode_reward)
        mean_reward = np.mean(rewards[-100:])

        # Report on the traning rewards every EPISODE BLOCK episodes
        if ep % self.episode_block == 0:
          print(f"Episode {ep} - Avg. Reward over the last {self.episode_block} episodes {np.mean(rewards[-self.episode_block:])} total steps {total_steps}")

      print(f"Episode {ep + 1} - Avg. Reward over the last {self.episode_block} episodes {np.mean(rewards[-self.episode_block:])} total steps {total_steps}")

      return rewards    
        

### Creamos el agente con los hyperparámetros y la red

In [25]:
agent = OneStepActorCriticAgent(env, process_state, GAMMA, EPISODE_BLOCK, actor_net, critic_net)

### Entrenamos a nuestro agente!

In [26]:
rewards, wins = agent.train(EPISODES, STEPS, TOTAL_STEPS)

  0%|          | 2/1500 [00:00<02:48,  8.86 episodes/s]

Episode 0 - Avg. Reward over the last 20 episodes 1000000.0 total steps 1000000
Episode 3 - Avg. Reward over the last 20 episodes 1000000.0 total steps 2000000





### Graficamos las recompensas obtenidas durante el entrenamiento 

In [None]:
import matplotlib.pyplot as plt

average_range = EPISODE_BLOCK
episode_ticks = int(len(rewards) / average_range)

avg_rewards = np.array(rewards).reshape((episode_ticks, average_range))
avg_rewards = np.mean(avg_rewards, axis=1)

plt.plot(range(len(avg_rewards)), avg_rewards)

### Creamos un video para ver la performance del agente

In [None]:
import glob
from gymnasium.wrappers.record_video import RecordVideo
from IPython.display import HTML
from IPython import display as ipythondisplay
import io
import base64

def show_video():
  """
  Utility function to enable video recording of gym environment and displaying it
  To enable video, just do "env = wrap_env(env)""
  """
  mp4list = glob.glob('./videos/*.mp4')
  if len(mp4list) > 0:
    mp4 = mp4list[0]
    video = io.open(mp4, 'r+b').read()
    encoded = base64.b64encode(video)
    ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
  else:
    print("Could not find video")


def wrap_env(env):
  """
  Wrapper del ambiente donde definimos un Monitor que guarda la visualizacion como un archivo de video.
  """

  #env = Monitor(env, './video', force=True)
  env = RecordVideo(env,video_folder='./videos')
  return env

In [None]:
env = wrap_env(gymnasium.make(ENV_NAME, render_mode="rgb_array"))
observation,_ = env.reset()

while True:
    env.render()

    action = agent.select_action(process_state(observation, DEVICE), train=False)
    observation, reward, done, truncated, info = env.step(action)

    if done or truncated:
      break

# Cerramos la conexion con el Monitor de ambiente y mostramos el video.
env.close()
show_video()

del env