In [3]:
!pip uninstall gym -y
!pip install gymnasium minigrid torch numpy


import numpy as np

# Compatibility shim: guarantee that `np.bool8` exists, aliasing it to
# `np.bool_` on NumPy builds that do not provide the attribute.
try:
    np.bool8
except AttributeError:
    np.bool8 = np.bool_

Found existing installation: gym 0.26.2
Uninstalling gym-0.26.2:
  Successfully uninstalled gym-0.26.2
Collecting minigrid
  Downloading minigrid-3.0.0-py3-none-any.whl.metadata (6.7 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)


In [None]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
from torch.distributions import Categorical
from minigrid.wrappers import RGBImgPartialObsWrapper, ImgObsWrapper

# Hyperparameters
GAMMA = 0.99  # discount factor; default `gamma` of compute_returns_and_advantages
CLIP_EPS = 0.2  # ppo_update clamps the probability ratio to [1 - CLIP_EPS, 1 + CLIP_EPS]
LR = 2.5e-4  # learning rate given to the Adam optimizer in train()
EPOCHS = 4  # number of shuffled passes over one rollout inside ppo_update
BATCH_SIZE = 64  # minibatch size used when slicing the shuffled rollout in ppo_update
STEPS_PER_UPDATE = 2048  # environment steps train() collects before each ppo_update call

def make_env():
    """Create the wrapped MiniGrid training environment.

    Builds "MiniGrid-Empty-Random-5x5-v0" with rendering disabled and wraps it
    in RGBImgPartialObsWrapper followed by ImgObsWrapper.

    Returns:
        The wrapped environment. train() treats its observations as
        (height, width, channels) image arrays; presumably these are RGB
        renderings of the agent's partial view -- confirm against the MiniGrid
        wrapper documentation.
    """
    base_env = gym.make("MiniGrid-Empty-Random-5x5-v0", render_mode=None)
    return ImgObsWrapper(RGBImgPartialObsWrapper(base_env))

