In [None]:
import numpy as np
import copy
import itertools
import random
import time
#import math

In [None]:
class Board(object):
    '''棋盘，默认是“井字棋”Tie Tac Toe，也可以作为五子棋的棋盘'''
    def __init__(self, height=3, width=3, ninline=3):
        self.width = width     #列
        self.height = height   #行 
        self.ninline = ninline    # 表示几个相同的棋子连成一线算作胜利
        
        #''' # M x N 矩阵，值是棋子类型，0空，1代表'x'，-1代表'o'; ''' 
        self.ox = np.zeros((self.height, self.width), dtype=int )
        
        #'''现棋盘空余可下位置集合如{(0,0), (0,1), }'''
        self._ox_availables = set( [(i,j) for i in range(self.height) for j in range(self.width)] )
        
        #'''棋盘棋局胜负标志，1代表'x'赢，-1代表'o'赢，0代表平局，2代表未结束'''
        self._ox_states = 2
                    

    def _nmatrixwinner(self,nm):
        '''计算方阵连线情况,1代表'x'连成一线，-1代表'o'连成一线，0代表没有
           把各行列的和组成集合，查找集合中是否存在3或-3，代表连成一行'''
        n_line = set(nm.sum(axis=1)) | set(nm.sum(axis=0))
        '''第一条对角线的和加入列表'''
        n_line.add(sum([nm[i, i] for i in range(self.ninline) ]))
        '''第二条对角线的和加入列表'''
        n_line.add(sum([nm[self.ninline-1-i,i] for i in range(self.ninline) ]))
    
        return -1 if -self.ninline in n_line else 1 if self.ninline in n_line else 0 
    
        
    def _checkwinner(self):
        '''检查棋盘棋局胜负，1代表'x'赢，-1代表'o'赢，0代表平局，2代表未结束'''
        #'''生成棋盘所有ninline维方阵，并判断方阵是否有连成一线的情况'''
        matrixlist = [self.ox[i:i+self.ninline, j:j+self.ninline] 
                      for i in range(self.height- self.ninline +1)
                        for j in range(self.width- self.ninline +1)
                     ]
    
        #'''如果出现ninline维方阵返回1或者-1，代表已经有了输赢，返回输赢结果'''
        for nm in matrixlist:
            if self._nmatrixwinner(nm) != 0:
                return self._nmatrixwinner(nm) 
        
        #'''所有ninline维方阵返回都是0，检查是否有可落子处，无则返回0代表平局，有则2代表未结束'''
        return 0 if not self._ox_availables else 2
          
    
    def board_now(self):
        return (self._ox_states, self._ox_availables)    #返回一个元组，第一个元素是输赢状态，第二个元素是剩余可落子位置的集合
    
    
    def board_update(self, player, location): 
        '''# player在location(x,y)处落子，更新棋盘，,更新可用位置，计算此时棋局胜负'''
        if location in self._ox_availables:
            self.ox[location] = 1 if player=='x' else -1
            self._ox_availables.remove( location )    
            self._ox_states = self._checkwinner()
            return 1    
        else:
            return 0     
                  
            
    def board_show(self):
        '''显示棋盘，参数是棋盘矩阵'''
        #'''打印列标号'''
        print("{0:>15}".format('x\y'), end='') # 5个字符
        for j in range(self.width):
            print("{0:^5}".format(j), end='') # 5个字符          
        print('\n')  # 换行,打印空行        
      
        #'''打印棋盘'''            
        for i in range(self.height):        #行数
            #'''打印行标号'''
            print("{0:>15}".format(i), end='') # 5个字符
            #'''打印该行棋盘'''    
            for j in range(self.width):
                if self.ox[i][j] == 0:    #未落子  5个字符
                    print("{0:^5}".format('+'), end='')
                elif self.ox[i][j] == 1:   # x 落子  5个字符
                    print("{0:^5}".format('X'), end='')
                elif self.ox[i][j] == -1:  # o 落子
                    print("{0:^5}".format('Q'), end='') 
            print('\n')     
  


