In [0]:
# パッケージ のインポート
from game import  State
from dual_network import DN_INPUT_SHAPE
from math import sqrt
from tensorflow.keras.models import load_model
from pathlib import Path
import numpy as np

ModuleNotFoundError: ignored

In [0]:
# パラメータの準備
PV_EVALUATE_COUNT = 50

In [0]:
# 推論
def predict(model, state):
    # 推論のための入力データのシェイプの変換
    a, b, c = DN_INPUT_SHAPE # (3,3,2)
    x = np.array([state.pieces, state.enemy_pieces])
    x = x.reshape(c, a, b).transpose(1,2,0).reshape(1, a, b, c) # シェイプ(1, 3, 3, 2)

    # 推論
    y = model.predict(x, batch_size=1)

    # 方策の取得
    policies = y[0][0][list(state.legal_actions())] # 合法手のみ
    policies /= sum(policies) if sum(policies) else 1 # 合計１の確率分布に変換

    #価値の取得
    value = y[1][0][0]
    
    return policies, value

In [0]:
# ノードのリストを試行回数のリストに変換
def nodes_to_scores(nodes):
    scores = []
    for c in nodes:
        scores.append(c.n)
    return scores

In [0]:
# モンテカルロ木探索のスコアの取得
def pv_mcts_scores(model, state, temperature):
    # モンテカルロ木探索のノードの定義
    class node:
        # ノードの初期化
        def __init__(self, state, p):
            self.state = state # 状態
            self.p = p # 方策
            self.w = 0 # 累計価値
            self.n = 0 # 試行回数
            self.child_nodes = None # 子ノード群

        # 局面の評価
        def evaluate(self):
            if self.state.is_done():
                # 勝敗結果で価値を取得
                value = -1 if self.state.is_lose() else 0

                # 累計価値と試行回数の更新
                self.w += value
                self.n += 1
                return value

            # ゲーム終了時以外で子ノードが存在しない時
            if not self.child_nodes:
                # ニューラルネットワーくの推論で方策と価値を取得
                policies, value = predict(model, self.state)

                # 累計価値と試行回数の更新
                self.w += value
                self.n += 1

                # 子ノードの展開
                self.child_nodes = []
                for action, policy in zip(self.state.legal_actions(), policies):
                    self.child_nodes.append(node(self.state.next(action), policy))

                return value

            # 子ノードが存在する時
            else:
                # アークの評価値が最大の子ノードの評価で価値を取得
                value = self.next_child_node().evaluate()

                # 累計価値と試行回数の更新
                self.w += value
                self.n += 1
                return value

        # アーク評価値が最大の子ノードの取得
        def next_child_node(self):
            C_PUCT = 1.0 # 「勝率」と「手の予測確率　* バイアス」のバランスを調整するために定数
            t = sum(nodes_to_scores(self.child_nodes))
            pucb_values = []
            for c in self.child_nodes:
                pucb_values.append((-c.w / c.n if c.n else 0.0) + C_PUCT * c.p * sqrt(t) / (1 + c.n))
        
            # アーク評価値が最大の子ノードを返す
            return self.child_nodes[np.argmax(pucb_values)]

    # 現在の局面のノードの作成
    root_node = node(state, 0)

    # 複数回の評価の実行
    for _ in range(PV_EVALUATE_COUNT):
        root_node.evaluate()

    # 合法手の確率分布
    scores = nodes_to_scores(root_node.child_nodes)
    if temperature == 0: # 試行回数が最大値のスコアのみ１、それ以外は全て０
        action = np.argmax(scores)
        scores = np.zeros(len(scores))
        scores[action] = 1
    else: # ボルツマン分布でばらつき付加
        scores = boltzman(scores, temperature)
    
    return scores

In [0]:
# モンテカルロ木探索で行動選択
def pv_mcts_action(model, temperature=0):
    def pv_mcts_action(state):
        scores = pv_mcts_scores(model, state, temperature)
        return np.random.choice(state.legal_actions(), p=scores)
    # 関数の中の関数の返り値を返す
    return pv_mcts_action

In [0]:
# ボルツマン分布
def boltzman(xs, temperature):
    xs = [x ** (1 / temperature) for x in xs]
    return [x / sum(xs) for x in xs]

In [0]:
# 動作確認
if __name__ == '__main__':
    # モデル読み込み
    path = sorted(Path('./model').glob('*.h5'))[-1]
    model = load_model(str(path))

    # 状態の生成
    state = State()

    # モンテカルロ木探索で行動選択を行う関数の生成
    next_action = pv_mcts_action(model, 1.0)

    # ゲーム終了までループ
    while True:
        if state.is_done():
            break

        action = next_action(state)

        state = state.next(action)

        print(state)