In [1]:
import os
import random
import math
import numpy as np
import json
import optuna
from optuna.pruners import MedianPruner
import matplotlib.pyplot as plt
from collections import defaultdict
from typing import List, Tuple, Optional
from statistics import mean

import pathlib
import sys

# Parent of the notebook's working directory; it is put on sys.path so that
# imports resolve relative to that directory.
BASE_DIR = str(pathlib.Path(os.getcwd()).parent)
# Guarded so that re-running this cell does not keep appending duplicates.
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)
print(f"BASE_DIR: {BASE_DIR}")

BASE_DIR: /Users/mori/dev/Othello-Web-app-with-RL/rl_agent


In [2]:
# Where trained weights are written (directory is created if missing).
model_save_dir = os.path.join(BASE_DIR, "models")
weights_path = os.path.join(model_save_dir, "v2.json")
os.makedirs(model_save_dir, exist_ok=True)

# Where the Optuna SQLite study database lives (directory is created if missing).
db_dir = os.path.join(BASE_DIR, "dbs")
optuna_db_path = os.path.join(db_dir, "v2.db")
os.makedirs(db_dir, exist_ok=True)

# Cell values. BLACK and WHITE are negatives of each other, so summing the
# board gives the disc difference and negating a colour gives the other side.
EMPTY, BLACK, WHITE = 0, 1, -1
# The eight neighbour offsets (row delta, column delta), row-major, centre excluded.
DIRECTIONS = [(dr, dc) for dr in (-1, 0, 1) for dc in (-1, 0, 1) if (dr, dc) != (0, 0)]

SEED = 42

## Othello環境

In [3]:
class Othello:
    """Mutable 8x8 Othello position: a grid of EMPTY/BLACK/WHITE cells plus the side to move.

    A new game has WHITE on (3,3)/(4,4), BLACK on (3,4)/(4,3) and BLACK to move.
    """

    def __init__(self):
        grid = [[EMPTY] * 8 for _ in range(8)]
        grid[3][3] = grid[4][4] = WHITE
        grid[3][4] = grid[4][3] = BLACK
        self.board = grid
        self.player = BLACK

    def clone(self):
        """Return a copy whose rows are fresh lists, so the two boards never alias."""
        twin = Othello()
        twin.board = [list(row) for row in self.board]
        twin.player = self.player
        return twin

    def inside(self, r, c):
        """True when (r, c) is a valid board coordinate."""
        return 0 <= r < 8 and 0 <= c < 8

    def legal_moves(self, player=None) -> List[Tuple[int, int]]:
        """List every empty cell where `player` (default: side to move) would flip a disc.

        Cells are returned in row-major order.
        """
        side = self.player if player is None else player
        return [
            (r, c)
            for r in range(8)
            for c in range(8)
            if self.board[r][c] == EMPTY and self._would_flip(r, c, side)
        ]

    def _would_flip(self, r, c, player) -> bool:
        """True when a `player` disc on the empty cell (r, c) brackets opposing discs."""
        if self.board[r][c] != EMPTY:
            return False
        rival = opponent(player)
        for dr, dc in DIRECTIONS:
            rr, cc = r + dr, c + dc
            run_length = 0
            # Walk over the contiguous run of rival discs in this direction.
            while self.inside(rr, cc) and self.board[rr][cc] == rival:
                run_length += 1
                rr, cc = rr + dr, cc + dc
            # The run only counts if it is closed off by one of our own discs.
            if run_length and self.inside(rr, cc) and self.board[rr][cc] == player:
                return True
        return False

    def play(self, r, c, player=None):
        """Place a disc for `player` (default: side to move) at (r, c) and flip captures.

        Raises ValueError("Illegal move") when nothing would be flipped; the
        target cell being occupied trips the assertion instead. Afterwards the
        turn goes to the other side, unless that side has no legal move, in
        which case the turn is handed straight back.
        """
        mover = self.player if player is None else player
        assert self.board[r][c] == EMPTY
        rival = opponent(mover)
        captured = []
        for dr, dc in DIRECTIONS:
            run = []
            rr, cc = r + dr, c + dc
            while self.inside(rr, cc) and self.board[rr][cc] == rival:
                run.append((rr, cc))
                rr, cc = rr + dr, cc + dc
            if run and self.inside(rr, cc) and self.board[rr][cc] == mover:
                captured += run
        if not captured:
            raise ValueError("Illegal move")
        self.board[r][c] = mover
        for rr, cc in captured:
            self.board[rr][cc] = mover
        self.player = opponent(mover)
        if not self.legal_moves(self.player):
            # Forced pass: the side that just moved plays again.
            self.player = opponent(self.player)

    def terminal(self) -> bool:
        """True when neither colour has a legal move."""
        return not (self.legal_moves(BLACK) or self.legal_moves(WHITE))

    def score(self) -> int:
        """Sum of all cell values, i.e. BLACK discs minus WHITE discs."""
        return sum(sum(row) for row in self.board)

    def winner(self) -> int:
        """BLACK or WHITE depending on the sign of the score, 0 for a tie."""
        diff = self.score()
        if diff > 0:
            return BLACK
        if diff < 0:
            return WHITE
        return 0

