In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gymnasium as gym

In [None]:
class PPOPolicyNetwork(nn.Module): # Actor
    def __init__(self, input_dim, output_dim, num_layers=3, hidden_dim=16):
        super(PPOPolicyNetwork, self).__init__()

        self.input_dim = input_dim
        self.output_dim = output_dim
        self.num_layers = num_layers # Ensure at least 2 layers
        self.hidden_dim = hidden_dim

        layers = [nn.Linear(input_dim, hidden_dim), nn.ReLU()]
        for _ in range(num_layers - 2):
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_dim, output_dim))  # Output layer
        
        self.network = nn.Sequential(*layers)

        # Proper weight initialization (important for stable training)
        self._init_weights()

    def _init_weights(self): # for better convergence
        for layer in self.network:
            if isinstance(layer, nn.Linear):
                nn.init.xavier_uniform_(layer.weight)
                nn.init.zeros_(layer.bias)

    def forward(self, x):
        return torch.tanh(self.network(x))  # Use tanh for continuous action spaces # Ensures actions remain in [-1,1]
    
    def custom_dump(self):
        return {
            'args': (self.input_dim, self.output_dim),
            'kwargs': {
                'num_layers': self.num_layers,
                'hidden_dim': self.hidden_dim,
            },
            'state_dict': self.state_dict(),
        }

    @classmethod
    def custom_load(cls, data):
        model = cls(*data['args'], **data['kwargs'])
        model.load_state_dict(data['state_dict'])
        return model
    
def _test_ppo_policy_forward(policy_model, input_shape, output_shape):
    """Tests that the PPO policy network returns correctly shaped tensors."""
    inputs = torch.randn(input_shape)  # Random input
    outputs = policy_model(inputs)  # Forward pass

    if not isinstance(outputs, torch.Tensor):
        raise Exception(f'Policy forward returned {type(outputs)} instead of torch.Tensor')

    if outputs.shape != output_shape:
        raise Exception(f'Policy forward returned shape {outputs.shape}, expected {output_shape}')

    if not outputs.requires_grad:
        raise Exception('Policy forward output does not require a gradient (it should).')

# Example Test
obs_dim, act_dim = 17, 6  # Walker2d observation & action space
policy_model = PPOPolicyNetwork(obs_dim, act_dim)
_test_ppo_policy_forward(policy_model, (64, obs_dim), (64, act_dim))  # Batch size 64
_test_ppo_policy_forward(policy_model, (10, obs_dim), (10, act_dim))  # Batch size 10
print("PPO Policy Network is correctly implemented!")

# Testing custom dump / load
ppo1 = PPOPolicyNetwork(17, 6, num_layers=4, hidden_dim=128)
ppo_dump = ppo1.custom_dump()
ppo2 = PPOPolicyNetwork.custom_load(ppo_dump)

# Assertions to verify correct restoration
assert ppo2.input_dim == 17
assert ppo2.output_dim == 6
assert ppo2.num_layers == 4
assert ppo2.hidden_dim == 128
print("PPO custom_dump and custom_load methods work correctly!")

PPO Policy Network is correctly implemented!
PPO custom_dump and custom_load methods work correctly!


In [None]:
class PPOValueNetwork(nn.Module): # Critic
    def __init__(self, input_dim, num_layers=3, hidden_dim=16):
        super(PPOValueNetwork, self).__init__()

        self.input_dim = input_dim
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim

        # Define layers similar to the PPOPolicyNetwork
        layers = [nn.Linear(input_dim, hidden_dim), nn.ReLU()]
        for _ in range(num_layers - 2):
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_dim, 1))  # Output layer (single scalar value)

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)  # Single value per input
    
        # Custom dump method for saving model state
    def custom_dump(self):
        return {
            'args': (self.input_dim,),
            'kwargs': {
                'num_layers': self.num_layers,
                'hidden_dim': self.hidden_dim,
            },
            'state_dict': self.state_dict(),
        }

    # Custom load method for restoring model state
    @classmethod
    def custom_load(cls, data):
        model = cls(*data['args'], **data['kwargs'])
        model.load_state_dict(data['state_dict'])
        return model    
    
def _test_ppo_value_forward(value_model, input_shape):
    """Tests that the PPO value network returns a correctly shaped tensor."""
    inputs = torch.randn(input_shape)  # Random input
    outputs = value_model(inputs)  # Forward pass

    if not isinstance(outputs, torch.Tensor):
        raise Exception(f'Value forward returned {type(outputs)} instead of torch.Tensor')

    if outputs.shape != (input_shape[0], 1):  # Expecting (batch_size, 1)
        raise Exception(f'Value forward returned shape {outputs.shape}, expected ({input_shape[0]}, 1)')

    if not outputs.requires_grad:
        raise Exception('Value forward output does not require a gradient (it should).')

# Example Test
value_model = PPOValueNetwork(obs_dim)
_test_ppo_value_forward(value_model, (64, obs_dim))  # Batch size 64
_test_ppo_value_forward(value_model, (10, obs_dim))  # Batch size 10
print("PPO Value Network is correctly implemented!")

