In [1]:
%load_ext autoreload
%autoreload 2

import numpy as np
import torch
from torch import nn 
import torch.optim as optim
from torch.distributions import Normal

# Teaching a quadruped to walk

Time to try out the learning algorithms that you just implemented on a more difficult problem. The `WalkerEnv` implements a simple quadruped-like robot; run it and see for yourself. The goal is to move in the $x$ direction as fast and as far as possible.

Your goal is to implement a class `WalkerPolicy` with function `determine_actions()` just like the StochasticPolicy we used earlier to control the pendulum. Below is a template of this class, but feel free to alter it however you want. The only important thing is the `determine_actions()` function!

After you implement it, copy `WalkerPolicy` into a separate file `WalkerPolicy.py` that you will upload to BRUTE together with the (optional) learned weights in a zip file. How the policy is implemented is up to you! You are, however, constrained to the libraries we have used so far, such as torch, numpy, etc.

You will get some free points just for uploading a working policy (regardless of its performance). A further 2 points will be awarded for successfully traversing a small distance in the $x$ direction.


# Hints

There is no single easy way of doing this, but here are some suggestions on what you could try to improve your policy:

1. This problem is much more difficult than balancing a pendulum. It is a good idea to use a somewhat larger network than for the pendulum policy.

2. You can also try to use a different optimizer, such as Adam, and play with the hyperparameters.

3. Using a neural network to compute the normal distribution scale $\sigma$ can lead to too much randomness in the actions (i.e. exploration). You can use a fixed $\sigma$ instead, or replace it with a learnable `torch.nn.Parameter` initialized to some small constant. Make sure you run it through an exponential or softplus function to ensure $\sigma$ is positive.

4. The exploration can also be reduced by penalizing the variance of the action distribution in an additional loss term.

5. If you see some undesirable behaviour, you can tweak the reward function to penalize it. Even though the $x$ distance is all we care about, adding extra terms to the reward can help guide the learning process (This is known as reward shaping). Simply define a reward function mapping the state $s_{t+1}$ and action $a_t$ to a scalar reward $r_t$ and put it in the config dictionary under the key `'reward_fcn'`. See the `WalkerEnv` class for the implementation of the default reward.

6. Using the normal distribution on a bounded action space can lead to certain problems caused by action clipping. This can be mitigated by using a different distribution, such as the Beta distribution. See the `torch.distributions.beta` module for more information. (Note that Beta distribution is defined on the interval [0,1] and works better with parameters $\alpha,\beta \geq 1$.)


In [2]:
# Visualization toggle: set this to False if you cannot run with the visualization.
# NOTE(review): presumably meant to be passed as the "vis" entry of the environment
# config used for evaluation -- confirm that the evaluation cell actually reads it.
VISUALIZE = True

In [3]:
from environment.WalkerEnv import WalkerEnv
from WalkerPolicy import WalkerPolicy

In [4]:
def sample_trajectories(env, pi, T):
    """Roll out a stochastic policy in a vectorized environment for T steps.

    Args:
        env: environment exposing `num_states`, `num_actions`, `vector_reset()`
            (returns the initial states, one row per simulated instance) and
            `vector_step(actions)` (returns `(next_states, rewards)`).
        pi: policy exposing `sample_actions(states)`; it is called with a float32
            tensor of shape (N, num_states).
        T: number of environment steps to simulate.

    Returns:
        Tuple `(tensor_s, tensor_a, tensor_r)` of float32 tensors with shapes
        (T + 1, N, num_states), (T, N, num_actions) and (T, N). Note the extra
        state at the end: states run from s(0) to s(T), actions and rewards
        from index 0 to T - 1.
    """
    s = np.asarray(env.vector_reset())
    # Take the batch size from the environment's own reset state instead of a
    # notebook-level global `N`, so the function works on a fresh kernel and
    # with environments of any batch size.
    num_envs = s.shape[0]

    states = np.zeros((T + 1, num_envs, env.num_states), dtype=float)  # s(0) .. s(T)
    actions = np.zeros((T, num_envs, env.num_actions), dtype=float)  # a(0) .. a(T-1)
    rewards = np.zeros((T, num_envs), dtype=float)  # r(0) .. r(T-1)

    states[0] = s
    for t in range(T):
        # Rollouts only collect data; no autograd graph is needed here.
        with torch.no_grad():
            a = pi.sample_actions(torch.tensor(states[t], dtype=torch.float32))
        # The environment is stepped with a numpy array of shape (N, num_actions).
        if isinstance(a, torch.Tensor):
            a = a.detach().cpu().numpy()
        else:
            a = np.asarray(a)
        s_next, r = env.vector_step(a)
        states[t + 1], actions[t], rewards[t] = s_next, a, r

    tensor_s = torch.tensor(states).float()  # (T+1, N, num_states)
    tensor_a = torch.tensor(actions).float()  # (T, N, num_actions)
    tensor_r = torch.tensor(rewards).float()  # (T, N)

    return tensor_s, tensor_a, tensor_r