In [4]:
def generate_systematic_2tuples() -> List[Tuple[Tuple[int,int], ...]]:
    """Enumerate every pair of adjacent cells on the 8x8 board.

    Pairs are listed orientation by orientation (horizontal, vertical,
    down-right diagonal, down-left diagonal), each in row-major order of the
    first cell: 56 + 56 + 49 + 49 = 210 pairs. With three states per cell each
    pair needs a table of 3**2 = 9 entries.
    """
    # (row step, column step, start rows, start columns) for each orientation.
    layouts = (
        (0, 1, range(8), range(7)),       # horizontal
        (1, 0, range(7), range(8)),       # vertical
        (1, 1, range(7), range(7)),       # diagonal, down-right
        (1, -1, range(7), range(1, 8)),   # diagonal, down-left
    )
    return [
        ((r, c), (r + dr, c + dc))
        for dr, dc, rows, cols in layouts
        for r in rows
        for c in cols
    ]

class NTupleValue:
    """
    N-tuple value network with a ternary (base-3) cell encoding.

    - A position is always encoded relative to the `player` passed in; the
      value itself is never sign-flipped.
    - Each cell of a tuple maps to {own=2, empty=1, opponent=0}; the cells of
      one tuple are combined into a base-3 index into that tuple's table.
    - The value of a position is the sum of the selected entry of every table.
    """
    __slots__ = ("tuples", "tables", "base_pow", "n_trits", "rng")

    def __init__(self, tuples=None, seed=0):
        """Create zero-initialised tables, one per tuple.

        Args:
            tuples: sequence of tuples of (row, col) cells; defaults to the
                adjacent-pair set from generate_systematic_2tuples().
            seed: seed for the instance-local random.Random stored in `rng`.
        """
        self.tuples = tuples if tuples is not None else generate_systematic_2tuples()
        self.n_trits = 3  # own / empty / opponent
        # Place values for the base-3 index. Sized to the longest tuple so that
        # indices_for never runs out of powers; never shorter than the
        # historical 8 entries.
        longest = max((len(t) for t in self.tuples), default=0)
        self.base_pow = [self.n_trits ** i for i in range(max(8, longest))]
        self.tables = [np.zeros(self.n_trits ** len(t), dtype=np.float64) for t in self.tuples]
        self.rng = random.Random(seed)

    @staticmethod
    def _cell_trit(v: int, player: int) -> int:
        """Encode one cell relative to `player`: own=2, empty=1, opponent=0."""
        if v == player: return 2
        if v == EMPTY: return 1
        return 0

    def indices_for(self, game: "Othello", player: int) -> List[int]:
        """Return, for every tuple, the table index selected by `game.board` as seen by `player`."""
        b = game.board
        idxs = []
        for t in self.tuples:
            s = 0
            for i, (r, c) in enumerate(t):
                s += self._cell_trit(b[r][c], player) * self.base_pow[i]
            idxs.append(s)
        return idxs

    def value_from_indices(self, idxs: List[int]) -> float:
        """Sum the table entries selected by `idxs` (one index per table)."""
        v = 0.0
        for tbl, idx in zip(self.tables, idxs):
            v += tbl[idx]
        return v

    def value(self, game: "Othello", player: int) -> float:
        """Value of `game` from `player`'s point of view."""
        return self.value_from_indices(self.indices_for(game, player))

    def update(self, idxs: List[int], target: float, alpha: float):
        """Move the value of the encoded position towards `target`.

        The step alpha * (target - value) is split evenly across all tables.
        Returns the error (target - value) measured before the update.
        """
        v = self.value_from_indices(idxs)
        delta = target - v
        if not self.tables:
            # Nothing to adjust; also avoids dividing by zero below.
            return delta
        a = alpha * delta / len(self.tables)  # even split keeps the step size stable
        for tbl, idx in zip(self.tables, idxs):
            tbl[idx] += a
        return delta

    # Convenience: save / load
    def to_dict(self):
        """Return a dict with the tuple layout and the tables as plain lists (JSON-friendly)."""
        return {
            "tuples": self.tuples,
            "tables": [tbl.tolist() for tbl in self.tables]
        }

    @classmethod
    def from_dict(cls, d):
        """Rebuild a network from the output of to_dict().

        Raises:
            ValueError: if the number of tables, or the size of any table,
                does not match the tuple layout. Without this check a
                truncated file would load silently and misbehave later.
        """
        obj = cls(tuples=[tuple(map(tuple, t)) for t in d["tuples"]])
        tables = [np.array(t, dtype=np.float64) for t in d["tables"]]
        if len(tables) != len(obj.tuples):
            raise ValueError(
                f"expected {len(obj.tuples)} tables, got {len(tables)}"
            )
        for pos, (t, tbl) in enumerate(zip(obj.tuples, tables)):
            expected = obj.n_trits ** len(t)
            if tbl.shape != (expected,):
                raise ValueError(
                    f"table {pos} has shape {tbl.shape}, expected ({expected},)"
                )
        obj.tables = tables
        return obj

