<a href="https://colab.research.google.com/github/t-snd/cartPole/blob/main/testCartPole.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import gymnasium as gym

env = gym.make('CartPole-v0')

import torch

class PolicyNet(torch.nn.Module):
    def __init__(self):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(4, 64)
        self.fc2 = torch.nn.Linear(64, 2)

    def forward(self, x):
        x = torch.nn.functional.relu(self.fc1(x))
        x = self.fc2(x)
        return x

net = PolicyNet()
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
gamma = 0.99

episode_reward_window = []

for i_episode in range(5000):
    observation = env.reset()[0]
    log_probs = []
    episode_reward = 0

    for t in range(200):
        # get prob distribution over actions
        logits = net(torch.from_numpy(observation).float())
        probs = torch.nn.functional.softmax(logits, dim=-1)
        # sample an action
        action = torch.multinomial(probs, 1).item()
        # take the action
        observation, reward, done, _, info = env.step(action)
        # save prob of chosen action and reward
        log_prob = torch.log(probs[action])
        log_probs.append(log_prob)

        episode_reward += reward
        if done:
            break

    normalized_reward = episode_reward / 200.0
    # calculate policy gradient loss
    policy_loss = []
    for log_prob in log_probs:
        policy_loss.append(-log_prob * normalized_reward)

    policy_loss = torch.stack(policy_loss).sum()

    # update the weights
    optimizer.zero_grad()
    policy_loss.backward()
    optimizer.step()

    episode_reward_window.append(episode_reward)
    if len(episode_reward_window) > 100:
        episode_reward_window.pop(0)
    avg_reward = sum(episode_reward_window) / len(episode_reward_window)

    if avg_reward > 195:
        print('solved at episode', i_episode)
        break

    if i_episode % 100 == 0:
        print('episode', i_episode, 'avg_reward', avg_reward)

env.close()

  logger.deprecation(


episode 0 avg_reward 16.0
episode 100 avg_reward 21.37
episode 200 avg_reward 24.7
episode 300 avg_reward 32.65
episode 400 avg_reward 30.68
episode 500 avg_reward 45.83
episode 600 avg_reward 55.12
episode 700 avg_reward 63.6
episode 800 avg_reward 83.98
episode 900 avg_reward 76.84
episode 1000 avg_reward 109.59
episode 1100 avg_reward 109.31
episode 1200 avg_reward 117.84
episode 1300 avg_reward 127.42
episode 1400 avg_reward 145.33
episode 1500 avg_reward 166.29
episode 1600 avg_reward 168.66
episode 1700 avg_reward 180.45
episode 1800 avg_reward 183.08
episode 1900 avg_reward 181.85
episode 2000 avg_reward 182.38
episode 2100 avg_reward 186.64
episode 2200 avg_reward 181.18
episode 2300 avg_reward 187.71
solved at episode 2383


## Actor-Critic

In [4]:
import torch.nn as nn
import torch.optim as optim
import numpy as np

In [9]:
class ActorCriticNet(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(ActorCriticNet, self).__init__()

        # Actor network (policy)
        self.actor = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.ReLU(),
            nn.Linear(64, action_dim)
        )

        # Critic network (value)
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, state):
        # Returns action logits and state value
        return self.actor(state), self.critic(state)

In [10]:
class ActorCritic:
    def __init__(self, state_dim, action_dim, learning_rate=0.001):
        self.model = ActorCriticNet(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.gamma = 0.99

    def select_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0)
        action_logits, _ = self.model(state)
        action_probs = torch.softmax(action_logits, dim=-1)
        action = torch.multinomial(action_probs, 1).item()
        return action

    def train(self, states, actions, total_reward, terminal_state):
        # Convert lists to tensors
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)

        # Compute returns (works because of sparse reward)
        returns = torch.zeros_like(states[:, 0], dtype=torch.float)
        returns[-1] = total_reward

        # Compute loss for each step
        actor_losses = []
        critic_losses = []

        for t in reversed(range(len(states) - 1)):
            # Actor-Critic update
            action_logits, value = self.model(states[t].unsqueeze(0))
            _, next_value = self.model(states[t+1].unsqueeze(0))

            # Compute advantage (using sparse total reward)
            advantage = total_reward - value.detach()

            # Actor loss (policy gradient)
            action_probs = torch.softmax(action_logits, dim=-1)
            log_prob = torch.log(action_probs[0, actions[t]])
            actor_loss = -log_prob * advantage
            actor_losses.append(actor_loss)

            # Critic loss
            critic_loss = advantage.pow(2)
            critic_losses.append(critic_loss)

        # Combine and backpropagate
        total_loss = torch.stack(actor_losses).sum() + 0.5 * torch.stack(critic_losses).sum()

        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

        return total_loss.item()

