Copyright **`(c)`** 2021 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see 'LICENCE.md' for details.

# Connect 4

In [1]:
from collections import Counter
from dataclasses import dataclass
from time import perf_counter
from tqdm import tqdm
import itertools
import functools
from typing import List

from utils import pretty_print_board, counted

import numpy as np
np.set_printoptions(precision=2)


In [2]:
NUM_COLUMNS = 7
COLUMN_HEIGHT = 6
FOUR = 4

PLAYER_1 = 1
PLAYER_2 = -1

# Board can be initiatilized with `board = np.zeros((NUM_COLUMNS, COLUMN_HEIGHT), dtype=np.byte)`
# Notez Bien: Connect 4 "columns" are actually NumPy "rows"

# Fix the 'col_height' and 'num_cols' parameters
# of pretty_print_board
pretty_print_board = functools.partial(pretty_print_board,
                                       col_height=COLUMN_HEIGHT,
                                       num_cols=NUM_COLUMNS)


## Basic Functions

In [3]:
def valid_moves(board):
    """Returns columns where a disc may be played"""
    return [n for n in range(NUM_COLUMNS) if board[n, COLUMN_HEIGHT - 1] == 0]


def play(board, column, player):
    """Updates `board` as `player` drops a disc in `column`"""
    (index,) = next((i for i, v in np.ndenumerate(board[column]) if v == 0))
    board[column, index] = player


def take_back(board, column):
    """Updates `board` removing top disc from `column`"""
    (index,) = [i for i, v in np.ndenumerate(board[column]) if v != 0][-1]
    board[column, index] = 0


def four_in_a_row(board, player):
    """Checks if `player` has a 4-piece line"""
    return (
        any(
            all(board[c, r] == player)
            for c in range(NUM_COLUMNS)
            for r in (list(range(n, n + FOUR)) for n in range(COLUMN_HEIGHT - FOUR + 1))
        )
        or any(
            all(board[c, r] == player)
            for r in range(COLUMN_HEIGHT)
            for c in (list(range(n, n + FOUR)) for n in range(NUM_COLUMNS - FOUR + 1))
        )
        or any(
            np.all(board[diag] == player)
            for diag in (
                (range(ro, ro + FOUR), range(co, co + FOUR))
                for ro in range(0, NUM_COLUMNS - FOUR + 1)
                for co in range(0, COLUMN_HEIGHT - FOUR + 1)
            )
        )
        or any(
            np.all(board[diag] == player)
            for diag in (
                (range(ro, ro + FOUR), range(co + FOUR - 1, co - 1, -1))
                for ro in range(0, NUM_COLUMNS - FOUR + 1)
                for co in range(0, COLUMN_HEIGHT - FOUR + 1)
            )
        )
    )

## Montecarlo Evaluation

In [4]:
def _mc(board, player):
    p = -player
    while valid_moves(board):
        p = -p
        c = np.random.choice(valid_moves(board))
        play(board, c, p)
        if four_in_a_row(board, p):
            return p
    return 0


def montecarlo(board, player, mc_func):
    montecarlo_samples = 100
    cnt = Counter(mc_func(np.copy(board), player) for _ in range(montecarlo_samples))
    return (cnt[1] - cnt[-1]) / montecarlo_samples


def eval_board(board, player, mc_func=_mc):
    if four_in_a_row(board, 1):
        # Alice won
        return 1
    elif four_in_a_row(board, -1):
        # Bob won
        return -1
    else:
        # Not terminal, let's simulate...
        return montecarlo(board, player, mc_func)

## Example

In [5]:
board = np.zeros((NUM_COLUMNS, COLUMN_HEIGHT), dtype=np.byte)
play(board, 3, PLAYER_1)
play(board, 0, PLAYER_2)
play(board, 4, PLAYER_1)
play(board, 0, PLAYER_2)
play(board, 5, PLAYER_1)

# eval_board(board, PLAYER_1)
pretty_print_board(board)


P1: 🟡	P2: 🔴
  ╔════╦════╦════╦════╦════╦════╦════╗