# Testing custom dump / load
value_model1 = PPOValueNetwork(obs_dim, num_layers=4, hidden_dim=128)
value_dump = value_model1.custom_dump()
value_model2 = PPOValueNetwork.custom_load(value_dump)

# Assertions to verify correct restoration
assert value_model2.input_dim == obs_dim
assert value_model2.num_layers == 4
assert value_model2.hidden_dim == 128
print("PPOValueNetwork custom_dump and custom_load methods work correctly!")

PPO Value Network is correctly implemented!
PPOValueNetwork custom_dump and custom_load methods work correctly!


: 

In [None]:
def compute_gae(rewards, values, gamma=0.99, tau=0.95):
    deltas = [] # Stores the advantages (deltas)
    gae = 0
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * (values[t + 1] if t + 1 < len(values) else 0) - values[t] # Calculate the TD error (delta)
        gae = delta + gamma * tau * gae # Compute the GAE (advantage)
        deltas.insert(0, gae) # Insert the advantage at the front of the list (reverse order)
    return deltas

def ppo_loss(old_log_probs, new_log_probs, advantages, clip_epsilon):
    # Compute the ratio (pi_theta / pi_theta_old)
    ratio = torch.exp(new_log_probs - old_log_probs)
    
    # Compute the surrogate loss
    obj_surrogate = ratio * advantages
    obj_clipped = torch.clamp(ratio, 1 - clip_epsilon, 1 + clip_epsilon) * advantages

    # Final PPO objective (minimize the negative objective)
    loss = -torch.min(obj_surrogate, obj_clipped).mean()
    return loss

In [None]:
def train_ppo(env, policy, value_net, policy_optimizer, value_optimizer, num_episodes=100, batch_size=64):
    # Hyperparameters for PPO
    gamma = 0.99
    tau = 0.95
    clip_epsilon = 0.2
    n_epochs = 10  # Number of epochs to update the policy after each batch
    
    for episode in range(num_episodes):
        states, actions, rewards, log_probs = [], [], [], []

        # Collect trajectory
        obs, _ = env.reset()
        done = False
        while not done:
            obs_tensor = torch.tensor(obs, dtype=torch.float32)
            action = policy(obs_tensor).detach().numpy()  # Get action from policy
            log_prob = torch.log(policy(obs_tensor))  # Log probability of tak en action
            next_obs, reward, done, _, _ = env.step(action)

            states.append(obs)
            actions.append(action)
            rewards.append(reward)
            log_probs.append(log_prob)

            obs = next_obs

        # Convert lists to tensors
        states_tensor = torch.tensor(states, dtype=torch.float32)
        actions_tensor = torch.tensor(actions, dtype=torch.float32)
        rewards_tensor = torch.tensor(rewards, dtype=torch.float32)

        # Calculate values and advantages using the critic
        values = value_net(states_tensor)
        advantages = compute_gae(rewards_tensor, values, gamma, tau)

        # Update policy and critic
        for _ in range(n_epochs):
            # Compute the loss for the policy
            old_log_probs = torch.tensor(log_probs, dtype=torch.float32)
            new_log_probs = torch.log(policy(states_tensor))

            # Calculate PPO loss and perform backpropagation
            policy_loss = ppo_loss(old_log_probs, new_log_probs, advantages, clip_epsilon)
            policy_optimizer.zero_grad()
            policy_loss.backward()
            policy_optimizer.step()

            # Compute the loss for the value function
            value_loss = ((values - rewards_tensor) ** 2).mean()  # Mean squared error
            value_optimizer.zero_grad()
            value_loss.backward()
            value_optimizer.step()

        print(f"Episode {episode} completed")

    env.close()

In [6]:
# Reserve this cell for all the modifying parameters functions
def modify_mass_parameters(model):
    # Modify the mass of specific body parts
    model.body_mass[1] = 4.15  # Set mass of torso 
    model.body_mass[2], model.body_mass[5] = 0.6, 0.6  # Set mass of thigh
    model.body_mass[3], model.body_mass[6] = 0.3, 0.3  # Set mass of leg
    model.body_mass[4], model.body_mass[7]= 0.1, 0.1  # Set mass of foot

In [7]:
# Initialize environment
env = gym.make('Walker2d-v5')
# env.reset()
model = env.unwrapped.model
modify_mass_parameters(model) # Modify mass parameters for the environment's model

# Create policy and value networks
policy = PPOPolicyNetwork(input_dim=env.observation_space.shape[0], output_dim=env.action_space.shape[0])
value_net = PPOValueNetwork(input_dim=env.observation_space.shape[0])

# Create optimizers
policy_optimizer = optim.Adam(policy.parameters(), lr=3e-4)
value_optimizer = optim.Adam(value_net.parameters(), lr=3e-4)

# Train PPO
train_ppo(env, policy, value_net, policy_optimizer, value_optimizer)

  states_tensor = torch.tensor(states, dtype=torch.float32)


IndexError: index 12 is out of bounds for dimension 0 with size 12