In [None]:
# Notebook-wide board dimensions. Connect4Env reads them to size its grid and
# the training cell reads them to size the network's input and output layers.
BOARD_ROW = 6
BOARD_COL = 7

In [None]:
import numpy as np
import random

class Connect4Env:
    """Two-player Connect Four environment on a BOARD_ROW x BOARD_COL grid.

    The board is an int8 array in which 0 marks an empty cell and 1 / 2 mark
    the discs of player 1 / player 2. Row 0 is the top of the board: a disc
    dropped into a column lands on the highest row index that is still empty.
    """

    def __init__(self):
        self.rows = BOARD_ROW
        self.cols = BOARD_COL
        self.board = np.zeros((self.rows, self.cols), dtype=np.int8)
        self.current_player = 1  # Player 1 = 1, Player 2 = 2
        self.reward = {'win': 10, 'draw': 0, 'lose': -10}

    def reset(self):
        """Restore the initial state and return the flattened, empty board."""
        self.__init__()
        return self.board.flatten()

    def play(self, action):
        """Drop the current player's disc into column ``action``.

        Args:
            action: Column index to play.

        Returns:
            A ``(state, reward, done, info)`` tuple. ``state`` is the flattened
            board after the move, ``reward``/``done`` come from
            ``isWinningMove`` and ``info`` is always an empty dict. An illegal
            move (unknown or full column) leaves the board and the turn
            untouched and ends the game with the ``'lose'`` reward.
        """
        if action not in self.valid_moves():
            return self.board.flatten(), self.reward['lose'], True, {}

        # A valid column has an empty top cell, so at least one empty row
        # exists; the largest empty row index is where the disc comes to rest.
        row = np.flatnonzero(self.board[:, action] == 0)[-1]
        self.board[row, action] = self.current_player
        reward, done = self.isWinningMove()
        self.current_player = 3 - self.current_player  # hand the turn over
        return self.board.flatten(), reward, done, {}

    def isWinningMove(self):
        """Report whether the game is over in the current position.

        Returns:
            ``(reward, done)``: ``(reward['win'], True)`` if either player has
            four discs in a line anywhere on the board, ``(reward['draw'],
            True)`` if the board is full without such a line, and
            ``(0, False)`` otherwise.
        """
        line_length = 4

        def line_from(r, c, dr, dc):
            """True if the ``line_length`` cells starting at (r, c) match."""
            last_r = r + (line_length - 1) * dr
            last_c = c + (line_length - 1) * dc
            # Each coordinate moves monotonically, so the whole segment is on
            # the board exactly when its last cell is.
            if not (0 <= last_r < self.rows and 0 <= last_c < self.cols):
                return False
            player = self.board[r, c]
            return all(self.board[r + i * dr, c + i * dc] == player
                       for i in range(1, line_length))

        directions = (
            (1, 0),   # vertical (downwards)
            (0, 1),   # horizontal (rightwards)
            (1, 1),   # diagonal \ (down-right)
            (1, -1),  # diagonal / (down-left)
        )
        for r in reversed(range(self.rows)):
            for c in range(self.cols):
                if self.board[r, c] == 0:
                    continue
                if any(line_from(r, c, dr, dc) for dr, dc in directions):
                    return (self.reward['win'], True)

        # No line of four: a completely filled board is a draw.
        if np.all(self.board != 0):
            return (self.reward['draw'], True)

        return (0, False)  # game still in progress

    def valid_moves(self):
        """Return the list of columns whose top cell is still empty."""
        return [c for c in range(self.cols) if self.board[0, c] == 0]

    def printBoard(self):
        """Print the board row by row, followed by a column-index footer."""
        for row in self.board:
            print(" ".join(["⚫" if x == 0 else "🚗" if x == 1 else "🚕" for x in row]))
        print(" 0  1   2  3   4  5   6")

