# 画像シグナリングゲーム（深層強化学習）

このノートブックでは，観測した画像の内容を受信者に伝達する画像シグナリングゲームを対象にして，言語創発の実験を行います．方策勾配法を用いる2体の深層強化学習エージェント（送信者と受信者）を同時に学習させ，確立される信号の性質を観察します．

まず，必要なパッケージをインストールして，インポートします．

In [None]:
!pip install "stable-baselines3[extra]==2.3.2"

In [None]:
import numpy as np
import gymnasium
from gymnasium import spaces
import torch
import torchvision
from stable_baselines3 import PPO
from stable_baselines3.ppo.policies import MlpPolicy
rng = np.random.default_rng()

まず，画像シグナリングゲームを表現する強化学習環境（`ImageSignalingGameEnv`）を定義します．
送信可能なメッセージの長さ（`c_len`），それぞれのメッセージが取り得る語彙の種類数（`c_voc`），入力に用いるCIFAR-100データセットのクラス名（`classes`）を指定して環境を作成できます．
環境の初期化を行う `__init__` メソッドの他に，以下の5つのメソッドを定義しています．

- `reset`: 最初からゲームプレイを行えるように環境をリセットします．この時，送信者に与えられる伝達対象の情報がランダムに決定されます．
- `observe`: エージェントに与えられる観測を返します．
- `step`: エージェントが選択した行動を受け取って，ゲームを進めます．
- `observation_space`: エージェントに与えられる観測の形式を返します．
- `action_space`: エージェントが選択する行動の形式を返します．

In [None]:
class ImageSignalingGameEnv():
    def __init__(self, c_len, c_voc, classes=['lion', 'rabbit']):
        self.c_len = c_len
        self.c_voc = c_voc
        self.classes = classes
        cifar = torchvision.datasets.CIFAR100('dataset', train=False, download=True)
        self.class_to_imgs = dict((cls, [img for img, label in cifar if cifar.class_to_idx[cls] == label]) for cls in self.classes)
        all_imgs_in_np_array = np.array([np.array(img) for cls, imgs in self.class_to_imgs.items() for img in imgs])
        self.pixel_mean = all_imgs_in_np_array.mean(axis=(0, 1, 2))
        self.pixel_std = all_imgs_in_np_array.std(axis=(0, 1, 2))
        self.agents = ['sender', 'receiver']
        self.reset()

    def reset(self):
        self.input_class_idx = rng.choice(len(self.classes))
        self.input_class = self.classes[self.input_class_idx]
        self.input_img_idx = rng.choice(len(self.class_to_imgs[self.input_class]))
        self.input_img = self.class_to_imgs[self.input_class][self.input_img_idx]
        self.done = False
        self.agent_selection = 'sender'

    def observe(self, agent):
        if agent == 'sender':
            feature = (np.array(self.input_img) - self.pixel_mean) / self.pixel_std # Standardize img
            return feature
        if agent == 'receiver':
            return self.message

    def step(self, action):
        assert self.done == False, 'WARNING: The game is done. Call reset().'
        self.rewards = {'sender': 0.0, 'receiver': 0.0}
        agent = self.agent_selection
        if agent == 'sender':
            self.message = action
            self.agent_selection = 'receiver'
        elif agent == 'receiver':
            self.action = action
            if action == self.input_class_idx:
                self.rewards['sender'] = 1.0
                self.rewards['receiver'] = 1.0
            self.done = True

    def observation_space(self, agent):
        if agent == 'sender':
            return spaces.Box(low=-np.inf, high=np.inf, shape=(32, 32, 3), dtype=np.float32)
        if agent == 'receiver':
            return spaces.MultiDiscrete([self.c_voc] * self.c_len)

    def action_space(self, agent):
        if agent == 'sender':
            return spaces.MultiDiscrete([self.c_voc] * self.c_len)
        if agent == 'receiver':
            return spaces.Discrete(len(self.classes))

強化学習エージェントにはPPOを用います．しかし，stable-baselines3の `PPO` クラスはシングルエージェント学習用に設計されており，そのままではマルチエージェント学習に用いることができません．
このため，ここではマルチエージェント学習のための `MAPPO` クラスを定義します．
`MAPPO` クラスは，そのエージェントの観測と行動の形式を確定するために，ダミーの環境（`DummyEnv`）を指定して作成できるようにしています．初期化を行う `__init__` メソッドの他に，以下の4つのメソッドを定義しています．

- `get_action`: 観測を受け取って，行動を選択して返します．
- `store_buffer`: 観測と状態のペアを受け取って，経験として訓練バッファに保存します．
- `update_reward`: 次の自分のターンが回ってくる前に，他のエージェントの行動によって自分が報酬を受け取れる場合があります（例えばシグナリングゲームの場合，送信者が行動した時点では報酬はありませんが，次に受信者が適切な行動をした場合には，送信者にも報酬が与えられます）．これを，直近の行動に対する報酬とみなして，訓練バッファの内容を更新します．
- `train`: 現在の訓練バッファの内容を使って，方策を更新します．

In [None]:
from stable_baselines3.common.utils import obs_as_tensor, configure_logger
from dataclasses import dataclass

@dataclass
class DummyEnv(gymnasium.Env):
    observation_space: spaces.Space
    action_space: spaces.Space

