<a href="https://colab.research.google.com/github/pj0620/google-colab-notebooks/blob/main/Minesweeper_rl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import random
from scipy.signal import convolve2d
%pip install stable-baselines3[extra]

import numpy as np
import gymnasium as gym
from gymnasium import spaces

!pip install gymnasium[atari]
!pip install gymnasium[accept-rom-license]

!apt-get install swig cmake ffmpeg
!pip install git+https://github.com/DLR-RM/rl-baselines3-zoo

Collecting stable-baselines3[extra]
  Downloading stable_baselines3-2.3.2-py3-none-any.whl.metadata (5.1 kB)
Collecting gymnasium<0.30,>=0.28.1 (from stable-baselines3[extra])
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting shimmy~=1.3.0 (from shimmy[atari]~=1.3.0; extra == "extra"->stable-baselines3[extra])
  Downloading Shimmy-1.3.0-py3-none-any.whl.metadata (3.7 kB)
Collecting autorom~=0.6.1 (from autorom[accept-rom-license]~=0.6.1; extra == "extra"->stable-baselines3[extra])
  Downloading AutoROM-0.6.1-py3-none-any.whl.metadata (2.4 kB)
Collecting AutoROM.accept-rom-license (from autorom[accept-rom-license]~=0.6.1; extra == "extra"->stable-baselines3[extra])
  Downloading AutoROM.accept-rom-license-0.6.1.tar.gz (434 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.7/434.7 kB[0m [31m29.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdo

Collecting autorom~=0.4.2 (from autorom[accept-rom-license]~=0.4.2; extra == "accept-rom-license"->gymnasium[accept-rom-license])
  Downloading AutoROM-0.4.2-py3-none-any.whl.metadata (2.8 kB)
Downloading AutoROM-0.4.2-py3-none-any.whl (16 kB)
Installing collected packages: autorom
  Attempting uninstall: autorom
    Found existing installation: AutoROM 0.6.1
    Uninstalling AutoROM-0.6.1:
      Successfully uninstalled AutoROM-0.6.1
Successfully installed autorom-0.4.2
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
cmake is already the newest version (3.22.1-1ubuntu1.22.04.2).
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
The following additional packages will be installed:
  swig4.0
Suggested packages:
  swig-doc swig-examples swig4.0-examples swig4.0-doc
The following NEW packages will be installed:
  swig swig4.0
0 upgraded, 2 newly installed, 0 to remove and 49 not upgraded.
Need to get 1,116 kB of archives.
Aft

## Hyperparameter configurations





Create a file named `dql.yml` with the following contents

```
Minesweeper-v1:
  frame_stack: 1
  policy: 'CnnPolicy'
  n_timesteps: !!float 1e6
  buffer_size: 100000
  learning_rate: !!float 1e-4
  batch_size: 32
  learning_starts: 100000
  target_update_interval: 1000
  train_freq: 4
  gradient_steps: 1
  exploration_fraction: 0.1
  exploration_final_eps: 0.01
  # If True, you need to deactivate handle_timeout_termination
  # in the replay_buffer_kwargs
  optimize_memory_usage: False
```


## Custom gymnasium env

In [133]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces
from skimage.transform import resize

class MinesweeperEnvironment(gym.Env):
  # Because of google colab, we cannot implement the GUI ('human' render mode)
  metadata = {"render_modes": ["console"]}

  def __init__(self, board_size=9, total_bombs=10, render_mode="console"):
    super().__init__()
    self.render_mode = render_mode
    self.board_size = board_size
    self.total_bombs = total_bombs

    self.visible = np.zeros((self.board_size, self.board_size), dtype=np.uint8)
    self.set_bombs()
    self.set_values()

    # Define observation space
    # self.observation_space = spaces.Dict({
    #   "visible_vals": spaces.Box(low=0, high=8, shape=(self.board_size, self.board_size), dtype=np.int8),
    #   "visible": spaces.Box(low=0, high=1, shape=(self.board_size, self.board_size), dtype=np.int8)
    # })

    # self.observation_space = spaces.Box(
    #     low=0, high=8, shape=(2 * self.board_size**2,), dtype=np.int8
    # )

    self.observation_space = spaces.Box(
        low=-1, high=1, shape=(self.board_size, self.board_size, 2)
    )

    # Define action space
    self.action_space = spaces.Discrete(self.board_size**2)

    self.last_action = -1

  def set_bombs(self):
    random.seed(10)
    self.bombs = np.zeros(shape=(self.board_size, self.board_size),  dtype=np.uint8)
    placed_bombs = 0
    while placed_bombs < self.total_bombs:
      i = random.randint(0, self.board_size-1)
      j = random.randint(0, self.board_size-1)

      if self.bombs[i][j] == 0:
        self.bombs[i][j] = 1
        placed_bombs += 1

  def set_values(self):
    KERNAL = np.ones((3, 3))
    self.vals = convolve2d(self.bombs, KERNAL, mode='same').astype(np.uint8)

  def reset(self, seed=None, options=None):
    # Reset the environment to an initial state
    self.visible = np.zeros((self.board_size, self.board_size), dtype=np.uint8)
    self.set_bombs()
    self.set_values()
    self.last_action = -1

    return self.get_state(), {}

  def propogate(self, x: int, y: int):
    if self.bombs[x][y] == 1 or self.visible[x][y] == 1:
      return

    self.visible[x][y] = 1
    if self.vals[x][y] == 0:
      for x_k in [x - 1, x, x + 1]:
        for y_k in [y - 1, y, y + 1]:
          if (x_k, y_k) == (x, y):
            continue

          if x_k < 0 or x_k > self.board_size - 1:
            continue

          if y_k < 0 or y_k > self.board_size - 1:
            continue

          self.propogate(x_k, y_k)


  def step(self, action):
    # Implement the logic for taking a step in the environment
    x = action // self.board_size
    y = action % self.board_size

    start_visible_cells = np.sum(self.visible)

    if self.visible[x][y] == 1:
      start_teminated = bool((self.board_size**2 - start_visible_cells) == self.total_bombs)
      return self.get_state(), -0.3, start_teminated, False, {}
      # return self.get_state(), -1, True, True, {}
    elif self.bombs[x][y] == 1:
      return self.get_state(), -1, True, True, {}

    next_to_zero = False
    if start_visible_cells > 1:
      for x_k in [x - 1, x, x + 1]:
        for y_k in [y - 1, y, y + 1]:
          if (x_k, y_k) == (x, y):
            continue

          if x_k < 0 or x_k > self.board_size - 1:
            continue

          if y_k < 0 or y_k > self.board_size - 1:
            continue

          if (self.vals[x_k][y_k] == 0) and (self.visible[x_k][y_k] == 1):
            next_to_zero = True
            break
    else:
      next_to_zero = True

    self.last_action = action

    if self.vals[x][y] == 0:
      self.propogate(x, y)

    self.visible[x][y] = 1
    end_visible_cells = np.sum(self.visible)
    teminated = bool((self.board_size**2 - end_visible_cells) == self.total_bombs)

    if teminated:
      return self.get_state(), 2, teminated, False, {}
    else:
      # print(f"start_visible_cells: {start_visible_cells}")
      # print(f"end_visible_cells: {end_visible_cells}")
      # reward = 2 * (float(end_visible_cells) - float(start_visible_cells)) / (self.board_size**2)
      reward = 0.3 if next_to_zero else -0.3
      return self.get_state(), reward, teminated, False, {}

  def get_state(self):
    visible_vals = self.visible * self.vals
    visible_vals = (visible_vals.astype(np.float32) - 4) / 4
    visible_scaled = 2 * (self.visible.astype(np.float32) - 0.5)
    return np.stack([visible_vals, visible_scaled], axis=2).astype(np.float32)

  def render(self, mode="console"):
    if self.render_mode != "console":
        raise NotImplementedError("Render mode not supported.")

    # Print the current visible board state
    bombs_left = self.board_size**2 - np.sum(self.visible)
    print(f"Current Board: {bombs_left} Bombs Left")
    print("# " + " ".join(str(i) for i in range(self.board_size)))
    for i in range(self.board_size):
        print(f"{i} ", end="")
        row = ""
        for j in range(self.board_size):
            if self.visible[i][j] == 1:
                # If the cell is visible, show its value (number of adjacent bombs)
                if self.bombs[i][j] == 1:
                  row += f"B "
                else:
                  row += f"{self.vals[i][j]} "
            else:
                # If the cell is hidden, show an asterisk
                row += "* "
        print(row)
    # print("\n")


from stable_baselines3.common.env_checker import check_env

env = MinesweeperEnvironment()

# Testing env
check_env(env)
print("starting game")
for round in range(1):
  obs, info = env.reset()
  for _ in range(10):
      # Random action
      action = env.action_space.sample()
      obs, reward, terminated, truncated, info = env.step(action)
      if terminated:
          obs, info = env.reset()
      # env.render()
      # reward_pos = f"{reward * (env.board_size ** 2)} / {env.board_size ** 2}"
      # print(f"action: {action} -> {(action // env.board_size, action % env.board_size)}")
      # print(f"reward: {reward if reward <= 0 else reward_pos}")
      # print("\n")
  env.reset()

starting game


  and should_run_async(code)


## DQN

In [15]:
from math import log

# 18418
log(0.01) / log(0.99975)

18418 / 100_000

0.18418

In [134]:
import torch as th
import torch.nn as nn
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

class CustomCNN(BaseFeaturesExtractor):
    def __init__(self, observation_space, features_dim=256):
        super(CustomCNN, self).__init__(observation_space, features_dim)
        n_input_channels = observation_space.shape[2]  # Should be 2 for (9, 9, 2) input

        # Define a custom CNN architecture
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 64, kernel_size=5, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            # nn.Conv2d(64, 64, kernel_size=3, padding=1),
            # nn.ReLU(),
            # nn.Conv2d(64, 64, kernel_size=3, padding=1),
            # nn.ReLU(),
            nn.Flatten()
        )

        # Calculate the output size of the CNN dynamically
        with th.no_grad():
            sample_input = th.as_tensor(observation_space.sample()[None]).float().permute(0, 3, 1, 2)
            n_flatten = self.cnn(sample_input).shape[1]
            # print("Flattened output size after CNN:", n_flatten)  # Debugging statement

        # Define fully connected layers using the computed flattened size
        self.linear = nn.Sequential(
            nn.Linear(n_flatten, 512),  # Adjusted to match the flattened output size from CNN
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, features_dim)  # Output size of features_dim (256)
        )

    def forward(self, observations):
        observations = observations.permute(0, 3, 1, 2)  # Rearrange dimensions for Conv2d
        # print("Input to CNN:", observations.shape)  # Debugging input shape
        cnn_output = self.cnn(observations)
        # print("Output of CNN after flattening:", cnn_output.shape)  # Debugging output shape after CNN
        return self.linear(cnn_output)


In [None]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecTransposeImage

vec_env = make_vec_env(MinesweeperEnvironment, n_envs=1)

# vec_env = make_vec_env(MinesweeperEnvironment, n_envs=1)
# vec_env = VecTransposeImage(vec_env)  # Transpose to [batch_size, channels, height, width]

# Optimized hyperparameters for DQN
learning_rate = 1e-4           # Learning rate for weight updates
buffer_size = 300_000            # Replay buffer size
learning_starts = 10000         # Steps before learning begins
batch_size = 64                # Number of samples per training update
train_freq = (1, 'episode')    # Frequency of training updates
target_update_interval = 500   # Steps between target network updates
exploration_fraction = 0.25    # Fraction of total timesteps for epsilon decay
exploration_final_eps = 0.01   # Final epsilon value after decay
gamma = 0.99                   # Discount factor for future rewards

#  0.99975 ** x = 0.01 => x = log(0.01) / log(0.99975)

policy_kwargs = dict(
    features_extractor_class=CustomCNN,
    features_extractor_kwargs=dict(features_dim=128)  # Output size of the final layer
)

# Instantiate DQN with improved hyperparameters
model = DQN(
    "CnnPolicy",
    # MinesweeperEnvironment(),
    vec_env,
    learning_rate=learning_rate,
    buffer_size=buffer_size,
    learning_starts=learning_starts,
    batch_size=batch_size,
    train_freq=train_freq,
    target_update_interval=target_update_interval,
    exploration_fraction=exploration_fraction,
    exploration_final_eps=exploration_final_eps,
    gamma=gamma,
    policy_kwargs=policy_kwargs,  # Use custom network
    verbose=1
)

# Train the model
model.learn(total_timesteps=300_000)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.000446 |
|    n_updates        | 15367    |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 2.99     |
|    ep_rew_mean      | -0.433   |
|    exploration_rate | 0.0832   |
| time/               |          |
|    episodes         | 16600    |
|    fps              | 447      |
|    time_elapsed     | 155      |
|    total_timesteps  | 69454    |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.00101  |
|    n_updates        | 15371    |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 2.99     |
|    ep_rew_mean      | -0.433   |
|    exploration_rate | 0.083    |
| time/               |          |
|    episodes         | 1

## PPO

In [None]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_util import make_vec_env

vec_env = make_vec_env(MinesweeperEnvironment, n_envs=1)

# Define hyperparameters for PPO
learning_rate = 1e-4       # Learning rate for PPO
n_steps = 2048             # Number of steps to run for each environment per update
batch_size = 64            # Batch size for each update
n_epochs = 10              # Number of times to train on each batch
gamma = 0.99               # Discount factor
gae_lambda = 0.95          # GAE lambda, for variance reduction in advantage estimation
clip_range = 0.2           # Clip range for PPO, helps with stable training

# Instantiate PPO with custom hyperparameters
model = PPO(
    "CnnPolicy",           # Policy type, can try "CnnPolicy" for image-based inputs
    vec_env,
    learning_rate=learning_rate,
    n_steps=n_steps,
    batch_size=batch_size,
    n_epochs=n_epochs,
    gamma=gamma,
    gae_lambda=gae_lambda,
    clip_range=clip_range,
    verbose=1               # Verbose output
)

# Train the model
model.learn(total_timesteps=100000)

Using cuda device
Wrapping the env in a VecTransposeImage.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1.63e+03 |
|    ep_rew_mean     | -9.6e+05 |
| time/              |          |
|    fps             | 226      |
|    iterations      | 1        |
|    time_elapsed    | 9        |
|    total_timesteps | 2048     |
---------------------------------
---------------------------------------
| rollout/                |           |
|    ep_len_mean          | 1.63e+03  |
|    ep_rew_mean          | -9.6e+05  |
| time/                   |           |
|    fps                  | 207       |
|    iterations           | 2         |
|    time_elapsed         | 19        |
|    total_timesteps      | 4096      |
| train/                  |           |
|    approx_kl            | 25.478064 |
|    clip_fraction        | 0.66      |
|    clip_range           | 0.2       |
|    entropy_loss         | -2.98     |
|    explained_variance   | -6.08e-06 |
|

## Evaluation

In [129]:
# Test the trained agent
# using the vecenv
obs = vec_env.reset()
n_steps = 100
for step in range(n_steps):
    action, _ = model.predict(obs, deterministic=False)
    # print(f"Step {step + 1}")
    x = int(action[0]) // 10
    y = int(action[0]) % 10
    print("Action: ", (x, y))
    obs, reward, done, info = vec_env.step(action)
    print("reward=", reward, "done=", done)
    vec_env.render()
    if done:
        # Note that the VecEnv resets automatically
        # when a done signal is encountered
        print("Goal reached!", "reward=", reward)
        break

Action:  (5, 3)
reward= [0.19753087] done= [False]
Current Board: 73.0 Bombs Left
# 0 1 2 3 4 5 6 7 8
0 * * * * * * * * * 
1 * * * * * * * * * 
2 * * * * * * * * * 
3 * * * * * * * * * 
4 * * * * * * * 1 1 
5 * * * * * * * 1 0 
6 * * * * * * * 1 0 
7 * * * * * * * 2 1 
8 * * * * * * * * * 
Action:  (7, 3)
reward= [0.02469136] done= [False]
Current Board: 72.0 Bombs Left
# 0 1 2 3 4 5 6 7 8
0 * * * * * * * * * 
1 * * * * * * * * * 
2 * * * * * * * * * 
3 * * * * * * * * * 
4 * * * * * * * 1 1 
5 * * * * * * * 1 0 
6 * * * * * * * 1 0 
7 * * * * * * * 2 1 
8 * 1 * * * * * * * 
Action:  (5, 8)
reward= [0.02469136] done= [False]
Current Board: 71.0 Bombs Left
# 0 1 2 3 4 5 6 7 8
0 * * * * * * * * * 
1 * * * * * * * * * 
2 * * * * * * * * * 
3 * * * * * * * * * 
4 * * * * * * * 1 1 
5 * * * * * * * 1 0 
6 * * * * 1 * * 1 0 
7 * * * * * * * 2 1 
8 * 1 * * * * * * * 
Action:  (1, 3)
reward= [0.02469136] done= [False]
Current Board: 70.0 Bombs Left
# 0 1 2 3 4 5 6 7 8
0 * * * * * * * * * 
1 * 

## Old RL Zoo code

In [None]:
# from rl_zoo3.train import train
# from gym.envs.registration import register

# register(
#     id='Minesweeper-v1',
#     entry_point='msenv:MinesweeperEnvironment',  # Update '__main__' to the module name if this is not in your main script
#     max_episode_steps=100,  # Adjust based on expected game length
# )

# import gym
# print([k for k in gym.envs.registry.keys() if "Minesweeper" in k])

# !python -m rl_zoo3.train --algo dqn --env Minesweeper-v1 -f logs/ -c dqn.yml
