In [1]:
import os
import numpy as np

In [2]:
import gym
from gym import wrappers

ModuleNotFoundError: No module named 'gym'

In [None]:
import pybullet_envs

In [None]:
# Setting the Hyper Parameters

class Hp():
    """Container for the hyperparameters of the random-search training run.

    Every value can be overridden through a keyword argument; calling ``Hp()``
    with no arguments yields the original configuration.

    Parameters
    ----------
    nb_steps : number of training iterations, i.e. how many times the policy
        is updated.
    episode_length : maximum number of environment steps played in one episode.
    learning_rate : scale applied to each policy update.
    nb_directions : number of perturbations sampled per training iteration.
        More directions explore more, but each iteration takes longer.
    nb_best_directions : how many of the sampled directions (the best-scoring
        ones) are kept for the update. Must not exceed ``nb_directions``.
    noise : factor that scales a perturbation before it is added to or
        subtracted from the policy weights.
    seed : seed handed to the random number generator.
    env_name : identifier of the environment to create.

    Raises
    ------
    AssertionError
        If ``nb_best_directions`` is larger than ``nb_directions``.
    """

    def __init__(self,
                 nb_steps = 1000,
                 episode_length = 1000,
                 learning_rate = 0.02,
                 nb_directions = 16,
                 nb_best_directions = 16,
                 noise = 0.03,
                 seed = 1,
                 env_name = 'HalfCheetahBulletEnv-v0'):
        # Validate first so an inconsistent configuration is never stored.
        assert nb_best_directions <= nb_directions, \
            "nb_best_directions must be less than or equal to nb_directions"

        self.nb_steps = nb_steps # number of policy updates performed by training
        self.episode_length = episode_length # upper bound on steps per episode
        self.learning_rate = learning_rate # controls how fast the policy moves
        self.nb_directions = nb_directions # perturbations sampled per iteration
        self.nb_best_directions = nb_best_directions # top directions kept for the update
        self.noise = noise # scale of the perturbations applied to the weights
        self.seed = seed # seed for the random number generator
        self.env_name = env_name # name of the environment to build
        

Here, a state is a vector that describes exactly what is happening in the environment at time t.
Each state vector contains the coordinates of different points of the virtual robot, and it describes the environment so completely that we could draw a picture of the scene just by looking at that vector. This vector is the input to the perceptron.
<br>
We need to normalise the states to increase the sensitivity of the neural network.


In [None]:
# Normalizing the states

class Normalizer():
    """Keeps running per-input statistics and standardises states with them."""

    def __init__(self, nb_inputs):
        # Four independent zero vectors, one entry per input of the perceptron:
        #   n         -> how many states have been observed so far
        #   mean      -> running mean of each input
        #   mean_diff -> running sum of squared deviations (variance numerator)
        #   var       -> running variance of each input
        self.n, self.mean, self.mean_diff, self.var = (
            np.zeros(nb_inputs) for _ in range(4)
        )

    def observe(self, x):
        """Fold a newly seen state ``x`` into the running mean and variance."""
        self.n += 1.
        # Deviation from the mean *before* it is refreshed; the online update
        # needs both this and the deviation from the refreshed mean.
        deviation_before = x - self.mean
        self.mean += deviation_before / self.n
        self.mean_diff += deviation_before * (x - self.mean)
        # Floor the variance at 1e-2 so normalize() never divides by zero.
        self.var = np.clip(self.mean_diff / self.n, 1e-2, None)

    def normalize(self, inputs):
        """Return ``inputs`` centred on the running mean and scaled by the running std."""
        return (inputs - self.mean) / np.sqrt(self.var)


Our AI is a policy: a function that takes the states of the environment as inputs and returns the actions to play in order to walk. This algorithm basically focuses on exploring the space of policies. We will explore many policies and converge to one that returns the best actions for walking.
<br>
We will build the policy and give it the tools to update itself during training, by applying some perturbations and updating the policy's weights in the direction of higher rewards.

In [None]:
# Building the AI

class Policy():
    """Linear policy whose weight matrix ``theta`` maps a state to an action.

    ``evaluate``, ``sample_deltas`` and ``update`` read the module-level ``hp``
    object (``noise``, ``nb_directions``, ``learning_rate``,
    ``nb_best_directions``), so ``hp`` must exist before they are called.
    """

    def __init__(self, input_size, output_size):
        # Weights start at zero: one row per output, one column per input.
        self.theta = np.zeros((output_size, input_size))

    def evaluate(self, input, delta = None, direction = None):
        """Return the action for ``input``.

        With ``direction`` left as ``None`` the unperturbed weights are used.
        With ``direction == "positive"`` the weights are shifted by
        ``+hp.noise * delta``; any other value of ``direction`` shifts them by
        ``-hp.noise * delta``.
        """
        if direction is None:
            return self.theta.dot(input)
        elif direction == "positive":
            return (self.theta + hp.noise*delta).dot(input)
        else:
            return (self.theta - hp.noise*delta).dot(input)

    def sample_deltas(self):
        """Return ``hp.nb_directions`` perturbation matrices shaped like ``theta``.

        Each matrix is drawn from the standard normal distribution.
        """
        return [np.random.randn(*self.theta.shape) for _ in range(hp.nb_directions)]

    def update(self, rollouts, sigma_r):
        """Move ``theta`` along the reward-weighted sum of the perturbations.

        ``rollouts`` is an iterable of ``(r_pos, r_neg, delta)`` triples and
        ``sigma_r`` is the standard deviation of the rewards used to scale the
        step. When ``sigma_r`` is zero the update is skipped.
        """
        # A zero spread would make the scale factor infinite; multiplied by the
        # accumulated step this writes NaN into theta and ruins every later
        # evaluation, so leave the weights untouched instead.
        if sigma_r == 0:
            return
        step = np.zeros(self.theta.shape)
        for r_pos, r_neg, d in rollouts:
            step += (r_pos - r_neg) * d
        self.theta += hp.learning_rate / (hp.nb_best_directions * sigma_r) * step


