# Cross-Entropy Method

### What is CEM?
From a biological perspective, it is an evolutionary algorithm: some individuals are sample from a population and the top performers of that sample govern the characteristics of the future generations.

From a math viewpoint, it is a Derivative-Free Optimization (DFO) technique: it can find the optima without calculating derivatives (no backpropagation).

### Mental Image 

suppose you are given a blackbox which take some numbers as input and outputs some other numbers. You can only choose the input values and observe the output values. How do you choose the input values such that the outputs are the values you want?

One simple way is generate a bunch of inputs from a probability distribution (says Gausian with mean $\mu$ and standard deviation $\sigma$) and see the produced outputs, choose the inputs that have led to the best outputs to tune the characteristics of that probability distribution. Says, $\mu \leftarrow mean(\mu) $ and $\sigma \leftarrow mean(\sigma)$.

## 1. Import packages

In [10]:
import gym
import math
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

## 2. Instantiate Env and Agent

In [11]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

env = gym.make('MountainCarContinuous-v0')
env.seed(101)
np.random.seed(101)

print('observation space', env.observation_space)
print('action space', env.action_space)
print('  -low', env.action_space.low)
print('  -high', env.action_space.high)

device

observation space Box(2,)
action space Box(1,)
  -low [-1.]
  -high [1.]


device(type='cpu')

In [12]:
#define agent with nn brain
class Agent(nn.Module):
    def __init__(self, env, h_size=16):
        super(Agent, self).__init__()
        self.env = env
        
        #state, hidden layer, action sizes
        self.s_size = env.observation_space.shape[0]
        self.h_size = h_size
        self.a_size = env.action_space.shape[0]
        
        #define layers
        self.fc1 = nn.Linear(self.s_size, self.h_size)
        self.fc2 = nn.Linear(self.h_size, self.a_size)
        
    def set_weights(self, weights):
        s_size = self.s_size
        h_size = self.h_size
        a_size = self.a_size
        
        #separate weights for each layer
        fc1_end = s_size*h_size + h_size
        fc1_W = torch.from_numpy(weights[:s_size*h_size].reshape(s_size, h_size))
        fc1_b = torch.from_numpy(weights[s_size*h_size:fc1_end])
        fc2_W = torch.from_numpy(weights[fc1_end:fc1_end+h_size*a_size].reshape(h_size,a_size))
        fc2_b = torch.from_numpy(weights[fc1_end+h_size*a_size:])
        
        #set weights for each layer
        self.fc1.weight.data.copy_(fc1_W.view_as(self.fc1.weight.data))
        self.fc1.bias.data.copy_(fc1_b.view_as(self.fc1.bias.data))
        self.fc2.weight.data.copy_(fc2_W.view_as(self.fc2.weight.data))
        self.fc2.bias.data.copy_(fc2_b.view_as(self.fc2.bias.data))
        
    def get_weights_dim(self):
        return (self.s_size+1)*self.h_size + (self.h_size+1)*self.a_size
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = torch.tanh(self.fc2(x))
        return x.cpu().data
    
    def evaluate(self, weights, gamma=1.0, max_t=5000):
        self.set_weights(weights)
        episode_return = 0.0
        state = self.env.reset()
        for t in range(max_t):
            state = torch.from_numpy(state).float().to(device)
            action = self.forward(state)
            state, reward, done, _ = self.env.step(action)
            episode_return += reward * math.pow(gamma, t)
            if done:
                break
        return episode_return
    
agent = Agent(env).to(device)

In [18]:
state = agent.env.reset()
print(state)
state = torch.from_numpy(state).float().to(device)
agent.forward(state)

[-0.46071939  0.        ]


tensor([0.1332])

## 3. Train the agent with the Cross-Entropy Method