5 ║    ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
4 ║    ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
3 ║    ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
2 ║    ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
1 ║ 🔴 ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
0 ║ 🔴 ║    ║    ║ 🟡 ║ 🟡 ║ 🟡 ║    ║
  ╚════╩════╩════╩════╩════╩════╩════╝
     0    1    2    3    4    5    6   


## Solution (OLD)

In [6]:
def count_by_length(board, player, length):
    board = (board == player)

    kernel = np.ones(length)
    diagspan = COLUMN_HEIGHT - length

    sequences = itertools.chain(
        # board rows
        (row for row in board),
        # board columns
        (col for col in board.T),
        # board diagonals
        (np.diag(board, i) for i in range(-diagspan, diagspan + 1)),
        # board antidiagonals
        (np.diag(np.fliplr(board), i) for i in range(-diagspan, diagspan + 1))
    )

    return sum(np.sum(np.convolve(kernel, seq) == length) for seq in sequences)


In [7]:
def best_move(board, player):
    scores = []
    for col in valid_moves(board):
        play(board, col, player)
        scores.append(player * eval_board(board, player))
        take_back(board, col)

    max_score = max(scores)
    return scores.index(max_score), max_score


In [8]:
player = PLAYER_1

b = board.copy()
# b = np.zeros((NUM_COLUMNS, COLUMN_HEIGHT), dtype=np.byte)
steps = 0

while not(four_in_a_row(b, PLAYER_1) or four_in_a_row(b, PLAYER_2)):
    col, _ = best_move(b, player)
    play(b, col, player)

    player = player * -1
    steps = steps + 1

print(f"Final state (steps={steps}): ")
pretty_print_board(b)


Final state (steps=1): 
P1: 🟡	P2: 🔴
  ╔════╦════╦════╦════╦════╦════╦════╗
5 ║    ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
4 ║    ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
3 ║    ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
2 ║    ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
1 ║ 🔴 ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
0 ║ 🔴 ║    ║ 🟡 ║ 🟡 ║ 🟡 ║ 🟡 ║    ║
  ╚════╩════╩════╩════╩════╩════╩════╝
     0    1    2    3    4    5    6   


## Minimax

In [9]:
def vanilla_min_max(board: np.ndarray, player: int, minimize=True, depth=3, *, score_func):
    """
    This function returns the next best move for the current player

    Parameters
    ----------
    board: array_like
        the connect 4 board
    player: int
        the player who plays next
    minimize: boolean
        whether the player is a minimizer or a maximizer
    depth: int
        depth of the minmax search
    score_func: function
        an (heuristic) function that evaluates the board assigning it a score between 0 and +inf

    Returns
    -------
    int
        the suggested next move to play
    float
        the score resulting from playing the suggested move
    """
    
    moves = valid_moves(board)

    if count_by_length(board, player, 4):
        return None, 1_000_000 // (4 - depth) 

    if count_by_length(board, -player, 4):
        return None, -1_000_000 // (4 - depth) 

    if depth == 0 or len(moves) == 0:
        # return None, score_func(board, player)
        return None, score_func(board, player)

    # compute the scores for all the possibile moves
    scores = []

    for move in moves:
        # play in the column 'move'
        play(board, move, player)

        # update the score
        _, new_score = vanilla_min_max(board, -player, not minimize, depth - 1, score_func=score_func)

        # undo the move
        take_back(board, move)

        # append the score
        # scores.append(0.9 * new_score)
        scores.append(new_score)

    best_score = min(scores) if minimize else max(scores)
    return moves[scores.index(best_score)], best_score        


### Alpha-beta pruning

In [10]:
# alpha and beta respectively represent the minimum score that the 
# maximizing player is assured of and the maximum score 
# that the minimizing player is assured of.