# Smoke test: build an environment, reset it, and show both views of the
# empty board (the flattened state returned by reset() and the 2-D grid).
board = Connect4Env()
state = board.reset()
print(state)
print(board.board)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class DQN(nn.Module):
    """Fully connected Q-network: input_dim -> 128 -> 128 -> output_dim."""

    def __init__(self, input_dim, output_dim):
        super().__init__()
        hidden_units = 128
        self.layer1 = nn.Linear(input_dim, hidden_units)
        self.layer2 = nn.Linear(hidden_units, hidden_units)
        self.layer3 = nn.Linear(hidden_units, output_dim)

    def forward(self, x):
        """Return one raw Q-value per output unit for input ``x``."""
        activations = x
        for hidden_layer in (self.layer1, self.layer2):
            activations = torch.relu(hidden_layer(activations))
        # No softmax on the output: a DQN regresses Q-values, not probabilities.
        return self.layer3(activations)


In [None]:
import matplotlib.pyplot as plt
# Epsilon decay graph: defines the exploration schedule and plots it.
# epsilon_start, epsilon_end and epsilon_decay are notebook globals that
# select_action reads when it computes epsilon for a given episode.
epsilon_start = 1.0
epsilon_end = 0.1

max_episode = 50
# NOTE: this binds the name `episode` to an array of episode indices; the
# training loop later rebinds the same global to an int.
episode = np.arange(max_episode)
# `/` and `*` evaluate left to right, so the log argument is
# (epsilon_start / epsilon_end) * 100, i.e. about 1000 for the values above.
# NOTE(review): confirm the factor of 100 is intended.
epsilon_decay = np.log(epsilon_start/epsilon_end*100) / max_episode

# Exponential decay from epsilon_start towards epsilon_end.
eps = epsilon_end + (epsilon_start - epsilon_end) * np.exp(-episode * epsilon_decay)
plt.plot(episode, eps)
print(epsilon_decay)
print(eps)
plt.title('Epsilon decay graph')
plt.xlabel('Episode no.')
plt.ylabel('Epsilon')
plt.show()

In [None]:
""" Use Epsilon-greedy to find out the best way to make a move

    @Params:
    @Return: best column to play"""
def select_action(model, state, episode=None, training=True, valid_moves=None):
    """Pick a column with an epsilon-greedy policy restricted to legal moves.

    Args:
        model: Callable that receives the state as a float32 tensor with a
            leading batch dimension of 1 and returns one Q-value per column.
        state: Flattened board, as returned by the environment.
        episode: Episode index used to evaluate the exponential epsilon
            schedule (notebook globals ``epsilon_start``, ``epsilon_end``,
            ``epsilon_decay``). Required when ``training`` is true.
        training: When false, epsilon is 0 and the choice is purely greedy.
        valid_moves: Optional iterable of legal column indices. When omitted,
            the notebook-level ``env`` is queried via ``env.valid_moves()``.

    Returns:
        The chosen column as a Python int. With probability epsilon it is a
        uniformly random legal column; otherwise it is the legal column with
        the highest Q-value, so a full column is never selected.

    Raises:
        TypeError: ``training`` is true but no ``episode`` was given.
        ValueError: there is no legal column to choose from.
    """
    if training:
        if episode is None:
            raise TypeError("select_action() requires `episode` when training=True")
        epsilon = epsilon_end + (epsilon_start - epsilon_end) * np.exp(-episode * epsilon_decay)
    else:
        epsilon = 0.0

    if valid_moves is None:
        valid_moves = env.valid_moves()
    valid_moves = [int(col) for col in valid_moves]
    if not valid_moves:
        raise ValueError("no valid moves available")

    if random.random() < epsilon:
        return int(np.random.choice(valid_moves))  # explore

    with torch.no_grad():
        q_values = model(torch.tensor(state, dtype=torch.float32).unsqueeze(0)).reshape(-1)
    # Exploit: compare only the legal columns so the argmax cannot land on a
    # full column (which the environment punishes by ending the game).
    return max(valid_moves, key=lambda col: q_values[col].item())


In [None]:
""" Let 2 models play against each other
    @Return: reward after play optimally"""
