# Homework Testing Framework

This notebook serves as an automated testing framework for the homework assignment. Solution fields are extracted from the designated homework notebook, `HW_NB_PATH`. Students can refer to the provided example inputs and outputs to better understand the tasks. Commented-out print statements are included in the tests to aid debugging.

## Homework Evaluation Criteria
- **Final Evaluation**: Homeworks will be graded based on the same tests using different sets of inputs and outputs.
- **Additional Checks**: The training plots displayed in the final cell of the homework notebook and the overall training performance will also be reviewed.

## Tester Notebook Requirements And Information
- Use only the context objects and variables provided for each solution (always available: `torch`, `np`, `numpy`) - example: (`torch`, `np`, `self`, `x`, `action`).
- For reference, see the provided completed solution `PPOT compute_advantages`, which has its own test.
- Include your full name in the designated solution field **Student\'s name** - **required**.
- Use the designated solution field **Message** for any additional information to assist the reviewing instructor - optional.
- You can modify this notebook because it is only for student aid - **do not send this testing notebook as a part of solved homework**.
- In the **homework notebook**, do not modify any content outside the `### BEGIN SOLUTION ... ### END SOLUTION` blocks. If changes seem necessary, ask on Slack first.


In [1]:
import json
import re
import math
import numpy

import random
import argparse
import time
from functools import partial
from collections import deque
from dataclasses import dataclass

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from gym.wrappers.normalize import RunningMeanStd
from torch.distributions.categorical import Categorical
from torch.utils.tensorboard import SummaryWriter

# RL library for vectorized environments
import pufferlib
import pufferlib.vector
import pufferlib.emulation

In [2]:
HW_NB_PATH = "hw4-ppo-rnd-minihack-student.ipynb"  # upload to colab env with homework notebook, or gather and run locally

# Notebook files are JSON stored as UTF-8; read them with an explicit encoding
# so the result does not depend on the platform's default locale.
with open(HW_NB_PATH, "r", encoding="utf-8") as f:
    nb = json.load(f)

# Only code cells can contain solution blocks; markdown/raw cells are ignored.
cells = [cell for cell in nb["cells"] if cell["cell_type"] == "code"]

# A cell's "source" is a list of lines (or a single string); "".join handles both.
code = ["".join(cell["source"]) for cell in cells]

# Join cells with a newline: the last line of a cell usually has no trailing
# newline, so a plain concatenation would fuse it with the first line of the
# next cell.
code_all = "\n".join(code)


In [3]:

# A solution region in the homework notebook has the form
#     ### BEGIN SOLUTION - <name>
#     <body>
#     ### END SOLUTION
# The single capture group holds everything from the (possibly indented) BEGIN
# marker up to, but not including, the END marker. The lazy quantifier keeps
# each match inside one region.
pattern = r"( *### BEGIN SOLUTION - [\s\S]*?)### END SOLUTION"
matches = re.compile(pattern).findall(code_all)

# Registry filled by the extraction loop; each entry is keyed by solution name:
#     "example": {
#         "ix": None,           # 1-based position of the region in the notebook
#         "content": None,      # de-indented solution source
#         "code_indent": None,  # number of leading spaces of the BEGIN marker
#     }
solution = dict()

def count_leading_spaces(s):
    """Return how many space characters ``s`` starts with (tabs are not counted)."""
    return len(s) - len(s.lstrip(" "))

def remove_indentation(solution_content, ls):
    """Strip up to ``ls`` leading spaces from every line that follows a newline.

    The text before the first newline is returned untouched; lines indented by
    fewer than ``ls`` spaces simply lose all of their leading spaces.
    """
    first, *following = solution_content.split("\n")
    dedented = [first]
    for line in following:
        cut = 0
        # Walk over leading spaces, but never remove more than ``ls`` of them.
        while cut < ls and cut < len(line) and line[cut] == " ":
            cut += 1
        dedented.append(line[cut:])
    return "\n".join(dedented)


# Register every extracted region under the name that follows
# "### BEGIN SOLUTION - " on its first line.
for position, raw_block in enumerate(matches, start=1):
    header, line_break, body = raw_block.partition("\n")
    ls = count_leading_spaces(header)
    solution_name = header.replace(" " * ls + "### BEGIN SOLUTION - ", "")

    # The stored content keeps the newline that ended the marker line, so every
    # solution line (including the first) follows a "\n" and can be de-indented
    # by the marker's own indentation.
    solution[solution_name] = dict(
        ix=position,
        content=remove_indentation(line_break + body, ls),
        code_indent=ls,
    )

In [None]:
# Sanity check: list the names of all solution blocks found in the homework notebook.
print(solution.keys()) # should be 11 total - 2 info - 5 PPO - 4 RND

In [None]:
def student_info(solution):
    """Print the student's name and optional message from the extracted solutions.

    Args:
        solution: Mapping from solution name to a dict that has a ``"content"``
            string; must contain the entries ``"Student's name"`` and
            ``"Message"``.

    Raises:
        AssertionError: If the name field is empty or still holds the untouched
            placeholder (surrounding whitespace is ignored).
        KeyError: If one of the two entries is missing.
    """
    name = solution["Student's name"]["content"]
    message = solution["Message"]["content"]

    # Compare without surrounding whitespace so that an empty field, trailing
    # spaces or different line endings cannot slip past the check. Raised
    # explicitly (not via ``assert``) so the check survives ``python -O``.
    if name.strip() in ("", "# --- --------"):
        raise AssertionError("Please provide name and surname")

    print("Name:\n" + name)
    print("Message:\n" + message)
# Print the submitted name and message; fails while the name is still the placeholder.
student_info(solution)

## Argument Parsing

In [6]:
def str2bool(v):
    """Convert ``v`` to a bool.

    Booleans are returned unchanged. The strings "true" and "false" (in any
    letter case) map to ``True`` and ``False``. Anything else raises
    ``argparse.ArgumentTypeError``.
    """
    if isinstance(v, bool):
        return v

    recognised = {"true": True, "false": False}
    token = v.lower() if isinstance(v, str) else None
    if token in recognised:
        return recognised[token]
    raise argparse.ArgumentTypeError("Boolean value expected")

@dataclass
class Args:
    """Hyper-parameters and run settings for the PPO / RND experiments.

    Each field is followed by a short description string. The three fields in
    the last group default to 0 and are expected to be filled in by the caller
    before training starts.
    """

    exp_name: str = "ppo_rnd"
    """the name of this experiment"""
    seed: int = 1
    """seed of the experiment"""
    torch_deterministic: str2bool = True
    """if toggled, `torch.backends.cudnn.deterministic=False`"""
    cuda: str2bool = True
    """if toggled, cuda will be enabled by default"""
    track: str2bool = False
    """if toggled, this experiment will be tracked with Weights and Biases"""
    wandb_project_name: str = "cleanRL"
    """the wandb's project name"""
    wandb_entity: str = None
    """the entity (team) of wandb's project"""
    capture_video: str2bool = False
    """whether to capture videos of the agent performances (check out `videos` folder)"""

    # Algorithm specific arguments
    env_id: str = "MiniHack-KeyRoom-Dark-S5-v0"
    """the id of the environment"""
    max_episode_steps: int = None
    """number of episode steps"""
    total_timesteps: int = 5000000
    """total timesteps of the experiments"""
    learning_rate: float = 2e-4
    """the learning rate of the optimizer"""
    num_envs: int = 128
    """the number of parallel game environments"""
    num_workers: int = 16
    """the number of workers"""
    num_steps: int = 128
    """the number of steps to run in each environment per policy rollout"""
    anneal_lr: str2bool = True
    """Toggle learning rate annealing for policy and value networks"""
    gamma: float = 0.999
    """the discount factor gamma"""
    gae_lambda: float = 0.95
    """the lambda for the general advantage estimation"""
    num_minibatches: int = 8
    """the number of mini-batches"""
    update_epochs: int = 8
    """the K epochs to update the policy"""
    norm_adv: str2bool = True
    """Toggles advantages normalization"""
    clip_coef: float = 0.1
    """the surrogate clipping coefficient"""
    clip_vloss: str2bool = True
    """Toggles whether or not to use a clipped loss for the value function, as per the paper."""
    ent_coef: float = 0.001
    """coefficient of the entropy"""
    vf_coef: float = 0.5
    """coefficient of the value function"""
    max_grad_norm: float = 40
    """the maximum norm for the gradient clipping"""
    target_kl: float = None
    """the target KL divergence threshold"""
    penalty_step: float = 0.0
    """the penalty for each env step"""
    value_bootstrap: str2bool = True
    """Value bootstrapping"""

    # RND arguments
    update_proportion: float = 1.0
    """proportion of exp used for predictor update"""
    int_coef: float = 0.1
    """coefficient of intrinsic reward"""
    ext_coef: float = 1.0
    """coefficient of extrinsic reward"""
    int_gamma: float = 0.99
    """Intrinsic reward discount rate"""
    num_iterations_obs_norm_init: int = 5
    """number of iterations to initialize the observations normalization parameters"""
    forward_coef: float = 0.1
    """weight on modelling loss (ie convergence of predictor)"""

    # to be filled in runtime
    batch_size: int = 0
    """the batch size (computed in runtime)"""
    minibatch_size: int = 0
    """the mini-batch size (computed in runtime)"""
    num_iterations: int = 0
    """the number of iterations (computed in runtime)"""