def alphabeta(board, player, minimizer=True, depth=3, alpha=-np.inf, beta=np.inf, *, score_func):
    """
    This function returns the next best move for the current player
    """
    
    moves = valid_moves(board)
    
    if count_by_length(board, player, 4):
        return None, 1_000_000 // (4 - depth)

    if count_by_length(board, -player, 4):
        return None, -1_000_000 // (4 - depth)

    if depth == 0 or len(moves) == 0:
        # return None, score_func(board, player)
        return None, score_func(board, player)

    # The maximizer starts from best_score = -inf
    # and it is assured that the minimum value it can encounter is alpha.
    # On the other side, the minimizer starts from
    # best_score = inf and it is assured that the maximum value
    # it can encouter is beta
    best_move, best_score = None, np.inf if minimizer else -np.inf

    for move in moves:
        # play in the column 'move'
        play(board, move, player)

        # update the score
        _, new_score = alphabeta(board, -player, not minimizer, depth - 1, alpha, beta, score_func=score_func)

        # scale a bit the score so that late outcomes are less preferable
        # new_score *= 0.9 

        take_back(board, move)

        if minimizer:
            if new_score < best_score:
                # better score found
                best_move, best_score = move, new_score
            
            beta = min(beta, best_score)
            if new_score < alpha:
                break # no need to go further
        else:
            # maxmizer
            if new_score > best_score:
                # better score found
                best_move, best_score = move, new_score
            
            alpha = max(alpha, best_score)
            if new_score > beta:
                break # no need to go further

    return best_move, best_score
        

In [11]:
def build_dict(depth, values=[]):
    if depth == 0:
        array = np.array(values, dtype=np.byte)
        ones = np.ones(4, dtype=np.byte)

        return {
            player:{
                1: tuple(np.argwhere(np.convolve(player * ones[:1], array, mode='valid') == 1).flatten()),
                2: tuple(np.argwhere(np.convolve(player * ones[:2], array, mode='valid') == 2).flatten()),
                3: tuple(np.argwhere(np.convolve(player * ones[:3], array, mode='valid') == 3).flatten()),
                4: tuple(np.argwhere(np.convolve(player * ones[:4], array, mode='valid') == 4).flatten()),
            } for player in [-1, 1]
        }

    return {
        n: build_dict(depth-1, values + [n]) for n in [-1, 0, 1]
    }


In [12]:
dict = build_dict(7)

def fast_convolve(row):
    _state = dict
    for digit in row:
        _state = _state[digit]

    for _ in range(7 - row.size):
        _state = _state[0]

    return _state

## Sequences extraction

In [13]:
@dataclass
class Sequence:
    board: np.ndarray
    start: int
    step: int
    length: int

    def __len__(self):
        return self.length


def compute_sequences(board: np.ndarray, player) -> List[Sequence]:
    diagspan = COLUMN_HEIGHT - 2

    sequences = itertools.chain(
        (((ncol, 0), (0, 1), col) for ncol, col in enumerate(board)),
        (((0, nrow), (1, 0), row) for nrow, row in enumerate(board.T)),
        (((max(-ndiag, 0), max(ndiag, 0)), (1, 1), np.diag(board, ndiag))
         for ndiag in range(-diagspan, diagspan)),
        # board antidiagonals
        (((max(-ndiag, 0), min(COLUMN_HEIGHT - 1, COLUMN_HEIGHT - 1 - ndiag)), (1, -1),
         np.diag(np.fliplr(board), ndiag)) for ndiag in range(-diagspan, diagspan))
    )

    r = []
    for start, step, sequence in sequences:
        # indices = np.argwhere(np.convolve(kernel, sequence, mode='valid') == length)
        r += [
            Sequence(board, np.array(start) + index * np.array(step),
                     np.array(step), length)
            for length, indices in fast_convolve(sequence)[player].items()
            for index in indices
        ]

    return r


## Heuristic eval

In [14]:
def heuristic_eval(board, player):
    """
    Compute an heuristic score of the board assuming
    the player is about to play
    """
    WIN = 1_000_000

    # check if the position is valid
    def is_valid(a, b): return 0 <= a < NUM_COLUMNS and 0 <= b < COLUMN_HEIGHT

    def is_free(a, b): return board[a, b] == 0
    def empty_below(a, b): return 0 if b == 0 else (board[a, :b] == 0).sum()

    # player
    player_score = 0
    for sequence in compute_sequences(board, player):
        # evaluate the sequence

        if len(sequence) == 4:
            # the player wins (return a big number)
            return WIN

        for nsteps in [-1, len(sequence)]:
            # take a step in one direction
            candidate = sequence.start + nsteps * sequence.step

            if is_valid(*candidate) and is_free(*candidate):
                # the position is valid and empty

                new_length = len(sequence) + 1

                # look a few steps ahead on the same direction
                # maybe filling this cell could join sequences
                for steps_ahead in range(2, 4 - new_length + 2):
                    a, b = candidate + steps_ahead * \
                        np.sign(nsteps) * sequence.step

                    if not is_valid(a, b) or not board[a, b] == player:
                        break
                    new_length += 1

                if new_length == 4 and empty_below(*candidate) == 0:
                    # current sequence has length 3 and can be extended
                    return WIN
                else:
                    player_score += new_length * \
                        (new_length - empty_below(*candidate))

    return player_score


