## ***The Cross-Entropy Method***

The cross-entropy method is model-free and policy-based. RL methods are commonly classified along three axes:

- Model-free or model-based
- Value-based or policy-based
- On-policy or off-policy

The term "model-free" means that the method doesn't build a model of the environment or reward; it just directly connects observations to actions (or values that are related to actions). In contrast, model-based methods try to predict what the next observation and/or reward will be.

Policy-based methods directly approximate the policy of the agent, that is, what action the agent should carry out at every step. The policy is usually represented by a probability distribution over the available actions. Value-based methods work differently: instead of producing action probabilities, the agent estimates the value of every possible action and chooses the action with the best value.
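To make the distinction concrete, here is a minimal sketch of the two ways of picking an action. The probabilities and values are made-up numbers, and the helper names `select_policy_based` and `select_value_based` are hypothetical; they are not part of the notebook's code.

```python
import random

# Hypothetical outputs for a single observation with two actions (illustrative numbers only).
action_probs = [0.3, 0.7]    # policy-based: a probability for each action
action_values = [1.2, 4.5]   # value-based: an estimated value for each action


def select_policy_based(probs, rng):
    """Sample an action index according to the policy's probabilities."""
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]


def select_value_based(values):
    """Pick the action index with the highest estimated value."""
    return max(range(len(values)), key=lambda i: values[i])


rng = random.Random(0)
sampled = [select_policy_based(action_probs, rng) for _ in range(1000)]
print("share of action 1 under the policy:", sum(sampled) / len(sampled))
print("value-based choice:", select_value_based(action_values))
```

The policy-based agent picks action 1 about 70% of the time and still explores action 0, while the value-based agent in this sketch always picks the action with the highest value.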

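For now, we define off-policy as the ability of a method to learn from historical data (obtained by a previous version of the agent, recorded from human demonstrations, or just seen by the same agent several episodes ago). An on-policy method, by contrast, needs fresh data gathered with the policy that is currently being updated. The cross-entropy method is on-policy: as the steps below show, every training iteration uses episodes played with the current model.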

Because the policy is a probability distribution over a discrete set of actions, learning it closely resembles a classification problem in which the number of classes equals the number of actions we can carry out.
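The classification view can be sketched in a few lines of plain Python: the raw scores for each action are turned into probabilities with a softmax, and the loss is the negative log-probability of the action that was actually taken. The functions `softmax` and `cross_entropy` below are illustrative helpers written for this sketch, and the scores are made-up numbers.

```python
import math


def softmax(logits):
    """Convert raw scores into a probability distribution."""
    shifted = [x - max(logits) for x in logits]  # subtract the max for numerical stability
    exps = [math.exp(x) for x in shifted]
    total = sum(exps)
    return [e / total for e in exps]


def cross_entropy(logits, target_action):
    """Negative log-probability the policy assigns to the action that was taken."""
    return -math.log(softmax(logits)[target_action])


# Hypothetical raw network outputs for one observation with two actions.
logits = [0.2, -0.1]
print(softmax(logits))
print(cross_entropy(logits, target_action=0))
print(cross_entropy(logits, target_action=1))
```

The loss is lower for the action the policy already favors. With two actions and equal scores, the loss is ln 2 ≈ 0.693, which is close to the first loss value in the training log later in this section.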

### ***The Method in Action***

The core of the cross-entropy method is to throw away bad episodes and train on better ones. The steps of the method are as follows:
1. Play N episodes using our current model and environment.
2. Calculate the total reward for every episode and decide on a reward boundary. Usually, we use some percentile of all rewards, such as the 50th or 70th.
3. Throw away all episodes with a reward below the boundary.
4. Train on the remaining "elite" episodes using observations as the input and issued actions as the desired output.
5. Repeat from step 1 until we are satisfied with the result.
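Steps 2 and 3 can be illustrated with a small worked example. The rewards are made-up numbers, and `percentile` is a hypothetical stand-in written for this sketch; it uses linear interpolation between neighbouring ranks, which, to the best of our knowledge, is also the default behaviour of `np.percentile`.

```python
import math


def percentile(values, q):
    """q-th percentile with linear interpolation between the two nearest ranks."""
    ordered = sorted(values)
    position = (len(ordered) - 1) * q / 100
    lower = math.floor(position)
    upper = math.ceil(position)
    fraction = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction


# Hypothetical total rewards for a batch of ten episodes.
episode_rewards = [12.0, 35.0, 18.0, 50.0, 22.0, 9.0, 41.0, 27.0, 15.0, 30.0]
boundary = percentile(episode_rewards, 70)

# Step 3: keep only the episodes at or above the boundary.
elite = [r for r in episode_rewards if r >= boundary]
print("boundary:", boundary)
print("elite rewards:", elite)
```

With a 70th-percentile boundary, roughly the top 30% of episodes survive, so only three of the ten episodes here would be used for training.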

In [5]:
import numpy as np
import gymnasium as gym
from dataclasses import dataclass
import typing as tt
from torch.utils.tensorboard.writer import SummaryWriter

import torch
import torch.nn as nn
import torch.optim as optim

In [6]:
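HIDDEN_SIZE = 128  # Number of neurons in the hidden layer
BATCH_SIZE = 16    # Number of episodes played per training iteration
PERCENTILE = 70    # Reward percentile used as the boundary for "elite" episodes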

In [7]:
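class Net(nn.Module):
    """One-hidden-layer network that maps an observation to one raw score (logit) per action."""

    def __init__(self, obs_size: int, hidden_size: int, n_actions: int):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, hidden_size),
            nn.ReLU(),
            # No softmax here: nn.CrossEntropyLoss expects raw scores and applies log-softmax itself
            nn.Linear(hidden_size, n_actions),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)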

In [8]:
@dataclass
class EpisodeStep:
    """Represents one single step that our agent makes in an episode."""
    observation: np.ndarray
    action: int

@dataclass
class Episode:
    """Single episode stored as total undiscounted reward and a collection of EpisodeStep."""
    reward: float
    steps: tt.List[EpisodeStep]

In [12]:
def iterate_batches(env: gym.Env, net: Net, batch_size: int) -> tt.Generator[tt.List[Episode], None, None]:
    batch = []
    episode_reward = 0.0
    episode_steps = []
    obs, _ = env.reset()
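    sm = nn.Softmax(dim=1)  # Converts the network's raw scores to a probability distribution

    while True:
        obs_v = torch.tensor(obs, dtype=torch.float32)
        # Add a batch dimension; gradients are not needed while collecting episodes
        with torch.no_grad():
            act_probs_v = sm(net(obs_v.unsqueeze(0)))
        act_probs = act_probs_v.numpy()[0]
        # Sample the action from the policy instead of always taking the most likely one
        action = np.random.choice(len(act_probs), p=act_probs)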
        next_obs, reward, is_done, is_trunc, _ = env.step(action)
        episode_reward += float(reward)
        step = EpisodeStep(observation=obs, action=action)
        episode_steps.append(step)
        if is_done or is_trunc:
            e = Episode(reward = episode_reward, steps = episode_steps)
            batch.append(e)
            episode_reward = 0.0
            episode_steps = []
            next_obs, _ = env.reset()
            if len(batch) == batch_size:
                yield batch
                batch = []
        obs = next_obs

In [15]:
def filter_batch(batch: tt.List[Episode], percentile: float) -> \
        tt.Tuple[torch.FloatTensor, torch.LongTensor, float, float]:
    """Keep only the "elite" episodes and collect their steps as training data."""
    rewards = [episode.reward for episode in batch]
    reward_bound = float(np.percentile(rewards, percentile))
    reward_mean = float(np.mean(rewards))

    train_obs: tt.List[np.ndarray] = []
    train_act: tt.List[int] = []
    for episode in batch:
        # Skip episodes whose total reward falls below the boundary
        if episode.reward < reward_bound:
            continue
        train_obs.extend(step.observation for step in episode.steps)
        train_act.extend(step.action for step in episode.steps)

    train_obs_v = torch.FloatTensor(np.vstack(train_obs))
    train_act_v = torch.LongTensor(train_act)
    return train_obs_v, train_act_v, reward_bound, reward_mean

In [16]:
if __name__ == "__main__":
    env = gym.make("CartPole-v1")
    assert env.observation_space.shape is not None
    obs_size = env.observation_space.shape[0]
    assert isinstance(env.action_space, gym.spaces.Discrete)
    n_actions = int(env.action_space.n)

    net = Net(obs_size, HIDDEN_SIZE, n_actions)
    print(net)
    objective = nn.CrossEntropyLoss()
    optimizer = optim.Adam(params=net.parameters(), lr=0.01)
    writer = SummaryWriter(comment="-cartpole")

    for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):
        obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)
        optimizer.zero_grad()
        action_scores_v = net(obs_v)
        loss_v = objective(action_scores_v, acts_v)
        loss_v.backward()
        optimizer.step()
        print("%d: loss=%.3f, reward_mean=%.1f, rw_bound=%.1f" % (
            iter_no, loss_v.item(), reward_m, reward_b))
        writer.add_scalar("loss", loss_v.item(), iter_no)
        writer.add_scalar("reward_bound", reward_b, iter_no)
        writer.add_scalar("reward_mean", reward_m, iter_no)
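        # Stop once the batch's mean reward exceeds 475, the reward threshold Gymnasium lists for CartPole-v1
        if reward_m > 475: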
            print("Solved!")
            break
    writer.close()

Net(
  (net): Sequential(
    (0): Linear(in_features=4, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=2, bias=True)
  )
)
0: loss=0.701, reward_mean=19.0, rw_bound=21.0
1: loss=0.663, reward_mean=13.9, rw_bound=16.0
2: loss=0.606, reward_mean=15.7, rw_bound=17.5
3: loss=0.575, reward_mean=14.5, rw_bound=17.0
4: loss=0.558, reward_mean=16.1, rw_bound=17.5
5: loss=0.629, reward_mean=18.1, rw_bound=18.0
6: loss=0.584, reward_mean=20.4, rw_bound=20.5
7: loss=0.669, reward_mean=18.8, rw_bound=22.0
8: loss=0.644, reward_mean=20.9, rw_bound=24.5
9: loss=0.676, reward_mean=31.6, rw_bound=36.5
10: loss=0.648, reward_mean=36.9, rw_bound=43.0
11: loss=0.645, reward_mean=32.0, rw_bound=33.5
12: loss=0.627, reward_mean=37.8, rw_bound=40.0
13: loss=0.623, reward_mean=33.2, rw_bound=40.0
14: loss=0.615, reward_mean=42.9, rw_bound=42.5
15: loss=0.609, reward_mean=43.3, rw_bound=51.0
16: loss=0.591, reward_mean=45.8, rw_bound=53.0
17: loss=0.594, reward_mea

In [None]:
# REVIEW MATERIAL: 