In [1]:
class Boardnode(object):
    '''棋盘节点'''
    def __init__(self, board, ):
        self.board = copy.deepcopy(board)#每一个局面（situation）该如何走子（action）的信息。这些信息是，
        self.N#N(s, a)是访问次数，
        self.W#W(s, a)是总行动价值，
        self.Q#Q(s, a)是平均行动价值，
        self.U#U(s, a)是= c_puct × 概率P(s, a) × np.sqrt(父节点访问次数N) / ( 1 + 某子节点action的访问次数N(s, a) )
        self.c#是一个决定探索水平的常数；这种搜索控制策略最初倾向于具有高先验概率和低访问次数的行为，但是渐近地倾向于具有高行动价值的行为。
        self.p#P(s, a)是被选择的概率。

        #表示当年棋局，选择action(i,j)后，各ucb, wins, Ni的值，未选择下一步棋提供依据
        self.ox_ucb = {(i,j):[0.0,0.0, 0] for i in range(self.height) for j in range(self.width)}
        
        #最后一步的玩家和落子位置
        self._ox_lastupdate = ()
               

    def board_lastupdate(self):
        return self._ox_lastupdate
    
               

In [None]:
def monte_carlo_tree_search(root):
    while resources_left(time, computational power):
        leaf = traverse(root) # leaf = unvisited node 
        simulation_result = rollout(leaf)
        backpropagate(leaf, simulation_result)
    return best_move(root)

def traverse(node): #遍历节点
    while fully_expanded(node):
        node = best_uct(node)
    return pick_univisted(node.children) or node # in case no children are present / node is terminal 

def rollout(node): #模拟展开
    while non_terminal(node):
        node = rollout_policy(node)
    return result(node) 

def rollout_policy(node): #利用模拟算法获取下一步走法
    return pick_random(node.children)

def backpropagate(node, result):
   if is_root(node) return 
   node.stats = update_stats(node, result) 
   backpropagate(node.parent)

def best_move(node):  #返回UCT 函数在访问节点中选择下一个要遍历的节点。这是蒙特卡罗树搜索的核心函数
    #pick move with highest number of UCTs
    

In [None]:
def simulation(ttt, playturn, ms=[]):
    '''根据给定棋盘现状和后续下法，返回棋盘最终状态，谁赢或者平局'''
    ox = copy.deepcopy(ttt)
    for i in range(len(ms)):
        ox.board_update(playturn, ms[i])    # 更新棋盘
        s, (v) = ox.board_now()   
        if s == 1:
            return len(ms)-i
        elif s == -1:
            return i-len(ms)       
        elif s == 2 :
            playturn = 'o' if playturn=='x' else 'x'
    return 0
    
def montercarlo_simu(ttt, player, choicemove, mounts=100):
    ox = copy.deepcopy(ttt)
    ox.board_update(player, choicemove)
    playturn = 'x' if player == 'o' else 'o'
    s, (v) = ox.board_now()
    wins = 0
    for count in range(mounts):
        mv = []
        v_setlist = list(v)
        for i in range(len(v)):
            mv.append(v_setlist.pop(random.randint(0,len(v_setlist)-1)))
            
        wins += simulation(ox,playturn, mv)
    
    return wins #返回本轮100此随机模拟的输赢和，作为choicemove的win值，n值同时加1
    
def ai_montecarlo_tree_search(ox, playturn, mounts=1000, timelast=3.0):
    timebegin = time.time()
    s, (v) = ox.board_now()
    simu_wins = {x:ox.ox_ucb[x] for x in v}    #初始化每个备选节点，(wins, N)
    #
    for mv in v:
        simu_wins[mv][0] += montercarlo_simu(ox, playturn, mv)
        simu_wins[mv][1] += 1
    pass
    
    
def ai_montecarlo_search(ox, playturn, mounts=4000, timelast=120.0):
    timebegin = time.time()
    s, (v) = ox.board_now()
    move_value = {x:0 for x in v}
    #在资源范围内，使用随机方式产生下法序列,并用该序列模拟输赢
    #当棋盘大于5*5时，25！已经是非常大的量，4000次模拟就显得非常小，丧失MonteCarlo算法意义
    for count in range(mounts):
        mv = []
        v_setlist = list(v)
        for i in range(len(v)):
            mv.append(v_setlist.pop(random.randint(0,len(v_setlist)-1)))
            
        move_value[mv[0]] += simulation(ox,playturn, mv)
        
        if (time.time()-timebegin) >= timelast : break
        
    if playturn == 'o':
        minvalue = move_value[min(move_value, key=move_value.get)]
        mvlist = [k for k,kv in move_value.items() if kv==minvalue ]
    if playturn == 'x':
        maxvalue = move_value[max(move_value, key=move_value.get)]
        mvlist = [k for k,kv in move_value.items() if kv==maxvalue ]

    return mvlist[random.randint(0,len(mvlist)-1)] #返回元组(x,y)
    
    
