**Postgrado UAI**

Aprendizaje Reforzado para Sistemas Autónomos - [Prof. Miguel Solis](https://www.miguelsolis.info)

08 de noviembre de 2023

In [None]:
!sudo apt-get install -y xvfb ffmpeg
!pip install 'imageio==2.4.0'
!pip install pyvirtualdisplay
!pip install tf-agents

In [None]:
from __future__ import absolute_import, division, print_function

import base64
import imageio
import IPython
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import PIL.Image
import pyvirtualdisplay

import tensorflow as tf

from tf_agents.agents.dqn import dqn_agent
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.eval import metric_utils
from tf_agents.metrics import tf_metrics
from tf_agents.networks import sequential
from tf_agents.policies import random_tf_policy
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.trajectories import trajectory
from tf_agents.specs import tensor_spec
from tf_agents.utils import common

In [None]:
# Configura monitor virtual para visualización de entornos animados
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()

In [None]:
num_iterations = 5000 # @param {type:"integer"}

initial_collect_steps = 100
collect_steps_per_iteration = 1
replay_buffer_max_length = 100000

batch_size = 64
learning_rate = 1e-3
log_interval = 200

num_eval_episodes = 10
eval_interval = 1000

In [None]:
env_name = 'CartPole-v0'
env = suite_gym.load(env_name)
env.reset()
PIL.Image.fromarray(env.render())

In [None]:
# Verificación del espacio de estados y espacio de acciones
print("Espacio de acciones: {}".format(env.action_space))
print("Espacio de estados: {}".format(env.observation_space))

time_step = env.reset()
print('Instante actual:')
print(time_step)

action = np.array(1, dtype=np.int32)

next_time_step = env.step(action)
print('Instante siguiente:')
print(next_time_step)

In [None]:
# Carga de entornos distintos para entrenamiento y para validación
train_py_env = suite_gym.load(env_name)
eval_py_env = suite_gym.load(env_name)

In [None]:
# Conversión de arreglos a tensores
train_env = tf_py_environment.TFPyEnvironment(train_py_env)
eval_env = tf_py_environment.TFPyEnvironment(eval_py_env)

In [None]:
fc_layer_params = (100, 50)
action_tensor_spec = tensor_spec.from_spec(env.action_spec())
num_actions = action_tensor_spec.maximum - action_tensor_spec.minimum + 1

def dense_layer(num_units):
  return tf.keras.layers.Dense(num_units,
activation=tf.keras.activations.relu, kernel_initializer=tf.keras.initializers.VarianceScaling(scale=2.0, mode='fan_in', distribution='truncated_normal'))

dense_layers = [dense_layer(num_units) for num_units in fc_layer_params]
q_values_layer = tf.keras.layers.Dense(
    num_actions, kernel_initializer=tf.keras.initializers.RandomUniform(minval=-0.03, maxval=0.03), bias_initializer=tf.keras.initializers.Constant(-0.2))
q_net = sequential.Sequential(dense_layers + [q_values_layer])

In [None]:
# Se instancia un agente con Deep Q-learning
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

train_step_counter = tf.Variable(0)

agent = dqn_agent.DqnAgent(train_env.time_step_spec(), train_env.action_spec(), q_network=q_net, optimizer=optimizer,
                           td_errors_loss_fn=common.element_wise_squared_loss, train_step_counter=train_step_counter)

agent.initialize()

In [None]:
eval_policy = agent.policy # política principal usada para evaluación y despliegue
collect_policy = agent.collect_policy # política secundaria usada para recolección de datos
random_policy = random_tf_policy.RandomTFPolicy(train_env.time_step_spec(), train_env.action_spec()) # política aleatoria

example_environment = tf_py_environment.TFPyEnvironment(suite_gym.load('CartPole-v0'))
time_step = example_environment.reset()
random_policy.action(time_step)

In [None]:
def compute_avg_return(environment, policy, num_episodes=10):
  total_return = 0.0
  for _ in range(num_episodes):

    time_step = environment.reset()
    episode_return = 0.0

    while not time_step.is_last():
      action_step = policy.action(time_step)
      time_step = environment.step(action_step.action)
      episode_return += time_step.reward
    total_return += episode_return

  avg_return = total_return / num_episodes
  return avg_return.numpy()[0]

def collect_step(environment, policy, buffer):
  time_step = environment.current_time_step()
  action_step = policy.action(time_step)
  next_time_step = environment.step(action_step.action)
  traj = trajectory.from_transition(time_step, action_step, next_time_step)

  buffer.add_batch(traj)

def collect_data(env, policy, buffer, steps):
  for _ in range(steps):
    collect_step(env, policy, buffer)

In [None]:
# Se crea el buffer de experiencias recolectadas
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=train_env.batch_size,
    max_length=replay_buffer_max_length)

collect_data(train_env, random_policy, replay_buffer, initial_collect_steps)

dataset = replay_buffer.as_dataset(num_parallel_calls=3, sample_batch_size=batch_size, num_steps=2).prefetch(3)
iterator = iter(dataset)

In [None]:
# Entrenamiento
agent.train = common.function(agent.train)
agent.train_step_counter.assign(0)

# Se evalúa la política inicial
avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
returns = [avg_return]

for _ in range(num_iterations):
  # Se recolectan algunas transiciones
  collect_data(train_env, agent.collect_policy, replay_buffer, collect_steps_per_iteration)
  # Se toman algunos de estos datos para entrenar
  experience, unused_info = next(iterator)
  train_loss = agent.train(experience).loss

  step = agent.train_step_counter.numpy()

  if step % log_interval == 0:
    print('episodio = {0}: costo = {1}'.format(step, train_loss))

  if step % eval_interval == 0:
    avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
    print('episodio = {0}: Retorno promedio = {1}'.format(step, avg_return))
    returns.append(avg_return)

In [None]:
# Visualización de desempeño
iterations = range(0, num_iterations + 1, eval_interval)
plt.plot(iterations, returns)
plt.ylabel('Retorno promedio')
plt.xlabel('Episodio')
plt.ylim(top=250)
plt.grid(True)

In [None]:
# Función para visualizar videos en python notebook
def embed_mp4(filename):
  video = open(filename,'rb').read()
  b64 = base64.b64encode(video)
  tag = '''
  <video width="640" height="480" controls>
    <source src="data:video/mp4;base64,{0}" type="video/mp4">
  Your browser does not support the video tag.
  </video>'''.format(b64.decode())

  return IPython.display.HTML(tag)

def create_policy_eval_video(policy, filename, num_episodes=5, fps=30):
  filename = filename + ".mp4"
  with imageio.get_writer(filename, fps=fps) as video:
    for _ in range(num_episodes):
      time_step = eval_env.reset()
      video.append_data(eval_py_env.render())
      while not time_step.is_last():
        action_step = policy.action(time_step)
        time_step = eval_env.step(action_step.action)
        video.append_data(eval_py_env.render())
  return embed_mp4(filename)

create_policy_eval_video(agent.policy, "pendulo_entrenado")

In [None]:
# Comparación con política aleatoria
create_policy_eval_video(random_policy, "pendulo_aleatorio")