In [1]:
# 用五字棋尝试一下
# You can change this to another two-player game.
# 给状态张量增加一个channel表示当前行棋方

# TODO 1.实现 train Nets on GPU. Done on 2023.1.7
# TODO 2.implement a memoryband storing trails for training network. Done on 2023.1.11
# TODO 3.实现训练sudoindex（比如轨迹长度的分布等等）收集至wandb. Done on 2023.1.19


from typing import Union, List
import numpy as np

BLACK, WHITE = 1, -1  # 颜色的先后手,黑圈O先,白叉X后

class State:
    '''实现 15 x 15 wuziqi 的棋盘'''
    X, Y = 'ABCDEFGHIJKLMNO',  '123456789uvwxyz'
    C = {0: '_', BLACK: 'O', WHITE: 'X'}

    def __init__(self):
        self.board = np.zeros((15, 15)) # (x, y)
        self.color = 1
        self.win_color = 0
        self.record = []

    def action2str(self, a:int):
        """用0-224编码落子位
            '1'  '2'  '3'   ... 'z'
        'A'  0    1    2    ...  14
        'B'  15   16   17   ...  29
        'C'  30   31   32   ...  44
        ...                 ...
        'O'  210  211  212  ...  224
        """
        return self.X[a // 15] + self.Y[a % 15]

    def str2action(self, s:str):
        return self.X.find(s[0]) * 15 + self.Y.find(s[1])

    def record_string(self):
        """记录动作的列表组装成字符串序列，用空格分隔

        Returns
        -------
            一条动作的（轨迹）字符串，空格分隔方便split
        """
        return ' '.join([self.action2str(a) for a in self.record])

    def __str__(self):
        # 打印棋盘
        s = '   ' + ' '.join(self.Y) + '\n'
        for i in range(15):
            s += self.X[i] + ' ' + ' '.join([self.C[self.board[i, j]] for j in range(15)]) + '\n'
        s += 'record = ' + self.record_string()
        return s

    def check_win(self, x:int, y:int):
        # check whether 5 stones are on the line , pad zero around board by (4,4) then compute if sum to 5*self.color
        x_tmp, y_tmp = x + 4, y + 4
        boardex4 = np.pad(self.board,(4,4))
        for i in range(5):
            if sum(boardex4[x_tmp-4+i:x_tmp+1+i, y_tmp])==5*self.color:
                return True
            elif sum(boardex4[x_tmp, y_tmp-4+i:y_tmp+1+i])==5*self.color:
                return True
            elif boardex4[x_tmp+i-4,y_tmp+i-4]+boardex4[x_tmp+i-3,y_tmp+i-3]+boardex4[x_tmp+i-2,y_tmp+i-2]+\
                    boardex4[x_tmp+i-1,y_tmp+i-1]+boardex4[x_tmp+i,y_tmp+i]==5*self.color:
                return True
            elif boardex4[x_tmp+i-4,y_tmp-i+4]+boardex4[x_tmp+i-3,y_tmp-i+3]+boardex4[x_tmp+i-2,y_tmp-i+2]+\
                    boardex4[x_tmp+i-1,y_tmp-i+1]+boardex4[x_tmp+i,y_tmp-i]==5*self.color:
                return True
        return False

    def play(self, action:Union[str, int]) -> 'State':
        # 关于type hint : Python中的类是在读取完整个类之后才被定义的，因此在类中无法通过正常方式表示这个类本身。
        # 替代方法是使用一个和类同名的字符串，这被称为自引用类型。
        """状态转移
        Parameters
        ----------
            action : 0-80的落子位置int,或者动作的用空格分隔的字符串序列str
        Returns
        -------
            self
        """
        # 如果是一条（轨迹）字符串（该对象对应有数据结构的设计，一定程度上组织成有可遍历的特征）
        # 那么，可直接设计递归的调用，归约为仅需实现单次的int输入的动作状态转移
        if isinstance(action, str):
            for astr in action.split(): # 默认用空格分隔字符串
                self.play(self.str2action(astr))
            return self

        x, y = action // 15, action % 15
        self.board[x, y] = self.color

        # # 检查是否5子连线
        if self.check_win(x , y):
            self.win_color = self.color

        self.color = -self.color
        self.record.append(action)
        return self

    def terminal(self):
        # 终止状态检查，用于selfplay循环条件
        return self.win_color != 0 or len(self.record) == 15 * 15

    def terminal_reward(self):
        # 返回终局奖励 1，-1
        return self.win_color if self.color == BLACK else -self.win_color

    def legal_actions(self) -> List[int]:
        # 返回根节点下的合法走子位，List of int
        return [a for a in range(15 * 15) if self.board[a // 15, a % 15] == 0]

    def feature(self, to_cuda:bool = False):
        # making input ndarray for NN_state
        # 堆个ndarry用作神经网络输入 : [当前行动者 ,我方视角棋盘，对方视角棋盘]
        # support sending ndarry to cuda tensor with added a batch_dim
        now_mover = np.ones((15, 15)) * self.color # 加1通道进卷积
        s = np.stack([now_mover, self.board == self.color, self.board == -self.color]).astype(np.float32)
        if to_cuda:
            return torch.from_numpy(s).unsqueeze(0).cuda()
        return s

    def action_feature(self, action, to_cuda:bool = False):
        # 制作动作矩阵
        # support sending ndarry to cuda tensor with added a batch_dim
        a = np.zeros((1, 15, 15), dtype=np.float32)
        a[0, action // 15, action % 15] = 1
        if to_cuda:
            return torch.from_numpy(a).unsqueeze(0).cuda()
        return a

# state = State().play('A2')
# print(state)
# print('input feature')
# print(state.feature())
# state = State().play('B2 A1 I2')
# print(state)
# print('input feature')
# print(state.feature())

In [2]:
# 定义组件网络Res&Conv

import torch
import torch.nn as nn
import torch.nn.functional as F

class Conv(nn.Module):
    def __init__(self, filters0, filters1, kernel_size, bn=False):
        super().__init__()
        self.conv = nn.Conv2d(filters0, filters1, kernel_size, stride=1, padding=kernel_size//2, bias=False)
        self.bn = None
        if bn:
            self.bn = nn.BatchNorm2d(filters1)

    def forward(self, x):
        h = self.conv(x)
        if self.bn is not None:
            h = self.bn(h)
        return h

class ResidualBlock(nn.Module):
    def __init__(self, filters):
        super().__init__()
        self.conv = Conv(filters, filters, 3, True)

    def forward(self, x):
        return F.relu(x + (self.conv(x)))

In [3]:
num_filters = 64
num_blocks = 12

class Representation(nn.Module):
    ''' Conversion from observation to inner abstract state '''
    def __init__(self, input_shape):
        super().__init__()
        self.input_shape = input_shape # (c, 15, 15)
        self.board_size = self.input_shape[1] * self.input_shape[2]
        # 初始化nn.Conv2d inputchannels，outputchannels
        self.layer0 = Conv(self.input_shape[0], num_filters, 3, bn=True)
        self.blocks = nn.ModuleList([ResidualBlock(num_filters) for _ in range(num_blocks)])

    def forward(self, x):
        h = F.relu(self.layer0(x))
        for block in self.blocks:
            h = block(h)
        return h # torch.Size([1, 16, 15, 15])

    def inference(self, x, pass_to_cpu:bool = True):
        self.eval()
        with torch.no_grad():
            # rp = self(torch.from_numpy(x).unsqueeze(0)) # cpu_only版本: conv2d的输入tensor需要四维，多加一维度在0位置
            rp = self(x)
        if not pass_to_cpu:
            return rp # print('rp tensor shape' , rp.shape) # torch.Size([1, 16, 15, 15])
        return rp.cpu().numpy()[0]  

class Prediction(nn.Module):
    ''' Policy and value prediction from inner abstract state '''
    def __init__(self, action_shape):
        super().__init__()
        self.board_size = np.prod(action_shape[1:]) # 15 x 15 = 81
        self.action_size = action_shape[0] * self.board_size # 1 x 81 = 81

        self.conv_p1 = Conv(num_filters, 4, 1, bn=True)
        self.conv_p2 = Conv(4, 1, 1)

        self.conv_v = Conv(num_filters, 4, 1, bn=True)
        self.fc_v = nn.Linear(self.board_size * 4, 1, bias=False)

    def forward(self, rp):
        h_p = F.relu(self.conv_p1(rp))
        # print('过第一层p卷积', h_p.shape) # torch.Size([1, 4, 15, 15])
        h_p = self.conv_p2(h_p).view(-1, self.action_size)
        # print('过第二层p卷积', h_p.shape) # torch.Size([1, 81])
        h_v = F.relu(self.conv_v(rp))
        # print('过第一层fc卷积', h_v.shape) # torch.Size([1, 4, 15, 15])
        h_v = self.fc_v(h_v.view(-1, self.board_size * 4))
        # print('过第二层fc层', h_v.shape) # torch.Size([1, 1])
        # range of value is -1 ~ 1
        return F.softmax(h_p, dim=-1), torch.tanh(h_v)

    def inference(self, rp, pass_to_cpu:bool = True):
        self.eval()
        with torch.no_grad():
            # p, v = self(torch.from_numpy(rp).unsqueeze(0))
            p, v = self(rp) # print('p shape is ', p.shape) # torch.Size([1, 81])
        if not pass_to_cpu:
            return p, v
        return p.cpu().numpy()[0], v.cpu().numpy()[0][0]

class Dynamics(nn.Module):
    '''Abstract state transition'''
    def __init__(self, rp_shape, act_shape):
        super().__init__()
        self.rp_shape = rp_shape
        self.layer0 = Conv(rp_shape[0] + act_shape[0], num_filters, 3, bn=True)
        self.blocks = nn.ModuleList([ResidualBlock(num_filters) for _ in range(num_blocks)])

    def forward(self, rp, a):
        h = torch.cat([rp, a], dim=1)
        # print('dim=1 cat shape h ' , h.shape) # torch.Size([1, 17, 15, 15])
        h = self.layer0(h)
        for block in self.blocks:
            h = block(h)
        return h

    def inference(self, rp, a, pass_to_cpu:bool = True):
        self.eval()
        with torch.no_grad():
            # cpu_only版本: rp = self(torch.from_numpy(rp).unsqueeze(0), torch.from_numpy(a).unsqueeze(0))
            rp = self(rp, a)
        if not pass_to_cpu:
            return rp
        return rp.cpu().numpy()[0]

class Net(nn.Module):
    '''Whole net'''
    def __init__(self):
        super().__init__()
        state = State()
        input_shape = state.feature().shape # state (c, 15, 15)
        action_shape = state.action_feature(0).shape # action (1, 15, 15)
        rp_shape = (num_filters, *input_shape[1:]) # hidden space (16, 15, 15)

        self.representation = Representation(input_shape)
        self.prediction = Prediction(action_shape)
        self.dynamics = Dynamics(rp_shape, action_shape)

    def predict(self, state0, path):
        '''Predict p and v from original state and path'''
        outputs = []
        x = state0.feature(to_cuda=True)
        rp = self.representation.inference(x, pass_to_cpu= False)
        outputs.append(self.prediction.inference(rp, pass_to_cpu = True))
        for action in path:
            a = state0.action_feature(action, to_cuda=True)
            rp = self.dynamics.inference(rp, a, pass_to_cpu = False)
            outputs.append(self.prediction.inference(rp, pass_to_cpu = True))
        return outputs

In [4]:
# 给上面的三个网络做单元测试用
def show_net(net, state):
    '''Display policy (p) and value (v)'''
    print(state)
    p, v = net.predict(state, [])[-1]
    print('p = ')
    print((p * 10000).astype(int).reshape((-1, *net.representation.input_shape[1:3])))
    print('v = ', v)
    print()

#  Outputs before training
show_net(Net().cuda(), State())

   1 2 3 4 5 6 7 8 9 u v w x y z
A _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
B _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
D _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
E _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
F _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
G _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
H _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
I _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
J _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
K _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
L _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
M _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
N _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
O _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
record = 
p = 
[[[45 43 45 45 46 46 46 46 46 46 46 46 45 46 43]
  [46 45 44 46 46 46 46 46 46 46 46 46 46 46 45]
  [45 42 41 44 45 45 45 45 45 45 45 45 45 45 46]
  [44 40 40 43 44 44 44 44 44 44 44 45 45 45 45]
  [44 40 40 43 44 44 44 44 44 44 44 45 45 45 45]
  [44 40 40 43 44 44 44 44 44 44 44 46 45 45 45]
  [44 40 41 43 44 44 45 44 44 45 45 46 45 45 45]
  [44 41 41 43 44 44 45 45 45 45 45 46 45 45 45]
  [44 41 41 43 44 44 45 44 44 45 45 46 45 45 45]
  [44 40 40 43 44 44 44 44 44 4

In [5]:
# 实现蒙特卡洛树搜索MCTS

class Node:
    '''Search result of one abstract (or root) state'''
    def __init__(self, p, v):
        self.p, self.v = p, v
        self.n, self.q_sum = np.zeros_like(p), np.zeros_like(p)
        self.n_all, self.q_sum_all = 1, v / 2 # prior

    def update(self, action, q_new):
        # Update
        self.n[action] += 1
        self.q_sum[action] += q_new

        # Update overall stats
        self.n_all += 1
        self.q_sum_all += q_new

In [6]:
import time
import copy

class Tree:
    '''Monte Carlo Tree'''
    def __init__(self, net):
        self.net = net
        self.nodes = {}

    def search(self, state, path, rp, depth):
        # Return predicted value from new state
        # rp is a Tensor on Gpu
        key = state.record_string()
        if len(path) > 0:
            key += '|' + ' '.join(map(state.action2str, path))
        if key not in self.nodes:
            p, v = self.net.prediction.inference(rp, pass_to_cpu = True)
            self.nodes[key] = Node(p, v)
            return v

        # State transition by an action selected from bandit
        node = self.nodes[key]
        p = node.p
        mask = np.zeros_like(p)
        if depth == 0:
            # Add noise to policy on the root node
            p = 0.75 * p + 0.25 * np.random.dirichlet([0.15] * len(p))
            # On the root node, we choose action only from legal actions
            mask[state.legal_actions()] = 1
            p *= mask
            p /= p.sum() + 1e-16

        n, q_sum = 1 + node.n, node.q_sum_all / node.n_all + node.q_sum
        ucb = q_sum / n + 2.0 * np.sqrt(node.n_all) * p / n + mask * 4 # PUCB formula
        best_action = np.argmax(ucb)

        # Search next state by recursively calling this function
        rp_next = self.net.dynamics.inference(rp, state.action_feature(best_action, to_cuda=True), pass_to_cpu=False)
        path.append(best_action)
        q_new = -self.search(state, path, rp_next, depth + 1) # With the assumption of changing player by turn
        node.update(best_action, q_new)

        return q_new

    def think(self, state, num_simulations, temperature = 0, show=False):
        # End point of MCTS
        if show:
            print(state)
        start, prev_time = time.time(), 0
        project_once = self.net.representation.inference(state.feature(to_cuda=True), pass_to_cpu=False) # Muzero在根节点状态只需rp一次，因为rpnet是固定的
        for _ in range(num_simulations):
            self.search(state, [], project_once, depth=0)
            # Display search result on every second
            if show:
                tmp_time = time.time() - start
                if int(tmp_time) > int(prev_time):
                    prev_time = tmp_time
                    root, pv = self.nodes[state.record_string()], self.pv(state)
                    print('%.2f sec. best %s. q = %.4f. n = %d / %d. pv = %s'
                          % (tmp_time, state.action2str(pv[0]), root.q_sum[pv[0]] / root.n[pv[0]],
                             root.n[pv[0]], root.n_all, ' '.join([state.action2str(a) for a in pv])))

        #  Return probability distribution weighted by the number of simulations
        root = self.nodes[state.record_string()]
        n = root.n + 1
        n = (n / np.max(n)) ** (1 / (temperature + 1e-8))
        return n / n.sum() # teacher--MCTS

    def pv(self, state):
        # Return principal variation (action sequence which is considered as the best)
        s, pv_seq = copy.deepcopy(state), []
        while True:
            key = s.record_string()
            if key not in self.nodes or self.nodes[key].n.sum() == 0:
                break
            best_action = sorted([(a, self.nodes[key].n[a]) for a in s.legal_actions()], key=lambda x: -x[1])[0][0]
            pv_seq.append(best_action)
            s.play(best_action)
        return pv_seq

In [7]:
# Search with initialized net

tree = Tree(Net().cuda())
next_step_0 = tree.think(State(), 100, show=True)

tree = Tree(Net().cuda())
next_step_n = tree.think(State().play('E4 F5 E5 F6 E6 F7 E7'), 200, show=True)
print(next_step_n.reshape((15, 15)))

# tree = Tree(Net().cuda())
# tree.think(State().play('F4 D5 F5 D6 F6 D7 F7'), 200, show=True)

# tree = Tree(Net().cuda())
# tree.think(State().play('B2 A2 A3 C1'), 200, show=True)

   1 2 3 4 5 6 7 8 9 u v w x y z
A _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
B _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
D _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
E _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
F _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
G _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
H _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
I _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
J _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
K _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
L _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
M _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
N _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
O _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
record = 
   1 2 3 4 5 6 7 8 9 u v w x y z
A _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
B _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
D _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
E _ _ _ O O O O _ _ _ _ _ _ _ _
F _ _ _ _ X X X _ _ _ _ _ _ _ _
G _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
H _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
I _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
J _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
K _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
L _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
M _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
N _ _ _ _ _ _ _ _ _ _ _ _ _ 

In [8]:
# Training of neural net
from tqdm import tqdm
import torch.optim as optim
import wandb

batch_size = 512
num_steps = 100

def gen_target(ep, k):
    '''Generate inputs and targets for training'''
    # path, reward, observation, action, policy
    turn_idx = np.random.randint(len(ep[0]))
    ps, vs, ax = [], [], []
    for t in range(turn_idx, turn_idx + k + 1):
        if t < len(ep[0]):
            p = ep[4][t]
            a = ep[3][t]
        else: # state after finishing game
            # p is 0 (loss is 0)
            p = np.zeros_like(ep[4][-1])
            # random action selection
            a = np.zeros(np.prod(ep[3][-1].shape), dtype=np.float32)
            a[np.random.randint(len(a))] = 1
            a = a.reshape(ep[3][-1].shape)
        vs.append([ep[1] if t % 2 == 0 else -ep[1]])
        ps.append(p)
        ax.append(a)
        
    return ep[2][turn_idx], ax, ps, vs

def train(episodes, net, optimizer):
    #     episodes = List(record:List[int], 
    #                     reward:int(0,1,-1), 
    #                     features:state.feature(), 
    #                     action_features:state.action_feature(action) from random.choice based on distribution p_targets, 
    #                     p_targets:teacher--MCTS))
    '''Train neural net on GPU'''
    p_loss_sum, v_loss_sum = torch.as_tensor(0, dtype=torch.float32).cuda() , torch.as_tensor(0, dtype=torch.float32).cuda()
    net.train()
    k = 3 # 与alpha不同（2个状态对就可以），至少3个连续状态用来训练Muzero
    for _ in tqdm(range(num_steps)):
        x, ax, p_target, v_target = zip(*[gen_target(episodes[np.random.randint(len(episodes))], k) for j in range(batch_size)])
        x = torch.from_numpy(np.array(x)).cuda()
        ax = torch.from_numpy(np.array(ax))
        p_target = torch.from_numpy(np.array(p_target))
        v_target = torch.FloatTensor(np.array(v_target))

        # Change the order of axis as [time step, batch, ...]
        ax = torch.transpose(ax, 0, 1).cuda()
        p_target = torch.transpose(p_target, 0, 1).cuda()
        v_target = torch.transpose(v_target, 0, 1).cuda()

        # Compute losses for k (+ current) steps
        p_loss, v_loss = torch.as_tensor(0, dtype=torch.float32).cuda() , torch.as_tensor(0, dtype=torch.float32).cuda()
        for t in range(k + 1):
            rp = net.representation(x) if t == 0 else net.dynamics(rp, ax[t - 1])
            p, v = net.prediction(rp)
            p_loss += F.kl_div(torch.log(p), p_target[t], reduction='sum')
            v_loss += torch.sum(((v_target[t] - v) ** 2) / 2)

        p_loss_sum += p_loss.item()
        v_loss_sum += v_loss.item()

        optimizer.zero_grad()
        (p_loss + v_loss).backward()
        optimizer.step()

    num_train_datum = num_steps * batch_size
    print('p_loss %f v_loss %f' % (p_loss_sum.cpu().numpy() / num_train_datum, v_loss_sum.cpu().numpy() / num_train_datum))
    wandb.log({
        'p-loss': p_loss_sum.cpu().numpy() / num_train_datum,
        'v-loss': v_loss_sum.cpu().numpy() / num_train_datum})
    return net

In [9]:
#  Battle against random agents
import plotly_express as px

def vs_random(net, n=100):
    results = { 0 : 0 , -1 : 0 , 1 : 0}
    length_of_turn = []
    for i in range(n):
        first_turn = i % 2 == 0
        turn = first_turn
        state = State()
        steps = 0
        while not state.terminal():
            if turn:
                p, _ = net.predict(state, [])[-1]
                action = sorted([(a, p[a]) for a in state.legal_actions()], key=lambda x:-x[1])[0][0]
            else:
                action = np.random.choice(state.legal_actions())

            state.play(action)
            steps += 1
            turn = not turn
        length_of_turn.append(steps)
        r = state.terminal_reward() if turn else -state.terminal_reward()
        results[r] = results.get(r, 0) + 1
    # 记录棋局长度的提琴图，方便之后继续训练比较使用
    plt = px.violin(length_of_turn, box=True, points= 'all')
    wandb.log({'length of turns vs. randomplayer': plt})
    return results

In [10]:
# enrich self-play episodes from symmetry

def symmetrys(action:int) -> List[int]:
    """15x15的黑白棋盘一共8种对称性，因此1条self-play轨迹可以扩充7条数据
    Parameters
    ----------
    action : int
        每一步执行的动作
    Returns
    -------
    List[int]
        [identify]id0, [上下翻转]id1, [左右翻转]id2, [左斜轴翻转]id3, [右斜轴翻转]id4, [向左旋转π/2]id5, [向左旋转π]id6, [向右旋转π/2]id7
    """
    x, y = action // 15, action % 15
    id1 = (14 - x) * 15 + y
    id2 = x * 15        + (14 - y)
    id3 = y * 15        + x
    id4 = (14 - y) * 15 + (14 - x)
    id5 = (14 - y) * 15 + x
    id6 = 224-action
    x7 , y7 = id6 // 15, id6 % 15
    id7 = (14 - y7) * 15 + x7
    return [action, id1, id2, id3, id4, id5, id6, id7]

print([State().action2str(i) for i in symmetrys(15)]) #　checked

['B1', 'N1', 'Bz', 'A2', 'Oy', 'O2', 'Nz', 'Ay']


In [14]:
# steps = [[1, 2, 3, 4], [4, 5, 6, 7], [7, 8, 9, 10]]
# for idx in range(4):
#     print([step[idx] for step in steps])

# test保存路径
import time
'.model/checkpoints/{}/'.format(time.strftime('%Y%m%d%H%M')).split('/')[-2]
# checked


# test保存deque
# import pickle
# episodes = [1]
# with open('.model/checkpoints/' + 'episodes.pt', 'wb') as ep:
#     pickle.dump(episodes, ep)
# # checked

'202301191444'

In [15]:
# Main algorithm of MuZero
from collections import deque 
import os
import pickle

config = {
"num_filters": 64, # NN:Resnet structure
"num_blocks": 12,  # NN:Resnet structure
"optimizer": 'SGD (with momentum)',
"learning_rate": 0.0003,
"weight_decay": 0.00003,
"momentum": 0.9,
"batch_size": 512,          # train:每次训练时采样的batch总数
"num_steps": 100,           # train:每次训练时的epoch总数
"num_game": 100,            # self-play:本次selfplay的总局数
"num_games_one_epoch": 20,  # self-play:每selfplay多少轮，训练模型一次
"num_simulations": 100,     # self-play:selfplay时每个Node的search搜索次数
"lr_scheduler": 'torch.optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=wandb.config.num_steps, eta_min=1e-5)',
"deque_maxlen": 2000,
"start_π_temperature": 1.0,
"π_temperature_decay": 0.8,
"PATH": '.model/checkpoints/{}/'.format(time.strftime('%Y%m%d%H%M'))
}

wandb.login()
wandb.init(project="Muzero-15-15-5ziqi", entity="opink", config=config, name=config["PATH"].split('/')[-2])
if not os.path.exists(wandb.config.PATH):
    os.makedirs(wandb.config.PATH)

num_games = wandb.config.num_game                      # selfplay的总局数
num_games_one_epoch = wandb.config.num_games_one_epoch # 每selfplay多少轮，训练模型一次
num_simulations = wandb.config.num_simulations         # 每个Node的search搜索次数

# load net parameters train&save on GPU
# net = Net()
# net.load_state_dict(torch.load(PATH + '1000r.pt'))
# net.cuda()

net = Net().cuda() # initial training
wandb.watch(net, log='all')

optimizer = optim.SGD(net.parameters(), lr=3e-4, weight_decay=3e-5, momentum=0.9)
schedular = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer,T_max=wandb.config.num_steps, eta_min=1e-5)

# Display battle results as {-1: lose 0: draw 1: win} (for episode generated for training, 1 means that the first player won)
vs_random_sum = vs_random(net)
print('vs_random = ', sorted(vs_random_sum.items()))

# episodes = []
episodes = deque([], maxlen=5000) # 移动队列用于存储最新的模型游戏

result_distribution = {1: 0, 0: 0, -1: 0}

for g in tqdm(range(num_games)):
    # Generate one episode
    record, p_targets, features, action_features = [], [], [], []
    state = State()
    # temperature using to make policy targets from search results
    temperature = 1.0

    tree = Tree(net) # 每轮游戏维护同一个树

    while not state.terminal():
        # tree = Tree(net) # 有必要每走一步都新建一个搜索树么？移到循环外面可以么？
        p_target = tree.think(state, num_simulations, temperature)
        p_targets.append(p_target)
        features.append(state.feature())

        # Select action with generated distribution, and then make a transition by that action
        action = np.random.choice(np.arange(len(p_target)), p=p_target)
        # 数据增广
        record.append(symmetrys(action)) 
        action_features.append(state.action_feature(action))
        state.play(action)
        temperature *= 0.8 # 这个温度参数感觉意义不明。。。？

    # reward seen from the first turn player
    reward = state.terminal_reward() * (1 if len(record) % 2 == 0 else -1)
    result_distribution[reward] += 1
    
    # 增广轨迹
    for idx in range(8):
        enrich_record = [step[idx] for step in record]
        episodes.append((enrich_record, reward, features, action_features, p_targets))

    if g % num_games_one_epoch == 0:
        print('game ', end='')
    print(g, ' ', end='')

    # Training of neural net
    if (g + 1) % num_games_one_epoch == 0:
        # Show the result distributiuon of generated episodes
        print('generated = ', sorted(result_distribution.items()))
        epi = list(episodes.copy())
        net = train(episodes=epi, net=net, optimizer=optimizer)
        schedular.step()
        del epi
        
        vs_random_once = vs_random(net)
        print('vs_random = ', sorted(vs_random_once.items()), end='')
        for r, n in vs_random_once.items():
            vs_random_sum[r] += n
        print(' sum = ', sorted(vs_random_sum.items()))

        new_checkpoint_path = wandb.config.PATH + f'{g // wandb.config.num_games_one_epoch}_round_model.pt'
        torch.save(net.state_dict(), new_checkpoint_path)
        with open(wandb.confi.PATH + 'episodes.pt', 'wb') as ep:
            pickle.dump(episodes, ep)
# 上传wandb.artifacts训练后的模型
eval_artifact = wandb.Artifact(type='model', name='run-%s-%s' % (wandb.config.PATH.split('/')[-2], wandb.config.num_game))
eval_artifact.add_dir('.model/checkpoints/')
# eval_artifact.add_file(new_checkpoint_path)
wandb.run.log_artifact(eval_artifact)

print('finished')

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
wandb: Currently logged in as: opink. Use `wandb login --relogin` to force relogin


  0%|          | 0/100 [00:00<?, ?it/s]

vs_random =  [(-1, 1), (0, 0), (1, 99)]


  1%|          | 1/100 [00:29<49:08, 29.78s/it]

game 0  

  2%|▏         | 2/100 [00:40<39:12, 24.01s/it]

1  

  3%|▎         | 3/100 [01:18<45:37, 28.22s/it]

2  

  4%|▍         | 4/100 [01:58<50:55, 31.83s/it]

3  

  5%|▌         | 5/100 [02:41<55:48, 35.24s/it]

4  

  6%|▌         | 6/100 [03:19<56:32, 36.09s/it]

5  

  7%|▋         | 7/100 [03:52<54:30, 35.17s/it]

6  

  8%|▊         | 8/100 [04:37<58:20, 38.04s/it]

7  

  9%|▉         | 9/100 [05:18<59:07, 38.98s/it]

8  

 10%|█         | 10/100 [05:43<51:55, 34.61s/it]

9  

 11%|█         | 11/100 [06:25<54:51, 36.98s/it]

10  

 12%|█▏        | 12/100 [07:13<59:07, 40.31s/it]

11  

 13%|█▎        | 13/100 [07:47<55:45, 38.46s/it]

12  

 14%|█▍        | 14/100 [08:32<57:45, 40.29s/it]

13  

 15%|█▌        | 15/100 [08:55<49:38, 35.04s/it]

14  

 16%|█▌        | 16/100 [09:34<50:46, 36.27s/it]

15  

 17%|█▋        | 17/100 [10:03<47:10, 34.10s/it]

16  

 18%|█▊        | 18/100 [10:42<48:30, 35.49s/it]

17  

 19%|█▉        | 19/100 [11:23<50:16, 37.24s/it]

18  



19  generated =  [(-1, 10), (0, 0), (1, 10)]


100%|██████████| 100/100 [00:19<00:00,  5.22it/s]


p_loss 7.840249 v_loss 1.763480


 19%|█▉        | 19/100 [12:28<53:12, 39.41s/it]

vs_random =  [(-1, 0), (0, 0), (1, 100)] sum =  [(-1, 1), (0, 0), (1, 199)]





AttributeError: Can't pickle local object 'TorchHistory.add_log_hooks_to_pytorch_module.<locals>.<lambda>'

In [16]:
new_checkpoint_path = f'.model/checkpoints/1000r.pt'
torch.save(net.state_dict(), new_checkpoint_path)

# related tips
# 4. Trained and Save on GPU, Load on GPU
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# 
# When loading a model on a GPU that was trained and saved on GPU, 
# simply
# convert the initialized model to a CUDA optimized model using
# ``model.to(torch.device('cuda'))``.
# 
# Be sure to use the ``.to(torch.device('cuda'))`` function 
# on all model inputs 
# to prepare the data for the model.

# PATH = "model.pt"
# net.cuda()
# # Save
# torch.save(net.state_dict(), PATH)

# # Load
# device = torch.device("cuda")
# model = Net()
# model.load_state_dict(torch.load(PATH))
# model.to(device)

In [27]:
# Search with trained net

tree = Tree(net)
state = State()

next_step = tree.think(state.play('E4 F5 E5 F6 E6 F7 E7'), 2000, show=True)
print(next_step.reshape((9 , 9)))

   1 2 3 4 5 6 7 8 9
A _ _ _ _ _ _ _ _ _
B _ _ _ _ _ _ _ _ _
C _ _ _ _ _ _ _ _ _
D _ _ _ _ _ _ _ _ _
E _ _ _ O O O O _ _
F _ _ _ _ X X X _ _
G _ _ _ _ _ _ _ _ _
H _ _ _ _ _ _ _ _ _
I _ _ _ _ _ _ _ _ _
record = E4 F5 E5 F6 E6 F7 E7
1.00 sec. best D8. q = -0.6277. n = 32 / 245. pv = D8
2.00 sec. best D8. q = -0.6442. n = 38 / 414. pv = D8
3.00 sec. best D8. q = -0.6584. n = 47 / 552. pv = D8
4.00 sec. best D8. q = -0.6583. n = 61 / 675. pv = D8
5.00 sec. best D8. q = -0.6892. n = 75 / 797. pv = D8
6.00 sec. best D8. q = -0.6955. n = 89 / 956. pv = D8
7.00 sec. best D8. q = -0.7111. n = 99 / 1104. pv = D8
8.01 sec. best D8. q = -0.7193. n = 109 / 1233. pv = D8
9.00 sec. best D8. q = -0.7177. n = 119 / 1345. pv = D8
10.01 sec. best D8. q = -0.7185. n = 129 / 1463. pv = D8
11.00 sec. best D8. q = -0.7191. n = 141 / 1588. pv = D8
12.00 sec. best D8. q = -0.7202. n = 145 / 1732. pv = D8
13.00 sec. best D8. q = -0.7243. n = 149 / 1882. pv = D8
[[0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 

In [28]:
next_step = tree.think(state.play('F8'), 800, show=True)
print('是否已经终局: ', state.terminal())
print(next_step.reshape((9 , 9)))

   1 2 3 4 5 6 7 8 9
A _ _ _ _ _ _ _ _ _
B _ _ _ _ _ _ _ _ _
C _ _ _ _ _ _ _ _ _
D _ _ _ _ _ _ _ _ _
E _ _ _ O O O O _ _
F _ _ _ _ X X X X _
G _ _ _ _ _ _ _ _ _
H _ _ _ _ _ _ _ _ _
I _ _ _ _ _ _ _ _ _
record = E4 F5 E5 F6 E6 F7 E7 F8
1.00 sec. best E8. q = 0.9508. n = 92 / 203. pv = E8
2.00 sec. best E8. q = 0.9554. n = 148 / 381. pv = E8
3.01 sec. best E8. q = 0.9560. n = 207 / 528. pv = E8
4.01 sec. best E8. q = 0.9579. n = 253 / 667. pv = E8
是否已经终局:  False
[[0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]]


In [15]:
next_step = tree.think(state.play('E8'), 800, show=True)
print('是否已经终局: ', state.terminal())
print(next_step.reshape((9 , 9)))

   1 2 3 4 5 6 7 8 9
A _ _ _ _ _ _ _ _ _
B _ _ _ _ _ _ _ _ _
C _ _ _ _ _ _ _ _ _
D _ _ _ _ _ _ _ _ _
E _ _ _ O O O O O _
F _ _ _ _ X X X X _
G _ _ _ _ _ _ _ _ _
H _ _ _ _ _ _ _ _ _
I _ _ _ _ _ _ _ _ _
record = E4 F5 E5 F6 E6 F7 E7 F8 E8
1.00 sec. best F9. q = -0.8868. n = 36 / 189. pv = F9
2.01 sec. best F9. q = -0.8929. n = 62 / 330. pv = F9
3.01 sec. best F9. q = -0.8966. n = 76 / 449. pv = F9
4.00 sec. best F9. q = -0.9030. n = 91 / 548. pv = F9
5.01 sec. best F9. q = -0.9080. n = 107 / 640. pv = F9
6.01 sec. best F9. q = -0.9083. n = 120 / 726. pv = F9
是否已经终局:  True
[[0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]]