In [11]:
def train_sparse_reward_actor_critic(env_name='CartPole-v1', num_episodes=5000):
    env = gym.make(env_name)

    # Get state and action dimensions
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    # Initialize Actor-Critic agent
    agent = ActorCritic(state_dim, action_dim)

    episode_rewards = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False

        # For sparse reward tracking
        states = []
        actions = []

        while not done:
            # Select action
            action = agent.select_action(state)

            # Store state and action for later training
            states.append(state)
            actions.append(action)

            # Take action
            next_state, reward, done, _, _ = env.step(action)

            state = next_state

        # Sparse reward: total steps as reward
        total_reward = len(states)

        # Train the agent with all collected states and actions
        agent.train(states, actions, total_reward, state)

        episode_rewards.append(total_reward)

        # Compute moving average
        if len(episode_rewards) > 100:
            episode_rewards.pop(0)
        avg_reward = np.mean(episode_rewards)

        # Print progress
        if episode % 100 == 0:
            print(f'Episode {episode}, Average Reward: {avg_reward:.2f}')

        # Check if environment is solved
        if avg_reward > 195:
            print(f'Solved in {episode} episodes!')
            break

    env.close()
    return agent

In [12]:
agent = train_sparse_reward_actor_critic()

  states = torch.FloatTensor(states)


Episode 0, Average Reward: 24.00
Episode 100, Average Reward: 26.92
Episode 200, Average Reward: 35.33
Episode 300, Average Reward: 39.68
Episode 400, Average Reward: 45.00
Episode 500, Average Reward: 44.31
Episode 600, Average Reward: 52.33
Episode 700, Average Reward: 73.87
Episode 800, Average Reward: 65.07
Episode 900, Average Reward: 101.50
Episode 1000, Average Reward: 105.41
Episode 1100, Average Reward: 141.84
Episode 1200, Average Reward: 166.82
Episode 1300, Average Reward: 162.92
Solved in 1340 episodes!


## PPO

In [13]:
class PPONet(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PPONet, self).__init__()

        # Actor network (policy)
        self.actor = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.ReLU(),
            nn.Linear(64, action_dim),
            nn.Softmax(dim=-1)
        )

        # Critic network (value)
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, state):
        # Returns action probabilities and state value
        return self.actor(state), self.critic(state)

In [14]:
class PPO:
    def __init__(self, state_dim, action_dim, learning_rate=0.001,
                 clip_ratio=0.2, value_loss_coef=0.5, entropy_coef=0.01):
        self.model = PPONet(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

        # PPO hyperparameters
        self.clip_ratio = clip_ratio
        self.value_loss_coef = value_loss_coef
        self.entropy_coef = entropy_coef
        self.gamma = 0.99

    def select_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0)
        action_probs, _ = self.model(state)
        action = torch.multinomial(action_probs, 1).item()
        return action

    def train(self, states, actions, total_reward):
        # Convert lists to tensors
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)

        # Compute old action probabilities
        old_action_probs, old_values = self.model(states)
        old_action_probs = old_action_probs.detach()
        old_values = old_values.detach()

        # Get current action probabilities and values
        curr_action_probs, curr_values = self.model(states)

        # Compute advantages
        advantages = total_reward - old_values.squeeze()

        # PPO policy loss
        ratios = curr_action_probs.gather(1, actions.unsqueeze(1)).squeeze() / \
                 old_action_probs.gather(1, actions.unsqueeze(1)).squeeze()
        surr1 = ratios * advantages
        surr2 = torch.clamp(ratios, 1 - self.clip_ratio, 1 + self.clip_ratio) * advantages
        policy_loss = -torch.min(surr1, surr2).mean()

        # Value loss
        value_loss = (curr_values.squeeze() - total_reward).pow(2).mean()

        # Entropy loss for exploration
        entropy_loss = -(curr_action_probs * torch.log(curr_action_probs + 1e-10)).sum(dim=1).mean()

        # Total loss
        total_loss = (policy_loss +
                      self.value_loss_coef * value_loss -
                      self.entropy_coef * entropy_loss)

        # Backpropagate
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

        return total_loss.item()


In [15]:
def train_sparse_reward_ppo(env_name='CartPole-v1', num_episodes=5000):
    env = gym.make(env_name)

    # Get state and action dimensions
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    # Initialize PPO agent
    agent = PPO(state_dim, action_dim)

    episode_rewards = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False

        # For sparse reward tracking
        states = []
        actions = []

        while not done:
            # Select action
            action = agent.select_action(state)

            # Store state and action for later training
            states.append(state)
            actions.append(action)

            # Take action
            next_state, reward, done, _, _ = env.step(action)

            state = next_state

        # Sparse reward: total steps as reward
        total_reward = len(states)

        # Train the agent with all collected states and actions
        agent.train(states, actions, total_reward)

        episode_rewards.append(total_reward)

        # Compute moving average
        if len(episode_rewards) > 100:
            episode_rewards.pop(0)
        avg_reward = np.mean(episode_rewards)

        # Print progress
        if episode % 100 == 0:
            print(f'Episode {episode}, Average Reward: {avg_reward:.2f}')

        # Check if environment is solved
        if avg_reward > 195:
            print(f'Solved in {episode} episodes!')
            break

    env.close()
    return agent


