In [51]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from IPython.display import clear_output
import time
import matplotlib.pyplot as plt




board_size = 20
#torch.float32



dir = [[0, -1], [1, -1], [1, 0], [1, 1], [0, 1], [-1, 1], [-1, 0], [-1, -1]]
begin_x = 20
begin_y = 20
end_x = 590
end_y = 590
cell_size = 30
r = 10
status = {
    0: 0,
    1: 100,
    2: 3000,
    3: 10000,
    4: 20000,
    5: 20000
}








def get_evaluate(r : int,  c: int, chess_board):
    #分别判断下在此处我方和敌方的得分
    #最大化我方得分，抢占敌方最优位置
    enemy = judge(r, c, 1, 2, chess_board)
    friend = judge(r, c, 2, 1, chess_board)
    score = enemy + friend
    return score

def judge(y0: int, x0: int, enemy: int, friend: int, chess_board):
    score = 0
    cnt = []

    for d in dir:
        r = 0
        first_empty = -1
        for chess_cnt in range(0, 5):
            y = y0 + d[1] * chess_cnt
            x = x0 + d[0] * chess_cnt
            if x >= 20 or x < 0 or y >= 20 or y < 0:
                break
            if chess_board[y][x] == friend:
                break
            if chess_board[y][x] == 0 and first_empty == -1:
                first_empty = chess_cnt

            if chess_board[y][x] == enemy and first_empty <= 2:
                r += 1
        cnt.append(r)
    max_cnt = 0
    for i in range(0, 4):
        max_cnt = max(max_cnt, cnt[i] + cnt[i + 4])
        if max_cnt > 5:
            max_cnt = 5
    score += status[max_cnt]
    return score




def compare_scores(item1, item2):
    """
    用于比较两个元组中第一个元素（得分）大小的函数
    返回值为：
    - 若 item1 的得分大于 item2 的得分，返回 1
    - 若 item1 的得分小于 item2 的得分，返回 -1
    - 若二者得分相等，返回 0
    """
    score1 = item1[0]
    score2 = item2[0]
    if score1 > score2:
        return 1
    elif score1 < score2:
        return -1
    return 0


def place_where(chess_board):
    # scores = []  # 用于存储每个空白位置的得分以及对应的坐标
    center_score = 20
    res = -1
    y = -1
    x = -1
    for r in range(20):
        for c in range(20):
            if chess_board[r][c]!= 0:
                continue
            score = center_score - abs(10 - r) - abs(10 - c) + get_evaluate(r, c, chess_board)
            if score > res:
                res = score
                y = r
                x = c
            # scores.append((score, (c, r)))  # 将得分和坐标作为元组存入列表
    return x, y
    # # 使用自定义的比较函数对scores列表进行排序，实现按照得分从高到低排序
    # for i in range(len(scores) - 1):
    #     for j in range(len(scores) - i - 1):
    #         if compare_scores(scores[j], scores[j + 1]) < 0:
    #             scores[j], scores[j + 1] = scores[j + 1], scores[j]

    # ten_scores = scores[:3]  # 取前十个得分最高的位置（如果不足十个则取全部）
    # if len(ten_scores) == 0:
    #     return []  # 如果没有可落子的空白位置，返回空列表

    # choice = random.randint(0, len(ten_scores) - 1)  # 随机选择一个索引
    # return list(ten_scores[choice][1])  # 返回选中位置的坐标（转换为列表形式）