class ActorCritic(nn.Module):
    """Convolutional actor-critic with separate policy and value heads.

    A shared feature extractor (two 3x3 convolutions with 16 and 32 output
    channels, each followed by ReLU, then a flatten) feeds two independent
    two-layer MLP heads: `policy` produces one logit per action and `value`
    produces a single scalar per sample.
    """

    def __init__(self, input_shape, n_actions):
        """Build the network.

        Args:
            input_shape: (channels, height, width) of a single observation.
            n_actions: number of logits emitted by the policy head.
        """
        super().__init__()
        channels, height, width = input_shape
        # 3x3 kernels with padding=1 keep height and width unchanged, so the
        # flattened feature size is simply 32 * height * width.
        flat_features = 32 * height * width

        feature_layers = [
            nn.Conv2d(channels, 16, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, 3, padding=1),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.conv = nn.Sequential(*feature_layers)
        self.policy = self._make_head(flat_features, n_actions)
        self.value = self._make_head(flat_features, 1)

    @staticmethod
    def _make_head(in_features, out_features):
        """Return a Linear(in, 128) -> ReLU -> Linear(128, out) head."""
        return nn.Sequential(
            nn.Linear(in_features, 128),
            nn.ReLU(),
            nn.Linear(128, out_features),
        )

    def forward(self, x):
        """Map a batch of images to (policy logits, state values).

        Args:
            x: tensor of shape (batch, channels, height, width) holding pixel
               values on a 0-255 scale; it is cast to float and divided by
               255 here, so callers should pass unscaled pixels.

        Returns:
            Tuple `(logits, values)` with shapes (batch, n_actions) and
            (batch, 1).
        """
        scaled = x.float() / 255.0
        features = self.conv(scaled)
        logits = self.policy(features)
        state_value = self.value(features)
        return logits, state_value

def compute_returns_and_advantages(rewards, dones, values, gamma=GAMMA, lam=0.95, last_value=0.0):
    """Compute discounted returns and GAE advantages for one rollout.

    The rollout is scanned backwards. At step ``i``:

        delta_i  = r_i + gamma * V_{i+1} * (1 - done_i) - V_i
        adv_i    = delta_i + gamma * lam * adv_{i+1} * (1 - done_i)
        return_i = r_i + gamma * return_{i+1} * (1 - done_i)

    so both accumulators are reset whenever ``dones[i]`` is set.

    Args:
        rewards: sequence of per-step rewards.
        dones: sequence of per-step episode-end flags (bool or 0/1).
        values: sequence of per-step value estimates V_i (plain numbers).
        gamma: discount factor.
        lam: GAE lambda (previously hard-coded as 0.95).
        last_value: value estimate of the state following the final step,
            used to bootstrap when the rollout ends mid-episode. The default
            0.0 reproduces the previous behaviour.

    Returns:
        Tuple ``(returns, advantages)`` of 1-D float32 tensors with
        ``len(rewards)`` elements each.
    """
    n_steps = len(rewards)
    # Pre-size and fill by index: the previous list.insert(0, ...) made the
    # loop quadratic in the rollout length.
    returns = [0.0] * n_steps
    advs = [0.0] * n_steps

    running_return = last_value
    running_adv = 0.0
    next_value = last_value
    for i in reversed(range(n_steps)):
        # mask is 0 at an episode end, cutting both the bootstrap and the
        # accumulated terms so nothing leaks across episode boundaries.
        mask = 1.0 - float(dones[i])
        delta = rewards[i] + gamma * next_value * mask - values[i]
        running_adv = delta + gamma * lam * running_adv * mask
        running_return = rewards[i] + gamma * running_return * mask
        returns[i] = running_return
        advs[i] = running_adv
        next_value = values[i]

    # Explicit dtype keeps the result float32 regardless of the numeric type
    # of the inputs (e.g. NumPy float64 scalars).
    return (
        torch.tensor(returns, dtype=torch.float32),
        torch.tensor(advs, dtype=torch.float32),
    )

def ppo_update(model, optimizer, obs, actions, log_probs_old, returns, advantages):
    """Run EPOCHS passes of clipped-surrogate PPO updates over one rollout.

    Each pass shuffles the rollout and walks it in minibatches of BATCH_SIZE.
    For every minibatch the loss is

        -mean(min(ratio * adv, clamp(ratio, 1 - CLIP_EPS, 1 + CLIP_EPS) * adv))
        + 0.5 * mean((return - value) ** 2)

    where ``ratio = exp(new_log_prob - old_log_prob)``.

    Args:
        model: callable mapping a (batch, C, H, W) float tensor to
            ``(logits, values)``.
        optimizer: optimizer over ``model``'s parameters.
        obs: NumPy array of observations laid out (N, H, W, C); it is permuted
            to channels-first before being passed to ``model``.
        actions: length-N integer array of the actions taken.
        log_probs_old: length-N array of log-probabilities recorded at
            collection time.
        returns: length-N tensor (or array) of return targets for the critic.
        advantages: length-N tensor (or array) of advantage estimates.

    Returns:
        None. ``model`` is updated in place through ``optimizer``.
    """
    n_samples = len(obs)

    # Convert the per-step vectors once, with explicit dtypes. NumPy hands the
    # old log-probs over as float64, which previously promoted the ratio and
    # the whole loss to float64 against a float32 model.
    actions_t = torch.as_tensor(actions, dtype=torch.long)
    old_log_probs_t = torch.as_tensor(log_probs_old, dtype=torch.float32)
    returns_t = torch.as_tensor(returns, dtype=torch.float32)
    advantages_t = torch.as_tensor(advantages, dtype=torch.float32)

    for _ in range(EPOCHS):
        idx = np.random.permutation(n_samples)
        for start in range(0, n_samples, BATCH_SIZE):
            batch_idx = idx[start:start + BATCH_SIZE]
            batch_idx_t = torch.as_tensor(batch_idx, dtype=torch.long)

            # Observations are converted per minibatch to avoid holding a
            # float32 copy of the whole rollout in memory.
            obs_batch = torch.as_tensor(obs[batch_idx], dtype=torch.float32).permute(0, 3, 1, 2)
            actions_batch = actions_t[batch_idx_t]
            old_log_probs = old_log_probs_t[batch_idx_t]
            returns_batch = returns_t[batch_idx_t]
            advantages_batch = advantages_t[batch_idx_t]

            logits, values = model(obs_batch)
            dist = Categorical(logits=logits)
            log_probs = dist.log_prob(actions_batch)

            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantages_batch
            surr2 = torch.clamp(ratio, 1.0 - CLIP_EPS, 1.0 + CLIP_EPS) * advantages_batch
            actor_loss = -torch.min(surr1, surr2).mean()
            # squeeze(-1) removes only the trailing value dimension, so a
            # minibatch of size 1 keeps its batch axis.
            critic_loss = (returns_batch - values.squeeze(-1)).pow(2).mean()
            loss = actor_loss + 0.5 * critic_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

def train(num_updates=1000):
    """Train an ActorCritic policy on the MiniGrid environment with PPO.

    Every update collects STEPS_PER_UPDATE environment steps with the current
    policy, computes returns and advantages for that rollout, and calls
    ppo_update on it. The length of each finished episode is printed, and the
    mean reward of the last 10 finished episodes is printed every 10 updates.

    Args:
        num_updates: number of collect-then-update iterations. Defaults to
            1000, the value that was previously hard-coded.

    Notes:
        ``terminated`` and ``truncated`` are merged into one ``done`` flag, so
        a truncated episode is treated like a true terminal state when returns
        and advantages are computed.
    """
    env = make_env()
    try:
        obs_shape = env.observation_space.shape
        n_actions = env.action_space.n

        # The env reports (H, W, C); the network expects (C, H, W).
        model = ActorCritic((obs_shape[2], obs_shape[0], obs_shape[1]), n_actions)
        optimizer = torch.optim.Adam(model.parameters(), lr=LR)

        obs, _ = env.reset()
        obs = obs.astype(np.uint8)
        episode_rewards = []
        ep_reward = 0
        episode_count = 0
        step_count = 0

        for update in range(num_updates):
            obs_list, actions, log_probs, rewards, dones, values = [], [], [], [], [], []

            for _ in range(STEPS_PER_UPDATE):
                # Pass raw 0-255 pixels: ActorCritic.forward already casts to
                # float and divides by 255. Scaling here as well made the
                # rollout see inputs 255x smaller than ppo_update does, so the
                # stored log-probs and values did not match the update.
                obs_tensor = torch.tensor(obs).permute(2, 0, 1).unsqueeze(0)
                # Only Python scalars are kept from this forward pass, so no
                # autograd graph is needed.
                with torch.no_grad():
                    logits, value = model(obs_tensor)
                dist = Categorical(logits=logits)
                action = dist.sample()

                next_obs, reward, terminated, truncated, _ = env.step(action.item())
                done = terminated or truncated

                obs_list.append(obs)
                actions.append(action.item())
                log_probs.append(dist.log_prob(action).item())
                values.append(value.item())
                rewards.append(reward)
                dones.append(done)
                obs = next_obs.astype(np.uint8)
                ep_reward += reward
                step_count += 1

                if done:
                    episode_count += 1
                    print(f"episode = {episode_count} | steps = {step_count}")
                    obs, _ = env.reset()
                    obs = obs.astype(np.uint8)
                    episode_rewards.append(ep_reward)
                    ep_reward = 0
                    step_count = 0

            returns, advantages = compute_returns_and_advantages(rewards, dones, values)
            obs_array = np.array(obs_list)
            actions = np.array(actions)
            log_probs = np.array(log_probs)

            ppo_update(model, optimizer, obs_array, actions, log_probs, returns, advantages)

            if update % 10 == 0:
                avg_reward = np.mean(episode_rewards[-10:]) if episode_rewards else 0
                print(f"Update {update}, Average Reward: {avg_reward:.2f}")
    finally:
        # Release the environment even if training is interrupted or fails.
        env.close()

# Entry point: start training when this code runs as the main module.
# When the cell is executed in the notebook this guard passes and training
# starts (see the captured episode log below).
if __name__ == "__main__":
    train()


episode = 1 | steps = 100
episode = 2 | steps = 100
episode = 3 | steps = 100
episode = 4 | steps = 11
episode = 5 | steps = 6
episode = 6 | steps = 100
episode = 7 | steps = 80
episode = 8 | steps = 2
episode = 9 | steps = 100
episode = 10 | steps = 63
episode = 11 | steps = 100
episode = 12 | steps = 27
episode = 13 | steps = 12
episode = 14 | steps = 7
episode = 15 | steps = 100
episode = 16 | steps = 100
episode = 17 | steps = 100
episode = 18 | steps = 18
episode = 19 | steps = 100
episode = 20 | steps = 100
episode = 21 | steps = 100
episode = 22 | steps = 100
episode = 23 | steps = 1
episode = 24 | steps = 56
episode = 25 | steps = 100
episode = 26 | steps = 100
episode = 27 | steps = 14
episode = 28 | steps = 21
episode = 29 | steps = 100
episode = 30 | steps = 100
Update 0, Average Reward: 0.32
episode = 31 | steps = 100
episode = 32 | steps = 76
episode = 33 | steps = 1
episode = 34 | steps = 78
episode = 35 | steps = 18
episode = 36 | steps = 100
episode = 37 | steps = 100
e