Copyright (c) 2021 Giovanni Squillero <squillero@polito.it>
https://github.com/squillero/computational-intelligence
Free for personal or classroom use; see 'LICENCE.md' for details.
# Connect4

In the following exercise we analyze possible solutions to the Connect 4 game using the minmax algorithm, Monte Carlo tree search, and a combination of the two.

### Basic Functions

In [78]:
import numpy as np
from collections import Counter

NUM_COLUMNS = 7
COLUMN_HEIGHT = 6
SIZE = NUM_COLUMNS * COLUMN_HEIGHT
FOUR = 4


def valid_moves(board):
    """Returns columns where a disc may be played"""
    return [n for n in center_range(NUM_COLUMNS//2, NUM_COLUMNS) if board[n, COLUMN_HEIGHT - 1] == 0]


def center_range(start: int, elements: int):
    i = 0
    j = 0
    while j < elements:
        yield start + i
        i *= -1
        if i >= 0:
            i += 1
        j += 1


def play(board, column, player):
    """Updates `board` as `player` drops a disc in `column`"""
    (index,) = next((i for i, v in np.ndenumerate(board[column]) if v == 0))
    board[column, index] = player


def take_back(board, column):
    """Updates `board` removing top disc from `column`"""
    (index,) = [i for i, v in np.ndenumerate(board[column]) if v != 0][-1]
    board[column, index] = 0


def four_in_a_row(board, player):
    """Checks if `player` has a 4-piece line"""
    return (
        any(
            all(board[c, r] == player)
            for c in range(NUM_COLUMNS)
            for r in (list(range(n, n + FOUR)) for n in range(COLUMN_HEIGHT - FOUR + 1))
        )
        or any(
            all(board[c, r] == player)
            for r in range(COLUMN_HEIGHT)
            for c in (list(range(n, n + FOUR)) for n in range(NUM_COLUMNS - FOUR + 1))
        )
        or any(
            np.all(board[diag] == player)
            for diag in (
                (range(ro, ro + FOUR), range(co, co + FOUR))
                for ro in range(0, NUM_COLUMNS - FOUR + 1)
                for co in range(0, COLUMN_HEIGHT - FOUR + 1)
            )
        )
        or any(
            np.all(board[diag] == player)
            for diag in (
                (range(ro, ro + FOUR), range(co + FOUR - 1, co - 1, -1))
                for ro in range(0, NUM_COLUMNS - FOUR + 1)
                for co in range(0, COLUMN_HEIGHT - FOUR + 1)
            )
        )
    )
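
As a quick check of the helpers above, the following minimal sketch shows the order in which `center_range` visits the columns and how a disc lands in the board array. The generator is repeated (slightly condensed) so the snippet runs on its own, and the lowest free cell is found with `np.argmax`, which is an assumption of this sketch rather than the way `play` does it.

```python
import numpy as np

NUM_COLUMNS, COLUMN_HEIGHT = 7, 6


def center_range(start, elements):
    # Same logic as the generator above: start, start+1, start-1, start+2, ...
    i = 0
    for _ in range(elements):
        yield start + i
        i *= -1
        if i >= 0:
            i += 1


order = list(center_range(NUM_COLUMNS // 2, NUM_COLUMNS))
print(order)  # [3, 4, 2, 5, 1, 6, 0]

# First axis is the Connect 4 column, second axis is the height in that column
board = np.zeros((NUM_COLUMNS, COLUMN_HEIGHT), dtype=np.byte)
for player in (1, -1):
    row = int(np.argmax(board[3] == 0))  # index of the lowest free cell
    board[3, row] = player
print(board[3].tolist())  # [1, -1, 0, 0, 0, 0]
```

The center column comes first, and the second disc stacks on top of the first one.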



## MinMax Evaluation

The minmax evaluation combines several strategies:
* Position caching: when a result is found, the value of the position is saved. Before a new position is evaluated, the cache is checked for it, exploiting vertical symmetry (the board mirrored left to right) and player symmetry (the same position with the two players swapped).
* Order of exploration: the center column is always explored first, as it is usually the best move.
* Alpha-beta pruning: a won position is scored by how quickly the win arrives (in the code, the number of cells still empty when the win is found), so faster wins score higher than slower wins, and any win scores higher than a draw or a loss.
* Negamax: the minmax algorithm is written in its negamax form. We always optimize for the maximizing player and simply invert the board at each depth step, so the result of the negamax function must also be inverted at each step. This strategy does not improve performance, but it reduces code size.
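
The symmetry-aware cache lookup described in the list above can be sketched in isolation. This is a minimal illustration, not the notebook's implementation: `lookup` is a hypothetical helper, and the cache here stores only a score per position.

```python
import numpy as np

NUM_COLUMNS, COLUMN_HEIGHT = 7, 6


def lookup(cache, board):
    """Hypothetical helper: cached score of `board`, or None on a miss."""
    mirrored = np.flip(board, axis=0)  # axis 0 indexes the Connect 4 columns
    for candidate in (board, mirrored):
        if (key := candidate.tobytes()) in cache:
            return cache[key]
        if (key := (-candidate).tobytes()) in cache:
            return -cache[key]  # same position with the players swapped
    return None


board = np.zeros((NUM_COLUMNS, COLUMN_HEIGHT), dtype=np.byte)
board[0, 0] = 1  # one disc in the leftmost column
cache = {board.tobytes(): 5}

mirror = np.zeros_like(board)
mirror[NUM_COLUMNS - 1, 0] = 1  # the same disc in the rightmost column

print(lookup(cache, mirror))   # 5
print(lookup(cache, -mirror))  # -5
```

One stored entry therefore answers up to four positions: the board itself, its mirror image, and both with the players swapped.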

In [101]:

# The board can be initialized with `board = np.zeros((NUM_COLUMNS, COLUMN_HEIGHT), dtype=np.byte)`
# Nota bene: Connect 4 "columns" are actually NumPy "rows"


def minmax(board, player):
    """
    Calls the negamax function and initializes the cache
    The board is multiplied by player in order to always set player 1 as the first one to move
    """
    cache = {}
    return negamax(board*player, cache=cache)

def negamax(board: np.ndarray, *, α=-float('inf'), β=float('inf'), cache):
    """
    Negamax with α-β pruning and cache optimization
    """
    # Check for cache hit with vertical symmetry
    if(t := board.tobytes()) in cache or (t := np.flip(board, axis=0).tobytes()) in cache:
        return cache[t][0], cache[t][1]
    # Check for cache hit with player and vertical symmetry
    if (t := (board * -1).tobytes()) in cache or (t := (np.flip(board, axis=0) * -1).tobytes()) in cache:
        return -cache[t][0], cache[t][1]

    # DRAW
    if np.sum(board != 0) == SIZE:
        return 0, None

    max_score = SIZE
    for move in valid_moves(board):
        play(board, move, 1)
        if four_in_a_row(board, 1):
            take_back(board, move)
            # Score the win by the number of empty cells left, so faster wins score higher
            return SIZE-np.count_nonzero(board), move
        take_back(board, move)

    if β > max_score:
        β = SIZE
        
    # if the window is incompatible we cut off the branch 
    if α >= β:
        return β, None

    best_move = None
    for move in valid_moves(board):
        play(board, move, 1)
        # call the negamax on the next move, the result returned must be inverted because it's the score of the other player
        score, _ = negamax(-1*board, α=-β, β=-α, cache=cache)
        # save the result to cache
        cache[board.tobytes()] = -score, move
        take_back(board, move)

        # incompatible window
        if -score >= β:
            return -score, move

        # better α
        if -score > α:
            α = -score
            best_move = move

    return α, best_move
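
To see the negamax sign flip and the α-β cut-off without the Connect 4 machinery, here is a minimal sketch on a toy game tree. The tree, the `visited` list and this simplified `negamax` signature are assumptions made for illustration; leaves are integers scored from the point of view of the player to move at that leaf.

```python
import math

visited = []  # leaves actually evaluated, to make the pruning visible


def negamax(node, alpha=-math.inf, beta=math.inf):
    """Value of `node` for the player to move (toy tree, not Connect 4)."""
    if isinstance(node, int):
        visited.append(node)
        return node
    best = -math.inf
    for child in node:
        # The child's value belongs to the opponent, so it is negated,
        # and the (alpha, beta) window is swapped and negated as well
        score = -negamax(child, -beta, -alpha)
        best = max(best, score)
        alpha = max(alpha, score)
        if alpha >= beta:
            break  # incompatible window: the remaining children are skipped
    return best


tree = [[3, 5], [2, 9]]
value = negamax(tree)
print(value, visited)  # 3 [3, 5, 2]
```

The leaf `9` is never evaluated: once the second branch is known to be worth at most 2, it cannot beat the 3 already secured by the first branch.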



## Monte Carlo Tree Search

We can also take another approach based on Monte Carlo simulations. The chosen algorithm explores the solution space using the following formula:<br>
$UCB = V + 2\sqrt{\frac{\ln N}{n}}$<br>
This formula represents the upper confidence bound of a node, where:
* V is the average score of the nodes beneath the given node
* N is the number of times the parent node has been visited
* n is the number of times the node has been visited

At every step of the sampling, if we are not in a leaf node, we choose the child node that maximizes the UCB. If instead we are in a leaf node (a leaf of the explored tree, not of the game-state tree), we do a "rollout": we simply play randomly and record the outcome.
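
The formula can be tried on concrete numbers with a small standalone function. This is only a sketch: `ucb` and its parameters are hypothetical names, it takes the total score and divides by the visit count to obtain V, and it leaves out any per-player sign handling.

```python
import math


def ucb(total_score, visits, parent_visits, c=2.0):
    """Upper confidence bound of a child node (hypothetical standalone helper)."""
    if visits == 0:
        return math.inf  # unvisited children are always tried first
    average = total_score / visits  # V in the formula
    return average + c * math.sqrt(math.log(parent_visits) / visits)


# Two children with the same average score (0.5) under a parent visited 10 times
a = ucb(total_score=3, visits=6, parent_visits=10)
b = ucb(total_score=2, visits=4, parent_visits=10)
print(a < b)  # True
```

With equal averages, the less visited child gets the larger bound, which is what pushes the search toward branches it knows less about.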

#### Rollouts
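The scoring function used to evaluate a final state is $\text{score} = \text{board_size} - \text{moves}$, which rewards faster and simpler solutions. Random playouts that start from a non-final state instead return $1$, $-1$ or $0$ for a win, a loss or a draw.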
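
As a small worked example of the final-state score, the sketch below builds a made-up position after 7 moves and checks that counting the empty cells gives the same number as $\text{board_size} - \text{moves}$. The position itself is an assumption chosen only for illustration.

```python
import numpy as np

NUM_COLUMNS, COLUMN_HEIGHT = 7, 6
SIZE = NUM_COLUMNS * COLUMN_HEIGHT

board = np.zeros((NUM_COLUMNS, COLUMN_HEIGHT), dtype=np.byte)
board[:4, 0] = 1   # player 1: bottom cells of columns 0..3 (a horizontal four)
board[:3, 1] = -1  # player -1: three discs stacked on columns 0..2

moves = int(np.count_nonzero(board))
score = SIZE - moves
print(moves, score)  # 7 35
print(score == np.count_nonzero(board == 0))  # True
```

A win reached later in the game leaves fewer empty cells and therefore a lower score.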

In [80]:
def r_out(board):
    move = np.random.choice(valid_moves(board))
    play(board, move, 1)
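    # Check for a win before checking for a full board,
    # so that a winning move on the last free cell is not scored as a draw
    if four_in_a_row(board, 1):
        take_back(board, move)
        return 1
    if np.count_nonzero(board == 0) == 0:
        take_back(board, move)
        return 0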
    res = -r_out(-board)
    take_back(board, move)
    return res


def roll_out(board):
    if four_in_a_row(board, 1):
        return np.count_nonzero(board == 0)
    if four_in_a_row(board, -1):
        return -np.count_nonzero(board == 0)
    if np.count_nonzero(board == 0) == 0:
        return 0
    return r_out(board)



####  Upper Confidence Bound

In [81]:

def UCB(node, node_data):
    if node_data[node]['n'] == 0:
        return float('inf')
    N = node_data[tuple(list(node)[:-1])]['n']
    sign = -1 if len(node) % 2 == 0 else 1
    return sign * node_data[node]['V'] / node_data[node]['n'] + 2 * np.sqrt(np.log(N) / node_data[node]['n'])



#### Simulation

In [102]:

def back_propagation(node, result, node_data, board):
    node_data[node]['V'] += result
    node_data[node]['n'] += 1

    current = list(node)
    while current:
        take_back(board, current.pop())
        node_data[tuple(current)]['n'] += 1
        node_data[tuple(current)]['V'] += result
        node_data[tuple(current)]['leaf'] = False


def tree_traversal(board, node_data):
    """
    instead of using complex classes we simply use a dictionary to store the results of the simulations indexed
    by the set of moves used to reach that particular state represented as tuples
    """
    player = 1
    current = ()
    while not node_data[current]['leaf']:
        if four_in_a_row(board, 1):
            back_propagation(current, np.count_nonzero(board == 0), node_data, board)
            return
        if four_in_a_row(board, -1):
            back_propagation(current, -np.count_nonzero(board == 0), node_data, board)
            return
        if np.count_nonzero(board) == board.size:
            back_propagation(current, 0, node_data, board)
            return

        moves = {}
        for m in valid_moves(board):
            play(board, m, player)
            moves[m] = UCB(current + (m,), node_data)
            take_back(board, m)

        m = max(moves, key=moves.get)

        play(board, m, player)
        player *= -1
        current = current + (m,)

    result = 0
    if node_data[current]['n'] == 0:
        result = roll_out(board * player)
    else:
        for m in valid_moves(board):
            node_data[current + (m,)] = {'V': 0, 'n': 0, 'leaf': True}
        node_data[current]['leaf'] = False
        result = roll_out(board * player)

    back_propagation(current, result, node_data, board)



def montecarlo(board: np.ndarray, player: int, n_samples: int):
    node_data = {(()): {'V': 0, 'N': 0, 'n': 0, 'leaf': True}}

    for _ in range(n_samples):
        tree_traversal(board * player, node_data)

    results = {move: node_data[(move,)]['V']/node_data[(move,)]['n'] for move in valid_moves(board)}
    return node_data[()]['V']/node_data[()]['n'] , max(results, key=lambda x: results[x])
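
The dictionary-based tree used above can be illustrated on its own. In this minimal sketch, `expand` and `backpropagate` are hypothetical helpers that only mirror the bookkeeping: nodes are keyed by the tuple of moves that reaches them, and the parent of a node is its key without the last move.

```python
node_data = {(): {'V': 0, 'n': 0, 'leaf': True}}  # the root is the empty tuple


def expand(node, moves):
    """Add one child per move; a child's key is the parent's key plus the move."""
    for m in moves:
        node_data[node + (m,)] = {'V': 0, 'n': 0, 'leaf': True}
    node_data[node]['leaf'] = False


def backpropagate(node, result):
    """Update `node` and all its ancestors, up to the root `()`."""
    while True:
        node_data[node]['V'] += result
        node_data[node]['n'] += 1
        if not node:
            break
        node = node[:-1]  # dropping the last move gives the parent


expand((), [3, 4])
expand((3,), [2])
backpropagate((3, 2), 1)
print(node_data[()]['n'], node_data[(3,)]['V'], node_data[(4,)]['n'])  # 1 1 0
```

Only the nodes on the path from `(3, 2)` to the root are updated; the sibling `(4,)` stays untouched.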


## Mixing the Approaches
We can mix the two approaches by limiting the maximum depth of the negamax search. Once that depth is reached, the node is treated as a leaf with respect to the minmax algorithm and is scored using the Monte Carlo tree search algorithm.
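
The idea of a depth-limited search with a sampled evaluation at the cut-off can be sketched on a toy tree. Everything here is an assumption made for illustration: the tree, the helper names, and in particular the use of plain random rollouts as a stand-in for the full Monte Carlo tree search.

```python
import math
import random


def rollout(node):
    """Random playout: value for the player to move at `node`."""
    if isinstance(node, int):
        return node
    return -rollout(random.choice(node))


def estimate(node, samples=200):
    """Average of random playouts, standing in for the tree search."""
    return sum(rollout(node) for _ in range(samples)) / samples


def negamax(node, depth=0, maxdepth=1, alpha=-math.inf, beta=math.inf):
    if isinstance(node, int):
        return node
    if depth >= maxdepth:
        return estimate(node)  # cut-off: sample instead of searching deeper
    best = -math.inf
    for child in node:
        score = -negamax(child, depth + 1, maxdepth, -beta, -alpha)
        best = max(best, score)
        alpha = max(alpha, score)
        if alpha >= beta:
            break
    return best


tree = [[3, 3], [1, 1]]
print(negamax(tree, maxdepth=1))  # 3.0 (sampled below depth 1)
print(negamax(tree, maxdepth=5))  # 3   (exact search)
```

The toy tree is chosen so that every playout below a branch gives the same result, which makes the sampled value agree with the exact one; in a real game the estimate is noisy and depends on the number of samples.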

In [111]:
import numpy as np


def minmax_montecarlo(board, player):
    cache = {}
    return negamax_montecarlo(board*player, cache=cache)

def negamax_montecarlo(board: np.ndarray, *, α=-float('inf'), β=float('inf'), depth=0, maxdepth=3, cache=None):
    if cache is not None:
        if(t := board.tobytes()) in cache or (t := np.flip(board, axis=0).tobytes()) in cache:
            return cache[t][0], cache[t][1]
        if (t := (board * -1).tobytes()) in cache or (t := (np.flip(board, axis=0) * -1).tobytes()) in cache:
            return -cache[t][0], cache[t][1]

    # DRAW
    if np.sum(board != 0) == SIZE:
        return 0, None

    max_score = 1
    for move in valid_moves(board):
        play(board, move, 1)
        if four_in_a_row(board, 1):
            take_back(board, move)
            return max_score, move
        take_back(board, move)

    if β > max_score:
        β = 1
        if α >= β:
            return β, None

    if depth >= maxdepth:
        # Here is the main change: once the maximum depth is reached, the node
        # is scored with Monte Carlo tree search instead of searching deeper.
        # Player 1 is always the one to move on this (possibly inverted) board.
        return montecarlo(board, 1, 100)

    best_move = None
    for move in valid_moves(board):
        play(board, move, 1)
        score, _ = negamax_montecarlo(-1*board, α=-β, β=-α, depth=depth+1, cache=cache)
        if cache is not None:
            cache[board.tobytes()] = -score, move
        take_back(board, move)

        if -score >= β:
            return -score, move

        if -score > α:
            α = -score
            best_move = move

    return α, best_move