## RecordEpisodeStatistics, RewardForwardFilter, layer_init

In [7]:
class RecordEpisodeStatistics:
    """Wrapper that tracks the running return and length of every sub-environment.

    Attributes not defined on the wrapper are looked up on the wrapped ``env``.
    The ``infos`` produced by the wrapped environment are discarded: ``reset``
    returns an empty dict and ``step`` returns a dict holding only the keys
    ``"r"`` (episode returns) and ``"l"`` (episode lengths).
    """

    def __init__(self, env, deque_size=100):
        """Wrap ``env``. ``deque_size`` is accepted for compatibility but unused."""
        self.env = env
        self.num_envs = getattr(env, "num_envs", 1)
        self.episode_returns = None
        self.episode_lengths = None

    def __getattr__(self, name):
        # __getattr__ runs only after normal lookup failed. If ``env`` itself is
        # missing (instance created without __init__, e.g. by copy/pickle),
        # delegating to ``self.env`` would re-enter this method forever, so
        # report the attribute as missing instead.
        if name == "env":
            raise AttributeError(name)
        return getattr(self.env, name)

    def reset(self, **kwargs):
        """Reset the wrapped env and zero all counters.

        Returns:
            ``(observations, {})`` - the wrapped env's infos are dropped.
        """
        observations, infos = self.env.reset(**kwargs)
        self.episode_returns = np.zeros(self.num_envs, dtype=np.float32)
        self.episode_lengths = np.zeros(self.num_envs, dtype=np.int32)
        self.lives = np.zeros(self.num_envs, dtype=np.int32)
        self.returned_episode_returns = np.zeros(self.num_envs, dtype=np.float32)
        self.returned_episode_lengths = np.zeros(self.num_envs, dtype=np.int32)
        return observations, {}

    def step(self, action):
        """Step the wrapped env and update the per-environment statistics.

        Returns:
            ``(observations, rewards, terminated, truncated, infos)`` where
            ``infos["r"]`` / ``infos["l"]`` hold the return / length accumulated
            up to and including this step.
        """
        observations, rewards, terminatedes, truncatedes, infos = self.env.step(action)
        self.episode_returns += rewards
        self.episode_lengths += 1
        # Snapshot the totals before the counters of finished episodes are cleared.
        self.returned_episode_returns[:] = self.episode_returns
        self.returned_episode_lengths[:] = self.episode_lengths
        # Multiplying by (1 - terminated) zeroes the counters where the flag is set.
        self.episode_returns *= 1 - terminatedes
        self.episode_lengths *= 1 - terminatedes

        infos = {}
        infos["r"] = self.returned_episode_returns
        infos["l"] = self.returned_episode_lengths

        return (
            observations,
            rewards,
            terminatedes,
            truncatedes,
            infos,
        )


class RewardForwardFilter:
    """Running discounted sum of the values passed to ``update``."""

    def __init__(self, gamma):
        self.gamma = gamma
        self.rewems = None  # no value seen yet

    def update(self, rews):
        """Fold ``rews`` into the running sum and return the new sum.

        The first call stores ``rews`` as is; every later call computes
        ``previous * gamma + rews``.
        """
        previous = self.rewems
        self.rewems = rews if previous is None else previous * self.gamma + rews
        return self.rewems


# ALGO LOGIC: initialize agent here:
def layer_init(layer, std=np.sqrt(2), bias_const=0.0):
    """Initialise ``layer`` in place and return it.

    The weight gets an orthogonal initialisation with gain ``std``; the bias is
    filled with ``bias_const``.
    """
    init = torch.nn.init
    init.orthogonal_(layer.weight, gain=std)
    init.constant_(layer.bias, val=bias_const)
    return layer

## Create Envs

In [8]:
import gymnasium
from nle.env.base import NLE

class NLETimeLimit(gymnasium.Wrapper):
    """Marks a step as truncated when NLE reports the ``ABORTED`` end status."""

    def __init__(self, env: gymnasium.Env):
        super().__init__(env)

    def step(self, action):
        """Forward the step and set ``truncated`` if ``info["end_status"]`` is ABORTED."""
        obs, reward, terminated, truncated, info = super().step(action)
        # "end_status" may be absent from info, hence the membership test first.
        aborted = "end_status" in info and info["end_status"] == NLE.StepStatus.ABORTED
        if aborted:
            truncated = True
        return obs, reward, terminated, truncated, info


def env_creator(args):
    """Build one MiniHack environment wrapped for pufferlib.

    Reads ``args.env_id`` and ``args.penalty_step``. The imports are local to
    this function, so they run wherever the function is called.
    """
    import minihack
    import gym
    import shimmy
    from pufferlib.environments.minihack.environment import MinihackWrapper, EXTRA_OBS_KEYS

    raw_env = gym.make(
        args.env_id,
        observation_keys=minihack.base.MH_DEFAULT_OBS_KEYS + EXTRA_OBS_KEYS,
        penalty_step=args.penalty_step,
    )
    # Wrapper chain, innermost first: gym -> gymnasium API shim, truncation
    # flagging, MiniHack-specific wrapper, pufferlib emulation layer.
    gymnasium_env = shimmy.GymV21CompatibilityV0(env=raw_env)
    wrapped = MinihackWrapper(NLETimeLimit(gymnasium_env))
    return pufferlib.emulation.GymnasiumPufferEnv(env=wrapped)

def create_env(args):
    """Create the vectorized environment and wrap it with episode statistics.

    Uses ``args.num_envs`` (also as the batch size) and ``args.num_workers``
    with pufferlib's multiprocessing backend.
    """
    vec_env = pufferlib.vector.make(
        partial(env_creator, args),
        backend=pufferlib.vector.Multiprocessing,
        num_envs=args.num_envs,
        num_workers=args.num_workers,
        batch_size=args.num_envs,
    )
    return RecordEpisodeStatistics(vec_env)

In [10]:
seed = 27

# Seed Python, NumPy and PyTorch with the same value so the networks created
# below are initialised reproducibly.
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed):
    _seed_rng(seed)
torch.backends.cudnn.deterministic = True

# Names that every student solution can use in addition to its own inputs.
global_common_context = dict(torch=torch, np=np, numpy=numpy)

# PPO Agent - Solution context

