In [1]:
import numpy as np
import random
import json
from collections import defaultdict
from IPython.display import clear_output
import os
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

1) CONSTANTS & HELPERS

In [2]:
# Board geometry used by every helper below: 6 rows by 7 columns.
ROWS = 6
COLS = 7

def get_legal_moves(board):
    """
    List the columns that can still accept a piece.

    A column is playable while its top cell (row 0) holds 0.
    Columns are reported in ascending order.
    """
    open_columns = []
    for col in range(COLS):
        if board[0, col] == 0:
            open_columns.append(col)
    return open_columns

def place_piece(board, player, col):
    """
    Return a copy of 'board' with 'player' (+1 or -1) dropped into 'col'.

    The piece lands in the lowest empty cell of the column (largest row
    index holding 0). The input board is never modified. If the column has
    no empty cell, the returned copy is identical to the input.
    """
    updated = board.copy()
    # Scan from the bottom row upward and fill the first free cell.
    for row in reversed(range(ROWS)):
        if updated[row, col] == 0:
            updated[row, col] = player
            return updated
    return updated

def check_winner(board):
    """
    Look for four equal pieces in a line.

    Returns +1 if a line of four +1 cells is found, -1 if a line of four
    -1 cells is found, and 0 when neither exists. Lines are scanned in the
    order: horizontal, vertical, then both diagonals.
    """
    def decide(*line_sums):
        # A window of four cells sums to +/-4 only when all four match.
        if 4 in line_sums:
            return +1
        if -4 in line_sums:
            return -1
        return 0

    # Horizontal windows
    for r in range(ROWS):
        for c in range(COLS - 3):
            found = decide(board[r, c:c+4].sum())
            if found != 0:
                return found

    # Vertical windows
    for r in range(ROWS - 3):
        for c in range(COLS):
            found = decide(board[r:r+4, c].sum())
            if found != 0:
                return found

    # Diagonal windows anchored at (r, c): one runs down-right from (r, c),
    # the other runs down-left from (r, c+3).
    steps = np.arange(4)
    for r in range(ROWS - 3):
        for c in range(COLS - 3):
            down_right = board[r + steps, c + steps].sum()
            down_left = board[r + steps, c + 3 - steps].sum()
            found = decide(down_right, down_left)
            if found != 0:
                return found

    return 0  # nobody has four in a row

def board_is_full(board):
    """Report whether every cell of the top row (row 0) is occupied."""
    return all(board[0, col] != 0 for col in range(COLS))

def next_player(p):
    """Return the opponent of 'p': +1 becomes -1 and -1 becomes +1."""
    opponent = -p
    return opponent

def display_board(board):
    """
    Render the board as an ASCII grid.

    Cells are drawn as 'X' for +1, 'O' for any other non-zero value (-1 in
    practice) and blank for 0. The current cell output is cleared first via
    clear_output(wait=True), then a column-index header and one bordered
    line per row are printed.
    """
    clear_output(wait=True)
    rule = '-' * (7*5 + 8)
    print('   0     1     2     3     4     5     6')
    print(rule)
    for row in range(ROWS):
        cells = []
        for col in range(COLS):
            value = board[row, col]
            if value == 0:
                cells.append('     ')
            elif value == 1:
                cells.append('  X  ')
            else:
                cells.append('  O  ')
        # Each cell is followed by a '|' separator, with one leading '|'.
        print('|' + '|'.join(cells) + '|')
        print(rule)

2) MCTS IMPLEMENTATION (REAL MONTE CARLO TREE SEARCH)