class MAPPO():
    def __init__(self, policy, dummy_env, **kwargs):
        self.model = PPO(policy, dummy_env, **kwargs)
        self.train_buf = self.model.rollout_buffer
        self.obs_shape = dummy_env.observation_space.shape
        self.act_shape = dummy_env.action_space.shape

    def get_action(self, obs):
        obs = obs.reshape((-1,) + self.obs_shape)
        with torch.no_grad():
            obs_tensor = obs_as_tensor(obs, self.model.policy.device)
            actions, values, log_probs = self.model.policy.forward(obs_tensor)
        return actions.cpu().numpy()[0], values, log_probs

    def store_buffer(self, obs, action, dones, values, log_probs):
        self.train_buf.add(
            np.reshape(obs, (1,) + self.obs_shape),
            np.reshape(action, (1,) + self.act_shape),
            [0], dones, values, log_probs)
        self._last_values = values

    def update_reward(self, reward):
        self.train_buf.rewards[self.train_buf.pos - 1][0] += reward

    def train(self, values=None, dones=None):
        if values is None:
            values = self._last_values
            dones = np.ones(len(values))
        self.train_buf.compute_returns_and_advantage(values, dones)
        self.model.train()
        self.train_buf.reset()

以下のセルで，具体的に環境と強化学習モデルを定義します．
環境のパラメータを変えて実験してみましょう．

In [None]:
classes = ['lion', 'rabbit']   # 入力に用いるクラス名
c_len = 5   # メッセージの長さ
c_voc = 2   # メッセージの種類数
env = ImageSignalingGameEnv(c_len, c_voc, classes)

tensorboard_log = './tb'
n_steps = 2048            # 訓練データバッファのサイズ

agents = {}
for agent_name in env.agents:
    dummy_env = DummyEnv(env.observation_space(agent_name), env.action_space(agent_name))
    agent = MAPPO(MlpPolicy, dummy_env, verbose=1,
                learning_rate=0.0003, # 学習率
                batch_size=64, # バッチサイズ
                n_epochs=10, # エポック数
                n_steps=n_steps,
                device="auto"
                )
    tb_log_name = f'{agent_name}_{classes}_c_len{c_len}_c_voc{c_voc}'
    logger = configure_logger(True, tensorboard_log, tb_log_name, True)
    agent.model._logger = logger
    agents[agent_name] = agent

total_timesteps = 50000  # ゲームプレイ回数

以下のコードでは，入力として使われる画像をいくつかサンプルして表示しています．

In [None]:
import matplotlib.pyplot as plt
from pylab import rcParams
rcParams['figure.figsize'] = (1, 1)
for _ in range(5):
    env.reset()
    print(env.input_class)
    plt.imshow(env.input_img)
    plt.show()

以下のコードで，2体の強化学習モデルの学習を行います．
学習過程における報酬のログも，Tensorboardで確認できる形式で保存しています．

In [None]:
for timestep in range(total_timesteps):
    # ゲームプレイ開始
    env.reset()
    while not env.done:
        agent_name = env.agent_selection
        agent = agents[agent_name]
        obs = env.observe(agent_name)
        action, values, log_probs = agent.get_action(obs)
        env.step(action)
        agent.store_buffer(obs, action, [env.done], values, log_probs)
        for _agent_name in env.agents:
            reward = env.rewards[_agent_name]
            agents[_agent_name].update_reward(reward)
    # ゲームプレイ終了．方策を更新する
    if (timestep + 1) % n_steps == 0:
        for _agent_name in env.agents:
            _agent = agents[_agent_name]
            mean_reward = np.mean([r[0] for r in _agent.train_buf.rewards])
            _agent.model.logger.record("rollout/ep_rew_mean", mean_reward)
            _agent.model.logger.record("time/total_timesteps", timestep, exclude="tensorboard")
            _agent.model.logger.dump(timestep)
            _agent.train()

以下のコードで，学習した結果の方策を用いたときの，ゲームプレイごとの平均報酬を評価します．
コミュニケーションが成功すれば報酬1，失敗すれば報酬0なので，平均報酬の値は，コミュニケーションの成功率ということになります．

また，いくつかの入力画像に対して，送信者が送信したメッセージを表示しています．メッセージと画像の対応関係がどのようになっていると考えられるか，考察してみましょう．

In [None]:
# Evaluate policy
n_eval_episodes = 100
sender_rewards = []
receiver_rewards = []
imgs = []
messages = []
for _ in range(n_eval_episodes):
    env.reset()
    while not env.done:
        agent_name = env.agent_selection
        obs = env.observe(agent_name)
        action, values, log_probs = agents[agent_name].get_action(obs)
        if agent_name == 'sender':
            imgs.append(env.input_img)
            messages.append(action)
        env.step(action)
    sender_rewards.append(env.rewards['sender'])
    receiver_rewards.append(env.rewards['receiver'])
print(f'Sender average reward: {np.mean(sender_rewards):.2f}')
print(f'Receiver average reward: {np.mean(receiver_rewards):.2f}')
# Sample some of imgs and messages and show them
for i in range(10):
    print(messages[i])
    plt.imshow(imgs[i])
    plt.show()

以下のコマンドでTensorboardが起動します．
このセルは一度だけ実行し，再実行しないことをお勧めします．
新しく追加したデータを再読み込みしたい場合は，Tensorboardのコンソール上のリロードボタン（右上の方にあるボタン）を押してデータの再読み込みを行うようにするとよいです．

In [None]:
%load_ext tensorboard
%tensorboard --logdir=./tb

# 演習課題

- 入力画像のクラス（`classes`）に，別のクラスを指定したり，使用するクラスの数を3つ以上指定したときに，コミュニケーションの成功率がどのように変化するか調べてみよう．CIFAR-100データセットに含まれるクラスの一覧は https://www.cs.toronto.edu/~kriz/cifar.html に記載されている．
- メッセージの長さ（`c_len`）と語彙数（`c_voc`）を変化させたときに，コミュニケーションの成功率がどのように変化するか調べてみよう．
