In [1]:
import numpy as np
import gym
# Module-level environment instance; handed to ESRaySolver as `environment` in the last cell.
# NOTE(review): 'Marvin-v0' looks like a custom environment id -- presumably registered by
# code that is not visible in this notebook; confirm it is importable on a fresh kernel.
env = gym.make('Marvin-v0')

In [2]:
import psutil
import ray

In [3]:
# Reserve 4 GiB of worker heap memory and 4 GiB for the shared object store.
# ignore_reinit_error=True lets this cell be re-executed in a live kernel
# instead of raising because Ray is already initialised.
GIB = 1024 ** 3
ray.init(memory=4 * GIB, object_store_memory=4 * GIB, ignore_reinit_error=True)

2019-10-05 13:44:09,888	INFO resource_spec.py:205 -- Starting Ray with 3.96 GiB memory available for workers and up to 4.0 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


{'node_ip_address': '10.111.6.11',
 'redis_address': '10.111.6.11:53133',
 'object_store_address': '/tmp/ray/session_2019-10-05_13-44-09_840503_39519/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2019-10-05_13-44-09_840503_39519/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2019-10-05_13-44-09_840503_39519'}

[2m[33m(pid=raylet)[0m E1005 13:44:10.205116 2806375232 io.cc:168] Connection to IPC socket failed for pathname /tmp/ray/session_2019-10-05_13-44-09_840503_39519/sockets/plasma_store, retrying 300 more times


In [4]:
class NN:
    """
    Minimal bias-free multi-layer perceptron with tanh activations.

    The network is stored as a list of weight matrices; ``weights[k]`` has shape
    ``(layer_sizes[k + 1], layer_sizes[k])`` and each layer computes ``tanh(out @ W.T)``.
    """

    def __init__(self, layer_sizes, seed=None):
        """
        Build the network with small random initial weights
        @param layer_sizes: sequence of layer widths, input layer first
        @param seed: if not None, seeds NumPy's global RNG before the weights are drawn
        """
        if seed is not None:
            np.random.seed(seed)
        # Small Gaussian initialisation. The former ``np.zeros(...) * 1e-3`` always
        # produced all-zero matrices, so neither `seed` nor the 1e-3 scale had any effect.
        self.weights = [np.random.randn(m, n) * 1e-3 for m, n in zip(layer_sizes[1:], layer_sizes)]

    def predict(self, X):
        """
        Forward pass through every layer
        @param X: input vector (or batch of row vectors) compatible with the first weight matrix
        @return: a Python scalar when the result is a 1-D array of length 1, otherwise the array
        """
        out = np.asarray(X)
        for W in self.weights:
            out = np.tanh(out @ W.T)
        # Test the rank first so a 0-d result never indexes shape[0].
        if out.ndim == 1 and out.shape[0] == 1:
            return out.item()
        return out

    def set_weights(self, weights, copy=False):
        """
        Replace the network weights
        @param weights: list of np.arrays, one per layer
        @param copy: if True store copies, otherwise keep references to the given arrays
        """
        if copy:
            self.weights = [np.copy(l) for l in weights]
        else:
            self.weights = weights

    def get_weights(self, copy=False):
        """
        Return the list of weight matrices
        @param copy: if True return copies, otherwise the internal list itself
        """
        if copy:
            return [np.copy(l) for l in self.weights]
        return self.weights

    def sample_like(self, sigma=1.0, rs=None):
        """
        Draw standard-normal noise shaped like each weight matrix, scaled by sigma
        @param sigma: factor every sample is multiplied by
        @param rs: object exposing `randn` (e.g. np.random.RandomState); defaults to np.random
        """
        if rs is None:
            rs = np.random
        return [rs.randn(*l.shape) * sigma for l in self.weights]

In [5]:
def sample_like(weights, sigma=1, rs=None):
    """
    Draw scaled standard-normal noise, one array per entry of `weights`
    @param weights: list of np.arrays (only their shapes are used)
    @param sigma: factor every sample is multiplied by
    @param rs: object exposing `randn`; the np.random module is used when None
    @return: list of np.arrays with the same shapes as `weights`
    """
    generator = np.random if rs is None else rs

    noise = []
    for layer in weights:
        noise.append(generator.randn(*layer.shape) * sigma)
    return noise


def combine_weights(params, delta_params, sigma):
    """
    Offset every parameter array by its scaled perturbation
    @param params: list of np.arrays (left unmodified)
    @param delta_params: list of np.arrays paired element-wise with `params`
    @param sigma: factor applied to each perturbation before it is added
    @return: new list holding `W + dW * sigma` for every pair
    """
    combined = []
    for base, delta in zip(params, delta_params):
        combined.append(base + delta * sigma)
    return combined

In [6]:
def evaluate_model(model, env, max_steps=1500):
    """
    Run a single episode and return the summed reward
    @param model: object whose `predict(observation)` returns the action to take
    @param env: environment exposing `reset()` and a `step(action)` that returns
                an (observation, reward, done, info) tuple
    @param max_steps: upper bound on the number of `env.step` calls (default 1500)
    @return: sum of the rewards collected until `done` or until the step cap
    """
    observation = env.reset()
    total_reward = 0
    for _step in range(max_steps):
        action = model.predict(observation)
        observation, reward, done, _info = env.step(action)
        total_reward += reward
        if done:
            break
    return total_reward

def update_params(params, population, rewards, lr=0.05, sigma=0.1):
    """
    Apply one evolution-strategies step, overwriting the entries of `params`
    @param params: list of np.arrays; each slot is replaced with its updated array
    @param population: list of candidates, each a list of noise arrays aligned with `params`
    @param rewards: one score per candidate
    @param lr: learning rate
    @param sigma: noise scale the accumulated step is divided by
    @return: the same `params` list that was passed in
    """
    pop_size = len(population)
    for layer_idx, current in enumerate(params):
        # Reward-weighted sum of this layer's noise over all candidates.
        weighted_noise = np.zeros_like(current)
        for member, score in zip(population, rewards):
            weighted_noise += score * member[layer_idx]
        params[layer_idx] = current + lr / (pop_size * sigma) * weighted_noise
    return params

In [None]:
# NOTE(review): the actor requests a sliver of GPU although nothing in this class
# touches a GPU; on a node where Ray detects no GPUs the actors may never be
# scheduled -- confirm this resource request is intended.
@ray.remote(num_gpus=0.01, num_cpus=0)
class ESWorker:
    """
    Ray actor that scores one randomly perturbed copy of the policy per call.

    Each worker owns its own model, environment and seeded RandomState.
    """

    def __init__(self, layer_sizes, init_seed, env_name, seed, sigma=.1):
        """
        @param layer_sizes: layer widths forwarded to NN
        @param init_seed: seed forwarded to NN
        @param env_name: id passed to gym.make to create this worker's environment
        @param seed: seed of the private RandomState the perturbations are drawn from
        @param sigma: scale applied to the sampled perturbation
        """
        self.model = NN(layer_sizes, init_seed)
        self.env = gym.make(env_name)
        self.rs = np.random.RandomState(seed=seed)
        self.sigma = sigma

    def evaluate(self):
        """
        Sample a perturbation, run one episode with the perturbed weights, return its reward

        The unperturbed weights are put back afterwards, so calling evaluate()
        repeatedly without an intervening update() no longer compounds the noise.
        """
        base_weights = self.model.get_weights()
        candidate = sample_like(base_weights, rs=self.rs)
        # combine_weights builds new arrays, so base_weights stays untouched.
        self.model.set_weights(combine_weights(base_weights, candidate, self.sigma))
        try:
            return evaluate_model(self.model, self.env)
        finally:
            self.model.set_weights(base_weights)

    def update(self, weights):
        """
        Overwrite the model with a copy of `weights`
        @param weights: list of np.arrays, one per layer
        """
        self.model.set_weights(weights, copy=True)

In [None]:

class ESRaySolver:
    """
    Evolution-strategies optimiser that evaluates its population on Ray actors.

    Worker i draws its perturbations from RandomState(i); the solver keeps a
    RandomState with the same seed per worker, so it can regenerate every
    perturbation locally and only rewards are returned by the evaluation calls.
    """

    def __init__(self, model, environment, population_size=30, max_episode_len=1500,
                 lr=0.05, lr_decay=0.999, sigma=0.1, verbose=False, env_name='Marvin-v0'):
        """
        @param model: object exposing get_weights(); supplies the starting weights
        @param environment: stored as self.env (the workers build their own environments)
        @param population_size: number of workers, i.e. candidates per generation
        @param max_episode_len: stored as an attribute; not consulted by this class
        @param lr: initial learning rate
        @param lr_decay: factor the learning rate is multiplied by after every generation
        @param sigma: perturbation scale, used by the workers and by update_params
        @param verbose: falsy for silence, otherwise print every int(verbose)-th generation
        @param env_name: id each worker passes to gym.make
        """
        if population_size < 1:
            raise ValueError('population_size must be at least 1')

        self.model = model
        self.env = environment
        self.population_size = population_size
        self.max_episode_len = max_episode_len
        self.lr = lr
        self.lr_decay = lr_decay
        self.sigma = sigma
        self.verbose = verbose
        self.env_name = env_name
        self.w_seeds = list(range(population_size))
        self.w_rss = [np.random.RandomState(seed=seed) for seed in self.w_seeds]

        # Derive the worker architecture from the model instead of hard-coding it;
        # every weight matrix is (fan_out, fan_in).
        initial = model.get_weights()
        if initial:
            layer_sizes = [W.shape[1] for W in initial] + [initial[-1].shape[0]]
        else:
            layer_sizes = [24, 24, 4]

        # Forward sigma: the workers must perturb with the same scale that
        # update_params later divides by.
        self.workers = [ESWorker.remote(layer_sizes, 0, env_name, seed, sigma=sigma)
                        for seed in self.w_seeds]  # actor handles

    def solve(self, weights=None, fitness_fn=None, n_generations=100, seed=None):
        """
        Run the optimisation loop and return the final weights
        @param weights: list of weight matrices to start from (updated in place by
                        update_params); a copy of the model's weights is used when None
        @param fitness_fn: accepted for interface compatibility; not used here, the
                           workers always score candidates with evaluate_model
        @param n_generations: number of generations to run
        @param seed: if not None, seeds NumPy's global RNG
        @return: the list of weight matrices after the last generation
        """
        if weights is None:
            weights = self.model.get_weights(copy=True)

        if seed is not None:
            np.random.seed(seed)

        lr = self.lr

        # Make sure every worker starts from exactly the weights being optimised.
        ray.get([w.update.remote(weights) for w in self.workers])

        for generation in range(n_generations):
            # Submit every evaluation before waiting so the workers run concurrently.
            pending = [w.evaluate.remote() for w in self.workers]
            # Regenerate each worker's perturbation from the mirrored RNG streams.
            population = [sample_like(weights, rs=rs) for rs in self.w_rss]

            rewards = np.asarray(ray.get(pending), dtype=float)
            r_mean, r_std = rewards.mean(), rewards.std()
            if r_std > 0:
                rewards = (rewards - r_mean) / r_std
            else:
                # Identical (or non-finite) rewards carry no ranking signal; a zero
                # step avoids the 0/0 that would turn every weight into NaN.
                rewards = np.zeros_like(rewards)

            update_params(weights, population, rewards, lr=lr, sigma=self.sigma)
            ray.get([w.update.remote(weights) for w in self.workers])

            lr = lr * self.lr_decay
            if self.verbose and (generation % int(self.verbose) == 0):
                print(f'[{generation}]: E[R]={r_mean:.4f}, std(R)={r_std:.4f} | lr={lr:.4f}')
        return weights


In [None]:
# Policy network: 24 inputs, one hidden layer of 24 units, 4 outputs; seed 0.
nn = NN([24, 24, 4], 0)
# 50 workers, learning rate 0.03; verbose=5 prints a progress line every 5th generation.
es = ESRaySolver(nn, env, population_size=50, lr=0.03, verbose=5)
# Run 300 generations; `weights` receives the final list of weight matrices.
weights = es.solve(n_generations=300)