In [3]:
# # Connect 4 Training with Reinforcement Learning

# ## 1. Set Up the Environment

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

# Constants for the game
ROWS, COLS = 6, 7

# Function to initialize the board
def create_board():
    return np.zeros((ROWS, COLS), dtype=int)

# Function to drop a piece in the board
def drop_piece(board, row, col, piece):
    board[row][col] = piece

# Function to check if column is valid
def is_valid_location(board, col):
    return board[0][col] == 0

# Function to get next open row in column
def get_next_open_row(board, col):
    for r in range(ROWS - 1, -1, -1):
        if board[r][col] == 0:
            return r

# Function to check if a player has won
def check_win(board, piece):
    # Check horizontal locations for win
    for c in range(COLS - 3):
        for r in range(ROWS):
            if board[r][c] == piece and board[r][c + 1] == piece and board[r][c + 2] == piece and board[r][c + 3] == piece:
                return True

    # Check vertical locations for win
    for c in range(COLS):
        for r in range(ROWS - 3):
            if board[r][c] == piece and board[r + 1][c] == piece and board[r + 2][c] == piece and board[r + 3][c] == piece:
                return True

    # Check positively sloped diagonals
    for c in range(COLS - 3):
        for r in range(ROWS - 3):
            if board[r][c] == piece and board[r + 1][c + 1] == piece and board[r + 2][c + 2] == piece and board[r + 3][c + 3] == piece:
                return True

    # Check negatively sloped diagonals
    for c in range(COLS - 3):
        for r in range(3, ROWS):
            if board[r][c] == piece and board[r - 1][c + 1] == piece and board[r - 2][c + 2] == piece and board[r - 3][c + 3] == piece:
                return True

    return False

# ## 2. Define the Neural Network Model

class Connect4Model(nn.Module):
    def __init__(self):
        super(Connect4Model, self).__init__()
        self.layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(ROWS * COLS, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, COLS)
        )

    def forward(self, x):
        return self.layers(x)

# ## 3. Define the Training Parameters and Hyperparameters

# Hyperparameters
learning_rate = 0.001
gamma = 0.99  # Discount rate
epsilon = 1.0  # Exploration rate
epsilon_min = 0.1
epsilon_decay = 0.995

# Initialize model, optimizer, and loss function
model = Connect4Model()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.MSELoss()

# Initialize replay memory
replay_memory = deque(maxlen=2000)

# ## 4. Define Game Functions and Training Loop

