# Define Environment

In [17]:
from typing import Dict
import numpy as np
import gym
from gym import spaces
import random

class DotsAndBoxesEnv(gym.Env):
    def __init__(self, grid_size=(5, 5)):  
        super(DotsAndBoxesEnv, self).__init__()

        self.grid_size = grid_size
        self.state = self.create_all_lines(grid_size)  # Dict[str, bool] -> key, isClicked -> key format: h-0-1
        self.done = False
        self.current_player = 1  # Player 1 starts

        # ✅ Action Space (total lines)
        self.action_space = spaces.Discrete(len(self.state))

        # ✅ Observation Space (binary representation of lines)
        self.observation_space = spaces.Box(low=0, high=1, shape=(len(self.state),), dtype=np.int8)

    def create_all_lines(self, size) -> Dict[str, bool]:
        lines = {}
        for i in range(size[0] + 1):  
            for j in range(size[1] + 1):
                if j < size[1]:
                    lines[f"v-{i}-{j}"] = False
                if i < size[0]:
                    lines[f"h-{i}-{j}"] = False
        return lines

    def step(self, action: int):
        if self.done:
            return self._get_observation(), 0, self.done, {}

        selected_line_key = self.get_action_key(action)

        if self.state[selected_line_key]:  # Line already clicked
            return self._get_observation(), -10, self.done, {}

        self.state[selected_line_key] = True  # Mark line as clicked

        reward = self.evaluate_board(selected_line_key)
        self.done = self.check_game_over()

        # Switch player ONLY IF no box was completed
        if reward == 0:
            self.current_player = 1 if self.current_player == 2 else 2
            reward = 0.1

        return self._get_observation(), reward, self.done, {}
    
    def get_action_key(self, action: int):
        all_lines = sorted(self.state.keys())
        return all_lines[action]

    def evaluate_board(self, last_move: str):
        """
        Check if the last move completed any box.
        Reward = +1 if at least one box was completed, else 0.
        """
        reward = 0
        parts = last_move.split("-")
        orientation, x, y = parts[0], int(parts[1]), int(parts[2])

        # Possible boxes affected by the last move
        affected_boxes = []

        if orientation == "h":  # Horizontal line
            if x > 0:  # Upper box
                affected_boxes.append((x - 1, y))
            if x < self.grid_size[0]:  # Lower box
                affected_boxes.append((x, y))
        elif orientation == "v":  # Vertical line
            if y > 0:  # Left box
                affected_boxes.append((x, y - 1))
            if y < self.grid_size[1]:  # Right box
                affected_boxes.append((x, y))

        # Check if any affected box is now completed
        for i, j in affected_boxes:
            if self.is_box_completed(i, j):
                reward = reward + 1  # A box was completed

        return reward

    def is_box_completed(self, i, j):
        """Check if a box at (i, j) is completed by verifying its four edges."""

        top = f"h-{i}-{j}"
        bottom = f"h-{i+1}-{j}"
        left = f"v-{i}-{j}"
        right = f"v-{i}-{j+1}"

        # Ensure all keys exist in self.state before accessing
        return (
            self.state.get(top, False) and
            self.state.get(bottom, False) and
            self.state.get(left, False) and
            self.state.get(right, False)
        )


    def check_game_over(self):
        return all(self.state.values())

    def reset(self):
        self.state = self.create_all_lines(self.grid_size)
        self.done = False
        self.current_player = 1  # Player 1 starts
        return self._get_observation()

    def _get_observation(self):
        return np.array(list(self.state.values()), dtype=np.int8)
    
    def valid_action(self):
        all_lines = sorted(self.state.keys())
        valid_moves = []
        for i in range(len(all_lines)):
            if self.state[all_lines[i]] == 0:
                valid_moves.append(i)

        return random.choice(valid_moves)


In [10]:
env = DotsAndBoxesEnv()
env.observation_space.sample()
env.state

{'v-0-0': False,
 'h-0-0': False,
 'v-0-1': False,
 'h-0-1': False,
 'v-0-2': False,
 'h-0-2': False,
 'v-0-3': False,
 'h-0-3': False,
 'v-0-4': False,
 'h-0-4': False,
 'h-0-5': False,
 'v-1-0': False,
 'h-1-0': False,
 'v-1-1': False,
 'h-1-1': False,
 'v-1-2': False,
 'h-1-2': False,
 'v-1-3': False,
 'h-1-3': False,
 'v-1-4': False,
 'h-1-4': False,
 'h-1-5': False,
 'v-2-0': False,
 'h-2-0': False,
 'v-2-1': False,
 'h-2-1': False,
 'v-2-2': False,
 'h-2-2': False,
 'v-2-3': False,
 'h-2-3': False,
 'v-2-4': False,
 'h-2-4': False,
 'h-2-5': False,
 'v-3-0': False,
 'h-3-0': False,
 'v-3-1': False,
 'h-3-1': False,
 'v-3-2': False,
 'h-3-2': False,
 'v-3-3': False,
 'h-3-3': False,
 'v-3-4': False,
 'h-3-4': False,
 'h-3-5': False,
 'v-4-0': False,
 'h-4-0': False,
 'v-4-1': False,
 'h-4-1': False,
 'v-4-2': False,
 'h-4-2': False,
 'v-4-3': False,
 'h-4-3': False,
 'v-4-4': False,
 'h-4-4': False,
 'h-4-5': False,
 'v-5-0': False,
 'v-5-1': False,
 'v-5-2': False,
 'v-5-3': Fals

In [18]:
episodes = 10
for episode in range(1, episodes + 1):
    state = env.reset()
    done = False
    score = 0
    while not done:
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action=action)
        score += reward
        print(env.get_action_key(action=action), score, env.current_player)
        # Extract key-value pairs where val is True
        # changed_state = [(key, val) for key, val in env.state.items() if val is True]
        # print(changed_state)
    
    print('episode = {}, score = {}'.format(episode, score))

