## IMPORTING MODULES

In [15]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation, patches

from env_config import ENV_CONFIG, RISK_PROFILES
from cognitive_minesweeper_env import CognitiveMinesweeperEnv

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F # For functions like one_hot, relu

import random
from collections import deque
import time

## RANDOM AGENT

In [None]:
# RGB colour (components in [0, 1]) used to draw each tile-state code.
COLOR_MAP = {
    0: (1, 1, 1),      # unrevealed
    1: (0.6, 1, 0.6),  # safe
    2: (1, 1, 0.4),    # low-risk
    3: (1, 0.4, 0.4),  # danger
    4: (0.4, 1, 1)     # flagged
}


def state_to_rgb(state, size=10):
    """Convert a flat board state into an RGB image array.

    Args:
        state: Array with ``size * size`` tile codes; every code must be a
            key of ``COLOR_MAP``.
        size: Side length of the square board.

    Returns:
        Array of shape ``(size, size, 3)`` holding one RGB triple per tile.

    Raises:
        KeyError: If a tile code has no entry in ``COLOR_MAP``.
    """
    board = state.reshape((size, size))
    pixel_rows = []
    for board_row in board:
        pixel_rows.append([COLOR_MAP[cell] for cell in board_row])
    return np.array(pixel_rows)

# Play one episode with a uniformly random policy, recording every frame
env = CognitiveMinesweeperEnv()
state = env.reset()
done = False
frames, actions = [], []
step = 0

while not done:
    # Only tiles whose state code is still 0 (unrevealed) can be acted on.
    unrevealed = np.where(state == 0)[0]
    if unrevealed.size == 0:
        break
    tile = np.random.choice(unrevealed)
    action_type = np.random.choice([0, 1])
    state, reward, done, _ = env.step((tile, action_type))

    # Keep the rendered board and the action that produced it side by side,
    # so the animation can annotate each frame.
    frames.append(state_to_rgb(state))
    actions.append((tile, action_type, reward))
    step += 1

# Build the figure: the first frame as the image, plus a red outline that
# update() moves onto the tile acted on in each frame.
fig, ax = plt.subplots(figsize=(6, 6))
im = ax.imshow(frames[0], interpolation='nearest')
rect = ax.add_patch(
    patches.Rectangle((0, 0), 1, 1, edgecolor='red', facecolor='none', lw=2)
)
title = ax.set_title("")
ax.axis('off')

def update(frame_idx):
    """Redraw the animation for frame ``frame_idx``.

    Reads the module-level ``frames`` and ``actions`` lists and updates the
    ``im``, ``rect`` and ``title`` artists in place.

    Args:
        frame_idx: Index into ``frames`` / ``actions`` of the step to show.
    """
    frame = frames[frame_idx]
    im.set_data(frame)
    tile, action_type, reward = actions[frame_idx]
    # Take the board width from the frame itself rather than hard-coding 10,
    # so other grid sizes are highlighted correctly.
    r, c = divmod(tile, frame.shape[1])
    # With imshow's default extent, cell (r, c) is centred on (c, r), so its
    # lower-left corner sits half a cell away.
    rect.set_xy((c - 0.5, r - 0.5))
    # Computed outside the f-string: reusing the enclosing quote character
    # inside an f-string expression is a SyntaxError before Python 3.12.
    action_name = 'Click' if action_type == 0 else 'Flag'
    title.set_text(f'Step {frame_idx}: {action_name} on ({r},{c}) - Reward: {reward}')

# Create animation: update() is called once per recorded frame, with a
# 500 ms delay between frames
ani = animation.FuncAnimation(fig, update, frames=len(frames), interval=500)

# Save as GIF using the Pillow writer at 2 frames per second
ani.save("cognitive_minesweeper.gif", writer="pillow", fps=2)

# Optional: Save as MP4
# ani.save("cognitive_minesweeper.mp4", writer="ffmpeg", fps=2)

plt.show()

## SIMPLE DQN

