<a href="https://colab.research.google.com/github/mitosagi/puzzdra-nnsolver/blob/master/puzz.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 初期化

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!cp -r /content/drive/MyDrive/User/python/puzzdra-nnsolver /content/puzzdra-nnsolver
%cd /content/puzzdra-nnsolver
!pip install --log=pip_log -e .
!python puzz_test.py

/content/puzzdra-nnsolver
Obtaining file:///content/puzzdra-nnsolver
Installing collected packages: Puzzpy
  Running setup.py develop for Puzzpy
Successfully installed Puzzpy
163663
334311
323365
136624
145342
[[1, 3, 6, 6, 6, 3], [3, 3, 4, 3, 1, 1], [3, 2, 3, 3, 6, 5], [1, 3, 6, 6, 2, 4], [1, 4, 5, 3, 4, 2]]
[[0, 1, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0]]
[[127, 6, 3, 6, 6, 3], [3, 3, 4, 3, 1, 1], [3, 2, 3, 3, 6, 5], [1, 3, 6, 6, 2, 4], [1, 4, 5, 3, 4, 2]]
[[0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0]]
[[1, 6, 4, 6, 6, 3], [3, 3, 3, 3, 1, 1], [3, 2, 3, 3, 6, 5], [1, 3, 6, 6, 2, 4], [1, 4, 5, 3, 4, 2]]
[[0, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0]]
[[1, 6, 6, 3, 6, 3], [3, 3, 4, 3, 1, 1], [3, 2, 3, 3, 6, 5], [1, 3, 6, 6, 2, 4], [1, 4, 5, 3, 4, 2]]
[[0, 0, 0, 1, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0], 

In [3]:
!pip install git+https://github.com/DLR-RM/stable-baselines3

Collecting git+https://github.com/DLR-RM/stable-baselines3
  Cloning https://github.com/DLR-RM/stable-baselines3 to /tmp/pip-req-build-u2dh0r08
  Running command git clone -q https://github.com/DLR-RM/stable-baselines3 /tmp/pip-req-build-u2dh0r08
Building wheels for collected packages: stable-baselines3
  Building wheel for stable-baselines3 (setup.py) ... [?25l[?25hdone
  Created wheel for stable-baselines3: filename=stable_baselines3-1.1.0a11-cp37-none-any.whl size=160811 sha256=c2bc51420182d388351cd2f053279b59c9edff2f0d6e9ee871d06fcd42e09344
  Stored in directory: /tmp/pip-ephem-wheel-cache-bk4nax9u/wheels/cf/89/6b/cd4b89427eb5ff0858bcba73911088d606c59eb3a97290b1bb
Successfully built stable-baselines3
Installing collected packages: stable-baselines3
Successfully installed stable-baselines3-1.1.0a11


## サンプルの実行

In [4]:
import numpy as np
import gym
from gym import spaces


class GoLeftEnv(gym.Env):
  """
  Gymのインターフェースに従うカスタム環境
  エージェントが常に左に行くことを学ぶ環境
  """
  # ColabのためGUIを実装できない
  metadata = {'render.modes': ['console']}

  # 定数を定義
  LEFT = 0
  RIGHT = 1

  def __init__(self, grid_size=10):
    super(GoLeftEnv, self).__init__()

    # 1Dグリッドのサイズ
    self.grid_size = grid_size

    # グリッドの右側でエージェントを初期化
    self.agent_pos = grid_size - 1

    # 行動空間と状態空間を定義
    # gym.spacesオブジェクトでなければならない
    # 離散行動を使用する場合の例には、左と右の2つがある
    n_actions = 2
    self.action_space = spaces.Discrete(n_actions)

    # 状態はエージェントの座標になる
    # Discrete空間とBox空間の両方で表現できる
    self.observation_space = spaces.Box(low=0, high=self.grid_size,
                                       shape=(1,), dtype=np.float32)

  def reset(self):
    """
    【重要】観測はnumpy配列でなければならない
    :return: (np.array)
    """
    # グリッドの右側でエージェントを初期化
    self.agent_pos = self.grid_size - 1

    # float32に変換してより一般的なものにします（連続行動を使用する場合）
    return np.array(self.agent_pos).astype(np.float32)

  def step(self, action):
    if action == self.LEFT:
      self.agent_pos -= 1
    elif action == self.RIGHT:
      self.agent_pos += 1
    else:
      raise ValueError("Received invalid action={} which is not part of the action space".format(action))

    # グリッドの境界を表現
    self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size)

    # グリッドの左側にいるか
    done = self.agent_pos == 0

    # ゴールを除くすべての場所で0の報酬
    reward = 1 if self.agent_pos == 0 else 0

    # 必要に応じて情報を渡すことができるが、現在は未使用
    info = {}

    return np.array(self.agent_pos).astype(np.float32), reward, done, info

  def render(self, mode='console', close=False):
    if mode != 'console':
      raise NotImplementedError()

    # エージェントは「x」、残りは「.」として表現
    print("." * self.agent_pos, end="")
    print("x", end="")
    print("." * (self.grid_size - self.agent_pos))

In [5]:
env = GoLeftEnv(grid_size=10)

obs = env.reset()
env.render()

print(env.observation_space)
print(env.action_space)
print(env.action_space.sample())

GO_LEFT = 0

# ハードコードされた最高のエージェント：常に左に行く
n_steps = 20
for step in range(n_steps):
  print("Step {}".format(step + 1))
  obs, reward, done, info = env.step(GO_LEFT)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render()
  if done:
    print("Goal reached!", "reward=", reward)
    break

.........x.
Box(0.0, 10.0, (1,), float32)
Discrete(2)
1
Step 1
obs= 8.0 reward= 0 done= False
........x..
Step 2
obs= 7.0 reward= 0 done= False
.......x...
Step 3
obs= 6.0 reward= 0 done= False
......x....
Step 4
obs= 5.0 reward= 0 done= False
.....x.....
Step 5
obs= 4.0 reward= 0 done= False
....x......
Step 6
obs= 3.0 reward= 0 done= False
...x.......
Step 7
obs= 2.0 reward= 0 done= False
..x........
Step 8
obs= 1.0 reward= 0 done= False
.x.........
Step 9
obs= 0.0 reward= 1 done= True
x..........
Goal reached! reward= 1


In [6]:
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv

# 環境の生成
env = GoLeftEnv(grid_size=10)

# 環境のラップ
env = Monitor(env, filename=None, allow_early_resets=True)
env = DummyVecEnv([lambda: env])

In [7]:
# エージェントの訓練
model = PPO('MlpPolicy', env, verbose=1).learn(5000)

Using cuda device
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 92.6     |
|    ep_rew_mean     | 1        |
| time/              |          |
|    fps             | 935      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 51.4        |
|    ep_rew_mean          | 1           |
| time/                   |             |
|    fps                  | 711         |
|    iterations           | 2           |
|    time_elapsed         | 5           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.020174187 |
|    clip_fraction        | 0.412       |
|    clip_range           | 0.2         |
|    entropy_loss         | -0.675      |
|    explained_variance   | 0.251       |
|    learnin

In [8]:
# 訓練済みエージェントのテスト
obs = env.reset()
n_steps = 20
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  print("Step {}".format(step + 1))
  print("Action: ", action)
  obs, reward, done, info = env.step(action)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render(mode='console')
  if done:
    # VecEnvは、エピソード完了に遭遇すると自動的にリセットされることに注意
    print("Goal reached!", "reward=", reward)
    break

Step 1
Action:  [0]
obs= [[8.]] reward= [0.] done= [False]
........x..
Step 2
Action:  [0]
obs= [[7.]] reward= [0.] done= [False]
.......x...
Step 3
Action:  [0]
obs= [[6.]] reward= [0.] done= [False]
......x....
Step 4
Action:  [0]
obs= [[5.]] reward= [0.] done= [False]
.....x.....
Step 5
Action:  [0]
obs= [[4.]] reward= [0.] done= [False]
....x......
Step 6
Action:  [0]
obs= [[3.]] reward= [0.] done= [False]
...x.......
Step 7
Action:  [0]
obs= [[2.]] reward= [0.] done= [False]
..x........
Step 8
Action:  [0]
obs= [[1.]] reward= [0.] done= [False]
.x.........
Step 9
Action:  [0]
obs= [[9.]] reward= [1.] done= [ True]
.........x.
Goal reached! reward= [1.]


## 実際の処理

In [88]:
import numpy as np
import gym
from gym import spaces
from puzzpy import PuzzTable
from stable_baselines3.common.env_checker import check_env

class PuzzEnv(gym.Env):
  """
  パズドラの環境
  """
  # ColabのためGUIを実装できない
  metadata = {'render.modes': ['console']}

  def __init__(self):
    super(PuzzEnv, self).__init__()

    self.action_space = spaces.Discrete(5)

    # 状態はエージェントの座標になる
    # Discrete空間とBox空間の両方で表現できる
    # self.observation_space = spaces.Dict({'table': spaces.Box(low=0, high=5,
    #                                    shape=(5,6), dtype=np.float32),
    #                                    'start': spaces.Discrete(30),
    #                                    'turn': spaces.Box(low=1, high=155,
    #                                    shape=(1,), dtype=np.float32),
    #                                    'rew': spaces.Box(low=1, high=100,
    #                                    shape=(1,), dtype=np.float32)})
    
    #                                    'start': spaces.Box(low=0, high=1,
    #                                    shape=(5,6), dtype=np.float32),
    self.observation_space = spaces.Box(low=0, high=255, shape=(3,5,6), dtype=np.uint8)

  def zerosturn(self, table):
    zeros = np.zeros((5,6))
    zeros[0][0] = table.get_turn()
    zeros[0][1] = table.eval_otoshi()
    return zeros

  def retobs(self, table):

    
    # return {'table': np.array(table.get_table()).astype(np.float32),
    #           'start': np.array(table.get_XY_as_table()).astype(np.float32),
    #           'turn': table.get_turn(),'rew': table.eval_otoshi()}, table.eval_otoshi(), True, {}
    # return {'table': np.array(table.get_table()).astype(np.float32),
    #           'start': np.flatnonzero(table.get_XY_as_table())[0],
    #           'turn': table.get_turn(),'rew': table.eval_otoshi()}, table.eval_otoshi(), True, {}
    return np.stack([np.array(table.get_table()).astype(np.uint8),
                     np.array(table.get_XY_as_table()).astype(np.uint8),
                     self.zerosturn(table)]), table.eval_otoshi(), True, {}

  def reset(self):
    """
    【重要】観測はnumpy配列でなければならない
    :return: (np.array)
    """
    self.table = PuzzTable(155)
    return self.retobs(self.table)[0]

  def step(self, action):
    if action == 4:
      return self.retobs(self.table)

    next_table = self.table.next_tables()[action]

    if next_table.get_table()[0][0] == 127:
      return self.retobs(self.table)

    self.table = next_table

    if self.table.get_turn() <= 0:
      return self.retobs(self.table)

    # return {'table': np.array(self.table.get_table()).astype(np.float32),
    #           'start': np.array(self.table.get_XY_as_table()).astype(np.float32),
    #           'turn': self.table.get_turn(),'rew': self.table.eval_otoshi()}, 0, False, {}
    # return {'table': np.array(self.table.get_table()).astype(np.float32),
    #           'start': np.flatnonzero(table.get_XY_as_table())[0],
    #           'turn': self.table.get_turn(),'rew': self.table.eval_otoshi()}, 0, False, {}
    return np.stack([np.array(self.table.get_table()).astype(np.uint8),
                     np.array(self.table.get_XY_as_table()).astype(np.uint8),
                     self.zerosturn(self.table)]), 0, False, {}

  def render(self, mode='console', close=False):
    if mode != 'console':
      raise NotImplementedError()

    
    for i in self.table.get_table():
      print(*i)

check_env(PuzzEnv())

  "The minimal resolution for an image is 36x36 for the default `CnnPolicy`. "


In [89]:
env = PuzzEnv()

obs = env.reset()
env.render()

print(env.observation_space)
print(env.action_space)
print(env.action_space.sample())

n_steps = 20
for step in range(n_steps):
  print("Step {}".format(step + 1))
  obs, reward, done, info = env.step(env.action_space.sample())
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render()
  if done:
    print("Goal reached!", "reward=", reward)
    break

4 1 1 3 1 2
2 5 2 6 3 2
5 5 2 2 4 5
6 1 1 1 4 2
6 2 4 1 6 5
Box(0, 255, (3, 5, 6), uint8)
Discrete(5)
4
Step 1
obs= [[[  4.   1.   1.   3.   1.   2.]
  [  2.   5.   6.   2.   3.   2.]
  [  5.   5.   2.   2.   4.   5.]
  [  6.   1.   1.   1.   4.   2.]
  [  6.   2.   4.   1.   6.   5.]]

 [[  0.   0.   0.   0.   0.   0.]
  [  0.   0.   0.   1.   0.   0.]
  [  0.   0.   0.   0.   0.   0.]
  [  0.   0.   0.   0.   0.   0.]
  [  0.   0.   0.   0.   0.   0.]]

 [[154.   1.   0.   0.   0.   0.]
  [  0.   0.   0.   0.   0.   0.]
  [  0.   0.   0.   0.   0.   0.]
  [  0.   0.   0.   0.   0.   0.]
  [  0.   0.   0.   0.   0.   0.]]] reward= 0 done= False
4 1 1 3 1 2
2 5 6 2 3 2
5 5 2 2 4 5
6 1 1 1 4 2
6 2 4 1 6 5
Step 2
obs= [[[  4.   1.   1.   2.   1.   2.]
  [  2.   5.   6.   3.   3.   2.]
  [  5.   5.   2.   2.   4.   5.]
  [  6.   1.   1.   1.   4.   2.]
  [  6.   2.   4.   1.   6.   5.]]

 [[  0.   0.   0.   1.   0.   0.]
  [  0.   0.   0.   0.   0.   0.]
  [  0.   0.   0.   0.   0.   0.]


In [90]:
from stable_baselines3 import PPO
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.envs.multi_input_envs import SimpleMultiObsEnv

# 環境の生成
env = PuzzEnv()

# 環境のラップ
env = Monitor(env, filename=None, allow_early_resets=True)
env = DummyVecEnv([lambda: env])
# env = SimpleMultiObsEnv()

In [91]:
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.policies import ActorCriticPolicy
from stable_baselines3.common.type_aliases import Schedule
from stable_baselines3.common.preprocessing import get_flattened_obs_dim, is_image_space
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import torch as th
from torch import nn

class NatureCNN(BaseFeaturesExtractor):
    """
    CNN from DQN nature paper:
        Mnih, Volodymyr, et al.
        "Human-level control through deep reinforcement learning."
        Nature 518.7540 (2015): 529-533.
    :param observation_space:
    :param features_dim: Number of features extracted.
        This corresponds to the number of unit for the last layer.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 512):
        super(NatureCNN, self).__init__(observation_space, features_dim)
        # We assume CxHxW images (channels first)
        # Re-ordering will be done by pre-preprocessing or wrapper
        assert is_image_space(observation_space, check_channels=False), (
            "You should use NatureCNN "
            f"only with images not with {observation_space}\n"
            "(you are probably using `CnnPolicy` instead of `MlpPolicy` or `MultiInputPolicy`)\n"
            "If you are using a custom environment,\n"
            "please check it using our env checker:\n"
            "https://stable-baselines3.readthedocs.io/en/master/common/env_checker.html"
        )
        n_input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 8, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(16, 16, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Compute shape by doing one forward pass
        with th.no_grad():
            n_flatten = self.cnn(th.as_tensor(observation_space.sample()[None]).float()).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        return self.linear(self.cnn(observations))

class MyActorCriticCnnPolicy(ActorCriticPolicy):
    """
    CNN policy class for actor-critic algorithms (has both policy and value prediction).
    Used by A2C, PPO and the likes.
    :param observation_space: Observation space
    :param action_space: Action space
    :param lr_schedule: Learning rate schedule (could be constant)
    :param net_arch: The specification of the policy and value networks.
    :param activation_fn: Activation function
    :param ortho_init: Whether to use or not orthogonal initialization
    :param use_sde: Whether to use State Dependent Exploration or not
    :param log_std_init: Initial value for the log standard deviation
    :param full_std: Whether to use (n_features x n_actions) parameters
        for the std instead of only (n_features,) when using gSDE
    :param sde_net_arch: Network architecture for extracting features
        when using gSDE. If None, the latent features from the policy will be used.
        Pass an empty list to use the states as features.
    :param use_expln: Use ``expln()`` function instead of ``exp()`` to ensure
        a positive standard deviation (cf paper). It allows to keep variance
        above zero and prevent it from growing too fast. In practice, ``exp()`` is usually enough.
    :param squash_output: Whether to squash the output using a tanh function,
        this allows to ensure boundaries when using gSDE.
    :param features_extractor_class: Features extractor to use.
    :param features_extractor_kwargs: Keyword arguments
        to pass to the features extractor.
    :param normalize_images: Whether to normalize images or not,
         dividing by 255.0 (True by default)
    :param optimizer_class: The optimizer to use,
        ``th.optim.Adam`` by default
    :param optimizer_kwargs: Additional keyword arguments,
        excluding the learning rate, to pass to the optimizer
    """

    def __init__(
        self,
        observation_space: gym.spaces.Space,
        action_space: gym.spaces.Space,
        lr_schedule: Schedule,
        net_arch: Optional[List[Union[int, Dict[str, List[int]]]]] = None,
        activation_fn: Type[nn.Module] = nn.Tanh,
        ortho_init: bool = True,
        use_sde: bool = False,
        log_std_init: float = 0.0,
        full_std: bool = True,
        sde_net_arch: Optional[List[int]] = None,
        use_expln: bool = False,
        squash_output: bool = False,
        features_extractor_class: Type[BaseFeaturesExtractor] = NatureCNN,
        features_extractor_kwargs: Optional[Dict[str, Any]] = None,
        normalize_images: bool = True,
        optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam,
        optimizer_kwargs: Optional[Dict[str, Any]] = None,
    ):
        super(MyActorCriticCnnPolicy, self).__init__(
            observation_space,
            action_space,
            lr_schedule,
            net_arch,
            activation_fn,
            ortho_init,
            use_sde,
            log_std_init,
            full_std,
            sde_net_arch,
            use_expln,
            squash_output,
            features_extractor_class,
            features_extractor_kwargs,
            normalize_images,
            optimizer_class,
            optimizer_kwargs,
        )

In [71]:
# エージェントの訓練
model = PPO(MyActorCriticCnnPolicy, env, verbose=1).learn(300000)

Using cuda device
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 3.24     |
|    ep_rew_mean     | 1.03     |
| time/              |          |
|    fps             | 706      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 2.79        |
|    ep_rew_mean          | 0.95        |
| time/                   |             |
|    fps                  | 532         |
|    iterations           | 2           |
|    time_elapsed         | 7           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.009573733 |
|    clip_fraction        | 0.049       |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.6        |
|    explained_variance   | -4.7e-05    |
|    learnin

In [94]:
# 訓練済みエージェントのテスト
obs = env.reset()
n_steps = 155
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  print("Step {}".format(step + 1))
  print("Action: ", action)
  obs, reward, done, info = env.step(action)
  # print('obs=', obs, 'reward=', reward, 'done=', done)
  print('score=', obs[0][2][0][1])
  env.render('console')
  if done:
    # VecEnvは、エピソード完了に遭遇すると自動的にリセットされることに注意
    print("Goal reached!", "reward=", reward)
    break

Step 1
Action:  [3]
score= 0
3 2 2 4 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 2
Action:  [3]
score= 0
3 2 2 4 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 3
Action:  [3]
score= 0
3 2 4 2 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 4
Action:  [0]
score= 0
3 2 2 4 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 5
Action:  [3]
score= 0
3 2 4 2 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 6
Action:  [0]
score= 0
3 2 2 4 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 7
Action:  [3]
score= 0
3 2 4 2 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 8
Action:  [0]
score= 0
3 2 2 4 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 9
Action:  [3]
score= 0
3 2 4 2 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 10
Action:  [0]
score= 0
3 2 2 4 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 11
Action:  [3]
score= 0
3 2 4 2 5 6
6 6 5 1 6 5
5 5 6 2 3 6
6 6 3 4 2 2
4 6 2 1 4 2
Step 12
Action:  [0