The MIT License

Copyright (c) 2016 OpenAI (http://openai.com)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

# Using evolution strategies to train Roboschool Environments

## Introduction

This Jupyter Notebook is based on the [paper](https://arxiv.org/abs/1703.03864), [blog article](https://openai.com/blog/evolution-strategies/) and [implementation](https://github.com/openai/evolution-strategies-starter) of OpenAI on the topic of using an evolution strategy algorithm for a typical reinforcement learning task. 

My implementation summarizes their implementation, by simplifying, refactoring and organizing the code into this Jupyter notebook which can be used to test the algorithm. One can tweak the hyperparameters, change the environment which shall be trained or even expand the implementation to support for example Atari environments.

I recommend reading the paper or at least the article before trying out the notebook. Also depending on the environment the training can be very computationally intense (for example training the Humanoid), so if you want to try out the harder ones I recommend using a highly parallelizable machine, i.e. a machine with a high number of cores/threads.

## Algorithm overview

This section gives a brief overview over the algorithm. First of all we need to define what this implementation is going to do. The Roboschool is a group of environments in the [OpenAi Gym](https://gym.openai.com/), a program to test the behavior of machine learning algorithms on _real world_ problems. In our case, we want to train different robotic environments using an evolutionary algorithm which belongs to the class of natural evolution strategies. We therefore define a neural net with a configurable number of hidden layers, where the input dimension equals the observation space of the environment and the dimension of the output layer equals the dimension of the action space of the environment. This neural net is also called policy or in this implementation also referred to as a model. Therefore we train our policy to output the best possible action sequence given an observation sequence. Now, how do we train this policy? Training an evolutionary strategy consists of a cycle which is repeated over and over. First, an initial weight vector is randomly generated. In our context this weight vector equals to the weights of our policy. Then we perturb the vector with gaussian noise. The amount of perturbations is called the population size. What we now have is a population of slightly different weight vectors compared with the weight vector we started. Each one of these vectors will then be evaluated by first updating our policy with the weights and then run the environment using the policy. When this is done for the whole population we calculate a gradient ascent step in the direction of steepest ascent. In our case, where we are dealing with natural evolution strategies, we calculate the step with the natural gradient. This is done by approximating this gradient using Monte Carlo estimates.

So lets say we have our initial weight vector $\theta$, a population size $n$, random perturbations $\epsilon_i$, $0 \leq i \leq n$, learning rate $\alpha$, noise standard deviation $\sigma$ and a fitness Function $F(\cdot)$. We then calculate the resulting weight vector like this:
$\theta_{t+1} = \theta_{t} + \alpha \frac{1}{n \sigma} \sum \limits_{i=0}^n F(\theta_{t} + \epsilon_i)$


This gives us the weight vector for the next cycle which we will then, again, perturb a number of times (depending on the population size). A cycle in the context of evolutionary strategies is called a generation.

One might ask himself now what this fitness function is in the context of robotic simulations. When initializing such an environment one can call the `step` function on the environment with an array in the shape of the action space (in our case this would be the output of the policy). The environment then evaluates the provided action based on the current observation and other parameters in the environment and outputs a reward. This is done for either a fixed number of timesteps (some environments have a maximum of timesteps defined) or stops, when the action resulted in a state where the environment is `done`, for example when the `Ant` environment touches the surface with its torso. The rewards get summed up for every timestep which forms the reward for one action.

## Setup

Before starting any computation we need to configure the program and define some methods and objects we will use later on.


### Imports

Note that TensorFlow does not get imported here. We will only import it inside of a function which runs in another process. This is due to the fact that when importing TensorFlow a session is created in the background which will interefere with our models which we run in subprocesses. When importing the package only inside a function and then running these functions inside of subprocesses, every process has its own TensorFlow session and they therefore don't interfere with each other.

In [None]:
import errno
import logging
import os
import time

from collections import namedtuple

import ctypes
import multiprocessing
import numpy as np

import gym, roboschool

### Logging and save directory

For evaluating the trained data we need to define a directory where we want to store the trained policies, as well as the log file to record the results of every generation (Depending on your disk space you may not want to save every model).

If you want to change the location change the variable `save_directory` to a directory where the user which runs this notebook has write permissions. If it does not exist the program will create it. The default location is the /tmp/es_xx directory where xx is the process id of this notebook.

In [None]:
def mkdir_p(path):
    try:
        os.makedirs(path)
    except OSError as exc:
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            pass
        else:
            raise

save_directory = "/tmp/" + "/es_{}/".format(os.getpid())
mkdir_p(save_directory)

logger = logging.getLogger('main_logger')
logger.setLevel(logging.INFO)

fh = logging.FileHandler(save_directory + 'log.txt')
fh.setLevel(logging.INFO)
fh.setFormatter(logging.Formatter('%(asctime)s %(message)s', ''))

logger.addHandler(fh)

### Configuration & Result Classes

Using a `namedtuple` allows use to quickly create a small class with different attributes, which is ideal for quickly accessing different attributes during training.

#### Config Class

The Config class defines our general configuration of the program. The following table explains what each attribute does:


| Attribute             | Explanation   |
| :---------------------|:--------------|
| `env_id`              | A string representing an environment of the Roboschool, e.g. `"RoboschoolAnt-v1"`|
| `population_size`     | The size of the population of one generation    |
| `num_workers`         | The number of workers doing computation in parallel, for maximum performance the default is `os.cpu_cores()`|
| `learning_rate`       | A floating point number which dictates how _strong_ the calculated gradient influences the next generation
| `noise_stdev`         | The standard deviation used for the noise
| `snapshot_freq`       | An integer which tells in which frequency the weights of the model shall be saved, e.g. a snapshot frequency of 10 would save the weights of the model every 10 generations
| `return_proc_mode`    | The processing mode of the gathered returns. Only used when fitness shaping is active. 
| `timesteps_per_gen` | Alternative exit condition for a generation, i.e. stop after x timesteps, currently not used
| `calc_obstat_prob`    | The probability of calculating the new mean and standard deviation of the observation space
| `eval_prob`           | The probability of inserting an evaluation result, i.e. calculating the fitness without adding noise. This is useful to have insight on the training progress.

In [None]:
Config = namedtuple('Config', [
    'env_id',
    'population_size',
    'num_workers',
    'learning_rate',
    'noise_stdev',
    'snapshot_freq',
    'return_proc_mode',
    'calc_obstat_prob',
    'l2coeff',
    'eval_prob'
])

In [None]:
Optimizations = namedtuple('Optimizations', [
    'mirrored_sampling',
    'fitness_shaping',
    'weight_decay', # TODO
    'discretize_actions', # TODO custom bins
    'neural_network_optimizer',
    'observation_normalization'
])

#### Model Structure class

| Attribute         | Explanation   |
| :---------------- |:--------------|
| `ac_noise_std`    | Standard deviation of noise that is added to the input (action) to make trained model more robust|
| `ac_bins`         | When discretizing actions use this to specify how the bins are structured.       |
| `hidden_dims`     | A list with the hidden dimensions. Each entry is one layer with the number representing the number of neurons in that layer              |
| `nonlin_type`     |  The activation function used for every layer (except ObservationNormalizationLayer and Output, more on that later)             |

In [None]:
ModelStructure = namedtuple('ModelStructure', [
    'ac_noise_std',
    'ac_bins',
    'hidden_dims',
    'nonlin_type',
    'nn_optimizer',
    'nn_optimizer_args'
])

#### Result class

| Attribute             | Explanation   |
| :---------------------|:--------------|
| `noise_inds`        | A numpy array with the indices of the used noise.|
| `returns`        | A numpy array with the rewards. When mirrored sampling is used this list is two dimensional.| |`signreturns` | The sum of the signs of the rewards. When mirrored sampling is used this list is two dimensional. | |`lengths` | A numpy array with the sum of the timesteps. When mirrored sampling is used this list is two dimensional. |
| `eval_returns` | np.nan if no evaluations have been done, otherwise a numpy array with evaluation rewards. |
| `eval_lengths` |np.nan if no evaluations have been done, otherwise a numpy array with evaluation timesteps. |
| `ob_sum` | If observation normalization is used this contains the sum of the tracked observations. |
| `ob_sumsq` | If observation normalization is used this contains the squared sum of the tracked observations. |
| `ob_count` | If observation normalization is used this contains number of tracked observations. |



In [None]:
Result = namedtuple('Result', [
    'noise_inds','returns', 'signreturns', 'lengths',
    'eval_returns', 'eval_lengths',
    'ob_sum', 'ob_sumsq', 'ob_count'
])

## Config

This is the base configuration of the program. 

In [None]:
config = Config(
    env_id="RoboschoolAnt-v1",
    population_size=20,
    num_workers=os.cpu_count(),
    learning_rate=0.001,
    noise_stdev=0.02,
    snapshot_freq=1,
    return_proc_mode="centered_rank",
    calc_obstat_prob=0.01,
    l2coeff=0.005,
    eval_prob=0.003
)

assert config.eval_prob > 0

## Environment

In [None]:
env = gym.make(config.env_id)

ob_space= env.observation_space
ac_space = env.action_space

## Keras as Model

Original implementation used hand written dense layers and tensorflow operations. I use a Keras model and their
functional API to create the net. In testing the two version differ in 0.x float scope. Something to worry about?

In [None]:
model_structure = ModelStructure(
    ac_noise_std=0.01,
    ac_bins='uniform:10',
    hidden_dims=[256, 256],
    nonlin_type='tanh',
    nn_optimizer='adam',
    nn_optimizer_args={
        'stepsize': 0.01
    }
)

## Optimizations

In [None]:
optimizations = Optimizations(
    mirrored_sampling=True,
    fitness_shaping=True,
    weight_decay=True,
    discretize_actions=False,
    neural_network_optimizer=True,
    observation_normalization=True
)

if optimizations.observation_normalization:
    assert config.calc_obstat_prob > 0
    
# TODO config and optimizations asserts here

## Keras clearin backend to support multiprocessing

$\text{out } = \frac{\sigma}{\sqrt{\sum \limits_{i \in \text{out}} i^2}}$

In [None]:
def create_model(initial_weights=None, model_name="model", save_path=None, ob_mean=None, ob_std=None):
    import tensorflow as tf
    
    class Normc_initializer(tf.keras.initializers.Initializer):
        """
        Create a TensorFlow constant with random numbers normed in the given shape.
        :param std:
        :return:
        """
        def __init__(self, std=1.0):
            self.std=std

        def __call__(self, shape, dtype=None, partition_info=None):#pylint: disable=W0613
            out = np.random.randn(*shape).astype(np.float32)
            out *= self.std / np.sqrt(np.square(out).sum(axis=0, keepdims=True))
            return tf.constant(out)
    
    class ObservationNormalizationLayer(tf.keras.layers.Layer):
        def __init__(self, ob_mean, ob_std, **kwargs):
            self.ob_mean = ob_mean
            self.ob_std = ob_std
            super(ObservationNormalizationLayer, self).__init__(**kwargs)

        def call(self, input):
            return tf.clip_by_value((input - self.ob_mean) / self.ob_std, -5.0, 5.0)
        
        # get_config and from_config need to implemented to be able to serialize the model
        def get_config(self):
            base_config = super(ObservationNormalizationLayer, self).get_config()
            base_config['ob_mean'] = self.ob_mean
            base_config['ob_std'] = self.ob_std
            return base_config
        
        @classmethod
        def from_config(cls, config):
            return cls(**config)
    
    nonlin = tf.nn.tanh
    
    if model_structure.nonlin_type == 'relu':
        nonlin = tf.nn.relu
    elif model_structure.nonlin_type == 'lrelu':
        nonlin = tf.nn.leaky_relu
    elif model_structure.nonlin_type == 'elu':
        nonlin = tf.nn.leaky_relu

    # Policy network
    input_layer = x = tf.keras.Input(ob_space.shape, dtype=tf.float32)

    if ob_mean is not None and ob_std is not None and optimizations.observation_normalization:
        if ob_std.all() != 0:
            x = ObservationNormalizationLayer(ob_mean, ob_std)(x)
                
    for hd in model_structure.hidden_dims:
        x = tf.keras.layers.Dense(
            hd, activation=nonlin,
            kernel_initializer=Normc_initializer(std=1.0),
            bias_initializer=tf.initializers.zeros())(x)

    # Map to action
    adim, ahigh, alow = ac_space.shape[0], ac_space.high, ac_space.low        
    
    a = tf.keras.layers.Dense(
        adim,
        kernel_initializer=Normc_initializer(std=1.0),
        bias_initializer=tf.initializers.zeros())(x)
    
    if optimizations.discretize_actions:
        # discretize, if else for uniform/custom
        ac_bin_mode, ac_bin_arg = model_structure.ac_bins.split(':')
        if ac_bin_mode == 'uniform':
            num_ac_bins = int(ac_bin_arg)
            assert num_ac_bins > 1
            ac_range_1a = (ahigh - alow)[None, :]
            # Transforms the interval from the normal Dense output layer to the interval [0, num_ac_bins-1]
            a = tf.keras.layers.Lambda(lambda a: 1. / (num_ac_bins - 1.) * a * ac_range_1a + alow[None, :])(a)
        elif ac_bin_mode == 'custom':
            #Custom, TODO
            raise NotImplementedError
        else:
            raise NotImplementedError
    
    model = tf.keras.Model(inputs=input_layer, outputs=a, name=model_name)
    
    if initial_weights is not None:
        set_from_flat(model, initial_weights)
        
    if save_path:
        model.save(save_path)
        
    return model

In [None]:
def act(ob, model, random_stream=None):   
    action = model.predict(ob)
    
    # TODO why randomstream? Better generalization?
    if random_stream is not None and model_structure.ac_noise_std != 0:
        action += random_stream.randn(*action.shape) * model_structure.ac_noise_std
    return action

def get_initial_weights():
    model = create_model()
    
    # Print out the model
    model.summary()
    
    return model.get_weights()

with multiprocessing.Pool(1) as pool:
    theta = pool.apply(func=get_initial_weights)

num_params = sum(np.prod(v.shape) for v in theta)

## Optimization: Using Adam Optimizer

Defining it manually since with Keras you have to define a loss function and use training set, etc. Manually seems
easier for now.
Completely copied from OpenAI, except for not using the policy variable but providing theta directly

In [None]:
class Optimizer(object):
    def __init__(self):
        self.dim = num_params
        self.t = 0

    def update(self, theta, globalg):
        self.t += 1
        step = self._compute_step(globalg)
        ratio = np.linalg.norm(step) / np.linalg.norm(theta)
        theta_new = theta + step
        return theta_new, ratio

    def _compute_step(self, globalg):
        raise NotImplementedError

class SGD(Optimizer):
    def __init__(self, stepsize, momentum=0.9):
        Optimizer.__init__(self)
        self.v = np.zeros(self.dim, dtype=np.float32)
        self.stepsize, self.momentum = stepsize, momentum

    def _compute_step(self, globalg):
        self.v = self.momentum * self.v + (1. - self.momentum) * globalg
        step = -self.stepsize * self.v
        return step
        
class Adam(Optimizer):
    def __init__(self, stepsize, beta1=0.9, beta2=0.999, epsilon=1e-08):
        Optimizer.__init__(self)
        self.stepsize = stepsize
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.m = np.zeros(self.dim, dtype=np.float32)
        self.v = np.zeros(self.dim, dtype=np.float32)

    def _compute_step(self, globalg):
        a = self.stepsize * np.sqrt(1 - self.beta2 ** self.t) / (1 - self.beta1 ** self.t)
        self.m = self.beta1 * self.m + (1 - self.beta1) * globalg
        self.v = self.beta2 * self.v + (1 - self.beta2) * (globalg * globalg)
        step = -a * self.m / (np.sqrt(self.v) + self.epsilon)
        return step

In [None]:
if optimizations.neural_network_optimizer:
    if model_structure.nn_optimizer == 'adam':
        optimizer = Adam(**model_structure.nn_optimizer_args)
    elif model_structure.nn_optimizer == 'sgd':
        optimizer = SGD(**model_structure.nn_optimizer_args)
    else:
        raise NotImplementedError

## Shared Noise

In [None]:
class SharedNoiseTable(object):
    def __init__(self):
        seed = 123
        count = 250000000  # 1 gigabyte of 32-bit numbers. Will actually sample 2 gigabytes below.
        logger.info('Sampling {} random numbers with seed {}'.format(count, seed))

        # Instantiate an array of C float datatype with size count
        self._shared_mem = multiprocessing.Array(ctypes.c_float, count)

        # Convert to numpy array
        self.noise = np.ctypeslib.as_array(self._shared_mem.get_obj())
        assert self.noise.dtype == np.float32
        self.noise[:] = np.random.RandomState(seed).randn(count)  # 64-bit to 32-bit conversion here
        logger.info('Sampled {} bytes'.format(self.noise.size * 4))

    def get(self, i, dim):
        return self.noise[i:i + dim]

    def sample_index(self, stream, dim):
        return stream.randint(0, len(self.noise) - dim + 1)

noise = SharedNoiseTable()

## Get flat

In [None]:
def get_flat(theta):
     return np.concatenate([np.reshape(v, [-1]) for v in theta], 0)

def set_from_flat(model, theta):
    old_theta = model.get_weights()
    shapes = [v.shape for v in old_theta]
    total_size = theta.size
    
    start = 0
    reshapes = []
    
    for (shape, v) in zip(shapes, theta):
        size = int(np.prod(shape))
        reshapes.append(np.reshape(theta[start:start+size], shape))
        start += size
    
    assert start == total_size
    model.set_weights(reshapes)
    

## Set from flat

## Rollout TODO

In [None]:
def rollout(env, 
            model, 
            *, 
            render=False, 
            timestep_limit=None, 
            save_obs=False, 
            random_stream=None):
    """
    If random_stream is provided, the rollout will take noisy actions with noise drawn from that stream.
    Otherwise, no action noise will be added.
    """
    
    env_timestep_limit = env.spec.tags.get('wrapper_config.TimeLimit.max_episode_steps')
    timestep_limit = env_timestep_limit if timestep_limit is None else min(timestep_limit, env_timestep_limit)
    rews = []
    t = 0
    if save_obs:
        obs = []
    ob = env.reset()
    for _ in range(timestep_limit):
        ac = act(ob[None], model, random_stream=random_stream)[0]
        if save_obs:
            obs.append(ob)
        ob, rew, done, _ = env.step(ac)
        rews.append(rew)
        t += 1
        if render:
            env.render()
        if done:
            break
    rews = np.array(rews, dtype=np.float32)
    if save_obs:
        return rews, t, np.array(obs)
    return rews, t


In [None]:
class RunningStat(object):
    def __init__(self, shape, eps):
        self.sum = np.zeros(shape, dtype=np.float32)
        self.sumsq = np.full(shape, eps, dtype=np.float32)
        self.count = eps

    def increment(self, s, ssq, c):
        self.sum += s
        self.sumsq += ssq
        self.count += c

    @property
    def mean(self):
        return self.sum / self.count

    @property
    def std(self):
        return np.sqrt(np.maximum(self.sumsq / self.count - np.square(self.mean), 1e-2))

# Worker method


In [None]:
def rollout_and_update_ob_stat(env, model, rs, task_ob_stat):
    if optimizations.observation_normalization and config.calc_obstat_prob != 0 and rs.rand() < config.calc_obstat_prob:
        rollout_rews, rollout_len, obs = rollout(
            env, model, save_obs=True, random_stream=rs)
        task_ob_stat.increment(obs.sum(axis=0), np.square(obs).sum(axis=0), len(obs))
    else:
        rollout_rews, rollout_len = rollout(env, model, random_stream=rs)
    return rollout_rews, rollout_len

In [None]:
def run_worker(num_jobs, theta, ob_mean=None, ob_std=None): #min_task_runtime=.2):

    print("PID " + str(os.getpid()) + ": " + "Started worker with " + str(num_jobs) + "Jobs")

    assert isinstance(noise, SharedNoiseTable)

    # Setup
    # Create a new gym environment object because each worker needs its own one
    env = gym.make(config.env_id)
    
    # Initialize the model with the supplied weights 'theta' to calcualate based on the current generation
    model = create_model(initial_weights=theta, model_name=str(os.getpid()), ob_mean=ob_mean, ob_std=ob_std)
    
    # Random stream used for adding noise to the actions as well as deciding if the observation statistics shall be
    # updated
    rs = np.random.RandomState()
    
    task_ob_stat = RunningStat(env.observation_space.shape, eps=0.)  # eps=0 because we're incrementing only
    noise_inds, returns, signreturns, lengths, eval_returns, eval_lengths = [], [], [], [], [], []

    assert optimizations.observation_normalization == (config.calc_obstat_prob != 0)
    

    i = 0
    while i < num_jobs:

        if rs.rand() < config.eval_prob:
            # Evaluation sample
            set_from_flat(model, theta)
            eval_rews, eval_length = rollout(env, model)
            eval_returns.append(eval_rews.sum())
            eval_lengths.append(eval_length)
        
        # Rollouts with noise
        
        # Noise sample
        noise_idx = noise.sample_index(rs, num_params)
        epsilon = config.noise_stdev * noise.get(noise_idx, num_params)

        # Evaluate the sampled noise
        set_from_flat(model, theta + epsilon)
        rews_pos, len_pos = rollout_and_update_ob_stat(env, model, rs=rs, task_ob_stat=task_ob_stat)
        
        # Mirrored sampling also evaluates the noise by subtracting it
        if optimizations.mirrored_sampling:
            set_from_flat(model, theta - epsilon)
            rews_neg, len_neg = rollout_and_update_ob_stat(env, model, rs=rs, task_ob_stat=task_ob_stat)
        
        # Gather results
        noise_inds.append(noise_idx)
        returns.append([rews_pos.sum()])
        signreturns.append([np.sign(rews_pos).sum()])
        lengths.append([len_pos])
        
        if optimizations.mirrored_sampling:
            returns[-1].append(rews_neg.sum())
            signreturns[-1].append(np.sign(rews_neg).sum())
            lengths[-1].append(len_neg)
            
        i += 1
        
    result = Result(
        noise_inds=np.array(noise_inds),
        returns=np.array(returns, dtype=np.float32),
        signreturns=np.array(signreturns, dtype=np.float32),
        lengths=np.array(lengths, dtype=np.int32),
        eval_returns=np.array(eval_returns, dtype=np.float32),
        eval_lengths=np.array(eval_lengths, dtype=np.int32),
        ob_sum=None if task_ob_stat.count == 0 else task_ob_stat.sum,
        ob_sumsq=None if task_ob_stat.count == 0 else task_ob_stat.sumsq,
        ob_count=task_ob_stat.count
    )
    
    print("PID " + str(os.getpid()) + ": " + "Returned result")
    
    return result

In [None]:
def itergroups(items, group_size):
    assert group_size >= 1
    group = []
    for x in items:
        group.append(x)
        if len(group) == group_size:
            yield tuple(group)
            del group[:]
    if group:
        yield tuple(group)
        
def batched_weighted_sum(weights, vecs, batch_size):
    total = 0.
    num_items_summed = 0
    for batch_weights, batch_vecs in zip(itergroups(weights, batch_size), itergroups(vecs, batch_size)):
        assert len(batch_weights) == len(batch_vecs) <= batch_size
        total += np.dot(np.asarray(batch_weights, dtype=np.float32), np.asarray(batch_vecs, dtype=np.float32))
        num_items_summed += len(batch_weights)
    return total, num_items_summed

## Optimization: Fitness shaping with a rank transformation

In [None]:
def compute_ranks(x):
    """
    Returns ranks in [0, len(x))
    Note: This is different from scipy.stats.rankdata, which returns ranks in [1, len(x)].
    """
    assert x.ndim == 1
    ranks = np.empty(len(x), dtype=int)
    ranks[x.argsort()] = np.arange(len(x))
    return ranks


def compute_centered_ranks(x):
    y = compute_ranks(x.ravel()).reshape(x.shape).astype(np.float32)
    return y

# Master

In [None]:
rs = np.random.RandomState()

if optimizations.observation_normalization:
    ob_stat = RunningStat(
        env.observation_space.shape,
        eps=1e-2  # eps to prevent dividing by zero at the beginning when computing mean/stdev
    )

tslimit, incr_tslimit_threshold, tslimit_incr_ratio = None, None, None
adaptive_tslimit = False


episodes_so_far = 0
timesteps_so_far = 0
tstart = time.time()

assert config.num_workers > 0

num_jobs_per_worker = [int(config.population_size / config.num_workers)] * config.num_workers

mod = config.population_size % config.num_workers
i = 0
while mod > 0:
    num_jobs_per_worker[i] += 1
    mod -= 1
    i += 1
    
assert len(num_jobs_per_worker) == config.num_workers

generations = 0

theta = get_flat(theta)

while True:
    print("----------------------GENERATION: " + str(generations) + "------------------------------------")
    logger.info("Generation {}".format(generations))
    
    step_tstart = time.time() 
            
    assert theta.dtype == np.float32
    
    # Start workers
    
    workers = []
    results = []
    
    with multiprocessing.Pool(processes=config.num_workers) as pool:
    
        print("PID " + str(os.getpid()) + ": " + "Waiting for results")
        
        # Spawns the worker without blocking
        for i in num_jobs_per_worker:
            results.append(pool.apply_async(func=run_worker, args=(i, theta, ob_stat.mean, ob_stat.std)))

        # Blocks until all results are gathered
        for i in range(len(results)):
            results[i] = results[i].get()

    # Pop off results for the current task
    curr_task_results, eval_returns, eval_lengths, worker_ids = [], [], [], []
    num_results_skipped, num_episodes_popped, num_timesteps_popped, ob_count_this_gen, eval_count = 0, 0, 0, 0, 0
   
    #while num_episodes_popped < config.episodes_per_batch:
    for result in results:
        assert isinstance(result, Result)

        assert (result.eval_returns is None) == (result.eval_lengths is None)
        # worker_ids.append(result.worker_id)
         
        if result.eval_lengths is not None:
        # This was an eval job
            #episodes_so_far += 1
            #timesteps_so_far += result.eval_length
            # Store the result only for current tasks
            eval_returns.extend(result.eval_returns)
            eval_lengths.extend(result.eval_lengths)
            eval_count += 1
        
        assert result.noise_inds.ndim == 1 and result.returns.dtype == np.float32
        
        if optimizations.mirrored_sampling:
            assert result.returns.shape == result.lengths.shape == (len(result.noise_inds), 2)
        else:
            assert result.returns.shape == result.lengths.shape == (len(result.noise_inds), 1)
        
        # Update counts
        result_num_eps = result.lengths.size
        result_num_timesteps = result.lengths.sum()
        episodes_so_far += result_num_eps
        timesteps_so_far += result_num_timesteps
        # Store results only for current tasks
        curr_task_results.append(result)
        num_episodes_popped += result_num_eps
        num_timesteps_popped += result_num_timesteps
        # Update ob stats
        if result.ob_count > 0:
            ob_stat.increment(result.ob_sum, result.ob_sumsq, result.ob_count)
            ob_count_this_gen += result.ob_count
    
    print("Gathered results")

    # Process evaluation
    eval_gen_reward_mean = np.nan if eval_count == 0 else np.mean(np.array(eval_returns, dtype=np.float32))
    eval_gen_reward_std = np.nan if eval_count == 0 else np.std(np.array(eval_returns, dtype=np.float32))
    eval_gen_length_mean= np.nan if eval_count == 0 else np.mean(np.array(eval_lengths, dtype=np.int32))   
    
    # Assemble results
    noise_inds = np.concatenate([r.noise_inds for r in curr_task_results])
    returns = np.concatenate([r.returns for r in curr_task_results])
    lengths = np.concatenate([r.lengths for r in curr_task_results])
    assert noise_inds.shape[0] == returns.shape[0] == lengths.shape[0]
    
    # If fitness shaping is turned on rank the results
    if optimizations.fitness_shaping:
        if config.return_proc_mode == 'centered_rank':
            proc_returns = compute_centered_ranks(returns)
        # sign and centered_sign_rank are obviously only useful in combination with mirrored sampling
        elif config.return_proc_mode == 'sign':
            proc_returns = np.concatenate([r.signreturns for r in curr_task_results])
        elif config.return_proc_mode == 'centered_sign_rank':
            proc_returns = compute_centered_ranks(np.concatenate([r.signreturns for r in curr_task_results]))
        else:
            # Throw error to indicate the false input instead of silently pass on
            raise NotImplementedError
    else:
        proc_returns = returns
    
    # Mirrored sampling returns a 2D numpy array therefore we need to preprocess it accordingly
    if optimizations.mirrored_sampling:
        # Calculates the difference between the rewards sampled with the positive and negative noise
        proc_returns = proc_returns[:, 0] - proc_returns[:, 1]
    else:
        proc_returns = proc_returns.ravel()
    
    # Calculate the approximated gradient with a batch variant which saves time TODO saving time true?
    g, count = batched_weighted_sum(
        proc_returns,
        (noise.get(idx, num_params) for idx in noise_inds),
        batch_size=500
    )
    
    assert g.shape == (num_params,) and g.dtype == np.float32 and count == len(noise_inds)
    
    # Update with the approximated gradient
    g /= returns.size
    
    if optimizations.neural_network_optimizer:
        theta, _ = optimizer.update(theta, -g + config.l2coeff * theta)
    else:
        theta += ((config.learning_rate / config.noise_stdev) * g)
    
    # Update number of steps to take
    # if adaptive_tslimit and (lengths_n2 == tslimit).mean() >= incr_tslimit_threshold:
    #     old_tslimit = tslimit
    #     tslimit = int(tslimit_incr_ratio * tslimit)
    #     logger.info('Increased timestep limit from {} to {}'.format(old_tslimit, tslimit))
    
    step_tend = time.time()
    
    # Print to stdout for evaluate training 
    print("EvalGenRewardMean", eval_gen_reward_mean)
    print("EvalGenRewardStd", eval_gen_reward_std)
    print("EvalGenLengthMean", eval_gen_length_mean)
    print("EvalGenCount", eval_count)
    
    
    logger.info("GenRewMean {}".format(returns.mean()))
    logger.info("GenRewStd {}".format(returns.std()))
    logger.info("GenLenMean {}".format(lengths.mean()))

    logger.info("EvalGenRewardMean {}".format(eval_gen_reward_mean))
    logger.info("EvalGenRewardStd {}".format(eval_gen_reward_std))
    logger.info("EvalGenLengthMean {}".format(eval_gen_length_mean))
    logger.info("EvalGenCount {}".format(eval_count))
    
    logger.info("EpisodesThisGen {}".format(lengths.size))
    logger.info("EpisodesSoFar {}".format(episodes_so_far))
    logger.info("TimestepsThisGen {}".format(lengths.sum()))
    logger.info("TimestepsSoFar {}".format(timesteps_so_far))
     
    
    logger.info("UniqueWorkers {}".format(config.num_workers))
    logger.info("ObCount {}".format(ob_count_this_gen))
    
    logger.info("TimeElapsedThisGen {}".format(step_tend - step_tstart))
    logger.info("TimeElapsed {}".format(step_tend - tstart))

    # Note that the model is created with a custom layer and custom initializer, and therefore needs these two
    # custom classes if one wants to load a saved model
    if config.snapshot_freq != 0 and generations % config.snapshot_freq == 0:
        from multiprocessing import Process
        
        p = Process(target=create_model, args=(
                                            theta, 
                                            config.env_id + "_Generation_" + str(generations), 
                                            save_directory + 'snapshot_{:05d}'.format(generations) + ".h5",
                                            ob_stat.mean,
                                            ob_stat.std))
        p.start()
        p.join()
        
        print("Saved model in generation {}".format(generations))
        logger.info("Saved model to {}".format(save_directory))
            
    generations += 1
    
    logger.info("-----------------------------------------------------------------")