In [5]:
from solution import discount_cum_sum

def compute_advantage_estimates(tensor_r, values, gamma, bootstrap=False):
    """Compute value targets and advantages from rewards and value estimates.

    Args:
        tensor_r: reward tensor of shape (T, N).
        values: value-estimate tensor of shape (T + 1, N).
        gamma: scalar discount factor forwarded to `discount_cum_sum`.
        bootstrap: when True, the value estimate of the final state is appended
            to the rewards before discounting, so it stands in for the return
            beyond the end of the rollout.

    Returns:
        Tuple `(value_targets, advantages)`, where
        `advantages = value_targets - values[:-1]`.
    """
    if not bootstrap:
        value_targets = discount_cum_sum(tensor_r, gamma)
    else:
        # Value estimates of the last states, reshaped to a single row (1, N).
        last_values = values[-1][None]
        rewards_with_tail = torch.cat([tensor_r, last_values])
        # The appended row only serves as a bootstrap term; drop it afterwards.
        value_targets = discount_cum_sum(rewards_with_tail, gamma)[:-1]

    return value_targets, value_targets - values[:-1]


def compute_gae(tensor_r, values, gamma, lambda_):
    """Generalized advantage estimation (GAE).

    Args:
        tensor_r: reward tensor, aligned with `values[:-1]` along the first axis.
        values: value estimates containing one more entry along the first axis
            than `tensor_r` (the estimate for the final state).
        gamma: scalar discount factor.
        lambda_: scalar GAE mixing coefficient; the residuals are accumulated
            with the combined factor `gamma * lambda_`.

    Returns:
        Tuple `(value_targets, advantages)`, where
        `value_targets = advantages + values[:-1]`.
    """
    current_values, next_values = values[:-1], values[1:]
    # One-step temporal-difference residuals.
    td_residuals = tensor_r + gamma * next_values - current_values
    advantages = discount_cum_sum(td_residuals, gamma * lambda_)
    return advantages + current_values, advantages

In [6]:
def test_policy(pi, config, T=128, deterministic=True):
    """Run a policy in a freshly created `WalkerEnv` and report its performance.

    Args:
        pi: policy exposing `determine_actions(states)` and `sample_actions(states)`,
            both called with a float tensor built from the current states.
        config: configuration dict passed to `WalkerEnv`; its 'N' entry is used
            to average the reward.
        T: number of environment steps to simulate.
        deterministic: if True use `pi.determine_actions`, otherwise
            `pi.sample_actions`.

    Returns:
        The reward averaged over all T steps and config['N'] instances.

    Side effects:
        Prints the largest value of `s[0, 0]` seen during the run as "Max x".
        The environment is always closed, even if the rollout raises.
    """
    test_env = WalkerEnv(config)
    mean_reward = 0

    # try/finally guarantees the environment is released when the policy or a
    # simulation step fails midway through the rollout.
    try:
        s = test_env.vector_reset()
        x = s[0, 0]
        for _ in range(T):
            with torch.no_grad():
                s_tensor = torch.tensor(s).float()
                if deterministic:
                    actions = pi.determine_actions(s_tensor)
                else:
                    actions = pi.sample_actions(s_tensor)
            s, r = test_env.vector_step(actions.numpy())
            x = max(x, s[0, 0])
            mean_reward += sum(r) / (T * config['N'])

        print(f"Max x: {x}")
    finally:
        test_env.close()
    return mean_reward

In [56]:
def walker_reward(state, action):
    """Reward for one walker: its x velocity scaled by 1.5.

    The first 15 elements of the state vector are generalized coordinates
    [xyz, quat, joint_angles]; the remaining 14 are generalized velocities
    [xyz_vel, omega, joint_velocities]. `action` is accepted to match the
    reward-function signature but is not used.
    """
    velocities = state[15:]  # generalized velocities; element 0 is the x velocity
    x_velocity = velocities[0]
    return x_velocity * 1.5


In [57]:
from solution import ppo_loss, value_loss
import torch
from utils.plotting import plot_training
import numpy as np

# --- environment configuration ---------------------------------------------
N = 128  # batch size: second dimension of the (T, N, ...) rollout buffers
base_config = dict(N=N, vis=False, track=0, reward_fcn=walker_reward)
torch.manual_seed(42)  # seed torch before the policy is constructed