h-4-4 0.1 2
v-2-0 0.2 1
v-0-1 0.30000000000000004 2
h-3-5 0.4 1
h-4-0 0.5 2
h-2-0 0.6 1
v-2-4 0.7 2
v-1-2 0.7999999999999999 1
h-0-1 0.8999999999999999 2
h-0-4 0.9999999999999999 1
h-1-5 1.0999999999999999 2
v-2-1 1.2 1
h-1-2 1.3 2
v-2-1 -8.7 2
h-4-2 -8.6 1
v-2-2 -8.5 2
v-3-2 -8.4 1
v-0-1 -18.4 1
h-2-3 -18.299999999999997 2
h-0-3 -18.199999999999996 1
h-3-1 -18.099999999999994 2
h-4-1 -17.999999999999993 1
v-2-2 -27.999999999999993 1
h-4-0 -37.99999999999999 1
v-4-1 -37.89999999999999 2
v-3-4 -37.79999999999999 1
h-0-5 -37.69999999999999 2
v-0-1 -47.69999999999999 2
h-3-0 -46.69999999999999 2
v-4-2 -46.59999999999999 1
v-2-2 -56.59999999999999 1
v-3-0 -56.499999999999986 2
v-2-0 -66.49999999999999 2
h-2-5 -66.39999999999999 1
h-4-5 -66.3 2
v-0-1 -76.3 2
h-2-1 -75.3 2
v-3-2 -85.3 2
h-0-3 -95.3 2
v-0-3 -95.2 1
v-4-2 -105.2 1
v-2-4 -115.2 1
v-2-4 -125.2 1
h-1-0 -125.10000000000001 2
v-5-2 -125.00000000000001 1
v-1-3 -124.90000000000002 2
h-3-3 -124.80000000000003 1
h-2-3 -134.8 1
h-4-0 -1

# Train an RL Agent

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
import collections
import gym

# Define the Q-network
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 256)
        self.fc4 = nn.Linear(256, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)  # Q-values for each action

class DQNAgent:
    def __init__(self, env, lr=0.001, gamma=0.99, epsilon=1.0, epsilon_min=0.1, epsilon_decay=0.999, batch_size=32, memory_size=10000):
        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size

        self.memory = collections.deque(maxlen=memory_size)  # Experience Replay Buffer

        self.model = DQN(input_dim=env.observation_space.shape[0], output_dim=env.action_space.n)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.loss_fn = nn.MSELoss()

    def get_action(self, state):
        # if random.random() < self.epsilon:
        #     return self.env.action_space.sample()  # Explore
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        print("state = ", state)
        with torch.no_grad():
            q_values = self.model(state)
        print("qvalues = ", q_values)
        return torch.argmax(q_values).item()  # Exploit
    
    def get_random_valid_action(self):
        return self.env.valid_action()

    def store_experience(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def train_step(self):
        if len(self.memory) < self.batch_size:
            return  # Skip training if not enough samples

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.tensor(np.array(states), dtype=torch.float32)
        actions = torch.tensor(actions, dtype=torch.int64).unsqueeze(1)
        rewards = torch.tensor(rewards, dtype=torch.float32)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32)
        dones = torch.tensor(dones, dtype=torch.float32)

        # Compute Q-values
        q_values = self.model(states).gather(1, actions).squeeze()

        # Compute target Q-values
        with torch.no_grad():
            next_q_values = self.model(next_states).max(1)[0]
            target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        # Compute loss
        loss = self.loss_fn(q_values, target_q_values)

        # Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, num_episodes=1000):
        for episode in range(num_episodes):
            state = self.env.reset()
            total_reward = 0
            done = False
            while not done:
                # If it's AI's turn (Player 1)
                if self.env.current_player == 2:
                    action = self.get_action(state)  # AI chooses action
                    next_state, reward, done, _ = self.env.step(action)  # Take action
                    self.store_experience(state, action, reward, next_state, done)
                    self.train_step()  # Update model
                    state = next_state
                    total_reward -= reward

                # If it's opponent's turn (Player 2)
                elif self.env.current_player == 1:
                    # Here, the opponent plays randomly
                    action = self.get_random_valid_action()  # Opponent chooses random action
                    next_state, reward, done, _ = self.env.step(action)  # Take action
                    state = next_state
                    total_reward += reward

            # Decay epsilon for exploration-exploitation trade-off
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

            print(f"Episode {episode + 1}: Total Reward = {total_reward}")
    
    def get_sorted_actions_with_scores(self, state):
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)  # Add batch dimension
        with torch.no_grad():
            q_values = self.model(state_tensor).squeeze().numpy()  # Get Q-values for each action

        # Create a list of (action, q_value) pairs
        actions_with_scores = [(i, q_value) for i, q_value in enumerate(q_values)]

        # Sort the actions based on their Q-values in descending order (better actions first)
        sorted_actions = sorted(actions_with_scores, key=lambda x: x[1], reverse=True)

        return sorted_actions