In [None]:
# --- Main DQN Training Script (PyTorch Version) ---
# --- Device Configuration ---
# Use a CUDA GPU when PyTorch reports one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# --- Hyperparameters ---
GAMMA = 0.99            # Discount factor
LEARNING_RATE = 0.001   # Learning rate for the optimizer
REPLAY_BUFFER_SIZE = 50000 # Max number of experiences kept in the replay buffer
BATCH_SIZE = 64         # Number of experiences to sample for each training step
EPSILON_START = 1.0     # Starting value of epsilon for exploration
EPSILON_END = 0.01      # Minimum value of epsilon
EPSILON_DECAY = 0.999   # Multiplicative epsilon decay; the training loop applies it once per episode
TARGET_UPDATE_FREQ = 500 # Frequency (in environment steps) of target-network syncs
NUM_EPISODES = 5000     # Number of episodes to train for
MAX_STEPS_PER_EPISODE = ENV_CONFIG['max_steps'] # Per-episode step cap taken from the environment config

# Grid and action-space sizes derived from the environment config
GRID_SIZE = ENV_CONFIG['grid_size']
STATE_SIZE = GRID_SIZE * GRID_SIZE # One entry per tile (100 for a 10x10 grid)
NUM_ACTION_TYPES = 2 # Reveal (type 0) or flag (type 1)
ACTION_SIZE = STATE_SIZE * NUM_ACTION_TYPES # One index per (tile, action type) pair (200 for a 10x10 grid)

In [None]:
# --- Action Mapping Functions ---
def action_index_to_tuple(action_idx):
    """Split a flat action index into ``(tile_index, action_type)``.

    The flat index interleaves action types per tile, so the quotient by
    ``NUM_ACTION_TYPES`` is the tile and the remainder is the action type.
    """
    return divmod(action_idx, NUM_ACTION_TYPES)

def action_tuple_to_index(tile_index, action_type):
    """Combine ``(tile_index, action_type)`` into a single flat action index.

    Inverse of ``action_index_to_tuple``: each tile owns a run of
    ``NUM_ACTION_TYPES`` consecutive indices.
    """
    tile_offset = tile_index * NUM_ACTION_TYPES
    return tile_offset + action_type

In [None]:
# --- Build the Q-Network (PyTorch) ---
class QNetwork(nn.Module):
    """Three-layer fully connected network producing one Q-value per action.

    Args:
        state_size: Number of input features (length of the flat state).
        action_size: Number of outputs (one Q-value per flat action index).
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        hidden_units = 256
        # Attribute names and creation order are kept stable: they determine
        # the state_dict keys and the order in which weights are initialised.
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_size)

    def forward(self, state):
        """Return raw (unactivated) Q-values for ``state``."""
        hidden = state
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        # No activation on the output layer: Q-values are unbounded.
        return self.fc3(hidden)

# --- Experience Replay Buffer ---
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling.

    Args:
        capacity: Maximum number of transitions kept; once full, the oldest
            entries are discarded as new ones arrive.
        device: Torch device on which sampled tensors are placed.
    """

    def __init__(self, capacity, device):
        self.device = device
        self.buffer = deque(maxlen=capacity)

    def store_transition(self, state, action, reward, next_state, done):
        """Append one transition; ``action`` is a ``(tile, type)`` tuple stored as its flat index."""
        transition = (
            np.array(state),
            action_tuple_to_index(*action),
            reward,
            np.array(next_state),
            done,
        )
        self.buffer.append(transition)

    def _column(self, transitions, position):
        """Stack field ``position`` of every transition into one CPU tensor."""
        return torch.from_numpy(np.array([item[position] for item in transitions]))

    def sample_batch(self, batch_size):
        """Draw ``batch_size`` transitions uniformly at random.

        Returns:
            ``None`` when fewer than ``batch_size`` transitions are stored.
            Otherwise a tuple ``(states, actions, rewards, next_states,
            dones)`` of tensors on ``self.device``; ``actions`` is int64 (for
            use as gather indices) and the rest are float32.
        """
        if len(self.buffer) < batch_size:
            return None

        picked = random.sample(self.buffer, batch_size)

        states = self._column(picked, 0).float().to(self.device)
        actions = self._column(picked, 1).long().to(self.device)
        rewards = self._column(picked, 2).float().to(self.device)
        next_states = self._column(picked, 3).float().to(self.device)
        # Float rather than bool so callers can multiply by (1 - done).
        dones = self._column(picked, 4).float().to(self.device)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