# --- training hyperparameters ----------------------------------------------
T, epochs = 256, 500  # rollout length (steps) and number of training epochs
lr = 0.001  # Adam learning rate
gamma = 0.9  # discount factor used for the value targets / advantages
epsilon = 0.3  # passed to ppo_loss as `epsilon`
sgd_iters = 5  # gradient steps taken on each batch of trajectories

# --- policy, environment and optimizer --------------------------------------
pi = WalkerPolicy(state_dim=29, action_dim=8)
train_env = WalkerEnv(base_config)
# NOTE: this rebinds the name `optim`, which the first cell imported as the
# `torch.optim` module; from here on `optim` is the Adam optimizer instance.
optim = torch.optim.Adam(pi.parameters(), lr=lr)

# --- per-epoch logs (mean reward, policy loss, value loss) -------------------
mean_rewards = np.zeros(epochs)
p_losses = np.zeros(epochs)
v_losses = np.zeros(epochs)

Environment ready


In [None]:
epochs = 400
# Allocate the logs here so their length always matches the number of epochs
# run by this cell; otherwise arrays sized for a different `epochs` would be
# plotted with trailing zeros. This also makes the cell safe to re-run.
mean_rewards, p_losses, v_losses = np.zeros(epochs), np.zeros(epochs), np.zeros(epochs)

for epoch in range(epochs):
    tensor_s, tensor_a, tensor_r = sample_trajectories(train_env, pi, T)  # collect trajectories using current policy

    with torch.no_grad():  # log-probabilities under the policy that generated the data
        logp_old = pi.log_prob(tensor_a, tensor_s[:T]).squeeze(2)  # log(pi_old(a_t | s_t))

    for i in range(sgd_iters):  # several gradient steps on the same batch of trajectories
        values = pi.value_estimates(tensor_s)  # estimate value function for all states
        logp = pi.log_prob(tensor_a, tensor_s[:T]).squeeze(2)  # log(pi(a_t | s_t))

        with torch.no_grad():  # no need for gradients when computing the advantages and value targets
            value_targets, advantage_estimates = compute_advantage_estimates(tensor_r, values, gamma, bootstrap=True)
            # value_targets, advantage_estimates = compute_gae(tensor_r, values, gamma, lambda_=0.97)
            # Normalize advantages; the small constant avoids division by zero
            # when all advantages are (nearly) identical.
            advantage_estimates = (advantage_estimates - advantage_estimates.mean()) / (advantage_estimates.std() + 1e-8)

        L_v = value_loss(values[:T], value_targets)  # value-function regression loss

        p_ratios = torch.exp(logp - logp_old)  # probability ratios r_\theta(a_t | s_t)
        L_ppo = ppo_loss(p_ratios, advantage_estimates, epsilon=epsilon)  # clipped policy loss
        total_loss = L_v + L_ppo

        optim.zero_grad()
        total_loss.backward()  # backprop and gradient step
        optim.step()

    mean_reward = tensor_r.mean().item()
    if epoch % 10 == 0:
        print('Epoch %d, mean reward: %.3f, value loss: %.3f' % (epoch, mean_reward, L_v.item()))
    # Log the losses of the last gradient step of this epoch.
    mean_rewards[epoch] = mean_reward
    v_losses[epoch] = L_v.item()
    p_losses[epoch] = L_ppo.item()

# train_env.close()

plot_training(mean_rewards, p_losses, v_losses)

Epoch 0, mean reward: 0.038, value loss: 13.116
Epoch 10, mean reward: 0.091, value loss: 14.442
Epoch 20, mean reward: 0.137, value loss: 10.092
Epoch 30, mean reward: 0.195, value loss: 9.832
Epoch 40, mean reward: 0.282, value loss: 10.139
Epoch 50, mean reward: 0.312, value loss: 8.536
Epoch 60, mean reward: 0.319, value loss: 6.460
Epoch 70, mean reward: 0.379, value loss: 6.812
Epoch 80, mean reward: 0.395, value loss: 4.941


In [55]:
# Evaluate the trained policy on a single walker for 512 steps.
config = {
    "N": 1,
    "vis": VISUALIZE,  # honour the notebook-level switch instead of forcing the visualization on
    "track": 0,
    "reward_fcn": walker_reward
}
test_policy(pi, config, 512)
# 5.2

Environment ready
Max x: 1.1363857984542847


0.9819980405482056

In [46]:
# pi.save_weights("walker_policy42_7.pth")
# pi.load_state_dict(torch.load("walker_policy27_3.pth"))