In [None]:
# ------------------------
# 方策（ε-greedy）と対局
# ------------------------
def choose_move(game: Othello, player: int, V: NTupleValue, eps=0.1):
    """Epsilon-greedy move choice for `player`.

    Returns None when `player` has no legal move. With probability `eps` a
    uniformly random legal move is returned; otherwise the move whose
    afterstate has the highest V.value from `player`'s point of view (the
    earliest such move wins ties).
    """
    candidates = game.legal_moves(player)
    if not candidates:
        return None
    if random.random() < eps:
        return random.choice(candidates)

    top_value = -1e18
    chosen = candidates[0]
    for move in candidates:
        after = game.clone()
        after.play(move[0], move[1], player)
        score = V.value(after, player)
        if score > top_value:
            top_value, chosen = score, move
    return chosen

# ------------------------
# 学習ループ・評価
# ------------------------
def train(num_games=5000, alpha=0.01, my_eps=0.2, seed=SEED, eps=None):
    """Train a fresh NTupleValue by playing `num_games` games as BLACK.

    The learner plays epsilon-greedy; the opponent uses the "heuristic"
    policy with opp_eps=0.05.

    Args:
        num_games: number of training games.
        alpha: learning rate handed to play_one_game_for_train.
        my_eps: exploration rate of the learner.
        seed: seed for `random`, `numpy.random` and the NTupleValue instance.
        eps: alias for `my_eps`. Callers in this notebook pass `eps=...`;
            when given (not None) it overrides `my_eps`.

    Returns:
        (V, history): history holds one dict per 100 games with the
        cumulative win percentages under "game", "black", "white", "draw".
    """
    if eps is not None:
        my_eps = eps
    random.seed(seed); np.random.seed(seed)
    V = NTupleValue(seed=seed)
    results = {BLACK: 0, WHITE: 0, 0: 0}
    history = []
    for i in range(1, num_games + 1):
        w = play_one_game_for_train(
            V, alpha=alpha, learn=True,
            my_policy="eps_greedy", my_eps=my_eps,
            opp_policy="heuristic", opp_V=None, opp_eps=0.05
        )
        results[w] += 1
        if i % 100 == 0:
            # Cumulative percentages over all games played so far.
            black_pct = results[BLACK] / i * 100
            white_pct = results[WHITE] / i * 100
            draw_pct = results[0] / i * 100
            history.append({
                "game": i,
                "black": black_pct,
                "white": white_pct,
                "draw": draw_pct,
            })
            print(f"[{i}] B:{black_pct:.1f}% W:{white_pct:.1f}% D:{draw_pct:.1f}%")
    return V, history