In [None]:
# --- Action Selection Function (Epsilon-Greedy with Valid Actions Masking) ---
def select_action(state, epsilon, env_instance, main_q_network, device):
    """Pick an epsilon-greedy action restricted to unrevealed tiles.

    Args:
        state: Flat observation fed to the Q-network when exploiting.
        epsilon: Probability of choosing a uniformly random valid action.
        env_instance: Environment whose ``state`` attribute is inspected to
            find tiles that are still unrevealed (code 0).
        main_q_network: Network that scores every flat action index.
        device: Torch device on which inference tensors are created.

    Returns:
        A ``(tile_index, action_type)`` tuple, or ``None`` when no tile is
        unrevealed and therefore no action is valid.
    """
    # Validity is read from the environment itself, not from `state`.
    current_env_state_flat = np.array(env_instance.state).flatten()
    valid_tile_indices = np.where(current_env_state_flat == 0)[0]

    # Every action type is allowed on every unrevealed tile.
    valid_actions_indices = [
        action_tuple_to_index(tile_idx, action_type)
        for tile_idx in valid_tile_indices
        for action_type in range(NUM_ACTION_TYPES)
    ]

    if not valid_actions_indices:
        return None

    if random.random() < epsilon:
        # Explore: uniform choice among the valid actions.
        chosen_action_index = random.choice(valid_actions_indices)
    else:
        # Exploit: greedy choice among the valid actions.
        state_tensor = torch.from_numpy(np.array(state)).float().unsqueeze(0).to(device)

        # Remember the caller's mode so an evaluation-time network is not
        # silently switched back to training mode after inference.
        was_training = main_q_network.training
        main_q_network.eval()
        with torch.no_grad():
            q_values = main_q_network(state_tensor)
        if was_training:
            main_q_network.train()

        valid_mask = torch.zeros(ACTION_SIZE, dtype=torch.bool, device=device)
        valid_mask[valid_actions_indices] = True
        # -inf guarantees an invalid action can never win the argmax, however
        # negative the learned Q-values become.
        masked_q_values = torch.where(
            valid_mask,
            q_values.squeeze(0),
            torch.tensor(float("-inf"), device=device),
        )
        chosen_action_index = torch.argmax(masked_q_values).item()

    return action_index_to_tuple(chosen_action_index)

In [None]:
# --- Training Step Function (PyTorch) ---
def train_step(states, actions, rewards, next_states, dones, gamma, main_q_network, target_q_network, optimizer, loss_fn, action_size):
    """Run one DQN optimisation step on a sampled batch.

    Args:
        states: Batch of states, one row per transition.
        actions: Int64 tensor of flat action indices taken in ``states``.
        rewards: Reward received for each transition.
        next_states: Batch of successor states.
        dones: 1.0 where the transition ended the episode, else 0.0.
        gamma: Discount factor.
        main_q_network: Network being trained.
        target_q_network: Network supplying the bootstrap values.
        optimizer: Optimizer over ``main_q_network``'s parameters.
        loss_fn: Callable ``loss_fn(prediction, target)`` returning a scalar.
        action_size: Unused; kept so existing callers keep working.

    Returns:
        The scalar loss tensor for this batch.
    """
    # Q-value the online network assigns to the action actually taken.
    predicted_q_values = main_q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    # The target is a constant with respect to the optimisation, so build it
    # without recording an autograd graph through the target network.
    with torch.no_grad():
        next_q_values = target_q_network(next_states).max(1)[0]
        # Bellman target; (1 - dones) removes the bootstrap term on terminal
        # transitions.
        target_q = rewards + gamma * next_q_values * (1 - dones)

    loss = loss_fn(predicted_q_values, target_q)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss

