In [1]:
%load_ext autoreload
%autoreload 2

import numpy as np
import torch
import matplotlib.pyplot as plt
from torch import nn
from torch.nn import functional as F
torch.autograd.set_detect_anomaly(True)

# Teaching a quadruped to walk

Time to try out the learning algorithms that you just implemented on a more difficult problem. The WalkerEnv implements a quadruped robot kind-of thing, see for yourself. The goal is to move in the $x$ direction as fast and as far as possible.

Your goal is to implement a class `WalkerPolicy` with function `determine_actions()` just like the StochasticPolicy we used earlier to control the pendulum. Below is a template of this class, but feel free to alter it however you want. The only important thing is the `determine_actions()` function!

After you implement it, copy `WalkerPolicy` into a separate file `WalkerPolicy.py` that you will upload to BRUTE together with the (optional) learned weights in a zip file. How the policy is implemented is up to you! You are constrained to only the libraries we used so far though, such as torch, numpy etc..

You will get some free points just for uploading a working policy (irrelevant of the performance). Further 2 points will be awarded for successfully traversing a small distance in the x direction.


# Hints

There is no single easy way of doing this, but here are some suggestions on what you could try to improve your policy:

1. This problem is much more difficult, than balancing a pendulum. It is a good idea to use a bit larger network than for the pendulum policy.

2. You can also try to use a different optimizer, such as Adam and play with the hyperparameters.

3. Using a neural network to compute the normal distribution scale $\sigma$ can lead to too much randomness in the actions (i.e. exploration). You can use a fixed $\sigma$ instead, or replace it with a learnable `torch.Parameter` initialized to some small constant. Make sure, you run it through an exponential, or softplus function to ensure $\sigma$ is positive.

4. The exploration can also be reduced by penalizing the variance of the action distribution in an additional loss term.

5. If you see some undesirable behaviour, you can tweak the reward function to penalize it. Even though the $x$ distance is all we care about, adding extra terms to the reward can help guide the learning process (This is known as reward shaping). Simply define a reward function mapping the state $s_{t+1}$ and action $a_t$ to a scalar reward $r_t$ and put it in the config dictionary under the key `'reward_fcn'`. See the `WalkerEnv` class for the implementation of the default reward.

6. Using the normal distribution on a bounded action space can lead to certain problems caused by action clipping. This can be mitigated by using a different distribution, such as the Beta distribution. See the `torch.distributions.beta` module for more information. (Note that Beta distribution is defined on the interval [0,1] and works better with parameters $\alpha,\beta \geq 1$.)


In [2]:
# If you cannot run with the visualization, you can set this to False
VISUALIZE = True

### import self-made modules

In [3]:
from environment.WalkerEnv import WalkerEnv
from WalkerPolicy import WalkerPolicy
import solution

### Define reward function

In [4]:
DISTANCE_MULTIPLIER = 1
VELOCITY_MULTIPLIER = 1
ACTION_PENALTY_MULTIPLIER = 0
STABILITY_PENALTY_MULTIPLIER = 0.25

def walker_reward(state, action):
    pos = state[:15]  # first 15 elements of state vector are generalized coordinates [xyz, quat, joint_angles]
    vel = state[15:]  # last 14 elements of state vector are generalized velocities [xyz_vel, omega, joint_velocities]
    x_velocity = vel[0]  # this is the x axis velocity
    x_distance = pos[0]  # this is the x axis position
    stability_penalty = np.sum(np.abs(vel[1:3]))  # Penalize y and z velocities
    action_penalty = np.sum(np.square(action))  # Penalize large actions
    return (x_distance * DISTANCE_MULTIPLIER + 
            x_velocity * VELOCITY_MULTIPLIER - 
            STABILITY_PENALTY_MULTIPLIER * stability_penalty - 
            ACTION_PENALTY_MULTIPLIER * action_penalty)

## Train loop