def ai_full_search(ox, playturn):
    #'''AI穷尽所有走法，找到最好的。当棋盘大于5*5时，25！已经是非常大的量去逐个计算了，基本不可行'''
    s, (v) = ox.board_now()
    move_value = {x:0 for x in v}
    #'''使用了排列生成，生成函数是个函数生成器； 返回剩余下法的所有排列'''
    mvs = itertools.permutations(v) 
    for mv in mvs:
        move_value[mv[0]] += simulation(ox, playturn, mv)
        
    if playturn == 'o':
        minvalue = move_value[min(move_value, key=move_value.get)]
        mvlist = [k for k,kv in move_value.items() if kv==minvalue ]
    if playturn == 'x':
        maxvalue = move_value[max(move_value, key=move_value.get)]
        mvlist = [k for k,kv in move_value.items() if kv==maxvalue ]

    return mvlist[random.randint(0,len(mvlist)-1)] #返回元组(x,y)

          

class MCTS(object):
    """
    AI player, use Monte Carlo Tree Search with UCB
    """

    def __init__(self, board, play_turn, n_in_row=5, time=5, max_actions=1000):

        self.board = board
        self.play_turn = play_turn # 出手顺序
        self.calculation_time = float(time) # 最大运算时间
        self.max_actions = max_actions # 每次模拟对局最多进行的步数
        self.n_in_row = n_in_row

        self.player = play_turn[0] # 轮到电脑出手，所以出手顺序中第一个总是电脑
        self.confident = 1.96 # UCB中的常数
        self.plays = {} # 记录着法参与模拟的次数，键形如(player, move)，即（玩家，落子）
        self.wins = {} # 记录着法获胜的次数
        self.max_depth = 1

    def get_action(self): # return move

        if len(self.board.availables) == 1:
            return self.board.availables[0] # 棋盘只剩最后一个落子位置，直接返回

        # 每次计算下一步时都要清空plays和wins表，因为经过AI和玩家的2步棋之后，整个棋盘的局面发生了变化，原来的记录已经不适用了——原先普通的一步现在可能是致胜的一步，如果不清空，会影响现在的结果，导致这一步可能没那么“致胜”了
        self.plays = {} 
        self.wins = {}
        simulations = 0
        begin = time.time()
        while time.time() - begin < self.calculation_time:
            board_copy = copy.deepcopy(self.board)  # 模拟会修改board的参数，所以必须进行深拷贝，与原board进行隔离
            play_turn_copy = copy.deepcopy(self.play_turn) # 每次模拟都必须按照固定的顺序进行，所以进行深拷贝防止顺序被修改
            self.run_simulation(board_copy, play_turn_copy) # 进行MCTS
            simulations += 1

        print("total simulations=", simulations)

        move = self.select_one_move() # 选择最佳着法
        location = self.board.move_to_location(move)
        print('Maximum depth searched:', self.max_depth)

        print("AI move: %d,%d\n" % (location[0], location[1]))

        return move

    def run_simulation(self, board, play_turn):
        """
        MCTS main process
        """

        plays = self.plays
        wins = self.wins
        availables = board.availables

        player = self.get_player(play_turn) # 获取当前出手的玩家
        visited_states = set() # 记录当前路径上的全部着法
        winner = -1
        expand = True

        # Simulation
        for t in range(1, self.max_actions + 1):
            # Selection
            # 如果所有着法都有统计信息，则获取UCB最大的着法
            if all(plays.get((player, move)) for move in availables):
                log_total = log(
                    sum(plays[(player, move)] for move in availables))
                value, move = max(
                    ((wins[(player, move)] / plays[(player, move)]) +
                     sqrt(self.confident * log_total / plays[(player, move)]), move)
                    for move in availables) 
            else:
                # 否则随机选择一个着法
                move = choice(availables)

            board.update(player, move)

            # Expand
            # 每次模拟最多扩展一次，每次扩展只增加一个着法
            if expand and (player, move) not in plays:
                expand = False
                plays[(player, move)] = 0
                wins[(player, move)] = 0
                if t > self.max_depth:
                    self.max_depth = t

            visited_states.add((player, move))

            is_full = not len(availables)
            win, winner = self.has_a_winner(board)
            if is_full or win: # 游戏结束，没有落子位置或有玩家获胜
                break

            player = self.get_player(play_turn)

        # Back-propagation
        for player, move in visited_states:
            if (player, move) not in plays:
                continue
            plays[(player, move)] += 1 # 当前路径上所有着法的模拟次数加1
            if player == winner:
                wins[(player, move)] += 1 # 获胜玩家的所有着法的胜利次数加1

    def get_player(self, players):
        p = players.pop(0)
        players.append(p)
        return p

    def select_one_move(self):
        percent_wins, move = max(
            (self.wins.get((self.player, move), 0) /
             self.plays.get((self.player, move), 1),
             move)
            for move in self.board.availables) # 选择胜率最高的着法

        return move

    def has_a_winner(self, board):
        """
        检查是否有玩家获胜
        """
        moved = list(set(range(board.width * board.height)) - set(board.availables))
        if(len(moved) < self.n_in_row + 2):
            return False, -1

        width = board.width
        height = board.height
        states = board.states
        n = self.n_in_row
        for m in moved:
            h = m // width
            w = m % width
            player = states[m]

            if (w in range(width - n + 1) and
                len(set(states[i] for i in range(m, m + n))) == 1): # 横向连成一线
                return True, player

            if (h in range(height - n + 1) and
                len(set(states[i] for i in range(m, m + n * width, width))) == 1): # 竖向连成一线
                return True, player

            if (w in range(width - n + 1) and h in range(height - n + 1) and
                len(set(states[i] for i in range(m, m + n * (width + 1), width + 1))) == 1): # 右斜向上连成一线
                return True, player

            if (w in range(n - 1, width) and h in range(height - n + 1) and
                len(set(states[i] for i in range(m, m + n * (width - 1), width - 1))) == 1): # 左斜向下连成一线
                return True, player

        return False, -1

    def __str__(self):
        return "AI"