def plot_train_history(train_history):
    """Plot the cumulative black/white/draw percentages against games played.

    `train_history` is a list of dicts with the keys "game", "black",
    "white" and "draw".
    """
    games = [entry['game'] for entry in train_history]
    # Collect every series before touching matplotlib.
    series = [
        ([entry["black"] for entry in train_history], "Black Win %"),
        ([entry["white"] for entry in train_history], "White Win %"),
        ([entry["draw"] for entry in train_history], "Draw %"),
    ]
    plt.figure(figsize=(7,4))
    for values, label in series:
        plt.plot(games, values, label=label)
    plt.xlabel("Games")
    plt.ylabel("Win Rate %")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

def play_match(V: NTupleValue, eps=0.0, games=50, seed=SEED):
    """Let V play `games` self-play games (no learning) and tally the winners.

    Both colours pick moves epsilon-greedily from the same V with exploration
    rate `eps` (eps=0.0 is fully greedy). The previous body called
    `play_one_game`, which is not defined in this notebook; the games now run
    through play_one_game_for_train with learn=False.

    Returns:
        dict mapping BLACK, WHITE and 0 (draw) to the number of games.
    """
    random.seed(seed)
    res = {BLACK: 0, WHITE: 0, 0: 0}
    for _ in range(games):
        w = play_one_game_for_train(
            V, alpha=0.0, learn=False,
            my_policy="eps_greedy", my_eps=eps,
            opp_policy="eps_greedy", opp_V=V, opp_eps=eps,
        )
        res[w] += 1
    print(f"vs greedy self  ({games} games): B={res[BLACK]} W={res[WHITE]} D={res[0]}")
    return res

def _eval_afterstate(g_after: Othello, player: int, V):
    """
    Score the afterstate `g_after` for `player` with V and return a scalar.

    Dispatch is by duck typing:
    - objects with both `indices_for` and `value` (NTupleValue style) are
      called as V.value(game, player);
    - otherwise objects with a `w` attribute (LinearValue style) are called
      as V.value(features(game, player)).
      NOTE(review): `features` is not defined in this notebook; confirm where
      it comes from before relying on this branch.
    - anything else raises TypeError.
    """
    looks_like_ntuple = hasattr(V, "indices_for") and hasattr(V, "value")
    if looks_like_ntuple:
        return V.value(g_after, player)
    if not hasattr(V, "w"):
        raise TypeError("Unsupported value function type for evaluation")
    return V.value(features(g_after, player))