class GoBang_Model(nn.Module):

    #模型初始化
    def __init__(self, board_size):
        super(GoBang_Model, self).__init__()
        self.board_size = board_size
        #第一个卷积层输入1通道，输出32通道，卷积核大小为3，填充大小1
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        #第一次增加非线性因素
        self.relu1 = nn.ReLU()
        #第一个池化层使用2x2的池化核，步长为2
        self.pool1 = nn.MaxPool2d(2)
        #第二个卷积层
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        #第二次增加非线性因素
        self.relu2 = nn.ReLU()
        #第二个池化层
        self.pool2 = nn.MaxPool2d(2)
        #第一个全连接层
        #输出特征数量设置为 128，两个池化层//4
        # print(board_size)
        # print("fc1:", 64 * (board_size // 4) * (board_size // 4))
        self.fc1 = nn.Linear(64 * (board_size // 4) * (board_size // 4), 128)
        #第三次引入非线性因素
        self.relu3 = nn.ReLU()
        #第二个全连接层
        self.fc2 = nn.Linear(128, board_size * board_size)

    #前向传播
    def forward(self, x):
        # print("Input shape:", x.shape)  # 打印输入形状
        x = self.conv1(x)
        # print("After conv1:", x.shape)  # 打印第一个卷积层后的形状
        x = self.relu1(x)
        x = self.pool1(x)
        # print("After pool1:", x.shape)  # 打印第一个池化层后的形状
        x = self.conv2(x)
        # print("After conv2:", x.shape)  # 打印第二个卷积层后的形状
        x = self.relu2(x)
        x = self.pool2(x)
        # print("After pool2:", x.shape)  # 打印第二个池化层后的形状
        x = x.view(1, -1)
        # print("After flatten:", x.shape)  # 打印展平后的形状
        x = self.fc1(x)
        x = self.relu3(x)
        x = self.fc2(x)
        return x
    


def select_move(model, board_state, temperature=1.0):
    """
    根据模型输出的概率分布选择走法
    model: 五子棋模型
    board_state: 当前棋盘状态张量，形状为(1, board_size, board_size)
    temperature: 温度参数，用于控制采样的随机性，温度越高越随机，越低越偏向确定性选择
    :return: 走法坐标
    """
    with torch.no_grad():
        output = model(board_state)
        output = output.squeeze(0).flatten()  # 转换为一维概率向量

        mask = (board_state.squeeze(0) == 0).flatten().float()
        output = output * mask  # 将已下棋的位置的得分设为0
        
        # 根据温度参数调整概率分布
        # 根据 temperature 参数的值来分情况处理走法选择的逻辑。temperature 参数用于控制选择走法的随机性程度，
        # 当它大于 0 时，采用一种基于概率采样的方式来选择走法，使得选择具有一定随机性，更有利于探索不同的走法可能性，
        # 尤其在训练早期探索更多不同走法对模型学习更全面的策略有帮助。
        if temperature > 0:
            # 在这里，先将 output 除以 temperature，temperature 的作用类似一个缩放因子，
            # 温度越高，经过 softmax 后得到的概率分布越 “平缓”，意味着各个位置被选中的概率相对更均匀，随机性就越大；温度越低，概率分布越 “尖锐”，
            # 模型认为最优的几个位置的概率就会占比极大，随机性就越小，更偏向于确定性地选择模型认为最好的走法。
            probs = torch.softmax(output / temperature, dim=0).cpu().numpy()
            move_index = np.random.choice(len(probs), p=probs)
        else:
            move_index = torch.argmax(output).item()

        row = move_index // model.board_size
        col = move_index % model.board_size

    return row, col
    
def update_board_state(board_state, move, chess):
    """
    根据走法更新棋盘状态张量
    param board_state: 当前棋盘状态张量
    param move: 走法坐标（行，列）
    chess: 棋子颜色（如1表示黑棋，2表示白棋）
    return: 更新后的棋盘状态张量
    """
    row, col = move
    board_state[0][row][col] = chess
    return board_state
    

def is_game_over(board_state, board_size):
    """
    判断五子棋棋局是否结束
    :param board_state: 棋盘状态张量
    :param board_size: 棋盘大小
    :return: 是否结束（True/False）以及获胜方（None表示未结束或平局，1表示黑棋胜，2表示白棋胜）
    """
    directions = [(1, 0), (-1, 0), (0, 1), (0, -1), (1, 1), (-1, -1), (1, -1), (-1, 1)]
    for row in range(board_size):
        for col in range(board_size):
            if board_state[0][row][col]!= 0:
                piece_color = board_state[0][row][col]
                for direction in directions:
                    count = 1
                    for step in range(1, 5):
                        new_row = row + step * direction[0]
                        new_col = col + step * direction[1]
                        if 0 <= new_row < board_size and 0 <= new_col < board_size and board_state[0][new_row][new_col] == piece_color:
                            count += 1
                        else:
                            break
                    if count >= 5:
                        return True, piece_color

    if (board_state == 0).sum() == 0:  # 棋盘已满判断平局
        return True, None

    return False, None


def calculate_reward(result, chess):
    """
    根据棋局结果和执子方计算奖励
    result: 棋局结果（None表示未结束或平局，1表示黑棋胜，2表示白棋胜）
    piece_color: 执子方棋子颜色（如1表示黑棋，2表示白棋）
    return: 奖励值
    """
    if result == chess:
        return 1 
    elif result is None:
        return 0
    else:
        return -1
    

def count_connectivity(board_state, piece_color):
    """
    计算给定棋盘状态下指定颜色棋子的连通情况
    :param board_state: 棋盘状态张量，形状如(1, board_size, board_size)
    :param piece_color: 棋子颜色（如示例中用1表示己方棋子）
    :return: 连通数量或者连通程度的某种量化表示（例如连通子图的数量、最长连通长度等，根据需求定义）
    """
    board_size = board_state.shape[1]
    connectivity_count = 0
    # 可以通过遍历棋盘每个位置来检查棋子连通性
    for row in range(board_size):
        for col in range(board_size):
            if board_state[0][row][col] == piece_color:
                # 这里可以进一步编写代码来检查该棋子与周围同色棋子的连通情况
                # 比如向上下左右、斜向等方向遍历查找相连的同色棋子，统计连通数量等
                # 以下是简单示意，假设发现相连就增加连通计数（实际需要更严谨逻辑）
                connectivity_count += 1
    return connectivity_count


def update_model(model, optimizer, board_states, actions, rewards):
    """
    根据奖励更新模型参数
    model: 五子棋模型
    optimizer: 优化器（如Adam等）
    board_states: 整个棋局过程中的棋盘状态张量列表
    actions: 整个棋局过程中的走法坐标列表
    rewards: 对应的奖励值列表
    :return:
    """
    optimizer.zero_grad()

    # 总损失初始化
    total_loss = 0
    # 用于存储每步的基础均方误差损失（MSE），方便后续归一化及与其他损失项结合
    mse_losses = []
    # 用于存储每步的策略相关损失（例如基于走法概率分布的损失），方便后续处理
    policy_losses = []
    # 用于存储每步的基于五子棋规则特性的损失，引导模型学习符合规则的策略
    rule_based_losses = []

    for board_state, action, reward in zip(board_states, actions, rewards):
        output = model(board_state)
        row, col = action
        action_index = row * model.board_size + col

        # 构建目标张量
        target = torch.zeros_like(output).squeeze(0).flatten()
        target[action_index] = reward

        # 1. 计算基础均方误差损失（MSE）
        mse_loss = ((output.squeeze(0).flatten() - target) ** 2).sum()
        mse_losses.append(mse_loss)

        # 2. 计算策略相关损失（示例采用负对数似然损失来优化走法概率分布）
        probs = torch.softmax(output.squeeze(0).flatten(), dim=0)
        nll_loss = -torch.log(probs[action_index])
        policy_losses.append(nll_loss)

        # 3. 计算基于五子棋规则特性的损失
        # 这里简单示例，判断走法是否破坏自身连珠优势（可根据实际深入扩展和细化规则）
        # 获取当前棋盘上自身棋子的连通情况（需要定义相应函数来分析棋盘连通性，比如count_connectivity函数）
        own_connectivity = count_connectivity(board_state, 1)  # 假设1表示己方棋子
        new_board_state = update_board_state(board_state.clone(), action, 1)  # 模拟走这步后的棋盘
        new_connectivity = count_connectivity(new_board_state, 1)
        if new_connectivity < own_connectivity:
            rule_loss = torch.tensor(0.1, requires_grad=True)  # 若破坏连珠优势，给予一定损失惩罚
        else:
            rule_loss = torch.tensor(0.0, requires_grad=True)
        rule_based_losses.append(rule_loss)

    # 对每步的MSE损失进行归一化处理
    if mse_losses:
        mean_mse_loss = torch.stack(mse_losses).mean()
    else:
        mean_mse_loss = torch.tensor(0.0, requires_grad=True)

    # 对每步的策略相关损失进行归一化处理
    if policy_losses:
        mean_policy_loss = torch.stack(policy_losses).mean()
    else:
        mean_policy_loss = torch.tensor(0.0, requires_grad=True)

    # 对每步的规则特性损失进行归一化处理
    if rule_based_losses:
        mean_rule_loss = torch.stack(rule_based_losses).mean()
    else:
        mean_rule_loss = torch.tensor(0.0, requires_grad=True)

    # 组合不同类型的损失，可根据实际情况调整权重
    total_loss = mean_mse_loss + mean_policy_loss + mean_rule_loss

    print(total_loss)
    if total_loss == 0:
        return

    # 反向传播
    total_loss.backward()

    # 学习率调整（示例采用简单的固定步长衰减，可替换为更复杂的策略如余弦退火等）
    for param_group in optimizer.param_groups:
        param_group['lr'] *= 0.99  # 每次训练步长衰减学习率，可根据实际调整衰减因子

    optimizer.step()



#白棋
def play_with_gambling(model, board_size, learning_rate, num_games):
    
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    for _ in range(num_games):

        chess_board = []
        for i in range(20):
            t = []
            for j in range(20):
                t.append(0)
            chess_board.append(t)
            
        
        board_state = torch.tensor(chess_board).reshape(1, 20, 20).float()
        game_over = False
        board_states_list = []
        actions_list = []
        rewards_list = []

        while not game_over:
            
            move = select_move(model, board_state, 0.2)
            #将每一步棋走动时的棋盘记录下来，并避免使用引用
            board_states_list.append(board_state.clone())
            #记录每一步的走法
            actions_list.append(move)            
            board_state = update_board_state(board_state, move, 1)
            chess_board[move[0]][move[1]] = 1

            game_over, result = is_game_over(board_state, board_size)

            
            if not game_over:
                                #确定走法

                x, y = place_where(chess_board)
                board_states_list.append(board_state.clone())
                actions_list.append([y, x])
                board_state = update_board_state(board_state, [y, x], 2)
                chess_board[y][x] = 2
    
                game_over, result = is_game_over(board_state, board_size)


            

            

            #一局结束分配奖励
            if game_over:
                reward = calculate_reward(result, 1)
                rewards_list.append(reward)
                update_model(model, optimizer, board_states_list, actions_list, rewards_list)
                # records.append(record)
                if result == 1:
                    print("博弈输")
                    for i in range(20):
                        for j in range(20):
                            if chess_board[i][j] == 0:
                                print(".", end = ' ')
                            else:
                                print(chess_board[i][j], end = ' ')
                        print()
                elif result == 2:
                    print("博弈胜")
                    for i in range(20):
                        for j in range(20):
                            if chess_board[i][j] == 0:
                                print(".", end = ' ')
                            else:
                                print(chess_board[i][j], end = ' ')
                        print()
                else:
                    print("平局")
                               
    print("over")
    return model



# def dynamic_loss(record_list, line, ax):
#     x_data = list(range(len(record_list)))
#     # 确保record_list里的Tensor元素都经过detach处理，转为普通数值类型（比如float）
#     y_data = [record.detach().item() if isinstance(record, torch.Tensor) else float(record) if isinstance(record, (int, float)) else 0 for record in record_list]
#     line.set_data(x_data, y_data)
#     ax.relim()
#     ax.autoscale_view()
#     plt.draw()
#     plt.pause(0.01)
#     clear_output(wait=True)
#     time.sleep(0.1)






def main():

    board_size = 200
    num_games = 100
    learning_rate = 0.005

    
    # model_white = GoBang_Model(board_size)
    model_black = GoBang_Model(board_size)
    
    # model_black = torch.load('model_black_full.pth', weights_only = False)
    
    model_black = play_with_gambling(model_black, board_size, learning_rate, num_games)



    # torch.save(model_black, 'model_black_full.pth')


main()


def self_play(model_black, model_white, board_size, num_games, learning_rate):
    # chess_board = []
    # for i in range(20):
    #     t = []
    #     for j in range(20):
    #         t.append(0)
    #     chess_board.append(t)
    """
    执行自我博弈过程并更新模型
    model_black: 代表黑棋的模型
    model_white: 代表白棋的模型
    board_size: 棋盘大小
    num_games: 自我博弈的局数
    learning_rate: 学习率
    :return:
    """
    #创建 Adam 优化器
    #将模型中所有可学习的参数传递给优化器
    optimizer_black = optim.Adam(model_black.parameters(), lr=learning_rate)
    optimizer_white = optim.Adam(model_white.parameters(), lr=learning_rate)

    for _ in range(num_games):
        board_state = torch.zeros(1, 20, 20).float()
        game_over = False
        current_color = 1  # 黑棋先下，用1表示
        board_states_list = []
        actions_list = []
        rewards_list = []

        while not game_over:

            #选择下旗方
            if current_color == 1:
                model = model_black
            else:
                model = model_white

            #确定走法
            move = select_move(model, board_state)
            #将每一步棋走动时的棋盘记录下来，并避免使用引用
            board_states_list.append(board_state.clone())
            #记录每一步的走法
            actions_list.append(move)

            board_state = update_board_state(board_state, move, current_color)
            game_over, result = is_game_over(board_state, board_size)


            #一局结束分配奖励
            if game_over:
                reward = calculate_reward(result, current_color, 1)
                rewards_list.append(reward)
                if result == 1:
                    update_model(model_black, optimizer_black, board_states_list, actions_list, rewards_list)
                    print("黑胜")
                elif result == 2:
                    update_model(model_white, optimizer_white, board_states_list, actions_list, rewards_list)
                    print("白胜")
                else:
                    print("平局")
            else:
                current_color = 3 - current_color  # 切换棋子颜色

    return model_black, model_white


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x1600 and 160000x128)

In [41]:
import const
import socket
import torch
import torch.nn as nn
import torch.optim as optim


chess_board = []
for init_chess_board in range(const.board_size_H):
    t = []
    for j in range(const.board_size_W):
        t.append(0)
    chess_board.append(t)


class GoBang_Model(nn.Module):

    #模型初始化
    def __init__(self, board_size):
        super(GoBang_Model, self).__init__()
        self.board_size = board_size
        #第一个卷积层输入1通道，输出32通道，卷积核大小为3，填充大小1
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        #第一次增加非线性因素
        self.relu1 = nn.ReLU()
        #第一个池化层使用2x2的池化核，步长为2
        self.pool1 = nn.MaxPool2d(2)
        #第二个卷积层
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        #第二次增加非线性因素
        self.relu2 = nn.ReLU()
        #第二个池化层
        self.pool2 = nn.MaxPool2d(2)
        #第一个全连接层
        #输出特征数量设置为 128，两个池化层//4
        # print(board_size)
        # print("fc1:", 64 * (board_size // 4) * (board_size // 4))
        self.fc1 = nn.Linear(64 * (board_size // 4) * (board_size // 4), 128)
        #第三次引入非线性因素
        self.relu3 = nn.ReLU()
        #第二个全连接层
        self.fc2 = nn.Linear(128, board_size * board_size)

    #前向传播
    def forward(self, x):
        # print("Input shape:", x.shape)  # 打印输入形状
        x = self.conv1(x)
        # print("After conv1:", x.shape)  # 打印第一个卷积层后的形状
        x = self.relu1(x)
        x = self.pool1(x)
        # print("After pool1:", x.shape)  # 打印第一个池化层后的形状
        x = self.conv2(x)
        # print("After conv2:", x.shape)  # 打印第二个卷积层后的形状
        x = self.relu2(x)
        x = self.pool2(x)
        # print("After pool2:", x.shape)  # 打印第二个池化层后的形状
        x = x.view(1, -1)
        # print("After flatten:", x.shape)  # 打印展平后的形状
        x = self.fc1(x)
        x = self.relu3(x)
        x = self.fc2(x)
        return x

def select_move(model, board_state, temperature=1.0):
    """
    根据模型输出的概率分布选择走法
    model: 五子棋模型
    board_state: 当前棋盘状态张量，形状为(1, board_size, board_size)
    temperature: 温度参数，用于控制采样的随机性，温度越高越随机，越低越偏向确定性选择
    :return: 走法坐标
    """
    with torch.no_grad():
        output = model(board_state)
        output = output.squeeze(0).flatten()  # 转换为一维概率向量
        
        mask = (board_state.squeeze(0) == 0).flatten().float()
        output = output * mask  # 将已下棋的位置的得分设为0
        
        # 根据温度参数调整概率分布
        # 根据 temperature 参数的值来分情况处理走法选择的逻辑。temperature 参数用于控制选择走法的随机性程度，
        # 当它大于 0 时，采用一种基于概率采样的方式来选择走法，使得选择具有一定随机性，更有利于探索不同的走法可能性，
        # 尤其在训练早期探索更多不同走法对模型学习更全面的策略有帮助。
        if temperature > 0:
            # 在这里，先将 output 除以 temperature，temperature 的作用类似一个缩放因子，
            # 温度越高，经过 softmax 后得到的概率分布越 “平缓”，意味着各个位置被选中的概率相对更均匀，随机性就越大；温度越低，概率分布越 “尖锐”，
            # 模型认为最优的几个位置的概率就会占比极大，随机性就越小，更偏向于确定性地选择模型认为最好的走法。
            probs = torch.softmax(output / temperature, dim=0).cpu().numpy()
            move_index = np.random.choice(len(probs), p=probs)
        else:
            move_index = torch.argmax(output).item()

        row = move_index // model.board_size
        col = move_index % model.board_size

    return col, row


def handle_connection(model):
    global chess_board
    serveport = 12000
  
    servesocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    servesocket.bind(('', serveport))
    servesocket.listen(1)
    print("运行中...\n")
    cnt = 1
    while True:
        connectionsocket, addr = servesocket.accept()
        sentence = connectionsocket.recv(1024).decode()

        # 检查接收到的数据长度是否符合预期
        length = const.board_size_W * const.board_size_H
        if len(sentence) != length:
            print("接收到的数据长度不正确，请重新发送" + str(len(sentence)))
            print(sentence)
            # connectionsocket.send(error_message.encode())
            # connectionsocket.close()
            continue


        for i in range(const.board_size_H):
            for j in range(const.board_size_W):
                chess_board[i][j] = (int(sentence[const.board_size_W * i + j]))
                print(chess_board[i][j], end='')
            print()
        print("!!!!!!!!!!!!!!!!!!!!!!!!")



        board_state = torch.tensor(chess_board).reshape(1, 20, 20).float()

        #先发y再发x
        num = select_move(model, board_state, 1)
        print(num)
        s = str(num[1]) + "," + str(num[0])
        # print(s)

        print("已处理" + str(cnt) + "条\n")
        cnt += 1

        if cnt == 10:
            break


        
        if cnt > 999 :
            cnt = 999

        connectionsocket.send(s.encode())
        connectionsocket.close()



def main():
    
    # model = torch.load('model_white_full.pth')
    # model_white = GoBang_Model(board_size)
    # state_dict_white = torch.load('model_white_full.pth')
    # model_white.load_state_dict(state_dict_white)
    model_black = torch.load('model_black_full.pth')
    # 将加载的权重数据应用到模型实例上
    handle_connection(model_white)

main()


  model_black = torch.load('model_black_full.pth')


NameError: name 'model_white' is not defined

In [None]:
# import torch
import tkinter as tk
from tkinter import messagebox
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random


board_size = 20
#torch.float32



dir = [[0, -1], [1, -1], [1, 0], [1, 1], [0, 1], [-1, 1], [-1, 0], [-1, -1]]
begin_x = 20
begin_y = 20
end_x = 590
end_y = 590
cell_size = 30
r = 10
status = {
    0: 0,
    1: 100,
    2: 3000,
    3: 10000,
    4: 20000,
    5: 20000
}


if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")




def get_evaluate(r : int,  c: int, chess_board):
    #分别判断下在此处我方和敌方的得分
    #最大化我方得分，抢占敌方最优位置
    enemy = judge(r, c, 1, 2, chess_board)
    friend = judge(r, c, 2, 1, chess_board)
    score = enemy + friend
    return score

def judge(y0: int, x0: int, enemy: int, friend: int, chess_board):
    score = 0
    cnt = []

    for d in dir:
        r = 0
        first_empty = -1
        for chess_cnt in range(0, 5):
            y = y0 + d[1] * chess_cnt
            x = x0 + d[0] * chess_cnt
            if x >= 20 or x < 0 or y >= 20 or y < 0:
                break
            if chess_board[y][x] == friend:
                break
            if chess_board[y][x] == 0 and first_empty == -1:
                first_empty = chess_cnt

            if chess_board[y][x] == enemy and first_empty <= 2:
                r += 1
        cnt.append(r)
    max_cnt = 0
    for i in range(0, 4):
        max_cnt = max(max_cnt, cnt[i] + cnt[i + 4])
        if max_cnt > 5:
            max_cnt = 5;
    score += status[max_cnt]
    return score




def compare_scores(item1, item2):
    """
    用于比较两个元组中第一个元素（得分）大小的函数
    返回值为：
    - 若 item1 的得分大于 item2 的得分，返回 1
    - 若 item1 的得分小于 item2 的得分，返回 -1
    - 若二者得分相等，返回 0
    """
    score1 = item1[0]
    score2 = item2[0]
    if score1 > score2:
        return 1
    elif score1 < score2:
        return -1
    return 0


def place_where(chess_board):
    scores = []  # 用于存储每个空白位置的得分以及对应的坐标
    center_score = 20

    for r in range(20):
        for c in range(20):
            if chess_board[r][c]!= 0:
                continue
            score = center_score - abs(10 - r) - abs(10 - c) + get_evaluate(r, c, chess_board)
            scores.append((score, (c, r)))  # 将得分和坐标作为元组存入列表

    # 使用自定义的比较函数对scores列表进行排序，实现按照得分从高到低排序
    for i in range(len(scores) - 1):
        for j in range(len(scores) - i - 1):
            if compare_scores(scores[j], scores[j + 1]) < 0:
                scores[j], scores[j + 1] = scores[j + 1], scores[j]

    ten_scores = scores[:3]  # 取前十个得分最高的位置（如果不足十个则取全部）
    if len(ten_scores) == 0:
        return []  # 如果没有可落子的空白位置，返回空列表

    choice = random.randint(0, len(ten_scores) - 1)  # 随机选择一个索引
    return list(ten_scores[choice][1])  # 返回选中位置的坐标（转换为列表形式）












class GoBang_Model(nn.Module):

    #模型初始化
    def __init__(self, board_size):
        super(GoBang_Model, self).__init__()
        self.board_size = board_size
        #第一个卷积层输入1通道，输出32通道，卷积核大小为3，填充大小1
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        #第一次增加非线性因素
        self.relu1 = nn.ReLU()
        #第一个池化层使用2x2的池化核，步长为2
        self.pool1 = nn.MaxPool2d(2)
        #第二个卷积层
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        #第二次增加非线性因素
        self.relu2 = nn.ReLU()
        #第二个池化层
        self.pool2 = nn.MaxPool2d(2)
        #第一个全连接层
        #输出特征数量设置为 128，两个池化层//4
        # print(board_size)
        # print("fc1:", 64 * (board_size // 4) * (board_size // 4))
        self.fc1 = nn.Linear(64 * (board_size // 4) * (board_size // 4), 128)
        #第三次引入非线性因素
        self.relu3 = nn.ReLU()
        #第二个全连接层
        self.fc2 = nn.Linear(128, board_size * board_size)

    #前向传播
    def forward(self, x):
        # print("Input shape:", x.shape)  # 打印输入形状
        x = self.conv1(x)
        # print("After conv1:", x.shape)  # 打印第一个卷积层后的形状
        x = self.relu1(x)
        x = self.pool1(x)
        # print("After pool1:", x.shape)  # 打印第一个池化层后的形状
        x = self.conv2(x)
        # print("After conv2:", x.shape)  # 打印第二个卷积层后的形状
        x = self.relu2(x)
        x = self.pool2(x)
        # print("After pool2:", x.shape)  # 打印第二个池化层后的形状
        x = x.view(1, -1)
        # print("After flatten:", x.shape)  # 打印展平后的形状
        x = self.fc1(x)
        x = self.relu3(x)
        x = self.fc2(x)
        return x
    


def select_move(model, board_state, temperature=1.0):
    """
    根据模型输出的概率分布选择走法
    model: 五子棋模型
    board_state: 当前棋盘状态张量，形状为(1, board_size, board_size)
    temperature: 温度参数，用于控制采样的随机性，温度越高越随机，越低越偏向确定性选择
    :return: 走法坐标
    """
    with torch.no_grad():
        output = model(board_state)
        output = output.squeeze(0).flatten()  # 转换为一维概率向量

        # 根据温度参数调整概率分布
        # 根据 temperature 参数的值来分情况处理走法选择的逻辑。temperature 参数用于控制选择走法的随机性程度，
        # 当它大于 0 时，采用一种基于概率采样的方式来选择走法，使得选择具有一定随机性，更有利于探索不同的走法可能性，
        # 尤其在训练早期探索更多不同走法对模型学习更全面的策略有帮助。
        if temperature > 0:
            # 在这里，先将 output 除以 temperature，temperature 的作用类似一个缩放因子，
            # 温度越高，经过 softmax 后得到的概率分布越 “平缓”，意味着各个位置被选中的概率相对更均匀，随机性就越大；温度越低，概率分布越 “尖锐”，
            # 模型认为最优的几个位置的概率就会占比极大，随机性就越小，更偏向于确定性地选择模型认为最好的走法。
            probs = torch.softmax(output / temperature, dim=0)
            #使用pytorch的随机数，避免传回cpu
            move_index = torch.multinomial(probs, 1).item()
        else:
            move_index = torch.argmax(output).item()

        row = move_index // model.board_size
        col = move_index % model.board_size

    return row, col
    
def update_board_state(board_state, move, chess):
    """
    根据走法更新棋盘状态张量
    param board_state: 当前棋盘状态张量
    param move: 走法坐标（行，列）
    chess: 棋子颜色（如1表示黑棋，2表示白棋）
    return: 更新后的棋盘状态张量
    """
    row, col = move
    board_state[0][row][col] = chess
    return board_state
    

def is_game_over(board_state, board_size):
    """
    判断五子棋棋局是否结束
    :param board_state: 棋盘状态张量
    :param board_size: 棋盘大小
    :return: 是否结束（True/False）以及获胜方（None表示未结束或平局，1表示黑棋胜，2表示白棋胜）
    """
    directions = [(1, 0), (-1, 0), (0, 1), (0, -1), (1, 1), (-1, -1), (1, -1), (-1, 1)]
    for row in range(board_size):
        for col in range(board_size):
            if board_state[0][row][col]!= 0:
                piece_color = board_state[0][row][col]
                for direction in directions:
                    count = 1
                    for step in range(1, 5):
                        new_row = row + step * direction[0]
                        new_col = col + step * direction[1]
                        if 0 <= new_row < board_size and 0 <= new_col < board_size and board_state[0][new_row][new_col] == piece_color:
                            count += 1
                        else:
                            break
                    if count >= 5:
                        return True, piece_color

    if (board_state == 0).sum() == 0:  # 棋盘已满判断平局
        return True, None

    return False, None


def calculate_reward(result, chess, x):
    """
    根据棋局结果和执子方计算奖励
    result: 棋局结果（None表示未结束或平局，1表示黑棋胜，2表示白棋胜）
    piece_color: 执子方棋子颜色（如1表示黑棋，2表示白棋）
    return: 奖励值
    """
    if result == chess:
        return 1 * x
    elif result is None:
        return 0
    else:
        return -1 * x




def update_model(model, optimizer, board_states, actions, rewards):
    """
    根据奖励更新模型参数
    model: 五子棋模型
    optimizer: 优化器（如Adam等）
    board_states: 整个棋局过程中的棋盘状态张量列表
    actions: 整个棋局过程中的走法坐标列表
    rewards: 对应的奖励值列表

    """
    optimizer.zero_grad()
    # 将board_states列表转换为张量并移动到GPU
    board_states_tensor = torch.stack(board_states).to(device)
    # 将actions列表转换为张量并移动到GPU，
    actions_tensor = torch.tensor(actions, dtype=torch.long).to(device)
    # 将rewards列表转换为张量并移动到GPU
    rewards_tensor = torch.tensor(rewards, dtype=torch.float).to(device)

    loss = 0
    for i in range(len(board_states)):
        board_state = board_states_tensor[i].unsqueeze(0)
        action = actions_tensor[i].unsqueeze(0)
        reward = rewards_tensor[0]
        output = model(board_state)
        row, col = action[:, 0], action[:, 1]
        action_index = row * model.board_size + col
        target = torch.zeros_like(output).squeeze(0).flatten()
        target[action_index] = reward
        loss += ((output.squeeze(0).flatten() - target) ** 2).sum()

    loss.backward()
    optimizer.step()



# def update_model(model, optimizer, board_states, actions, rewards):
#     """
#     根据奖励更新模型参数
#     model: 五子棋模型
#     optimizer: 优化器（如Adam等）
#     board_states: 整个棋局过程中的棋盘状态张量列表
#     actions: 整个棋局过程中的走法坐标列表
#     rewards: 对应的奖励值列表

#     """
#     optimizer.zero_grad()
#     loss = 0
#     for board_state, action, reward in zip(board_states, actions, rewards):
#         output = model(board_state)
#         row, col = action
#         action_index = row * model.board_size + col
#         target = torch.zeros_like(output).squeeze(0).flatten()
#         target[action_index] = reward
#         loss += ((output.squeeze(0).flatten() - target) ** 2).sum()

#     loss.backward()
#     optimizer.step()


#白棋
def play_with_gambling(model, board_size, learning_rate, num_games):
    chess_board = []
    for i in range(20):
        t = []
        for j in range(20):
            t.append(0)
        chess_board.append(t)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for _ in range(num_games):
        board_state = torch.tensor(chess_board).reshape(1, 20, 20).float().to(device)
        game_over = False
        board_states_list = []
        actions_list = []
        rewards_list = []

        while not game_over:
            x, y = place_where(chess_board)
            board_states_list.append(board_state.clone())
            actions_list.append([y, x])
            board_state = update_board_state(board_state, [y, x], 1)
            chess_board[y][x] = 1


            #确定走法
            move = select_move(model, board_state)
            #将每一步棋走动时的棋盘记录下来，并避免使用引用
            board_states_list.append(board_state.clone())
            #记录每一步的走法
            actions_list.append(move)            
            board_state = update_board_state(board_state, move, 2)
            chess_board[move[0]][move[1]] = 2


            game_over, result = is_game_over(board_state, board_size)

            #一局结束分配奖励
            if game_over:
                reward = calculate_reward(result, 2, 3)
                rewards_list.append(reward)
                if result == 2:
                    update_model(model, optimizer, board_states_list, actions_list, rewards_list)
                    print("博弈输")
                else:
                    print("博弈胜")

    return model


def self_play(model_black, model_white, board_size, num_games, learning_rate):
    # chess_board = []
    # for i in range(20):
    #     t = []
    #     for j in range(20):
    #         t.append(0)
    #     chess_board.append(t)
    """
    执行自我博弈过程并更新模型
    model_black: 代表黑棋的模型
    model_white: 代表白棋的模型
    board_size: 棋盘大小
    num_games: 自我博弈的局数
    learning_rate: 学习率
    :return:
    """
    #创建 Adam 优化器
    #将模型中所有可学习的参数传递给优化器
    optimizer_black = optim.Adam(model_black.parameters(), lr=learning_rate)
    optimizer_white = optim.Adam(model_white.parameters(), lr=learning_rate)

    for _ in range(num_games):
        board_state = torch.zeros(1, 20, 20).float().to(device)
        game_over = False
        current_color = 1  # 黑棋先下，用1表示
        board_states_list = []
        actions_list = []
        rewards_list = []

        while not game_over:

            #选择下旗方
            if current_color == 1:
                model = model_black
            else:
                model = model_white

            #确定走法
            move = select_move(model, board_state)
            #将每一步棋走动时的棋盘记录下来，并避免使用引用
            board_states_list.append(board_state.clone())
            #记录每一步的走法
            actions_list.append(move)

            board_state = update_board_state(board_state, move, current_color)
            game_over, result = is_game_over(board_state, board_size)


            #一局结束分配奖励
            if game_over:
                reward = calculate_reward(result, current_color, 1)
                rewards_list.append(reward)
                if result == 1:
                    update_model(model_black, optimizer_black, board_states_list, actions_list, rewards_list)
                    print("黑胜")
                elif result == 2:
                    update_model(model_white, optimizer_white, board_states_list, actions_list, rewards_list)
                    print("白胜")
            else:
                current_color = 3 - current_color  # 切换棋子颜色

    return model_black, model_white


def main():

    board_size = 20
    num_games = 200
    learning_rate = 0.02
    #每十轮与博弈算法进行一次对弈
    model_black = GoBang_Model(board_size).to(device)
    model_white = GoBang_Model(board_size).to(device)
  
    i = 1
    while i <= num_games:
        print(i)
        if i % 10 == 0:
            play_with_gambling(model_white, board_size, learning_rate, 1)
            i += 1
        else:
            model_black, model_white = self_play(model_black, model_white, board_size, 9, learning_rate)
            i += 9
    model_black = model_black.to(device = 'cpu')
    model_white = model_white.to(device = 'cpu')
    torch.save(model_black, 'model_black_full.pth')
    torch.save(model_white, 'model_white_full.pth')



main()
