In [2]:
!pip install gym[classic_control,mujoco,atari,accept-rom-license]==0.25.2

Collecting pygame==2.1.0 (from gym[accept-rom-license,atari,classic_control,mujoco]==0.25.2)
  Downloading pygame-2.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m60.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting ale-py~=0.7.5 (from gym[accept-rom-license,atari,classic_control,mujoco]==0.25.2)
  Downloading ale_py-0.7.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m84.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting mujoco==2.2.0 (from gym[accept-rom-license,atari,classic_control,mujoco]==0.25.2)
  Downloading mujoco-2.2.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.6/3.6 MB[0m [31m67.5 MB/s[0m eta [36m0:00:00[0m
Collecting autorom[accept-rom-license]~=0.4.2 (from gym[accept-r

In [71]:
from typing import Sequence, Callable, Tuple, Optional, Union, List, Dict

import numpy as np
import torch
from torch import nn
import cv2

import gym
from gym import wrappers
from gym.wrappers.record_episode_statistics import RecordEpisodeStatistics

import time
import argparse
import os

import tqdm

%matplotlib inline
from matplotlib import pyplot as plt
from matplotlib import animation
from IPython.display import display, clear_output, HTML
import numpy as np

In [4]:
# Device every tensor and network in this notebook is moved to:
# the GPU when CUDA is available, otherwise the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

  and should_run_async(code)


In [5]:
def from_numpy(data: Union[np.ndarray, dict], **kwargs):
    """Convert a NumPy array (or a dict of them) into tensors on ``device``.

    Dicts are converted value by value (recursively, so nested dicts work);
    note that ``kwargs`` are only forwarded to ``torch.from_numpy`` for a bare
    array, not for the values of a dict. float64 arrays are downcast to
    float32; every other dtype is kept as is.
    """
    if not isinstance(data, dict):
        tensor = torch.from_numpy(data, **kwargs)
        # Networks in this notebook run in single precision.
        if tensor.dtype == torch.float64:
            tensor = tensor.float()
        return tensor.to(device)

    converted = {}
    for key, value in data.items():
        converted[key] = from_numpy(value)
    return converted

def to_numpy(tensor: Union[torch.Tensor, dict]):
    """Convert a tensor (or a dict of tensors, recursively) to NumPy arrays.

    Each tensor is moved to the CPU and detached from the autograd graph
    before conversion.
    """
    if not isinstance(tensor, dict):
        on_cpu = tensor.to("cpu")
        return on_cpu.detach().numpy()

    arrays = {}
    for key, value in tensor.items():
        arrays[key] = to_numpy(value)
    return arrays

In [6]:
class DQNAgent(nn.Module):
    """Deep Q-learning agent with an online critic and a target critic.

    The online critic is regressed (MSE) onto bootstrapped targets computed
    from the target critic. The target critic is a copy of the online critic
    that is refreshed whenever ``step % target_update_period == 0``.

    Args:
        observation_shape: shape of a single observation; forwarded to
            ``make_critic``.
        num_actions: number of discrete actions (size of the critic output).
        make_critic: factory ``(observation_shape, num_actions) -> nn.Module``;
            called twice, once for the online and once for the target critic.
        make_optimizer: factory building the optimizer from the online
            critic's parameters.
        make_lr_schedule: factory building the LR scheduler from the optimizer.
        discount: discount factor applied to the bootstrapped next-state value.
        target_update_period: number of steps between target-critic syncs.
        use_double_q: if True, the next action is chosen by the online critic
            and evaluated by the target critic (double Q-learning); otherwise
            the target critic does both.
        clip_grad_norm: maximum gradient norm; ``None`` disables clipping.
    """

    def __init__(
        self,
        observation_shape: Sequence[int],
        num_actions: int,
        make_critic: Callable[[Tuple[int, ...], int], nn.Module],
        make_optimizer: Callable[[torch.nn.ParameterList], torch.optim.Optimizer],
        make_lr_schedule: Callable[
            [torch.optim.Optimizer], torch.optim.lr_scheduler._LRScheduler
        ],
        discount: float,
        target_update_period: int,
        use_double_q: bool = False,
        clip_grad_norm: Optional[float] = None,
    ):
        super().__init__()

        self.critic = make_critic(observation_shape, num_actions)
        self.target_critic = make_critic(observation_shape, num_actions)
        # Only the online critic is optimized; the target is updated by copy.
        self.critic_optimizer = make_optimizer(self.critic.parameters())
        self.lr_scheduler = make_lr_schedule(self.critic_optimizer)

        self.observation_shape = observation_shape
        self.num_actions = num_actions
        self.discount = discount
        self.target_update_period = target_update_period
        self.clip_grad_norm = clip_grad_norm
        self.use_double_q = use_double_q

        self.critic_loss = nn.MSELoss()

        # Start with the target critic identical to the online critic.
        self.update_target_critic()

    def get_action(self, observation: np.ndarray, epsilon: float = 0.02) -> int:
        """Pick an action for a single observation with an epsilon-greedy rule.

        With probability ``epsilon`` a uniformly random action is returned,
        otherwise the arg-max action of the online critic.

        Args:
            observation: one (unbatched) observation.
            epsilon: probability of acting randomly.

        Returns:
            The chosen action as a Python ``int``.
        """
        # Add a leading batch dimension of size 1.
        observation = from_numpy(np.asarray(observation))[None]

        if np.random.uniform() < epsilon:
            action = torch.randint(0, self.num_actions, (observation.shape[0],))
        else:
            # Action selection never needs gradients; skip building the graph.
            with torch.no_grad():
                action = self.critic(observation).argmax(dim=-1)

        return to_numpy(action).squeeze(0).item()

    def update_critic(
        self,
        obs: torch.Tensor,
        action: torch.Tensor,
        reward: torch.Tensor,
        next_obs: torch.Tensor,
        done: torch.Tensor,
    ) -> dict:
        """Run one gradient step on the online critic.

        Args:
            obs: batch of observations.
            action: batch of integer actions taken in ``obs``, one per row.
            reward: 1-D batch of rewards.
            next_obs: batch of successor observations.
            done: per-transition terminal flags; any dtype is accepted and
                interpreted as boolean (non-zero means terminal).

        Returns:
            Dict with ``critic_loss``, mean ``q_values``, mean
            ``target_values`` and ``grad_norm`` as Python floats.

        Raises:
            ValueError: if ``reward`` is not one-dimensional.
        """
        if reward.ndim != 1:
            raise ValueError(
                f"reward must be 1-D (batch,), got shape {tuple(reward.shape)}"
            )

        # torch.where needs a boolean mask and torch.gather needs int64 indices.
        done = done.to(torch.bool)
        action = action.long()

        # Compute target values (no gradient flows through the targets).
        with torch.no_grad():
            next_qa_values = self.target_critic(next_obs)

            if self.use_double_q:
                # Double DQN: select with the online critic, evaluate with the
                # target critic to reduce over-estimation.
                next_action = self.critic(next_obs).argmax(dim=-1, keepdim=True)
            else:
                next_action = next_qa_values.argmax(dim=-1, keepdim=True)

            next_q_values = torch.gather(next_qa_values, -1, next_action).squeeze(-1)
            bootstrapped = reward + self.discount * next_q_values
            # Terminal transitions do not bootstrap from the next state.
            target_values = torch.where(done, reward, bootstrapped)

        # Q-values of the actions that were actually taken.
        qa_values = self.critic(obs)
        q_values = torch.gather(qa_values, -1, action.unsqueeze(-1)).squeeze(-1)
        loss = self.critic_loss(q_values, target_values)

        self.critic_optimizer.zero_grad()
        loss.backward()
        # With clipping disabled, an infinite max-norm still reports the norm.
        grad_norm = torch.nn.utils.clip_grad.clip_grad_norm_(
            self.critic.parameters(), self.clip_grad_norm or float("inf")
        )
        self.critic_optimizer.step()

        self.lr_scheduler.step()

        return {
            "critic_loss": loss.item(),
            "q_values": q_values.mean().item(),
            "target_values": target_values.mean().item(),
            "grad_norm": grad_norm.item(),
        }

    def update_target_critic(self):
        """Copy the online critic's weights into the target critic."""
        self.target_critic.load_state_dict(self.critic.state_dict())

    def update(
        self,
        obs: torch.Tensor,
        action: torch.Tensor,
        reward: torch.Tensor,
        next_obs: torch.Tensor,
        done: torch.Tensor,
        step: int,
    ) -> dict:
        """Update the critic and, when due, the target critic.

        The target critic is synced whenever ``step`` is a multiple of
        ``target_update_period``.

        Returns:
            The statistics dict produced by ``update_critic``.
        """
        critic_stats = self.update_critic(obs, action, reward, next_obs, done)

        if step % self.target_update_period == 0:
            self.update_target_critic()

        return critic_stats

In [7]:
class ReplayBuffer:
    """Fixed-capacity ring buffer of single environment transitions.

    Storage arrays are allocated lazily on the first ``insert`` using the
    shapes and dtypes of that first transition. Once ``capacity`` transitions
    have been inserted, new ones overwrite the oldest.

    ``len(buffer)`` is the total number of insertions so far and may therefore
    exceed ``capacity``.
    """

    def __init__(self, capacity=1000000):
        self.max_size = capacity
        # Total number of insert() calls (not capped at max_size).
        self.size = 0
        self.observations = None
        self.actions = None
        self.rewards = None
        self.next_observations = None
        self.dones = None

    def sample(self, batch_size):
        """Draw ``batch_size`` stored transitions uniformly with replacement.

        Returns:
            Dict of arrays keyed ``observations``, ``actions``, ``rewards``,
            ``next_observations`` and ``dones``, each with leading dimension
            ``batch_size``.
        """
        # Only slots that have been written are valid. Drawing directly from
        # [0, filled) keeps sampling uniform once the buffer has wrapped
        # (drawing from [0, size) modulo max_size would favour low slots).
        filled = min(self.size, self.max_size)
        rand_indices = np.random.randint(0, filled, size=(batch_size,))
        return {
            "observations": self.observations[rand_indices],
            "actions": self.actions[rand_indices],
            "rewards": self.rewards[rand_indices],
            "next_observations": self.next_observations[rand_indices],
            "dones": self.dones[rand_indices],
        }

    def __len__(self):
        return self.size

    def insert(
        self,
        /,
        observation: np.ndarray,
        action: np.ndarray,
        reward: np.ndarray,
        next_observation: np.ndarray,
        done: np.ndarray,
    ):
        """
        Insert a single transition into the replay buffer.

        Python ``int``/``float``/``bool`` scalars passed for ``action``,
        ``reward`` and ``done`` are wrapped into 0-d arrays.

        Use like:
            replay_buffer.insert(
                observation=observation,
                action=action,
                reward=reward,
                next_observation=next_observation,
                done=done,
            )
        """
        if isinstance(reward, (float, int)):
            reward = np.array(reward)
        if isinstance(done, bool):
            done = np.array(done)
        if isinstance(action, int):
            action = np.array(action, dtype=np.int64)

        # First insertion: allocate storage matching this transition.
        if self.observations is None:
            self.observations = np.empty(
                (self.max_size, *observation.shape), dtype=observation.dtype
            )
            self.actions = np.empty((self.max_size, *action.shape), dtype=action.dtype)
            self.rewards = np.empty((self.max_size, *reward.shape), dtype=reward.dtype)
            self.next_observations = np.empty(
                (self.max_size, *next_observation.shape), dtype=next_observation.dtype
            )
            self.dones = np.empty((self.max_size, *done.shape), dtype=done.dtype)

        assert observation.shape == self.observations.shape[1:]
        assert action.shape == self.actions.shape[1:]
        assert reward.shape == ()
        assert next_observation.shape == self.next_observations.shape[1:]
        assert done.shape == ()

        # Ring-buffer write position: wraps around once capacity is reached.
        slot = self.size % self.max_size
        self.observations[slot] = observation
        self.actions[slot] = action
        self.rewards[slot] = reward
        self.next_observations[slot] = next_observation
        self.dones[slot] = done

        self.size += 1

In [101]:
def sample_trajectory(
    env: gym.Env, policy, max_length: Optional[int], render: bool = False
) -> Dict[str, np.ndarray]:
    """Sample one rollout in the environment from a policy.

    Args:
        env: environment using the old gym step API (``reset()`` returns the
            observation, ``step()`` returns a 4-tuple).
        policy: object exposing ``get_action(observation)``.
        max_length: maximum number of transitions in the rollout; ``None``
            means the rollout only ends when the environment reports done.
        render: if True, one frame (resized to 250x250) is captured before
            every step.

    Returns:
        Dict with float32 arrays ``observation``, ``reward``, ``action``,
        ``next_observation`` and ``terminal``, a uint8 array ``image_obs``
        (empty when ``render`` is False) and a dict ``episode_statistics``
        holding length ``l`` and return ``r``, updated with
        ``info["episode"]`` from the last step when that key is present.
    """
    ob = env.reset()
    obs, acs, rewards, next_obs, terminals, image_obs = [], [], [], [], [], []
    steps = 0

    while True:
        # render an image
        if render:
            if hasattr(env, "sim"):
                img = env.sim.render(camera_name="track", height=500, width=500)[::-1]
            else:
                img = env.render(mode="rgb_array")

            # Some render APIs return a list of frames; keep the first one.
            if isinstance(img, list):
                img = img[0]

            image_obs.append(
                cv2.resize(img, dsize=(250, 250), interpolation=cv2.INTER_CUBIC)
            )

        ac = policy.get_action(ob)

        next_ob, rew, done, info = env.step(ac)

        steps += 1
        # `>=` so that a rollout never holds more than max_length transitions.
        hit_limit = max_length is not None and steps >= max_length
        rollout_done = bool(done or hit_limit)

        # record result of taking that action
        obs.append(ob)
        acs.append(ac)
        rewards.append(rew)
        next_obs.append(next_ob)
        terminals.append(rollout_done)

        ob = next_ob  # jump to next timestep

        if rollout_done:
            break

    episode_statistics = {"l": steps, "r": np.sum(rewards)}
    if "episode" in info:
        episode_statistics.update(info["episode"])

    # NOTE(review): this closes an environment owned by the caller, which may
    # reuse it for further rollouts; kept for backward compatibility.
    env.close()

    return {
        "observation": np.array(obs, dtype=np.float32),
        "image_obs": np.array(image_obs, dtype=np.uint8),
        "reward": np.array(rewards, dtype=np.float32),
        "action": np.array(acs, dtype=np.float32),
        "next_observation": np.array(next_obs, dtype=np.float32),
        "terminal": np.array(terminals, dtype=np.float32),
        "episode_statistics": episode_statistics,
    }

def sample_n_trajectories(
    env: gym.Env, policy, ntraj: int, max_length: int, render: bool = False
):
    """Roll the policy out ``ntraj`` times and return the rollouts as a list.

    Every rollout is produced by ``sample_trajectory`` with the same
    environment, policy, length limit and render flag.
    """
    return [
        sample_trajectory(env, policy, max_length, render)
        for _rollout_idx in range(ntraj)
    ]

def log_paths_as_videos(paths, max_videos_to_save=2):
    """Stack the rendered frames of the first few rollouts into one array.

    Takes the ``'image_obs'`` frames of each path, keeps at most
    ``max_videos_to_save`` of them, pads shorter clips by repeating their
    last frame, and stacks the result along a new leading axis.

    The common length is the largest frame count among the first path and
    the kept clips.
    """
    clips = [path['image_obs'] for path in paths]

    # Never keep more clips than there are rollouts.
    n_keep = np.min([max_videos_to_save, len(clips)])

    # Target length: longest clip among those that will be kept.
    longest = clips[0].shape[0]
    for k in range(n_keep):
        longest = max(longest, clips[k].shape[0])

    # Extend each short clip with copies of its final frame.
    for k in range(n_keep):
        missing = longest - clips[k].shape[0]
        if missing > 0:
            filler = np.tile(clips[k][-1][None], (missing, 1, 1, 1))
            clips[k] = np.concatenate([clips[k], filler], 0)

    return np.stack(clips[:n_keep], 0)

def plot_trajectories(videos):
    """Show the given videos side by side as an inline HTML5 animation.

    ``videos`` is indexed as ``videos[video, frame, ...]``: one subplot is
    created per video and all subplots advance through the frames together.
    The cell output is cleared before the animation is displayed.
    """
    figure = plt.figure()
    n_panels = videos.shape[0]

    # One image artist per video, initialised with that video's first frame.
    artists = []
    for panel in range(n_panels):
        axes = figure.add_subplot(1, n_panels, panel + 1)
        artists.append(axes.imshow(videos[panel, 0, ...]))

    plt.close()  # this is required to not display the generated image

    def show_frame(frame_idx):
        # Point every artist at the requested frame of its own video.
        for panel, artist in enumerate(artists):
            artist.set_data(videos[panel, frame_idx, ...])
        return artists

    def init():
        return show_frame(0)

    def animate(i):
        return show_frame(i)

    anim = animation.FuncAnimation(
        figure,
        animate,
        init_func=init,
        frames=videos.shape[1],
        interval=25,
        repeat=False,
    )

    clear_output(True)
    display(HTML(anim.to_html5_video()))

def run_training_loop(config,
                      seed=42):
    """Train a DQN agent according to ``config``, evaluating periodically.

    Keys read from ``config``:
        make_env: callable returning a fresh environment; called with
            ``render=True`` for the environment used to record videos.
        exploration_schedule: object whose ``value(step)`` gives epsilon.
        agent_kwargs: keyword arguments forwarded to ``DQNAgent``.
        total_steps: number of environment steps to run.
        learning_starts: first step at which gradient updates are performed.
        batch_size: number of transitions sampled per update.
        log_interval: training stats are pushed to the progress bar when
            ``step`` is a multiple of this value.
        eval_interval: evaluation runs when ``step`` is a multiple of this.
        num_eval_trajectories: number of evaluation rollouts.
        num_render_trajectories: number of rendered rollouts to display
            (0 disables rendering).

    Args:
        config: dict as described above.
        seed: seed for the NumPy and PyTorch global generators.
    """
    # set random seeds
    np.random.seed(seed)
    torch.manual_seed(seed)

    env = config["make_env"]()
    eval_env = config["make_env"]()
    render_env = config["make_env"](render=True)

    try:
        exploration_schedule = config["exploration_schedule"]
        discrete = isinstance(env.action_space, gym.spaces.Discrete)

        assert discrete, "DQN only supports discrete action spaces"

        agent = DQNAgent(
            env.observation_space.shape,
            env.action_space.n,
            **config["agent_kwargs"],
        )

        # Evaluation rollouts are capped at the environment's own step limit.
        ep_len = env.spec.max_episode_steps

        observation = None

        replay_buffer = ReplayBuffer()

        def reset_env_training():
            nonlocal observation

            observation = env.reset()

            assert not isinstance(
                observation, tuple
            ), "env.reset() must return np.ndarray - make sure your Gym version uses the old step API"
            observation = np.asarray(observation)

        reset_env_training()

        stats = {}

        progress = tqdm.trange(config["total_steps"], dynamic_ncols=True)

        for step in progress:
            epsilon = exploration_schedule.value(step)

            # Epsilon-greedy action from the current critic.
            action = agent.get_action(observation, epsilon)

            next_observation, reward, done, info = env.step(action)

            next_observation = np.asarray(next_observation)
            truncated = info.get("TimeLimit.truncated", False)

            # A time-limit truncation is not a true terminal state, so it is
            # stored with done=False and the target keeps bootstrapping.
            replay_buffer.insert(observation=observation,
                                 action=action,
                                 reward=reward,
                                 next_observation=next_observation,
                                 done=(not truncated) and done)

            # Handle episode termination
            if done:
                reset_env_training()
            else:
                observation = next_observation

            # Main DQN training loop
            if step >= config["learning_starts"]:
                batch = replay_buffer.sample(config["batch_size"])

                # Convert to PyTorch tensors
                batch = from_numpy(batch)

                update_info = agent.update(batch["observations"],
                                           batch["actions"],
                                           batch["rewards"],
                                           batch["next_observations"],
                                           batch["dones"],
                                           step)

                # Logging code
                update_info["epsilon"] = epsilon
                update_info["lr"] = agent.lr_scheduler.get_last_lr()[0]

                if step % config["log_interval"] == 0:
                    stats.update(update_info)
                    progress.set_postfix(stats, refresh=True)

            if step % config["eval_interval"] == 0:
                # Evaluate
                trajectories = sample_n_trajectories(
                    eval_env,
                    agent,
                    config["num_eval_trajectories"],
                    ep_len,
                )
                returns = [traj["episode_statistics"]["r"] for traj in trajectories]
                ep_lens = [traj["episode_statistics"]["l"] for traj in trajectories]

                stats["eval_return"] = np.mean(returns)
                stats["eval_ep_len"] = np.mean(ep_lens)

                # Spread statistics only make sense with several rollouts.
                if len(returns) > 1:
                    stats["eval/return_std"] = np.std(returns)
                    stats["eval/return_max"] = np.max(returns)
                    stats["eval/return_min"] = np.min(returns)
                    stats["eval/ep_len_std"] = np.std(ep_lens)
                    stats["eval/ep_len_max"] = np.max(ep_lens)
                    stats["eval/ep_len_min"] = np.min(ep_lens)

                # Always show the fresh evaluation numbers, even when only a
                # single evaluation rollout was collected.
                progress.set_postfix(stats, refresh=True)

                if config["num_render_trajectories"] > 0:
                    video_trajectories = sample_n_trajectories(
                        render_env,
                        agent,
                        config["num_render_trajectories"],
                        ep_len,
                        render=True,
                    )

                    videos = log_paths_as_videos(video_trajectories,
                                                 max_videos_to_save=5)

                    plot_trajectories(videos)
    finally:
        # Release the environments even when training is interrupted.
        for open_env in (env, eval_env, render_env):
            open_env.close()


In [102]:
class LinearSchedule(object):
    """Value that moves linearly from ``initial_p`` to ``final_p``.

    The interpolation runs over ``schedule_timesteps`` steps; for any later
    step the value stays at ``final_p``.
    """

    def __init__(self, schedule_timesteps, final_p, initial_p=1.0):
        """Store the schedule parameters.

        Parameters
        ----------
        schedule_timesteps: int
            Number of timesteps over which the value is annealed.
        final_p: float
            Value returned once ``schedule_timesteps`` have passed.
        initial_p: float
            Value returned at timestep 0.
        """
        self.schedule_timesteps = schedule_timesteps
        self.final_p = final_p
        self.initial_p = initial_p

    def value(self, t):
        """Return the scheduled value at timestep ``t``."""
        # Fraction of the schedule completed, capped at 1.
        progress = float(t) / self.schedule_timesteps
        if progress > 1.0:
            progress = 1.0

        span = self.final_p - self.initial_p
        return self.initial_p + progress * span

# Activation names accepted by build_mlp, mapped to module instances.
# Each value is a single instance created when this cell runs, so every
# layer that asks for the same name receives the same module object.
_str_to_activation = {
    "relu": nn.ReLU(),
    "tanh": nn.Tanh(),
    "leaky_relu": nn.LeakyReLU(),
    "sigmoid": nn.Sigmoid(),
    "selu": nn.SELU(),
    "softplus": nn.Softplus(),
    "identity": nn.Identity(),
}

def build_mlp(
    input_size: int,
    output_size: int,
    n_layers: int,
    size: int,
    activation = "tanh",
    output_activation = "identity",
):
    """
    Build a feedforward network and move it to ``device``.

    arguments:
        input_size: size of the input layer
        output_size: size of the output layer
        n_layers: number of hidden layers
        size: dimension of each hidden layer
        activation: activation applied after each hidden layer, either a
            key of ``_str_to_activation`` or a module
        output_activation: activation applied after the output layer, given
            the same way

    returns:
        an ``nn.Sequential`` of ``n_layers`` (Linear, activation) pairs
        followed by the output Linear layer and the output activation
    """
    hidden_act = activation
    if isinstance(hidden_act, str):
        hidden_act = _str_to_activation[hidden_act]
    final_act = output_activation
    if isinstance(final_act, str):
        final_act = _str_to_activation[final_act]

    # Layer widths from the input through every hidden layer.
    widths = [input_size] + [size] * n_layers

    modules = []
    for fan_in, fan_out in zip(widths[:-1], widths[1:]):
        modules += [nn.Linear(fan_in, fan_out), hidden_act]
    modules += [nn.Linear(widths[-1], output_size), final_act]

    network = nn.Sequential(*modules)
    network.to(device)
    return network

In [103]:
def make_critic(observation_shape: Tuple[int, ...], num_actions: int) -> nn.Module:
    """Build the Q-network: an MLP with two hidden layers of 64 units.

    The input width is the number of elements in one observation and the
    output has one value per action.
    """
    # np.prod returns a NumPy scalar (a float for an empty shape); cast it so
    # the layer size is always a plain Python int.
    flat_obs_dim = int(np.prod(observation_shape))
    return build_mlp(
        input_size=flat_obs_dim,
        output_size=num_actions,
        n_layers=2,
        size=64,
    )

def make_optimizer(params: torch.nn.ParameterList) -> torch.optim.Optimizer:
    """Create the Adam optimizer (learning rate 1e-4) for ``params``."""
    learning_rate = 1e-4
    optimizer = torch.optim.Adam(params, lr=learning_rate)
    return optimizer

def make_lr_schedule(
    optimizer: torch.optim.Optimizer,
) -> torch.optim.lr_scheduler._LRScheduler:
    """Wrap ``optimizer`` in a ``ConstantLR`` scheduler with factor 1.0.

    A factor of 1.0 leaves the optimizer's learning rate unchanged.
    """
    scheduler_cls = torch.optim.lr_scheduler.ConstantLR
    schedule = scheduler_cls(optimizer, factor=1.0)
    return schedule

def make_env(render: bool = False):
    """Create a CartPole-v1 environment wrapped in RecordEpisodeStatistics.

    When ``render`` is True the environment is created with
    ``render_mode="rgb_array"``; otherwise no render mode is set.
    """
    # The original author also noted "LunarLander-v2" here as an alternative id.
    render_mode = "rgb_array" if render else None
    base_env = gym.make("CartPole-v1", render_mode=render_mode)
    return RecordEpisodeStatistics(base_env)

# Keyword arguments forwarded to DQNAgent by run_training_loop
# (observation shape and number of actions are taken from the environment).
agent_kwargs = {
            "make_critic": make_critic,
            "make_optimizer": make_optimizer,
            "make_lr_schedule": make_lr_schedule,
            "discount": 0.99,
            "target_update_period": 1000,  # sync the target critic every 1000 steps
            "clip_grad_norm": None,  # None disables gradient clipping
            "use_double_q": False,
        }

# Total number of environment steps; also the length of the epsilon schedule.
total_steps = 100000

# Training configuration read by run_training_loop.
# Epsilon is annealed linearly from 1.0 (LinearSchedule's default initial_p)
# down to 0.1 over the whole run.
config = {"exploration_schedule": LinearSchedule(total_steps, 0.1),
          "total_steps": total_steps,
          "num_render_trajectories": 3,
          "num_eval_trajectories": 10,
          "log_interval": 2000,
          "eval_interval": 10000,
          "learning_starts": 20000, # steps before this only fill the replay buffer
          "batch_size": 128,
          "make_env": make_env,
          "agent_kwargs": agent_kwargs}

In [None]:
# Train the DQN agent with the configuration above; the seed defaults to 42.
run_training_loop(config)

 40%|███▉      | 39991/100000 [02:03<02:59, 334.56it/s, eval_return=121, eval_ep_len=121, eval/return_std=11.5, eval/return_max=151, eval/return_min=105, eval/ep_len_std=11.5, eval/ep_len_max=151, eval/ep_len_min=105, critic_loss=0.212, q_values=17.5, target_values=17.5, grad_norm=1.52, epsilon=0.64, lr=0.0001]

See here for more information: https://www.gymlibrary.ml/content/api/[0m
  deprecation(