def select_move(
    game: Othello,
    player: int,
    V,
    policy: str = "eps_greedy",   # "eps_greedy" | "greedy" | "random" | "heuristic"
    eps: float = 0.1,
    tol: float = 1e-12,
):
    """
    Pick the next move for `player`; used for both training and evaluation.

    Policies:
    - "random": uniform over the legal moves (V is not used).
    - "heuristic": take a corner if one is available; otherwise drop the
      cells next to corners when possible, then prefer the middle edge cells.
      Within the chosen pool the move is random with probability `eps`, else
      the best afterstate under V. Values within `tol` of the best count as
      ties and are broken at random. V may be None, in which case the pool is
      sampled uniformly.
    - "eps_greedy": random legal move with probability `eps`, else greedy.
    - "greedy" (and any other string): highest afterstate value under V, the
      earliest move winning ties. V must not be None.

    Returns None when `player` has no legal move.
    """
    moves = game.legal_moves(player)
    if not moves:
        return None

    def afterstate_value(move):
        # Value, for `player`, of the position reached by playing `move`.
        nxt = game.clone()
        nxt.play(move[0], move[1], player)
        return _eval_afterstate(nxt, player, V)

    def pick_among_best(candidates):
        # Arg-max with a tolerance band; ties are resolved by random.choice.
        top, tied = -1e18, []
        for mv in candidates:
            val = afterstate_value(mv)
            if val > top + tol:
                top, tied = val, [mv]
            elif abs(val - top) <= tol:
                tied.append(mv)
        return random.choice(tied)

    # Uniformly random
    if policy == "random":
        return random.choice(moves)

    if policy == "heuristic":
        corners = {(0,0),(0,7),(7,0),(7,7)}
        xcells = {(0,1),(1,0),(1,1),(0,6),(1,6),(1,7),(6,0),(6,1),(7,1),(6,6),(6,7),(7,6)}
        mid = range(2, 6)
        edges = (
            {(0, c) for c in mid} | {(7, c) for c in mid}
            | {(r, 0) for r in mid} | {(r, 7) for r in mid}
        )

        # 1) Corners first.
        corner_moves = [m for m in moves if m in corners]
        if corner_moves:
            if V is None:
                return random.choice(corner_moves)
            return pick_among_best(corner_moves)

        # 2) Avoid corner-adjacent cells, 3) prefer edges; each filter is
        #    skipped when it would leave nothing to play.
        pool = [m for m in moves if m not in xcells] or moves
        edge_pool = [m for m in pool if m in edges] or pool

        # A little randomness, controlled by eps.
        if random.random() < eps:
            return random.choice(edge_pool)

        if V is None:
            return random.choice(edge_pool)
        return pick_among_best(edge_pool)

    # From here on V is required (greedy / eps_greedy).
    assert V is not None, "greedy/eps_greedy policy requires V"

    if policy == "eps_greedy" and random.random() < eps:
        return random.choice(moves)

    # Greedy: strict improvement only, so the earliest best move is kept.
    top, chosen = -1e18, moves[0]
    for mv in moves:
        val = afterstate_value(mv)
        if val > top:
            top, chosen = val, mv
    return chosen

def play_one_game_for_train(
    V: NTupleValue,
    alpha=0.01,
    learn=True,
    my_policy="eps_greedy",   # move-selection policy of the learner (BLACK)
    my_eps=0.2,               # exploration rate of the learner
    opp_policy="heuristic",   # move-selection policy of the opponent (WHITE)
    opp_V=None,               # opponent's value function, used for "greedy"/"eps_greedy"
    opp_eps=0.05              # exploration rate of the opponent
):
    """Play one full game (learner = BLACK) and optionally update V from the result.

    Every afterstate reached by either colour is recorded as
    (table indices from the mover's point of view, mover). When `learn` is
    true, each recorded afterstate is updated towards the final outcome seen
    from its mover: +1 win, -1 loss, 0 draw.

    For opponent policies other than "greedy"/"eps_greedy" the learner's own
    V is handed to select_move instead of `opp_V`.

    Returns:
        The winner: BLACK, WHITE, or 0 for a draw.
    """
    g = Othello()
    histories = []  # (idxs_after, player)
    opp_value_fn = opp_V if opp_policy in ("greedy", "eps_greedy") else V

    while not g.terminal():
        p = g.player
        if p == BLACK:
            m = select_move(g, p, V, policy=my_policy, eps=my_eps)
        else:
            m = select_move(g, p, opp_value_fn, policy=opp_policy, eps=opp_eps)
        if m is None:
            g.player = opponent(g.player)
            continue
        # Apply the move once and read the afterstate straight from the real
        # board. Only the board is encoded, so the side-to-move change made by
        # play() does not affect the indices.
        g.play(m[0], m[1], p)
        histories.append((V.indices_for(g, p), p))

    w = g.winner()
    reward_black = 1.0 if w == BLACK else -1.0 if w == WHITE else 0.0

    if learn:
        for idxs, p in histories:
            target = reward_black if p == BLACK else -reward_black
            V.update(idxs, target, alpha)
    return w

