# Ray RLlib - RecSys: Recommender System

© 2019-2022, Anyscale. All Rights Reserved

![Anyscale Academy](../../images/AnyscaleAcademyLogo.png)

This section explores one approach for using *reinforcement learning* with [Ray RLlib](https://rllib.io/) to build a [*recommender system*](https://en.wikipedia.org/wiki/Recommender_system).

For reference, the GitHub public repo for this code is available at <https://github.com/anyscale/academy/blob/main/ray-rllib/recsys> and full source code for this example recommender system is also in the `recsys.py` script. You can run that with default settings to exercise the code:

```shell
python recsys.py
```

To see the available command line options use:

```shell
python recsys.py --help
```

A full run takes about 5-10 minutes on a recent model MacBook Pro laptop.

## Load the Training Data

The approach given here for building a recommender system could be applied to just about any dataset where *users* are *rating* a set of *items*. It's bounded in terms of memory requirements as the number of users grows. It could be re-engineered to handle a very large set of items.

We'll be using the [Jester collaborative filtering dataset](https://goldberg.berkeley.edu/jester-data/), which is known for its high *density*, i.e., many users have rated many of the items.

Jester was an online joke recommender hosted at UC Berkeley, which collected data from April 1999 through May 2003. See the discussion of "universal queries" in:

> "Eigentaste: A Constant Time Collaborative Filtering Algorithm"  
Ken Goldberg, Theresa Roeder, Dhruv Gupta, Chris Perkins  
*Information Retrieval*, 4(2), 133-151 (July 2001)  
<https://goldberg.berkeley.edu/pubs/eigentaste.pdf>

The data is split into three downloadable files, and the first file contains anonymous ratings from 24,983 users who have rated 36 or more jokes.

Ratings data is organized as a matrix with dimensions `24983 X 101`

  * one row per user
  * first column gives the number of jokes rated by that user
  * the next 100 columns give the ratings for jokes `01` through `100`
  * ratings are real values ranging from `-10.00` to `+10.00`
  * the value `"99"` corresponds to `None` = "not rated"
  
Here's a function to load the dataset:
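
The loader itself is not reproduced in this section. As a minimal, dependency-free sketch (not the `recsys.py` implementation), the format described above could be parsed as follows, assuming the ratings have been exported to a headerless CSV file. The function name `load_ratings` and the CSV layout are assumptions made for illustration.

```python
import csv
from typing import List, Optional

NOT_RATED = 99.0  # sentinel value the dataset uses for "not rated"


def load_ratings(path: str) -> List[List[Optional[float]]]:
    """Return one list of ratings per user, with None for unrated jokes."""
    ratings = []
    with open(path, newline="") as f:
        for row in csv.reader(f):
            if not row:
                continue  # skip blank lines
            # column 0 is the number of jokes rated; the rest are the ratings
            values = [float(cell) for cell in row[1:]]
            ratings.append([None if v == NOT_RATED else v for v in values])
    return ratings
```

Real ratings stay within `-10.00` to `+10.00`, so comparing against `99` cannot collide with a genuine rating.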

Now for the Gym environment. This is one class and should be defined within one cell – albeit this is a long cell to scroll through…

In [9]:
#!pip install compiler_gym 'ray[default,rllib]' &>/dev/null || echo "Install failed!"

import compiler_gym
import ray

from ray.rllib.agents.ppo import PPOTrainer
from compiler_gym.wrappers import ConstrainedCommandline, TimeLimit
from ray import tune
from itertools import islice
from compiler_gym.wrappers import CycleOverBenchmarks
from compiler_gym.util.registration import register

from loop_tool_service.service_py.datasets import loop_tool_dataset
from loop_tool_service.service_py.rewards import flops_loop_nest_reward, flops_reward, runtime_reward
import sys
import os

os.environ.setdefault("LOOP_TOOL_ROOT", "/home/dejang/loop_tool_env")

'/home/dejang/loop_tool_env'

In [11]:
import loop_tool_service

def register_env():
    register(
        id="loop_tool_env-v0",
        # entry_point=loop_tool_service.LoopToolCompilerEnv,
        entry_point="compiler_gym.service.client_service_compiler_env:ClientServiceCompilerEnv",
        kwargs={
            "service": loop_tool_service.paths.LOOP_TOOL_SERVICE_PY,
            "rewards": [ flops_loop_nest_reward.RewardTensor(),
            ],
            "datasets": [
                loop_tool_dataset.Dataset()
            ],
        },
    )

# register_env()

In [12]:
def make_env() -> compiler_gym.envs.CompilerEnv:
    """Make the reinforcement learning environment for this experiment."""
    
    env = loop_tool_service.make(
        "loop_tool_env-v0",
        observation_space="ir_tensor",
        reward_space="flops_loop_nest_tensor",
        # reward_space="runtime",
    )

    env = TimeLimit(env, max_episode_steps=10)
    return env

In [13]:
with make_env() as env:
    print("Action space:", env.action_space)
    print("Observation space:", env.observation_space)
    print("Reward space:", env.reward_space)

Action space: NamedDiscrete([up, down, swap_up, swap_down])
Observation space: Box([[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
  0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]], [[256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256.
  256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256.
  256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256.
  256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256. 256.
  256. 256. 256. 256.]], (1, 60), float32)
Reward space: flops_loop_nest_tensor


In [14]:
with make_env() as env:
    # The two datasets we will be using:
    lt_dataset = env.datasets["loop_tool_simple-v0"]
    # train_benchmarks = list(islice(lt_dataset.benchmarks(), 1))
    # test_benchmarks = list(islice(lt_dataset.benchmarks(), 2))
    
    bench = ["benchmark://loop_tool_simple-v0/simple"]
            #  "benchmark://loop_tool_simple-v0/mm128", 
            #  "benchmark://loop_tool_simple-v0/mm"] 

    train_benchmarks = bench
    test_benchmarks = bench

print("Number of benchmarks for training:", len(train_benchmarks))
print("Number of benchmarks for testing:", len(test_benchmarks))


Number of benchmarks for training: 1
Number of benchmarks for testing: 1


In [15]:
def make_training_env(*args) -> compiler_gym.envs.CompilerEnv:
    """Make a reinforcement learning environment that cycles over the
    set of training benchmarks in use.
    """
    del args  # Unused env_config argument passed by ray
    return CycleOverBenchmarks(make_env(), train_benchmarks)


In [16]:
with make_training_env() as env:
    env.reset()
    print(env.benchmark)
    env.reset()
    print(env.benchmark)

E0626 13:08:05.574080 140109369882176 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130804-787558-2c1d

E0626 13:08:05.682326 140109369882176 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130804-787558-2c1d



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

benchmark://loop_tool_simple-v0/simple
for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

benchmark://loop_tool_simple-v0/simple
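
Both resets report the same benchmark because the training list holds a single entry. The rotation that a cycling wrapper performs on each reset can be sketched with `itertools.cycle`. This illustrates the idea only and is not the `CycleOverBenchmarks` implementation; the helper name `benchmark_schedule` is hypothetical.

```python
from itertools import cycle

# Benchmark URIs from the cell above; only the first one is enabled there.
benchmarks = [
    "benchmark://loop_tool_simple-v0/simple",
    "benchmark://loop_tool_simple-v0/mm128",
    "benchmark://loop_tool_simple-v0/mm",
]


def benchmark_schedule(uris, n_resets):
    """List the benchmark that each of `n_resets` consecutive resets would select."""
    source = cycle(uris)
    return [next(source) for _ in range(n_resets)]


# With one benchmark, every reset reuses it.
print(benchmark_schedule(benchmarks[:1], 2))
# With three benchmarks, the fourth reset wraps around to the first.
print(benchmark_schedule(benchmarks, 4))
```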


In [17]:
# if ray.is_initialized():
#     ray.shutdown()
# ray.init(include_dashboard=False, ignore_reinit_error=True)

tune.register_env("compiler_gym", make_training_env)

The `JokeRec.step()` method contains the heart of the simulation logic here. It determines the item to be recommended based on the input `action`, then determines the `reward` and returns the agent's updated vector distance to the cluster centers as the observation.

Overall, this approach is relatively well-behaved and bounded for its memory use as the number of items grows. We're keeping most of the data in memory, but could readily use a distributed key/value store for selecting items, maintaining user history, etc.

## Configuration

Now we'll use the results of K-means clustering on the data sample to prepare a configuration for our custom environment.

We'll use [*proximal policy optimization*](https://docs.ray.io/en/latest/rllib-algorithms.html?highlight=ppo#proximal-policy-optimization-ppo) (PPO) for training a policy in RLlib, taking the default PPO configuration as the foundation here.
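
Starting from the defaults usually means copying the default dictionary and overriding a few keys. One detail worth knowing is that `dict.copy()` is shallow, so nested dictionaries remain shared with the original. The sketch below uses a small stand-in dictionary (not the real RLlib defaults) to show the difference:

```python
import copy

# Hypothetical stand-in for a default configuration; the real defaults have many more keys.
DEFAULTS = {"log_level": "INFO", "num_workers": 2, "model": {"fcnet_hiddens": [256, 256]}}

shallow = DEFAULTS.copy()
shallow["num_workers"] = 3                 # top-level override: DEFAULTS is unaffected
shallow["model"]["fcnet_hiddens"] = [64]   # nested edit: also changes DEFAULTS["model"]

print(DEFAULTS["num_workers"])             # 2
print(DEFAULTS["model"]["fcnet_hiddens"])  # [64]

deep = copy.deepcopy(DEFAULTS)
deep["model"]["fcnet_hiddens"] = [32, 32]  # nested edit on a deep copy stays local
print(DEFAULTS["model"]["fcnet_hiddens"])  # [64]
```

Overriding only top-level keys is safe with a shallow copy; `copy.deepcopy()` is the safer choice when nested settings are edited.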

Now let's create an instance of our custom Gym environment and call the `reset()` method, which will initialize an episode (simulating one user's ratings), run a "warm start", then return the initial observation:

In [18]:
env = make_training_env()
env.reset()

for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  



E0626 13:08:33.464604 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



array([[  1.,   0.,   0.,   0., 128.,   0.,   0.,   0.,   0.,   2., 128.,
          0.,   0.,   0.,   0.,   1., 128.,   0.,   0.,   0.,   0.,   0.,
          0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
          0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
          0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
          0.,   0.,   0.,   0.,   0.]], dtype=float32)

We can randomly select an action, i.e., the label of an item cluster to recommend:

In [19]:
action = env.action_space.sample()
print("action:", action)

action: 3
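
The sampled integer is an index into the named action space printed earlier (`up`, `down`, `swap_up`, `swap_down`). A minimal sketch of that mapping, using a plain list rather than any CompilerGym API (the helper `action_name` is hypothetical):

```python
# Names in the order printed for the action space earlier in this notebook.
ACTION_NAMES = ["up", "down", "swap_up", "swap_down"]


def action_name(action: int) -> str:
    """Translate a sampled action index into its human-readable name."""
    if not 0 <= action < len(ACTION_NAMES):
        raise ValueError(f"action {action} is outside the action space")
    return ACTION_NAMES[action]


print(action_name(3))  # swap_down
```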


Let's use that action to take one step:

In [20]:
state, reward, done, info = env.step(action)
print("obs:", state)
print("reward:", reward)

Action = swap_down
for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  1
obs: [[  0.   0.   0.   2. 128.   0.   1.   0.   0.   0. 128.   0.   0.   0.
    0.   1. 128.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
    0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
    0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
    0.   0.   0.   0.]]
reward: -0.03406330943607757


We can call the `render()` method to describe more about the agent's state within its environment:

In [21]:
env.render()

[[  0.   0.   0.   2. 128.   0.   1.   0.   0.   0. 128.   0.   0.   0.
    0.   1. 128.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
    0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
    0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
    0.   0.   0.   0.]]


Notice how there are already some other "used" items, due to the warm start.

Next, we'll define a function that runs through one entire episode:

In [22]:
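def run_one_episode (env, naive=False, verbose=False):
    """
    step through one episode, using either a naive strategy or random actions
    """
    env.reset()
    sum_reward = 0
    action = None

    for i in range(5):
        # the naive strategy repeats its first sampled action;
        # otherwise a fresh random action is sampled on every step
        # (compare against None, since action 0 is a valid action)
        if not naive or action is None:
            action = env.action_space.sample()

        state, reward, done, info = env.step(action)
        sum_reward += reward

        if verbose:
            print("step", i, "action", action, "reward", reward)

        if done:
            break

    return sum_reward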

Next, another function which uses `run_one_episode()` to measure the baseline performance of a "naïve" strategy, i.e., without use of reinforcement learning to train a policy:

In [23]:
from tqdm import tqdm

def measure_baseline (env, n_iter=1, naive=False, verbose=False):
    history = []

    for episode in tqdm(range(n_iter), ascii=True, desc="measure baseline"):
        sum_reward = run_one_episode(env, naive=naive, verbose=verbose)
        history.append(sum_reward)

    baseline = sum(history) / len(history)
    return baseline
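
To exercise this kind of baseline measurement without a running CompilerGym service, a stub environment can stand in for the real one. The sketch below is self-contained and entirely hypothetical: `StubEnv`, its fixed rewards, and the helper names are made up for illustration and do not reflect the real reward function.

```python
import random


class StubEnv:
    """Hypothetical stand-in environment: four actions, each with a fixed made-up reward."""
    REWARDS = [0.0, -0.01, 0.02, -0.03]
    EPISODE_LEN = 5

    def reset(self):
        self.steps = 0

    def step(self, action):
        self.steps += 1
        return None, self.REWARDS[action], self.steps >= self.EPISODE_LEN, {}


def episode_return(env, rng, naive):
    """Sum the rewards of one episode; the naive strategy repeats its first action."""
    env.reset()
    action, total, done = None, 0.0, False
    while not done:
        if not naive or action is None:
            action = rng.randrange(len(env.REWARDS))
        _, reward, done, _ = env.step(action)
        total += reward
    return total


def mean_return(env, rng, n_iter, naive):
    """Average the episode returns over `n_iter` episodes."""
    return sum(episode_return(env, rng, naive) for _ in range(n_iter)) / n_iter


rng = random.Random(0)
stub = StubEnv()
print("naive baseline: ", round(mean_return(stub, rng, 10, naive=True), 3))
print("random baseline:", round(mean_return(stub, rng, 10, naive=False), 3))
```

Because the naive strategy repeats one action, each naive episode returns five times that action's reward, which makes the helper easy to sanity-check.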

Now let's use this to measure how well our recommender system runs without leveraging RLlib:

In [24]:
baseline = measure_baseline(env, n_iter=10, naive=True)
print("BASELINE CUMULATIVE REWARD", round(baseline, 3))

measure baseline:   0%|          | 0/10 [00:00<?, ?it/s]E0626 13:14:19.277870 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  



measure baseline:  10%|#         | 1/10 [00:03<00:27,  3.07s/it]E0626 13:14:22.349595 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  



measure baseline:  20%|##        | 2/10 [00:06<00:24,  3.08s/it]E0626 13:14:25.434799 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

Action = swap_down
for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  1
Action = swap_down
for n_5625 in 128 : L0  
 for m_5586 in 128 : L1  <<<<<< cursor (line 1 )
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  2


measure baseline:  30%|###       | 3/10 [00:09<00:23,  3.29s/it]E0626 13:14:28.979576 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

Action = down
for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  1
Action = down
for m_5586 in 128 : L0  
 for n_5625 in 128 : L1  <<<<<< cursor (line 1 )
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  2


measure baseline:  40%|####      | 4/10 [00:13<00:19,  3.31s/it]E0626 13:14:32.310877 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  



measure baseline:  50%|#####     | 5/10 [00:16<00:16,  3.24s/it]E0626 13:14:35.434668 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  



measure baseline:  60%|######    | 6/10 [00:19<00:12,  3.19s/it]E0626 13:14:38.536162 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  



measure baseline:  70%|#######   | 7/10 [00:22<00:09,  3.18s/it]E0626 13:14:41.694870 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

Action = swap_down
for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  1
Action = swap_down
for n_5625 in 128 : L0  
 for m_5586 in 128 : L1  <<<<<< cursor (line 1 )
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  2


measure baseline:  80%|########  | 8/10 [00:25<00:06,  3.28s/it]E0626 13:14:45.197010 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

Action = down
for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  1
Action = down
for m_5586 in 128 : L0  
 for n_5625 in 128 : L1  <<<<<< cursor (line 1 )
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  2


measure baseline:  90%|######### | 9/10 [00:29<00:03,  3.28s/it]E0626 13:14:48.452394 140161385207360 example_service.py:243] CRITICAL - 

Working_dir = /dev/shm/compiler_gym_dejang/s/0626T130832-679334-be24



for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

Action = down
for m_5586 in 128 : L0  <<<<<< cursor (line 0 )
 for n_5625 in 128 : L1  
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  1
Action = down
for m_5586 in 128 : L0  
 for n_5625 in 128 : L1  <<<<<< cursor (line 1 )
  for k_5587 in 128 : L2  
   %2[m_5586, k_5587, n_5625] <- multiply(%0, %1)  
   %3[m_5586, n_5625] <- add(%2)  
  %4[m_5586, n_5625] <- write(%3)  

>>> AGENT ITERATION =  2


measure baseline: 100%|##########| 10/10 [00:32<00:00,  3.25s/it]

BASELINE CUMULATIVE REWARD -0.141





## Running Ray and RLlib

At this point we're ready to train a policy with RLlib. First we'll initialize the directory in which to save *checkpoints*, and the directory in which to log results…

In [25]:
import os
import shutil

CHECKPOINT_ROOT = "tmp/rec"
shutil.rmtree(CHECKPOINT_ROOT, ignore_errors=True, onerror=None)

ray_results = "{}/ray_results/".format(os.getenv("HOME"))
shutil.rmtree(ray_results, ignore_errors=True, onerror=None)

Now start Ray, register our custom environment, and create an agent. BTW, if you see lots of "deprecation" warnings from [TensorFlow](https://www.tensorflow.org/), just ignore those…

In [26]:
info = ray.init(ignore_reinit_error=True)

2022-06-26 13:15:22,614	INFO services.py:1470 -- View the Ray dashboard at http://127.0.0.1:8266


Register our environment so we can reference it by name. Then create a `PPOTrainer`.

In [27]:
import ray.rllib.agents.ppo as ppo

CONFIG = ppo.DEFAULT_CONFIG.copy()

CONFIG["log_level"] = "WARN"
CONFIG["num_workers"] = 3    # set to `0` for debug
CONFIG["horizon"] = 5        # cap each episode at 5 steps

# register the environment under a name, then refer to it by that name
env_key = "compiler_gym"
tune.register_env(env_key, make_training_env)
AGENT = ppo.PPOTrainer(CONFIG, env=env_key)

2022-06-26 13:15:40,560	INFO trainer.py:2332 -- Your framework setting is 'tf', meaning you are using static-graph mode. Set framework='tf2' to enable eager execution with tf2.x. You may also then want to set eager_tracing=True in order to reach similar execution speed as with static-graph mode.
2022-06-26 13:15:40,564	INFO ppo.py:414 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True if this doesn't work for you.
2022-06-26 13:15:40,564	INFO trainer.py:903 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
[2m[36m(RolloutWorker pid=1382142)[0m 2022-06-26 13:15:45,362	ERROR worker.py:451 -- Exception raised in creation task: The actor died because of an error raised in its creation task, [36mray::RolloutWorker.__init__()[39m (pid=1382142, ip=100.37.253.28, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7f7a464968b0>)


RayActorError: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=1382142, ip=100.37.253.28, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7f7a464968b0>)
  File "/home/dejang/anaconda3/envs/compiler_gym/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 506, in __init__
    self.env = env_creator(copy.deepcopy(self.env_context))
  File "/tmp/ipykernel_1378997/3916046868.py", line 6, in make_training_env
  File "/tmp/ipykernel_1378997/2322847629.py", line 4, in make_env
  File "/home/dejang/anaconda3/envs/compiler_gym/lib/python3.8/site-packages/loop_tool_service-0.2.3-py3.8.egg/loop_tool_service/__init__.py", line 252, in make
    return compiler_gym.make(id, **kwargs)
  File "/home/dejang/anaconda3/envs/compiler_gym/lib/python3.8/site-packages/compiler_gym/util/registration.py", line 16, in make
    return gym.make(id, **kwargs)
  File "/home/dejang/anaconda3/envs/compiler_gym/lib/python3.8/site-packages/gym/envs/registration.py", line 200, in make
    return registry.make(id, **kwargs)
  File "/home/dejang/anaconda3/envs/compiler_gym/lib/python3.8/site-packages/gym/envs/registration.py", line 105, in make
    env = spec.make(**kwargs)
  File "/home/dejang/anaconda3/envs/compiler_gym/lib/python3.8/site-packages/gym/envs/registration.py", line 75, in make
    env = cls(**_kwargs)
  File "/home/dejang/anaconda3/envs/compiler_gym/lib/python3.8/site-packages/compiler_gym/service/client_service_compiler_env.py", line 313, in __init__
    self.reward_space = reward_space
  File "/home/dejang/anaconda3/envs/compiler_gym/lib/python3.8/site-packages/compiler_gym/service/client_service_compiler_env.py", line 479, in reward_space
    raise LookupError(f"Reward space not found: {reward_space}")
LookupError: Reward space not found: flops_loop_nest_tensor

Let's train a policy using the PPO optimizer in RLlib. As the following code steps through each training iteration, watch for improvements in the min, mean, and max rewards per episode.

In [None]:
import pandas as pd 
TRAIN_ITER = 2

df = pd.DataFrame(columns=[ "min_reward", "avg_reward", "max_reward", "steps", "checkpoint"])
status = "reward {:6.2f} {:6.2f} {:6.2f}  len {:4.2f}  saved {}"

for i in tqdm(range(TRAIN_ITER)):
    result = AGENT.train()
    checkpoint_file = AGENT.save(CHECKPOINT_ROOT)

    row = [
        result["episode_reward_min"],
        result["episode_reward_mean"],
        result["episode_reward_max"],
        result["episode_len_mean"],
        checkpoint_file,
        ]

    df.loc[len(df)] = row
    print(status.format(*row))
    
BEST_CHECKPOINT = checkpoint_file
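
The loop above treats the last checkpoint as the best one. When the mean reward fluctuates between iterations, an alternative is to keep the checkpoint from the iteration with the highest mean reward. A minimal sketch with made-up values (the rewards and paths below are illustrative, not results from this run, and `best_checkpoint` is a hypothetical helper):

```python
# Hypothetical training log: (mean episode reward, checkpoint path) per iteration.
training_log = [
    (-0.12, "tmp/rec/checkpoint_000001"),
    (-0.05, "tmp/rec/checkpoint_000002"),
    (-0.09, "tmp/rec/checkpoint_000003"),
]


def best_checkpoint(log):
    """Return the checkpoint path recorded with the highest mean episode reward."""
    if not log:
        raise ValueError("no training iterations were recorded")
    _, path = max(log, key=lambda entry: entry[0])
    return path


print(best_checkpoint(training_log))  # tmp/rec/checkpoint_000002
```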

## Rollout to Emulate a Use Case Deployment

Now let's define a function to run a *rollout* using a checkpointed policy.
Each rollout iteration will emulate a deployed use of our recommender system for one user, and we'll measure the average rewards across many iterations:

In [None]:
import traceback

MAX_STEPS = 10    # assumption: matches the `TimeLimit` of 10 steps set in `make_env()`

def run_rollout (agent, env, n_iter=1, verbose=False):
    """
    iterate through `n_iter` episodes in a rollout to emulate deployment in a production use case
    """
    for episode in range(n_iter):
        state = env.reset()
        sum_reward = 0
        done = False    # defined up front in case the first step raises

        for step in range(MAX_STEPS):
            try:
                action = agent.compute_action(state)
                state, reward, done, info = env.step(action)
                sum_reward += reward

                if verbose:
                    print("reward {:6.3f}  sum {:6.3f}".format(reward, sum_reward))
                    env.render()
            except Exception:
                traceback.print_exc()

            if done:
                # report at the end of each episode
                print("CUMULATIVE REWARD:", round(sum_reward, 3), "\n")
                yield sum_reward
                break

Now we can apply the best trained policy in a rollout:

In [None]:
AGENT.restore(BEST_CHECKPOINT)
history = []

for episode_reward in run_rollout(AGENT, env, n_iter=500, verbose=False):
    history.append(episode_reward)
    
print("average reward:", round(sum(history) / len(history), 3))

How does the reported *average reward* from many rollouts (using RLlib to train a policy) compare with the *baseline cumulative reward* above based on a naïve strategy and no learning?  How does it compare with the predicted *mean reward per episode* from training? 

The baseline reward from a naïve strategy should be much lower (worse user ratings) than the other two measures.

These measures are an estimate for how a user would rate their recommended items.
Of course, not all users will like the jokes, so there will be some rollouts with negative rewards.
Overall we want the average reward to be *positive*, with `MAX_STEPS` as an upper bound.
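
Beyond the average, it can help to look at the spread of rollout rewards and the share of episodes that ended positive. A minimal sketch, assuming a list of per-episode cumulative rewards such as the `history` collected above; the helper `summarize_rewards` and the sample values are made up for illustration:

```python
from statistics import mean


def summarize_rewards(history):
    """Summarize per-episode cumulative rewards from a rollout."""
    if not history:
        raise ValueError("history is empty; no episode finished")
    return {
        "mean": mean(history),
        "min": min(history),
        "max": max(history),
        "share_positive": sum(r > 0 for r in history) / len(history),
    }


# Made-up rewards, for illustration only.
print(summarize_rewards([0.5, -0.25, 1.0, 0.25]))
```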

## Examine the trained policy

Use the following code to examine the trained policy that was optimized using PPO:

In [None]:
policy = AGENT.get_policy()
model = policy.model

print("\n", model.base_model.summary())

## Evaluate learning with TensorBoard

You can also run [TensorBoard](https://www.tensorflow.org/tensorboard) to visualize the RL training metrics from the log files. The results during training were written to a directory under `$HOME/ray_results`.

If you are viewing this lesson on the Anyscale hosted platform, use the provided link to open TensorBoard.

If you are viewing this lesson on a laptop, open a terminal and run the following command, then open the URL shown in the output. (You can open a terminal using the `+` in the upper left-hand corner of Jupyter Lab.)

```shell
tensorboard --logdir=~/ray_results
```

Open the URL printed to view the TensorBoard GUI.

---

## Exercise 1

Compare use of the other datasets `"jester-data-2.csv"` and `"jester-data-3.csv"` by substituting them during the rollout.

How does the mean cumulative reward differ from the metrics in the lesson?

## Exercise 2

Compare the effect of using a larger `K` value for the number of clusters.

Show the difference, if any, by comparing:

  * baseline with random actions 
  * baseline with the naïve strategy
  * predicted average reward from training
  * stats from the rollout

## Discussion Questions

  1. In what ways could the "warm start" be improved?
  2. How could this code be modified to scale to millions of users?  Or to thousands of items?

## Clean up

Finally, let's shut down Ray gracefully:

In [None]:
ray.shutdown()