In [None]:
# Exploring the policy on one specific direction and over one episode

def explore(env, normalizer, policy, direction = None, delta = None):
    """Play one episode with the (optionally perturbed) policy and return the summed clipped reward.

    The episode stops when the environment reports it is done or after
    ``hp.episode_length`` steps, whichever comes first. ``direction`` and
    ``delta`` are forwarded unchanged to ``policy.evaluate``.
    """
    observation = env.reset() # fresh episode
    total_reward = 0
    steps_taken = 0.
    finished = False
    while not (finished or steps_taken >= hp.episode_length):
        # Update the running statistics with the raw state, then feed the
        # normalized state to the perceptron.
        normalizer.observe(observation)
        action = policy.evaluate(normalizer.normalize(observation), delta, direction)
        observation, raw_reward, finished, _ = env.step(action)
        # Clamp the reward to [-1, 1] so a single extreme reward cannot
        # dominate the episode total.
        if raw_reward > 1:
            clipped = 1
        elif raw_reward < -1:
            clipped = -1
        else:
            clipped = raw_reward
        total_reward += clipped
        steps_taken += 1
    return total_reward

In [None]:
def train(env, policy, normalizer, hp):
    """Run ``hp.nb_steps`` rounds of perturb / evaluate / update on ``policy``.

    Each round samples perturbations, plays one episode per perturbation in the
    positive and then in the negative direction, keeps the
    ``hp.nb_best_directions`` best-scoring directions, updates the policy with
    them, and prints the reward of the updated, unperturbed policy.
    """
    for step in range(hp.nb_steps):

        # Fresh perturbations for this round
        deltas = policy.sample_deltas()
        direction_ids = range(hp.nb_directions)

        # Every positive-direction episode is played before any negative one
        positive_rewards = [
            explore(env, normalizer, policy, direction = "positive", delta = deltas[k])
            for k in direction_ids
        ]
        negative_rewards = [
            explore(env, normalizer, policy, direction = "negative", delta = deltas[k])
            for k in direction_ids
        ]

        # Standard deviation over all rewards collected in this round
        sigma_r = np.array(positive_rewards + negative_rewards).std()

        # Score each direction by the better of its two rewards and keep the
        # top hp.nb_best_directions (ties keep their original order)
        best_of_pair = [max(r_pos, r_neg) for r_pos, r_neg in zip(positive_rewards, negative_rewards)]
        ranked = sorted(range(len(best_of_pair)), key = best_of_pair.__getitem__, reverse = True)
        rollouts = [
            (positive_rewards[k], negative_rewards[k], deltas[k])
            for k in ranked[:hp.nb_best_directions]
        ]

        policy.update(rollouts, sigma_r)

        # Report how the updated, unperturbed policy performs
        reward_evaluation = explore(env, normalizer, policy)
        print('Step:', step, 'Reward:', reward_evaluation)

In [None]:
# Running the main code

def mkdir(base, name):
    """Ensure the directory ``base/name`` exists and return its path.

    Missing parent directories are created as well. Calling this for a
    directory that already exists is a no-op.

    Raises
    ------
    FileExistsError
        If the path exists but is not a directory.
    """
    path = os.path.join(base, name)
    # exist_ok avoids the check-then-create race of testing os.path.exists first
    os.makedirs(path, exist_ok = True)
    return path
# Output folders, relative to the current working directory:
# exp/brs for this experiment ...
work_dir = mkdir('exp', 'brs')
# ... and exp/brs/monitor, which is handed to the gym Monitor wrapper when the environment is built.
monitor_dir = mkdir(work_dir, 'monitor')

In [None]:
# Build the hyperparameters. Policy and explore() read this module-level name
# `hp` directly, so it must be defined before training starts.
hp = Hp()
# Seed NumPy's global random generator, which Policy.sample_deltas draws from.
# NOTE(review): only NumPy is seeded here; the environment itself is not seeded
# in this cell — confirm whether that is intended.
np.random.seed(hp.seed)
# Create the environment named in the hyperparameters.
# NOTE(review): presumably the 'HalfCheetahBulletEnv-v0' id is registered by the
# `pybullet_envs` import — verify that cell has run successfully first.
env = gym.make(hp.env_name)
# Wrap the environment in gym's Monitor, pointing it at monitor_dir.
# NOTE(review): force = True presumably lets it overwrite earlier monitor files — confirm.
env = wrappers.Monitor(env, monitor_dir, force = True)
# Size the policy from the first dimension of the observation and action spaces.
nb_inputs = env.observation_space.shape[0]
nb_outputs = env.action_space.shape[0]
policy = Policy(nb_inputs, nb_outputs)
normalizer = Normalizer(nb_inputs)
# Run the full training loop; it prints one reward line per step.
train(env, policy, normalizer, hp)