In [2]:
class Human(object):
    '''人类选手'''
    def __init__(self, board, player):
        self.board = board
        self.player = player

    def get_action(self):
        try:
            location = input('请输入落子的位置，如34代表3行4列（q代表退出)：')
            location = [int(n, 10) for n in input("Your move: ").split(",")]
            move = self.board.location_to_move(location)
        except Exception as e:
            move = -1
        if move == -1 or move not in self.board.availables:
            print("invalid move")
            move = self.get_action()
        return move

    def __str__(self):
        return "Human"

In [3]:
'''生成井字棋棋盘，并获得初始状态 ''' 
ttt = Board(3, 3, 3)
ttt_states, (ttt_availables) = ttt.board_now()
player = 'x'
i = 0

print("棋盘初始化，X先手：")
ttt.board_show()

while True :
    i += 1
    print("X先手，第%d回合："%i, end='')    

    if player == 'x':
        #print('请选手选择落子处的数字，如23代表在第2行第3列落子：', end='')
        #xy = input()
        #playlocation = (int(xy[0]), int(xy[1]) )
        timebegin = time.time()
        playlocation = ai_full_search(ttt, player)
        #playlocation = ai_montecarlo_search(ttt, player, 200)
        ttt.board_update(player, playlocation)
        s, (v) = ttt.board_now()
        print('X落子在', playlocation, '用时%0.4f'%(time.time()-timebegin), '秒; ', end='')
        if s == 1:
            print('选手X 赢')
            ttt.board_show()
            break
        if v == set():
            print('平局')
            ttt.board_show()
            break
        player = 'o'

    if player == 'o':
        timebegin = time.time()
        #playlocation = ai_full_search(ttt, player)
        playlocation = ai_montecarlo_search(ttt, player, 4000, 120)
        ttt.board_update(player, playlocation)
        s, (v) = ttt.board_now()
        print('O落子在', playlocation, '用时%0.4f'%(time.time()-timebegin), '秒. ')
        if s == -1:
            print('计算机Q 赢')
            ttt.board_show()
            break 
        if v == set():
            print('平局')
            ttt.board_show()
            break
        player = 'x'
        
    ttt.board_show()