In [16]:
agent = train_sparse_reward_ppo()

Episode 0, Average Reward: 19.00
Episode 100, Average Reward: 20.21
Episode 200, Average Reward: 19.84
Episode 300, Average Reward: 17.67
Episode 400, Average Reward: 24.64
Episode 500, Average Reward: 26.97
Episode 600, Average Reward: 31.63
Episode 700, Average Reward: 36.26
Episode 800, Average Reward: 46.76
Episode 900, Average Reward: 63.60
Episode 1000, Average Reward: 88.56
Episode 1100, Average Reward: 154.16
Solved in 1181 episodes!


## GRPO

In [17]:
import random
from collections import deque

class PolicyNet(torch.nn.Module):
    def __init__(self):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(4, 64)
        self.fc2 = torch.nn.Linear(64, 2)

    def forward(self, x):
        x = torch.nn.functional.relu(self.fc1(x))
        x = self.fc2(x)
        return x

def collect_trajectory(env, net):
    observation = env.reset()[0]
    log_probs = []
    observations = []
    chosen_actions = []
    episode_reward = 0

    for t in range(200):
        observations.append(observation)
        logits = net(torch.from_numpy(observation).float())
        probs = torch.nn.functional.softmax(logits, dim=0)
        action = torch.multinomial(probs, 1).item()

        observation, reward, done, _, info = env.step(action)
        log_prob = torch.log(probs[action])
        log_probs.append(log_prob.item())
        chosen_actions.append(action)
        episode_reward += reward

        if done:
            break

    normalized_reward = episode_reward / 200.0 # because max reward possible in this env is 200
    return observations, log_probs, chosen_actions, normalized_reward


def grpo_update(trajectories, net, optimizer, n_iterations=20):
    rewards = [r for o, l, a, r in trajectories]
    mean_reward = sum(rewards) / len(rewards)
    std_reward = np.std(rewards) + 1e-8
    advantages = [(r - mean_reward) / std_reward for r in rewards]

    for i_iter in range(n_iterations):
        loss = 0
        # iterating over each trajectory in the group
        for traj, advantage in zip(trajectories, advantages):
            (observations, log_probs, chosen_actions, _) = traj
            trajectory_loss = 0
            # iterating over each time step in the trajectory
            for t in range(len(observations)):
                new_policy_probs = torch.nn.functional.softmax(net(torch.from_numpy(observations[t]).float()), dim=0)
                new_log_probs = torch.log(new_policy_probs)[chosen_actions[t]]

                ratio = torch.exp(new_log_probs - log_probs[t])
                clipped_ratio = torch.clamp(ratio, min=1 - eps, max=1 + eps)
                trajectory_loss += -clipped_ratio * advantage
            trajectory_loss /= len(observations)
            loss += trajectory_loss
        loss /= len(trajectories)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


env = gym.make('CartPole-v0')
net = PolicyNet()
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
episode_reward_window = deque(maxlen=100)

# GRPO specific parameters
trajectories_per_update = 5  # group size
# epsilon for clipping
eps = 0.2

# training loop
for i_episode in range(5000):
    trajectories = []
    episode_rewards = []

    for _ in range(trajectories_per_update):
        observations, log_probs, chosen_actions, normalized_reward = collect_trajectory(env, net)
        trajectories.append((observations, log_probs, chosen_actions, normalized_reward))
        episode_rewards.append(normalized_reward * 200)  # unnormalize for tracking

    # update policy using grpo on the collected trajectories
    grpo_update(trajectories, net, optimizer)

    episode_reward_window.extend(episode_rewards)
    avg_reward = sum(episode_reward_window) / len(episode_reward_window)

    if avg_reward > 195:
        print('solved at episode', i_episode)
        break

    if i_episode % 10 == 0:
        print(f'episode {i_episode}, avg reward: {avg_reward:.2f}')

env.close()

  logger.deprecation(


episode 0, avg reward: 19.80
episode 10, avg reward: 34.44
episode 20, avg reward: 51.11
episode 30, avg reward: 79.44
episode 40, avg reward: 133.47
episode 50, avg reward: 179.92
episode 60, avg reward: 192.34
solved at episode 67
