## <center>CSE 546: Reinforcement Learning</center>
### <center>Prof. Alina Vereshchaka</center>
#### <center>Spring 2025</center>

Welcome to Assignment 3, Part 1: Introduction to Actor-Critic Methods! This part covers the implementation of simple actor and critic networks and best practices used in modern Actor-Critic algorithms.

## Section 0: Setup and Imports

In [23]:
!pip install swig
!pip install "gymnasium[box2d]"
!pip install "gymnasium[atari,accept-rom-license]"
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.nn.utils as utils
import gymnasium as gym
import matplotlib.pyplot as plt
from collections import deque
# Set seed for reproducibility
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)



<torch._C.Generator at 0x7852900f4030>

## Section 1: Actor-Critic Network Architectures and Loss Computation

In this section, you will explore two common architectural designs for Actor-Critic methods and implement their corresponding loss functions using dummy tensors. These architectures are:
- A. Completely separate actor and critic networks
- B. A shared network with two output heads

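Both designs are widely used in practice. Shared networks use fewer parameters and let both heads reuse the same learned features, although the actor and critic gradients can then interfere with each other in the shared layers. Separate networks avoid that interference and allow independent learning rates and optimizers, at the cost of more parameters.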
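
To make the parameter-efficiency point concrete, the sketch below counts trainable parameters for both designs. The layer sizes (10 inputs, 64 hidden units, 4 actions) and the helper `count_params` are illustrative assumptions, not part of the assignment's required interface.

```python
import torch.nn as nn

def count_params(module: nn.Module) -> int:
    """Return the number of trainable parameters in a module (hypothetical helper)."""
    return sum(p.numel() for p in module.parameters() if p.requires_grad)

state_dim, action_dim, hidden = 10, 4, 64  # illustrative sizes

# Design A: two independent networks, each with its own hidden layer.
separate_actor = nn.Sequential(
    nn.Linear(state_dim, hidden), nn.ReLU(), nn.Linear(hidden, action_dim)
)
separate_critic = nn.Sequential(
    nn.Linear(state_dim, hidden), nn.ReLU(), nn.Linear(hidden, 1)
)

# Design B: one shared hidden layer feeding two small heads.
shared_trunk = nn.Sequential(nn.Linear(state_dim, hidden), nn.ReLU())
actor_head = nn.Linear(hidden, action_dim)
critic_head = nn.Linear(hidden, 1)

n_separate = count_params(separate_actor) + count_params(separate_critic)
n_shared = (
    count_params(shared_trunk) + count_params(actor_head) + count_params(critic_head)
)

print("separate networks:", n_separate)  # 1733
print("shared network:   ", n_shared)    # 1029
```

With a single hidden layer the saving is exactly one hidden layer's worth of weights; with a deep or convolutional trunk the difference grows accordingly.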

---


### Task 1a – Separate Actor and Critic Networks with Loss Function

Define a class `SeparateActorCritic`. Your goal is to:
- Create two completely independent neural networks: one for the actor and one for the critic.
- The actor should output a probability distribution over discrete actions (use `nn.Softmax`).
- The critic should output a single scalar value.

 Use `nn.ReLU()` as your activation function. Include at least one hidden layer of reasonable width (e.g. 64 or 128 units).

```python
# TODO: Define SeparateActorCritic class
```

 Next, simulate training using dummy tensors:
1. Generate dummy tensors for log-probabilities, returns, estimated values, and entropies.
2. Compute the actor loss using the advantage (return - value).
3. Compute the critic loss as mean squared error between values and returns.
4. Use a single optimizer for both the Actor and the Critic. In this case, combine the actor and critic losses into a total loss and perform backpropagation.
5. Use separate optimizers for the Actor and the Critic. In this case, keep the actor and critic losses separate and backpropagate each one on its own.

```python
# TODO: Simulate loss computation and backpropagation
```
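
The steps above work on dummy tensors. In an actual rollout, the actor loss usually involves only the log-probability of the action that was taken. The sketch below shows one way to obtain that with `torch.distributions.Categorical`. The random `logits`, `values`, and `returns` are stand-ins for network outputs and collected returns, and the entropy coefficient of 0.01 is an assumed value.

```python
import torch
import torch.nn.functional as F
from torch.distributions import Categorical

torch.manual_seed(0)
batch_size, action_dim = 5, 4  # illustrative sizes

# Stand-ins for network outputs and returns (random, not from a real model).
logits = torch.randn(batch_size, action_dim, requires_grad=True)
values = torch.randn(batch_size, 1, requires_grad=True)
returns = torch.randn(batch_size, 1)

dist = Categorical(logits=logits)
actions = dist.sample()              # sampled action indices, shape (batch,)
log_probs = dist.log_prob(actions)   # log pi(a|s) of the sampled actions, shape (batch,)
entropy = dist.entropy()             # per-state policy entropy, shape (batch,)

# Detach the value so the actor loss does not push gradients into the critic.
advantage = (returns - values.detach()).squeeze(-1)

actor_loss = -(log_probs * advantage).mean() - 0.01 * entropy.mean()
critic_loss = F.mse_loss(values, returns)

(actor_loss + critic_loss).backward()
print("actor loss:", actor_loss.item(), "critic loss:", critic_loss.item())
```

Building the distribution from logits rather than from softmax probabilities avoids the need for a small epsilon inside `torch.log`.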

🔗 Helpful references:
- PyTorch Softmax: https://pytorch.org/docs/stable/generated/torch.nn.Softmax.html
- PyTorch MSE Loss: https://pytorch.org/docs/stable/generated/torch.nn.functional.mse_loss.html

---

In [3]:
# TODO: Define a class SeparateActorCritic with separate networks for actor and critic

# BEGIN_YOUR_CODE
class SeparateActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_size=64):
        """
        Args:
            state_dim (int): Dimension of state/input.
            action_dim (int): Number of discrete actions.
            hidden_size (int): Width of the hidden layer.
        """
        super(SeparateActorCritic, self).__init__()
        # Actor network
        self.actor_fc1 = nn.Linear(state_dim, hidden_size)
        self.actor_fc2 = nn.Linear(hidden_size, action_dim)
        self.actor_softmax = nn.Softmax(dim=-1)

        # Critic network
        self.critic_fc1 = nn.Linear(state_dim, hidden_size)
        self.critic_fc2 = nn.Linear(hidden_size, 1)

    def forward(self, state):
        """Forward pass for both actor and critic."""
        # Actor forward pass
        actor_hidden = F.relu(self.actor_fc1(state))
        action_logits = self.actor_fc2(actor_hidden)
        action_probs = self.actor_softmax(action_logits)

        # Critic forward pass
        critic_hidden = F.relu(self.critic_fc1(state))
        value = self.critic_fc2(critic_hidden)

        return action_probs, value

# END_YOUR_CODE

In [4]:
def simulate_training_single_optimizer(model, state):
    """
    Simulate training using dummy tensors with a single optimizer for both Actor and Critic.
    """
    # Forward pass through our model
    action_probs, value = model(state)

    # --- Dummy tensor simulation ---
    # Simulate dummy log probabilities from the actor; here we assume a batch of actions
    dummy_log_probs = torch.log(action_probs + 1e-8)

    # Simulated returns (e.g., discounted rewards)
    dummy_returns = torch.randn_like(value)

    # Simulated dummy entropies (for the actor)
    dummy_entropies = -torch.sum(action_probs * dummy_log_probs, dim=1, keepdim=True)

    # Compute advantage: (return - value)
    advantage = dummy_returns - value.detach()

    # Actor loss: negative log probability * advantage - entropy bonus (we use a factor of 0.01 for entropy regularization)
    actor_loss = -(dummy_log_probs * advantage).mean() - 0.01 * dummy_entropies.mean()

    # Critic loss: mean squared error between value and return
    critic_loss = F.mse_loss(value, dummy_returns)

    # Total loss as weighted sum (here we assume equal weighting)
    total_loss = actor_loss + critic_loss

    # Single optimizer: update model parameters
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    optimizer.zero_grad()
    total_loss.backward()
    optimizer.step()

    print("Single Optimizer:")
    print("Actor Loss:", actor_loss.item())
    print("Critic Loss:", critic_loss.item())
    print("Total Loss:", total_loss.item())


def simulate_training_separate_optimizers(model, state):
    """
    Simulate training using dummy tensors with separate optimizers for Actor and Critic.
    """
    # Forward pass through our model
    action_probs, value = model(state)

    # --- Dummy tensor simulation ---
    dummy_log_probs = torch.log(action_probs + 1e-8)
    dummy_returns = torch.randn_like(value)
    dummy_entropies = -torch.sum(action_probs * dummy_log_probs, dim=1, keepdim=True)
    advantage = dummy_returns - value.detach()

    actor_loss = -(dummy_log_probs * advantage).mean() - 0.01 * dummy_entropies.mean()
    critic_loss = F.mse_loss(value, dummy_returns)

    # Separate optimizers for actor and critic
    # We need to separate the parameters that belong to each network.
    actor_params = list(model.actor_fc1.parameters()) + list(model.actor_fc2.parameters())
    critic_params = list(model.critic_fc1.parameters()) + list(model.critic_fc2.parameters())

    optimizer_actor = optim.Adam(actor_params, lr=1e-3)
    optimizer_critic = optim.Adam(critic_params, lr=1e-3)

    # Update actor parameters
    optimizer_actor.zero_grad()
    actor_loss.backward()  # The advantage is detached, so this graph is independent of the critic's; retain_graph is not needed.
    optimizer_actor.step()

    # Update critic parameters
    optimizer_critic.zero_grad()
    critic_loss.backward()
    optimizer_critic.step()

    print("\nSeparate Optimizers:")
    print("Actor Loss:", actor_loss.item())
    print("Critic Loss:", critic_loss.item())

In [6]:
state_dim = 10
action_dim = 4
batch_size = 5  # simulate a batch of 5 states

# Create dummy state tensor
dummy_state = torch.randn(batch_size, state_dim)

# Instantiate the model
model = SeparateActorCritic(state_dim, action_dim, hidden_size=64)

# Simulate training with a single optimizer for both networks.
simulate_training_single_optimizer(model, dummy_state)

# Simulate training with separate optimizers for actor and critic.
simulate_training_separate_optimizers(model, dummy_state)

Single Optimizer:
Actor Loss: 0.2998315095901489
Critic Loss: 2.5514073371887207
Total Loss: 2.85123872756958

Separate Optimizers:
Actor Loss: -0.46809422969818115
Critic Loss: 1.2666232585906982
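
A quick way to see why detaching the value matters with separate networks is to check which parameters receive gradients from each loss. The sketch below mirrors the dummy losses used in this task on two small stand-in networks; the names `actor` and `critic` and all sizes are assumptions made for illustration.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
# Stand-in networks with the same layout as the separate actor and critic.
actor = nn.Sequential(nn.Linear(10, 64), nn.ReLU(), nn.Linear(64, 4))
critic = nn.Sequential(nn.Linear(10, 64), nn.ReLU(), nn.Linear(64, 1))

state = torch.randn(5, 10)
returns = torch.randn(5, 1)

log_probs = F.log_softmax(actor(state), dim=-1)
value = critic(state)
advantage = returns - value.detach()  # no gradient path back into the critic

# Backpropagate the actor loss first; the critic's graph is untouched by this.
actor_loss = -(log_probs * advantage).mean()
actor_loss.backward()
actor_has_grad = all(p.grad is not None for p in actor.parameters())
critic_has_grad = any(p.grad is not None for p in critic.parameters())

# The critic loss can still be backpropagated afterwards.
critic_loss = F.mse_loss(value, returns)
critic_loss.backward()
critic_has_grad_after = all(p.grad is not None for p in critic.parameters())

print(actor_has_grad, critic_has_grad, critic_has_grad_after)  # True False True
```

Because the two losses touch disjoint sets of parameters, each optimizer can step independently with its own learning rate.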


### Discuss the motivation behind each setup and when it may be preferred in practice.

YOUR ANSWER:

### Task 1b – Shared Network with Actor and Critic Heads + Loss Function

Now define a class `SharedActorCritic`:
- Build a shared base network (e.g., linear layer + ReLU)
- Create two heads: one for actor (output action probabilities) and one for critic (output state value)

```python
# TODO: Define SharedActorCritic class
```

Then:
1. Pass a dummy input tensor through the model to obtain action probabilities and value.
2. Simulate dummy rewards and compute advantage.
3. Compute the actor and critic losses, combine them, and backpropagate.

```python
# TODO: Simulate shared network loss computation and backpropagation
```

 Use `nn.Softmax` for actor output and `nn.Linear` for scalar critic output.
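
In a shared network, the trunk receives gradients from both heads when the losses are combined. The sketch below checks this on stand-in layers: the gradient of the summed loss with respect to the shared weights equals the sum of the two individual gradients. The layer names and sizes are illustrative assumptions, and `losses` and `shared_grad` are hypothetical helpers.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
shared = nn.Linear(10, 64)       # shared trunk
actor_head = nn.Linear(64, 4)    # policy head
critic_head = nn.Linear(64, 1)   # value head

state = torch.randn(5, 10)
returns = torch.randn(5, 1)

def losses():
    """Recompute both dummy losses (a fresh graph is built on every call)."""
    h = F.relu(shared(state))
    log_probs = F.log_softmax(actor_head(h), dim=-1)
    value = critic_head(h)
    advantage = returns - value.detach()
    actor_loss = -(log_probs * advantage).mean()
    critic_loss = F.mse_loss(value, returns)
    return actor_loss, critic_loss

def shared_grad(loss):
    """Gradient of a scalar loss with respect to the shared trunk's weights."""
    (grad,) = torch.autograd.grad(loss, shared.weight)
    return grad

a, _ = losses()
g_actor = shared_grad(a)
_, c = losses()
g_critic = shared_grad(c)
a, c = losses()
g_total = shared_grad(a + c)

print(torch.allclose(g_total, g_actor + g_critic, atol=1e-6))  # True
```

This is why a weighting coefficient on the critic loss is often used with shared networks: it controls how strongly the value objective shapes the shared features.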

🔗 More reading:
- Policy Gradient Methods: https://spinningup.openai.com/en/latest/algorithms/vpg.html
- Actor-Critic Overview: https://www.tensorflow.org/agents/tutorials/6_reinforce_tutorial
- PyTorch Categorical Distribution: https://pytorch.org/docs/stable/distributions.html#categorical

---

In [8]:
# BEGIN_YOUR_CODE
class SharedActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_size=64):
        """
        Args:
            state_dim (int): Dimension of state/input.
            action_dim (int): Number of discrete actions.
            hidden_size (int): Number of hidden units for the shared network.
        """
        super(SharedActorCritic, self).__init__()

        # Shared base network (e.g., one hidden layer)
        self.shared_fc = nn.Linear(state_dim, hidden_size)

        # Actor head: outputs action logits that will be converted to probabilities
        self.actor_fc = nn.Linear(hidden_size, action_dim)
        self.actor_softmax = nn.Softmax(dim=-1)

        # Critic head: outputs a single scalar value for the state
        self.critic_fc = nn.Linear(hidden_size, 1)

    def forward(self, state):
        """
        Forward pass for the shared network.

        Args:
            state (torch.Tensor): Input tensor representing the state.

        Returns:
            action_probs (torch.Tensor): Probability distribution over actions.
            value (torch.Tensor): Scalar state value.
        """
        # Shared network
        shared_hidden = F.relu(self.shared_fc(state))

        # Actor branch: produce action probabilities
        action_logits = self.actor_fc(shared_hidden)
        action_probs = self.actor_softmax(action_logits)

        # Critic branch: produce state value
        value = self.critic_fc(shared_hidden)

        return action_probs, value

# END_YOUR_CODE

In [9]:
def simulate_training_shared_network(model, state):
    """
    Simulate training on the shared network using a dummy input.
    The procedure includes:
    - Forward pass through model to obtain action probabilities and value.
    - Dummy rewards and advantage computation.
    - Loss computation for both actor and critic.
    - Combined loss backpropagation using a single optimizer.
    """
    # Forward pass through the shared network
    action_probs, value = model(state)

    # --- Dummy data simulation ---
    # For actor loss: generate dummy log probabilities from the actor
    dummy_log_probs = torch.log(action_probs + 1e-8)

    # Simulate dummy rewards or returns
    dummy_returns = torch.randn_like(value)

    # Compute advantage (return - value)
    advantage = dummy_returns - value.detach()

    # Compute an entropy bonus (to encourage exploration)
    dummy_entropies = -torch.sum(action_probs * dummy_log_probs, dim=1, keepdim=True)

    # Actor loss: negative log probability weighted by advantage, with an entropy bonus.
    actor_loss = -(dummy_log_probs * advantage).mean() - 0.01 * dummy_entropies.mean()

    # Critic loss: mean squared error (MSE) between estimated value and return.
    critic_loss = F.mse_loss(value, dummy_returns)

    # Combine losses (here we simply sum them)
    total_loss = actor_loss + critic_loss

    # Backpropagation using a single optimizer
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    optimizer.zero_grad()
    total_loss.backward()
    optimizer.step()

    # Print losses for reference
    print("Shared Network Training:")
    print("Actor Loss:", actor_loss.item())
    print("Critic Loss:", critic_loss.item())
    print("Total Loss:", total_loss.item())

In [10]:
state_dim = 10    # Dimension of the state
action_dim = 4    # Number of discrete actions
batch_size = 5    # Batch size for dummy input

# Create a dummy state tensor with shape (batch_size, state_dim)
dummy_state = torch.randn(batch_size, state_dim)

# Initialize the shared actor-critic model
model = SharedActorCritic(state_dim, action_dim, hidden_size=64)

# Simulate training using the shared network
simulate_training_shared_network(model, dummy_state)

Shared Network Training:
Actor Loss: -0.031828150153160095
Critic Loss: 0.6814764142036438
Total Loss: 0.6496482491493225


### Discuss the motivation behind each setup and when it may be preferred in practice.

YOUR ANSWER:

## Section 2: Auto-Adaptive Network Setup for Environments

You will now create a function that builds a shared actor-critic network that adapts to any Gymnasium environment. This function should inspect the environment and build input/output layers accordingly.

### Task 2: Auto-generate Input and Output Layers
Write a function `create_shared_network(env)` that constructs a neural network using the following rules:
- The input layer should match the environment's observation space.
- The output layer for the **actor** should depend on the action space:
  - For discrete actions: output probabilities using `nn.Softmax`.
  - For continuous actions: output mean and log std for a Gaussian distribution.
- The **critic** always outputs a single scalar value.

```python
# TODO: Define function `create_shared_network(env)`
```
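
For the continuous-action rule, the mean and log std are typically turned into a distribution before sampling. The following is a minimal sketch using `torch.distributions.Normal`, assuming a state-independent log std and a diagonal Gaussian; the tensor values and sizes are placeholders rather than real network outputs.

```python
import math
import torch
from torch.distributions import Normal

torch.manual_seed(0)
batch_size, action_dim = 3, 6  # illustrative sizes

mean = torch.zeros(batch_size, action_dim)  # placeholder for the actor's mean output
log_std = torch.zeros(action_dim)           # placeholder log std (std = 1 per dimension)

# exp() keeps the standard deviation positive; log_std broadcasts over the batch.
dist = Normal(mean, log_std.exp())
action = dist.sample()                         # shape (batch, action_dim)

# Dimensions are independent, so the joint log-probability is the per-dimension sum.
log_prob = dist.log_prob(action).sum(dim=-1)   # shape (batch,)
entropy = dist.entropy().sum(dim=-1)           # shape (batch,)

# Entropy of a unit Gaussian is 0.5 * log(2 * pi * e) per dimension.
expected_entropy = action_dim * 0.5 * math.log(2 * math.pi * math.e)
print(entropy[0].item(), expected_entropy)
```

Parameterizing the log of the standard deviation, rather than the standard deviation itself, lets the network output any real number without violating the positivity constraint.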

#### Environments to Support:
Test your function with the following environments:
1. `CliffWalking-v0` (Use one-hot encoding for discrete integer observations.)
2. `LunarLander-v3` (Standard Box space for observations and discrete actions.)
3. `PongNoFrameskip-v4` (Use gym wrappers for Atari image preprocessing.)
4. `HalfCheetah-v5` (Continuous observation and continuous action.)

```python
# TODO: Loop through environments and test `create_shared_network`
```
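
For environments with `Discrete` observations such as `CliffWalking-v0`, the integer state has to be converted to a vector before it reaches a linear layer. A minimal sketch of one-hot encoding follows; the value 48 assumes the standard 4 x 12 grid, and in practice it would be read from `env.observation_space.n`.

```python
import torch
import torch.nn.functional as F

n_states = 48  # assumed: 4 x 12 CliffWalking grid; normally env.observation_space.n
obs = 36       # example integer observation

# one_hot expects an integer tensor; cast to float for use as network input.
x = F.one_hot(torch.tensor([obs]), num_classes=n_states).float()

print(x.shape)           # torch.Size([1, 48])
print(x.argmax(dim=-1))  # tensor([36])
```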

Hint: Use `gym.spaces` utilities to determine observation/action types dynamically.

🔗 Observation/Action Space Docs:
- https://gymnasium.farama.org/api/spaces/
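
For image observations, flattening raw frames into a linear layer is possible but produces a very wide input. A common alternative is a small convolutional encoder in front of the actor and critic heads. The sketch below assumes the preprocessing wrappers produce four stacked 84 x 84 grayscale frames in channel-first layout; that shape, the class name `ConvEncoder`, and the layer sizes are assumptions, not requirements of the task.

```python
import torch
import torch.nn as nn

class ConvEncoder(nn.Module):
    """Hypothetical convolutional trunk for stacked 84 x 84 frames."""

    def __init__(self, in_channels: int = 4, feature_dim: int = 512):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),  # 84 -> 20
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),           # 20 -> 9
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),           # 9 -> 7
            nn.ReLU(),
            nn.Flatten(),
        )
        self.fc = nn.Linear(64 * 7 * 7, feature_dim)

    def forward(self, frames: torch.Tensor) -> torch.Tensor:
        # frames: (batch, channels, 84, 84), already scaled to [0, 1]
        return torch.relu(self.fc(self.conv(frames)))

encoder = ConvEncoder()
features = encoder(torch.rand(2, 4, 84, 84))
print(features.shape)  # torch.Size([2, 512])
```

The resulting feature vector can feed the same actor and critic heads used for vector observations.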

---

In [1]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F

# Shared Actor-Critic Network
class ActorCriticNet(nn.Module):
    def __init__(self, input_dim, output_dim, hidden_size=128, is_continuous=False):
        super(ActorCriticNet, self).__init__()
        self.is_continuous = is_continuous

        self.fc1 = nn.Linear(input_dim, hidden_size)
        # The actor head always maps to output_dim: logits for discrete actions, means for continuous ones.
        self.actor = nn.Linear(hidden_size, output_dim)
        self.critic = nn.Linear(hidden_size, 1)

        if is_continuous:
            self.log_std = nn.Parameter(torch.zeros(output_dim))

    def forward(self, x):
        x = F.relu(self.fc1(x))
        value = self.critic(x)

        if self.is_continuous:
            mean = self.actor(x)
            return (mean, self.log_std.exp()), value
        else:
            logits = self.actor(x)
            probs = F.softmax(logits, dim=-1)
            return probs, value

In [2]:
def create_shared_network(env, hidden_size=128):
    obs_space = env.observation_space
    act_space = env.action_space

    if isinstance(obs_space, gym.spaces.Discrete):
        input_dim = obs_space.n
    else:  # Box
        input_dim = int(torch.prod(torch.tensor(obs_space.shape)))

    is_continuous = isinstance(act_space, gym.spaces.Box)
    output_dim = act_space.shape[0] if is_continuous else act_space.n

    return ActorCriticNet(input_dim, output_dim, hidden_size, is_continuous)


In [3]:
env_names = [
    "CliffWalking-v0",       # Discrete observation, discrete action
    "LunarLander-v2",        # Box observation, discrete action
    "PongNoFrameskip-v4",    # Atari env, pixel-based
    "HalfCheetah-v2",        # Continuous observation and action
]

for env_name in env_names:
    try:
        env = gym.make(env_name)
        print(f"\nCreating network for {env_name}:")
        model = create_shared_network(env, hidden_size=128)
    except Exception as e:
        print(f"Failed to create environment {env_name}: {e}")
        continue

    # Generate dummy input
    if isinstance(env.observation_space, gym.spaces.Discrete):
        dummy_input = torch.nn.functional.one_hot(torch.tensor([0, 1, 2]), num_classes=env.observation_space.n).float()
    elif isinstance(env.observation_space, gym.spaces.Box):
        shape = env.observation_space.shape
        input_dim = int(torch.prod(torch.tensor(shape)))
        dummy_input = torch.randn(3, input_dim)
    else:
        print("Unsupported observation space.")
        continue

    # Forward pass
    output = model(dummy_input)
    if isinstance(env.action_space, gym.spaces.Discrete):
        action_probs, value = output
        print("Actor Output (Action Probabilities):", action_probs)
        print("Critic Output (State Value):", value)
    elif isinstance(env.action_space, gym.spaces.Box):
        (mean, std), value = output
        print("Actor Output (Mean):", mean)
        print("Actor Output (Std):", std)
        print("Critic Output (State Value):", value)


Creating network for CliffWalking-v0:
Actor Output (Action Probabilities): tensor([[0.2537, 0.2514, 0.2460, 0.2489],
        [0.2448, 0.2540, 0.2606, 0.2406],
        [0.2377, 0.2675, 0.2461, 0.2487]], grad_fn=<SoftmaxBackward0>)
Critic Output (State Value): tensor([[0.1194],
        [0.0913],
        [0.1154]], grad_fn=<AddmmBackward0>)

Creating network for LunarLander-v2:
Actor Output (Action Probabilities): tensor([[0.1882, 0.2264, 0.2945, 0.2909],
        [0.1614, 0.1792, 0.3211, 0.3382],
        [0.2363, 0.1976, 0.2451, 0.3210]], grad_fn=<SoftmaxBackward0>)
Critic Output (State Value): tensor([[ 0.0307],
        [ 0.0077],
        [-0.3122]], grad_fn=<AddmmBackward0>)

Creating network for PongNoFrameskip-v4:
Actor Output (Action Probabilities): tensor([[0.2295, 0.1743, 0.1103, 0.1654, 0.1799, 0.1406],
        [0.1752, 0.1676, 0.1420, 0.1350, 0.2967, 0.0835],
        [0.1989, 0.1761, 0.1338, 0.1334, 0.1816, 0.1763]],
       grad_fn=<SoftmaxBackward0>)
Critic Output (State Value)



In [5]:
pip install "gymnasium[mujoco]"


In [6]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F

# Shared network definition for both discrete and continuous action spaces
class SharedNetwork(nn.Module):
    def __init__(self, input_dim, action_space, hidden_size=128):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)

        self.critic = nn.Linear(hidden_size, 1)

        if isinstance(action_space, gym.spaces.Discrete):
            self.actor = nn.Linear(hidden_size, action_space.n)
            self.is_discrete = True
        elif isinstance(action_space, gym.spaces.Box):
            self.actor_mean = nn.Linear(hidden_size, action_space.shape[0])
            self.actor_log_std = nn.Parameter(torch.zeros(action_space.shape[0]))
            self.is_discrete = False
        else:
            raise NotImplementedError("Unsupported action space type")

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        value = self.critic(x)

        if self.is_discrete:
            logits = self.actor(x)
            action_probs = F.softmax(logits, dim=-1)
            return action_probs, value
        else:
            mean = self.actor_mean(x)
            log_std = self.actor_log_std.expand_as(mean)
            return (mean, log_std), value

def create_shared_network(env, hidden_size=128):
    obs_space = env.observation_space
    if isinstance(obs_space, gym.spaces.Discrete):
        input_dim = obs_space.n
    elif isinstance(obs_space, gym.spaces.Box):
        input_dim = int(torch.tensor(obs_space.shape).prod().item())
    else:
        raise NotImplementedError("Unsupported observation space type")
    return SharedNetwork(input_dim, env.action_space, hidden_size)

# Use gymnasium environments only
env_names = [
    "CliffWalking-v0",         # Discrete obs/act
    "LunarLander-v3",          # Box obs, discrete act
    # "PongNoFrameskip-v4",    # Only if Atari and AutoROM set up
    "HalfCheetah-v5",        # Requires MuJoCo, skip unless installed
]

for env_name in env_names:
    try:
        env = gym.make(env_name)
        print(f"\n✅ Creating network for {env_name}:")
        model = create_shared_network(env, hidden_size=128)
    except Exception as e:
        print(f"❌ Failed to create environment {env_name}: {e}")
        continue

    # Dummy input setup
    if isinstance(env.observation_space, gym.spaces.Discrete):
        dummy_input = F.one_hot(torch.tensor([0, 1, 2]), num_classes=env.observation_space.n).float()
    elif isinstance(env.observation_space, gym.spaces.Box):
        obs_shape = env.observation_space.shape
        input_dim = int(torch.tensor(obs_shape).prod().item())
        dummy_input = torch.randn(3, input_dim)
    else:
        print(f"⚠️ Unsupported observation space for {env_name}")
        continue

    # Forward pass
    output = model(dummy_input)
    if isinstance(env.action_space, gym.spaces.Discrete):
        action_probs, value = output
        print("Actor Output (Action Probabilities):", action_probs)
        print("Critic Output (State Value):", value)
    elif isinstance(env.action_space, gym.spaces.Box):
        (mean, log_std), value = output
        print("Actor Output (Mean):", mean)
        print("Actor Output (Log Std):", log_std)
        print("Critic Output (State Value):", value)



✅ Creating network for CliffWalking-v0:
Actor Output (Action Probabilities): tensor([[0.2622, 0.2393, 0.2595, 0.2390],
        [0.2633, 0.2386, 0.2590, 0.2391],
        [0.2636, 0.2410, 0.2608, 0.2347]], grad_fn=<SoftmaxBackward0>)
Critic Output (State Value): tensor([[-0.0108],
        [ 0.0163],
        [ 0.0144]], grad_fn=<AddmmBackward0>)

✅ Creating network for LunarLander-v3:
Actor Output (Action Probabilities): tensor([[0.2782, 0.2424, 0.2597, 0.2197],
        [0.2309, 0.2520, 0.2414, 0.2757],
        [0.2445, 0.2619, 0.2434, 0.2502]], grad_fn=<SoftmaxBackward0>)
Critic Output (State Value): tensor([[0.4528],
        [0.1845],
        [0.2574]], grad_fn=<AddmmBackward0>)

✅ Creating network for HalfCheetah-v5:
Actor Output (Mean): tensor([[ 0.0101,  0.0047, -0.0708, -0.0378, -0.0396,  0.0467],
        [-0.1205, -0.0997,  0.0503, -0.0955, -0.0657,  0.2039],
        [-0.0487,  0.0825,  0.0891, -0.2951, -0.0791,  0.0661]],
       grad_fn=<AddmmBackward0>)
Actor Output (Log Std): t

### Discuss the motivation behind each setup and when it may be preferred in practice.

YOUR ANSWER:

### Task 3: Write Observation Normalization Function
Create a function `normalize_observation(obs, env)` that:
- Checks if the observation space is `Box` and has `low` and `high` attributes.
- If so, normalize the input observation.
- Otherwise, return the observation unchanged.

```python
# TODO: Define `normalize_observation(obs, env)`
```

Test this function with observations from:
- `LunarLander-v3`
- `PongNoFrameskip-v4`

Note: Atari observations are image arrays. Normalize pixel values to [0, 1]. For LunarLander-v3, the different elements in the observation vector have different ranges. Normalize them to [0, 1] using the `low` and `high` attributes of the observation space.
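
The normalization described in the note is the usual min-max formula, `(obs - low) / (high - low)`. The sketch below works through it on made-up bounds and also guards against dimensions whose bounds are infinite or of zero width, where the formula is undefined. The function name `min_max_normalize` and the example bounds are hypothetical and are not the real `LunarLander-v3` limits.

```python
import numpy as np

def min_max_normalize(obs, low, high):
    """Scale each bounded dimension to [0, 1]; leave unbounded ones unchanged."""
    obs = np.asarray(obs, dtype=np.float32)
    low = np.asarray(low, dtype=np.float32)
    high = np.asarray(high, dtype=np.float32)
    span = high - low
    # Only dimensions with a finite, positive range can be rescaled safely.
    safe = np.isfinite(span) & (span > 0)
    out = obs.copy()
    out[safe] = np.clip((obs[safe] - low[safe]) / span[safe], 0.0, 1.0)
    return out

# Made-up bounds: two bounded dimensions and one unbounded dimension.
low = np.array([-2.5, 0.0, -np.inf])
high = np.array([2.5, 1.0, np.inf])
result = min_max_normalize([0.0, 0.25, 3.0], low, high)
print(result)  # bounded entries become 0.5 and 0.25; the unbounded entry stays 3.0

# Image observations: uint8 pixels map to [0, 1] by dividing by 255.
pixels = np.array([0, 128, 255], dtype=np.uint8)
scaled = pixels.astype(np.float32) / 255.0
print(scaled.min(), scaled.max())
```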


---

In [19]:
import gymnasium as gymn  # For LunarLander-v3
import gym               # For PongNoFrameskip-v4
import numpy as np

def normalize_observation(obs, env):
    """
    Normalize an observation based on the observation space attributes.

    - If `obs` is a dict, normalize each value.
    - If `obs` is a list or tuple, attempt to stack if all elements have the same shape;
      otherwise, process each element individually.
    - For environments with a Box observation space:
         - If the dtype is uint8 (e.g., Atari images), convert to float32 and scale by 1/255.
         - Otherwise, normalize using the low/high bounds elementwise.
    - The output is clipped to [0, 1].
    """
    # Handle dict observations
    if isinstance(obs, dict):
        return {k: normalize_observation(v, env) for k, v in obs.items()}

    # Handle sequence observations (list or tuple)
    if isinstance(obs, (list, tuple)):
        try:
            # Try converting each element to a numpy array.
            arrays = [np.array(o) for o in obs]
            shapes = [a.shape for a in arrays]
            # If all elements share the same shape, stack them.
            if len(set(shapes)) == 1:
                obs = np.stack(arrays, axis=0)
            else:
                # Otherwise, normalize each element individually and return the same type.
                return type(obs)(normalize_observation(o, env) for o in obs)
        except Exception:
            # If any error occurs, normalize each element individually.
            return type(obs)(normalize_observation(o, env) for o in obs)

    # At this point, obs should be convertible to a numpy array.
    try:
        obs = np.asarray(obs, dtype=np.float32)
    except Exception as e:
        raise ValueError(f"Unable to convert observation to numpy array: {e}")

    # If the observation space defines low and high, use them.
    if hasattr(env.observation_space, 'low') and hasattr(env.observation_space, 'high'):
        # For Atari environments (commonly uint8 images), scale by 255.
        if hasattr(env.observation_space, 'dtype') and env.observation_space.dtype == np.uint8:
            normalized = obs / 255.0
        else:
            low = env.observation_space.low.astype(np.float32)
            high = env.observation_space.high.astype(np.float32)
            normalized = (obs - low) / (high - low)
        normalized = np.clip(normalized, 0, 1)
        return normalized
    else:
        return obs


In [20]:
env_lunar = gymn.make("LunarLander-v3")
obs_lunar, _ = env_lunar.reset()  # Gymnasium returns (observation, info)
norm_obs_lunar = normalize_observation(obs_lunar, env_lunar)
print("LunarLander-v3 original observation:", obs_lunar)
print("LunarLander-v3 normalized observation:", norm_obs_lunar)


LunarLander-v3 original observation: [ 1.3543129e-03  1.4018842e+00  1.3715582e-01 -4.0159121e-01
 -1.5624668e-03 -3.1067912e-02  0.0000000e+00  0.0000000e+00]
LunarLander-v3 normalized observation: [0.50027084 0.7803768  0.50685775 0.47992045 0.49987566 0.4984466
 0.         0.        ]


In [21]:
env_pong = gym.make("PongNoFrameskip-v4")
# Recent gym releases return (observation, info) from reset(), like Gymnasium, so unpack the tuple.
obs_pong, _ = env_pong.reset()
norm_obs_pong = normalize_observation(obs_pong, env_pong)

print(f"PongNoFrameskip-v4 original observation shape: {np.asarray(obs_pong).shape}")
print(f"PongNoFrameskip-v4 normalized observation range: {norm_obs_pong.min()} to {norm_obs_pong.max()}")

PongNoFrameskip-v4 original observation shape: (210, 160, 3)
PongNoFrameskip-v4 normalized observation range: 0.0 to 0.8941176533699036


### Discuss the motivation behind each setup and when it may be preferred in practice.

YOUR ANSWER:

## Section 4: Gradient Clipping

To prevent exploding gradients, it's common practice to clip gradients before optimizer updates.

### Task 4: Clip Gradients for Actor-Critic Networks
Use dummy tensors and apply gradient clipping with the following PyTorch method:
```python
# During training, after loss.backward():
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
```

Reuse the loss computation from Task 1a or 1b. After computing the gradients, apply gradient clipping.
Print the gradient norm before and after clipping to verify it’s applied.

🔗 PyTorch Docs: https://pytorch.org/docs/stable/generated/torch.nn.utils.clip_grad_norm_.html
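
As a rough illustration of what the call does: `clip_grad_norm_` computes the norm of all gradients taken together and, if it exceeds `max_norm`, rescales every gradient by approximately `max_norm / total_norm`. It returns the norm measured before clipping. The sketch below verifies this on a tiny stand-in model; the model, the deliberately inflated loss, and the helper `global_grad_norm` are assumptions made for the example.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Linear(8, 4)  # stand-in for an actor-critic network

# Deliberately large loss so the gradient norm exceeds max_norm.
loss = (model(torch.randn(16, 8)) * 100.0).pow(2).mean()
loss.backward()

def global_grad_norm(parameters) -> float:
    """L2 norm of all gradients concatenated into one vector (hypothetical helper)."""
    grads = [p.grad.detach().flatten() for p in parameters if p.grad is not None]
    return torch.cat(grads).norm(2).item()

max_norm = 0.5
before = global_grad_norm(model.parameters())
reported = torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_norm)
after = global_grad_norm(model.parameters())

print(f"before: {before:.4f}, reported: {float(reported):.4f}, after: {after:.4f}")
```

The direction of the update is preserved; only its length changes, which is why norm clipping is usually preferred over clamping individual gradient values.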


---

In [24]:
# BEGIN_YOUR_CODE
# Gradient clipping is applied inside the training step below, after the backward pass.

class DummyActorCritic(nn.Module):
    def __init__(self, input_size, num_actions):
        super(DummyActorCritic, self).__init__()
        self.fc = nn.Linear(input_size, 128)
        self.actor = nn.Linear(128, num_actions)
        self.critic = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.fc(x))
        # Actor outputs action probabilities via softmax.
        action_probs = F.softmax(self.actor(x), dim=-1)
        # Critic outputs a single value.
        value = self.critic(x)
        return action_probs, value

# END_YOUR_CODE

In [25]:
def simulate_training_with_grad_clip(model, state):
    """
    Simulate training using dummy tensors with a single optimizer for both Actor and Critic.
    After computing the gradients, gradient clipping is applied.
    The gradient norm is printed before and after clipping.
    """
    # Forward pass through the model
    action_probs, value = model(state)

    # --- Dummy tensor simulation ---
    # Simulate dummy log probabilities from the actor; assume a batch of actions.
    dummy_log_probs = torch.log(action_probs + 1e-8)

    # Simulated returns (e.g., discounted rewards)
    dummy_returns = torch.randn_like(value)

    # Simulated dummy entropies (for the actor)
    dummy_entropies = -torch.sum(action_probs * dummy_log_probs, dim=1, keepdim=True)

    # Compute advantage: (return - value)
    advantage = dummy_returns - value.detach()

    # Actor loss: negative log probability * advantage minus entropy bonus (with a factor of 0.01)
    actor_loss = -(dummy_log_probs * advantage).mean() - 0.01 * dummy_entropies.mean()

    # Critic loss: mean squared error between value and return
    critic_loss = F.mse_loss(value, dummy_returns)

    # Total loss as weighted sum (here we assume equal weighting)
    total_loss = actor_loss + critic_loss

    # Create a single optimizer for both actor and critic
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    optimizer.zero_grad()

    # Backward pass: compute gradients
    total_loss.backward()

    # Compute gradient norm before clipping
    pre_clip_norm = 0.0
    for p in model.parameters():
        if p.grad is not None:
            pre_clip_norm += p.grad.data.norm(2).item() ** 2
    pre_clip_norm = pre_clip_norm ** 0.5

    # Apply gradient clipping: returns total norm before clipping (for reference)
    clip_norm = utils.clip_grad_norm_(model.parameters(), max_norm=0.5)

    # Compute gradient norm after clipping
    post_clip_norm = 0.0
    for p in model.parameters():
        if p.grad is not None:
            post_clip_norm += p.grad.data.norm(2).item() ** 2
    post_clip_norm = post_clip_norm ** 0.5

    # Optimizer step
    optimizer.step()

    print("Single Optimizer Training Step:")
    print("Actor Loss:", actor_loss.item())
    print("Critic Loss:", critic_loss.item())
    print("Total Loss:", total_loss.item())
    print("Gradient norm before clipping:", pre_clip_norm)
    print("Gradient norm reported by clip_grad_norm_ (before clipping):", clip_norm)
    print("Gradient norm after clipping:", post_clip_norm)

In [26]:
input_size = 8     # Example input dimension (e.g., LunarLander-v3 state size)
num_actions = 4    # Example number of actions
model = DummyActorCritic(input_size, num_actions)

# Create a dummy state tensor (batch size 1)
state = torch.randn(1, input_size)

simulate_training_with_grad_clip(model, state)

Single Optimizer Training Step:
Actor Loss: -2.9831323623657227
Critic Loss: 4.527118682861328
Total Loss: 1.5439863204956055
Gradient norm before clipping: 23.084273351243823
Gradient norm reported by clip_grad_norm_ (before clipping): tensor(23.0843)
Gradient norm after clipping: 0.4999999788338938


### Discuss the motivation behind each setup and when it may be preferred in practice.

YOUR ANSWER:

If you are working in a team, provide a contribution summary.

| Team Member | Task | Contribution (%) |
|---|---|---|
|   | Task 1 |   |
|   | Task 2 |   |
|   | Task 3 |   |
|   | Task 4 |   |
|   | **Total** |   |
