# Recommender systems, offline RL, and more with Ray RLlib
## A hands-on tutorial for building environments, configuring RL algorithms, and running them at scale

<table>
<tr>
    <td> <img src="images/youtube.png" style="width: 230px;"/> </td>
    <td> <img src="images/dota2.jpg" style="width: 213px;"/> </td>
    <td> <img src="images/forklifts.jpg" style="width: 169px;"/> </td>
    <td> <img src="images/spotify.jpg" style="width: 254px;"/> </td>
    <td> <img src="images/robots.jpg" style="width: 252px;"/> </td>
</tr>
</table>


### Overview
“Recommender systems, offline RL, and more with Ray RLlib” is a hands-on tutorial for industry researchers, domain experts, and ML engineers, showcasing how ..

1) .. you can use RLlib to build a recommender system for your industry application

2) .. offline RL offers a solution when you don't have a simulator of your problem environment at hand

We will further explore how to deploy one or more trained models to production using Ray Serve and how to use RLlib's bandit algorithms to select a best model from some set of candidates for that purpose.

During the live-coding phases, we will build a recommender system simulation environment with RLlib, choose and configure an RLlib algorithm, and experiment with and tune hyperparameters using Ray Tune.

RLlib offers high scalability, a large list of algos to choose from (offline, model-based, model-free, etc.), support for TensorFlow and PyTorch, and a unified API for a variety of applications. This tutorial includes a brief introduction to provide an overview of concepts (e.g. why RL) before proceeding to RLlib (recommender system) environments, neural network models, offline RL, student exercises, Q/A, and more. All code will be provided as .py files in a GitHub repo.

### Intended Audience
* Python programmers who are interested in using RL to solve their specific industry decision making problems and who want to get started with RLlib.

### Prerequisites
* Some Python programming experience.
* Some familiarity with machine learning.
* *Helpful, but not required:* Experience in reinforcement learning and Ray.
* *Helpful, but not required:* Experience with TensorFlow or PyTorch.

### Requirements/Dependencies

To get this very notebook up and running on your local machine, you can follow these steps here:

Install conda (https://www.anaconda.com/products/individual)

Then ...

#### Quick `conda` setup instructions (Linux):
```
$ conda create -n rllib_tutorial python=3.9
$ conda activate rllib_tutorial
$ pip install recsim # Google's RecSim environment
$ pip install "ray[rllib]"
$ pip install "ray[default]"  # updates ray dashboard
$ conda install tensorflow  # <- either one works!
$ pip install torch  # <- either one works!
$ pip install jupyterlab
```

#### Quick `conda` setup instructions (Mac):
```
$ conda create -n rllib_tutorial python=3.9
$ conda activate rllib_tutorial
$ conda install grpcio  # extra install only on apple M1 mac
$ pip install recsim # Google's RecSim environment
$ pip install cmake "ray[rllib]"
$ pip install "ray[default]"  # updates ray dashboard
$ conda install tensorflow  # <- either one works!
$ pip install torch  # <- either one works!
$ pip install jupyterlab
```

#### Quick `conda` setup instructions (Win10):
```
$ conda create -n rllib_tutorial python=3.9
$ conda activate rllib_tutorial
$ pip install recsim # Google's RecSim environment
$ pip install "ray[rllib]"
$ pip install "ray[default]"  # updates ray dashboard
$ conda install tensorflow  # <- either one works!
$ pip install torch  # <- either one works!
$ pip install jupyterlab
$ conda install pywin32
```

### Opening these tutorial files:
```
$ git clone https://github.com/sven1977/rllib_tutorials
$ cd rllib_tutorials/rl_conference_2022
$ jupyter-lab
```

### Key Takeaways
* What is reinforcement learning and why RLlib?
* Core concepts of RLlib: Environments, Trainers, Policies, and Models.
* How to configure, hyperparameter-tune, and parallelize RLlib.
* RLlib debugging best practices.

### Tutorial Outline
1. RL and RLlib in a nutshell.
1. Defining an RLlib-ready recommender system emulator with Google RecSim.
1. **Exercise No.1**: Environment loop.

(15min break)

1. Picking an algorithm and training our first RLlib Trainer.
1. Configurations and hyperparameters - Easy tuning with Ray Tune.
1. The "infinite laptop": Quick intro into how to use RLlib with Anyscale's product.
1. **Exercise No.2**: Run your own Ray RLlib+Tune experiment.

(15min break)

1. Deeper dive into RLlib's parallelization architecture.
1. Specifying different compute resources and parallelization options through our config.
1. "Hacking in": Using callbacks to customize the RL loop and generate our own metrics.
1. **Exercise No.3**: Write your own custom callback.
1. "Hacking in (part II)" - Debugging with RLlib and PyCharm.
1. Checking on the "infinite laptop" - Did RLlib learn to solve the problem?

### Other Recommended Readings
* [Reinforcement Learning with RLlib in the Unity Game Engine](https://medium.com/distributed-computing-with-ray/reinforcement-learning-with-rllib-in-the-unity-game-engine-1a98080a7c0d)

<img src="images/unity3d_blog_post.png" width=400>

* [Attention Nets and More with RLlib's Trajectory View API](https://medium.com/distributed-computing-with-ray/attention-nets-and-more-with-rllibs-trajectory-view-api-d326339a6e65)
* [Intro to RLlib: Example Environments](https://medium.com/distributed-computing-with-ray/intro-to-rllib-example-environments-3a113f532c70)

## The RL cycle

<img src="images/rl-cycle.png" width=800>

### Coding/defining our "problem" via an RL environment.

We will use the following (adversarial) multi-agent environment
throughout this tutorial to demonstrate a large fraction of RLlib's
APIs, features, and customization options.

<img src="images/environment.png" width=800>

### A word or two on Spaces:

Spaces are used in ML to describe what possible/valid values inputs and outputs of a neural network can have.

RL environments also use them to describe what their valid observations and actions are.

Spaces are usually defined by their shape (e.g. 84x84x3 RGB images) and datatype (e.g. uint8 for RGB values between 0 and 255).
However, spaces could also be composed of other spaces (see Tuple or Dict spaces) or could be simply discrete with n fixed possible values
(represented by integers). For example, in our game, where each agent can only go up/down/left/right, the action space would be `Discrete(4)`
(no datatype, no shape needs to be defined here). Our observation space will be `MultiDiscrete([n, m])`, where n is the position of the agent observing and m is the position of the opposing agent, so if agent1 starts in the upper left corner and agent2 starts in the bottom right corner, agent1's observation would be: `[0, 63]` (in an 8 x 8 grid) and agent2's observation would be `[63, 0]`.

<img src="images/spaces.png" width=800>
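
To make the `[0, 63]` example concrete, the following plain-Python sketch shows one way a grid position could be turned into a single discrete value. The row-major encoding (`row * width + col`) and both helper names are assumptions for illustration; the actual environment used in this tutorial may encode positions differently.

```python
def encode_position(row: int, col: int, width: int) -> int:
    """Map a (row, col) grid cell to a single integer (row-major order)."""
    return row * width + col


def make_observation(own_pos, other_pos, width):
    """Build a MultiDiscrete-style observation: [own position, opponent position]."""
    return [encode_position(*own_pos, width), encode_position(*other_pos, width)]


# 8 x 8 grid: agent1 in the upper-left corner, agent2 in the bottom-right corner.
WIDTH = 8
agent1_obs = make_observation((0, 0), (7, 7), WIDTH)
agent2_obs = make_observation((7, 7), (0, 0), WIDTH)
print(agent1_obs, agent2_obs)
```

Under this assumed encoding, both entries of an observation range over `width * height` possible values, which is what the `n` and `m` in `MultiDiscrete([n, m])` would bound.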

In [1]:
TODO


## Exercise No 1

<hr />

<img src="images/exercise1.png" width=400>

In the cell above, we performed a `reset()` and a single `step()` call. To walk through an entire episode, one would normally call `step()` repeatedly (with different actions) until the returned `dones` dict has the "agent1" or "agent2" (or "__all__") key set to True. Your task is to write an "environment loop" that runs for exactly one episode using our `MultiAgentArena` class.

Follow these instructions here to get this done.

1. `reset` the already created (variable `env`) environment to get the first (initial) observation.
1. Enter an infinite while loop.
1. Compute the actions for "agent1" and "agent2" calling `DummyTrainer.compute_action([obs])` twice (once for each agent).
1. Put the results of the action computations into an action dict (`{"agent1": ..., "agent2": ...}`).
1. Pass this action dict into the env's `step()` method, just like it's done in the above cell (where we do a single `step()`).
1. Check the returned `dones` dict for True (yes, episode is terminated) and if True, break out of the loop.
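
The steps above can be sketched against a stand-in environment. `ToyTwoAgentEnv` below is a hypothetical placeholder that only mimics the dict-based `reset()`/`step()` interface described in this section; it is not the tutorial's `MultiAgentArena`, and its observations and rewards are made up.

```python
import random


class ToyTwoAgentEnv:
    """Hypothetical stand-in env: ends the episode after `max_steps` steps."""

    def __init__(self, max_steps=10):
        self.max_steps = max_steps
        self.t = 0

    def reset(self):
        self.t = 0
        # One observation per agent, keyed by agent ID.
        return {"agent1": [0, 63], "agent2": [63, 0]}

    def step(self, action_dict):
        self.t += 1
        obs = {"agent1": [0, 63], "agent2": [63, 0]}  # dummy values, never change
        rewards = {"agent1": 0.0, "agent2": 0.0}
        done = self.t >= self.max_steps
        dones = {"agent1": done, "agent2": done, "__all__": done}
        return obs, rewards, dones, {}


def random_action(single_agent_obs=None):
    """Random Discrete(4) action; the observation is ignored."""
    return random.randrange(4)


toy_env = ToyTwoAgentEnv(max_steps=10)
obs = toy_env.reset()
num_steps = 0
while True:
    # One action per agent, collected in a dict keyed by agent ID.
    actions = {agent_id: random_action(agent_obs) for agent_id, agent_obs in obs.items()}
    obs, rewards, dones, infos = toy_env.step(actions)
    num_steps += 1
    if dones["__all__"]:
        break
print(f"Episode finished after {num_steps} steps")
```

The exercise itself uses the already created `env` and `dummy_trainer.compute_action(...)` in place of the toy pieces shown here.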

**Good luck! :)**


In [None]:
import numpy as np


class DummyTrainer:
    """Dummy Trainer class used in Exercise #1.

    Use its `compute_action` method to get a new action for one of the agents,
    given the agent's observation (the agent's own position and the opposing
    agent's position, each encoded as a discrete value).
    """

    def compute_action(self, single_agent_obs=None):
        # Returns a random action for a single agent.
        return np.random.randint(4)  # Discrete(4) -> return rand int between 0 and 3 (incl. 3).

dummy_trainer = DummyTrainer()
# Check, whether it's working.
for _ in range(3):
    # Get action for agent1 (providing agent1's and agent2's positions).
    print("action_agent1={}".format(dummy_trainer.compute_action(np.array([0, 99]))))

    # Get action for agent2 (providing agent2's and agent1's positions).
    print("action_agent2={}".format(dummy_trainer.compute_action(np.array([99, 0]))))

    print()

Write your solution code into this cell here:

In [None]:
# !LIVE CODING!

# Leave the following as-is. It'll help us with rendering the env in this very cell's output.
import time
from ipywidgets import Output
from IPython import display

out = Output()
display.display(out)

with out:

    # Solution to Exercise #1:

    # Start coding here inside this `with`-block:
    # 1) Reset the env.
    
    # 2) Enter an infinite while loop (to step through the episode).

        # 3) Calculate both agents' actions individually, using dummy_trainer.compute_action([individual agent's obs])

        # 4) Compile the actions dict from both individual agents' actions.

        # 5) Send the actions dict to the env's `step()` method to receive: obs, rewards, dones, info dicts

        # 6) We'll do this together: Render the env.
        # Don't write any code here (skip directly to 7).
        out.clear_output(wait=True)
        time.sleep(0.08)
        env.render()

        # 7) Check, whether the episode is done, if yes, break out of the while loop.

# 8) Run it! :)

------------------
## 15 min break :)
------------------

### And now for something completely different:
#### Plugging in RLlib!

In [None]:
import numpy as np
import pprint
import ray

# Start a new instance of Ray (when running this tutorial locally) or
# connect to an already running one (when running this tutorial through Anyscale).

ray.init()  # Hear the engine humming? ;)

# In case you encounter the following error during our tutorial: `RuntimeError: Maybe you called ray.init twice by accident?`
# Try: `ray.shutdown() + ray.init()` or `ray.init(ignore_reinit_error=True)`

### Picking an RLlib algorithm - We'll use PPO throughout this tutorial (one-size-fits-all-kind-of-algo)

<img src="images/rllib_algos.png" width=800>

https://docs.ray.io/en/master/rllib-algorithms.html#available-algorithms-overview

In [None]:
# Import a Trainable (one of RLlib's built-in algorithms):
# We use the PPO algorithm here b/c it's very flexible wrt its supported
# action spaces and model types and b/c it learns almost any problem well.
from ray.rllib.agents.ppo import PPOTrainer

# Specify a very simple config, defining our environment and some environment
# options (see environment.py).
config = {
    "env": MultiAgentArena,  # "my_env" <- if we previously have registered the env with `tune.register_env("[name]", lambda config: [returns env object])`.
    "env_config": {
        "config": {
            "width": 10,
            "height": 10,
            "ts": 100,
        },
    },

    # !PyTorch users!
    #"framework": "torch",  # If users have chosen to install torch instead of tf.

    "create_env_on_driver": True,
}
# Instantiate the Trainer object using above config.
rllib_trainer = PPOTrainer(config=config)
rllib_trainer

### Ready to train with RLlib's PPO algorithm

That's it, we are ready to train.
Calling `Trainer.train()` will execute a single "training iteration".

One iteration for most algos involves:

1) sampling from the environment(s)
2) using the sampled data (observations, actions taken, rewards) to update the policy model (neural network), such that it would pick better actions in the future, leading to higher rewards.

Let's try it out:

In [None]:
results = rllib_trainer.train()

# Delete the config from the results for clarity.
# Only the stats will remain, then.
del results["config"]
# Pretty print the stats.
pprint.pprint(results)
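
The two phases of a training iteration (sample, then update) can be illustrated without RLlib. The sketch below uses a one-step, four-action toy problem and a plain REINFORCE-style update on softmax action preferences. This is a deliberately simplified stand-in, not PPO and not RLlib's implementation; the reward function, learning rate, batch size, and all names are assumptions.

```python
import math
import random

random.seed(0)

NUM_ACTIONS = 4
BEST_ACTION = 2  # assumption: only this action yields a reward
prefs = [0.0] * NUM_ACTIONS  # the toy "policy parameters"


def action_probs(prefs):
    """Softmax over action preferences."""
    max_pref = max(prefs)
    exps = [math.exp(p - max_pref) for p in prefs]
    total = sum(exps)
    return [e / total for e in exps]


def sample_batch(prefs, batch_size):
    """Phase 1: sample (action, reward) pairs from the toy environment."""
    probs = action_probs(prefs)
    batch = []
    for _ in range(batch_size):
        action = random.choices(range(NUM_ACTIONS), weights=probs)[0]
        reward = 1.0 if action == BEST_ACTION else 0.0
        batch.append((action, reward))
    return batch


def update(prefs, batch, lr=0.1):
    """Phase 2: REINFORCE-style step that raises the log-prob of rewarded actions."""
    probs = action_probs(prefs)
    for action, reward in batch:
        for a in range(NUM_ACTIONS):
            # Gradient of log softmax(prefs)[action] with respect to prefs[a].
            grad_log_prob = (1.0 if a == action else 0.0) - probs[a]
            prefs[a] += lr * reward * grad_log_prob / len(batch)


def train_one_iteration(prefs, batch_size=20):
    batch = sample_batch(prefs, batch_size)
    update(prefs, batch)
    return sum(reward for _, reward in batch) / len(batch)


for _ in range(50):
    mean_reward = train_one_iteration(prefs)
final_probs = action_probs(prefs)
print(f"mean reward in last iteration: {mean_reward:.2f}, action probs: {final_probs}")
```

Each call to `train_one_iteration` plays the role that `Trainer.train()` plays above: collect a batch with the current policy, then nudge the policy toward actions that were rewarded.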

### Going from single policy (RLlib's default) to multi-policy:

So far, our experiment has been ill-configured, because both
agents, which should behave differently due to their different
tasks and reward functions, learn the same policy: the "default_policy",
which RLlib always provides if you don't configure anything else.
Remember that RLlib does not know at Trainer setup time, how many and which agents
the environment will "produce". Agent control (adding agents, removing them, terminating
episodes for agents) is entirely in the Env's hands.
Let's fix our single policy problem and introduce the "multiagent" API.

<img src="images/from_single_agent_to_multi_agent.png" width=800>

In order to turn on RLlib's multi-agent functionality, we need two things:

1. A policy mapping function, mapping agent IDs (e.g. a string like "agent1", produced by the environment in the returned observation/rewards/dones-dicts) to a policy ID (another string, e.g. "policy1", which is under our control).
1. A policies definition dict, mapping policy IDs (e.g. "policy1") to 4-tuples consisting of 1) policy class (None for using the default class), 2) observation space, 3) action space, and 4) config overrides (empty dict for no overrides and using the Trainer's main config dict).

Let's take a closer look:

In [None]:
# Define the policies definition dict:
# Each policy in there is defined by its ID (key) mapping to a 4-tuple (value):
# - Policy class (None for using the "default" class, e.g. PPOTFPolicy for PPO+tf or PPOTorchPolicy for PPO+torch).
# - obs-space (we get this directly from our already created env object).
# - act-space (we get this directly from our already created env object).
# - config-overrides dict (leave empty for using the Trainer's config as-is)
policies = {
    "policy1": (None, env.observation_space, env.action_space, {}),
    "policy2": (None, env.observation_space, env.action_space, {"lr": 0.0002}),
}
# Note that now we won't have a "default_policy" anymore, just "policy1" and "policy2".

# Define an agent->policy mapping function.
# Which agents (defined by the environment) use which policies (defined by us)?
# The mapping here is M (agents) -> N (policies), where M >= N.
def policy_mapping_fn(agent_id: str):
    # Make sure agent ID is valid.
    assert agent_id in ["agent1", "agent2"], f"ERROR: invalid agent ID {agent_id}!"
    # Map agent1 to policy1, and agent2 to policy2.
    return "policy1" if agent_id == "agent1" else "policy2"

# We could - if we wanted - specify, which policies should be learnt (by default, RLlib learns all).
# Non-learnt policies will be frozen and not updated:
# policies_to_train = ["policy1", "policy2"]

# Adding the above to our config.
config.update({
    "multiagent": {
        "policies": policies,
        "policy_mapping_fn": policy_mapping_fn,
        # We'll leave this empty: Means, we train both policy1 and policy2.
        # "policies_to_train": policies_to_train,
    },
})

pprint.pprint(config)
print()
print(f"agent1 is now mapped to {policy_mapping_fn('agent1')}")
print(f"agent2 is now mapped to {policy_mapping_fn('agent2')}")
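
The M -> N mapping mentioned in the cell above becomes clearer with more agents than policies. In this sketch, six hypothetical agent IDs (`agent1` to `agent6`, which are not part of the tutorial environment) share two policies depending on whether their numeric suffix is odd or even. It only demonstrates the mapping logic and does not touch RLlib.

```python
def shared_policy_mapping_fn(agent_id: str) -> str:
    """Map odd-numbered agents to policy1 and even-numbered agents to policy2."""
    agent_number = int(agent_id[len("agent"):])
    return "policy1" if agent_number % 2 == 1 else "policy2"


agent_ids = [f"agent{i}" for i in range(1, 7)]  # M = 6 agents
mapping = {agent_id: shared_policy_mapping_fn(agent_id) for agent_id in agent_ids}
used_policies = set(mapping.values())  # N = 2 policies
print(mapping)
```

All agents mapped to the same policy ID contribute their experience to that one policy, which is why M can be larger than N.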

In [None]:
# Recreate our Trainer (we cannot just change the config on-the-fly).
rllib_trainer.stop()

# Using our updated (now multiagent!) config dict.
rllib_trainer = PPOTrainer(config=config)
rllib_trainer

Now that we are set up correctly with two policies as per our "multiagent" config, let's call `train()` on the new Trainer several times (how about 10 times?).

In [None]:
# Run `train()` n times. Repeatedly call `train()` now to see rewards increase.
# Move on once you see (agent1 + agent2) episode rewards of 10.0 or more.
for _ in range(10):
    results = rllib_trainer.train()
    print(f"Iteration={rllib_trainer.iteration}: R(\"return\")={results['episode_reward_mean']}")

In [None]:
# Do another loop, but this time, we will print out each policy's individual rewards.
for _ in range(10):
    results = rllib_trainer.train()
    r1 = results['policy_reward_mean']['policy1']
    r2 = results['policy_reward_mean']['policy2']
    r = r1 + r2
    print(f"Iteration={rllib_trainer.iteration}: R(\"return\")={r} R1={r1} R2={r2}")

#### !OPTIONAL HACK! (<-- we will not do these during the tutorial, but feel free to try these cells by yourself)

Use the above solution of Exercise #1 and replace our `dummy_trainer` in that solution
with the now trained `rllib_trainer`. You should see a better performance of the two agents.

However, keep in mind that we are mostly training agent1 as we only train a single policy and agent1
is the "easier" one to collect high rewards with.

#### !OPTIONAL HACK!

Feel free to play around with the following code in order to learn how RLlib - under the hood - calculates actions from the environment's observations using Policies and their model(s) inside our Trainer object:

In [None]:
# Let's actually "look inside" our Trainer to see what's in there.
from ray.rllib.utils.numpy import softmax

# To get to one of the policies inside the Trainer, use `Trainer.get_policy([policy ID])`:
policy = rllib_trainer.get_policy("policy1")
print(f"Policy 'policy1' is: {policy}")

# To get to the model inside any policy, do:
model = policy.model
#print(f"Our Policy's model is: {model}")

# Print out the policy's action and observation spaces.
print(f"Our Policy's observation space is: {policy.observation_space}")
print(f"Our Policy's action space is: {policy.action_space}")

# Produce a random observation (B=1; batch of size 1).
obs = np.array([policy.observation_space.sample()])
# Alternatively for PyTorch:
#import torch
#obs = torch.from_numpy(obs)

# Get the action logits (as tf tensor).
# If you are using torch, you would get a torch tensor here.
logits, _ = model({"obs": obs})
logits

# Numpyize the tensor by running `logits` through the Policy's own tf.Session.
logits_np = policy.get_session().run(logits)
# For torch, you can simply do: `logits_np = logits.detach().cpu().numpy()`.

# Convert logits into action probabilities and remove the B=1.
action_probs = np.squeeze(softmax(logits_np))

# Sample an action, using the probabilities.
action = np.random.choice([0, 1, 2, 3], p=action_probs)

# Print out the action.
print(f"sampled action={action}")
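
The logits-to-action step from the cell above can also be reproduced without any deep learning framework, which may help when checking the math by hand. The logit values below are made up for illustration, and `softmax` here is a local helper, not the function imported from `ray.rllib.utils.numpy`.

```python
import math
import random


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    max_logit = max(logits)
    exps = [math.exp(x - max_logit) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


# Made-up logits for the four actions of a Discrete(4) action space.
example_logits = [0.5, -1.0, 2.0, 0.0]
example_probs = softmax(example_logits)

# Sample one action according to these probabilities.
sampled_action = random.choices(range(len(example_probs)), weights=example_probs)[0]
print(f"probs={example_probs} sampled action={sampled_action}")
```

Subtracting the maximum logit before exponentiating does not change the resulting probabilities; it only avoids overflow for large logits.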

### Saving and restoring a trained Trainer.
Currently, `rllib_trainer` is in an already trained state.
It holds optimized weights in its Policy's model that allow it to act
already somewhat smart in our environment when given an observation.

However, if we closed this notebook right now, all the effort would have been for nothing.
Let's therefore save the state of our trainer to disk for later!

In [None]:
# We use the `Trainer.save()` method to create a checkpoint.
checkpoint_file = rllib_trainer.save()
print(f"Trainer (at iteration {rllib_trainer.iteration}) was saved in '{checkpoint_file}'!")

# Here is what a checkpoint directory contains:
print("The checkpoint directory contains the following files:")
import os
os.listdir(os.path.dirname(checkpoint_file))

### Restoring and evaluating a Trainer
In the following cell, we'll learn how to restore a saved Trainer from a checkpoint file.

We'll also evaluate a completely new Trainer (should act more or less randomly) vs an already trained one (the one we just restored from the created checkpoint file).

In [None]:
# Pretend, we wanted to pick up training from a previous run:
new_trainer = PPOTrainer(config=config)
# Evaluate the new trainer (this should yield random results).
results = new_trainer.evaluate()
print(f"Evaluating new trainer: R={results['evaluation']['episode_reward_mean']}")

# Restoring the trained state into the `new_trainer` object.
print(f"Before restoring: Trainer is at iteration={new_trainer.iteration}")
new_trainer.restore(checkpoint_file)
print(f"After restoring: Trainer is at iteration={new_trainer.iteration}")

# Evaluate again (this should yield results we saw after having trained our saved agent).
results = new_trainer.evaluate()
print(f"Evaluating restored trainer: R={results['evaluation']['episode_reward_mean']}")

In order to release all resources from a Trainer, you can use a Trainer's `stop()` method.
You should definitely run this cell as it frees resources that we'll need later in this tutorial, when we'll do parallel hyperparameter sweeps.

In [None]:
rllib_trainer.stop()
new_trainer.stop()

### Moving stuff to the professional level: RLlib in connection w/ Ray Tune

Running any experiments through Ray Tune is the recommended way of doing things with RLlib. If you look at our
<a href="https://github.com/ray-project/ray/tree/master/rllib/examples">example scripts folder</a>, you will see that almost all of the scripts use Ray Tune to run the particular RLlib workload demonstrated in each script.

<img src="images/rllib_and_tune.png" width=400>

When setting up hyperparameter sweeps for Tune, we'll do this in our already familiar config dict.

So let's take a quick look at our PPO algo's default config to understand, which hyperparameters we may want to play around with:

In [None]:
# Configuration dicts and Ray Tune.
# Where are the default configuration dicts stored?

# PPO algorithm:
from ray.rllib.agents.ppo import DEFAULT_CONFIG as PPO_DEFAULT_CONFIG
print(f"PPO's default config is:")
pprint.pprint(PPO_DEFAULT_CONFIG)

# DQN algorithm:
#from ray.rllib.agents.dqn import DEFAULT_CONFIG as DQN_DEFAULT_CONFIG
#print(f"DQN's default config is:")
#pprint.pprint(DQN_DEFAULT_CONFIG)

# Common (all algorithms).
#from ray.rllib.agents.trainer import COMMON_CONFIG
#print(f"RLlib Trainer's default config is:")
#pprint.pprint(COMMON_CONFIG)

### Let's do a very simple grid-search over two learning rates with tune.run().

In particular, we will try the learning rates 0.00005 and 0.5 using `tune.grid_search([...])`
inside our config dict:

In [None]:
# Plugging in Ray Tune.
# Note that this is the recommended way to run any experiments with RLlib.
# Reasons:
# - Tune allows you to do hyperparameter tuning in a user-friendly way
#   and at large scale!
# - Tune automatically allocates needed resources for the different
#   hyperparam trials and experiment runs on a cluster.

from ray import tune

# Running stuff with tune, we can re-use the exact
# same config that we used when working with RLlib directly!
tune_config = config.copy()

# Let's add our first hyperparameter search via our config.
# How about we try two different learning rates? Let's say 0.00005 and 0.5 (ouch!).
tune_config["lr"] = tune.grid_search([0.00005, 0.5])  # <- 0.5? again: ouch!
tune_config["train_batch_size"] = tune.grid_search([3000, 4000])

# Now that we will run things "automatically" through tune, we have to
# define one or more stopping criteria.
# Tune will stop the run, once any single one of the criteria is matched (not all of them!).
stop = {
    # Note that the keys used here can be anything present in the above `rllib_trainer.train()` output dict.
    "training_iteration": 5,
    "episode_reward_mean": 20.0,
}

# "PPO" is a registered name that points to RLlib's PPOTrainer.
# See `ray/rllib/agents/registry.py`

# Run a simple experiment until one of the stopping criteria is met.
analysis = tune.run(
    "PPO",
    config=tune_config,
    stop=stop,

    # Note that no trainers will be returned from this call here.
    # Tune will create n Trainers internally, run them in parallel and destroy them at the end.
    # However, you can ...
    checkpoint_at_end=True,  # ... create a checkpoint when done.
    checkpoint_freq=10,  # ... create a checkpoint every 10 training iterations.
)
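
As a rough mental model of the run above: Tune expands every `grid_search` entry into the cross-product of all listed values (one trial per combination) and stops a trial as soon as any stopping criterion is reached. The plain-Python sketch below mimics that behavior with two hypothetical helpers (`expand_grid`, `should_stop`). It is not Tune's implementation, the learning rates are the two named in the text, and the result dicts are made up.

```python
import itertools


def expand_grid(grid):
    """Return one config dict per combination of the grid's values."""
    keys = list(grid)
    combos = itertools.product(*(grid[key] for key in keys))
    return [dict(zip(keys, values)) for values in combos]


def should_stop(result, stop):
    """True if ANY criterion in `stop` has been reached by `result`."""
    return any(result[key] >= threshold for key, threshold in stop.items())


trial_configs = expand_grid({"lr": [0.00005, 0.5], "train_batch_size": [3000, 4000]})
for trial_config in trial_configs:
    print(trial_config)

stop_criteria = {"training_iteration": 5, "episode_reward_mean": 20.0}
# Made-up results: the first reaches neither criterion, the second hits the iteration limit.
print(should_stop({"training_iteration": 3, "episode_reward_mean": 4.2}, stop_criteria))
print(should_stop({"training_iteration": 5, "episode_reward_mean": 6.1}, stop_criteria))
```

Two grid-searched keys with two values each therefore produce four trials, and a trial ends on whichever criterion it reaches first.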

### Why did we use 6 CPUs in the tune run above (3 CPUs per trial)?

PPO - by default - uses 2 "rollout" workers (`num_workers=2`). These are Ray Actors that have their own environment copy(ies) and step through those in parallel. On top of these two "rollout" workers, every Trainer in RLlib always also has a "local" worker, which - in case of PPO - handles the learning updates. This gives us 3 workers (2 rollout + 1 local learner), which require 3 CPUs.
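
The CPU arithmetic above can be written down as a small helper. It assumes one CPU per rollout worker and one CPU for the local worker (the parameter names mirror RLlib's `num_cpus_per_worker` and `num_cpus_for_driver` config keys), and it ignores GPUs and any other custom resources.

```python
def cpus_per_trial(num_workers, num_cpus_per_worker=1, num_cpus_for_driver=1):
    """CPUs one trial requests: the local worker plus all rollout workers."""
    return num_cpus_for_driver + num_workers * num_cpus_per_worker


# Two rollout workers, as described above.
print(cpus_per_trial(num_workers=2))
# Two such trials running at the same time.
print(2 * cpus_per_trial(num_workers=2))
# Five rollout workers.
print(cpus_per_trial(num_workers=5))
```

Raising `num_workers` increases the per-trial CPU request accordingly, so fewer trials fit on the same machine at once.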

## Exercise No 2

<hr />

Using the `tune_config` that we have built so far, let's run another `tune.run()`, but apply the following changes to our setup this time:
- Set up only 1 learning rate under the "lr" config key. Choose the (seemingly) best value from the run in the previous cell (the one that yielded the highest avg. reward).
- Set up only 1 train batch size under the "train_batch_size" config key. Choose the (seemingly) best value from the run in the previous cell (the one that yielded the highest avg. reward).
- Set `num_workers` to 5, which will allow us to run more environment "rollouts" in parallel and to collect training batches more quickly.
- Set the `num_envs_per_worker` config parameter to 5. This will clone our env on each rollout worker, and thus parallelize action computing forward passes through our neural networks.

Other than that, use the exact same args as in our `tune.run()` call in the previous cell.

**Good luck! :)**


In [None]:
# !LIVE CODING!

# Solution to Exercise #2

# Run for longer this time (180 iterations) and try to reach 60.0 reward (sum of both agents).
stop = {
    "training_iteration": 180,  # we have the 15min break now to run this many iterations
    "episode_reward_mean": 60.0,  # sum of both agents' rewards. Probably won't reach it, but we should try nevertheless :)
}

# tune_config.update({
# ???
# })

# analysis = tune.run(...)

------------------
## 15 min break :)
------------------


(while the above experiment is running (and hopefully learning))


## How do we extract any checkpoint from a trial of a tune.run?

In [None]:
# The previous tune.run (the one we did before the exercise) returned an Analysis object, from which we can access any checkpoint
# (given we set checkpoint_freq or checkpoint_at_end to reasonable values) like so:
print(analysis)
# Get all trials (we only have one).
trials = analysis.trials
# Assuming the first trial was the best, we'd like to extract this trial's best checkpoint:
best_checkpoint = analysis.get_best_checkpoint(trial=trials[0], metric="episode_reward_mean", mode="max")
print(f"Found best checkpoint for the first trial: {best_checkpoint}")

# Undo the grid-search config, which RLlib doesn't understand.
rllib_config = tune_config.copy()
rllib_config["lr"] = 0.00005
rllib_config["train_batch_size"] = 4000

# Restore a RLlib Trainer from the checkpoint.
new_trainer = PPOTrainer(config=rllib_config)
new_trainer.restore(best_checkpoint)
new_trainer

In [None]:
out = Output()
display.display(out)

with out:
    obs = env.reset()
    while True:
        a1 = new_trainer.compute_action(obs["agent1"], policy_id="policy1")
        a2 = new_trainer.compute_action(obs["agent2"], policy_id="policy2")
        actions = {"agent1": a1, "agent2": a2}
        obs, rewards, dones, _ = env.step(actions)

        out.clear_output(wait=True)
        env.render()
        time.sleep(0.07)

        if dones["agent1"] is True:
            break


## Let's talk about customization options

### Deep Dive: How do we customize RLlib's RL loop?

RLlib offers a callbacks API that allows you to add custom behavior to
all major events during the environment sampling and learning process.

**Our problem:** So far, we can only see standard stats, such as rewards, episode lengths, etc.
Sometimes, this does not give us enough insight into important questions, such as: How many times
have both agents collided? Or: How many times has agent1 discovered a new field?

In the following cell, we will create custom callback "hooks" that will allow us to
add these stats to the returned metrics dict, and which will therefore be displayed in tensorboard!

For that we will override RLlib's DefaultCallbacks class and implement the
`on_episode_start`, `on_episode_step`, and `on_episode_end` methods therein:


In [None]:
# Override the DefaultCallbacks with your own and implement any methods (hooks)
# that you need.
from ray.rllib.agents.callbacks import DefaultCallbacks
from ray.rllib.evaluation.episode import MultiAgentEpisode


class MyCallbacks(DefaultCallbacks):
    def on_episode_start(self,
                         *,
                         worker,
                         base_env,
                         policies,
                         episode: MultiAgentEpisode,
                         env_index,
                         **kwargs):
        # We will use the `MultiAgentEpisode` object being passed into
        # all episode-related callbacks. It comes with a user_data property (dict),
        # which we can write arbitrary data into.

        # At the end of an episode, we'll transfer that data into the `hist_data`, and `custom_metrics`
        # properties to make sure our custom data is displayed in TensorBoard.

        # The episode is starting:
        # Set per-episode object to capture, which states (observations)
        # have been visited by agent1.
        episode.user_data["new_fields_discovered"] = 0
        # Set per-episode agent2-blocks counter (how many times has agent2 blocked agent1?).
        episode.user_data["num_collisions"] = 0

    def on_episode_step(self,
                        *,
                        worker,
                        base_env,
                        episode: MultiAgentEpisode,
                        env_index,
                        **kwargs):
        # Get both rewards.
        ag1_r = episode.prev_reward_for("agent1")
        ag2_r = episode.prev_reward_for("agent2")

        # Agent1 discovered a new field.
        if ag1_r == 1.0:
            episode.user_data["new_fields_discovered"] += 1
        # Collision.
        elif ag2_r == 1.0:
            episode.user_data["num_collisions"] += 1

    def on_episode_end(self,
                       *,
                       worker,
                       base_env,
                       policies,
                       episode: MultiAgentEpisode,
                       env_index,
                       **kwargs):
        # Episode is done:
        # Write scalar values (sum over rewards) to `custom_metrics` and
        # time-series data (rewards per time step) to `hist_data`.
        # Both will be visible then in TensorBoard.
        episode.custom_metrics["new_fields_discovered"] = episode.user_data["new_fields_discovered"]
        episode.custom_metrics["num_collisions"] = episode.user_data["num_collisions"]
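
Because the hooks only read and write attributes of the episode object, their counting logic can be exercised without starting Ray. The sketch below uses a hypothetical `FakeEpisode` stand-in (not an RLlib class) that provides just `user_data`, `custom_metrics`, and `prev_reward_for()`, and re-implements the three hooks as plain functions with the same logic as `MyCallbacks`. The reward sequence is made up.

```python
class FakeEpisode:
    """Hypothetical stand-in exposing only what the hooks above use."""

    def __init__(self):
        self.user_data = {}
        self.custom_metrics = {}
        self._prev_rewards = {"agent1": 0.0, "agent2": 0.0}

    def set_rewards(self, agent1_reward, agent2_reward):
        self._prev_rewards = {"agent1": agent1_reward, "agent2": agent2_reward}

    def prev_reward_for(self, agent_id):
        return self._prev_rewards[agent_id]


def on_episode_start(episode):
    # Reset both per-episode counters.
    episode.user_data["new_fields_discovered"] = 0
    episode.user_data["num_collisions"] = 0


def on_episode_step(episode):
    # Same branching as in MyCallbacks: a step counts as either a discovery or a collision.
    if episode.prev_reward_for("agent1") == 1.0:
        episode.user_data["new_fields_discovered"] += 1
    elif episode.prev_reward_for("agent2") == 1.0:
        episode.user_data["num_collisions"] += 1


def on_episode_end(episode):
    # Copy the counters into the metrics that get reported.
    episode.custom_metrics["new_fields_discovered"] = episode.user_data["new_fields_discovered"]
    episode.custom_metrics["num_collisions"] = episode.user_data["num_collisions"]


# Made-up per-step rewards as (agent1_reward, agent2_reward).
fake_rewards = [(1.0, -0.1), (-0.5, -0.1), (1.0, -0.1), (-1.0, 1.0)]
fake_episode = FakeEpisode()
on_episode_start(fake_episode)
for agent1_reward, agent2_reward in fake_rewards:
    fake_episode.set_rewards(agent1_reward, agent2_reward)
    on_episode_step(fake_episode)
on_episode_end(fake_episode)
print(fake_episode.custom_metrics)
```

Note that, because of the `if`/`elif` structure, a step in which both rewards equal 1.0 would be counted only as a discovery.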


In [None]:
# Setting up our config to point to our new custom callbacks class:
config = {
    "env": MultiAgentArena,
    "callbacks": MyCallbacks,  # by default, this would point to `rllib.agents.callbacks.DefaultCallbacks`, which does nothing.
    "num_workers": 5,  # we know now: this speeds up things!
}

tune.run(
    "PPO",
    config=config,
    stop={"training_iteration": 20},
    checkpoint_at_end=True,
    # If you'd like to restore the tune run from an existing checkpoint file, you can do the following:
    #restore="/Users/sven/ray_results/PPO/PPO_MultiAgentArena_fd451_00000_0_2021-05-25_15-13-26/checkpoint_000010/checkpoint-10",
)

### Let's check tensorboard for the new custom metrics!

1. Head over to the Anyscale project view and click on the "TensorBoard" button:

<img src="images/tensorboard_button.png" width=1000>

Alternatively - if you ran this locally on your own machine:

1. Head over to ~/ray_results/PPO/PPO_MultiAgentArena_[some key]_00000_0_[date]_[time]/
1. In that directory, you should see an `events.out.tfevents...` file.
1. Run `tensorboard --logdir .` and head to http://localhost:6006

<img src="images/tensorboard.png" width=800>
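When looking for the new metrics in TensorBoard, it helps to know that RLlib does not plot one value per episode. As far as we know, it collapses the per-episode `custom_metrics` of each training iteration into `_mean`, `_min`, and `_max` scalars. The sketch below mimics that aggregation in plain Python. Note that `summarize_custom_metrics` is a hypothetical helper written for illustration (not an RLlib function) and that the episode values are made up.

```python
from statistics import mean


def summarize_custom_metrics(episode_metrics):
    """Collapse per-episode metrics into <name>_mean/_min/_max scalars.

    Hypothetical helper that only mimics the kind of aggregation RLlib
    applies to `episode.custom_metrics`; it is not part of RLlib.
    """
    # Group the values of all episodes by metric name.
    collected = {}
    for metrics in episode_metrics:
        for name, value in metrics.items():
            collected.setdefault(name, []).append(value)

    # Reduce each group to three scalars.
    summary = {}
    for name, values in collected.items():
        summary[name + "_mean"] = mean(values)
        summary[name + "_min"] = min(values)
        summary[name + "_max"] = max(values)
    return summary


# Made-up values, as three finished episodes might have reported them.
episodes = [
    {"new_fields_discovered": 12, "num_collisions": 3},
    {"new_fields_discovered": 20, "num_collisions": 1},
    {"new_fields_discovered": 16, "num_collisions": 2},
]

summary = summarize_custom_metrics(episodes)
for key in sorted(summary):
    print(key, summary[key])
```

In TensorBoard, you would therefore look for entries such as `new_fields_discovered_mean` and `num_collisions_max` rather than for the bare metric names.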


### Deep Dive: Writing custom Models in tf or torch.

In [None]:
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.utils.framework import try_import_tf, try_import_torch

tf1, tf, tf_version = try_import_tf()
torch, nn = try_import_torch()


# Custom Neural Network Models.
class MyKerasModel(TFModelV2):
    """Custom model for policy gradient algorithms."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        """Build a simple MLP with one 16-unit hidden layer (+ value branch)."""
        super(MyKerasModel, self).__init__(obs_space, action_space,
                                           num_outputs, model_config, name)
        
        # Keras Input layer.
        self.inputs = tf.keras.layers.Input(
            shape=obs_space.shape, name="observations")

        # Hidden layer (shared by action logits outputs and value output).
        layer_1 = tf.keras.layers.Dense(
            16,
            name="layer1",
            activation=tf.nn.relu)(self.inputs)
        
        # Action logits output.
        logits = tf.keras.layers.Dense(
            num_outputs,
            name="out",
            activation=None)(layer_1)

        # "Value"-branch (single node output).
        # Used by several RLlib algorithms (e.g. PPO) to calculate an observation's value.
        value_out = tf.keras.layers.Dense(
            1,
            name="value",
            activation=None)(layer_1)

        # The actual Keras model:
        self.base_model = tf.keras.Model(self.inputs,
                                         [logits, value_out])
        # Set by `forward()`, read by `value_function()`.
        self.cur_value = None

    def forward(self, input_dict, state, seq_lens):
        """Custom-define your forward pass logic here."""
        # Pass inputs through our 2 layers and calculate the "value"
        # of the observation and store it for when `value_function` is called.
        logits, self.cur_value = self.base_model(input_dict["obs"])
        return logits, state

    def value_function(self):
        """Implement the value branch forward pass logic here:
        
        We will just return the already calculated `self.cur_value`.
        """
        assert self.cur_value is not None, "Must call `forward()` first!"
        return tf.reshape(self.cur_value, [-1])


class MyTorchModel(TorchModelV2, nn.Module):
    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        """Build a simple MLP with one 16-unit hidden layer (+ value branch)."""
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        self.device = torch.device("cuda"
                                   if torch.cuda.is_available() else "cpu")

        # Hidden layer (shared by action logits outputs and value output).
        self.layer_1 = nn.Linear(obs_space.shape[0], 16).to(self.device)

        # Action logits output.
        self.layer_out = nn.Linear(16, num_outputs).to(self.device)

        # "Value"-branch (single node output).
        # Used by several RLlib algorithms (e.g. PPO) to calculate an observation's value.
        self.value_branch = nn.Linear(16, 1).to(self.device)
        self.cur_value = None

    def forward(self, input_dict, state, seq_lens):
        """Custom-define your forward pass logic here."""
        # Pass inputs through our 2 layers (ReLU on the hidden layer,
        # matching the Keras model above).
        layer_1_out = torch.relu(self.layer_1(input_dict["obs"]))
        logits = self.layer_out(layer_1_out)

        # Calculate the "value" of the observation and store it for
        # when `value_function` is called.
        self.cur_value = self.value_branch(layer_1_out).squeeze(1)

        return logits, state

    def value_function(self):
        """Implement the value branch forward pass logic here:
        
        We will just return the already calculated `self.cur_value`.
        """
        assert self.cur_value is not None, "Must call `forward()` first!"
        return self.cur_value


In [None]:
# Do a quick test on the custom model classes.
test_model_tf = MyKerasModel(
    obs_space=gym.spaces.Box(-1.0, 1.0, (2, )),
    action_space=None,
    num_outputs=2,
    model_config={},
    name="MyModel",
)

print("TF-output={}".format(test_model_tf({"obs": np.array([[0.5, 0.5]])})))

# For PyTorch, you can do:
#test_model_torch = MyTorchModel(
#    obs_space=gym.spaces.Box(-1.0, 1.0, (2, )),
#    action_space=None,
#    num_outputs=2,
#    model_config={},
#    name="MyModel",
#)
#print("Torch-output={}".format(test_model_torch({"obs": torch.from_numpy(np.array([[0.5, 0.5]], dtype=np.float32))})))


In [None]:
# Set up our custom model and re-run the experiment.
config.update({
    "model": {
        "custom_model": MyKerasModel,  # for torch users: "custom_model": MyTorchModel
        "custom_model_config": {
            #"layers": [128, 128],
        },
    },
})

tune.run(
    "PPO",
    config=config,  # for torch users: config=dict(config, **{"framework": "torch"}),
    stop={
        "training_iteration": 5,
    },
)
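The commented-out `"layers"` entry above hints at how `custom_model_config` could parameterize a custom model instead of hard-coding a single 16-unit hidden layer. The sketch below shows only the size bookkeeping such a model would need: it turns a list of hidden-layer sizes into `(in, out)` pairs for the shared trunk, the action-logits head, and the value head. `mlp_layer_dims` is a hypothetical helper, and we assume (without having verified it for your RLlib version) that the dict reaches the model constructor, e.g. under `model_config["custom_model_config"]`.

```python
def mlp_layer_dims(obs_dim, hidden_sizes, num_outputs):
    """Return (in, out) size pairs for trunk, logits head and value head.

    Hypothetical helper for illustration; not part of RLlib.
    """
    sizes = [obs_dim] + list(hidden_sizes)
    # Consecutive pairs describe the shared hidden layers.
    trunk = list(zip(sizes[:-1], sizes[1:]))
    # Both heads branch off the last shared layer.
    last = sizes[-1]
    return {
        "trunk": trunk,
        "logits": (last, num_outputs),
        "value": (last, 1),
    }


# Assumed content of "custom_model_config"; falls back to one 16-unit layer.
custom_model_config = {"layers": [128, 128]}

dims = mlp_layer_dims(
    obs_dim=2,
    hidden_sizes=custom_model_config.get("layers", [16]),
    num_outputs=2,
)
print(dims)
```

Inside `MyKerasModel` or `MyTorchModel`, each pair would correspond to one `Dense`/`nn.Linear` layer, with the logits and value heads both attached to the last trunk layer, as in the classes above.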


### Deep Dive: A closer look at RLlib's components
#### (Depending on the time left and the number of questions that have accumulated :)

We already took a quick look inside an RLlib Trainer object and extracted its Policy(ies) and the Policy's model (neural network). Here is a much more detailed overview of what's inside a Trainer object.

At the core is the so-called `WorkerSet` sitting under `Trainer.workers`. A WorkerSet is a group of `RolloutWorker` (`rllib.evaluation.rollout_worker.py`) objects that always consists of a "local worker" (`Trainer.workers.local_worker()`) and n "remote workers" (`Trainer.workers.remote_workers()`).



<img src="images/rllib_structure.png" width=1000>

### Scaling RLlib

Scaling RLlib works by parallelizing the "jobs" that the remote `RolloutWorkers` do. In a vanilla RL algorithm such as PPO or DQN, the `@ray.remote`-labeled RolloutWorkers in the figure above interact with one or more environments and thereby collect experiences. The environment produces an observation; the copy of the Policy(ies) located on the remote worker computes an action and sends it to the environment, which then produces the next observation. This cycle repeats continuously and is interrupted only to send experience batches ("train batches") of a certain size to the "local worker". There, these batches are used to call `Policy.learn_on_batch()`, which performs a loss calculation followed by a model weights update. The updated weights are then broadcast back to all the remote workers.
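The sample, learn, and broadcast cycle described above can be sketched in a few lines of plain Python. This is a toy model under loud assumptions, not RLlib code: `ToyWorker`, `TARGET`, and `train` are hypothetical names, the "policy" is a single float, the "experiences" are noisy numbers that do not depend on the current weights, and the remote workers are sampled one after another instead of in parallel through Ray.

```python
import random

TARGET = 3.0  # hidden quantity the toy "policy" should learn


class ToyWorker:
    """Stand-in for a RolloutWorker; the whole 'policy' is one float."""

    def __init__(self, seed):
        self.weights = 0.0
        self.rng = random.Random(seed)

    def sample(self, batch_size):
        # Fake experiences: noisy readings around TARGET.
        return [TARGET + self.rng.uniform(-0.5, 0.5)
                for _ in range(batch_size)]

    def learn_on_batch(self, batch, lr=0.5):
        # Fake update: move the weight toward the batch mean.
        batch_mean = sum(batch) / len(batch)
        self.weights += lr * (batch_mean - self.weights)

    def get_weights(self):
        return self.weights

    def set_weights(self, weights):
        self.weights = weights


def train(num_remote_workers=3, iterations=20, batch_size=10):
    local_worker = ToyWorker(seed=0)
    remote_workers = [ToyWorker(seed=i + 1)
                      for i in range(num_remote_workers)]
    for _ in range(iterations):
        # 1) Remote workers collect experiences (in parallel in real RLlib).
        train_batch = []
        for worker in remote_workers:
            train_batch.extend(worker.sample(batch_size))
        # 2) The local worker learns on the concatenated train batch.
        local_worker.learn_on_batch(train_batch)
        # 3) The new weights are broadcast back to all remote workers.
        for worker in remote_workers:
            worker.set_weights(local_worker.get_weights())
    return local_worker, remote_workers


local_worker, remote_workers = train()
print("learned weight:", round(local_worker.get_weights(), 3))
```

Adding remote workers in this toy only enlarges the train batch per iteration; in RLlib, the benefit comes from the workers sampling their environments concurrently.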



## Time for Q&A

...

## Thank you for listening and participating!

### Here are a couple of links that you may find useful.

- The <a href="https://github.com/sven1977/rllib_tutorials.git">GitHub repo of this tutorial</a>.
- <a href="https://docs.ray.io/en/master/rllib.html">RLlib's documentation main page</a>.
- <a href="http://discuss.ray.io">Our Discourse forum</a> to ask questions on Ray and its libraries.
- Our <a href="https://forms.gle/9TSdDYUgxYs8SA9e8">Slack channel</a> for interacting with other Ray RLlib users.
- The <a href="https://github.com/ray-project/ray/blob/master/rllib/examples/">RLlib example scripts folder</a> with tons of examples on how to do different stuff with RLlib.
- A <a href="https://medium.com/distributed-computing-with-ray/reinforcement-learning-with-rllib-in-the-unity-game-engine-1a98080a7c0d">blog post on training with RLlib inside a Unity3D environment</a>.