In [11]:
class Agent(nn.Module):
    """Actor-critic network whose head logic is supplied as student source code.

    ``get_action_and_value`` and ``get_value`` execute the student's solution
    with ``exec`` and read the results back by variable name.

    NOTE(review): ``pufferlib.pytorch`` is used here but not imported explicitly
    at the top of the notebook - confirm it is loaded by the other pufferlib
    imports.
    """

    def __init__(self, env):
        super().__init__()
        self.dtype = pufferlib.pytorch.nativize_dtype(env.emulated)

        self.blstats_net = nn.Sequential(
            nn.Embedding(256, 32),
            nn.Flatten(),
        )

        self.char_embed = nn.Embedding(256, 32)
        self.chars_net = nn.Sequential(
            layer_init(nn.Conv2d(32, 32, 5, stride=(2, 3))),
            nn.ReLU(),
            layer_init(nn.Conv2d(32, 64, 5, stride=(1, 3))),
            nn.ReLU(),
            layer_init(nn.Conv2d(64, 64, 3, stride=1)),
            nn.ReLU(),
            nn.Flatten(),
        )

        # presumably 864 and 960 are the flattened sizes of the two feature
        # branches concatenated in encode_observations - TODO confirm
        self.proj = nn.Linear(864+960, 256)
        self.actor = layer_init(nn.Linear(256, env.single_action_space.n), std=0.01)
        self.critic_ext = layer_init(nn.Linear(256, 1), std=0.01)
        self.critic_int = layer_init(nn.Linear(256, 1), std=0.01)

    def encode_observations(self, x):
        """Encode a flat observation batch into the 256-dim input of the heads."""
        x = x.type(torch.uint8) # Undo bad cleanrl cast
        x = pufferlib.pytorch.nativize_tensor(x, self.dtype)

        # Shift by one and clip so every value is a valid index into the
        # 256-entry embedding table.
        blstats = torch.clip(x['blstats'] + 1, 0, 255).int()
        blstats = self.blstats_net(blstats)

        chars = self.char_embed(x['chars'].int())
        # Move the embedding dimension in front of the two map dimensions (Conv2d layout).
        chars = torch.permute(chars, (0, 3, 1, 2))
        chars = self.chars_net(chars)

        concat = torch.cat([blstats, chars], dim=1)
        return self.proj(concat)

    def get_action_and_value(self, x, action=None, student_solution=None):
        """Run the student's 'PPO get_action_and_value' solution.

        The solution sees ``self``, ``x``, ``action`` plus the names in
        ``global_common_context`` and is expected to bind ``action``,
        ``log_prob``, ``entropy_prop``, ``extrinsic_output`` and
        ``intrinsic_output``.

        Args:
            x: Observation batch passed to the solution unchanged.
            action: Optional action(s) to evaluate instead of sampling.
            student_solution: Source code of the solution (required).

        Returns:
            ``(action, log_prob, entropy_prop, extrinsic_output,
            intrinsic_output)``. Names the solution does not bind come back as
            ``None``, except ``action``, which falls back to the ``action``
            passed in.

        Raises:
            TypeError: If ``student_solution`` is not provided.
        """
        if student_solution is None:
            raise TypeError(
                "student_solution must be the source code of the "
                "'PPO get_action_and_value' solution"
            )
        ### BEGIN SOLUTION - PPO get_action_and_value
        input_context = {"self": self, "x": x, "action": action}
        global global_common_context
        input_context.update(global_common_context)
        output_context = {}
        # NOTE: exec runs arbitrary code from the homework notebook - only use
        # this with notebooks you trust.
        exec(student_solution, input_context, output_context)
        # A solution that only samples when no action was given never rebinds
        # ``action``; hand the caller's action back in that case.
        action = output_context.get("action", action)
        log_prob = output_context.get("log_prob")
        entropy_prop = output_context.get("entropy_prop")
        extrinsic_output = output_context.get("extrinsic_output")
        intrinsic_output = output_context.get("intrinsic_output")
        ### END SOLUTION
        return (
            action,
            log_prob,
            entropy_prop,
            extrinsic_output,
            intrinsic_output
        )

    def get_value(self, x, student_solution:str):
        """Run the student's 'PPO get_value' solution.

        The solution sees ``self``, ``x`` plus the names in
        ``global_common_context`` and is expected to bind ``extrinsic_output``
        and ``intrinsic_output``.

        Returns:
            ``(extrinsic_output, intrinsic_output)``; unbound names are ``None``.
        """
        ### BEGIN SOLUTION - PPO get_value
        input_context = {"self": self, "x": x}
        global global_common_context
        input_context.update(global_common_context)
        output_context = {}
        # NOTE: exec runs arbitrary code from the homework notebook.
        exec(student_solution, input_context, output_context)
        extrinsic_output = output_context.get("extrinsic_output")
        intrinsic_output = output_context.get("intrinsic_output")
        ### END SOLUTION
        return extrinsic_output, intrinsic_output

In [12]:
args = Args(
    num_steps=32,
    num_workers=8,
    num_envs=128,
    env_id="MiniHack-KeyRoom-S5-v0",
    total_timesteps=100000,
)