def play_game(dqn_player1, dqn_player2, env, epsilon):
    """Play one complete game between two models and return its final reward.

    Args:
        dqn_player1: Model that moves on odd turns (player 1).
        dqn_player2: Model that moves on even turns (player 2).
        env: Environment exposing ``reset()``, ``valid_moves()`` and
            ``play(action)``; ``play`` must return
            ``(state, reward, done, info)``.
        epsilon: Probability, per move, of playing a uniformly random legal
            column instead of the model's greedy choice.

    Returns:
        The reward returned by ``env.play`` for the move that ended the game.
    """
    state = env.reset()
    turn = 1  # 1 = player 1 to move, 2 = player 2 to move

    while True:
        model = dqn_player1 if turn == 1 else dqn_player2
        if random.random() < epsilon:
            action = np.random.choice(env.valid_moves())
        else:
            # training=False makes select_action act greedily; exploration is
            # handled above with the epsilon this function was given.
            action = select_action(model, state, training=False)

        state, reward, done, _ = env.play(action)
        if done:
            return reward  # outcome of the game

        turn = 3 - turn  # hand the move to the other player


In [None]:
from collections import deque

"""@Params: """
def train_model(model, optimizer, memory, batch_size, gamma=0.99):
    """Run one DQN minibatch update on transitions sampled from ``memory``.

    Args:
        model: Q-network mapping a batch of states to per-action Q-values.
        optimizer: Optimizer bound to ``model``'s parameters.
        memory: Sequence of ``(state, action, reward, next_state, done)``
            transitions.
        batch_size: Number of transitions to sample. Nothing happens while
            ``memory`` holds fewer than this many entries.
        gamma: Discount factor applied to the best next-state Q-value.

    Returns:
        The mean squared TD error of the sampled batch as a float, or ``None``
        when no update was performed.
    """
    if len(memory) < batch_size:
        return None

    batch = random.sample(memory, batch_size)
    if not batch:
        return None
    states, actions, rewards, next_states, dones = zip(*batch)

    state_batch = torch.stack([torch.as_tensor(s, dtype=torch.float32) for s in states])
    next_state_batch = torch.stack([torch.as_tensor(s, dtype=torch.float32) for s in next_states])
    action_batch = torch.tensor([int(a) for a in actions], dtype=torch.long)
    reward_batch = torch.tensor([float(r) for r in rewards], dtype=torch.float32)
    done_batch = torch.tensor([bool(d) for d in dones], dtype=torch.bool)

    # Q(s, a) for the action that was actually taken in each transition.
    chosen_q = model(state_batch).gather(1, action_batch.unsqueeze(1)).squeeze(1)

    # Targets are constants for the gradient step, hence no_grad. Terminal
    # transitions bootstrap nothing: their target is the reward alone.
    with torch.no_grad():
        best_next_q = model(next_state_batch).max(dim=1).values
        target_q = torch.where(done_batch, reward_batch, reward_batch + gamma * best_next_q)

    loss = torch.mean((chosen_q - target_q) ** 2)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()


In [None]:
import time

# Self-play training: two independent networks alternate moves on one shared
# environment and learn from one shared replay memory.
env = Connect4Env()

input_dim = BOARD_ROW * BOARD_COL  # Size of board
output_dim = BOARD_COL  # Output action (column 0 -> n)

dqn_player1 = DQN(input_dim, output_dim)
dqn_player2 = DQN(input_dim, output_dim)

optimizer1 = optim.Adam(dqn_player1.parameters(), lr=0.001)
optimizer2 = optim.Adam(dqn_player2.parameters(), lr=0.001)

memory = deque(maxlen=10000)
batch_size = 64
num_episodes = 10
epsilon = 0.1  # overwritten per episode below; kept so the name always exists
startTime = time.time()