def choose_action(state, epsilon):
    if np.random.rand() <= epsilon:
        return random.choice([c for c in range(COLS) if is_valid_location(state, c)])
    else:
        state_tensor = torch.tensor(state.flatten(), dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            action_values = model(state_tensor)
        return torch.argmax(action_values).item()

def reward_function(board, player_id):
    # Custom reward function for winning, losing, or tying
    if check_win(board, player_id):
        return 1
    elif check_win(board, 3 - player_id):
        return -1
    return 0  # No win/lose condition met

# Training loop
episodes = 1000
for e in range(episodes):
    board = create_board()
    game_over = False
    state = board.copy()
    player_id = 1  # Alternate between players for each episode
    
    while not game_over:
        action = choose_action(state, epsilon)
        
        if is_valid_location(state, action):
            row = get_next_open_row(state, action)
            drop_piece(state, row, action, player_id)
            reward = reward_function(state, player_id)
            
            next_state = state.copy()
            replay_memory.append((state.flatten(), action, reward, next_state.flatten(), game_over))
            
            # Train the model
            if len(replay_memory) > 32:
                minibatch = random.sample(replay_memory, 32)
                for s, a, r, s_next, done in minibatch:
                    target = r + (gamma * torch.max(model(torch.tensor(s_next, dtype=torch.float32).unsqueeze(0))) if not done else 0)
                    prediction = model(torch.tensor(s, dtype=torch.float32).unsqueeze(0))[0][a]
                    
                    loss = criterion(prediction.unsqueeze(0), target.unsqueeze(0))
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

            # Update the state and check for game over
            state = next_state
            game_over = reward != 0 or not any(is_valid_location(state, c) for c in range(COLS))
            player_id = 3 - player_id  # Switch players

        if epsilon > epsilon_min:
            epsilon *= epsilon_decay

    print(f"Episode {e+1}/{episodes}, Reward: {reward}, Epsilon: {epsilon:.2f}")

# ## 5. Save the Model

torch.save(model.state_dict(), "connect4_trained_weights.pth")
print("Model saved successfully!")

Episode 1/1000, Reward: 1, Epsilon: 0.93
Episode 2/1000, Reward: 1, Epsilon: 0.86
Episode 3/1000, Reward: 1, Epsilon: 0.76
Episode 4/1000, Reward: 1, Epsilon: 0.66
Episode 5/1000, Reward: 1, Epsilon: 0.63
Episode 6/1000, Reward: 1, Epsilon: 0.56
Episode 7/1000, Reward: 1, Epsilon: 0.47
Episode 8/1000, Reward: 1, Epsilon: 0.43
Episode 9/1000, Reward: 1, Epsilon: 0.35
Episode 10/1000, Reward: 1, Epsilon: 0.31
Episode 11/1000, Reward: 1, Epsilon: 0.25
Episode 12/1000, Reward: 1, Epsilon: 0.17
Episode 13/1000, Reward: 1, Epsilon: 0.13
Episode 14/1000, Reward: 1, Epsilon: 0.10
Episode 15/1000, Reward: 1, Epsilon: 0.10
Episode 16/1000, Reward: 1, Epsilon: 0.10
Episode 17/1000, Reward: 1, Epsilon: 0.10
Episode 18/1000, Reward: 1, Epsilon: 0.10
Episode 19/1000, Reward: 1, Epsilon: 0.10
Episode 20/1000, Reward: 1, Epsilon: 0.10
Episode 21/1000, Reward: 1, Epsilon: 0.10
Episode 22/1000, Reward: 1, Epsilon: 0.10
Episode 23/1000, Reward: 1, Epsilon: 0.10
Episode 24/1000, Reward: 1, Epsilon: 0.10
E

In [14]:
model.load_state_dict(torch.load("connect4_trained_weights.pth"))
#print("--- Submit the OrderedDict below ---")
torch.set_printoptions(threshold=10_000)
print(model.state_dict())
"""
state_dict = OrderedDict([]) # paste the output in
"""
""" YOUR CODE HERE """
state_dict = model.state_dict()
""" YOUR CODE END HERE """

OrderedDict([('layers.1.weight', tensor([[-1.4157e-01, -2.0424e-02,  1.4510e-01,  1.2621e-01,  7.3770e-02,
          5.1647e-02, -5.9633e-03, -1.8117e-01, -1.4857e-01, -4.6359e-02,
          1.0993e-01, -4.9965e-02, -1.0809e-02,  1.6246e-01, -1.6051e-01,
          1.3988e-01, -8.8898e-02, -4.3457e-02, -2.1746e-01,  8.4738e-02,
          1.4026e-01, -1.8332e-02,  2.3908e-03, -9.2936e-02, -8.5863e-02,
         -3.0109e-02, -1.7884e-01,  1.3226e-02,  9.6562e-03,  1.5983e-01,
         -1.0600e-01,  3.0227e-02, -1.3290e-01, -1.2563e-01, -3.1637e-03,
         -2.0414e-01,  2.4525e-03, -6.7698e-02, -2.3050e-02, -1.7725e-01,
          1.0519e-01, -7.0640e-02],
        [-7.9523e-02,  8.3804e-02,  7.0394e-02,  1.8760e-02,  8.5731e-02,
          2.1188e-02,  3.5824e-02, -1.3178e-01, -4.5260e-02, -2.1380e-02,
          2.8006e-03, -3.9092e-02, -6.7942e-02, -9.7157e-02,  1.5038e-01,
          5.4404e-02,  7.0495e-02,  2.4026e-02,  4.8486e-02,  1.6506e-02,
         -6.5509e-02, -1.2692e-01, -1.1927e

' YOUR CODE END HERE '

In [None]:
import numpy as np
import random
import math
import game_utils
import copy
import torch
import torch.nn as nn
import torch.optim as optim

class AIAgent(object):
    def __init__(self, player_id=1):
        self.player_id = player_id
        self.opponent_id = 1 if player_id == 2 else 2
        self.max_depth = 3
        self.model = self.create_model()
        self.load_weights()

    def create_model(self):
        model = nn.Sequential(
            nn.Flatten(),
            nn.Linear(6 * 7, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )
        self.optimizer = optim.Adam(model.parameters(), lr=0.001)
        # Assuming `state_dict` is an OrderedDict containing the model weights
        state_dict = torch.load('path_to_trained_weights.pth')
        model.load_state_dict(state_dict)
        model.eval()
        return model

    def load_weights(self, state_dict):
        self.model.load_state_dict(state_dict)

    def preprocess_state(self, state):
        return np.array(state).reshape((1, 6, 7, 1))

    def make_move(self, state):        
        valid_moves = game_utils.get_valid_col_id(state)
        best_score = float('-inf')
        best_move = random.choice(valid_moves)

        for move in valid_moves:
            row = self.get_next_open_row(state, move)
            temp_state = copy.deepcopy(state)
            temp_state[row][move] = self.player_id
            temp_state_input = self.preprocess_state(temp_state)
            with torch.no_grad():
                score = self.model(temp_state_input).item()
            if score > best_score:
                best_score = score
                best_move = move
        return best_move

    def is_valid_location(self, state, col):
        return state[0][col] == 0

    def get_next_open_row(self, state, col):
        '''
        Find the next open row in the specified column.
        '''
        for r in range(len(state)-1, -1, -1):
            if state[r][col] == 0:
                return r

    def winning_move(self, state, piece):
        '''
        Check if the given piece has a winning sequence on the board.
        '''
        # Horizontal check
        for r in range(len(state)):
            for c in range(len(state[0]) - 3):
                if all(state[r][c + i] == piece for i in range(4)):
                    return True
        # Vertical check
        for r in range(len(state) - 3):
            for c in range(len(state[0])):
                if all(state[r + i][c] == piece for i in range(4)):
                    return True
        # Positive diagonal check
        for r in range(len(state) - 3):
            for c in range(len(state[0]) - 3):
                if all(state[r + i][c + i] == piece for i in range(4)):
                    return True
        # Negative diagonal check
        for r in range(3, len(state)):
            for c in range(len(state[0]) - 3):
                if all(state[r - i][c + i] == piece for i in range(4)):
                    return True
        return False

    def minimax(self, state, depth, alpha, beta, maximizingPlayer):
        '''
        Minimax algorithm with alpha-beta pruning to determine the best move for the agent based on the current game state.
        '''
        valid_locations = [c for c in range(len(state[0])) if self.is_valid_location(state, c)]
        is_terminal = self.winning_move(state, self.player_id) or self.winning_move(state, self.opponent_id) or not valid_locations
        if depth == 0 or is_terminal:
            if is_terminal:
                if self.winning_move(state, self.player_id):
                    return 100000000
                elif self.winning_move(state, self.opponent_id):
                    return -100000000
                else:
                    return 0
            else:
                return self.score_position(state, self.player_id)

        if maximizingPlayer:
            value = float('-inf')
            best_col = random.choice(valid_locations)
            for col in valid_locations:
                row = self.get_next_open_row(state, col)
                temp_state = copy.deepcopy(state)
                temp_state[row][col] = self.player_id
                new_score = self.minimax(temp_state, depth - 1, alpha, beta, False)
                if new_score > value:
                    value = new_score
                    best_col = col
                alpha = max(alpha, value)
                if alpha >= beta:
                    break
            return value
        else:
            value = float('inf')
            best_col = random.choice(valid_locations)
            for col in valid_locations:
                row = self.get_next_open_row(state, col)
                temp_state = copy.deepcopy(state)
                temp_state[row][col] = self.opponent_id
                new_score = self.minimax(temp_state, depth - 1, alpha, beta, True)
                if new_score < value:
                    value = new_score
                    best_col = col
                beta = min(beta, value)
                if alpha >= beta:
                    break
            return value 

    def score_position(self, state, piece):
        '''
        Evaluate the score of the board for the given piece and return the score.
        '''
        score = 0
        center_col = [int(i) for i in list(state[:, len(state[0]) // 2])]
        center_count = center_col.count(piece)
        score += center_count * 3
        for r in range(len(state)):
            row_array = [int(i) for i in list(state[r, :])]
            for c in range(len(row_array) - 3):
                window = row_array[c:c + 4]
                score += self.evaluate_window(window, piece)
        for c in range(len(state[0])):
            col_array = [int(i) for i in list(state[:, c])]
            for r in range(len(col_array) - 3):
                window = col_array[r:r + 4]
                score += self.evaluate_window(window, piece)
        for r in range(len(state) - 3):
            for c in range(len(state[0]) - 3):
                window = [state[r + i][c + i] for i in range(4)]
                score += self.evaluate_window(window, piece)
        for r in range(3, len(state)):
            for c in range(len(state[0]) - 3):
                window = [state[r - i][c + i] for i in range(4)]
                score += self.evaluate_window(window, piece)
        return score

    def evaluate_window(self, window, piece):
        '''
        Evaluate the window of 4 cells for the given piece and return the score.
        '''
        score = 0
        opp_piece = self.opponent_id if piece == self.player_id else self.player_id
        if window.count(piece) == 4:
            score += 100
        elif window.count(piece) == 3 and window.count(0) == 1:
            score += 5
        elif window.count(piece) == 2 and window.count(0) == 2:
            score += 2
        if window.count(opp_piece) == 3 and window.count(0) == 1:
            score -= 4
        return score