## Evaluation

In [15]:
def play_game(initial_board, initial_player, player_func, opponent_func, verbose=False, return_board=False):
    player = initial_player

    board = initial_board.copy()
    steps = 0

    while not(four_in_a_row(board, PLAYER_1) or four_in_a_row(board, PLAYER_2)):
        if player == initial_player:
            move = player_func(board)
        else:
            move = opponent_func(board)

        if move is None:
            break

        play(board, move, player)

        if verbose:
            print(f"Player {player} played {move}")
            pretty_print_board(board)
            print()

        player = player * -1
        steps = steps + 1

    winner = 0
    if count_by_length(board, -initial_player, 4):
        winner = -initial_player
    elif count_by_length(board, initial_player, 4):
        winner = initial_player

    if return_board:
        return winner, board
    return winner


In [16]:
def run_simulation(game_function, runs, player):
    print(f"Performing {runs} simulations...")

    simulation_results = {player: 0, -player: 0, 0: 0}

    start_time = perf_counter()

    for _ in tqdm(range(runs)):
        simulation_results[game_function()] += 1

    elapsed_time = perf_counter() - start_time

    wins_perc = 100 * (simulation_results[player] / runs)
    draw_perc = 100 * (simulation_results[0] / runs)

    print(f"Simulations completed in {elapsed_time:.6f} seconds!")
    print(f"Wins={wins_perc:.2f}, Draws={draw_perc:.2f}")


In [17]:
board_evaluation_function = counted(heuristic_eval)

# reinitialize the random seed
np.random.seed(1234)

# initial board
board = np.zeros((NUM_COLUMNS, COLUMN_HEIGHT), dtype=np.byte)
# initial player
player = PLAYER_1

# player function
# player_func = lambda board: vanilla_min_max(board, player, depth=4, score_func=board_evaluation_function)[0]
player_func = lambda board: alphabeta(board, player, depth=3, score_func=board_evaluation_function)[0]

# opponent function
opponent_func = lambda board: np.random.choice(valid_moves(board))

# run a game
winner, final_board = play_game(board, player, player_func, opponent_func, return_board=True)

print(f"{winner} wins! (n. of calls to the board evaluation function: {board_evaluation_function.calls})")
pretty_print_board(final_board)


1 wins! (n. of calls to the board evaluation function: 1917)
P1: 🟡	P2: 🔴
  ╔════╦════╦════╦════╦════╦════╦════╗
5 ║    ║    ║    ║    ║    ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
4 ║    ║    ║    ║ 🟡 ║ 🟡 ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
3 ║    ║    ║    ║ 🟡 ║ 🔴 ║    ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
2 ║    ║ 🔴 ║    ║ 🟡 ║ 🔴 ║ 🟡 ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
1 ║ 🔴 ║ 🔴 ║    ║ 🟡 ║ 🟡 ║ 🔴 ║    ║
  ╠════╬════╬════╬════╬════╬════╬════╣
0 ║ 🟡 ║ 🔴 ║    ║ 🔴 ║ 🟡 ║ 🟡 ║ 🔴 ║
  ╚════╩════╩════╩════╩════╩════╩════╝
     0    1    2    3    4    5    6   


In [18]:
# run a batch of simulations
run_simulation(
    lambda: play_game(board, player, player_func, opponent_func),
    100, # number of simulations to perform
    player # initial player
)

Performing 100 simulations...


100%|██████████| 100/100 [02:49<00:00,  1.70s/it]

Simulations completed in 169.539023 seconds!
Wins=100.00, Draws=0.00



