In [5]:
!apt install cmake swig zlib1g-dev
%pip install torch torchvision
%pip install numpy protobuf onnx
%pip install pettingzoo[all]


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
cmake is already the newest version (3.22.1-1ubuntu1.22.04.2).
zlib1g-dev is already the newest version (1:1.2.11.dfsg-2ubuntu9.2).
zlib1g-dev set to manually installed.
The following additional packages will be installed:
  swig4.0
Suggested packages:
  swig-doc swig-examples swig4.0-examples swig4.0-doc
The following NEW packages will be installed:
  swig swig4.0
0 upgraded, 2 newly installed, 0 to remove and 34 not upgraded.
Need to get 1,116 kB of archives.
After this operation, 5,542 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig4.0 amd64 4.0.2-1ubuntu1 [1,110 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig all 4.0.2-1ubuntu1 [5,632 B]
Fetched 1,116 kB in 0s (6,473 kB/s)
Selecting previously unselected package swig4.0.
(Reading database ... 126101 files and directories currently installed.)
Preparing to un

# 強化学習モデルの学習 (main.py)

このセルでは、DQNアルゴリズムを用いて、`CartPole-v1`環境でモデルを学習させます。

In [None]:
from pettingzoo.test import api_test
from cat_toy_env import CatToyEnv
env_kwargs=dict(render_mode=None, max_steps=1000)

# 1個だけ環境を作る（並列ではなく）
env = CatToyEnv(**env_kwargs)
api_test(env, num_cycles=1000, verbose_progress=False)

In [16]:
from pettingzoo import AECEnv
from pettingzoo.utils.agent_selector import agent_selector
from gymnasium import spaces
import numpy as np
import json
from IPython.display import clear_output
import random
import os
import time
import pygame

class CatToyEnv(AECEnv):
    metadata = {"render_modes": ["human"], "name": "cat_toy_env_v0"}

    def __init__(self, render_mode=None, max_steps=1000):
        super().__init__()
        self.render_mode = render_mode
        self.max_steps = max_steps

        f = """
  {
  "description": "PythonとJavaScriptの両方で使用する設定ファイル",
  "actions":{
    "cat": [
      { "id": 0, "name": "up", "dx": 0, "dy": -3 },
      { "id": 1, "name": "down", "dx": 0, "dy": 3 },
      { "id": 2, "name": "left", "dx": -3, "dy": 0 },
      { "id": 3, "name": "right", "dx": 3, "dy": 0 }
    ],
    "toy": [
      { "id": 0, "name": "up", "dx": 0, "dy": -1 },
      { "id": 1, "name": "down", "dx": 0, "dy": 1 },
      { "id": 2, "name": "left", "dx": -1, "dy": 0 },
      { "id": 3, "name": "right", "dx": 1, "dy": 0 }
    ]
  },
  "observation_space": {
    "low": 0,
    "high": 800,
    "shape": [4],
    "dtype": "float32"
  },
  "environment": {
    "width": 800,
    "height": 600,
    "cat_width": 30,
    "cat_height": 30,
    "toy_width": 20,
    "toy_height": 20,
    "state_scale": 0.1
  }
}
"""
        config = json(f)

        env_config = config['environment']
        self.width = env_config['width']
        self.height = env_config['height']
        self.cat_width = env_config['cat_width']
        self.cat_height = env_config['cat_height']
        self.toy_width = env_config['toy_width']
        self.toy_height = env_config['toy_height']
        self.state_scale = env_config['state_scale']

        self.actions = config['actions']

        self.possible_agents = ['cat', 'toy']
        self.agents = self.possible_agents[:]
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()

        self.observation_spaces = {
            agent: spaces.Box(low=0, high=1, shape=(1, int(self.height*self.state_scale), int(self.width*self.state_scale)), dtype=np.float32)
            for agent in self.possible_agents
        }
        self.action_spaces = {
            "cat": spaces.Discrete(len(self.actions["cat"])),
            "toy": spaces.Discrete(len(self.actions["toy"])),
        }

        # ✅ PettingZoo AECEnv に必要な属性
        self._cumulative_rewards = {agent: 0.0 for agent in self.agents}
        self.rewards = {agent: 0.0 for agent in self.agents}
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
        self.step_count = 0

    def observe(self, agent):
        # 1/10にスケールして、stateを返す
        scaled_cat_x = int(self.cat_x * self.state_scale)
        scaled_cat_y = int(self.cat_y * self.state_scale)
        scaled_toy_x = int(self.toy_x * self.state_scale)
        scaled_toy_y = int(self.toy_y * self.state_scale)

        obs = np.zeros(self.observation_spaces["cat"].shape, dtype=np.float32)

        # Cat の位置（矩形領域）を 1 に設定
        obs[0,
            scaled_cat_y:scaled_cat_y + int(self.cat_height*self.state_scale),
            scaled_cat_x:scaled_cat_x + int(self.cat_width*self.state_scale)
        ] = 1.0
        # Toy の位置（矩形領域）を 0.5 に設定（オーバーラップ判定のため）
        obs[0,
            scaled_toy_y:scaled_toy_y + int(self.toy_height*self.state_scale),
            scaled_toy_x:scaled_toy_x + int(self.toy_width*self.state_scale)
        ] = 0.5

        return obs


    def reset(self, seed=None, options=None):
        self.agents = self.possible_agents[:]
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()

        self.rewards = {a: 0.0 for a in self.agents}
        self._cumulative_rewards = {a: 0.0 for a in self.agents}
        self.terminations = {a: False for a in self.agents}
        self.truncations = {a: False for a in self.agents}
        self.dones = {a: False for a in self.agents}  # 互換性のために残してもOK
        self.infos = {a: {} for a in self.agents}
        self.step_count = 0

        self.cat_x = random.randint(0, self.width - self.cat_width)
        self.cat_y = random.randint(0, self.height - self.cat_height)
        self.toy_x = random.randint(0, self.width - self.toy_width)
        self.toy_y = random.randint(0, self.height - self.toy_height)

        return self.observe(self.agent_selection)


    def step(self, action):
        agent = self.agent_selection

        if self.terminations[agent] or self.truncations[agent]:
            self._was_dead_step(action)
            return

        selected_action = self.actions[agent][action]
        dx = selected_action["dx"]
        dy = selected_action["dy"]

        if agent == "cat":
            self.cat_x = np.clip(self.cat_x + dx, 0, self.width - self.cat_width)
            self.cat_y = np.clip(self.cat_y + dy, 0, self.height - self.cat_height)
        elif agent == "toy":
            self.toy_x = np.clip(self.toy_x + dx, 0, self.width - self.toy_width)
            self.toy_y = np.clip(self.toy_y + dy, 0, self.height - self.toy_height)

        collision = self._is_collision()
        if collision:
            self.terminations = {a: True for a in self.agents}
            self.rewards["cat"] = 100.0
            self.rewards["toy"] = -100.0
        elif self.step_count >= self.max_steps:
            self.truncations = {a: True for a in self.agents}
        else:
            distance = ((self.cat_x - self.toy_x) ** 2 + (self.cat_y - self.toy_y) ** 2) ** 0.5
            self.rewards["cat"] = -distance
            self.rewards["toy"] = distance
        if self.step_count >= self.max_steps:
            self.truncations[agent] = True

        # ✅ 報酬加算
        self._cumulative_rewards[agent] += self.rewards[agent]
        self.step_count += 1

        # ✅ 次のエージェントへ切り替え
        self.agent_selection = self._agent_selector.next()

        if self.render_mode == "human":
            self.render()

    def _is_collision(self):
        return (
            self.cat_x < self.toy_x + self.toy_width and
            self.cat_x + self.cat_width > self.toy_x and
            self.cat_y < self.toy_y + self.toy_height and
            self.cat_y + self.cat_height > self.toy_y
        )

    def render(self):
        grid_size = 30  # 小さなグリッドに変更
        grid = [["." for _ in range(grid_size)] for _ in range(grid_size)]

        # CatとToyの位置に記号を配置（同じ場所なら C&T と表示）
        if self.cat_x == self.toy_x and self.cat_y == self.toy_y:
            grid[self.cat_y*grid_size//self.height][self.cat_x*grid_size//self.width] = "C&T"
        else:
            grid[self.cat_y*grid_size//self.height][self.cat_x*grid_size//self.width] = "C"
            grid[self.toy_y*grid_size//self.height][self.toy_x*grid_size//self.width] = "T"

        # (ipynbだけ)ターミナルをクリアするためにclear_outputを使用
        clear_output(wait=True)

        # グリッドを出力（y=0が上になるように反転）
        for row in reversed(grid):
            print(" ".join(row))  # 一行ごとに表示
        print("-" * (2 * grid_size))

        # その他の情報を表示
        print(f"agent: {self.agent_selection}, count: {self.step_count}, cat: {self.cat_x}, {self.cat_y}, toy: {self.toy_x}, {self.toy_y}")

        # フレーム間の遅延（1フレームごとの更新時間）
        time.sleep(0.05)  # 0.5秒ごとに更新（調整可能）


    def close(self):
        pass  # 特にリソース解放がなければ空でOK

In [17]:
import gymnasium as gym
import torch
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
import os

In [18]:
num_iterations = 10000
num_episodes_per_iteration = 1
num_steps_per_episode = 1000
# num_epoches = 1
# num_replays_per_episode = num_epoches * num_episodes_per_iteration * num_steps_per_episode
update_target_steps = 10
replay_interval = 4

In [19]:
env_kwargs=dict(render_mode=None, max_steps = num_steps_per_episode)
# 1個だけ環境を作る
env_preview = CatToyEnv(**env_kwargs)

obs = env_preview.reset()

# 観測のshapeを確認
print("観測の形:", obs)
print("観測の中身:", obs)
# 学習用環境
env_learning = CatToyEnv(**env_kwargs)

TypeError: 'module' object is not callable

In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

# CNNを使ったQネットワーク
class DQN(nn.Module):
    def __init__(self, input_shape, output_dim):
        super(DQN, self).__init__()
        c, h, w = input_shape  # (1, H, W)

        self.conv = nn.Sequential(
            nn.Conv2d(c, 2, kernel_size=3, stride=1, padding=1),  # (8, H, W)
            nn.ReLU(),
            nn.MaxPool2d(2, 2),                                   # (8, H/2, W/2)

            nn.Conv2d(2, 4, kernel_size=3, stride=1, padding=1), # (16, H/2, W/2)
            nn.ReLU(),
            nn.MaxPool2d(2, 2),                                    # (16, H/4, W/4),                                   # (8, H/2, W/2)

            nn.Conv2d(4, 4, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )

        # 畳み込み後の特徴量サイズを計算
        with torch.no_grad():
            dummy_input = torch.zeros(1, *input_shape)
            conv_out = self.conv(dummy_input)
            conv_out_size = conv_out.view(1, -1).size(1)
            print("Conv出力の形:", conv_out.shape)

        # 全結合層
        self.fc = nn.Sequential(
            nn.Linear(conv_out_size, 16),
            nn.ReLU(),
            nn.Linear(16, output_dim)
        )

    def forward(self, x):
        x = self.conv(x)
        x = x.view(x.size(0), -1)  # Flatten
        return self.fc(x)

class DQNAgent:
    def __init__(self, agent_name, env, learning_rate=1e-4, gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
        self.agent_name = agent_name  # エージェント名（'cat' または 'toy'）
        self.action_space = env.action_spaces[self.agent_name]  # 各エージェントに対応するアクション空間
        self.state_shape = env.observation_spaces[self.agent_name].shape  # 例: (1, H, W)
        self.model = DQN(self.state_shape, self.action_space.n)
        self.target_model = DQN(self.state_shape, self.action_space.n)

        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.learning_rate = learning_rate

        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.loss_fn = nn.MSELoss()

        self.memory = deque(maxlen=10000)
        self.batch_size = 64
        self.update_target_model()

    def update_target_model(self):
        self.target_model.load_state_dict(self.model.state_dict())

    def store_experience(self, state, action, reward, next_state, done):
        self.memory.append((
            np.array(state, dtype=np.float32),
            action,
            reward,
            np.array(next_state, dtype=np.float32),
            done
        ))

    def act(self, state):
        if random.random() <= self.epsilon:
            return self.action_space.sample()  # ランダム行動
        state = torch.FloatTensor(state).unsqueeze(0)  # バッチ次元を追加
        q_values = self.model(state)
        return torch.argmax(q_values).item()  # 最大Q値に基づいて行動を選択

    def replay(self):
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(np.array(states))
        next_states = torch.FloatTensor(np.array(next_states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        dones = torch.FloatTensor(dones)

        # Q値の計算
        current_q_values = self.model(states).gather(1, actions.unsqueeze(1))
        next_q_values = self.target_model(next_states).max(1)[0].detach()
        target_q_values = rewards + (self.gamma * next_q_values * (1 - dones))

        # 損失計算とバックプロパゲーション
        loss = self.loss_fn(current_q_values.squeeze(1), target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # εを減少させる
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def save_model(self, filepath):
        torch.save(self.model.state_dict(), filepath)

    def load_model(self, filepath):
        self.model.load_state_dict(torch.load(filepath))
        self.target_model.load_state_dict(self.model.state_dict())


In [None]:
def train_dqn(agent_dict, env, num_iterations, num_episodes_per_iteration):
    total_rewards = {agent: 0.0 for agent in env.agents}
    rewards = {agent: 0.0 for agent in env.agents}
    steps = 0
    for iteration in range(num_iterations):
        for episode in range(num_episodes_per_iteration):
            obs = env.reset()
            prev_obs = {agent: obs for agent in env.agents}
            prev_action = {agent: None for agent in env.agents}
            is_last_reward = False

            for agent in env.agent_iter():
                obs, total_reward, terminated, truncated, _ = env.last()
                done = terminated or truncated

                if not is_last_reward:
                    rewards = env.rewards.copy()  # 各エージェントの報酬を更新

                if prev_action[agent] is not None:
                    # 前回行動の結果が今回のループで得られたので、ここで保存できる
                    agent_dict[agent].store_experience(
                        prev_obs[agent],         # s
                        prev_action[agent],      # a
                        rewards[agent],      # r (現在のループで得られた報酬)
                        obs,                     # s' (次状態)
                        float(done)              # done
                    )
                    # ここでreplayを行う
                    if env.step_count % replay_interval == 0:
                        for replay_agent in agent_dict.keys():
                            agent_dict[replay_agent].replay()

                if done:
                    action = None  # No action needed if agent is done
                    total_rewards[agent] += total_reward
                    steps += env.step_count
                    print(f"Episode {episode} finished for agent {agent} with reward {total_reward}, {rewards[agent]}, steps {env.step_count}")
                    is_last_reward = True
                else:
                    action = agent_dict[agent].act(obs)

                env.step(action)

                prev_obs[agent] = obs  # 次の状態を更新
                prev_action[agent] = action  # 次の行動を更新

        # ログ出力
        if iteration % update_target_steps == 0:
            print(f"Iteration {iteration}: " + ", ".join([f"{a}: {r / update_target_steps:.2f}" for a, r in total_rewards.items()]), steps / update_target_steps)
            total_rewards = {agent: 0.0 for agent in total_rewards.keys()}
            steps = 0

        # ターゲットネットワーク更新
        if iteration % update_target_steps == 0:
            for agent in agent_dict.values():
                agent.update_target_model()

def evaluate_model(agent_dict, eval_env, n_eval_episodes=10):
    reward_sums = {agent_name: [] for agent_name in agent_dict.keys()}

    for _ in range(n_eval_episodes):
        env = eval_env  # 環境がreset可能で、内部状態が共有でないと仮定
        env.reset()
        episode_rewards = {agent_name: 0.0 for agent_name in agent_dict.keys()}

        for agent in env.agent_iter():
            obs, reward, termination, truncation, info = env.last()
            done = termination or truncation

            if done:
                action = None  # 終了したら行動不要
            else:
                action = agent_dict[agent].act(obs)  # 各エージェントに行動させる

            env.step(action)
            episode_rewards[agent] += reward  # 各agentごとに報酬を記録

        for agent_name in reward_sums:
            reward_sums[agent_name].append(episode_rewards[agent_name])

    # 統計量（平均・標準偏差）を返す
    mean_std_rewards = {
        agent: (np.mean(rewards), np.std(rewards))
        for agent, rewards in reward_sums.items()
    }

    return mean_std_rewards

def save_dqn(agent_dict, base_path = "models"):
    os.makedirs(base_path, exist_ok=True)
    for agent_name, agent in agent_dict.items():
        filepath = os.path.join(base_path, f"{agent_name}_model.pth")
        agent.save_model(filepath)

def load_dqn(env, agents = ["cat", "toy"] , base_path = "models"):
    agent_dict = {}
    for agent_name in agents:
        filepath = os.path.join(base_path, f"{agent_name}_model.pth")
        agent = DQNAgent(agent_name, env)
        agent.load_model(filepath)
        agent_dict[agent_name] = agent
    return agent_dict

In [None]:
# エージェントの作成
agent_dict = {
    agent_name: DQNAgent(agent_name, env_learning)
    for agent_name in env_learning.agents
}


In [None]:
# 学習
train_dqn(agent_dict, env_learning, num_iterations, num_episodes_per_iteration)


In [None]:
# 評価用環境
env_kwargs=dict(render_mode="human", max_steps=1000)
env_eval = CatToyEnv(**env_kwargs)

# モデル評価
mean_std_rewards = evaluate_model(agent_dict, env_eval, n_eval_episodes=1)
print(f"mean_reward: {mean_std_rewards['cat']} +/- {mean_std_rewards['toy']}")

In [None]:
# モデルの保存
save_dqn(agent_dict, "models")

In [None]:
# 評価用環境
env_kwargs=dict(render_mode="human", max_steps=1000)
env_eval = CatToyEnv(**env_kwargs)

# モデルのロード
loaded_model = load_dqn(env_eval, ["cat", "toy"], "models")

# ロードしたモデルの評価
mean_std_rewards = evaluate_model(loaded_model, env_eval, n_eval_episodes=10)
print(f"mean_reward: {mean_std_rewards['cat']} +/- {mean_std_rewards['toy']}")

In [None]:
# 入力の2つのTensorを結合
toy = torch.randn(1, 2)
cat = torch.randn(1, 2)
concat_input = torch.cat([toy, cat], dim=1)  # shape: (1, 4)

# エクスポート対象モデル（例: policyネットワーク）
# dummyの環境
env_kwargs=dict(render_mode="human", max_steps=1000)
env_dummy = CatToyEnv(**env_kwargs)

# モデルのロード
loaded_model = load_dqn(env_dummy, ["cat", "toy"], "models")
policy_net = loaded_model["cat"].model  # catエージェントのポリシーネットワークを取得

# ONNXエクスポート
torch.onnx.export(
    policy_net,
    concat_input,  # ← dictではなく単一Tensor
    "cat_dqn_policy.onnx",
    export_params=True,
    opset_version=11,
    input_names=["obs"],
    output_names=["q_values"],
    dynamic_axes={
        "obs": {0: "batch_size"},
        "q_values": {0: "batch_size"}
    }
)


In [None]:
# 環境のクローズ
env_learning.close()
env_eval.close()

# 学習済みモデルの使用 (play.py)

このセルでは、学習済みのモデルをロードし、`CartPole-v1`環境でエージェントがどのように行動するかを観察します。

In [None]:
import gymnasium as gym
from stable_baselines3 import DQN
import time
from cat_toy_env import CatToyEnv

In [None]:
env_kwargs=dict(render_mode="", max_steps=1000, cat_speed = 2)

# 環境の作成
env = CatToyEnv(**env_kwargs)

# モデルのロード
model_playing = DQN.load("cat_dqn")

In [None]:
# エピソードの実行
obs, info = env.reset()
done = False
while not done:
    action, _states = model_playing.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)
    print("観測:", obs)
    done = terminated or truncated
    env.render()  # 環境の描画
    #time.sleep(0.001) # 0.01秒待機

In [None]:
# 環境のクローズ
env.close()