In [2]:
# Environment setup: create a conda env named "sarsa", install Gymnasium (with the Atari extras),
# NumPy and Matplotlib, fetch the Atari ROMs through AutoROM, and register a Jupyter kernel
# named "sarsa".
# NOTE(review): the recorded output of this cell shows "conda: command not found" for the two
# conda lines, so on that runtime they had no effect; presumably the pip installs then went
# into whatever environment the notebook was already using — confirm.
!conda create -n sarsa python=3.9 -y
!conda activate sarsa
!pip install "gymnasium[atari]" numpy matplotlib
!pip install autorom[accept-rom-license]  # Downloading Gym env data files
!AutoROM --accept-license  # Accepting the license for data files
!pip install ipykernel  # Install Jupyter kernel manager
!ipython kernel install --user --name=sarsa  # Add the new Conda env to Jupyter

/bin/bash: line 1: conda: command not found
/bin/bash: line 1: conda: command not found
Collecting autorom[accept-rom-license]
  Downloading AutoROM-0.6.1-py3-none-any.whl.metadata (2.4 kB)
Collecting AutoROM.accept-rom-license (from autorom[accept-rom-license])
  Downloading AutoROM.accept-rom-license-0.6.1.tar.gz (434 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.7/434.7 kB[0m [31m19.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Downloading AutoROM-0.6.1-py3-none-any.whl (9.4 kB)
Building wheels for collected packages: AutoROM.accept-rom-license
  Building wheel for AutoROM.accept-rom-license (pyproject.toml) ... [?25l[?25hdone
  Created wheel for AutoROM.accept-rom-license: filename=AutoROM.accept_rom_license-0.6.1-py3-none-any.whl size=446667 sha256=bd4fa45d38aeca5fab53d64e2a99aa9103c10c14224

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt


In [4]:
# Quietly install/upgrade the Weights & Biases client, then import it together with the
# stdlib `random` and `math` modules.
!pip install wandb -qU
import wandb
import random
import math


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m20.9/20.9 MB[0m [31m55.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [5]:
# Authenticate with Weights & Biases without embedding the API key in the notebook.
# The key is read from the WANDB_API_KEY environment variable; when it is unset, `key=None`
# is passed, which is the same as calling wandb.login() with no explicit key.
# NOTE(review): the key that used to be hardcoded in this cell should be treated as leaked
# and rotated.
import os

wandb.login(key=os.environ.get("WANDB_API_KEY"))


[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33mpsingadi[0m ([33mGroupXV[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


True

## setting up the environment

In [6]:
# Build the CartPole environment (rendered in "human" mode) and show how many
# discrete actions it exposes.
env = gym.make("CartPole-v1", render_mode="human")
n_actions = env.action_space.n
print(n_actions)

2


In [7]:
# Smoke-test the environment with a short random rollout (at most 100 steps).
# Gymnasium's reset() returns (observation, info) and step() returns
# (observation, reward, terminated, truncated, info), so both are unpacked explicitly.
state, info = env.reset()
for step in range(100):
    env.render()
    action = env.action_space.sample()
    state, reward, terminated, truncated, info = env.step(action)
    # An episode ends either by termination (failure) or by truncation (time limit).
    done = terminated or truncated
    if done:
        # Stop at the end of the first episode; the environment is closed right below,
        # so there is no point in resetting it again.
        break
env.close()

## Implement the Actor policy
It should output the probability of each action given the state, using a softmax.

In [8]:
class DistActorCritic(nn.Module):
    """Actor-critic network with a shared hidden layer and a multi-channel critic head.

    The shared trunk (`self.nn`) maps an observation to an `n_hidden`-dimensional ReLU
    representation. The actor head turns it into a softmax distribution over actions and
    the critic head into `n_channels` value estimates.

    Args:
        env_input_dim: size of the observation vector.
        env_output_dim: number of discrete actions.
        n_hidden: width of the shared hidden representation.
        theta_value: base step size used to build the per-channel rates below.
        n_channels: number of critic outputs (value channels).
        lr_range: stored on the instance as `self.lr_range`; not used by this class.

    Attributes:
        alpha_plus: per-channel rates for positive TD errors, decreasing from
            `n_channels * theta_value` down to `theta_value`.
        alpha_minus: per-channel rates for non-positive TD errors, increasing from
            `theta_value` up to `n_channels * theta_value`.
    """

    def __init__(self, env_input_dim, env_output_dim, n_hidden, theta_value, n_channels=3, lr_range=(0.0001, 0.3)):
        super(DistActorCritic, self).__init__()

        self.n_channels = n_channels  # dimension of distributional value output, eg. 3 (pess, opt, real) or 8
        self.n_hidden = n_hidden  # dimension of shared representation, eg. 64
        # Copy into a fresh list so instances never share (or alias) the default/caller object.
        self.lr_range = list(lr_range)
        self.env_input_dim = env_input_dim
        self.env_output_dim = env_output_dim
        self.theta_value = theta_value

        # One rate per critic channel. For n_channels == 3 this reproduces
        # [3*theta, 2*theta, theta] and [theta, 2*theta, 3*theta]; for other channel
        # counts the lists now have exactly n_channels entries, so indexing them
        # per channel cannot run out of range.
        self.alpha_plus = [(n_channels - i) * theta_value for i in range(n_channels)]
        self.alpha_minus = [(i + 1) * theta_value for i in range(n_channels)]

        # Shared representation.
        self.nn = nn.Sequential(
            nn.Linear(self.env_input_dim, self.n_hidden),
            nn.ReLU()
        )

        # Policy head: action probabilities (softmax over the last dimension).
        self.actor = nn.Sequential(
            nn.Linear(self.n_hidden, self.env_output_dim),
            nn.Softmax(dim=-1)
        )

        # Value head: one unbounded linear output per channel.
        self.critic = nn.Sequential(
            nn.Linear(self.n_hidden, self.n_channels),
        )

    def forward(self, x):
        """Return `(action_probs, values)` for the input observations `x`."""
        x = self.nn(x)
        return self.actor(x), self.critic(x)



In [9]:
# Sanity-check the network on a single observation from a fresh environment.
env = gym.make("CartPole-v1", render_mode="human")
state, _ = env.reset()
# Add a leading batch dimension so the observation can be fed to the network.
state_tensor = torch.FloatTensor(state).unsqueeze(0)

obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.n
dist_actor_critic = DistActorCritic(
    theta_value=0.1,
    env_input_dim=obs_dim,
    env_output_dim=n_actions,
    n_hidden=64,
    n_channels=8,
)

print("state tensor is",state_tensor.flatten(), state_tensor.shape)

# One forward pass: action probabilities from the actor head, channel values from the critic head.
action_probs, values = dist_actor_critic(state_tensor)
print(action_probs)
print(values)

state tensor is tensor([-0.0353,  0.0203,  0.0299, -0.0126]) torch.Size([1, 4])
tensor([[0.5365, 0.4635]], grad_fn=<SoftmaxBackward0>)
tensor([[ 0.1166,  0.0410, -0.0161,  0.1398,  0.0789, -0.2104, -0.1928, -0.2280]],
       grad_fn=<AddmmBackward0>)


In [10]:
# Clipping function applied to TD errors.
def f(delta, k=1):
    """Clip `delta` element-wise to the closed interval [-k, k]."""
    lower, upper = -k, k
    return torch.clamp(delta, min=lower, max=upper)

In [11]:
# Update critic------ values

def update_critic(dist_actor_critic, state, next_state, reward, gamma, n_channels):
    """Compute per-channel TD errors and apply asymmetric updates to the value estimates.

    For every channel i in range(n_channels) the TD error is

        delta_i = reward + gamma * V_i(next_state) - V_i(state)

    and `alpha_plus[i] * f(delta_i)` (when delta_i > 0) or `alpha_minus[i] * f(delta_i)`
    (otherwise) is added to channel i of the value tensor. Only the tensor returned by the
    forward pass is adjusted; the network parameters are not touched here.

    Args:
        dist_actor_critic: model whose call returns `(action_probs, values)` and which
            exposes the per-channel rate lists `alpha_plus` and `alpha_minus`.
        state: batched observation tensor; only batch row 0 is read.
        next_state: batched observation tensor of the successor state; only row 0 is read.
        reward: reward received for the transition.
        gamma: discount factor.
        n_channels: number of value channels to process.

    Returns:
        (values, deltas): the adjusted value tensor and a 1-D tensor holding the TD error
        of every processed channel (empty when `n_channels` is 0). Previously only the
        last channel's TD error was returned, so averaging it in the caller had no effect.
    """
    _, values = dist_actor_critic(state)
    _, next_values = dist_actor_critic(next_state)

    deltas = []
    for i in range(n_channels):
        # Each channel only reads and writes its own entry, so the TD error is computed
        # from the not-yet-adjusted value of that channel.
        delta = reward + gamma * next_values[0][i] - values[0][i]
        if delta > 0:
            values[0][i] += dist_actor_critic.alpha_plus[i] * f(delta)
        else:
            values[0][i] += dist_actor_critic.alpha_minus[i] * f(delta)
        deltas.append(delta)

    # Stack the scalar TD errors into one tensor; fall back to an empty tensor when no
    # channel was processed so the return value is always defined.
    all_deltas = torch.stack(deltas) if deltas else values.new_zeros(0)
    return values, all_deltas


## Advantage function and Actor-Critic algorithm


In [12]:
def train_actor_critic(env, theta_value, n_hidden=64, num_episodes=1000, gamma=0.99, alpha=0.001, beta=0.001, max_timesteps=200):
    """Train a `DistActorCritic` agent on `env` with one update per environment step.

    Args:
        env: Gymnasium-style environment. Its observation/action spaces size the network
            and it is the environment that is actually stepped during training.
        theta_value: base step size forwarded to `DistActorCritic`.
        n_hidden: width of the shared hidden layer.
        num_episodes: number of episodes to run.
        gamma: discount factor passed to `update_critic`.
        alpha: learning rate of the Adam optimizer.
        beta: currently unused; kept for interface compatibility.
        max_timesteps: maximum number of steps per episode.

    Returns:
        (episode_rewards, episode_steps): two lists with the total reward and the number
        of steps of every episode.
    """
    dist_actor_critic = DistActorCritic(theta_value=theta_value, env_input_dim=env.observation_space.shape[0], env_output_dim=env.action_space.n, n_hidden=n_hidden)

    # Optimize every parameter of the model. Passing only the shared trunk
    # (`dist_actor_critic.nn`) would leave the actor and critic heads at their random
    # initial weights for the whole run.
    optimizer = optim.Adam(dist_actor_critic.parameters(), lr=alpha)

    # Per-episode statistics returned to the caller.
    episode_rewards = []
    episode_steps = []

    for episode in range(num_episodes):
        episode_reward = 0
        steps = 0

        # Reuse the caller's environment instead of creating (and never closing) a new
        # one for every episode.
        state, _ = env.reset()
        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        for t in range(max_timesteps):
            # Sample an action from the current policy.
            action_probs, _ = dist_actor_critic(state_tensor)
            action = torch.multinomial(action_probs, 1).item()

            # Take a step in the environment.
            next_state, reward, terminated, truncated, _ = env.step(action)
            next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)

            # TD error(s) from the critic; their mean is used as the advantage.
            _, delta = update_critic(dist_actor_critic, state_tensor, next_state_tensor, reward, gamma,
                                     dist_actor_critic.n_channels)
            advantage = torch.mean(delta)

            # Policy-gradient loss: the advantage is detached so it acts as a fixed weight.
            actor_loss = -torch.log(action_probs[0][action]) * advantage.detach()

            # Critic loss: squared advantage.
            critic_loss = advantage ** 2

            # Combined loss (sum of the two) for a single optimizer step.
            main_loss = actor_loss + critic_loss

            optimizer.zero_grad()
            main_loss.backward()
            optimizer.step()

            # The next iteration recomputes the policy for the new state, so no extra
            # forward pass is needed here.
            state_tensor = next_state_tensor

            episode_reward += reward
            steps += 1

            # Stop on failure (terminated) as well as on a time-limit cut-off (truncated).
            if terminated or truncated:
                break

        episode_rewards.append(episode_reward)
        episode_steps.append(steps)

        # Print progress every 10 episodes
        if episode % 10 == 0:
            print(f"Episode {episode}, Reward: {episode_reward}")

    return episode_rewards, episode_steps


In [13]:

def moving_average(data, window_size=10):
    """Return the simple moving average of `data` over full windows of `window_size` samples.

    Args:
        data: 1-D sequence of numbers.
        window_size: number of consecutive samples averaged per output point (>= 1).

    Returns:
        A NumPy array with `len(data) - window_size + 1` entries, or an empty array when
        `data` holds fewer than `window_size` samples (no full window exists).

    Raises:
        ValueError: if `window_size` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    samples = np.asarray(data)
    # With fewer samples than the window, np.convolve(mode='valid') would swap the roles
    # of the two operands and return values that are not a moving average, and it raises
    # on empty input. Report "no full window" explicitly instead.
    if samples.size < window_size:
        return np.empty(0, dtype=float)

    return np.convolve(samples, np.ones(window_size) / window_size, mode='valid')

def plot_learning_curve(episode_rewards, episode_lengths, theta_value, window_size=10):
    """Log and plot the smoothed reward and episode-length curves for one theta setting.

    Both series are smoothed with `moving_average(..., window_size)`, logged point by
    point to a Weights & Biases run named after `theta_value`, and drawn on one figure.
    """
    reward_curve = moving_average(episode_rewards, window_size)
    length_curve = moving_average(episode_lengths, window_size)

    # One W&B run per theta value; the run is closed when the `with` block exits.
    with wandb.init(project="ProjectName", name=f"theta_{theta_value}"):
        for ep, smoothed_reward in enumerate(reward_curve):
            record = {
                "theta": theta_value,
                "episode": ep,
                "reward": smoothed_reward,
                "steps": length_curve[ep],
            }
            wandb.log(record)

    # Both smoothed curves share the same axes.
    fig, ax = plt.subplots(figsize=(8, 5))
    ax.plot(reward_curve, label="Smoothed Rewards", color='b', linewidth=2)
    ax.plot(length_curve, label="Smoothed Episode Lengths", color='r', linestyle="dashed", linewidth=2)

    ax.set_title(f"Smoothed Learning Curve (Theta={theta_value})")
    ax.set_xlabel("Episode")
    ax.set_ylabel("Value")
    ax.legend()
    ax.grid(True, linestyle="--", alpha=0.6)
    plt.show()



def plot_value_function_convergence(value_estimates, theta_value):
    """Plot the 10-sample moving average of `value_estimates` for one theta setting."""
    fig, ax = plt.subplots(figsize=(8, 5))
    smoothed_values = moving_average(value_estimates, 10)
    ax.plot(smoothed_values, color='purple', linewidth=2)

    ax.set_title(f"Value Function Convergence (Theta={theta_value})")
    ax.set_xlabel("Episode")
    ax.set_ylabel("Estimated Value")
    ax.grid(True, linestyle="--", alpha=0.6)
    plt.show()



def plot_reward_distribution(episode_rewards, theta_value):
    """Show a 20-bin histogram of the total reward per episode for one theta setting."""
    fig, ax = plt.subplots(figsize=(8, 5))
    ax.hist(episode_rewards, bins=20, color='blue', alpha=0.7, edgecolor='black')

    ax.set_title(f"Reward Distribution (Theta={theta_value})")
    ax.set_xlabel("Total Reward per Episode")
    ax.set_ylabel("Frequency")
    ax.grid(True, linestyle="--", alpha=0.6)
    plt.show()


def plot_reward_variance(episode_rewards, theta_value, window_size=10):
    """Plot the rolling standard deviation of episode rewards for one theta setting.

    For episode i the standard deviation is taken over the most recent `window_size`
    rewards up to and including episode i (fewer at the start of the run).

    Args:
        episode_rewards: sequence of total rewards, one per episode.
        theta_value: theta setting shown in the plot title.
        window_size: number of most recent episodes in each rolling window.
    """
    # A window always contains at least the current episode.
    span = max(1, window_size)
    # The slice starts at i - span + 1 so that it holds exactly `span` samples; starting
    # at i - window_size would include one sample more than the requested window.
    std_rewards = [
        np.std(episode_rewards[max(0, i - span + 1):i + 1])
        for i in range(len(episode_rewards))
    ]

    plt.figure(figsize=(8, 5))
    plt.plot(std_rewards, color='red', linewidth=2)

    plt.title(f"Reward Variance Over Time (Theta={theta_value})")
    plt.xlabel("Episode")
    plt.ylabel("Standard Deviation of Rewards")
    plt.grid(True, linestyle="--", alpha=0.6)
    plt.show()


In [None]:

# Train one agent per theta value (800 episodes each) and plot its diagnostics.
theta = [0.03,3]
for theta_value in theta:
    episode_rewards, episode_steps = train_actor_critic(env, theta_value, num_episodes= 800)
    plot_learning_curve(episode_rewards, episode_steps, theta_value)
    plot_reward_distribution(episode_rewards, theta_value)
    plot_reward_variance(episode_rewards, theta_value)
    # NOTE(review): the episode rewards are passed as the "value estimates" here — confirm
    # this is intended.
    plot_value_function_convergence(episode_rewards, theta_value)



Episode 0, Reward: 28.0
Episode 10, Reward: 11.0
Episode 20, Reward: 28.0
Episode 30, Reward: 17.0
Episode 40, Reward: 11.0
Episode 50, Reward: 11.0


don't want to touch the results below

In [None]:
# Plotting the learning curve
def plot_learning_curve(episode_rewards, episode_lengths, theta_value):
    """Log and plot the raw per-episode rewards and lengths for one theta setting.

    Args:
        episode_rewards: total reward of every episode.
        episode_lengths: number of steps of every episode (same length as `episode_rewards`).
        theta_value: theta setting used to name the Weights & Biases run.
    """
    # One W&B run per theta value; the run is closed when the `with` block exits.
    with wandb.init(project="CartPole-v1", name=f"theta_{theta_value}"):

        for ep in range(len(episode_rewards)):
            wandb.log({
                "theta": theta_value,
                "episode": ep,
                "reward": episode_rewards[ep],
                # Read the step count from the function's own argument rather than from
                # a notebook-level variable that happens to be in scope.
                "steps": episode_lengths[ep]
            })

    # Left panel: rewards; right panel: episode lengths.
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(episode_rewards)
    plt.title("Episode Rewards")
    plt.xlabel("Episode")
    plt.ylabel("Total Reward")
    plt.subplot(1, 2, 2)
    plt.plot(episode_lengths)
    plt.title("Episode Lengths")
    plt.xlabel("Episode")
    plt.ylabel("Number of Steps")
    plt.tight_layout()
    plt.show()

In [None]:
# RL seems to be sensitive to the learning-rate weights, so we experiment with several
# values of theta (300 episodes each).
theta = [0.001, 0.005, 0.01, 0.05]
for theta_value in theta:
    episode_rewards, episode_steps = train_actor_critic(env, theta_value, num_episodes= 300)
    plot_learning_curve(episode_rewards, episode_steps, theta_value)




## Visualization


## Analysis of results
The low rewards at the beginning of the training:

* The agent is still learning as it explores the environment: its actor's policy is essentially random at first (*it doesn't yet know which actions give high reward*).

* Another possible cause is the untrained critic network: during the early stages of learning it makes highly inaccurate predictions of state values, so the advantage estimate is noisy.

* Also, the randomly initialized actor network picks essentially random actions, which do not give high rewards in CartPole, where the goal is to balance a pole on a moving cart.


The drop of rewards after stabilization:
* Overshooting the optimum during gradient descent (or ascent) when alpha and beta are set too high
* Overfitting after the policy stabilizes at an early stage
* Policy-gradient noise when the advantage estimates are too large





