In [1]:
# Colab-only: mount the user's Google Drive under /content/drive.
# NOTE(review): none of the cells visible in this notebook read from or write to
# the mount point — confirm it is still needed before keeping this cell.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install gymnasium
!pip install pygame



In [None]:
!pip install swig
!pip install gymnasium[box2d]

Collecting swig
  Downloading swig-4.3.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.3.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.9 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.9/1.9 MB[0m [31m56.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m33.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.3.1
Collecting box2d-py==2.3.5 (from gymnasium[box2d])
  Downloading box2d-py-2.3.5.tar.gz (374 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m22.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: box2d-py


In [None]:
import gymnasium as gym

# Print the environment IDs registered with Gymnasium, as a quick check that the
# installs above made the expected environments available.
gym.pprint_registry()

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim


class Normalizer:
    """Running per-feature mean/variance tracker for standardising inputs.

    Statistics are accumulated batch by batch with the parallel (Chan et al.)
    merge rule, so after any number of ``update`` calls ``mean`` and ``var``
    match the statistics of all samples seen so far (up to the tiny prior
    weight ``epsilon`` carried by the initial ``mean=0, var=1`` state).

    Args:
        shape: Shape of a single sample (the per-feature statistics shape).
        epsilon: Initial pseudo-count; keeps the first division well defined.
    """

    def __init__(self, shape, epsilon=1e-8):
        self.shape = shape
        self.mean = np.zeros(shape)
        self.var = np.ones(shape)
        self.count = epsilon

    def update(self, x):
        """Fold a batch ``x`` (first axis = samples) into the running statistics."""
        x = np.asarray(x, dtype=np.float64)
        batch_size = x.shape[0]
        if batch_size == 0:
            # Nothing to merge; np.mean of an empty batch would produce NaNs.
            return

        batch_mean = np.mean(x, axis=0)
        batch_var = np.var(x, axis=0)

        total_count = self.count + batch_size
        delta = batch_mean - self.mean

        # Merge the sums of squared deviations of both groups. The last term
        # accounts for the distance between the two group means; omitting it
        # underestimates the variance whenever batches differ in mean.
        m2 = (
            self.var * self.count
            + batch_var * batch_size
            + np.square(delta) * self.count * batch_size / total_count
        )

        self.mean = self.mean + delta * batch_size / total_count
        self.var = m2 / total_count
        self.count = total_count

    def normalize(self, x):
        """Return ``x`` standardised with the current running mean and variance."""
        return (x - self.mean) / np.sqrt(self.var + 1e-8)


def compute_gae(rewards, values, next_values, dones, gamma, gae_lambda):
    """Compute Generalized Advantage Estimates for one rollout.

    Args:
        rewards: Per-step rewards.
        values: Per-step value estimates V(s_t).
        next_values: Per-step value estimates of the successor state V(s_{t+1}).
        dones: Per-step episode-end flags (truthy when the episode ended at
            that step).
        gamma: Discount factor.
        gae_lambda: GAE smoothing factor.

    Returns:
        A list of advantages, one per step, in the same order as ``rewards``.
    """
    advantages = []
    advantage = 0
    for i in reversed(range(len(rewards))):
        # A finished episode must neither bootstrap from the next state nor
        # inherit the advantage accumulated from the following episode, so the
        # continuation mask is (1 - done), not done itself.
        not_done = 1.0 - float(dones[i])
        delta = rewards[i] + gamma * next_values[i] * not_done - values[i]
        advantage = delta + gamma * gae_lambda * not_done * advantage
        advantages.append(advantage)
    # Built back-to-front; flip once instead of inserting at index 0 each step.
    advantages.reverse()
    return advantages



In [None]:
# --- Training hyperparameters ------------------------------------------------

# Optimisation
initial_lr = 1e-3     # learning rate handed to the Adam optimizer
alpha = 0.1           # NOTE(review): not referenced by any cell visible here

# Return / advantage estimation
gamma = 0.99          # discount factor
gae_lambda = 0.9      # lambda argument intended for compute_gae

# Rollout schedule
episodes = 2_500      # number of training episodes
num_steps = 8         # transitions collected between gradient updates
# n_timesteps = int(5e6)

In [None]:
class GaussianActor(nn.Module):
    """Diagonal-Gaussian policy head.

    A two-layer ReLU trunk (``share``) feeds a linear ``mean_head`` whose
    output is squashed with tanh. The standard deviation does not depend on
    the input: it is ``exp(log_std)`` for a learned parameter vector that
    starts at zero.

    Args:
        input_dim: Size of the observation vector.
        output_dim: Size of the action vector.
        hidden_dims: Widths of the two hidden layers (first two entries used).
    """

    def __init__(self, input_dim, output_dim, hidden_dims=(128, 128)):
        super().__init__()
        first_width = hidden_dims[0]
        second_width = hidden_dims[1]

        trunk_layers = [
            nn.Linear(input_dim, first_width),
            nn.ReLU(),
            nn.Linear(first_width, second_width),
            nn.ReLU(),
        ]
        self.share = nn.Sequential(*trunk_layers)
        self.mean_head = nn.Linear(second_width, output_dim)
        self.log_std = nn.Parameter(torch.zeros(output_dim))

    def forward(self, x):
        """Return ``(mean, std)``, both shaped like the mean-head output."""
        features = self.share(x)
        mean = self.mean_head(features).tanh()
        # Broadcast the shared log-std to the batch before exponentiating.
        log_std = self.log_std.expand_as(mean)
        return mean, log_std.exp()


class Critic(nn.Module):
    """State-value network.

    Two hidden blocks of Linear -> LayerNorm -> ReLU followed by a linear
    layer with a single output; the trailing size-1 dimension is squeezed away.

    Args:
        input_dim: Size of the observation vector.
        hidden_dims: Widths of the two hidden layers (first two entries used).
    """

    def __init__(self, input_dim, hidden_dims=(128, 128)):
        super().__init__()
        layers = []
        in_width = input_dim
        for width in (hidden_dims[0], hidden_dims[1]):
            layers.extend([nn.Linear(in_width, width), nn.LayerNorm(width), nn.ReLU()])
            in_width = width
        layers.append(nn.Linear(in_width, 1))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return one value estimate per input row."""
        value = self.net(x)
        return value.squeeze(dim=-1)



In [None]:
def a2c_debug_log(ep, steps, ep_reward, values, returns, log_probs, entropies, policy_loss, value_loss, grad_norm):
    """Print a fixed-layout diagnostic summary for one training episode.

    ``values``, ``returns``, ``log_probs`` and ``entropies`` must support
    ``.mean()`` / ``.std()`` followed by ``.item()``; ``policy_loss`` and
    ``value_loss`` must support ``.item()``. Advantages are recomputed here as
    ``returns - values``. Nothing is returned.
    """
    adv = returns - values

    report = [
        f"\n[Episode {ep}] Debug Summary",
        f"# of Steps:         {steps:.2f}",
        f"Total Reward:       {ep_reward:.2f}",
        f"Mean V(s):          {values.mean().item():.4f}",
        f"Advantage Mean:     {adv.mean().item():.4f}",
        f"Advantage Std:      {adv.std().item():.4f}",
        f"Entropy (avg):      {entropies.mean().item():.4f}",
        f"Policy Loss:        {policy_loss.item():.4f}",
        f"Value Loss:         {value_loss.item():.4f}",
        f"Gradient Norm:      {grad_norm:.4f}",
        f"Log Prob Mean:      {log_probs.mean().item():.4f}",
        "-" * 50,
    ]
    for line in report:
        print(line)


In [None]:
from tqdm import tqdm
import torch.profiler

# --- A2C training on LunarLanderContinuous-v3 --------------------------------
# Collects up to `num_steps` transitions (or until the episode ends), computes
# n-step bootstrapped returns, and performs one joint actor/critic update.

n_envs = 1
envs = [gym.make('LunarLanderContinuous-v3') for _ in range(n_envs)]

n_states = envs[0].observation_space.shape[0]
n_actions = envs[0].action_space.shape[0]

actor_network = GaussianActor(n_states, n_actions)
critic_network = Critic(n_states)

# NOTE(review): the normalizer is constructed but never applied to observations
# in this cell — confirm whether state normalisation was intended.
state_normalizer = Normalizer(shape=(envs[0].observation_space.shape[0],))
optimizer = optim.Adam(list(actor_network.parameters()) + list(critic_network.parameters()), lr=initial_lr)
mse_loss = nn.MSELoss()

max_step = 1600
states, actions, log_probs, rewards, values, dones, entropies = [], [], [], [], [], [], []


for ep in tqdm(range(episodes)):
    # Start every episode with all rollout buffers empty so the per-step lists
    # can never get out of sync with each other.
    states, actions, log_probs, rewards, values, dones, entropies = [], [], [], [], [], [], []
    ep_reward = 0
    latest_debug_info = {}

    for env in envs:
        state, _ = env.reset()
        done = False
        steps = 0

        while not done:
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            mean, std = actor_network(state_tensor)
            dist = torch.distributions.Normal(mean, std)
            action = dist.sample()
            # Independent action dimensions: joint log-prob / entropy are sums.
            log_prob = dist.log_prob(action).sum(dim=-1)
            entropy = dist.entropy().sum(dim=-1)
            value = critic_network(state_tensor)
            steps += 1

            next_state, reward, terminated, truncated, _ = env.step(action.detach().numpy().squeeze(0))
            done = terminated or truncated or steps > max_step

            states.append(state_tensor)
            actions.append(action)
            log_probs.append(log_prob)
            values.append(value)
            rewards.append(reward)
            dones.append(done)
            entropies.append(entropy)
            ep_reward += reward

            if len(rewards) >= num_steps or done:

                next_state_tensor = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
                if terminated:
                    # True terminal state: there is no future return.
                    bootstrap_value = 0.0
                else:
                    # Rollout cut mid-episode or episode truncated by a step
                    # limit: the successor state still has value, so bootstrap
                    # from the critic. No gradient is needed for the target.
                    with torch.no_grad():
                        bootstrap_value = critic_network(next_state_tensor).item()

                # Discounted n-step returns, accumulated from the LAST reward
                # back to the first so that returns[t] = r_t + gamma * returns[t+1].
                returns = []
                R = bootstrap_value
                for step_reward in reversed(rewards):
                    R = step_reward + gamma * R
                    returns.append(R)
                returns.reverse()
                returns = torch.tensor(returns, dtype=torch.float32)

                # The buffers hold exactly the transitions of this rollout.
                values_tensor = torch.stack(values).squeeze(-1)
                log_probs_tensor = torch.stack(log_probs).squeeze(-1)
                entropies_tensor = torch.stack(entropies).squeeze(-1)
                returns_tensor = returns.clone().detach()
                advantages = returns - values_tensor
                clipped_advantages = torch.clamp(advantages, min=-10.0, max=10.0)

                # Advantages are detached so the policy term does not push
                # gradients into the critic.
                policy_loss = -(log_probs_tensor * clipped_advantages.detach()).mean()
                value_loss = mse_loss(values_tensor, returns_tensor)
                entropy_loss = -entropies_tensor.mean()

                # total_loss = policy_loss + 0.5 * value_loss + 0.001 * entropy_loss
                total_loss = policy_loss + 0.4 * value_loss    #following rl-baselines3-zoo

                optimizer.zero_grad()
                total_loss.backward()
                grad_norm = torch.nn.utils.clip_grad_norm_(
                    list(actor_network.parameters()) + list(critic_network.parameters()),
                    max_norm=0.5
                )
                optimizer.step()

                # Keep only the most recent update's statistics for logging.
                latest_debug_info = {
                    "values": values_tensor,
                    "returns": returns,
                    "log_probs": log_probs_tensor,
                    "entropies": entropies_tensor,
                    "policy_loss": policy_loss,
                    "value_loss": value_loss,
                    "grad_norm": grad_norm
                }

                states, actions, log_probs, rewards, values, dones, entropies = [], [], [], [], [], [], []

            state = next_state


    if ep % 10 == 0 and latest_debug_info:
        a2c_debug_log(ep, steps, ep_reward, **latest_debug_info)

In [None]:
import matplotlib.pyplot as plt

# --- Deterministic evaluation of the trained actor ---------------------------
test_episodes = 10
test_rewards = []

# Use a dedicated evaluation environment instead of relying on the `env` name
# that happens to be left over from the training loop's `for env in envs`.
eval_env = gym.make('LunarLanderContinuous-v3')
try:
    for _ in range(test_episodes):
        state, _ = eval_env.reset()
        done = False
        ep_reward = 0

        while not done:
            with torch.no_grad():
                state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
                mean, std = actor_network(state_tensor)
                # Act greedily: use the distribution mean, no sampling.
                action = mean

            next_state, reward, terminated, truncated, _ = eval_env.step(action.detach().numpy().squeeze(0))
            done = terminated or truncated
            ep_reward += reward
            state = next_state

        test_rewards.append(ep_reward)
finally:
    eval_env.close()


print(f"Average reward over {test_episodes} test episodes: {np.mean(test_rewards):.2f}")

plt.plot(test_rewards)
plt.title("A2C Test Episode Rewards")
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.grid(True)
plt.show()


In [None]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
import torch

# Create environment with rendering enabled
env = gym.make("LunarLanderContinuous-v3", render_mode="rgb_array")

# Set up video recording (every episode is recorded)
env = RecordVideo(env, video_folder="/content/videos", name_prefix="lunar-lander-test", episode_trigger=lambda x: True)

try:
    # Reset environment
    state, _ = env.reset()
    done = False

    while not done:
        with torch.no_grad():
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            mean, _ = actor_network(state_tensor)
            # Deterministic rollout: act with the policy mean.
            action = mean.squeeze(0).numpy()

        # Step environment
        state, _, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
finally:
    # Always close the wrapped environment, even if the rollout raises, so the
    # recorder and renderer are released.
    env.close()

print("Recording complete. Check /content/videos.")


In [None]:
import gymnasium as gym
import numpy as np

# --- Random-policy baseline on BipedalWalker-v3 ------------------------------
# Runs `test_episodes` episodes with uniformly sampled actions and reports the
# per-episode and average undiscounted reward.
env = gym.make("BipedalWalker-v3")
test_episodes = 10
random_rewards = []

for ep in range(test_episodes):
    state, _ = env.reset()
    ep_reward = 0

    # Step until the environment signals termination or truncation.
    while True:
        action = env.action_space.sample()  # random continuous action
        state, reward, terminated, truncated, _ = env.step(action)
        ep_reward += reward
        done = terminated or truncated
        if done:
            break

    random_rewards.append(ep_reward)
    print(f"Episode {ep+1} reward: {ep_reward:.2f}")

env.close()

avg_reward = np.mean(random_rewards)
print(f"\n✅ Average reward over {test_episodes} random episodes: {avg_reward:.2f}")
