mission0と1の対戦、3*3、マイナスなし

In [83]:
from magic import *

import os
import random

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn, optim
import matplotlib
from collections import defaultdict
import copy


In [84]:
hands = torch.tensor([1,2,3])
hands.device

device(type='cpu')

状態と行動を入力に

In [85]:

def rank_competition(arr: list, i: int):
    """リストのi個目の要素が何番目に大きいか"""
    v = arr[i]
    return sum(x > v for x in arr)

def rank_inverse(arr: list, rank: int):
    """リストでrank番目に大きいのは何個目か"""
    sorted_arr = sorted(arr, reverse=True)
    hand = sorted_arr[rank]
    for num_index, h in enumerate(arr):
        if hand == h: 
            return num_index

# ==== 行動をidと紐づけ ====
def id_to_action(size: int, hands: list[int], aid: int):
    x = (aid // 4) // size
    y = (aid // 4) % size
    number = aid % 4 + 1
    for num_index, hand in enumerate(hands):
        if number == hand:
            return x, y, num_index
    return x, y, None


def make_valid_filter(hands: torch.Tensor, action_space: int, nums_per_cell: int = 4,
                      neg_large: float = -1e9) -> torch.Tensor:
    """
    hands: (N, 3)  各行は {1,2,3,4} から3枚
    戻り値: (N, action_space) のフィルター
            合法: 0、 非合法: neg_large（例: -1e9）
    """
    device = hands.device
    N = hands.size(0)

    # 各 action_id の「数字」（1..4）を求める: num[a] = (a % 4) + 1
    a = torch.arange(action_space, device=device)
    num = (a % nums_per_cell) + 1                      # (36,)

    # hands に num が含まれるかを各サンプル・各アクションで判定
    # hands: (N,3) → (N,3,1), num: (36,) → (1,1,36)
    has_num = (hands.unsqueeze(2) == num.view(1, 1, -1)).any(dim=1)  # (N,36) bool

    # 合法: 0、非合法: neg_large
    filt = torch.zeros((N, action_space), dtype=torch.float32, device=device)
    filt = torch.where(has_num, filt, torch.full_like(filt, neg_large))
    return filt


def make_move_from_aid(s:GameState, pid: int, aid: int):
    x,y,num_index = id_to_action(s.rules.board_size, s.hands[pid], aid)
    if not num_index:
        return False, None
    return make_move(s, pid, x, y, num_index, "add")

def make_board_tensor(board, device=None, dtype=torch.float32):
    # Pythonの処理でNone判定→テンソル化
    vals = [[(0 if x is None else x) for x in row] for row in board]
    mask = [[(0 if x is None else 1) for x in row] for row in board]
    vals_t = torch.tensor(vals, dtype=dtype, device=device)  # (n,n)
    mask_t = torch.tensor(mask, dtype=dtype, device=device)  # (n,n)
    out = torch.stack([vals_t, mask_t], dim=0)               # (2,n,n)
    return out



報酬を定義

In [86]:
def get_reward(s: GameState):
    """
    戻り値:
     done,rewards
    勝利ならr=1、敗北ならr=-1
    """
    done = False
    rs = {0:0, 1:0}
    s_round = s.round
    for pidx in range(len(s.players)):
        if is_victory(s.board, s.missions[pidx]):
            rs[pidx] += 1
            rs[1-pidx] += -1
            done = True
    
    if not done:
        if s_round > 49:
            done = True
    
    return done, rs

def make_reward_filter(boards: torch.Tensor, board_size: int, mids: torch.Tensor, action_space: int):
    N = boards.size(0)
    filt = torch.zeros(size=(N, action_space), dtype=torch.float32)
    
    for n in range(N):
        missions: list[Mission] = [MISSIONS[mid.item()] for mid in mids[n]]
        for aid in range(action_space):
            board = copy.deepcopy(boards[n][0]).tolist()
            board_none = copy.deepcopy(boards[n][1])
            for x in range(board_size):
                for y in range(board_size):
                    if board_none[y][x] == 0:
                        board[y][x] = None

            x = (aid // 4) // board_size
            y = (aid // 4) % board_size
            number = aid % 4 + 1
            if board[y][x]:
                board[y][x] += number
            else:
                board[y][x] = number
            if is_victory(board, missions[0]):
                filt[n][aid]+=1
            if is_victory(board, missions[1]):
                filt[n][aid]+=-1
    return filt

DQNエージェント

In [87]:
from collections import deque

class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.buffer = deque(maxlen=buffer_size)
        self.batch_size = batch_size

    def add(self, s: GameState, aid: int, pid: int, reward, next_s: GameState, done: bool):
        
        board = make_board_tensor(s.board)
        hands = torch.tensor(s.hands[pid], dtype = torch.float32)
        aid = torch.tensor([aid])
        reward = torch.tensor([reward])
        next_board = make_board_tensor(next_s.board)
        next_hands = torch.tensor(next_s.hands[pid], dtype = torch.float32)
        done = torch.tensor([done], dtype=int)
        mid = torch.tensor([s.missions[pid].mission_id, s.missions[1 - pid].mission_id], dtype=int)
        data = (board, hands, aid, reward, next_board, next_hands, done, mid)
        self.buffer.append(data)


    def __len__(self):
        return len(self.buffer)

    def get_batch(self):
        """
        boards, hands, aids, rewards, next_boards, next_hands, dones, mids
        """
        data = random.sample(self.buffer, self.batch_size)
        boards, hands, aids, rewards, next_boards, next_hands, dones, mids = map(torch.stack, zip(*data))
        return boards, hands, aids.squeeze(), rewards.squeeze(), next_boards, next_hands, dones.squeeze(), mids
    #state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()


In [88]:
class qnet_CNN(nn.Module):
    """
    boardをCNNで処理、のちに手札と合わせる
    """
    def __init__(self, action_space: int, board_size: int, hand_dim: int = 3):
        super().__init__()
        self.action_space = action_space
        self.board_size = board_size
        self.board_tower = nn.Sequential(
            nn.Conv2d(2, 16, 3, padding=1), nn.ReLU(),
            nn.Conv2d(16, 32, 3, padding=1), nn.ReLU(),
            nn.Flatten(),  # → 64*size**2
        )
        self.hand_tower = nn.Sequential(
            nn.Linear(hand_dim, 32), nn.ReLU(),
            nn.Linear(32, 32), nn.ReLU(),
        )
        fused_dim = 32*board_size**2 + 32
        self.head = nn.Sequential(
            nn.Linear(fused_dim, 128), nn.ReLU(),
            nn.Linear(128, action_space),
        )
        with torch.no_grad():
            last = self.head[-1]          # 最後の Linear
            last.weight.zero_()
            last.bias.zero_()

    def forward(self, boards, hands, mids):
        """
        boards (ex: s.board.unsqueeze(0))     : (N, 2, size, size)
        hands  (ex: s.hands[pid].unsqueeze(0)): (N, 3)
        mids: (N, 2)
        """
        hands, _ = torch.sort(hands, dim=1)

        hb = self.board_tower(boards)
        hh = self.hand_tower(hands)
        h = torch.cat([hb, hh], dim=1)
        v_filt = make_valid_filter(hands, self.action_space)
        r_filt = make_reward_filter(boards, self.board_size, mids, self.action_space)
        return self.head(h) + v_filt + r_filt


In [89]:

class DQNAgent():

    def __init__(self, size: int):
        self.gamma = 0.98
        self.lr = 0.01
        self.epsilon = 0.1
        self.buffer_size = 100000
        self.batch_size = 64
        self.warmup_size = 1000

        self.replay_buffer = ReplayBuffer(self.buffer_size, self.batch_size)

        self.board_size = size
        self.action_space = (self.board_size**2) * 4 

        self.qnet = qnet_CNN(self.action_space, self.board_size)
        self.qnet.train()
        self.lossfun = nn.SmoothL1Loss() 
        self.optimizer = optim.SGD(self.qnet.parameters(), self.lr)

        self.qnet_target = copy.deepcopy(self.qnet)        

    def get_action(self, s: GameState, pid: int, rand: bool = True):
        if (np.random.rand() < self.epsilon and rand) or (len(self.replay_buffer) < self.warmup_size):
            num = None
            count = 0
            while num == None:
                count += 1
                aid = random.randint(0, self.action_space - 1)
                _,_,num = id_to_action(self.board_size, s.hands[pid], aid)
                if count == 100:
                    assert()
            return aid
        else:
            with torch.no_grad():
                mids = [s.missions[pid].mission_id, s.missions[1 - pid].mission_id]
                q = self.qnet(
                    make_board_tensor(s.board).unsqueeze(0),
                    torch.tensor(s.hands[pid], dtype=torch.float32).unsqueeze(0),
                    torch.tensor(mids, dtype=torch.int).unsqueeze(0)
                    )
            best_index = np.argmax(q).item()
            return best_index

    def update(self, s: GameState, next_s: GameState, aid: int, pid: int, reward: float, done):
        if len(self.replay_buffer) < self.warmup_size:
            if reward != 0:
                self.replay_buffer.add(s, aid, pid, reward, next_s, done)
            return
        self.replay_buffer.add(s, aid, pid, reward, next_s, done)


        boards, hands, aids, rewards, next_boards, next_hands, dones, mids = self.replay_buffer.get_batch()
        qs = self.qnet(boards, hands, mids)
        
        q = qs[np.arange(self.batch_size), aids]

        next_qs = torch.tensor(np.zeros([self.batch_size, self.action_space]))
        for i in range(5):
            new_hand = max(1, i)
            next_hands[:,2] = new_hand
            next_qs += self.qnet_target(next_boards, next_hands, mids)/5
        next_q = next_qs.max(axis=1)

        target = rewards + (1 - dones) * self.gamma * next_q.values

        self.optimizer.zero_grad()

        loss = self.lossfun(q, target)     
        loss.backward()
        self.optimizer.step()

    def sync_qnet(self):
        self.qnet_target = copy.deepcopy(self.qnet)

DQN学習

In [90]:
class DQNlearning_btw_agents:
    def __init__(self, rules:Rules = Rules()):
        self.rules = rules
        self.episode = 0
        self.episode_buffer = []
        self.rewards_0 = []
        self.rewards_1 = []

    def learning(self, agents: list[DQNAgent], episodes:int, sync_interval: int):
        while self.episode < episodes:
            s = start_game(["agent_0","agent1"],self.rules,mission_id=[0,1])
            done = False
            rews = deque(maxlen=2)
            states = [None, None]
            aids = [None, None]
            pid = s.turn

            while not done:
                states[pid] = copy.deepcopy(s)
                aid = agents[pid].get_action(s,pid)
                aids[pid] = aid
                x, y, num_index = id_to_action(3, s.hands[pid], aid)
                make_move(s, pid, x, y, num_index,"add")

                pid = s.turn
                done, rew = get_reward(s)
                rews.append(rew)
                if s.round == 0:
                    continue
                reward = 0
                """総報酬を計算し、Qを更新"""
                for rew in rews:
                    reward += rew[pid]
                agents[pid].update(states[pid], s, aids[pid], pid, reward, done)
            
            pid = 1 - pid
            reward = rews[1][pid]
            agents[pid].update(states[pid], s, aids[pid], pid, reward, done)
            
            is_ready_random = False
            if (self.episode + 1) % sync_interval == 0:
                print(f"episode{self.episode} completed")
                for agent in agents:
                    agent.sync_qnet()
                    if len(agent.replay_buffer) > agent.warmup_size:
                        is_ready_random = True
                if is_ready_random: 
                    """ランダムと対戦"""
                    self.vs_random(agents)
            
            self.episode += 1

    def vs_random(self, agents: list[DQNAgent]):
        reward_0 = 0
        reward_1 = 0
        action_space = agents[0].action_space
        board_size = agents[0].board_size
        for game in range(100):
            s = start_game(["agent_0", "random"], self.rules, mission_id=[0,1])
            done = False
            while not done:
                pid = s.turn
                if pid == 0:
                    aid = agents[pid].get_action(s,pid,False)
                    x, y, num_index = id_to_action(3, s.hands[pid], aid)
                    make_move(s, pid, x, y, num_index,"add")                
                else:
                    """ランダムの操作"""
                    num = None
                    while num == None:
                        aid = random.randint(0, action_space - 1)
                        x,y,num = id_to_action(board_size, s.hands[pid], aid)
                    make_move(s, pid, x, y, num,"add")
                done, rew = get_reward(s)
            reward_0 += rew[0]
            
            s = start_game(["random", "agent_1"], self.rules, mission_id=[0,1])
            done = False
            while not done:
                pid = s.turn
                if pid == 1:
                    aid = agents[pid].get_action(s,pid,False)
                    x, y, num_index = id_to_action(3, s.hands[pid], aid)
                    make_move(s, pid, x, y, num_index,"add")                
                else:
                    """ランダムの操作"""
                    num = None
                    while num == None:
                        aid = random.randint(0, action_space - 1)
                        x,y,num = id_to_action(board_size, s.hands[pid], aid)
                    make_move(s, pid, x, y, num,"add")
                done, rew = get_reward(s)
            reward_1 += rew[1]
        print(f"reward_0 = {reward_0}")
        print(f"reward_1 = {reward_1}")
        self.rewards_0.append(reward_0)
        self.rewards_1.append(reward_1)
        self.episode_buffer.append(self.episode)  
    """
    def graph(self):
        plt.figure()
        plt.plot(self.episode, self.rewards_0, label="rewards_1")   # 1本目
        plt.plot(self.episode, self.rewards_1, label="rewards_2")   # 2本目
        plt.xlabel("episode")
        plt.ylabel("reward")
        plt.legend()
        plt.grid(True)
        plt.show()
    """

In [91]:
agents_CNN2 = [DQNAgent(size=3), DQNAgent(size=3)]
dqnlearning = DQNlearning_btw_agents()

In [93]:
dqnlearning.learning(agents_CNN2, 2000, 50)

episode49 completed
episode99 completed
episode149 completed
episode199 completed
episode249 completed
episode299 completed
episode349 completed
episode399 completed
episode449 completed
episode499 completed
episode549 completed
episode599 completed
episode649 completed
episode699 completed
episode749 completed
episode799 completed
episode849 completed
episode899 completed
episode949 completed
episode999 completed
episode1049 completed
episode1099 completed
episode1149 completed
episode1199 completed
episode1249 completed
reward_0 = 77
reward_1 = 88
episode1299 completed
reward_0 = 85
reward_1 = 68
episode1349 completed
reward_0 = 85
reward_1 = 81
episode1399 completed
reward_0 = 86
reward_1 = 72
episode1449 completed
reward_0 = 72
reward_1 = 80
episode1499 completed
reward_0 = 72
reward_1 = 80
episode1549 completed
reward_0 = 76
reward_1 = 86
episode1599 completed
reward_0 = 86
reward_1 = 82
episode1649 completed
reward_0 = 89
reward_1 = 77
episode1699 completed
reward_0 = 85
reward_1

In [37]:
mids = torch.tensor([[0, 1]], dtype=torch.int32)
missions: list[Mission] = [MISSIONS[mid.item()] for mid in mids[0]]
missions

[Mission(mission_id=0, type='sum', target='row', number=11, description='どこかの行の和が11'),
 Mission(mission_id=1, type='sum', target='column', number=11, description='どこかの列の和が11')]

In [13]:
print(dqnlearning.episode_buffer)
print(dqnlearning.rewards_0)
print(dqnlearning.rewards_1)

[1249, 1299, 1349, 1399, 1449, 1499, 1549, 1599, 1649, 1699, 1749, 1799, 1849, 1899, 1949, 1999, 2049, 2099, 2149, 2199, 2249, 2299, 2349, 2399, 2449, 2499, 2549, 2599, 2649, 2699, 2749, 2799, 2849, 2899, 2949, 2999, 3049, 3099, 3149, 3199, 3249, 3299, 3349, 3399, 3449, 3499, 3549, 3599, 3649, 3699, 3749, 3799, 3849, 3899, 3949, 3999, 4049, 4099, 4149, 4199, 4249, 4299, 4349, 4399, 4449, 4499, 4549, 4599, 4649, 4699, 4749, 4799, 4849, 4899, 4949, 4999, 5049, 5099, 5149, 5199, 5249, 5299, 5349, 5399, 5449, 5499, 5549, 5599, 5649, 5699, 5749, 5799, 5849, 5899, 5949, 5999, 6049, 6099, 6149, 6199, 6249, 6299, 6349, 6399, 6449, 6499, 6549, 6599, 6649, 6699, 6749, 6799, 6849, 6899, 6949, 6999, 7049, 7099, 7149, 7199, 7249, 7299, 7349, 7399, 7449, 7499, 7549, 7599, 7649, 7699, 7749, 7799, 7849, 7899, 7949, 7999, 8049, 8099, 8149, 8199, 8249, 8299, 8349, 8399, 8449, 8499, 8549, 8599, 8649, 8699, 8749, 8799, 8849, 8899, 8949, 8999, 9049, 9099, 9149, 9199, 9249, 9299, 9349, 9399, 9449, 9499, 954

In [97]:
# 保存
torch.save(agents_CNN2[1].qnet.state_dict(), "agent_CNN3_1_qnet.pt")


In [42]:
board = [[8, 2, 5], [4, 6, None], [4, 2, None]]
board = make_board_tensor(board)
hands = torch.tensor([2,1,1], dtype=torch.float32)
agents_CNN2[0].qnet(board.unsqueeze(0), hands.unsqueeze(0))

tensor([[ 1.0574e-01,  1.1523e-01, -1.0000e+09, -1.0000e+09,  1.5073e-01,
          8.6536e-02, -1.0000e+09, -1.0000e+09,  1.3279e-01,  1.1295e-01,
         -1.0000e+09, -1.0000e+09,  9.3971e-02,  8.6145e-02, -1.0000e+09,
         -1.0000e+09,  1.3290e-01,  1.3460e-01, -1.0000e+09, -1.0000e+09,
          1.1042e-01,  6.1178e-02, -1.0000e+09, -1.0000e+09,  1.4451e-01,
          1.1740e-01, -1.0000e+09, -1.0000e+09,  1.1219e-01,  7.2284e-02,
         -1.0000e+09, -1.0000e+09,  1.2025e-01,  1.1254e-01, -1.0000e+09,
         -1.0000e+09]], grad_fn=<AddBackward0>)

DQN学習　Agent vs Random

In [None]:
import copy, random
from collections import deque



# --- 学習後の評価（探索オフで100局） ---
def eval_vs_random(agent: DQNAgent, n_games=100):
    wins_as_first = 0
    wins_as_second = 0
    for _ in range(n_games):
        # 先手
        s = start_game(["agent", "random"], rules, mission_id=[0,1])
        done = False
        while not done:
            pid = s.turn
            if pid == 0:
                aid = agent.get_action(s, pid, False)  # 探索なし
                x, y, num_index = id_to_action(board_size, s.hands[pid], aid)
                make_move(s, pid, x, y, num_index, "add")
            else:
                aid, (x, y, num_index) = random_legal_action(s, pid)
                make_move(s, pid, x, y, num_index, "add")
            done, rew = get_reward(s)
        wins_as_first += (rew[0] > 0)

        # 後手
        s = start_game(["random", "agent"], rules, mission_id=[0,1])
        done = False
        while not done:
            pid = s.turn
            if pid == 1:
                aid = agent.get_action(s, pid, False)
                x, y, num_index = id_to_action(board_size, s.hands[pid], aid)
                make_move(s, pid, x, y, num_index, "add")
            else:
                aid, (x, y, num_index) = random_legal_action(s, pid)
                make_move(s, pid, x, y, num_index, "add")
            done, rew = get_reward(s)
        wins_as_second += (rew[1] > 0)
    print(f"eval: first-win {wins_as_first}/{n_games}, second-win {wins_as_second}/{n_games}")


def random_legal_action(s, pid):
    """適当にサンプリングして合法手が見つかるまで回す（簡易版）"""
    while True:
        aid = random.randrange(ACTION_SPACE)
        x, y, num_index = id_to_action(board_size, s.hands[pid], aid)
        # is_valid_move を使えるならこちらが安全
        try:
            if is_valid_move(s, pid, x, y, num_index, "add"):
                return aid, (x, y, num_index)
        except NameError:
            # is_valid_move が無い場合は make_move を試してロールバック…は重いのでスキップ
            return aid, (x, y, num_index)


In [38]:
def input_xyi_simple(s:GameState, pid):
    """
    形式:  x,y,idx   例: 1,2,0
    - 0始まりの (x,y)
    - idx は手札インデックス
    常に op="add" で返します。
    """
    n = s.rules.board_size
    while True:
        raw = input("x,y,idx をカンマ区切りで: ")  # 例: 1,2,0
        try:
            x_str, y_str, idx_str = [t.strip() for t in raw.split(",")]
            x, y, idx = int(x_str), int(y_str), int(idx_str)
        except Exception:
            print("形式エラー：例) 1,2,0")
            continue

        if not (0 <= x < n and 0 <= y < n):
            print(f"x,y は 0..{n-1} の範囲です")
            continue
        if not (0 <= idx < len(s.hands[pid])):
            print(f"idx は 0..{len(s.hands[pid])-1} の範囲です")
            continue
        return x, y, idx

In [39]:
s = start_game(["agent_0","player"],Rules(),mission_id=[0,1])
done = False
pid = s.turn
print(s.board)
print(s.hands[1])


[[None, None, None], [None, None, None], [None, None, None]]
[4, 4, 4]


In [41]:
done = False
while not done:
    """盤面と手札の表示"""

    if pid == 0:
        aid = agents_CNN2[0].get_action(s,pid,False)
        x, y, num_index = id_to_action(3, s.hands[pid], aid)
        print(id_to_action(3, s.hands[pid], aid))
        make_move(s, pid, x, y, num_index,"add")
    
    else:
        """プレイヤーの操作"""
        x,y,num_index = input_xyi_simple(s, pid)
        make_move(s, pid, x, y, num_index,"add")
        
    print(s.board)
    print(s.hands[1])
    print(s.turn)
    pid = s.turn
    done, _ = get_reward(s)
    if done:
        print(s.winners)


(0, 2, 1)
[[None, None, None], [None, None, None], [4, None, None]]
[4, 4, 4]
1
形式エラー：例) 1,2,0
[[4, None, None], [None, None, None], [4, None, None]]
[4, 4, 2]
0
(2, 0, 1)
[[4, None, 4], [None, None, None], [4, None, None]]
[4, 4, 2]
1
[[4, None, 4], [4, None, None], [4, None, None]]
[4, 2, 1]
0
(1, 0, 0)
[[4, 1, 4], [4, None, None], [4, None, None]]
[4, 2, 1]
1
[[8, 1, 4], [4, None, None], [4, None, None]]
[2, 1, 4]
0
(1, 0, 2)
[[8, 2, 4], [4, None, None], [4, None, None]]
[2, 1, 4]
1
[[8, 2, 4], [4, 4, None], [4, None, None]]
[2, 1, 1]
0
(2, 0, 2)
[[8, 2, 5], [4, 4, None], [4, None, None]]
[2, 1, 1]
1
[[8, 2, 5], [4, 4, None], [4, 2, None]]
[1, 1, 2]
0
(1, 1, 2)
[[8, 2, 5], [4, 6, None], [4, 2, None]]
[1, 1, 2]
1
[[8, 2, 5], [4, 6, None], [4, 3, None]]
[1, 2, 1]
0
[1]
