# Make Bipedal Robot Walk

This is the midterm assignment for the Move37 course. 

The goal is to make a 2D robot learn how to walk. This is a fairly simple task, since most of the code is given in the lectures.

I'm making two substantial changes to the code:
- adding a tanh activation, as this naturally squeezes actions into the desired [-1, 1] range.
- adding learning rate annealing.
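
As a quick illustration of the first change, the sketch below applies `np.tanh` to the output of a linear policy. The shapes match the `Box(24,)` observation space and `Box(4,)` action space of the environment; the random weights and state are placeholders, not trained values.

```python
import numpy as np

rng = np.random.default_rng(0)

n_actions, n_states = 4, 24                             # BipedalWalker dimensions
W = rng.normal(scale=5.0, size=(n_actions, n_states))   # placeholder weights
state = rng.normal(size=n_states)                       # placeholder observation

raw = W @ state          # unbounded linear output
action = np.tanh(raw)    # squashed into [-1, 1]

print(raw.round(2))
print(action.round(2))
```

However large the raw output gets, the action stays within the bounds the environment expects, so no explicit clipping is needed.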

I also restructure the code to make the training part (at least to my eye) more modular and extensible.

## Setup

In [1]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

In [2]:
import numpy as np
import gym

from operator import itemgetter

from gym import wrappers

np.random.seed(1)

In [3]:
ENV = gym.make('BipedalWalker-v2')
INIT_POP = 200
KEEP_BEST = 15
KEEP_NONBEST = 5
NUM_CHILDREN = 50
MUTATION_SIZE = 0.2
ENV.env.seed(1)

WARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.
WARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.


[1]

In [4]:
print(ENV.observation_space, ENV.action_space)
print(min(ENV.observation_space.low), max(ENV.observation_space.high))
print(ENV.action_space.low, ENV.action_space.high)

Box(24,) Box(4,)
-inf inf
[-1. -1. -1. -1.] [1. 1. 1. 1.]


In [5]:
NS = ENV.observation_space.shape[0]
NA = ENV.action_space.shape[0] 

In [6]:
class Base_Policy():
    
    '''Base class. Defines policy by a weight matrix W.'''
    
    def __init__(self, env = ENV, clip_reward = True):
        self.env = env
        self.nS = env.observation_space.shape[0]  # Dimension of the state (observation) vector
        self.nA = env.action_space.shape[0]  # Dimension of the action vector
        self.W = np.zeros((self.nA, self.nS))  # Action dim is 0 to avoid transposes later
        self.n = 0  # count of how many times we have played
        self.clip_reward = clip_reward  # if true, we squeeze rewards into [-1, 1]
    
    def pi(self, state, W = None):
        '''Our policy. Returns action from state.
        
        Note: we can optionally supply a weight matrix W, if we do so, we play
        policy according to this supplied matrix.'''
        
        if W is None: W = self.W 
        return np.tanh(W @ state)
    
    def playPol(self, W = None, save_frames = False, save_ext = 'tmp/experiment0/'):
        '''Plays a game from start to finish. 
        
        Note: we can optionally supply a weight matrix W, if we do so, we play
        policy according to this supplied matrix.'''
        
        reward = 0
        
        if save_frames:
            env = wrappers.Monitor(self.env, save_ext, force=True)
        else:
            env = self.env
            
        self.s = env.reset()
            
        for i in range(2000):
            if save_frames: env.render()
            self.call_before_action()
            action = self.pi(self.s, W)
            self.s, self.r, self.done, _ = env.step(action)
            if self.clip_reward:
                self.r = max(min(self.r, 1), -1)
            reward += self.r
            self.n += 1
            if self.done: break
        if save_frames: 
            env.close(); self.env.close()
        return reward
    
    def call_before_action(self):
        '''Optional method that gets executed every round before policy.'''
        pass

## Normalize State

The previous agent didn't normalize its state (the inputs for an action). We add normalization here.

In [7]:
class norm_Policy(Base_Policy):
    '''Same policy as Base, except that we normalize inputs to mean 0, and std 1.'''
    
    def __init__(self, clip_reward = True):
        super().__init__(clip_reward = clip_reward)
        self.inp_mean, self.inp_var = np.zeros(self.nS), np.ones(self.nS)
        self.mean_diff = np.zeros(self.nS)
        
    def call_before_action(self):
        '''This function is called before calling our policy to get the next action.'''
        self.update_stats()  # Updates means and std with current state value
        if self.n >= 2: 
            self.normalize_state()  # Normalizes inputs

    def update_stats(self):
        '''Update the running mean and variance of the observed states.'''
        last_mean = self.inp_mean.copy()
        self.inp_mean += (self.s - self.inp_mean) / max(self.n, 1)
        self.mean_diff += (self.s - last_mean) * (self.s - self.inp_mean)
        self.inp_var = np.clip((self.mean_diff / max(self.n, 1)), a_min = 1e-2, a_max = None)
        
    def normalize_state(self):
        '''Normalize state that gets fed into our policy.'''
        self.s = (self.s - self.inp_mean) / np.sqrt(self.inp_var)
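
The running statistics above follow the pattern of Welford's online algorithm. Note that the class divides by `self.n`, which counts completed steps and therefore lags the number of observed states by one. For comparison, here is a minimal standalone sketch that divides by the number of samples seen so far and checks itself against NumPy's batch results. The class name `RunningStats` and its methods are hypothetical, and the data is synthetic.

```python
import numpy as np

class RunningStats:
    """Hypothetical helper: online mean and variance (Welford's algorithm)."""

    def __init__(self, dim):
        self.count = 0
        self.mean = np.zeros(dim)
        self.m2 = np.zeros(dim)   # running sum of squared deviations

    def update(self, x):
        self.count += 1
        delta = x - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (x - self.mean)

    @property
    def var(self):
        # Population variance (divides by the sample count)
        return self.m2 / max(self.count, 1)

    def normalize(self, x, min_var=1e-2):
        # Floor the variance to avoid dividing by values close to zero
        return (x - self.mean) / np.sqrt(np.clip(self.var, min_var, None))

rng = np.random.default_rng(0)
data = rng.normal(loc=3.0, scale=2.0, size=(1000, 24))  # synthetic states

stats = RunningStats(24)
for row in data:
    stats.update(row)

print(np.allclose(stats.mean, data.mean(axis=0)))
print(np.allclose(stats.var, data.var(axis=0)))
```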

In [8]:
norm_pol = norm_Policy()  

## Define Fitness Function

We'll search for good weight matrices with a genetic algorithm. For that we need a few ingredients:
1. A fitness function that scores a particular weight matrix.
1. An initial population of weight matrices.
1. A selection mechanism.
1. Cross-over between weight matrices (a method to combine them).
1. Mutation.
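
To show how these five ingredients fit together, here is a minimal sketch on a toy problem rather than the walker. Everything in it is an assumption made for illustration: the fitness function (negative squared distance to a hypothetical `TARGET` vector), the population sizes, the noise scale, and the helper name `toy_fitness`.

```python
import numpy as np

rng = np.random.default_rng(0)
TARGET = np.array([0.5, -0.3, 0.8])   # hypothetical optimum

def toy_fitness(w):
    # 1. Fitness: higher is better, with a maximum of 0 at w == TARGET
    return -np.sum((w - TARGET) ** 2)

# 2. Initial population
pop = [rng.uniform(-1, 1, size=3) for _ in range(30)]
initial_best = max(toy_fitness(w) for w in pop)

for generation in range(50):
    # 3. Selection: keep the 5 fittest individuals
    pop.sort(key=toy_fitness, reverse=True)
    parents = pop[:5]

    children = []
    for _ in range(25):
        i, j = rng.choice(len(parents), size=2, replace=False)
        # 4. Cross-over: take each entry from one of the two parents
        mask = rng.integers(0, 2, size=3)
        child = mask * parents[i] + (1 - mask) * parents[j]
        # 5. Mutation: add small zero-mean noise
        child = child + rng.normal(scale=0.05, size=3)
        children.append(child)

    pop = parents + children

final_best = max(toy_fitness(w) for w in pop)
print(initial_best, final_best)
```

Because the fittest individuals are carried over unchanged and the toy fitness is deterministic, the best score can never get worse from one generation to the next. The walker's fitness depends on the running normalization statistics, so that guarantee does not carry over exactly.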

In [9]:
def getFitness(W, pol = norm_pol):
    return pol.playPol(W)

## Initial Population

In [10]:
init_pop = [np.random.rand(NA, NS) for _ in range(INIT_POP)]

## Selection Mechanism

In [11]:
def selection(pop, keep_best = KEEP_BEST, keep_nonbest = KEEP_NONBEST):
    
    fit = np.array([getFitness(w) for w in pop])
    avg, std, maxfit = (np.mean(fit), np.std(fit), np.max(fit))
    
    sort_idxs = np.argsort(-fit)
    best_idxs = list(sort_idxs[:keep_best])
    nonbest_idxs = list(np.random.choice(len(pop), keep_nonbest, False))
    
    select_idxs = best_idxs+nonbest_idxs
    
    if len(select_idxs) == 1:
        return pop[select_idxs[0]], avg, std, maxfit
    
    keep = list(itemgetter(*(select_idxs))(pop))
        
    return keep, avg, std, maxfit    
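
Note that `selection` draws the random survivors from the whole population, so an index can end up in both the best and the non-best group. If duplicates are unwanted, one option is to sample only from the individuals that did not make the cut. The sketch below works on precomputed fitness values; `select_indices` is a hypothetical helper, not part of the original code.

```python
import numpy as np

def select_indices(fit, keep_best, keep_nonbest, rng):
    """Hypothetical variant: top `keep_best` indices plus `keep_nonbest`
    random indices drawn from the remaining individuals only."""
    order = np.argsort(-np.asarray(fit))   # indices sorted best first
    best = order[:keep_best]
    rest = order[keep_best:]
    lucky = rng.choice(rest, size=keep_nonbest, replace=False)
    return list(best) + list(lucky)

rng = np.random.default_rng(0)
fit = [3.0, -1.0, 7.5, 0.2, 5.1, -4.0]   # made-up fitness values
idxs = select_indices(fit, keep_best=2, keep_nonbest=2, rng=rng)
print(idxs)
```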

## Cross-Over

In [12]:
def cross_over(A, B):
    
    assert A.shape == B.shape
    
    mask = np.random.randint(0, 2, size = A.shape)
    
    return mask*A + (1-mask)*B
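
A small usage example makes the behaviour of this uniform cross-over easy to see. The parent matrices below are placeholders filled with 1s and 2s so that the origin of every child entry is visible; the function is restated so the snippet runs on its own.

```python
import numpy as np

def cross_over(A, B):
    assert A.shape == B.shape
    # 1 -> take the entry from A, 0 -> take it from B
    mask = np.random.randint(0, 2, size=A.shape)
    return mask * A + (1 - mask) * B

np.random.seed(0)
A = np.full((4, 6), 1.0)   # placeholder parent, all ones
B = np.full((4, 6), 2.0)   # placeholder parent, all twos
child = cross_over(A, B)
print(child)
```

Every entry of the child comes unchanged from exactly one parent; no averaging takes place.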
    

## Mutation

In [13]:
def mutate(W, mut_size = MUTATION_SIZE):
    # Note: randn is Gaussian, so this perturbation is centred on -1 (not 0) before scaling
    delta = np.random.randn(*W.shape)*2-1
    delta = delta * mut_size
    return W + delta
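
Because `np.random.randn` draws from a standard normal distribution, the expression `randn * 2 - 1` is not centred on zero: after scaling by `mut_size`, the perturbation has mean `-mut_size` and standard deviation `2 * mut_size`. The sketch below checks this empirically and shows a zero-mean alternative. `mutate_zero_mean` is a hypothetical name, and whether the shift was intended is not stated here.

```python
import numpy as np

MUTATION_SIZE = 0.2
rng = np.random.default_rng(0)
n = 200_000

# Perturbation as written in mutate(): Gaussian, shifted by -1, then scaled
delta_orig = (rng.standard_normal(n) * 2 - 1) * MUTATION_SIZE

def mutate_zero_mean(W, mut_size=MUTATION_SIZE, rng=rng):
    """Hypothetical alternative: zero-mean Gaussian noise with std `mut_size`."""
    return W + rng.standard_normal(W.shape) * mut_size

# Mutating an all-zero array returns the perturbation itself
delta_alt = mutate_zero_mean(np.zeros(n))

print(delta_orig.mean(), delta_orig.std())
print(delta_alt.mean(), delta_alt.std())
```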

## 1 Step of Evolution

In [14]:
def evolve_1_step(pop, num_children = NUM_CHILDREN):
    
    keep, avg, std, maxfit =  selection(pop)
    
    children = [cross_over(*itemgetter(*np.random.choice(len(keep), 2, False))(keep)) 
                for _ in range(num_children)]
    
    children = [mutate(child) for child in children]
    
    return keep+children, avg, std, maxfit    

## Evolution over multiple generations

In [15]:
def evolve(pop, gen):
    for i in range(gen):
        pop, avg, std, maxfit = evolve_1_step(pop)
        if (i % 10 == 0) or (i == gen-1):
            print(f'Generation {i}, average fitness: {avg}, std: {std}, max: {maxfit}')
    return pop

In [16]:
pop = evolve(init_pop, 200)

Generation 0, average fitness: -15.339396146356773, std: 23.852071868820914, max: -4.621529529635325
Generation 10, average fitness: -19.041435213950574, std: 33.44284080787119, max: 2.0912835829239538
Generation 20, average fitness: -10.70105432369192, std: 11.670403520418253, max: 2.3312709762315507
Generation 30, average fitness: -7.204469925928542, std: 24.34177086501751, max: 53.99564512356741
Generation 40, average fitness: 29.257524880620974, std: 28.76081105636012, max: 87.34558406771237
Generation 50, average fitness: 58.747103841192306, std: 42.43923720057645, max: 118.78397901339953
Generation 60, average fitness: 68.74797084057181, std: 40.87895227923038, max: 134.99773015545696
Generation 70, average fitness: 87.8218701216341, std: 38.90395320725124, max: 145.93027151549222
Generation 80, average fitness: 84.10837273155826, std: 48.30605923592216, max: 146.97929040220083
Generation 90, average fitness: 93.55097008263216, std: 45.10682078062708, max: 151.96563144056424
Generation 100

## Get best weights from population

In [17]:
def getBest(pop):
    pop, _, _, _ = selection(pop, keep_best = 10, keep_nonbest = 0)
    best, _, _, _ = selection(pop, keep_best = 1, keep_nonbest = 0)
    return best

In [18]:
norm_pol.W = getBest(pop)

In [19]:
norm_pol.playPol(save_frames=True, save_ext = 'bipedal_rec')

128.90901806581113

In [20]:
%%HTML
<video width="320" height="240" controls>
  <source src="bipedal_rec/openaigym.video.0.4333.video000000.mp4" type="video/mp4">
</video>