<a href="https://colab.research.google.com/github/ktxdev/AIM-5001/blob/main/Copy_of_ConnectX_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install torch numpy kaggle-environments
from IPython.display import clear_output
clear_output()

In [4]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import defaultdict, deque
from kaggle_environments import evaluate, make, utils
import random
import math

clear_output()

In [12]:
class ConnectXNet(nn.Module):
    def __init__(self, rows=6, cols=7, action_size=7):
        super(ConnectXNet, self).__init__()
        self.rows, self.cols = rows, cols

        # Convolutional layers
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)

        # Fully connected layers
        self.fc1 = nn.Linear(64 * rows * cols, 128)

        # Policy head (probabilities over actions)
        self.fc_policy = nn.Linear(128, action_size)

        # Value head (expected game outcome)
        self.fc_value = nn.Linear(128, 1)

    def forward(self, x):
        x = x.view(-1, 1, self.rows, self.cols)  # Shape: [batch, 1, 6, 7]
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = x.view(-1, 64 * self.rows * self.cols)
        x = torch.relu(self.fc1(x))

        policy = self.fc_policy(x)  # Raw logits
        value = torch.tanh(self.fc_value(x))  # Scaled to [-1, 1]

        return policy, value

    def predict(self, board):
        # Convert board to tensor and add batch dimension
        board_tensor = torch.FloatTensor(board).unsqueeze(0)
        with torch.no_grad():
            policy_logits, value = self.forward(board_tensor)

        # Mask invalid moves
        valid_moves = [col for col in range(board.shape[1]) if board[0][col] == 0]
        policy = torch.softmax(policy_logits, dim=1).squeeze().numpy()
        policy = np.zeros_like(policy)
        policy[valid_moves] = 1.0 / len(valid_moves)  # Uniform policy for valid moves

        return policy, value.item()

In [11]:
class Node:
    def __init__(self, prior=0, parent=None):
        self.parent = parent
        self.children = {}
        self.visits = 0
        self.total_value = 0.0
        self.prior = prior  # From NN policy

class MCTS:
    def __init__(self, model, c_puct=1.0):
        self.model = model
        self.c_puct = c_puct
        self.nodes = {}

    def is_terminal(self, board):
        # Check if the game is over (win/draw)
        return self.check_winner(board) != 0 or np.all(board != 0)

    def get_valid_moves(self, board):
        # Return columns where the top row is empty
        return [col for col in range(board.shape[1]) if board[0][col] == 0]

    def make_move(self, board, action, player=1):
        # Simulate a move in the column
        new_board = board.copy()
        for row in reversed(range(board.shape[0])):
            if new_board[row][action] == 0:
                new_board[row][action] = player
                break
        return new_board

    def check_winner(self, board, inarow=4):
        # Check horizontal, vertical, and diagonal wins
        rows, cols = board.shape
        for r in range(rows):
            for c in range(cols - inarow + 1):
                if np.all(board[r, c:c+inarow] == 1):
                    return 1
                elif np.all(board[r, c:c+inarow] == 2):
                    return 2

        for c in range(cols):
            for r in range(rows - inarow + 1):
                if np.all(board[r:r+inarow, c] == 1):
                    return 1
                elif np.all(board[r:r+inarow, c] == 2):
                    return 2

        for r in range(rows - inarow + 1):
            for c in range(cols - inarow + 1):
                if np.all(np.diag(board[r:r+inarow, c:c+inarow]) == 1):
                    return 1
                elif np.all(np.diag(board[r:r+inarow, c:c+inarow]) == 2):
                    return 2

                if np.all(np.diag(np.fliplr(board[r:r+inarow, c:c+inarow])) == 1):
                    return 1
                elif np.all(np.diag(np.fliplr(board[r:r+inarow, c:c+inarow])) == 2):
                    return 2
        return 0  # No winner

    def search(self, board, num_simulations=50):
        root = self.nodes.get(board.tobytes(), Node())

        for _ in range(num_simulations):
            node = root
            sim_board = board.copy()

            # Selection
            while node.children:
                action, node = self.select_child(node)
                sim_board = self.make_move(sim_board, action)

            # Expansion
            if not self.is_terminal(sim_board):
                policy, value = self.model.predict(sim_board)
                valid_moves = self.get_valid_moves(sim_board)
                for action in valid_moves:
                    if action not in node.children:
                        node.children[action] = Node(prior=policy[action], parent=node)

            # Backpropagation
            winner = self.check_winner(sim_board)
            value = 1 if winner == 1 else -1 if winner == 2 else 0
            while node:
                node.visits += 1
                node.total_value += value
                node = node.parent

        # Choose best action
        if not root.children:
            return random.choice(self.get_valid_moves(board))
        best_action = max(root.children.items(), key=lambda x: x[1].visits)[0]
        return best_action

    def select_child(self, node):
        total_visits = sum(child.visits for child in node.children.values())
        ucb_scores = {
            action: (child.total_value / (child.visits + 1e-6)) +
            self.c_puct * child.prior * math.sqrt(total_visits) / (child.visits + 1)
            for action, child in node.children.items()
        }
        best_action = max(ucb_scores.keys(), key=lambda a: ucb_scores[a])
        return best_action, node.children[best_action]