# Fill in the run-time sizes (they default to 0), exactly as the RND setup cell
# does; PPOTrainer.train divides by args.batch_size.
args.batch_size = int(args.num_envs * args.num_steps)
args.minibatch_size = int(args.batch_size // args.num_minibatches)
args.num_iterations = args.total_timesteps // args.batch_size

device = torch.device("cuda" if torch.cuda.is_available() and args.cuda else "cpu")
env = create_env(args)

agent = Agent(env).to(device)


In [None]:
# Example input (`x`, `action`) and the outputs expected for it.
exp_PPO_get_action_and_value = dict(
    x=torch.ones([1, 13016]).to(device),
    action=None,
    entropy_prop=2.3025617599487305,
    extrinsic_output=-0.0010622446425259113,
    intrinsic_output=0.0011777263134717941,
)

def test_PPO_get_action_and_value(agent, solution, exp_in_out: dict, rel_tol=1e-1):
    """Run the 'PPO get_action_and_value' solution on the example input and
    compare entropy and both value outputs with the expected numbers."""
    student_src = solution['PPO get_action_and_value']['content']
    outputs = agent.get_action_and_value(exp_in_out['x'], action=exp_in_out['action'], student_solution=student_src)
    action, log_prob, entropy_prop, extrinsic_output, intrinsic_output = outputs
    # print(f"action: {action.item()}") #dev
    # print(f"log_prob: {log_prob.item()}")
    # print(f"entropy_prop: {entropy_prop.item()}")
    # print(f"extrinsic_output: {extrinsic_output.item()}")
    # print(f"intrinsic_output: {intrinsic_output.item()}")

    checked = (
        ('entropy_prop', entropy_prop),
        ('extrinsic_output', extrinsic_output),
        ('intrinsic_output', intrinsic_output),
    )
    for key, got in checked:
        assert math.isclose(got.item(), exp_in_out[key], rel_tol=rel_tol)

print(solution['PPO get_action_and_value']['content']) #dev
test_PPO_get_action_and_value(agent, solution, exp_PPO_get_action_and_value)

In [None]:
# Example input `x` and the expected first element of each value output.
exp_PPO_get_value = {
    'x': torch.ones([128, 13016]).to(device),
    'extrinsic_output[0][0]': -0.001062246854417026,
    'intrinsic_output[0][0]': 0.0011777239851653576,
}

def test_PPO_get_value(agent, solution, exp_in_out: dict, rel_tol=1e-2):
    """Run the 'PPO get_value' solution on the example input and compare
    element [0][0] of both outputs with the expected numbers."""
    student_src = solution['PPO get_value']['content']
    extrinsic_output, intrinsic_output = agent.get_value(exp_in_out['x'], student_src)
    # print(extrinsic_output.shape) #dev
    # print(extrinsic_output[0][0].item())
    # print(intrinsic_output.shape)
    # print(intrinsic_output[0][0].item())
    checked = (
        ('extrinsic_output[0][0]', extrinsic_output),
        ('intrinsic_output[0][0]', intrinsic_output),
    )
    for key, got in checked:
        assert math.isclose(got[0][0].item(), exp_in_out[key], rel_tol=rel_tol)

print(solution['PPO get_value']['content']) #dev
test_PPO_get_value(agent, solution, exp_PPO_get_value)

# PPO Trainer - Solution context

In [15]:
class PPOTrainer:
    """PPO training harness whose core computations are student solutions.

    ``compute_advantages``, ``compute_policy_loss`` and ``compute_value_loss``
    execute the student's source code with ``exec`` and read the result back by
    variable name. ``train`` drives the rollout / update loop using them.
    """

    def __init__(self, args, env, agent):
        self.args = args
        self.env = env
        self.agent = agent
        self.device = next(agent.parameters()).device

        self.combined_parameters = list(self.agent.parameters())
        self.optimizer = optim.Adam(
            self.combined_parameters,
            lr=self.args.learning_rate,
            eps=1e-5,
        )

        # Rollout storage, indexed as [step, env].
        self.obs = torch.zeros((self.args.num_steps, self.args.num_envs) + self.env.single_observation_space.shape).to(self.device)
        self.actions = torch.zeros((self.args.num_steps, self.args.num_envs) + self.env.single_action_space.shape).to(self.device)
        self.logprobs = torch.zeros((self.args.num_steps, self.args.num_envs)).to(self.device)
        self.rewards = torch.zeros((self.args.num_steps, self.args.num_envs)).to(self.device)
        self.dones = torch.zeros((self.args.num_steps, self.args.num_envs)).to(self.device)
        self.time_outs = torch.zeros((self.args.num_steps, self.args.num_envs)).to(self.device)
        self.ext_values = torch.zeros((self.args.num_steps, self.args.num_envs)).to(self.device)

        self.er_counter = 0
        self.av_er_interval = 20

    def compute_advantages(self, next_obs, next_done, student_solution:str, student_solution_ppo_get_value:str):
        """Run the student's 'PPOT compute_advantages' solution under ``no_grad``.

        The value of ``next_obs`` is first computed with the student's
        'PPO get_value' solution and handed to the advantage solution as
        ``next_value_ext``.

        Returns:
            Whatever the solution binds to ``ext_advantages`` (``None`` if unbound).
        """
        with torch.no_grad():
            next_value_ext, _ = self.agent.get_value(next_obs, student_solution_ppo_get_value)
            # tip - next_obs not needed in solution context :)
            ### BEGIN SOLUTION - PPOT compute_advantages
            input_context = {"self": self, "next_obs": next_obs, "next_done": next_done, "next_value_ext":next_value_ext}
            global global_common_context
            input_context.update(global_common_context)
            output_context = {}

            # NOTE: exec runs arbitrary code from the homework notebook.
            exec(student_solution, input_context, output_context)
            ext_advantages = output_context.get("ext_advantages")
            ### END SOLUTION
        return ext_advantages

    def compute_policy_loss(self, mb_advantages, ratio, student_solution:str):
        """Run the student's 'PPOT compute_policy_loss' solution.

        Returns:
            Whatever the solution binds to ``pg_loss`` (``None`` if unbound).
        """
        ### BEGIN SOLUTION - PPOT compute_policy_loss
        input_context = {"self": self, "mb_advantages": mb_advantages, "ratio": ratio}
        global global_common_context
        input_context.update(global_common_context)
        output_context = {}

        exec(student_solution, input_context, output_context)
        pg_loss = output_context.get("pg_loss")
        ### END SOLUTION
        return pg_loss

    def compute_value_loss(self, new_values, returns, student_solution:str):
        """Run the student's 'PPOT compute_value_loss' solution.

        Returns:
            Whatever the solution binds to ``v_loss`` (``None`` if unbound).
        """
        ### BEGIN SOLUTION - PPOT compute_value_loss
        input_context = {"self": self, "new_values": new_values, "returns": returns}
        global global_common_context
        input_context.update(global_common_context)
        output_context = {}

        exec(student_solution, input_context, output_context)
        v_loss = output_context.get("v_loss")
        ### END SOLUTION
        return v_loss

    def train(self, solutions=None):
        """Run the PPO rollout / update loop using the student's solutions.

        Args:
            solutions: Mapping from solution name to a dict with a ``"content"``
                source string. Defaults to the module-level ``solution``
                registry extracted from the homework notebook.

        Raises:
            ValueError: If ``args.batch_size`` or ``args.minibatch_size`` has
                not been computed yet (both default to 0).
            KeyError: If a required solution is missing from ``solutions``.
        """
        if solutions is None:
            solutions = solution

        def source_of(name):
            return solutions[name]["content"]

        # Every student-backed method needs the solution source passed in.
        get_value_src = source_of("PPO get_value")
        get_action_src = source_of("PPO get_action_and_value")
        advantages_src = source_of("PPOT compute_advantages")
        policy_loss_src = source_of("PPOT compute_policy_loss")
        value_loss_src = source_of("PPOT compute_value_loss")

        if not self.args.batch_size or not self.args.minibatch_size:
            raise ValueError("args.batch_size and args.minibatch_size must be computed before training")

        # ALGO Logic: Storage setup
        avg_returns = deque(maxlen=20)

        # start the game
        global_step = 0
        start_time = time.time()
        next_obs = torch.Tensor(self.env.reset()[0]).to(self.device)
        next_done = torch.zeros(self.args.num_envs).to(self.device)
        next_time_out = torch.zeros(self.args.num_envs).to(self.device)
        num_updates = self.args.total_timesteps // self.args.batch_size

        for update in range(1, num_updates + 1):
            # Annealing the rate if instructed to do so.
            if self.args.anneal_lr:
                frac = 1.0 - (update - 1.0) / num_updates
                lrnow = frac * self.args.learning_rate
                self.optimizer.param_groups[0]["lr"] = lrnow

            for step in range(0, self.args.num_steps):
                global_step += 1 * self.args.num_envs
                self.obs[step] = next_obs
                self.dones[step] = next_done
                self.time_outs[step] = next_time_out

                # ALGO LOGIC: action logic
                with torch.no_grad():
                    value_ext, _ = self.agent.get_value(self.obs[step], get_value_src)
                    self.ext_values[step] = value_ext.flatten()
                    action, logprob, _, _, _ = self.agent.get_action_and_value(
                        self.obs[step], student_solution=get_action_src
                    )

                self.actions[step] = action
                self.logprobs[step] = logprob

                # execute the game and log data.
                next_obs, reward, terminated, truncated, info = self.env.step(action.cpu().numpy())
                self.rewards[step] = torch.tensor(reward).to(self.device).view(-1)
                next_obs = torch.Tensor(next_obs).to(self.device)
                next_done = torch.Tensor(terminated).to(self.device)
                next_time_out = torch.Tensor(truncated).to(self.device)

                for idx, d in enumerate(terminated | truncated):
                    if d:
                        avg_returns.append(info["r"][idx])
                        epi_ret = np.average(avg_returns)
                        epi_suc = np.average([1 if r > 0 else 0 for r in avg_returns])

            if self.args.value_bootstrap:
                self.rewards.add_(self.args.gamma * self.ext_values * self.time_outs)

            # bootstrap value if not done
            with torch.no_grad():
                ext_advantages = self.compute_advantages(next_obs, next_done, advantages_src, get_value_src)
                ext_returns = ext_advantages + self.ext_values

            # flatten the batch
            b_obs = self.obs.reshape((-1,) + self.env.single_observation_space.shape)
            b_logprobs = self.logprobs.reshape(-1)
            b_actions = self.actions.reshape(-1)
            b_ext_advantages = ext_advantages.reshape(-1)
            b_ext_returns = ext_returns.reshape(-1)

            b_advantages = b_ext_advantages * self.args.ext_coef

            # Optimizing the policy and value network
            b_inds = np.arange(self.args.batch_size)

            clipfracs = []
            for epoch in range(self.args.update_epochs):
                np.random.shuffle(b_inds)
                for start in range(0, self.args.batch_size, self.args.minibatch_size):
                    end = start + self.args.minibatch_size
                    mb_inds = b_inds[start:end]

                    _, newlogprob, entropy, new_ext_values, _ = self.agent.get_action_and_value(
                        b_obs[mb_inds], b_actions.long()[mb_inds], student_solution=get_action_src
                    )
                    logratio = newlogprob - b_logprobs[mb_inds]
                    ratio = logratio.exp()

                    with torch.no_grad():
                        # calculate approx_kl http://joschu.net/blog/kl-approx.html
                        old_approx_kl = (-logratio).mean()
                        approx_kl = ((ratio - 1) - logratio).mean()
                        clipfracs += [((ratio - 1.0).abs() > self.args.clip_coef).float().mean().item()]

                    mb_advantages = b_advantages[mb_inds]
                    if self.args.norm_adv:
                        mb_advantages = (mb_advantages - mb_advantages.mean()) / (mb_advantages.std() + 1e-8)

                    # Policy loss
                    pg_loss = self.compute_policy_loss(mb_advantages, ratio, policy_loss_src)

                    # Value loss
                    v_loss = self.compute_value_loss(new_ext_values, b_ext_returns[mb_inds], value_loss_src)

                    # Entropy loss
                    entropy_loss = entropy.mean()
                    loss = pg_loss - self.args.ent_coef * entropy_loss + v_loss * self.args.vf_coef

                    self.optimizer.zero_grad()
                    loss.backward()
                    if self.args.max_grad_norm:
                        nn.utils.clip_grad_norm_(
                            self.combined_parameters,
                            self.args.max_grad_norm,
                        )
                    self.optimizer.step()

                if self.args.target_kl is not None:
                    if approx_kl > self.args.target_kl:
                        break

In [16]:
# Trainer instance used by the PPOT tests below, built from the PPO `args`, `env` and `agent` above.
trainer = PPOTrainer(args, env, agent)

In [None]:
# Example inputs plus the expected shape and first element of the advantages.
exp_PPOT_compute_advantages = {
    'next_obs': torch.ones([128, 13016]).to(device),
    'next_done': torch.zeros([128]).to(device),
    'ext_advantages[0][0]': -0.0002,
    'ext_advantages.shape': [32, 128],
}


def test_PPOT_compute_advantages(trainer:PPOTrainer, solution, exp_in_out: dict, rel_tol=1e-1):
    """Run the 'PPOT compute_advantages' solution (together with the student's
    'PPO get_value') and check the result's shape and element [0][0]."""
    advantages_src = solution['PPOT compute_advantages']['content']
    get_value_src = solution['PPO get_value']['content']
    ext_advantages = trainer.compute_advantages(
        exp_in_out['next_obs'],
        exp_in_out['next_done'],
        advantages_src,
        get_value_src,
    )
    # print(ext_advantages[:1]) #dev
    # print(ext_advantages[0][0])
    # print(ext_advantages.shape)

    assert list(ext_advantages.shape) == exp_in_out['ext_advantages.shape']
    first = ext_advantages[0][0].item()
    assert math.isclose(first, exp_in_out['ext_advantages[0][0]'], rel_tol=rel_tol)

print(solution['PPOT compute_advantages']['content']) #dev
test_PPOT_compute_advantages(trainer, solution, exp_PPOT_compute_advantages)

In [None]:
# Probability ratios: all ones except the first ten entries, which are 10.
ratio_input = torch.ones([512]).to(device)
ratio_input[:10] = 10
exp_PPOT_compute_policy_loss = dict(
    mb_advantages=torch.ones([512]).to(device),
    ratio=ratio_input,
    pg_loss=-1.001953125,
)

def test_PPOT_compute_policy_loss(trainer:PPOTrainer, solution, exp_in_out: dict, rel_tol=1e-6):
    """Run the 'PPOT compute_policy_loss' solution on the example input and
    compare the scalar loss with the expected value."""
    student_src = solution['PPOT compute_policy_loss']['content']
    pg_loss = trainer.compute_policy_loss(exp_in_out['mb_advantages'], exp_in_out['ratio'], student_src)
    # print(pg_loss.item()) #dev
    got = pg_loss.item()
    assert math.isclose(got, exp_in_out['pg_loss'], rel_tol=rel_tol)

print(solution['PPOT compute_policy_loss']['content']) #dev
test_PPOT_compute_policy_loss(trainer, solution, exp_PPOT_compute_policy_loss)

In [None]:
# Returns: all ones except the first ten entries, which are 10.
returns_input = torch.ones([512]).to(device)
returns_input[:10] = 10
exp_PPOT_compute_value_loss = dict(
    new_values=torch.ones([512, 1]).to(device),
    returns=returns_input,
    v_loss=0.791015625,
)

def test_PPOT_compute_value_loss(trainer:PPOTrainer, solution, exp_in_out: dict, rel_tol=1e-4):
    """Run the 'PPOT compute_value_loss' solution on the example input and
    compare the scalar loss with the expected value."""
    student_src = solution['PPOT compute_value_loss']['content']
    v_loss = trainer.compute_value_loss(exp_in_out['new_values'], exp_in_out['returns'], student_src)
    # print(v_loss.item()) #dev

    got = v_loss.item()
    assert math.isclose(got, exp_in_out['v_loss'], rel_tol=rel_tol)

print(solution['PPOT compute_value_loss']['content']) #dev
test_PPOT_compute_value_loss(trainer, solution, exp_PPOT_compute_value_loss)

# RND Model - Solution context

In [20]:
class RNDModel(nn.Module):
    """Predictor and target networks for RND; ``forward`` is a student solution.

    Both networks use the same encoder layout (an embedding branch for
    ``blstats``, a convolutional branch for ``chars``, a projection to 256
    features) followed by their own MLP head. All parameters whose name starts
    with ``target`` are frozen.
    """

    @staticmethod
    def _conv_trunk():
        # Convolutional stack applied to the embedded ``chars`` observation.
        return nn.Sequential(
            layer_init(nn.Conv2d(32, 32, 5, stride=(2, 3))),
            nn.ReLU(),
            layer_init(nn.Conv2d(32, 64, 5, stride=(1, 3))),
            nn.ReLU(),
            layer_init(nn.Conv2d(64, 64, 3, stride=1)),
            nn.ReLU(),
            nn.Flatten(),
        )

    def __init__(self, env):
        super().__init__()

        self.dtype = pufferlib.pytorch.nativize_dtype(env.emulated)

        # Predictor network (trainable). Layers are created in a fixed order so
        # that seeded initialisation stays reproducible.
        self.predictor_blstats_net = nn.Sequential(nn.Embedding(256, 32), nn.Flatten())
        self.predictor_char_embed = nn.Embedding(256, 32)
        self.predictor_chars_net = self._conv_trunk()
        self.predictor_proj = nn.Linear(864+960, 256)
        self.predictor = nn.Sequential(
            layer_init(nn.Linear(256, 512)),
            nn.ReLU(),
            layer_init(nn.Linear(512, 512)),
            nn.ReLU(),
            layer_init(nn.Linear(512, 512)),
        )

        # Target network (kept fixed).
        self.target_blstats_net = nn.Sequential(nn.Embedding(256, 32), nn.Flatten())
        self.target_char_embed = nn.Embedding(256, 32)
        self.target_chars_net = self._conv_trunk()
        self.target_proj = nn.Linear(864+960, 256)
        self.target = nn.Sequential(
            layer_init(nn.Linear(256, 512)),
        )

        # Freeze every parameter that belongs to a ``target*`` sub-module.
        for param_name, tensor in self.named_parameters():
            if param_name.startswith('target'):
                tensor.requires_grad_(False)

    def _encode(self, x, prefix):
        # Shared encoder body; ``prefix`` ("predictor" or "target") selects
        # which set of sub-modules is used.
        blstats_net = getattr(self, prefix + "_blstats_net")
        char_embed = getattr(self, prefix + "_char_embed")
        chars_net = getattr(self, prefix + "_chars_net")
        proj = getattr(self, prefix + "_proj")

        # Undo bad cleanrl cast, then split the flat observation into named fields.
        x = pufferlib.pytorch.nativize_tensor(x.type(torch.uint8), self.dtype)

        # Shift by one and clip so every value is a valid embedding index.
        blstats = blstats_net(torch.clip(x['blstats'] + 1, 0, 255).int())

        # Embedding dimension goes in front of the two map dimensions (Conv2d layout).
        chars = torch.permute(char_embed(x['chars'].int()), (0, 3, 1, 2))
        chars = chars_net(chars)

        return proj(torch.cat([blstats, chars], dim=1))

    def encode_predictor(self, x):
        """Encode ``x`` with the predictor's encoder (256 output features)."""
        return self._encode(x, "predictor")

    def encode_target(self, x):
        """Encode ``x`` with the target's encoder (256 output features)."""
        return self._encode(x, "target")

    def forward(self, next_obs, student_solution:str):
        """Run the student's 'RND forward' solution.

        The solution sees ``self``, ``next_obs`` plus the names in
        ``global_common_context`` and is expected to bind ``predict_feature``
        and ``target_feature``.

        Returns:
            ``(predict_feature, target_feature)``; unbound names are ``None``.
        """
        ### BEGIN SOLUTION - RND forward
        visible_names = {"self": self, "next_obs": next_obs}
        global global_common_context
        visible_names.update(global_common_context)
        results = {}

        exec(student_solution, visible_names, results)
        ### END SOLUTION
        return results.get("predict_feature"), results.get("target_feature")


In [21]:
# Re-seed all RNGs so the RND model below is initialised reproducibly.
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed):
    _seed_rng(seed)