In [3]:
class Node:
    """
    A node in the MCTS tree.
    - board: current board state
    - player: which player (+1 or -1) moves at this node
    - parent: the node this one was expanded from (None for the root)
    - children: dict of {move: Node} for expanded children
    - untried_moves: moves not expanded yet
    - visits: how many simulations visited this node
    - wins: score accumulated for self.player (1 per win, 0.5 per draw)
    """
    def __init__(self, board, player, parent=None):
        self.board = board
        self.player = player
        self.parent = parent
        self.children = {}         # move -> child Node
        self.untried_moves = get_legal_moves(board)
        self.visits = 0
        self.wins = 0

    def ucb1(self, child, c_param=2.0):
        """
        UCB1 = Q + c * sqrt( ln(N) / n ), scored for the player moving at
        this node (self.player).

        Unvisited children score +inf so each child is tried at least once.
        """
        if child.visits == 0:
            return float('inf')
        # child.wins is accumulated for child.player, i.e. the opponent of
        # self.player (see update). The value of the move for self.player is
        # therefore the complement of the child's own win rate; draws (0.5)
        # stay at 0.5 under this inversion.
        win_rate = 1.0 - child.wins / child.visits
        # max(1, ...) keeps log() finite if the parent has not been visited.
        parent_visits = max(1, self.visits)
        return win_rate + c_param * np.sqrt(np.log(parent_visits) / child.visits)

    def best_child(self, c_param=2.0):
        """Select the child with the highest UCB1 score."""
        return max(self.children.values(), key=lambda c: self.ucb1(c, c_param))

    def expand(self):
        """
        Expand exactly one untried move, creating a child node.

        Returns (child_node, move). The child holds the board after
        self.player plays 'move', and the other player is to move there.
        """
        move = self.untried_moves.pop()
        child_board = place_piece(self.board, self.player, move)
        child_node = Node(child_board, next_player(self.player), parent=self)
        self.children[move] = child_node
        return child_node, move

    def update(self, result):
        """
        Record one simulation outcome at this node.

        result = +1, -1, or 0 from the final outcome (rollout).
        If result == self.player => counts as a win for this node.
        If result == 0 => counts as a half-win (draw).
        """
        self.visits += 1
        if result == self.player:
            self.wins += 1
        elif result == 0:
            # treat draw as half-win
            self.wins += 0.5

def rollout(board, player):
    """
    Play uniformly random moves from 'board' until the game ends.

    'player' is the side to move first. Returns +1 or -1 for the winner
    reported by check_winner, or 0 when the board fills up with no winner.
    The input board is not modified.
    """
    state = board.copy()
    mover = player
    outcome = check_winner(state)
    # Keep playing while nobody has won and at least one column is open.
    while outcome == 0 and not board_is_full(state):
        col = random.choice(get_legal_moves(state))
        state = place_piece(state, mover, col)
        mover = next_player(mover)
        outcome = check_winner(state)
    # outcome is still 0 here only when the board filled up: a draw.
    return outcome

def backpropagate(node, result):
    """
    Walk from 'node' up to the root via the .parent links, calling
    update(result) on every node along the way (including 'node' itself).
    Does nothing when 'node' is None.
    """
    current = node
    while current is not None:
        current.update(result)
        current = current.parent

def mcts_search(root, iterations=1000):
    """
    Perform MCTS for 'iterations' simulations from 'root'.

    Each simulation runs the four classic phases: selection (descend via
    best_child while the node is fully expanded), expansion (add one child
    if the position is not terminal), rollout (random playout), and
    backpropagation of the rollout result.

    Returns the move of the root child with the highest visit count, or
    None when the root ends up with no children (e.g. iterations <= 0 or
    the root position cannot be expanded).
    """
    for _ in range(iterations):
        node = root
        # Selection: descend while every move here was already expanded.
        while not node.untried_moves and node.children:
            node = node.best_child()

        # Expansion: only non-terminal positions get a new child.
        if node.untried_moves and check_winner(node.board) == 0 and not board_is_full(node.board):
            node, _move = node.expand()

        # Rollout
        result = rollout(node.board, node.player)

        # Backprop
        backpropagate(node, result)

    # Without any expanded child there is nothing to rank; max() on an
    # empty sequence would raise ValueError.
    if not root.children:
        return None

    # best child is the one with the most visits (first one wins ties)
    return max(root.children, key=lambda move: root.children[move].visits)

class MCTS:
    """
    Monte Carlo Tree Search agent.

    Given only a board, it works out whose turn it is from the piece
    counts and runs a fresh tree search to choose a column.
    """
    def __init__(self, iterations=1000):
        # Number of simulations run per best_move call.
        self.iterations = iterations

    def best_move(self, board):
        """
        Pick a column for the side to move, or None for a finished game.

        The side to move is inferred from the board: +1 moves when it has
        no more pieces than -1, otherwise -1 moves. None is returned when
        the board already has a winner or is full.
        """
        # Finished games have no move to recommend.
        if check_winner(board) != 0 or board_is_full(board):
            return None

        n_plus = np.count_nonzero(board == 1)
        n_minus = np.count_nonzero(board == -1)
        mover = +1 if n_plus <= n_minus else -1

        # Search from a private copy so the caller's board is never shared
        # with the tree.
        search_root = Node(board.copy(), mover)
        return mcts_search(search_root, iterations=self.iterations)

3) DATASET GENERATION: MCTS + RANDOM OPENINGS, THEN SAVE (X, y)