In [14]:
def self_play(model, num_games=100, num_simulations=50):
    training_data = []

    for _ in range(num_games):
        board = np.zeros((6, 7), dtype=int)
        game_history = []
        done = False

        while not done:
            mcts = MCTS(model)
            action = mcts.search(board, num_simulations)
            game_history.append((board.copy(), action))
            board = mcts.make_move(board, action)

            winner = mcts.check_winner(board)
            if winner != 0 or np.all(board != 0):
                done = True

        # Assign rewards
        for i, (state, action) in enumerate(game_history):
            reward = 1 if winner == 1 else -1 if winner == 2 else 0
            training_data.append((state, action, reward))

    return training_data

In [20]:
def train(model, num_games=100, num_simulations=50, epochs=10, batch_size=32):
    optimizer = optim.Adam(model.parameters())

    for epoch in range(epochs):
        # Generate new training data through self-play
        training_data = []
        for _ in range(num_games):
            board = np.zeros((6, 7), dtype=int)
            game_history = []
            done = False

            while not done:
                mcts = MCTS(model)
                action = mcts.search(board, num_simulations)
                game_history.append((board.copy(), action))
                board = mcts.make_move(board, action, player=1)

                # Opponent's move (random for simplicity)
                valid_moves = mcts.get_valid_moves(board)
                if valid_moves:
                    opp_action = random.choice(valid_moves)
                    board = mcts.make_move(board, opp_action, player=2)

                winner = mcts.check_winner(board)
                if winner != 0 or len(valid_moves) == 0:
                    done = True

            # Assign rewards
            for i, (state, action) in enumerate(game_history):
                reward = 1 if winner == 1 else -1 if winner == 2 else 0
                training_data.append((state, action, reward))

        # Train on collected data
        random.shuffle(training_data)
        for i in range(0, len(training_data), batch_size):
            batch = training_data[i:i+batch_size]
            states, actions, rewards = zip(*batch)

            states = torch.FloatTensor(np.array(states))
            actions = torch.LongTensor(actions)
            rewards = torch.FloatTensor(rewards)

            # Forward pass
            policy_logits, values = model(states)

            # Loss
            policy_loss = nn.CrossEntropyLoss()(policy_logits, actions)
            value_loss = nn.MSELoss()(values.squeeze(), rewards)
            loss = policy_loss + value_loss

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        print(f"Epoch {epoch}, Loss: {loss.item()}")

In [21]:
model = ConnectXNet()
# training_data = self_play(model, num_games=100)
train(model, num_games=100, epochs=10)
torch.save(model.state_dict(), "alphazero_connectx.pth")

Epoch 0, Loss: 1.6029398441314697
Epoch 1, Loss: 0.3377048969268799
Epoch 2, Loss: 1.4077250957489014
Epoch 3, Loss: 0.8541877269744873
Epoch 4, Loss: 1.4834400415420532
Epoch 5, Loss: 0.29875826835632324
Epoch 6, Loss: 0.992655873298645
Epoch 7, Loss: 0.9899258017539978
Epoch 8, Loss: 0.637650728225708
Epoch 9, Loss: 0.7171280980110168


In [24]:
model_state_dict = torch.load("alphazero_connectx.pth")
model_state_dict

OrderedDict([('conv1.weight',
              tensor([[[[ 0.0842, -0.1090, -0.0640],
                        [ 0.2493, -0.1721,  0.2155],
                        [-0.2594, -0.2080,  0.1645]]],
              
              
                      [[[-0.0157,  0.2929, -0.2065],
                        [-0.2107, -0.2899,  0.3240],
                        [-0.1476, -0.3428,  0.1855]]],
              
              
                      [[[-0.2952,  0.1080,  0.0553],
                        [-0.0260,  0.1976,  0.0232],
                        [ 0.0683, -0.0431,  0.2905]]],
              
              
                      [[[ 0.3628, -0.1380, -0.1927],
                        [-0.0091, -0.3586, -0.2887],
                        [-0.2348, -0.0058, -0.0912]]],
              
              
                      [[[ 0.0149, -0.0370,  0.3532],
                        [-0.0198, -0.1872,  0.2462],
                        [-0.1544,  0.2512,  0.2096]]],
              
              
               

In [23]:

import json

with open('weights.json', 'w') as file:
    json.dump(model_state_dict, file, indent=4)


TypeError: Object of type Tensor is not JSON serializable

In [None]:
def my_agent(observation, configuration):
    import numpy as np
    from collections import OrderedDict

    board = np.array(observation.board).reshape(6, 7)

    model = ConnectXNet()

    model_state_dict =
    model.load_state_dict(torch.load("alphazero_connectx.pth"))
    model.eval()

    mcts = MCTS(model)
    action = mcts.search(board, num_simulations=50)
    return int(action)