for episode in range(num_episodes):
    state = env.reset()
    done = False
    turn = 1  # 1 = player 1, 2 = player 2

    # Same schedule select_action evaluates for this episode; computed here
    # only so the progress log shows the exploration rate actually used.
    epsilon = epsilon_end + (epsilon_start - epsilon_end) * np.exp(-episode * epsilon_decay)

    while not done:
        model = dqn_player1 if turn == 1 else dqn_player2
        optimizer = optimizer1 if turn == 1 else optimizer2

        # select_action's third parameter is the episode index (it derives
        # epsilon itself), so pass `episode`, not an epsilon value.
        action = select_action(model, state, episode)
        next_state, reward, done, _ = env.play(action)

        memory.append((state, action, reward, next_state, done))

        # Train the network of the player that just moved.
        train_model(model, optimizer, memory, batch_size)

        state = next_state
        turn = 3 - turn  # switch player

    if episode % 10 == 0:
        print(f"Episode {episode}: Reward {reward}, Epsilon {epsilon:.4f}")

endTime = time.time()
print(f"Trained {num_episodes} episodes after {endTime-startTime:.4f} seconds")
print("Training complete!")


In [None]:
def evaluate_model(model, env, num_games, verbose=True):
    """Play ``model`` (moving first) against a uniformly random opponent.

    Args:
        model: Network evaluated as player 1; it always acts greedily.
        env: Environment exposing ``reset()``, ``valid_moves()``,
            ``play(action)`` and ``printBoard()``.
        num_games: Number of games to play; must be positive.
        verbose: When true, print the board and the chosen column after every
            move (and the remaining legal columns after the opponent's moves).

    Returns:
        The model's average reward per game. ``env.play`` reports the final
        reward from the point of view of the player who made the last move,
        so it is negated when the opponent ended the game.

    Raises:
        ValueError: ``num_games`` is not positive.
    """
    if num_games <= 0:
        raise ValueError("num_games must be a positive integer")

    total_rewards = 0

    for _ in range(num_games):
        state = env.reset()
        done = False
        turn = 1

        while not done:
            if turn == 1:
                # training=False disables exploration during evaluation.
                action = select_action(model, state, training=False)
            else:
                action = np.random.choice(env.valid_moves())
            state, reward, done, _ = env.play(action)
            if verbose:
                env.printBoard()
                if turn == 2:
                    print(env.valid_moves())
                print(f"🧐 Chọn cột: {action}")
            if not done:
                turn = 3 - turn

        # `turn` still names the player who made the final move.
        total_rewards += reward if turn == 1 else -reward

    avg_reward = total_rewards / num_games
    print(f"📊 Model's Average Reward over {num_games} games: {avg_reward}")

    return avg_reward

In [None]:
# Persist only the learned parameters (state_dict) of each player's network,
# written as .pth files relative to the current working directory.
torch.save(dqn_player1.state_dict(), "dqn_player1.pth")
torch.save(dqn_player2.state_dict(), "dqn_player2.pth")
print("✅ Model saved successfully!")


In [None]:
# Restore the saved parameters into the existing dqn_player1 / dqn_player2.
# When running this cell on a fresh kernel (without the training cell), first
# re-create the objects it needs by uncommenting the lines below:
#
# import time
#
# env = Connect4Env()
#
# input_dim = BOARD_ROW * BOARD_COL  # Size of board
# output_dim = BOARD_COL  # Output action (column 0 -> n)
#
# dqn_player1 = DQN(input_dim, output_dim)
# dqn_player2 = DQN(input_dim, output_dim)

# weights_only=True restricts unpickling to tensors and plain containers, so
# a tampered checkpoint file cannot execute arbitrary code while loading.
dqn_player1.load_state_dict(torch.load("dqn_player1.pth", weights_only=True))
dqn_player2.load_state_dict(torch.load("dqn_player2.pth", weights_only=True))
print("✅ Model loaded successfully!")

In [None]:
# Evaluate player 1's network over 100 games and report the wall-clock time
# the evaluation took.
start_time = time.time()
evaluate_model(dqn_player1, env, num_games=100)
end_time = time.time()

print(f"⏳ Time taken: {end_time - start_time:.2f} seconds")
