evolution-strategies

Copyright (c) 2019 Patrick Deubel

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

evolution-strategies includes:

evolution-strategies-starter
Copyright (c) 2016 OpenAI (http://openai.com)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

# Using evolution strategies to train Roboschool Environments

## Introduction

This Jupyter Notebook is based on the [paper](https://arxiv.org/abs/1703.03864), [blog article](https://openai.com/blog/evolution-strategies/) and [implementation](https://github.com/openai/evolution-strategies-starter) of OpenAI on the topic of using an evolution strategy algorithm for a typical reinforcement learning task. 

My implementation summarizes their implementation, by simplifying, refactoring and organizing the code into this Jupyter notebook which can be used to test the algorithm. One can tweak the hyperparameters, change the environment which shall be trained or even expand the implementation to support for example Atari environments.

I recommend reading the paper or at least the article before trying out the notebook. Also depending on the environment the training can be very computationally intense (for example training the Humanoid), so if you want to try out the harder ones I recommend using a highly parallelizable machine, i.e. a machine with a high number of cores/threads which can use multiple processes simultaneously.

## Algorithm overview

This section gives a brief overview over the algorithm. First of all we need to define what this implementation is going to do. The Roboschool is a group of environments in the [OpenAi Gym](https://gym.openai.com/), a program to test the behavior of machine learning algorithms on _real world_ problems. In our case, we want to train different robotic environments using an evolutionary algorithm which belongs to the class of natural evolution strategies. We therefore define a neural net with a configurable number of hidden layers, where the input dimension equals the observation space of the environment and the dimension of the output layer equals the dimension of the action space of the environment. This neural net is also called policy or in this implementation also referred to as a model. Therefore we train our policy to output the best possible action sequence given an observation sequence. Now, how do we train this policy? Training an evolutionary strategy consists of a cycle which is repeated over and over. First, an initial weight vector is randomly generated. In our context this weight vector is equal to the weights of our policy. Then we perturb the vector with gaussian noise. The number of perturbations is called the population size. What we now have is a population of slightly different weight vectors compared with the weight vector we started. Each one of these vectors will then be evaluated by first updating our policy with the weights and then run the environment using the policy. When this is done for the whole population, we calculate a gradient ascent step in the direction of steepest ascent. In our case, where we are dealing with natural evolution strategies, we calculate the step with the natural gradient. This is done by approximating this gradient using Monte Carlo estimates.

So lets say we have our initial weight vector $\theta$, a population size $n$, random perturbations $\epsilon_i$, $0 \leq i \leq n$, learning rate $\alpha$, noise standard deviation $\sigma$ and a fitness Function $F(\cdot)$. We then calculate the resulting weight vector like this:
$\theta_{t+1} = \theta_{t} + \alpha \frac{1}{n \sigma} \sum \limits_{i=0}^n F(\theta_{t} + \epsilon_i)$


This gives us the weight vector for the next cycle which we will then, again, perturb a number of times (depending on the population size). A cycle in the context of evolutionary strategies is called a generation.

One might ask himself now what this fitness function is in the context of robotic simulations. When initializing such an environment one can call the `step` function on the environment with an array in the shape of the action space (in our case this would be the output of the policy). The environment then evaluates the provided action based on the current observation and other parameters in the environment and outputs a reward. This is done for either a fixed number of timesteps (some environments have a maximum of timesteps defined) or stops, when the action resulted in a state where the environment is `done`, for example when the `Humanoid` environment falls over and touches the surface. The rewards get summed up for every timestep which forms the reward for one action.

## Setup

Before starting any computation we need to configure the training and define some methods and objects we will use later on.


### Imports

Note that TensorFlow does not get imported here. We will only import it inside of a function which runs in another process. This is due to the fact that when importing TensorFlow a session is created in the background which will interefere with our models which we run in subprocesses. When importing the package only inside a function and then running these functions inside of subprocesses, every process has its own TensorFlow session and they therefore don't interfere with each other.

In [1]:
import csv
import errno
import json
import os
import time

from collections import namedtuple, OrderedDict

import ctypes
import multiprocessing
import numpy as np

import gym, roboschool

import pybullet, pybullet_envs

### Directory for storing the training

For evaluating the trained data we need to define a directory where we want to store the trained policies, as well as the log file to record the results of every generation.

Depending on your disk space you may not want to save every model, but for an indepth evaluation this is necessary. During training there will be so called _evaluation runs_ which will not add noise but test the currently trained policy to give insight on training. But since it relies on probability the number of evaluation runs will not be equal through generations. An additional Jupyter Notebook with the prefix **-visualization** can be used after training to load all saved weight files and evaluate them a given number of times.

If you want to change the location change the variable `main_directory` to a directory where the user which runs this notebook has write permissions. If it does not exist the program will create it. The default location is a `training_runs` folder which is created in the working directory.

When starting the master a subfolder is created inside `main_directory` where the the current progress of a training is stored.

In [2]:
def mkdir_p(path):
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise

main_directory = "/home/jovyan/work/evolution-strategies/training_runs/".format(os.getpid())
try:
    mkdir_p(main_directory)
except PermissionError:
    print("The user running this notebook has no permission to create this folder. Please provide a path to a folder"
         + " with write permissions.")

### Configuration, Task and Result Classes

Using `namedtuple` from the `collections` package from python allows us to quickly create small classes with different attributes, which is ideal for quickly accessing different attributes during training as well as saving the configurations to a file.

Each attribute will be explained in a bit, where objects of these classes are created and their parameters are set.

In [3]:
Config = namedtuple('Config', [
    'env_id',
    'population_size',
    'timesteps_per_gen',
    'num_workers',
    'learning_rate',
    'noise_stdev',
    'snapshot_freq',
    'return_proc_mode',
    'calc_obstat_prob',
    'l2coeff',
    'eval_prob'
])

Optimizations = namedtuple('Optimizations', [
    'mirrored_sampling',
    'fitness_shaping',
    'weight_decay',
    'discretize_actions',
    'gradient_optimizer',
    'observation_normalization',
    'divide_by_stdev'
])

ModelStructure = namedtuple('ModelStructure', [
    'ac_noise_std',
    'ac_bins',
    'hidden_dims',
    'nonlin_type',
    'optimizer',
    'optimizer_args'
])

Task = namedtuple('Task', [
    'theta', 'ob_mean', 'ob_std', 'task_id'])

Result = namedtuple('Result', [
    'noise_inds','returns', 'signreturns', 'lengths',
    'eval_return', 'eval_length',
    'ob_sum', 'ob_sumsq', 'ob_count',
    'task_id',
    'times_predict'
])

### Configuration

#### Optimizations object

First we start with the optimizations for the training, since other parameters are only used when the respective optimization is activated.

All values can only be either `True` or `False`. The term _activated_ means the value is set to `True` in this context.

When `mirrored_sampling` is activated, sampled noise gets used twice: One time it will get added to the parameter vector and the result gets evaluated and the other time it gets subtracted from the parameter vector and the result will be evaluated.

`fitness_shaping` processes the rewards by applying a rank transformation.

`weight_decay` slighty changes the parameter vector.

`discretize_actions` can be used to bin the actions. This means that you can provide a number of uniformely shaped bins in which the output of the model will be put. For some environments this can encourage exploration behavior.

`gradient_optimizer` will use a gradient optimizer for the computed gradient, for example the `Adam` optimizer.

`observation_normalization` When turned on, before an observation gets feeded into the neural network it will be subtracted by the observation mean and divided with the observation standard deviation. The observation mean and standard deviation get constantly updated on training based on the configured probability.

In [4]:
optimizations = Optimizations(
    mirrored_sampling=True,
    fitness_shaping=True,
    weight_decay=True,
    discretize_actions=True,
    gradient_optimizer=True,
    observation_normalization=True,
    divide_by_stdev=False
)

#### Config object

The config object will serve us as a general configuration for the training.

First of all it defines the `env_id` which has to be a valid ID for a `Roboschool` environment, for example `RoboschoolAnt-v1`. A complete list can be found [here](https://github.com/openai/roboschool/blob/master/roboschool/__init__.py).

`population_size` defines the number of perturbations per generation.

`timesteps_per_gen` defines the minimum number of timesteps which shall be processed during a generation.

`num_workers` defines the amount of parallel processes to be created during calculation and must be larger than 0. By default this value is the output of `os.cpu_count()` which allows the program to use the maximum amount of computational power in terms of the provided hardware.

`learning_rate` defines how much the estimated gradient influences the next generation and must be larger than 0 for the training to work. If the gradient optimizer is activated this value is not used. Instead, `step_size` in the model structure is the corresponding value.

`noise_stdev` is the standard devation for the noise and should be larger than 0. It cannot be equal to 0 since if it would be at some point there could then be a division by zero. Other than that it would not benefit training.

`snapshot_freq` describes the frequency in which generations shall be saved to a `.h5` file. For example a snapshot frequency of 5 would save every fifth generation to a file. Setting it to 0 disables snapshotting.

`return_proc_mode` translates to return processing mode and describes how the calculated rewards for a generation shall be processed. By default this is `centered_rank` which will calculate the ranks of the rewards. This option is only used when also activating the `fitness_shaping` optimization. One can choose between the three strings `RETURN_PROC_MODE_CR`, `RETURN_PROC_MODE_SIGN` and `RETURN_PROC_MODE_CR_SIGN`.

`calc_obstat_prob` is the probability of saving the observations during a rollout (evaluating the fitness of a policy) and updating the observation mean and standard deviation. These values are used to normalize the input of a model which helps a neural network to generalize faster. The parameter is only used in combination with the `observation_normalization` optimization. Must be greater than 0 when using the optimization since otherwise it would waste performance while not normalizing.

`l2coeff` is the coefficient which is used for weight decay to deform the parameter vector.

`eval_prob` is the probability of inserting an evaluation run. This is useful for training to quickly monitor the reward mean, reward standard deviation and length mean of the current generation. The value must be greater or equal 0 (equal 0 turns of the evaluation runs).

Below the configuration we create an environment with the configured ID and check if it is valid. If it is, `ob_space` and `ac_space` are created. They are used inside `create_model`, which we will look at in a bit, because they equal the
input and output dimension of the model. So if you decide to change the configuration, they need to be changed as well. Therefore we define them here. Generally speaking as long as one does not rerun a cell, old values are used.

In [5]:
RETURN_PROC_MODE_CR = 'centered_rank'
RETURN_PROC_MODE_SIGN = 'sign'
RETURN_PROC_MODE_CR_SIGN = 'centered_sign_rank'

config = Config(
    env_id="RoboschoolAnt-v1",
    population_size=8,
    timesteps_per_gen=10000,
    num_workers=os.cpu_count(),
    learning_rate=0.001,
    noise_stdev=0.02,
    snapshot_freq=1,
    return_proc_mode=RETURN_PROC_MODE_CR,
    calc_obstat_prob=0.01,
    l2coeff=0.005,
    eval_prob=0.003
)

try:
    env = gym.make(config.env_id)
except:
    print("Please provide a valid environment ID for the OpenAI Gym. {} is not valid.".format(config.env_id))

# These are used inside create_model for the input and output dimensions
ob_space = env.observation_space
ac_space = env.action_space

assert config.population_size > 0
assert config.num_workers > 0
assert config.learning_rate > 0
assert config.noise_stdev != 0
assert config.eval_prob >= 0

if (config.return_proc_mode != RETURN_PROC_MODE_CR
    and config.return_proc_mode != RETURN_PROC_MODE_SIGN
    and config.return_proc_mode != RETURN_PROC_MODE_CR_SIGN):
    
    raise NotImplementedError
    
if optimizations.observation_normalization:
    assert config.calc_obstat_prob > 0

if optimizations.gradient_optimizer:
    assert config.l2coeff > 0

#### ModelStructure object

The ModelStructure object defines the overall structure of the neural network.

`ac_noise_std` is the standard deviation for the noise which is added during training. Adding noise shall generalize the training. It must be greater than 0, or equal to 0 for no noise.

When using the `discretize_actions` optimizations, `ac_bins` defines into how much uniformely spaced bins the actions shall be put. For example if the possible action values range from -1 to 1 and one defines 5 action bins the model will output values from $\{-1, -0.5, 0, 0.5, 1\}$. If you use the optimization the number of bins must be greater than 0.

`hidden_dims` define the number of hidden layers and their dimensions. It must be a list of positive Integers.

`nonlin_type` defines the activation function for the hidden layers. Can be `tanh`, `relu`, `lrelu` or `elu` for the hyperbolic tangent, rectified linear, leaky ReLU and the exponential linear. If something else is defined `tanh` will be picked.

`optimizer` is only used when the `gradient_optimizer` optimization is turned on. It can be `adam` for the Adam optimizer or `sgd` for the SGD Optimizer. Defining anything other will result in an error.

`optimizer_args` is only used when the `gradient_optimizer` optimization is turned on. This will be feeded into the constructor of the optimizer. For the Adam and SGD optimizer one must specify the `stepsize` but can also specify other optimizer specific attributes. Please check the constructor signature for the names. If you specify something else than stepsize be careful, this does not get checked for errors.

In [6]:
OPTIMIZER_ADAM = 'adam'
OPTIMIZER_SGD = 'sgd'

model_structure = ModelStructure(
    ac_noise_std=0.01,
    ac_bins=5,
    hidden_dims=[256, 256],
    nonlin_type='tanh',
    optimizer=OPTIMIZER_ADAM,
    optimizer_args={
        'stepsize': config.learning_rate
    }
)

assert model_structure.ac_noise_std >= 0
assert isinstance(model_structure.hidden_dims, list)
assert all(hd >= 0 for hd in model_structure.hidden_dims)

if optimizations.gradient_optimizer:
    if model_structure.optimizer != OPTIMIZER_ADAM and model_structure.optimizer != OPTIMIZER_SGD:
        raise NotImplementedError
    
    try:
        stepsize = model_structure.optimizer_args['stepsize']
        assert stepsize > 0
    except KeyError:
        print("Please provide the stepsize parameter.")

if optimizations.discretize_actions:
    assert model_structure.ac_bins > 0

#### Task class

During training the master will enqueue a new Task object per generation. The workers will then take the latest task, compute it and push a result object on a queue. In the following table the attributes of a task object are explained in depth.

| Attribute | Explanation |
| :---------|:------------|
| `theta`   | The one-dimensional parameter vector of this task, i.e. the current generation|
| `ob_mean` | When observation normalization is used this is the current mean of the observed observation |
| `ob_std`  | When observation normalization is used this is the current standard deviation of the observed observation|
| `task_id` | The ID of this Task, which equals the generation number |



#### Result class

An object of the Result class will define a computed task by the workers. This can either be an evaluation task, where no noise gets added and the policy will simply be evaluated on the environment or a regular task. This means that the noise gets sampled, added (and subtracted when mirrored sampling is used) and evaluated. The following table gives a more detailed information on each attribute.

| Attribute             | Explanation   |
| :---------------------|:--------------|
| `noise_inds`        | A numpy array with the indices of the used noise|
| `returns`        | A numpy array with the rewards. When mirrored sampling is used this list is two dimensional| |`signreturns` | The sum of the signs of the rewards. When mirrored sampling is used this list is two dimensional| |`lengths` | A numpy array with the sum of the timesteps. When mirrored sampling is used this list is two dimensional|
| `eval_return` | np.nan if this was not an evaluation task, otherwise a numpy array with the reward of the evaluation|
| `eval_length`|np.nan if this was not an evaluation task, otherwise a numpy array with the timesteps of the evaluation| |`ob_sum` | If observation normalization is used this contains the sum of the tracked observations |
| `ob_sumsq` | If observation normalization is used this contains the squared sum of the tracked observations |
| `ob_count` | If observation normalization is used this contains the amount of tracked observations|
| `task_id` | The ID of the task that has been calculated|
| `times_predict` | A list of time measurements, where each value is the time how long a prediciton with the Keras model needs| 

### Saving the configuration

When this method is called the specified configuration gets saved to `save_directory`, so when training is done one can reproduce the training with the exact parameters.

In [7]:
def save_configuration(save_directory):
    with open(os.path.join(save_directory, 'config.json'), 'w', encoding='utf-8') as f:
        chained_dict = OrderedDict([
            ('config', config._asdict()),
            ('model_structure', model_structure._asdict()),
            ('optimizations', optimizations._asdict())])

        json.dump(chained_dict, f, ensure_ascii=False, indent=4)

## Function, Variable and Class definitions



### Using Keras for the models

The original implementation from OpenAI used chained TensorFlow operations to construct the model. This notebook however, uses Keras (which is integrated into TensorFlow) as a high-level API to construct `Model` objects. These objects are, in our case, chained layers which will represent our neural network.

Also, note that all imports of TensorFlow need to be inside function definitions and these functions can only be called inside subprocesses. This is needed since importing TensorFlow automatically starts a background session which would be inherited to then started subprocesses. Therefore each worker would have the same TensorFlow session which would interfere with their respective models. Another noteworthy finding is that when creating Keras models in a loop there will be a memory leak when one does not clear the session. This will be adressed in the `run_worker` method.

In the following cell the method `create_model` creates and returns a Keras model as defined with the configurations. It needs to have the `ob_space` and `ac_space` variable set to the observation space and action space of the used environment in training, because they define the input and output dimension.

After adding an input layer the method will add the custom `ObservationNormalizationLayer` if the `observation_normalization` optimization is turned on. This layer uses the method parameters `ob_mean` and `ob_std` to normalize the input $o$ with this equation $\frac{o - \text{ob_mean}}{\text{ob_std}}$ and clip the values to $[-5, 5]$.

Next, depending on the dimension and number of hidden layers defined in the configuration these hidden layers get added as Dense layers.

As a last step the output layer gets added. If the `discretize_actions` optimization is turned the number of chosen bins will be used to add an extra Dense layer. This Dense layer has an input shape of $\text{num_bins} \cdot \text{adim}$ to enable the custom `DiscretizeActionsLayer` to reshape the input to $[-1, \text{adim}, \text{num_bins}]$. From there it pics the argument with the maximum value from the third dimension. This essentially means that it will pick `adim` maximum values from the bins. As a last step the picked values get transformed into the $[\text{alow}, \text{ahigh}]$ interval, which for the Roboschool environments is typically $[-1, 1]$.

If `discretize_actions` optimization is not used a simple Dense Layer with the action dimension serves as output layer.

All weights use the custom defined `Normc_initializer` to initialize their weights. This is copied from the OpenAI implementation, since initializing them differently can lead to a large minus reward when starting the training. With this custom layer one can specify the standard deviation for the random variables. For all layers `std=1.0` is used, except for the output layer and the layer before the `DiscretizeActionsLayer`, where `std=0.01` is set.
A low standard deviation in the output layer leads to a more stable result.

When `initial_weights` is provided these weights get set as the weights of the model. They need to be in the correct shape for the model.

In [8]:
def create_model(initial_weights=None, model_name="model", ob_mean=None, ob_std=None, model_file_path=None):
       
    import tensorflow as tf
    
    class Normc_initializer(tf.keras.initializers.Initializer):
        def __init__(self, std=1.0):
            self.std=std

        def __call__(self, shape, dtype=None, partition_info=None):
            out = np.random.randn(*shape).astype(np.float32)
            out *= self.std / np.sqrt(np.square(out).sum(axis=0, keepdims=True))
            return tf.constant(out)
    
    class ObservationNormalizationLayer(tf.keras.layers.Layer):
        def __init__(self, ob_mean, ob_std, **kwargs):
            self.ob_mean = ob_mean
            self.ob_std = ob_std
            super(ObservationNormalizationLayer, self).__init__(**kwargs)

        def call(self, x):
            return tf.clip_by_value((x - self.ob_mean) / self.ob_std, -5.0, 5.0)
        
        # get_config and from_config need to implemented to be able to serialize the model
        def get_config(self):
            base_config = super(ObservationNormalizationLayer, self).get_config()
            base_config['ob_mean'] = self.ob_mean
            base_config['ob_std'] = self.ob_std
            return base_config
        
        @classmethod
        def from_config(cls, config):
            return cls(**config)
        
    class DiscretizeActionsUniformLayer(tf.keras.layers.Layer):
        def __init__(self, num_ac_bins, adim, ahigh, alow, **kwargs):
            self.num_ac_bins = num_ac_bins
            self.adim = adim
            # ahigh, alow are NumPy arrays when extracting from the environment, but when the model is loaded from a h5
            # File they get initialised as a normal list, where operations like subtraction does not work, thereforce
            # cast them explicitly
            self.ahigh = np.array(ahigh)
            self.alow = np.array(alow)
            super(DiscretizeActionsUniformLayer, self).__init__(**kwargs)

        def call(self, x):            
            # Reshape to [n x i x j] where n is dynamically chosen, i equals action dimension and j equals the number
            # of bins
            scores_nab = tf.reshape(x, [-1, self.adim, self.num_ac_bins])
            # This picks the bin with the greatest value
            a = tf.argmax(scores_nab, 2)
            
            # Then transform the interval from [0, num_ac_bins - 1] to [-1, 1] which equals alow and ahigh
            ac_range_1a = (self.ahigh - self.alow)[None, :]
            return 1. / (self.num_ac_bins - 1.) * tf.keras.backend.cast(a, 'float32') * ac_range_1a + self.alow[None, :]        
        
        # get_config and from_config need to implemented to be able to serialize the model
        def get_config(self):
            base_config = super(DiscretizeActionsUniformLayer, self).get_config()
            base_config['num_ac_bins'] = self.num_ac_bins
            base_config['adim'] = self.adim
            base_config['ahigh'] = self.ahigh
            base_config['alow'] = self.alow
            return base_config
        
        @classmethod
        def from_config(cls, config):
            return cls(**config)
    
    if model_file_path is not None:
        custom_objects = {
            "Normc_initializer" : Normc_initializer, 
            "ObservationNormalizationLayer" : ObservationNormalizationLayer,
            "DiscretizeActionsUniformLayer" : DiscretizeActionsUniformLayer
        }
    
        try:
            model = tf.keras.models.load_model(model_file_path, custom_objects=custom_objects)
        except IOError as e:
            print("Could not load the model provided in {}".format(model_file_path))
        else:
            return model
    
    ac_space = env.action_space
    ob_space = env.observation_space
    
    nonlin = tf.nn.tanh
    
    if model_structure.nonlin_type == 'relu':
        nonlin = tf.nn.relu
    elif model_structure.nonlin_type == 'lrelu':
        nonlin = tf.nn.leaky_relu
    elif model_structure.nonlin_type == 'elu':
        nonlin = tf.nn.leaky_relu

    # Policy network
    input_layer = x = tf.keras.Input(ob_space.shape, dtype=tf.float32)
    
    if ob_mean is not None and ob_std is not None and optimizations.observation_normalization:
        if ob_std.all() != 0:
            x = ObservationNormalizationLayer(ob_mean, ob_std)(x)
                
    for hd in model_structure.hidden_dims:
        x = tf.keras.layers.Dense(
            hd, activation=nonlin,
            kernel_initializer=tf.random_normal_initializer(),#Normc_initializer(std=1.0),
            bias_initializer=tf.initializers.zeros())(x)

    # Action dimension and the lowest and highest possible values for an action
    adim, ahigh, alow = ac_space.shape[0], ac_space.high, ac_space.low        
    
    if optimizations.discretize_actions:
        num_ac_bins = int(model_structure.ac_bins)
        x = tf.keras.layers.Dense(
                        adim * num_ac_bins,
                        kernel_initializer=tf.random_normal_initializer(),#Normc_initializer(std=0.01),
                        bias_initializer=tf.initializers.zeros())(x)
        a = DiscretizeActionsUniformLayer(num_ac_bins, adim, ahigh, alow)(x)
    else:
        a = tf.keras.layers.Dense(
            adim,
            kernel_initializer=tf.random_normal_initializer(),#Normc_initializer(std=0.01),
            bias_initializer=tf.initializers.zeros())(x)
    
    model = tf.keras.Model(inputs=input_layer, outputs=a, name=model_name)
    
    if initial_weights is not None:
        set_from_flat(model, initial_weights)
        
    return model

### Defining the act function

The `act` function gets the current observation of an environment as parameter as well as the model which shalle be used to predict the action based on this observation. Therefore `act` gets called on every timestep in the `rollout` method which will be discussed in a bit. In addition if `random_stream` is provided, which is done by default in training, noise gets added to the predicted training to make the model more robust and generalize better.

In [9]:
def act(ob, model, random_stream=None):
    time_predict_s = time.time()
    action = model.predict_on_batch(ob)
    time_predict_e = time.time() - time_predict_s

    if random_stream is not None and model_structure.ac_noise_std != 0:
        action += random_stream.randn(*action.shape) * model_structure.ac_noise_std
    return action, time_predict_e

### The RunningStat class

This class is used for tracking the observations during training. It is only used in combination with the `observation_normalization` optimization. When used, the master holds an object of this class and updates the `sum`, `sumsq` and `count` attributes with the values computed by the masters. Then when the master puts out a new task it provides the current `ob_mean` and `ob_std` with the two methods `mean` and `std` which will be provided to the workers.

The workers simply track the observations by appending the observation for each step in the environment to a list, calculate the sums, add them to their own RunningStat object and send them to the master inside a `Result` object.

In [10]:
class RunningStat(object):
    def __init__(self, shape, eps):
        self.sum = np.zeros(shape, dtype=np.float32)
        self.sumsq = np.full(shape, eps, dtype=np.float32)
        self.count = eps
        
    def save(self, save_file):
        # leading underscore in sum to distinct with 'sum' keyword
        np.savez(save_file, _sum=self.sum, sumsq=self.sumsq, count=self.count)

    def load(self, save_file):
        self.data = None
        try:
            self.data = np.load(save_file, allow_pickle=True)
        except IOError:
            print("{} cannot be found or is not a file. Initializing observation normalization with default values.".format(save_file))
        except KeyError:
            print("The file {} is corrupted or not created by this program. Initializing observation normalization with default values.".format(save_file))
        else:
            try:
                _sum = self.data["_sum"]
                sumsq = self.data["sumsq"]
                count = int(self.data["count"])
            except KeyError:
                print("{} does not provide enough data to initialize the observation normalization. Using default values".format(save_file))
            except TypeError:
                print("The data from {} does not match or is corrupted. Using default values".format(save_file))
            else:
                self.sum = _sum
                self.sumsq = sumsq
                self.count = count
        
    def increment(self, s, ssq, c):
        self.sum += s
        self.sumsq += ssq
        self.count += c

    @property
    def mean(self):
        return self.sum / self.count

    @property
    def std(self):
        return np.sqrt(np.maximum(self.sumsq / self.count - np.square(self.mean), 1e-2))

### Specifiying the number of paramters

For the Optimizer classes we need the total amount of parameters in our models. For this we define the `get_initial_weights` methods, which will create us a normal model, prints its layout and returns the random weights.
Remember this needs to be done in a subprocess to avoid creating a TensorFlow session in the main process. So we call a `multiprocessing` `Pool` which allows us to spawn a subprocess and return the weights. In addition, the model structure is printed out.

We then calculate the number of parameters from our weight vector $\theta$. The weight vector is also important for later, because it will be the starting weight vector for our training.

In [11]:
def get_initial_weights(ob_mean=None, ob_std=None, model_file_path=None):
    
    model = create_model(ob_mean=ob_mean, ob_std=ob_std, model_file_path=model_file_path)
    
    # Print out the model
    model.summary()
    
    return model.get_weights()

def initialize_parameter_vector(model_file_path=None):
    with multiprocessing.Pool(1) as pool:
        if optimizations.observation_normalization:
            ob_stat = RunningStat(
                env.observation_space.shape,
                eps=1e-2  # eps to prevent dividing by zero at the beginning when computing mean/stdev
                )
            theta = pool.apply(func=get_initial_weights, args=(ob_stat.mean, ob_stat.std, model_file_path))
        else:
            theta = pool.apply(func=get_initial_weights, args=(None, None, model_file_path))

    return theta, sum(np.prod(v.shape) for v in theta)

### Optimization: Using a neural network optimizer

These optimizer are copied from the implementation from OpenAI. They are also implemented in Keras but need a loss function to work which we do not have when using neuroevolution.

One must provide the `stepsize` attribute for both optimizers and can customize the other ones. It is recommended to leave them as is.

Of course these classes do only get used when the `gradient_optimizer` optimization is active.

In [12]:
class Optimizer(object):
    def __init__(self, num_params):
        self.dim = num_params
        self.t = 0
        self.data = None
        
    def update(self, theta, globalg):
        self.t += 1
        step = self._compute_step(globalg)
        ratio = np.linalg.norm(step) / np.linalg.norm(theta)
        theta_new = theta + step
        return theta_new, ratio

    def save(self, save_file, **kwargs):
        np.savez(save_file, **kwargs, t=self.t, dim=self.dim)
        
    def load(self, save_file):
        self.data = None
        try:
            self.data = np.load(save_file, allow_pickle=True)
        except IOError:
            print("{} cannot be found or is not a file. Initializing optimizer with default values.".format(save_file))
        except KeyError:
            print("The file {} is corrupted or not created by this program. Initializing the optimizer with default values.".format(save_file))
        else:
            try:
                t = int(self.data["t"])
                dim = int(self.data["dim"])
            except KeyError:
                print("{} does not provide enough data to initialize the optimizer. Using default values".format(save_file))
            except TypeError:
                print("The data from {} does not match or is corrupted. Using default values".format(save_file))
            else:
                self.t = t
                self.dim = dim

    def _compute_step(self, globalg):
        raise NotImplementedError

class SGD(Optimizer):
    def __init__(self, num_params, stepsize, momentum=0.9):
        Optimizer.__init__(self, num_params)
        self.v = np.zeros(self.dim, dtype=np.float32)
        self.stepsize, self.momentum = stepsize, momentum

    def save(self, save_file, **kwargs):
        super().save(save_file, stepsize=self.stepsize, momentum=self.momentum, v=self.v)
        
    def load(self, save_file):
        super.load(save_file)
        if self.data is not None:
            try:
                stepsize = float(self.data["stepsize"])
                momentum = float(self.data["momentum"])
                v = self.data["v"]
                assert isinstance(v, np.ndarray)
                assert v.size == self.dim
            except KeyError:
                print("{} does not provide enough data to initialize the optimizer. Using default values".format(save_file))
            except TypeError or AssertionError:
                # Either assertion error or cast failed. Same error message for both
                print("The data from {} does not match or is corrupted. Using default values".format(save_file))
            else:
                self.stepsize = stepsize
                self.momentum = momentum
                self.v = v
    
    def _compute_step(self, globalg):
        self.v = self.momentum * self.v + (1. - self.momentum) * globalg
        step = -self.stepsize * self.v
        return step
        
class Adam(Optimizer):
    def __init__(self, num_params, stepsize, beta1=0.9, beta2=0.999, epsilon=1e-08):
        Optimizer.__init__(self, num_params)
        self.stepsize = stepsize
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.m = np.zeros(self.dim, dtype=np.float32)
        self.v = np.zeros(self.dim, dtype=np.float32)

    def save(self, save_file, **kwargs):
        super().save(save_file, stepsize=self.stepsize, beta1=self.beta1, beta2=self.beta2, epsilon=self.epsilon, m=self.m, v=self.v)
        
    def load(self, save_file):
        super().load(save_file)
        if self.data is not None:
            try:
                stepsize = float(self.data["stepsize"])
                beta1 = float(self.data["beta1"])
                beta2 = float(self.data["beta2"])
                epsilon = float(self.data["epsilon"])
                m = self.data["m"]
                v = self.data["v"]
                assert self.m.size == self.dim and self.v.size == self.dim
                assert isinstance(m, np.ndarray)
                assert isinstance(v, np.ndarray)
            except KeyError:
                print("{} does not provide enough data to initialize the optimizer. Using default values".format(save_file))
            except TypeError or AssertionError:
                # Either assertion error or cast failed. Same error message for both
                print("The data from {} does not match or is corrupted. Using default values".format(save_file))
            else:
                self.stepsize = stepsize
                self.beta1 = beta1
                self.beta2 = beta2
                self.epsilon = epsilon
                self.m = m
                self.v = v

    def _compute_step(self, globalg):
        a = self.stepsize * np.sqrt(1 - self.beta2 ** self.t) / (1 - self.beta1 ** self.t)
        self.m = self.beta1 * self.m + (1 - self.beta1) * globalg
        self.v = self.beta2 * self.v + (1 - self.beta2) * (globalg * globalg)
        step = -a * self.m / (np.sqrt(self.v) + self.epsilon)
        return step

### Shared Noise

To perturb the parameter vector we need noise to sample from. This class provides this noise to the master and the workers. It creates a multiprocessing array, which is stored in shared memory so every spawned process can access it. The array is then filled with samples from the standard normal distribution with the specified `seed`. As a default value `count` is set to $250 \cdot 10^6$ which will sample 2GB of random numbers.

In [13]:
class SharedNoiseTable(object):
    def __init__(self, seed=123):
        self.seed = seed
        count = 250000000
        print('Sampling {} random numbers with seed {}'.format(count, self.seed))

        # Instantiate an array of C float datatype with size count
        self._shared_mem = multiprocessing.Array(ctypes.c_float, count)

        # Convert to numpy array
        self.noise = np.ctypeslib.as_array(self._shared_mem.get_obj())
        assert self.noise.dtype == np.float32
        self.noise[:] = np.random.RandomState(seed).randn(count)
        print('Sampled {} bytes'.format(self.noise.size * 4))

    def get(self, i, dim):
        return self.noise[i:i + dim]

    def sample_index(self, stream, dim):
        return stream.randint(0, len(self.noise) - dim + 1)

### Reshape the parameter vector

Depending on our model structure the weight vector has a specific shape. To easily add or subtract noise, we define the `get_flat` and `set_from_flat` methods which will allow us to reshape the vector.

`get_flat` as the name suggests flattens the given vector.

`set_from_flat` serves two purposes. First it reshapes the one-dimensional array `theta` to the shape the model needs and then it sets the reshaped vector as the weight vector for the given `model`.

In [14]:
def get_flat(theta):
     return np.concatenate([np.reshape(v, [-1]) for v in theta], 0)

def set_from_flat(model, theta):
    old_theta = model.get_weights()
    shapes = [v.shape for v in old_theta]
    total_size = theta.size
        
    start = 0
    reshapes = []
    
    for (shape, v) in zip(shapes, theta):
        size = int(np.prod(shape))
        reshapes.append(np.reshape(theta[start:start+size], shape))
        start += size
    
    assert start == total_size
    model.set_weights(reshapes)

### The rollout function

This function will connect our model with our environment. Each environment in the OpenAI Gym has some functions with which it can be controlled. `env.reset()` resets the environment to the starting position and returns the initial observation. It must be called before doing anything else. `env.step(action)` does one timestep in the environment with the provided action vector and returns four values. First the observation, which represents the state of the environment after our action. Second, the reward for our action. Third, the boolean value `done` which is `True` when the environment reached a state where the environment is finished, for example after the maximum number of timesteps. The fourth value is `info` which gives additional information, but is not used here since for the two Roboschool environments `RoboschoolHumanoid-v1` and `RoboschoolAnt-v1`, `info` is just an empty set. Other than that the additional information would require a problem specific handling which we want to avoid when using black-box optimization.
`env.render()` renders the environment and opens a window where one can see how the environment is performing instead of looking on only numbers. In training this is not done. When you want to view your progress you can use the additional Jupyter Notebook where you can load the latest model and visualize it.

The way `rollout` works is that after resetting the environment and getting the first observation, it creates a loop for the maximum number of timesteps in the environment, mostly this is $1000$ timesteps. In this loop the act function is called with the observation `ob`, the model which is currently used and the perhabs provided `random_stream` which adds action noise to help generalize the model. Based on the model the action is predicted and returned which can then be used for a step in the environment. Lastly, the reward of this step gets saved to a list and the timestep counter `t` gets incremented. If the `save_obs` parameter is `True` every observation gets added to a list and returned later on to calculate the mean and standard deviation of the observation space.

In [15]:
def rollout(env, 
            model, 
            *, 
            render=False, 
            timestep_limit=None, 
            save_obs=False, 
            random_stream=None):
    """
    If random_stream is provided, the rollout will take noisy actions with noise drawn from that stream.
    Otherwise, no action noise will be added.
    """
    
    env_timestep_limit = env.spec.tags.get('wrapper_config.TimeLimit.max_episode_steps')
    timestep_limit = env_timestep_limit if timestep_limit is None else min(timestep_limit, env_timestep_limit)
    rews = []
    times_predict = []
    t = 0
    if save_obs:
        obs = []
    ob = env.reset()
    for _ in range(timestep_limit):
        ac, time_predict = act(ob[None], model, random_stream=random_stream)
        ac = ac[0]
        times_predict.append(time_predict)
        if save_obs:
            obs.append(ob)
        try:
            ob, rew, done, _ = env.step(ac)
        except AssertionError:
            # Is thrown when for example ac is a list which has at least one entry with NaN
            raise 
        rews.append(rew)
        t += 1
        if render:
            env.render()
        if done:
            break
    rews = np.array(rews, dtype=np.float32)
    if save_obs:
        return rews, t, np.array(obs), times_predict
    return rews, t, times_predict

### Collect the observations, if needed

Now, we will define one last method before we go into how the workers operate. `rollout_and_update_ob_stat` is a helper function to do a rollout and collect the observation statistics. If the `observation_normalization` optimization is active, a random number is sampled. If this number is lower than `config.calc_obstat_prob` the rollout is started with the parameter `save_obs` set to `True`. This simply saves every observation after each step in the rollout and returns them as a NumPy array. The `task_ob_stat` then gets incremented with the values of this array. `task_ob_stat` is a RunningStat object created by the worker to keep track of the observation statistics. If the random number is greater or equal to `config.calc_obstat_prob` therew will be a standard rollout where the observations do not get collected.


In [16]:
def rollout_and_update_ob_stat(env, model, rs, task_ob_stat):
    if optimizations.observation_normalization and config.calc_obstat_prob != 0 and rs.rand() < config.calc_obstat_prob:
        try:
            rollout_rews, rollout_len, obs, times_predict = rollout(
                env, model, save_obs=True, random_stream=rs)
        except AssertionError:
            raise
        task_ob_stat.increment(obs.sum(axis=0), np.square(obs).sum(axis=0), len(obs))
    else:
        try:
            rollout_rews, rollout_len, times_predict = rollout(env, model, random_stream=rs)
        except AssertionError:
            raise
    return rollout_rews, rollout_len, times_predict

## Using a master-worker architecture

### The workers

In the next cell we will define the `run_worker` method. As the name suggests this method will be executed by the worker processes. The way this works is that in the master a number of subprocesses is started using the `multiprocessing` package. Each subprocess will run the `run_worker` function with the given parameters `task_list`, `result_queue` and `num_params`. The workers get the latest task from the task list, compute it and push their result on the queue, where the master pulls the results from the queue as long as the configured population size did not exceed yet.

In a more detailed manner, the worker function starts by importing `TensorFlow`, as well as `Keras`. Then the worker runs an infinite loop, where at the beginning the worker gets the last entry in the `task_list`. If a task with the same `task_id` is cached, the cached version will be used. If not the cache gets updated with the newest task and a new model is created. This is needed, since every new task has potentially a new observation mean and standard deviation and they need to be given when creating a model. An important step here is to know that Keras uses a TensorFlow session in the background for their computation. When one creates models in a loop this session gets filled up with new data which reduces the performance of a prediction, as well as it causes a memory leak. Therefore we need to call `clear_session` on the Keras backend which will destroy the current session and starts a new one. To improve performance we then need to set this new session to one with a defined configuration where we set `inter_op_parallelism_threads` and `intra_op_parallelism_threads` both to 1. These allow to use a Thread pool to compute TensorFlow operations, but since we already implemented parallel processing with our master-worker architectur adding another layer of multithreading just creates more overhead and hinders performance.
Now the worker has the task and the model and can start computing. First it samples a random number. If this number is lower than `config.eval_prob` it will do an evaluation run. This simply takes the current parameter vector, which is `theta` in the task, and performs a rollout without adding noise. This gives insight into the different generations when starting the training. The result object will then consist of only `eval_return` which is the sum of the returned rewards, and `eval_length` which is the number of timesteps for this evaluation episode. Also `task_id` is added to the result for the master to check for outdated tasks (This will be explained when we get to the master).
If the random number is greater or equal to `config.eval_prob`, the task will be calculated with noise. First, `task_ob_stat`, a RunningStat object is created to track the observations. Then we sample a noise vector from our SharedNoiseTable object `noise`, with dimension `num_params`. Then the weights of the model get updated with `set_from_flat` where define the new weigths as theta plus the noise vector. Then we call `rollout_and_update_ob_stat` which will do a rollout and potentially save the observations as previously discussed. If we also use `mirrored_sampling`, we set the weights to theta minus the noise vector and, again, do a rollout. We then send back the computation in a Result object which is pushed on the `result_queue`.

In [17]:
def run_worker(task_list, result_queue, stop_work, noise, num_params):
    from tensorflow.keras import backend as K
    import tensorflow as tf
    
    # Threading pools set to 1 since we are parallelizing using multiprocessing
    tf.config.threading.set_inter_op_parallelism_threads(1)
    tf.config.threading.set_intra_op_parallelism_threads(1)
    
    print("PID {}: Started worker".format(os.getpid()))
    
    assert isinstance(noise, SharedNoiseTable)

    # Setup
    # Create a new gym environment object because each worker needs its own one
    env = gym.make(config.env_id)
        
    # Random stream used for adding noise to the actions as well as deciding if the observation statistics shall be
    # updated
    rs = np.random.RandomState()
    
    wait_time = 1
    
    cached_task = None
    cached_task_id = -1
    model = None
    
    while not bool(stop_work.value):
        # Get the latest Task from the Manger list
        try:
            task = task_list[-1]
        except IndexError:
            if wait_time > 100:
                print("The task list does not get tasks, something went wrong in the Master. Aborting.")
                break
            print("Task list is empty, waiting {} seconds before trying again".format(wait_time))
            wait_time *= 2
            time.sleep(wait_time)
            continue
    
        assert isinstance(task, Task)
        task_id = task.task_id
        assert isinstance(task_id, int)
        
        if task_id != cached_task_id:
            cached_task = task
            cached_task_id = task_id
        
            K.clear_session()
            model = create_model(initial_weights=cached_task.theta, 
                             model_name=str(os.getpid()),
                             ob_mean=cached_task.ob_mean,
                             ob_std=cached_task.ob_std)
        
        if rs.rand() < config.eval_prob:
            # Evaluation sample
            set_from_flat(model, cached_task.theta)
            try:
                eval_rews, eval_length, times_predict = rollout(env, model)
            except AssertionError:
                result_queue.put(None)
                return
            
            result_queue.put(Result(
                noise_inds=None,
                returns=None,
                signreturns=None,
                lengths=None,
                eval_return=eval_rews.sum(),
                eval_length=eval_length,
                ob_sum=None,
                ob_sumsq=None,
                ob_count=None,
                task_id=cached_task_id,
                times_predict=times_predict
            ))
            
        else:
            task_ob_stat = RunningStat(env.observation_space.shape, eps=0.)  # eps=0 because we're incrementing only
            
            noise_inds, returns, signreturns, lengths = [], [], [], []
            times_predict = []
            
            while not noise_inds:

                # Noise sample
                noise_idx = noise.sample_index(rs, num_params)
                
                epsilon = config.noise_stdev * noise.get(noise_idx, num_params)
                
                # Evaluate the sampled noise
                set_from_flat(model, cached_task.theta + epsilon)
                
                try:
                    rews_pos, len_pos, times_predict_pos = rollout_and_update_ob_stat(env,
                                                                                      model,
                                                                                      rs=rs,
                                                                                      task_ob_stat=task_ob_stat)
                except AssertionError:
                    result_queue.put(None)
                    return
                
                # Gather results
                noise_inds.append(noise_idx)
                returns.append([rews_pos.sum()])
                signreturns.append([np.sign(rews_pos).sum()])
                lengths.append([len_pos])
                
                times_predict += times_predict_pos

                # Mirrored sampling also evaluates the noise by subtracting it
                if optimizations.mirrored_sampling:
                    set_from_flat(model, cached_task.theta - epsilon)
                    
                    try:
                        rews_neg, len_neg, times_predict_neg = rollout_and_update_ob_stat(env,
                                                                                          model, 
                                                                                          rs=rs, 
                                                                                          task_ob_stat=task_ob_stat)  
                    except AssertionError:
                        result_queue.put(None)
                        return

                    returns[-1].append(rews_neg.sum())
                    signreturns[-1].append(np.sign(rews_neg).sum())
                    lengths[-1].append(len_neg)
                    
                    times_predict += times_predict_neg

            
            result_queue.put(Result(
                noise_inds=np.array(noise_inds),
                returns=np.array(returns, dtype=np.float32),
                signreturns=np.array(signreturns, dtype=np.float32),
                lengths=np.array(lengths, dtype=np.int32),
                eval_return=None,
                eval_length=None,
                ob_sum=None if task_ob_stat.count == 0 else task_ob_stat.sum,
                ob_sumsq=None if task_ob_stat.count == 0 else task_ob_stat.sumsq,
                ob_count=task_ob_stat.count,
                task_id=cached_task_id,
                times_predict=times_predict
            ))

### Supporting functions for the master

#### Batched dot product

A computationally intense part of the algorithm is the calculation of this sum: $\sum \limits_{i=0}^n F_i \epsilon_i$. It is the rewards, weighted by the used noise vector. For a large population, meaning a large $n$, this can get slow, but we use the following technique from the OpenAI implementation. The `weights` and `vecs` paramters get divided into smaller `groups` and then the dot product is calculated on these groups. Each grouped dot product is then simply summed which results in the result.

In [18]:
def itergroups(items, group_size):
    assert group_size >= 1
    group = []
    for x in items:
        group.append(x)
        if len(group) == group_size:
            yield tuple(group)
            del group[:]
    if group:
        yield tuple(group)
        
def batched_weighted_sum(weights, vecs, batch_size):
    total = 0.
    num_items_summed = 0
    for batch_weights, batch_vecs in zip(itergroups(weights, batch_size), itergroups(vecs, batch_size)):
        assert len(batch_weights) == len(batch_vecs) <= batch_size
        total += np.dot(np.asarray(batch_weights, dtype=np.float32), np.asarray(batch_vecs, dtype=np.float32))
        num_items_summed += len(batch_weights)
    return total, num_items_summed

#### Optimization: Fitness shaping with a rank transformation

One possible optimization which has not yet been discussed is rank transformation. What this does is, it transforms the range of rewards the master gets from the workers for a generation into ranks. So instead of having float numbers as rewards we get integer values in the $[0, len(x)]$ interval as our rewards. `compute_centered_ranks` therefore reshapes the paramter `x`, which in our case will be a one-dimensional reward array when no mirrored sampling is used and two-dimensional otherwise, and then invokes `compute_ranks` on it. This method sorts the input by the argument and sets the interval as the values. Back in the original method the returned array gets reshaped as it was and returned.

In [19]:
def compute_ranks(x):
    """
    Returns ranks in [0, len(x))
    Note: This is different from scipy.stats.rankdata, which returns ranks in [1, len(x)].
    """
    assert x.ndim == 1
    ranks = np.empty(len(x), dtype=int)
    ranks[x.argsort()] = np.arange(len(x))
    return ranks


def compute_centered_ranks(x):
    y = compute_ranks(x.ravel()).reshape(x.shape).astype(np.float32)
    y /= (x.size - 1)
    y -= .5
    return y

When the function is called with a valid `save_path` the model gets saved to this path as a `.h5` file, including the current weights as well as the structure of the model. Normally, this allows to easily load the model based on the file without providing additional information. Here on the other hand, one must provide the custom classes `Normc_initializer`, `ObservationNormalizationLayer` and `DiscretizeActionsUniformLayer` when loading a saved model, since Keras does not have the implementation of these classes when the model gets loaded. An example and also visualizing the trained data can be found in the other Jupyter Notebook, called `evolution-strategies-visualize.ipynb`.

In [20]:
def save_model(save_directory, stop_work, save_tasks_queue):
    import tensorflow as tf
    from tensorflow.keras import backend as K
    print("PID {}: Started saving process".format(os.getpid()))
    while not bool(stop_work.value):
        save_task = save_tasks_queue.get()
        assert isinstance(save_task, Task)
        # We are creating models in a loop therefore we need to clear the session to avoid build up
        K.clear_session()
        model = create_model(initial_weights=save_task.theta, 
                            model_name=config.env_id + "_Generation_" + str(save_task.task_id),
                            ob_mean=save_task.ob_mean,
                            ob_std=save_task.ob_std)
        
        model.save(os.path.join(save_directory, "snapshot_{:05d}.h5".format(save_task.task_id)))
        save_tasks_queue.task_done()

In [21]:
def resume_training(init_from):
    if init_from is not None:
        assert isinstance(init_from, dict)
        try:
            save_dir = init_from["save_dir"]
            config_file = init_from["config"]
            log_file = init_from["log"]
            model_file = init_from["model_file"]
            optimizer_state_file = init_from["optimizer_state"]
            observation_norm_state_file = init_from["observation_norm_state"]
            assert os.path.isdir(save_dir)
        except KeyError:
            print("To resume training, init_from must contain 'config', 'model' and 'optimizer' keys.")
            return None, None, None
        except AssertionError as e:
            print(e)
            print("Starting training from scratch.")
            return None, None, None
        else:
            if save_dir is None or config_file is None or model_file is None:
                print("Please provide the config and model file you want to initialize the training from, as well as the directory they are stored in.")
                return None, None, None
            # Config
            with open(os.path.join(save_dir, config_file), encoding='utf-8') as f:
                try:
                    config_json = json.load(f)
                except json.JSONDecodeError as e:
                    print("The config file {} is empty or cannot be parsed. Starting training from scratch.".format(f))
                else:
                    # Does not catch false or missing values here!
                    global optimizations, config, model_structure, env
                    optimizations = Optimizations._make(config_json["optimizations"].values())
                    config = Config._make(config_json["config"].values())
                    model_structure = ModelStructure._make(config_json["model_structure"].values())
                    env = gym.make(config.env_id)                   

            model_file_path = os.path.join(save_dir, model_file)
            optimizer_state_file_path = None
            observation_norm_state_file_path = None
            
            if optimizations.gradient_optimizer and optimizer_state_file is not None:
                optimizer_state_file_path = os.path.join(save_dir, optimizer_state_file)
                
            if optimizations.observation_normalization and observation_norm_state_file_path is not None:
                observation_norm_state_file_path = os.path.join(save_dir, observation_norm_state_file)
                
            return model_file_path, optimizer_state_file_path, observation_norm_state_file_path

### The Master

In the following cell the master and thus the training gets started. After the initial setup an infinite loop is started where in every iteration a new task, representing a new generation, gets pushed onto the task queue. The workers which have been started before the loop, pick up the new task and compute their results, which then, in turn, get enqueued onto the result queue. From their the master picks up results as long as the population size has not been exceeded. Then depending on the optimizations the results get processed and the parameter vector for the new generation is calculated. After logging the weights of the current model may get saved and then a new generation is started. This continues until one stops the kernel, or if you want you can change the infinite loop to a predefined length.

#### Setup

Preferably this notebook would be run in order and start the master cell one time to start the training. But since one may want to start the training again with different optimizations we need to create a subdirectory every time we start the training to avoid duplicating log and weight files.

Note that if you alter the configuration files you of course need to restart that cell to _activate_ these settings. But the best way to restart a training is to restart the kernel and clear the output which resets this notebook. Remember that you sample a rather large noise object (approximately 1GB) of memory which resides does not get freed immediately after aborting the computation.

After creating the noise object which is used by the master and all workers, we create the `task_list` as a list of a `Manager` object. The manager automatically shares the data between processes and handles access to it. For the `result_queue` we use a `multiprocessing.Queue` object. Like the list from the manger this queue object allows for access between processes but when one accesses an item of this queue with `get()` the item also gets removed from the queue. Each worker is a `multiprocessing.Process` which get the queue and list as parameter. Then they get started and compute in their infinite loop as previously discussed.

If we use `observation_normalization` the `ob_stat` object is used to track the mean and standard deviation throughout the training. As a last setup step we define the headrow of our logging file which we save as a comma separated value file.

#### Training

The training starts by appending a new task to the task list. `theta` is the current parameter vector, which in the first generation is just the randomly initiated vector when creating a model. It is reshaped to be one-dimensional with the size `num_params` to easily add or subtract noise. After putting out the task we need to collect the results by the workers. In the while loop we iterate until `num_episodes_popped` exceed our configured population size. Inside the loop we first pop a result from the queue with `result_queue.get()`. This function blocks until an item is returned. Then we need to differentiate between two cases. First, the returned result is an evaluation result and second, the result is a job where noise was added. For the evaluation result we increase the `episodes_so_far` counter which counts exactly what its name says and add the timesteps of the evaluation to `timesteps_so_far`. Then we compare the `task_id` of the result and the task_id which is currently gathered by the master. If they match we have a valid evaluation for this task and we collect the reward of this evaluation and its timestep length to `eval_returns` and `eval_lengths`. For the second case we also increase `episodes_so_far` and `timesteps_so_far` depending on the number of episodes in the result object. Then again we compare the task id's. If they match, we append the result to the `curr_task_results` list and increase our `num_episodes_popped` counter, which remember is our exit condition for the while loop. If we use observation normalization we also increase the values in the `ob_stat` object. On the other hand if the task id's do not match we increase the `num_results_skipped` counter by one. Later on we can then calculate the fraction of skipped results.

After the while loop whe have the results for this generation in `curr_task_results`. We then concatenate the noise indices, rewards and lengths of the results in `noise_inds`, `returns` and `lengths`. If we do fitness shaping we process the rewards with our method of computing the ranks. Then we calculate the dot product and divide by the number of episodes. If we use a gradient-based optimizer, like for example Adam, we input our gradient into it and get the updated `theta` back. Without an optimizer we devide by the standard deviation and multiply the learning rate, as described in the paper, to get our new parameter vector.

Now we log the results of this generation to our csv file and if we snapshot the policy in this generation we create a model with our new theta as initial weights and store them to disk. This is done in a subprocess to avoid an inflicting TensorFlow session.

In [22]:
def run_master(max_timesteps=np.inf, seed=123, init_from=None):
    save_directory = os.path.join(main_directory, time.strftime('%d_%m_%Y-%Hh_%Mm_%Ss', time.localtime(time.time())))
    mkdir_p(save_directory)
    save_configuration(save_directory)

    rs = np.random.RandomState()

    noise = SharedNoiseTable(seed)
    
    model_file_path = None
    optimizer_state_file_path = None
    observation_norm_state_file_path = None
    
    if init_from is not None:
        assert isinstance(init_from, dict)
        print("Initializing training from previous state.")
        model_file_path, optimizer_state_file_path, observation_norm_state_file_path = resume_training(init_from)
    
    # Only used with observation_normalization optimization
    ob_stat = RunningStat(
        env.observation_space.shape,
        eps=1e-2  # eps to prevent dividing by zero at the beginning when computing mean/stdev
        )
        
    if optimizations.observation_normalization and observation_norm_state_file_path is not None:
        ob_stat.load(observation_norm_state_file_path)
    
    theta, num_params = initialize_parameter_vector(model_file_path)
    theta = get_flat(theta)

    if optimizations.gradient_optimizer:
        if model_structure.optimizer == OPTIMIZER_ADAM:
            optimizer = Adam(int(num_params), **model_structure.optimizer_args)
        elif model_structure.optimizer == OPTIMIZER_SGD:
            optimizer = SGD(int(num_params), **model_structure.optimizer_args)
        else:
            raise NotImplementedError
            
        if optimizer_state_file_path is not None:
            # A previous state of the optimizer was given, restore it
            optimizer.load(optimizer_state_file_path)
            
    '''    print("Optimizer stepsize {}".format(optimizer.stepsize))
    print("Optimizer t {}".format(optimizer.t))
    print("Optimizer v {}".format(optimizer.v))
    print("Optimizer m {}".format(optimizer.m))'''
    
    manager = multiprocessing.Manager()
    task_list = manager.list()
    result_queue = multiprocessing.Queue()
    save_tasks_queue = multiprocessing.JoinableQueue()
    stop_work = multiprocessing.Value('i', 0, lock=False)

    # Start workers
    workers = []
    for _ in range(config.num_workers):
        worker = multiprocessing.Process(target=run_worker, args=(task_list, result_queue, stop_work, noise, num_params))
        workers.append(worker)
        worker.start()
        
    save_process = multiprocessing.Process(target=save_model, args=(save_directory, stop_work, save_tasks_queue))
    save_process.start()

    episodes_so_far = 0
    timesteps_so_far = 0
    generations = 0
    tstart = time.time()



    generation_log = OrderedDict()
    generation_log_file = os.path.join(save_directory, 'log.csv')
    fieldnames = [
        'Generation',
        'GenRewMean', 'GenRewStd', 'GenLenMean', 
        'EvalGenRewardMean', 'EvalGenRewardStd', 'EvalGenLengthMean', 'EvalGenCount',
        'EpisodesThisGen', 'EpisodesSoFar', 'TimestepsThisGen', 'TimestepsSoFar',
        'UniqueWorkers', 'ResultsSkippedFrac', 'ObCount',
        'TimeElapsedThisGen', 'TimeElapsed',
        'TimePredictMin', 'TimePredictMax', 'TimePredictMean', 'TimePredictCount']

    with open(generation_log_file, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

    while timesteps_so_far < max_timesteps:
        step_tstart = time.time()

        task_list.append(Task(
            theta=theta,
            ob_mean=ob_stat.mean if optimizations.observation_normalization else None,
            ob_std=ob_stat.std if optimizations.observation_normalization else None,
            task_id=generations
        ))

        print("---------------- Generation: {}----------------".format(generations))

        assert theta.dtype == np.float32

        curr_task_results, eval_returns, eval_lengths = [], [], []
        num_results_skipped, num_episodes_popped, num_timesteps_popped, ob_count_this_gen = 0, 0, 0, 0

        times_predict = []
        
        stop_training = False

        print("PID {}: Waiting for results".format(os.getpid()))

        while num_episodes_popped < config.population_size or num_timesteps_popped < config.timesteps_per_gen:
            result = result_queue.get()
            
            if result is None:
                print("Stopping training. The model produced non finite numbers inside the action vector. Try a"
                      + " different configuration.")
                stop_training = True
                break
            
            assert isinstance(result, Result)
            task_id = result.task_id
            assert isinstance(task_id, int)

            assert (result.eval_return is None) == (result.eval_length is None)

            if result.eval_length is not None:
                # The result was an evaluation job therefore do not collect the result only the evaluation
                if task_id == generations:
                    eval_returns.append(result.eval_return)
                    eval_lengths.append(result.eval_length)
                    times_predict += result.times_predict
            else:
                assert result.noise_inds.ndim == 1 and result.returns.dtype == np.float32

                if optimizations.mirrored_sampling:
                    assert result.returns.shape == result.lengths.shape == (len(result.noise_inds), 2)
                else:
                    assert result.returns.shape == result.lengths.shape == (len(result.noise_inds), 1)

                if task_id == generations:
                    curr_task_results.append(result)
                                    
                    # Update counts
                    result_num_eps = result.lengths.size
                    result_num_timesteps = result.lengths.sum()
                    episodes_so_far += result_num_eps
                    timesteps_so_far += result_num_timesteps
                    
                    num_episodes_popped += result_num_eps
                    num_timesteps_popped += result_num_timesteps

                    # Update observation stats if the optimization is used
                    if optimizations.observation_normalization and result.ob_count > 0:
                        ob_stat.increment(result.ob_sum, result.ob_sumsq, result.ob_count)
                        ob_count_this_gen += result.ob_count
                        
                    times_predict += result.times_predict
                else:
                    num_results_skipped += 1

        if stop_training:
            break
            
        print("Gathered results")

        # Compute skip fraction
        frac_results_skipped = num_results_skipped / (num_results_skipped + len(curr_task_results))
        if num_results_skipped > 0:
            print("Skipped {} out of date results ({:.2f}%)".format(
                num_results_skipped, 100. * frac_results_skipped))

        # Assemble results
        noise_inds = np.concatenate([r.noise_inds for r in curr_task_results])
        returns = np.concatenate([r.returns for r in curr_task_results])
        lengths = np.concatenate([r.lengths for r in curr_task_results])
        assert noise_inds.shape[0] == returns.shape[0] == lengths.shape[0]

        # If fitness shaping is turned on rank the results
        if optimizations.fitness_shaping:
            if config.return_proc_mode == RETURN_PROC_MODE_CR:
                proc_returns = compute_centered_ranks(returns)
            # sign and centered_sign_rank are obviously only useful in combination with mirrored sampling
            elif config.return_proc_mode == RETURN_PROC_MODE_SIGN:
                proc_returns = np.concatenate([r.signreturns for r in curr_task_results])
            elif config.return_proc_mode == RETURN_PROC_MODE_CR_SIGN:
                proc_returns = compute_centered_ranks(np.concatenate([r.signreturns for r in curr_task_results]))
            else:
                # Throw error to indicate the false input instead of silently pass on.
                # This should have been already catched in the configuration section, so this here is a misconfiguration.
                raise NotImplementedError
        else:
            proc_returns = returns

        # Mirrored sampling returns a 2D numpy array therefore we need to preprocess it accordingly
        if optimizations.mirrored_sampling:
            # Calculates the difference between the rewards sampled with the positive and negative noise
            proc_returns = proc_returns[:, 0] - proc_returns[:, 1]
        else:
            proc_returns = proc_returns.ravel()

        # Calculate the approximated gradient with a batch variant which saves time on large vectors
        g, count = batched_weighted_sum(
            proc_returns,
            (noise.get(idx, num_params) for idx in noise_inds),
            batch_size=500
        )

        assert g.shape == (num_params,) and g.dtype == np.float32 and count == len(noise_inds)
        
        # Update with the approximated gradient
        g /= returns.size
        
        if optimizations.divide_by_stdev:
            g /= config.noise_stdev
            
        if optimizations.gradient_optimizer:
            step = -g
            if optimizations.weight_decay:
                step += config.l2coeff * theta
            theta, _ = optimizer.update(theta, step)
        else:
            step = g * config.learning_rate
            if optimizations.weight_decay:
                step *= config.l2coeff
            theta += step
        
        step_tend = time.time()

        # Log the generation and print to stdout
        generation_log['Generation'] = generations

        generation_log['GenRewMean'] = returns.mean()
        generation_log['GenRewStd'] = returns.std()
        generation_log['GenLenMean'] = lengths.mean()

        generation_log['EvalGenRewardMean'] = np.nan if not eval_returns else np.mean(eval_returns)
        generation_log['EvalGenRewardStd'] = np.nan if not eval_returns else np.std(eval_returns)
        generation_log['EvalGenLengthMean'] = np.nan if not eval_lengths else np.mean(eval_lengths)
        generation_log['EvalGenCount'] = len(eval_returns)

        generation_log['EpisodesThisGen'] = lengths.size
        generation_log['EpisodesSoFar'] = episodes_so_far
        generation_log['TimestepsThisGen'] = lengths.sum()
        generation_log['TimestepsSoFar'] = timesteps_so_far

        generation_log['UniqueWorkers'] = config.num_workers
        generation_log['ResultsSkippedFrac'] = frac_results_skipped
        generation_log['ObCount'] = ob_count_this_gen

        generation_log['TimeElapsedThisGen'] = step_tend - step_tstart
        generation_log['TimeElapsed'] = step_tend - tstart

        generation_log['TimePredictMin'] = np.amin(times_predict)
        generation_log['TimePredictMax'] = np.amax(times_predict)
        generation_log['TimePredictMean'] = np.mean(times_predict)
        generation_log['TimePredictCount'] = len(times_predict)

        for key, value in generation_log.items():
            print(f'{key:25} {value}')

        # Append the log the csv file
        with open(generation_log_file, 'a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writerow(generation_log)

        # Note that the model is created with a custom layer and custom initializer, and therefore needs these two
        # custom classes if one wants to load a saved model
        if config.snapshot_freq != 0 and generations % config.snapshot_freq == 0:
            
            # Save model
            save_tasks_queue.put(Task(
                theta=theta,
                ob_mean=ob_stat.mean if optimizations.observation_normalization else None,
                ob_std=ob_stat.std if optimizations.observation_normalization else None,
                task_id=generations
                ))
            
            # Save optimizer
            if optimizations.gradient_optimizer:
                optimizer.save(os.path.join(save_directory, "optimizer_{:05d}.npz".format(generations)))
                
            # Save observation normalization
            if optimizations.observation_normalization:
                ob_stat.save(os.path.join(save_directory, "ob_normalization_{:05d}.npz".format(generations)))

            print("Saved training state in generation {} to {}".format(generations, save_directory))

        generations += 1

    # Quit the multiprocessing data structures and processes
    stop_work.value = 1

    result_queue.close()
    
    for w in workers:
        # Workers are blocking on empty queues and cannot be joined. When attempted they will try to join forever
        # Therefore we terminate the process. This is not crucial since we already saved everything for the last
        # generation.
        w.terminate()
        
    # Save tasks queue is a joinable queue, therefore we can gracefully join the queue
    save_tasks_queue.join()
    save_tasks_queue.close()

    # Like the worker processes a join would result in an indefinite block, since save_tasks_queue is closed and all
    # save jobs have been processed we can terminate the process
    save_process.terminate()

In [23]:
init_from = {
    "save_dir": "/home/jovyan/work/evolution-strategies/training_runs/",
    "config": "config.json",
    "log": None,
    "model_file": "snapshot_00003.h5",
    "optimizer_state": "optimizer_00003.h5.npz",
    "observation_norm_state": "ob_normalization00003.h5.npz",                
}

In [24]:
run_master(seed=123, max_timesteps=40e3, init_from=None)

Sampling 250000000 random numbers with seed 123
Sampled 1000000000 bytes
Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 28)]              0         
_________________________________________________________________
observation_normalization_la (None, 28)                0         
_________________________________________________________________
dense (Dense)                (None, 256)               7424      
_________________________________________________________________
dense_1 (Dense)              (None, 256)               65792     
_________________________________________________________________
dense_2 (Dense)              (None, 40)                10280     
_________________________________________________________________
discretize_actions_uniform_l (None, 8)                 0         
Total params: 83,496
Trainable params: 83,496
Non-trai