# Create Environment
# env = DotsAndBoxesEnv(grid_size=(5, 5))

# # Train the Agent
# agent = DQNAgent(env)
# agent.train(num_episodes=500)



In [36]:
env = DotsAndBoxesEnv(grid_size=(5, 5))
agent = DQNAgent(env)

In [44]:
state = env.reset()
print(state)
action = agent.get_action(state)
print(action)

# state[action] = 1
# print(state)
# action = agent.get_action(state)
# print(action)

[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
state =  tensor([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]])
81


# Evaluate the Trained Model

In [20]:
def evaluate_agent(agent, env, num_episodes=10):
    total_rewards = []
    for episode in range(num_episodes):
        state = env.reset()
        done = False
        total_reward = 0
        while not done:
            # Get the action from the trained agent
            action = agent.get_action(state)
            next_state, reward, done, _ = env.step(action)  # Take action in the environment
            state = next_state
            total_reward += reward
        
        total_rewards.append(total_reward)
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")

    average_reward = np.mean(total_rewards)
    print(f"Average Reward over {num_episodes} episodes: {average_reward}")

# Evaluate the trained agent
evaluate_agent(agent, env, num_episodes=10)


Episode 1: Total Reward = -4569.3
Episode 2: Total Reward = -4209.4
Episode 3: Total Reward = -4989.200000000001
Episode 4: Total Reward = -3659.4
Episode 5: Total Reward = -3079.2000000000003
Episode 6: Total Reward = -7039.2
Episode 7: Total Reward = -2899.1
Episode 8: Total Reward = -2859.4
Episode 9: Total Reward = -3629.4
Episode 10: Total Reward = -3049.3
Average Reward over 10 episodes: -3998.290000000001


In [21]:
# Save the trained model
torch.save(agent.model.state_dict(), "dots_and_boxes_dqn_model2.pth")
print("Model saved successfully!")


Model saved successfully!


In [22]:
# Load the trained model
model = DQN(input_dim=env.observation_space.shape[0], output_dim=env.action_space.n)
model.load_state_dict(torch.load("dots_and_boxes_dqn_model2.pth"))
model.eval()  # Set the model to evaluation mode
print("Model loaded successfully!")


Model loaded successfully!


In [24]:
# Assume you already have a trained agent
state = env.reset()  # Get the initial state
sorted_actions = agent.get_sorted_actions_with_scores(state)

# Print the sorted actions with their scores
for action, score in sorted_actions:
    print(f"Action: {action}, Score: {score}")

# Find the action with the maximum score
max_action, max_score = max(sorted_actions, key=lambda x: x[1])
print(f"Action with max score: {max_action}, Score: {max_score}")


Action: 11, Score: 1001237.6875
Action: 40, Score: 999492.125
Action: 44, Score: 998613.0625
Action: 6, Score: 998537.25
Action: 52, Score: 997616.8125
Action: 30, Score: 997228.5
Action: 33, Score: 997118.75
Action: 15, Score: 997057.9375
Action: 54, Score: 997019.6875
Action: 20, Score: 996153.875
Action: 1, Score: 996133.6875
Action: 32, Score: 995064.5625
Action: 17, Score: 994632.875
Action: 53, Score: 994590.125
Action: 28, Score: 993549.625
Action: 23, Score: 993030.375
Action: 13, Score: 992057.875
Action: 29, Score: 989092.0
Action: 57, Score: 988798.25
Action: 26, Score: 988401.375
Action: 39, Score: 987200.3125
Action: 4, Score: 986649.875
Action: 50, Score: 985856.125
Action: 35, Score: 985390.25
Action: 19, Score: 985000.0
Action: 27, Score: 984382.4375
Action: 25, Score: 982056.5
Action: 41, Score: 981274.875
Action: 43, Score: 980140.25
Action: 31, Score: 979812.875
Action: 21, Score: 979544.875
Action: 7, Score: 978875.25
Action: 16, Score: 978116.1875
Action: 18, Score