args = Args(
    env_id="MiniHack-KeyRoom-S5-v0",
    total_timesteps=100000,
    num_envs=128,
    num_workers=8,
    num_steps=32,
    int_coef=0.1,
    forward_coef=0.1,
    num_iterations_obs_norm_init=50,
)

# Sizes derived from the arguments above (they default to 0 in Args).
args.batch_size = int(args.num_envs * args.num_steps)
args.minibatch_size = int(args.batch_size // args.num_minibatches)
args.num_iterations = args.total_timesteps // args.batch_size

device = torch.device("cuda" if torch.cuda.is_available() and args.cuda else "cpu")
env = create_env(args)

rnd_model = RNDModel(env).to(device)


In [None]:
# Example input plus the expected shapes and one element of each feature tensor.
exp_RND_forward = {
    'next_obs': torch.zeros([128, 13016]).to(device),
    'predict_feature[0][1]': 0.02161688357591629,
    'predict_feature.shape': [128, 512],
    'target_feature[0][1]': 0.4340367019176483,
    'target_feature.shape': [128, 512],
}

# Named after what it tests. It was previously called
# ``test_PPOT_compute_value_loss`` and silently replaced the PPOT value-loss
# test of the same name.
def test_RND_forward(rnd_model:RNDModel, solution, exp_in_out: dict, rel_tol=1e-2):
    """Run the 'RND forward' solution on the example input and check the shape
    and element [0][1] of the predictor and target features."""
    predict_feature, target_feature = rnd_model.forward(exp_in_out['next_obs'], solution['RND forward']['content'])
    # print(predict_feature.shape)  #dev
    # print(predict_feature[0][1].item())
    # print(predict_feature)
    # print(target_feature.shape)
    # print(target_feature[0][1].item())
    # print(target_feature)

    assert exp_in_out['predict_feature.shape'] == list(predict_feature.shape)
    assert exp_in_out['target_feature.shape'] == list(target_feature.shape)
    assert math.isclose(predict_feature[0][1].item(), exp_in_out['predict_feature[0][1]'], rel_tol=rel_tol)
    assert math.isclose(target_feature[0][1].item(), exp_in_out['target_feature[0][1]'], rel_tol=rel_tol)

print(solution['RND forward']['content']) #dev
test_RND_forward(rnd_model, solution, exp_RND_forward)

# RND Trainer - solution context

In [23]:
class RNDTrainer(PPOTrainer):
    """Trainer that adds an RND model and curiosity (intrinsic) rewards to `PPOTrainer`.

    The three solution-field methods (`calculate_curiosity_rewards`,
    `compute_int_advantages`, `compute_rnd_loss`) run student-provided source
    text through `exec`; `train` is the rollout / optimisation loop that calls
    them.

    NOTE: the solution strings are executed with `exec` and therefore run with
    full interpreter privileges. Only run solution code you trust.
    """

    def __init__(self, args, env, agent, rnd_model):
        """Create the intrinsic-reward buffers and a joint optimizer.

        Args:
            args: run configuration; `num_steps`, `num_envs` and
                `learning_rate` are read here.
            env: environment, forwarded to `PPOTrainer.__init__`.
            agent: agent module, forwarded to `PPOTrainer.__init__`; its
                parameters are optimized.
            rnd_model: model exposing a `predictor` sub-module whose
                parameters are optimized together with the agent's.
        """
        super().__init__(args, env, agent)
        self.rnd_model = rnd_model
        # Buffers of shape (num_steps, num_envs), one entry per rollout step and env.
        self.curiosity_rewards = torch.zeros((self.args.num_steps, self.args.num_envs)).to(self.device)
        self.int_values = torch.zeros((self.args.num_steps, self.args.num_envs)).to(self.device)

        # Only the agent and the RND predictor are handed to the optimizer.
        self.combined_parameters = list(self.agent.parameters()) + list(self.rnd_model.predictor.parameters())
        self.optimizer = optim.Adam(
            self.combined_parameters,
            lr=self.args.learning_rate,
            eps=1e-5,
        )

    def calculate_curiosity_rewards(self, step, next_obs, student_solution:str):
        """Run the student's curiosity-reward code and store its result for `step`.

        The solution is executed with `self`, `step`, `next_obs` and every
        entry of `global_common_context` available as globals; it must assign
        `steps_curiosity_rewards`.

        Args:
            step: index into `self.curiosity_rewards` that receives the result.
            next_obs: observation batch made available to the solution.
            student_solution: source text of the solution field.

        Returns:
            The `steps_curiosity_rewards` value produced by the solution.

        Raises:
            AttributeError: if the solution never assigns
                `steps_curiosity_rewards` (the lookup yields `None`, which has
                no `.to`).
        """
        ### BEGIN SOLUTION - RNDT calculate_curiosity_rewards
        input_context = {"self": self, "step": step, "next_obs": next_obs}
        global global_common_context
        input_context.update(global_common_context)
        output_context = {}

        # Names assigned at the top level of the solution land in output_context.
        exec(student_solution, input_context, output_context)
        steps_curiosity_rewards = output_context.get("steps_curiosity_rewards")
        ### END SOLUTION
        self.curiosity_rewards[step] = steps_curiosity_rewards.to(self.device)
        return steps_curiosity_rewards # returned for tests


    def compute_int_advantages(self, next_obs, student_solution:str):
        """Run the student's intrinsic-advantage code with gradients disabled.

        The solution is executed with `self`, `next_obs` and every entry of
        `global_common_context` available as globals; it must assign
        `int_advantages`.

        Args:
            next_obs: observation batch made available to the solution.
            student_solution: source text of the solution field.

        Returns:
            The `int_advantages` value produced by the solution, or `None` if
            the solution never assigns it.
        """
        with torch.no_grad():
            ### BEGIN SOLUTION - RNDT compute_int_advantages
            input_context = {"self": self, "next_obs": next_obs}
            global global_common_context
            input_context.update(global_common_context)
            output_context = {}

            # Names assigned at the top level of the solution land in output_context.
            exec(student_solution, input_context, output_context)
            int_advantages = output_context.get("int_advantages")
            ### END SOLUTION
            return int_advantages

    def compute_rnd_loss(self, rnd_next_obs, student_solution:str):
        """Run the student's RND forward-loss code.

        The solution is executed with `self`, `rnd_next_obs` and every entry of
        `global_common_context` available as globals; it must assign
        `forward_loss`.

        Args:
            rnd_next_obs: observation batch made available to the solution.
            student_solution: source text of the solution field.

        Returns:
            The `forward_loss` value produced by the solution, or `None` if the
            solution never assigns it.
        """
        ### BEGIN SOLUTION - RNDT compute_rnd_loss
        input_context = {"self": self, "rnd_next_obs": rnd_next_obs}
        global global_common_context
        input_context.update(global_common_context)
        output_context = {}

        # Names assigned at the top level of the solution land in output_context.
        exec(student_solution, input_context, output_context)
        forward_loss = output_context.get("forward_loss")
        ### END SOLUTION
        return forward_loss

    def train(self, student_solutions=None):
        """Run the rollout / optimisation loop.

        Outline: reset the environment, take random actions to initialise the
        observation statistics in `self.obs_rms`, then for every update collect
        `num_steps` of experience, scale the curiosity rewards, compute
        extrinsic and intrinsic advantages, and optimise the agent together
        with the RND predictor over shuffled minibatches.

        Args:
            student_solutions: optional mapping indexed as
                `student_solutions[<field name>]['content']` holding the source
                text of the `RNDT ...` solution fields. When omitted, the
                notebook-level `solution` mapping is used.

        NOTE(review): the inherited `compute_advantages`, `compute_policy_loss`
        and `compute_value_loss` are called exactly as before; their signatures
        are not visible here, so confirm whether they also expect a solution
        string in this tester notebook.
        """
        if student_solutions is None:
            # Same mapping the test cells of this notebook index into.
            student_solutions = solution
        # The solution-field methods of this class require the solution source as
        # their last argument; look it up once instead of on every call.
        curiosity_rewards_code = student_solutions['RNDT calculate_curiosity_rewards']['content']
        int_advantages_code = student_solutions['RNDT compute_int_advantages']['content']
        rnd_loss_code = student_solutions['RNDT compute_rnd_loss']['content']

        # ALGO Logic: Storage setup
        avg_returns = deque(maxlen=20)

        # start the game
        global_step = 0
        start_time = time.time()
        next_obs = torch.Tensor(self.env.reset()[0]).to(self.device)
        next_done = torch.zeros(self.args.num_envs).to(self.device)
        next_time_out = torch.zeros(self.args.num_envs).to(self.device)
        num_updates = self.args.total_timesteps // self.args.batch_size

        reward_rms = RunningMeanStd()
        self.obs_rms = RunningMeanStd(shape=(1,) + self.env.single_observation_space.shape)
        discounted_reward = RewardForwardFilter(self.args.int_gamma)

        print("Start to initialize observation normalization parameter.....")
        next_ob = []
        for step in range(self.args.num_steps * self.args.num_iterations_obs_norm_init):
            # Random actions only; the observations feed the running statistics.
            acs = np.random.randint(0, self.env.single_action_space.n, size=(self.args.num_envs,))
            o, r, te, tr, _ = self.env.step(acs)
            next_ob += o.tolist()

            # Flush into obs_rms each time a full batch of observations has accumulated.
            if len(next_ob) % (self.args.num_steps * self.args.num_envs) == 0:
                next_ob = np.stack(next_ob)
                self.obs_rms.update(next_ob)
                next_ob = []
        print("End to initialize...")

        for update in range(1, num_updates + 1):
            # Annealing the rate if instructed to do so.
            if self.args.anneal_lr:
                frac = 1.0 - (update - 1.0) / num_updates
                lrnow = frac * self.args.learning_rate
                self.optimizer.param_groups[0]["lr"] = lrnow

            for step in range(0, self.args.num_steps):
                global_step += 1 * self.args.num_envs
                self.obs[step] = next_obs
                self.dones[step] = next_done
                self.time_outs[step] = next_time_out

                # ALGO LOGIC: action logic
                with torch.no_grad():
                    value_ext, value_int = self.agent.get_value(self.obs[step])
                    self.ext_values[step], self.int_values[step] = (
                        value_ext.flatten(),
                        value_int.flatten(),
                    )
                    action, logprob, _, _, _ = self.agent.get_action_and_value(self.obs[step])

                self.actions[step] = action
                self.logprobs[step] = logprob

                # execute the game and log data.
                next_obs, reward, terminated, truncated, info = self.env.step(action.cpu().numpy())
                self.rewards[step] = torch.tensor(reward).to(self.device).view(-1)
                next_obs = torch.Tensor(next_obs).to(self.device)
                next_done = torch.Tensor(terminated).to(self.device)
                next_time_out = torch.Tensor(truncated).to(self.device)

                self.calculate_curiosity_rewards(step, next_obs, curiosity_rewards_code)

                for idx, d in enumerate(terminated | truncated):
                    if d:
                        avg_returns.append(info["r"][idx])
                        epi_ret = np.average(avg_returns)
                        epi_suc = np.average([1 if r > 0 else 0 for r in avg_returns])

            # Update the running reward statistics from the filtered curiosity rewards,
            # then scale the stored curiosity rewards by the running standard deviation.
            curiosity_reward_per_env = np.array(
                [discounted_reward.update(reward_per_step) for reward_per_step in self.curiosity_rewards.cpu().data.numpy().T]
            )
            mean, std, count = (
                np.mean(curiosity_reward_per_env),
                np.std(curiosity_reward_per_env),
                len(curiosity_reward_per_env),
            )
            reward_rms.update_from_moments(mean, std**2, count)

            self.curiosity_rewards /= np.sqrt(reward_rms.var)

            if self.args.value_bootstrap:
                # Value bootstrapping is a technique that reduces the surprise for the critic in case
                # we're ending the episode by timeout. Intuitively, in this case the cumulative return for the last step
                # should not be zero, but rather what the critic expects. This improves learning in many envs
                # because otherwise the critic cannot predict the abrupt change in rewards in a timed-out episode.
                # What we really want here is v(t+1) which we don't have because we don't have obs(t+1) (since
                # the episode ended). Using v(t) is an approximation that requires that rew(t) can be generally ignored.
                self.rewards.add_(self.args.gamma * self.ext_values * self.time_outs)
                self.curiosity_rewards.add_(self.args.int_gamma * self.int_values * self.time_outs)

            # bootstrap value if not done
            with torch.no_grad():
                ext_advantages = self.compute_advantages(next_obs, next_done)
                int_advantages = self.compute_int_advantages(next_obs, int_advantages_code)
                ext_returns = ext_advantages + self.ext_values
                int_returns = int_advantages + self.int_values

            # flatten the batch
            b_obs = self.obs.reshape((-1,) + self.env.single_observation_space.shape)
            b_logprobs = self.logprobs.reshape(-1)
            b_actions = self.actions.reshape(-1)
            b_ext_advantages = ext_advantages.reshape(-1)
            b_int_advantages = int_advantages.reshape(-1)
            b_ext_returns = ext_returns.reshape(-1)
            b_int_returns = int_returns.reshape(-1)
            b_ext_values = self.ext_values.reshape(-1)

            # Weighted mix of intrinsic and extrinsic advantages.
            b_advantages = b_int_advantages * self.args.int_coef + b_ext_advantages * self.args.ext_coef

            self.obs_rms.update(b_obs.cpu().numpy())

            # Optimizing the policy and value network
            b_inds = np.arange(self.args.batch_size)

            # Observations normalised with the running mean / variance and clipped to [-5, 5].
            rnd_next_obs = (
                (
                    (b_obs - torch.from_numpy(self.obs_rms.mean).to(self.device))
                    / torch.sqrt(torch.from_numpy(self.obs_rms.var).to(self.device))
                ).clip(-5, 5)
            ).float()

            clipfracs = []
            for epoch in range(self.args.update_epochs):
                np.random.shuffle(b_inds)
                for start in range(0, self.args.batch_size, self.args.minibatch_size):
                    end = start + self.args.minibatch_size
                    mb_inds = b_inds[start:end]

                    # Forward loss
                    forward_loss = self.compute_rnd_loss(rnd_next_obs[mb_inds], rnd_loss_code)

                    _, newlogprob, entropy, new_ext_values, new_int_values = self.agent.get_action_and_value(
                        b_obs[mb_inds], b_actions.long()[mb_inds]
                    )
                    logratio = newlogprob - b_logprobs[mb_inds]
                    ratio = logratio.exp()

                    with torch.no_grad():
                        # calculate approx_kl http://joschu.net/blog/kl-approx.html
                        old_approx_kl = (-logratio).mean()
                        approx_kl = ((ratio - 1) - logratio).mean()
                        clipfracs += [((ratio - 1.0).abs() > self.args.clip_coef).float().mean().item()]

                    mb_advantages = b_advantages[mb_inds]
                    if self.args.norm_adv:
                        mb_advantages = (mb_advantages - mb_advantages.mean()) / (mb_advantages.std() + 1e-8)

                    # Policy loss
                    pg_loss = self.compute_policy_loss(mb_advantages, ratio)

                    # Value loss
                    ext_v_loss = self.compute_value_loss(new_ext_values, b_ext_returns[mb_inds])
                    int_v_loss = self.compute_value_loss(new_int_values, b_int_returns[mb_inds])
                    v_loss = ext_v_loss + int_v_loss

                    # Entropy loss
                    entropy_loss = entropy.mean()
                    loss = pg_loss - self.args.ent_coef * entropy_loss + v_loss * self.args.vf_coef + self.args.forward_coef * forward_loss

                    self.optimizer.zero_grad()
                    loss.backward()
                    if self.args.max_grad_norm:
                        nn.utils.clip_grad_norm_(
                            self.combined_parameters,
                            self.args.max_grad_norm,
                        )
                    self.optimizer.step()

                # Stop the remaining epochs early once the KL estimate exceeds the target.
                if self.args.target_kl is not None:
                    if approx_kl > self.args.target_kl:
                        break

In [24]:
# Trainer instance shared by the RNDT test cells below.
trainer = RNDTrainer(args=args, env=env, agent=agent, rnd_model=rnd_model)

In [None]:

# Example inputs and expected outputs for the `RNDT calculate_curiosity_rewards` field.
exp_RNDT_calculate_curiosity_rewards = {
    # fed to torch.manual_seed by the test before the fake RND model draws random features
    "seed": 27,
    # informational entry: the test installs its own stand-in for trainer.rnd_model and does not read this key
    "trainer.rnd_model = lambda x:": [128],
    "step": 0,
    # all-ones observation batch handed to the solution as `next_obs`
    "next_obs": torch.ones([128, 13016], device=device),
    # two sampled elements and the full shape of the expected result
    "steps_curiosity_rewards[0]": 41.92801284790039,
    "steps_curiosity_rewards[1]": 43.05669403076172,
    "steps_curiosity_rewards.shape": [128],
}


def test_RNDT_calculate_curiosity_rewards(trainer:RNDTrainer, solution, exp_in_out: dict, rel_tol=1e-4):
    """Check the `RNDT calculate_curiosity_rewards` solution on the example input.

    The trainer is modified in place: `obs_rms` is reset to a fresh
    `RunningMeanStd` and `rnd_model` is replaced by a stand-in that returns two
    random (128, 512) tensors drawn after seeding torch with the example seed.

    Args:
        trainer: trainer whose `calculate_curiosity_rewards` is exercised.
        solution: mapping indexed as
            `solution['RNDT calculate_curiosity_rewards']['content']`.
        exp_in_out: example inputs (`seed`, `step`, `next_obs`) plus the
            expected shape and two sampled values.
        rel_tol: relative tolerance forwarded to `math.isclose`.

    Raises:
        AssertionError: if the shape or a sampled value does not match.
    """
    torch.manual_seed(exp_in_out['seed'])

    def seeded_random_features(_):
        # Drawn on the default generator and then moved, so the values do not depend on `device`.
        first = torch.rand([128, 512]).to(device)
        second = torch.rand([128, 512]).to(device)
        return first, second

    obs_shape = (1,) + trainer.env.single_observation_space.shape
    trainer.obs_rms = RunningMeanStd(shape=obs_shape)
    trainer.rnd_model = seeded_random_features

    step = exp_in_out['step']
    next_obs = exp_in_out['next_obs']
    student_code = solution['RNDT calculate_curiosity_rewards']['content']
    steps_curiosity_rewards = trainer.calculate_curiosity_rewards(step, next_obs, student_code)
    # Uncomment any of the prints below while debugging.  #dev
    # print(steps_curiosity_rewards.shape)
    # print(steps_curiosity_rewards[0].item())
    # print(steps_curiosity_rewards[1].item())

    assert exp_in_out['steps_curiosity_rewards.shape'] == list(steps_curiosity_rewards.shape)
    for idx in (0, 1):
        assert math.isclose(steps_curiosity_rewards[idx].item(), exp_in_out[f'steps_curiosity_rewards[{idx}]'], rel_tol=rel_tol)

# Show the extracted solution text, then run the check on the example values.
print(solution['RNDT calculate_curiosity_rewards']['content']) #dev
test_RNDT_calculate_curiosity_rewards(trainer=trainer, solution=solution, exp_in_out=exp_RNDT_calculate_curiosity_rewards)

In [None]:
# Example inputs and expected outputs for the `RNDT compute_int_advantages` field.
exp_RNDT_compute_int_advantages = {
    # handed to the solution as `next_obs`; the test replaces trainer.agent.get_value,
    # so the fixed pair at the bottom of this dict is what any get_value call returns
    "next_obs": torch.ones([128, 13016], device=device),
    # four sampled elements and the full shape of the expected result
    "int_advantages[0][0]": 41.92818832397461,
    "int_advantages[0][1]": 43.05686950683594,
    "int_advantages[2][0]": 0.00019682712445501238,
    "int_advantages[2][1]": 0.00019682712445501238,
    "int_advantages.shape": [32, 128],
    # canned return value installed as trainer.agent.get_value by the test
    "trainer.agent.get_value = lambda x:": (
        torch.tensor(-0.0010622446425259113, device=device),
        torch.tensor(0.0011777263134717941, device=device),
    ),
}

def test_RNDT_compute_int_advantages(trainer:RNDTrainer, solution, exp_in_out: dict, rel_tol=1e-4):
    """Check the `RNDT compute_int_advantages` solution on the example input.

    The trainer is modified in place: `trainer.agent.get_value` is replaced by
    a stand-in that ignores its argument and returns the canned pair stored in
    `exp_in_out`.

    Args:
        trainer: trainer whose `compute_int_advantages` is exercised.
        solution: mapping indexed as
            `solution['RNDT compute_int_advantages']['content']`.
        exp_in_out: example input (`next_obs`), the canned `get_value` result,
            and the expected shape and four sampled values.
        rel_tol: relative tolerance forwarded to `math.isclose`.

    Raises:
        AssertionError: if the shape or a sampled value does not match.
    """
    def canned_get_value(_):
        # Looked up on every call, exactly like the expectation dict is read elsewhere.
        return exp_in_out['trainer.agent.get_value = lambda x:']

    trainer.agent.get_value = canned_get_value

    next_obs = exp_in_out['next_obs']
    student_code = solution['RNDT compute_int_advantages']['content']
    int_advantages = trainer.compute_int_advantages(next_obs, student_code)
    # Uncomment any of the prints below while debugging.  #dev
    # print(int_advantages.shape)
    # print(int_advantages[0][0].item())
    # print(int_advantages[0][1].item())
    # print(int_advantages[2][0].item())
    # print(int_advantages[2][1].item())

    assert exp_in_out['int_advantages.shape'] == list(int_advantages.shape)
    for row, col in ((0, 0), (0, 1), (2, 0), (2, 1)):
        assert math.isclose(int_advantages[row][col].item(), exp_in_out[f'int_advantages[{row}][{col}]'], rel_tol=rel_tol)

# Show the extracted solution text, then run the check on the example values.
print(solution['RNDT compute_int_advantages']['content']) #dev
test_RNDT_compute_int_advantages(trainer=trainer, solution=solution, exp_in_out=exp_RNDT_compute_int_advantages)

In [None]:
# Example inputs and expected output for the `RNDT compute_rnd_loss` field.
exp_RNDT_compute_rnd_loss = {
    # seed for torch before the test's stand-in RND model draws random features
    "seed": 27,
    # handed to the solution as `rnd_next_obs`; the test replaces trainer.rnd_model
    # with a lambda, so the model output does not depend on these values
    "rnd_next_obs": torch.ones([128, 13016], device=device),
    "forward_loss": 0.1669679880142212,
}

def test_RNDT_compute_rnd_loss(trainer:RNDTrainer, solution, exp_in_out: dict, rel_tol=1e-3):
    """Check the `RNDT compute_rnd_loss` solution on the example input.

    The trainer is modified in place: `rnd_model` is replaced by a lambda that
    ignores its argument and returns two random (128, 512) tensors drawn after
    seeding torch with `exp_in_out['seed']`.

    Args:
        trainer: trainer whose `compute_rnd_loss` is exercised.
        solution: mapping indexed as
            `solution['RNDT compute_rnd_loss']['content']`.
        exp_in_out: example inputs (`seed`, `rnd_next_obs`) plus the expected
            `forward_loss` value.
        rel_tol: relative tolerance forwarded to `math.isclose`.

    Raises:
        AssertionError: if the loss does not match the expected value.
    """
    # Seed from the expectations passed in, not from the module-level dict, so the
    # function honours whatever example the caller supplies.
    torch.manual_seed(exp_in_out['seed'])
    trainer.rnd_model = lambda _: (torch.rand([128, 512]).to(device), torch.rand([128, 512]).to(device))
    forward_loss = trainer.compute_rnd_loss(exp_in_out['rnd_next_obs'], solution['RNDT compute_rnd_loss']['content'])
    # print(forward_loss.item())  #dev

    assert math.isclose(forward_loss.item(), exp_in_out['forward_loss'], rel_tol=rel_tol)

# Show the extracted solution text, then run the check on the example values.
print(solution['RNDT compute_rnd_loss']['content']) #dev
test_RNDT_compute_rnd_loss(trainer, solution, exp_RNDT_compute_rnd_loss)