def _play_one_eval_game(V_my, my_color, opp_type, opp_V, opp_eps):
    """Play one evaluation game and return +1 / 0 / -1 (win / draw / loss) for `my_color`.

    The evaluated side moves greedily under V_my; the other side uses
    select_move with policy `opp_type`, value function `opp_V` and `opp_eps`.
    """
    g = Othello()
    while not g.terminal():
        p = g.player
        if p == my_color:
            m = select_move(g, p, V_my, policy="greedy", eps=0.0)
        else:
            m = select_move(g, p, opp_V, policy=opp_type, eps=opp_eps)
        if m is None:
            g.player = opponent(g.player)
            continue
        g.play(m[0], m[1], p)
    w = g.winner()
    if w == 0:
        return 0
    return 1 if w == my_color else -1


def evaluate_agent(V_my, games_per_color=50, opp_type="random", opp_V=None, opp_eps=0.05, seed=2025):
    """
    Evaluate V_my against a fixed opponent, playing both colours.

    `games_per_color` games are played as BLACK and the same number as WHITE
    (2 * games_per_color in total). The evaluated agent always moves greedily.

    Args:
        V_my: value function of the evaluated agent.
        games_per_color: number of games per colour.
        opp_type: select_move policy of the opponent
            ("random", "heuristic", "greedy" or "eps_greedy").
        opp_V: opponent's value function; required by select_move for
            "greedy"/"eps_greedy", optional for "heuristic".
        opp_eps: opponent's exploration rate.
        seed: seed for the `random` module.

    Returns:
        dict with "wins", "losses", "draws", "total" and "winrate", where
        winrate = (wins + 0.5 * draws) / total, or 0.0 when no game is played.
    """
    random.seed(seed)
    wins = draws = losses = 0
    for my_color in (BLACK, WHITE):
        for _ in range(games_per_color):
            r = _play_one_eval_game(V_my, my_color, opp_type, opp_V, opp_eps)
            if r > 0: wins += 1
            elif r < 0: losses += 1
            else: draws += 1
    total = 2 * games_per_color
    winrate = (wins + 0.5 * draws) / total if total else 0.0
    return {"wins": wins, "losses": losses, "draws": draws, "total": total, "winrate": winrate}


In [7]:
# ------------------------
# 対人対局（実戦は eps=0.0）
# ------------------------
def print_board(game: Othello):
    """Print the board with a-h column labels, 1-8 row labels and the side to move.

    BLACK discs are drawn as "○", WHITE discs as "●" and every other cell as ".".
    """
    glyph_for = {BLACK: "○", WHITE: "●"}
    grid = game.board
    print("   " + " ".join(chr(ord('a') + c) for c in range(8)))
    for r in range(8):
        cells = "".join(glyph_for.get(grid[r][c], ".") + " " for c in range(8))
        print(f"{r+1:2d} " + cells)
    if game.player == BLACK:
        turn = "Black(○)"
    else:
        turn = "White(●)"
    print(f"Turn: {turn}")

def parse_move(s: str):
    """Parse a move typed by the user.

    Returns:
        None for "pass"/"p", a (row, col) tuple of 0-based indices for a
        coordinate such as "d3" (column letter a-h, then row digit 1-8), or
        the string "ERR" for anything else. Input is stripped and lowercased
        first.
    """
    token = s.strip().lower()
    if token == "pass" or token == "p":
        return None
    if len(token) == 2:
        col = ord(token[0]) - ord('a')
        row = ord(token[1]) - ord('1')
        if 0 <= row < 8 and 0 <= col < 8:
            return (row, col)
    return "ERR"