In [None]:
# --- Initialize Environment, Networks, Optimizer, and Buffer ---
env = CognitiveMinesweeperEnv()

# Online network (trained every step) and target network (synced from it
# periodically), both placed on the selected device. The target starts as an
# exact copy of the online network.
main_q_network = QNetwork(state_size=STATE_SIZE, action_size=ACTION_SIZE).to(device)
target_q_network = QNetwork(state_size=STATE_SIZE, action_size=ACTION_SIZE).to(device)
target_q_network.load_state_dict(main_q_network.state_dict())

# Mean-squared-error loss, minimised with Adam over the online network only.
loss_fn = nn.MSELoss()
optimizer = optim.Adam(main_q_network.parameters(), lr=LEARNING_RATE)

# The buffer is given the device so sampled tensors land where the networks are.
replay_buffer = ReplayBuffer(capacity=REPLAY_BUFFER_SIZE, device=device)

# Mutable training state: environment steps taken so far (drives target
# network syncs) and the current exploration rate.
TARGET_UPDATE_COUNT = 0
epsilon = EPSILON_START

In [None]:
# --- Main Training Loop ---
print("Starting DQN training (PyTorch)...")
for episode in range(NUM_EPISODES):
    initial_observation = env.reset()
    total_reward, step, done = 0, 0, False
    episode_start_time = time.time()

    state = np.array(initial_observation).flatten()

    # The loop condition covers both exits: episode finished or step cap hit.
    while not done and step < MAX_STEPS_PER_EPISODE:
        action = select_action(state, epsilon, env, main_q_network, device)

        if action is None:
            # No unrevealed tile is left, so there is nothing to act on.
            print(f"Episode {episode}: No valid actions (state 0 tiles) found at step {step}. Ending episode.")
            done = True
            break

        observation, reward, done, _ = env.step(action)
        next_state = np.array(observation).flatten()
        replay_buffer.store_transition(state, action, reward, next_state, done)

        state = next_state
        total_reward += reward
        step += 1

        # --- Perform Training Step ---
        # Train only once the buffer holds more than one full batch.
        if len(replay_buffer) > BATCH_SIZE:
            loss = train_step(
                *replay_buffer.sample_batch(BATCH_SIZE),
                GAMMA, main_q_network, target_q_network, optimizer, loss_fn, ACTION_SIZE,
            )
            if step % 100 == 0:
                print(f" Ep {episode}, Step {step}, Loss: {loss.item():.4f}")

        # --- Update Target Network ---
        # Counted in environment steps across all episodes.
        TARGET_UPDATE_COUNT += 1
        if TARGET_UPDATE_COUNT % TARGET_UPDATE_FREQ == 0:
            target_q_network.load_state_dict(main_q_network.state_dict())

    # --- Decay Epsilon ---
    # Applied once per episode and floored at EPSILON_END.
    epsilon = max(EPSILON_END, epsilon * EPSILON_DECAY)

    # --- Episode End Summary ---
    episode_duration = time.time() - episode_start_time
    print(f"Episode {episode + 1}/{NUM_EPISODES}, Total Reward: {total_reward:.2f}, Steps: {step}, Epsilon: {epsilon:.4f}, Duration: {episode_duration:.2f}s")

    # Optional: Save the model periodically
    # if (episode + 1) % 100 == 0:
    #     # Save the state dictionary of the model
    #     torch.save(main_q_network.state_dict(), f'dqn_minesweeper_episode_{episode+1}.pth')

print("\nTraining finished!")

In [None]:
# After training, you can save the final model's state dictionary
# torch.save(main_q_network.state_dict(), 'dqn_minesweeper_final_model.pth')

In [None]:
# To load the model later:
# loaded_model = QNetwork(STATE_SIZE, ACTION_SIZE).to(device)
# loaded_model.load_state_dict(torch.load('dqn_minesweeper_final_model.pth'))
# loaded_model.eval()  # Set to evaluation mode before inference