### How does CEM work to solve RL problem?
Consider a policy network. We want to find the best weights which predicts the probability of action based on the agent state. [Here](https://towardsdatascience.com/cross-entropy-method-for-reinforcement-learning-2b6de2a4f3a0) is how a CEM approach to find these weights:

**STEP 1**: Sample a bunch of initial weights from your choice of probability distribution, says Gaussian with $\mu = 0$ and $\sigma = 1$.
```python
mean = 0.0
std = 1.0
n_weights = 50

weights_pop = [mean + std*np.random.randn(weights_dim) for i in range(n_weights)]
```

**STEP 2**: Let the agent pick actions from the policy network based on these weights, run the agent through an episode and collect the rewards generated by the environment. For instance, **w1** generate accumulative reward **r1**, **w2** generate accumulative reward **r2** and so on.

```python
rewards = [agent.evaluate(weights) for weights in weights_pop]
```

**STEP 3**: Find those weights generate the top n rewards.

```python
n_elite = 10
elite_idxs = np.array(rewards).argsort()[-n_elite:]
elite_weights = [weights_pop[idx] for idx in elite_idxs]
```

**STEP 4**: Update the characteristics of the probability distribution by the top performers (i.e. elite_weights) and use the updated distribution to generate new sample weights.

```python
mean = np.array(elite_weights).mean()
std = np.array(elite_weights).std()

weights_pop = [mean + std*np.random.randn(weights_dim) for i in range(n_weights)]
```

**STEP 5** Repeat step2-4 until you get the rewards you are satisfied with.


Cross Entropy Algorithm:
- initiate the number of samples (i.e. pop_size = 50) and the number of top performers among the samples (n_elite = 10).

- initiate the best **weight** as random weight for the agent's neural network, which corresponds to select a random **location** for the agent on the objective function surface.

- iterate n steps (toward the objective mountain top):

> - create a population of points nearby the location of the agent, i.e. to create each point by adding Gausian noise (with fixed standard deviation sigma) to the weight of the agent to create a new weight

> - calculate the reward of each point in that population; select the weights of the top n_elite performers and calculate the mean weight; update the best **weight** as the mean weight

> - use the best weight to calculate the reward and keep track of the most recent (says 100) rewards.

> - if the average value of the most recent rewards greater than a threshold value, says 90, save the learned weight and break of the iteration



In [20]:
def cem(n_iterations=500, max_t=1000, gamma=1.0, print_every=10, pop_size=50, elite_frac=0.2, sigma=0.5):
    """PyTorch implementation of the cross-entropy method.
        
    Params
    ======
        n_iterations (int): maximum number of training iterations
        max_t (int): maximum number of timesteps per episode
        gamma (float): discount rate
        print_every (int): how often to print average score (over last 100 episodes)
        pop_size (int): size of population at each iteration
        elite_frac (float): percentage of top performers to use in update
        sigma (float): standard deviation of additive noise
    """
    n_elite=int(pop_size*elite_frac)

    scores_deque = deque(maxlen=100)
    scores = []
    best_weight = sigma*np.random.randn(agent.get_weights_dim())

    for i_iteration in range(1, n_iterations+1):
        weights_pop = [best_weight + (sigma*np.random.randn(agent.get_weights_dim())) for i in range(pop_size)]
        rewards = np.array([agent.evaluate(weights, gamma, max_t) for weights in weights_pop])

        elite_idxs = rewards.argsort()[-n_elite:]
        elite_weights = [weights_pop[i] for i in elite_idxs]
        best_weight = np.array(elite_weights).mean(axis=0)

        reward = agent.evaluate(best_weight, gamma=1.0)
        scores_deque.append(reward)
        scores.append(reward)
        
        torch.save(agent.state_dict(), 'checkpoint.pth')
        
        if i_iteration % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_iteration, np.mean(scores_deque)))

        if np.mean(scores_deque)>=90.0:
            print('\nEnvironment solved in {:d} iterations!\tAverage Score: {:.2f}'.format(i_iteration-100, np.mean(scores_deque)))
            break
    return scores

scores = cem()

Episode 10	Average Score: -9.42
Episode 20	Average Score: -7.24
Episode 30	Average Score: -5.38
Episode 40	Average Score: -5.13
Episode 50	Average Score: -5.17


TypeError: can't convert np.ndarray of type numpy.object_. The only supported types are: float64, float32, float16, int64, int32, int16, int8, uint8, and bool.

In [None]:
# The 2nd implementation:
def cem(n_iterations=500, max_t=1000, gamma=1.0, print_every=10, pop_size=50, elite_frac=0.2, sigma=0.5):
    """ Pytorch implementation of the cross-entropy method
    
    INPUT:
    - n_iterations (int): max number of training rounds
    - max_t (int): max timesteps of episode
    - print_every (int): how often print average score
    - pop_size (int): size of population at each iteration
    - elite_frac (float): percentage of top performers to use in update
    - sigma (float): standard deviation of additive noise
    
    OUTPUT:
    
    """
    n_elite = int(pop_size*elite_frac)
    scores_deque = deque(maxlen=100)
    scores = []
    mean = 0.0
    std = 1.0
    weights_dim = agent.get_weights_dim()
    
    for i in range(1, n_iterations+1):
        weights_pop = [mean + std*np.random.randn(weights_dim) for i in range(pop_size)]
        rewards = np.array([agent.evaluate(weights, gamma, max_t) for weights in weights_pop])
        
        elite_idxs = rewards.argsort()[-n_elite:]
        elite_weights = [weights_pop[i] for i in elite_idxs]
        mean = np.array(elite_weights).mean()
        std = np.array(elite_weights).std()
        best_weights = np.array(elite_weights).mean(axis=0)
        
        best_reward = agent.evaluate(best_weights, gamma=1.0)
        scores_deque.append(best_reward)
        scores.append(best_reward)
        
        torch.save(agent.state_dict(), 'checkpoint.pth')
        
        if i % print_every == 0:
            print(f'Episode {i} \tAverage Score {round(np.mean(scores_deque),2)}')
            
        if np.mean(scores_deque)>=90.0:
            print(f'\nEnvironment solved in {i-100} iternations!\tAverage Score {round(np.mean(scores_deque),2)}')
            break
    return scores

scores = cem()

In [None]:
# plot the scores
fig = plt.figure()
ax = fig.add_subplot(111)
plt.plot(np.arange(1, len(scores)+1), scores)
plt.ylabel('Score')
plt.xlabel('Episode #')
plt.show()

## 4. Watch smart agent to perform

We load weights from file to watch a smart agent to perform.

In [None]:
agent.load_state_dict(torch.load('checkpoint.pth', map_location='cpu'))

state = env.reset()
while True:
    state = torch.from_numpy(state).float().to(device)
    with torch.no_grad():
        action = agent(state)
    env.render()
    next_state, reward, done, _ = env.step(action)
    state = next_state
    if done:
        break
env.close()