def human_vs_agent(V: NTupleValue, human_color=BLACK, seed=123):
    """Interactive console game between a human and the agent driven by V.

    The agent plays choose_move with eps=0.0 (no exploration). The human
    types a coordinate such as "d3", "pass"/"p" (hands the turn over, even
    when legal moves exist) or "quit"/"q" (returns immediately). The board is
    printed before every turn and once more at the end with the result.
    """
    random.seed(seed)
    g = Othello()
    while not g.terminal():
        print_board(g)
        side = g.player

        if side != human_color:
            # Agent's turn.
            reply = choose_move(g, side, V, eps=0.0)
            if reply is None:
                print("(Agent) Pass.")
                g.player = opponent(g.player)
            else:
                print(f"(Agent) move: {chr(ord('a')+reply[1])}{reply[0]+1}")
                g.play(reply[0], reply[1], side)
            continue

        # Human's turn.
        options = g.legal_moves(side)
        if not options:
            print("No legal moves. Pass.")
            g.player = opponent(g.player)
            continue

        text = input("Your move (e.g., d3 / pass / quit): ").strip().lower()
        if text in ("quit", "q"):
            print("Quit.")
            return

        choice = parse_move(text)
        if choice == "ERR":
            print("Format error. Try again.")
        elif choice is None:
            g.player = opponent(g.player)
        elif choice not in options:
            print("Illegal. Try again.")
        else:
            g.play(choice[0], choice[1], side)

    print_board(g)
    outcome = g.winner()
    if outcome == 0:
        print("Draw!")
    elif outcome == BLACK:
        print("Black(○) wins!")
    else:
        print("White(●) wins!")

In [None]:
def opponent(c):
    """Return the colour opposing `c` (the negated value)."""
    other = -c
    return other

def train_with_report(num_games, alpha, eps, seed, trial=None, report_every=200):
    """Train a model and replay its training curve into an Optuna trial.

    Args:
        num_games, alpha, seed: forwarded to train().
        eps: learner exploration rate, forwarded as train()'s `my_eps`.
        trial: optional optuna trial; when given, the cumulative black win
            percentage is reported at every history point whose game count is
            a multiple of `report_every`.
        report_every: reporting interval in games.

    Returns:
        The trained value function.

    Raises:
        optuna.TrialPruned: when the pruner rejects a reported value.

    Note: reporting happens after train() has finished, so a prune skips the
    evaluation and any later seeds but not the training run itself.
    NOTE(review): objective() calls this once per seed with the same step
    numbers; check how Trial.report treats a step that was already reported.
    """
    # train() names this parameter `my_eps`; passing `eps=` raised TypeError.
    V, hist = train(num_games=num_games, alpha=alpha, my_eps=eps, seed=seed)
    if trial is not None:
        for h in hist:
            if h["game"] % report_every == 0:
                trial.report(h["black"], step=h["game"])
                if trial.should_prune():
                    raise optuna.TrialPruned()
    return V

def objective(trial):
    """Optuna objective: mean win rate over three training seeds.

    Searches `alpha` and `eps` (both log-uniform) and `num_games`
    (2000-6000 in steps of 200). For each seed a model is trained and then
    evaluated over 30 games per colour against the "random" opponent.
    """
    # --- search space ---
    alpha = trial.suggest_float("alpha", 1e-4, 5e-2, log=True)
    eps = trial.suggest_float("eps", 1e-2, 2e-1, log=True)
    # Training length is kept modest so more of the budget goes to evaluation.
    num_games = trial.suggest_int("num_games", 2000, 6000, step=200)

    # --- average out the noise over several seeds ---
    winrates = []
    for s in (41, 42, 43):
        V = train_with_report(num_games, alpha, eps, s, trial=trial, report_every=200)
        # Fixed opponent, both colours; the evaluation seed is derived from s.
        report = evaluate_agent(
            V, games_per_color=30, opp_type="random",
            opp_V=None, opp_eps=0.05, seed=1000 + s,
        )
        winrates.append(report["winrate"])
    return mean(winrates)

# --- Study 作成：TPE seed固定＋MedianPruner ---
# Fixed-seed TPE sampler (for reproducible suggestions) plus a median pruner.
sampler = optuna.samplers.TPESampler(seed=SEED)
pruner = MedianPruner(n_startup_trials=5, n_warmup_steps=5, interval_steps=1)

# The study is stored in the SQLite file at optuna_db_path and is reused when
# it already exists, so every run of this cell adds trials to the same study.
study = optuna.create_study(
    direction="maximize",
    study_name="othello_ntuple_eval",
    storage=f"sqlite:///{optuna_db_path}",
    load_if_exists=True,
    sampler=sampler,
    pruner=pruner,
)
study.optimize(objective, n_trials=30)