In [4]:
def generate_mcts_dataset(num_games=100000, skill_range=(20, 200), random_openings=True, random_openings_range=(0,10)):
    """
    Self-play 'num_games' games with an MCTS agent and collect
    (board -> chosen move) pairs.

      - skill_range: (low, high) passed to random.randint to draw the MCTS
        iteration count used for a whole game.
      - random_openings: when True, each game starts with a number of
        uniformly random moves drawn from random_openings_range
        (also via random.randint); those positions are not recorded.

    Returns a dict { json(board) : move }, where the move is the one most
    often chosen for that board across all games.
    """
    votes = defaultdict(list)

    for _ in range(num_games):
        # One randomly drawn skill level per game.
        agent = MCTS(iterations=random.randint(*skill_range))

        board = np.zeros((ROWS, COLS), dtype=int)
        to_move = +1
        finished = False
        recorded = []

        # Random opening moves (not recorded).
        if random_openings:
            for _step in range(random.randint(*random_openings_range)):
                options = get_legal_moves(board)
                if not options:
                    finished = True
                    break
                board = place_piece(board, to_move, random.choice(options))
                to_move = next_player(to_move)

        # MCTS plays both sides until the game ends.
        while not finished and not board_is_full(board):
            chosen = agent.best_move(board)
            if chosen is None:
                # the agent reports a finished position
                break

            # remember the position together with the move picked for it
            recorded.append((board.copy(), chosen))
            board = place_piece(board, to_move, chosen)

            # stop after this move if it produced a winner
            finished = check_winner(board) != 0
            to_move = next_player(to_move)

        for snapshot, chosen in recorded:
            votes[json.dumps(snapshot.tolist())].append(chosen)

    # Keep the most frequently chosen move per board.
    return {key: max(set(picks), key=picks.count) for key, picks in votes.items()}

def decode_board(board_str):
    """
    Parse a JSON-encoded nested list and return it as an integer numpy
    array (shape (6,7) for boards produced by this notebook).
    """
    return np.array(json.loads(board_str), dtype=int)

def encode_board_option_b(board_6x7):
    """
    One-hot encode a board into a float32 array of shape (6,7,2):
      channel 0 is 1.0 where the board holds +1
      channel 1 is 1.0 where the board holds -1
    All other entries are 0.0.
    """
    encoded = np.zeros((6, 7, 2), dtype=np.float32)
    # Channel index -> piece value marked in that channel.
    for channel, piece in enumerate((1, -1)):
        encoded[board_6x7 == piece, channel] = 1.0
    return encoded

def build_np_arrays(dataset_dict):
    """
    Turn { json(board): best_move } into training arrays.

    Returns (X, y): X stacks the encoded boards in dict order
    (shape (N,6,7,2) for N boards), y holds the matching moves as int32
    (shape (N,)).
    """
    encoded_boards = [
        encode_board_option_b(decode_board(board_str))
        for board_str in dataset_dict
    ]
    moves = list(dataset_dict.values())
    return np.array(encoded_boards), np.array(moves, dtype=np.int32)

4) MAIN EXAMPLE

In [None]:
if __name__ == "__main__":
    # Demo run: 100 self-play games, MCTS skill of 3000..5000 iterations,
    # and 4..15 random opening moves per game.
    print("Generating MCTS-based dataset. This could take a while for large num_games...")
    dataset = generate_mcts_dataset(
        num_games=100,
        skill_range=(3000, 5000),
        random_openings=True,
        random_openings_range=(4,15)
    )
    print(f"Collected {len(dataset)} unique board states.")

    # Convert to (X, y)
    X, y = build_np_arrays(dataset)
    print(f"X shape = {X.shape}, y shape = {y.shape}")

    # Save for later training. Features and labels go to separate files:
    # writing both to the same path would leave only the labels on disk.
    np.save("connect4_X.npy", X)
    np.save("connect4_y.npy", y)

    # Quick demonstration of display_board
    # (Just picking a random state for illustration)
    if len(X) > 0:
        sample_idx = random.randint(0, len(X)-1)
        sample_board = X[sample_idx]
        # "Decode" from (6,7,2) back to numeric
        plus_mask = (sample_board[:,:,0] == 1)
        minus_mask = (sample_board[:,:,1] == 1)
        numeric_board = np.zeros((6,7), dtype=int)
        numeric_board[plus_mask] = +1
        numeric_board[minus_mask] = -1

        print("\nRandom sample board from dataset (numeric):")
        print(numeric_board)
        print("\nASCII view:")
        # Note: display_board calls clear_output(wait=True), so the text
        # printed above in this cell is replaced by the ASCII board.
        display_board(numeric_board)
    else:
        # random.randint(0, -1) would raise on an empty dataset.
        print("\nNo board states were collected; nothing to display.")

    print("\nDone.")

Generating MCTS-based dataset. This could take a while for large num_games...