In [5]:
def train(env, policy, optimizer, num_iterations=500, gamma=0.99, epsilon=0.2, sgd_iters=5):
    mean_rewards, p_losses, v_losses = np.zeros(num_iterations), np.zeros(num_iterations), np.zeros(num_iterations)  # for logging mean rewards over epochs

    for iteration in range(num_iterations):
        # 1. Rollout
        observations, actions, rewards, log_probs, values = [], [], [], [], []
        obs = env.vector_reset()
        for _ in range(512):  # Rollout for 512 steps
            obs = np.array(obs[0])  # Ensure obs is a numpy array
            obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)  # Convert to tensor and add batch dimension
            action, logp, value = policy(obs_tensor)
            action = action.squeeze(0).numpy()  # Remove batch dimension and convert to numpy
            next_obs, reward = env.vector_step(action)
            reward = walker_reward(obs, action)  # Incorporate action into the reward
            observations.append(obs)
            actions.append(action)
            rewards.append(reward)
            log_probs.append(logp)
            values.append(value)
            obs = next_obs

        # 2. Convert data to tensors
        obs_tensor = torch.tensor(np.array(observations), dtype=torch.float32)
        act_tensor = torch.tensor(np.array(actions), dtype=torch.float32)
        rew_tensor = torch.tensor(np.array(rewards), dtype=torch.float32)
        logp_tensor = torch.stack(log_probs)
        values_tensor = torch.stack(values).squeeze(-1)

        # 3. Compute advantage or returns
        returns = discount_cum_sum(rew_tensor, gamma)
        returns = returns.unsqueeze(-1)  # Ensure returns have the same shape as values_tensor
        advantages = returns - values_tensor

        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # 4. PPO loss
        for _ in range(sgd_iters):
            logp = policy.log_prob(act_tensor, obs_tensor)
            p_ratios = torch.exp(logp - logp_tensor.detach())
            L_ppo = ppo_loss(p_ratios, advantages, epsilon)
            L_v = value_loss(values_tensor, returns)
            total_loss = L_ppo + L_v

            optimizer.zero_grad()
            total_loss.backward(retain_graph=True)
            optimizer.step()

        mean_rewards[iteration] = rew_tensor.mean().item()
        v_losses[iteration] = L_v.item()
        p_losses[iteration] = L_ppo.item()

        # 5. Logging and plotting
        if iteration % 10 == 0:
            print(f"Iteration {iteration}: Loss = {total_loss.item()}, Mean reward = {mean_rewards[iteration]}")
    
    env.close()

    # Plotting
    plt.figure()
    plt.plot(mean_rewards, label='Mean Reward')
    plt.plot(p_losses, label='Policy Loss')
    plt.plot(v_losses, label='Value Loss')
    plt.xlabel('Iteration')
    plt.ylabel('Value')
    plt.legend()
    plt.show()

def discount_cum_sum(rewards, gamma):
    discounted_sum = 0
    discounted_rewards = []
    for reward in reversed(rewards):
        discounted_sum = reward + gamma * discounted_sum
        discounted_rewards.insert(0, discounted_sum)
    return torch.tensor(discounted_rewards, dtype=torch.float32)

def ppo_loss(p_ratios, advantages, epsilon):
    surr1 = p_ratios * advantages
    surr2 = torch.clamp(p_ratios, 1 - epsilon, 1 + epsilon) * advantages
    return -torch.min(surr1, surr2).mean()

def value_loss(values, returns):
    return F.mse_loss(values, returns)

policy = WalkerPolicy()
optimizer = torch.optim.Adam(policy.parameters(), lr=1e-3)
config = {'N': 1, 'vis': False, "track": 0, "reward_fcn": walker_reward}
env = WalkerEnv(config)
train(env, policy, optimizer, num_iterations=500)
policy.save_weights()

Environment ready


RuntimeError: one of the variables needed for gradient computation has been modified by an inplace operation: [torch.FloatTensor [128, 1]], which is output 0 of AsStridedBackward0, is at version 2; expected version 1 instead. Hint: enable anomaly detection to find the operation that failed to compute its gradient, with torch.autograd.set_detect_anomaly(True).

## Visualise the trained quadruped

In [None]:
# This is the configuration for the Walker environment
# N is the number of robots controlled in parallel
# vis is a boolean flag to enable visualization
# !! IMPORTANT track is an integer index to enable camera tracking of a particular robot (indexed by the value of the argument), this is useful when evaluating the performance of the policy after training
# reward_fcn is the reward function that the environment will use to calculate the reward

T = 1000
x = -1000
env = WalkerEnv({'N': 1, 'vis': VISUALIZE, "track": 0, "reward_fcn": walker_reward})
obs = env.vector_reset()  # Observation vector is of shape (N, 29)
POLICY_PATH = "walker_policy.pth"
policy = WalkerPolicy(load_weights=True)
for i in range(1000):
    obs = torch.tensor(obs[0], dtype=torch.float32).unsqueeze(0)
    # THIS COULD BE USEFUL: a = np.random.randn(1, 8) * 4 - 2  # Random actions with standard deviation of 2 and mean of 0
    obs, reward = env.vector_step(policy.determine_actions(obs))
    x = max(x, obs[0][0])
env.close()
print(f"After {T} steps, the maximum x value reached was {x}")

Environment ready


  self.load_state_dict(torch.load(path))


After 1000 steps, the maximum x value reached was 0.0022255179937928915