bt = study.best_trial
params = bt.params
print("Best params:", params)  # e.g. {'alpha': 0.008, 'eps': 0.15, 'num_games': 4000}

[I 2025-08-11 00:21:59,640] Using an existing study with name 'othello_ntuple_eval' instead of creating a new one.


[100] B:51.0% W:48.0% D:1.0%
[200] B:52.0% W:47.5% D:0.5%
[300] B:58.0% W:41.7% D:0.3%
[400] B:61.0% W:38.8% D:0.2%
[500] B:62.6% W:37.0% D:0.4%
[600] B:64.3% W:35.3% D:0.3%
[700] B:64.7% W:35.0% D:0.3%
[800] B:65.5% W:33.9% D:0.6%
[900] B:65.4% W:33.8% D:0.8%
[1000] B:66.5% W:32.6% D:0.9%
[1100] B:66.9% W:32.0% D:1.1%
[1200] B:67.2% W:31.8% D:1.0%
[1300] B:67.5% W:31.5% D:1.0%
[1400] B:67.2% W:31.7% D:1.1%
[1500] B:66.1% W:32.9% D:1.0%
[1600] B:65.7% W:33.3% D:1.0%
[1700] B:66.1% W:32.8% D:1.1%
[1800] B:66.1% W:32.8% D:1.1%
[1900] B:66.3% W:32.6% D:1.1%
[2000] B:66.0% W:32.9% D:1.1%
[2100] B:65.5% W:33.3% D:1.1%
[2200] B:65.5% W:33.2% D:1.3%
[2300] B:65.0% W:33.7% D:1.3%
[2400] B:63.7% W:35.0% D:1.2%
[2500] B:62.7% W:36.0% D:1.3%
[2600] B:61.6% W:37.0% D:1.3%
[2700] B:60.3% W:38.3% D:1.3%
[2800] B:58.8% W:39.9% D:1.3%
[2900] B:57.6% W:41.2% D:1.2%
[3000] B:56.4% W:42.3% D:1.3%
[3100] B:55.2% W:43.5% D:1.4%
[3200] B:54.3% W:44.3% D:1.4%
[3300] B:53.4% W:45.2% D:1.4%
[3400] B:52.6% W:46

[W 2025-08-11 00:25:33,251] Trial 1 failed with parameters: {'alpha': 0.0010253509690168491, 'eps': 0.17254716573280354, 'num_games': 5000} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/Users/mori/dev/Othello-Web-app-with-RL/rl_agent/.venv/lib/python3.12/site-packages/optuna/study/_optimize.py", line 201, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "/var/folders/mx/r5dnnbr55_b4_pyzp04zlqtm0000gn/T/ipykernel_16554/3678377591.py", line 27, in objective
    V = train_with_report(num_games, alpha, eps, s, trial=trial, report_every=200)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/mx/r5dnnbr55_b4_pyzp04zlqtm0000gn/T/ipykernel_16554/3678377591.py", line 6, in train_with_report
    V, hist = train(num_games=num_games, alpha=alpha, eps=eps, seed=seed)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/fold

KeyboardInterrupt: 

In [None]:
SEED = 2025
# Train the model used for the human game. The original cell first trained
# with the Optuna result and then immediately overwrote it with this manual
# setting. train() reseeds from `seed`, so that first run had no effect on the
# final model and is dropped. To use the tuned values instead, pass
# params["num_games"], params["alpha"] and params["eps"] here.
# (`my_eps` is train()'s name for the exploration rate; `eps=` raised TypeError.)
V_best, _ = train(num_games=2000, alpha=0.1, my_eps=0.1, seed=SEED)

# 4) Optional: save the model. Written under model_save_dir, which this
#    notebook creates, instead of a "./models" relative to the working
#    directory that may not exist.
best_model_path = os.path.join(model_save_dir, "best_ntuple.json")
with open(best_model_path, "w") as f:
    json.dump(V_best.to_dict(), f)

# 5) Human vs. the trained model (the agent plays without exploration).
human_vs_agent(V_best, human_color=WHITE, seed=SEED)  # human is WHITE, agent is BLACK