# 지뢰찾기

>state: 타일 하나가 가질 수 있는 상태


*   [-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] (-1: 지뢰, 0~8: 주변 지뢰의 개수, 9: 아직 열지 않은 타일)

>action: open tile



*   [(0, 0), (0, 1)...(0, 8),
(1, 0), (1, 1)...(1, 8), ...(8, 8)]: 9 x 9 = 81개의 action, 또는 [0, 1, 2, ..., 80]으로 indexing (index = 행 × 9 + 열)

> reward



*   지뢰(state == -1)를 열면 -10, 0(state == 0)을 열면 +5, 그 외 숫자 타일을 열면 +3, 지뢰가 아닌 타일을 모두 열어 승리하면 +10








# Import

In [None]:
import random
import numpy as np
import pandas as pd
from collections import deque
import matplotlib.pyplot as plt

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

# Env

In [None]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    devices = torch.device("cuda")
else:
    devices = torch.device("cpu")
print(devices)

cuda


In [None]:
class MinesweeperEnv:
    """Minesweeper board environment.

    Two grids of shape ``(height, width)`` are maintained:

    * ``minefield`` is the ground truth: ``-1`` marks a mine and any other
      value is the number of mines in the surrounding cells.
    * ``playerfield`` is what the player sees: ``9`` marks a hidden tile and
      an opened tile holds the matching ``minefield`` value.

    ``reset`` and ``step`` return *copies* of ``playerfield`` so that an
    observation handed to the caller is never changed by later moves.
    """

    def __init__(self, height=9, width=9, num_mines=10):
        """Create a board and place ``num_mines`` mines at random.

        Raises:
            ValueError: if ``num_mines`` exceeds the number of cells.
        """
        self.height = height
        self.width = width
        self.num_mines = num_mines
        self.minefield = np.zeros((self.height, self.width), dtype=int)  # Initialize minefield
        self.initialize_minefield()
        self.reward = {'lose': -10, '0': 5, 'general': 3, 'win': 10}
        # Start from a fully hidden board so step() also works before reset().
        self.playerfield = np.full((self.height, self.width), 9)
        self.exploded = False
        self.done = False

    def initialize_minefield(self):
        """Clear the minefield, place the mines and compute neighbour counts.

        Raises:
            ValueError: if ``num_mines`` exceeds the number of cells (the
                placement loop below could never terminate in that case).
        """
        if self.num_mines > self.height * self.width:
            raise ValueError(
                f"num_mines ({self.num_mines}) exceeds the number of cells "
                f"({self.height * self.width})"
            )

        # Reset minefield and place mines
        self.minefield = np.zeros((self.height, self.width), dtype=int)
        mines = set()
        while len(mines) < self.num_mines:
            x, y = random.randint(0, self.height - 1), random.randint(0, self.width - 1)
            if (x, y) not in mines:
                mines.add((x, y))
                self.minefield[x, y] = -1

        # Every mine adds one to each non-mine neighbour (clipped at the edges).
        for (x, y) in mines:
            for i in range(max(0, x - 1), min(self.height, x + 2)):
                for j in range(max(0, y - 1), min(self.width, y + 2)):
                    if self.minefield[i, j] != -1:
                        self.minefield[i, j] += 1

    def reset(self, restrict=False):
        """Start a new game and return a copy of the player's view.

        Args:
            restrict: when true, the current mine layout is kept and the
                returned board is fully hidden. When false, the mines are
                re-randomised and one random safe cell is opened (with flood
                reveal if that cell is a ``0``).

        Returns:
            A fresh ``(height, width)`` array; ``9`` marks hidden tiles.
        """
        if not restrict:
            self.initialize_minefield()  # Ensure minefield is initialized if not restricted

        self.playerfield = np.full((self.height, self.width), 9)  # Reset playerfield
        self.exploded = False
        self.done = False

        # Skip the opening move when every cell is a mine; otherwise the
        # search for a safe cell would never finish.
        if not restrict and np.any(self.minefield != -1):
            while True:
                sx, sy = random.randint(0, self.height - 1), random.randint(0, self.width - 1)
                if self.minefield[sx, sy] != -1:
                    self._uncover(sx, sy)
                    if self.playerfield[sx, sy] == 0:
                        self.auto_reveal_blocks(sx, sy)
                    break

        return self.playerfield.copy()

    def step(self, action_idx):
        """Open the tile addressed by ``action_idx`` (row-major flat index).

        Opening an already opened tile changes nothing and yields reward 0,
        unless the board is already cleared, in which case the win reward is
        returned.

        Returns:
            ``(playerfield_copy, reward, exploded, done)``.
        """
        # Convert action index to (x, y) coordinates
        x, y = divmod(action_idx, self.width)
        reward = 0

        if self.playerfield[x, y] == 9:
            self._uncover(x, y)
            opened = self.playerfield[x, y]

            if opened == -1:
                self.exploded = True
                self.done = True
                reward = self.reward['lose']
            else:
                if opened == 0:
                    self.auto_reveal_blocks(x, y)
                    reward = self.reward['0']
                else:
                    reward = self.reward['general']
                self.exploded = False
                self.done = False

        # Check if all non-mine cells are uncovered
        if not self.exploded and np.all((self.playerfield != 9) | (self.minefield == -1)):
            self.done = True
            reward = self.reward['win']

        # Return a snapshot: the internal board keeps being mutated in place,
        # so handing out the live array would alias every stored observation.
        return self.playerfield.copy(), reward, self.exploded, self.done

    def _uncover(self, x, y):
        """Reveal cell ``(x, y)`` in place and return the internal board."""
        self.playerfield[x, y] = self.minefield[x, y]
        return self.playerfield

    def auto_reveal_blocks(self, x, y):
        """Breadth-first reveal of the neighbours around a ``0`` cell.

        Every hidden neighbour is opened; neighbours that are themselves
        ``0`` are queued so their surroundings get opened as well.
        """
        queue = deque([(x, y)])
        while queue:
            cx, cy = queue.popleft()
            for i in range(max(0, cx - 1), min(self.height, cx + 2)):
                for j in range(max(0, cy - 1), min(self.width, cy + 2)):
                    if self.playerfield[i, j] == 9:
                        self._uncover(i, j)
                        if self.minefield[i, j] == 0:
                            queue.append((i, j))

    def render(self):
        """Print the player's view: ``.`` hidden, ``M`` mine, else the number."""
        lines = []
        for x in range(self.height):
            cells = []
            for y in range(self.width):
                value = self.playerfield[x, y]
                if value == 9:
                    cells.append('. ')
                elif value == -1:
                    cells.append('M ')
                else:
                    cells.append(f'{value} ')
            lines.append(''.join(cells) + '\n')
        print(''.join(lines))


In [None]:
def create_multiple_boards(num_boards, height=9, width=9, num_mines=10):
    """Build ``num_boards`` separate ``MinesweeperEnv`` instances.

    Each environment is constructed with the same ``height``, ``width`` and
    ``num_mines`` arguments; the environments are returned as a list.
    """
    return [MinesweeperEnv(height, width, num_mines) for _ in range(num_boards)]

# Neural Network

나중에 보강

In [None]:
class NeuralNet(nn.Module):
    """Convolutional network that maps a board to one score per action.

    The input is a single-channel tensor (``conv1`` takes 1 input channel).
    The flatten step expects a 2x2 feature map after the two 2x2 poolings,
    which is what a 9x9 board produces, so inputs are ``(batch, 1, 9, 9)``.
    The output has ``output`` columns of raw scores; no softmax is applied
    and there is no activation between ``fc1`` and ``fc2``.
    """

    def __init__(self, output):
        super().__init__()

        # Three 3x3 convolutions; padding=1 keeps the spatial size unchanged.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.batchnorm1 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.batchnorm2 = nn.BatchNorm2d(128)
        # One pooling layer, applied twice in forward().
        self.maxpool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(128 * 2 * 2, 128)
        self.fc2 = nn.Linear(128, output)  # one value per action

    def forward(self, x):
        """Return the per-action scores for the batch of boards ``x``."""
        features = F.relu(self.conv2(F.relu(self.conv1(x))))
        features = self.maxpool(self.batchnorm1(features))

        features = F.relu(self.conv3(features))
        features = self.maxpool(self.batchnorm2(features))

        # Flatten to (rows, 128*2*2) before the fully connected head.
        flat = features.view(-1, self.fc1.in_features)
        return self.fc2(self.fc1(flat))

In [None]:
# Smoke test: run one random 9x9 board through a freshly built network and
# show the shape of the result and the sum of its 81 outputs.
input_data = torch.randn(1, 1, 9, 9)
neuralnetwork = NeuralNet(81)
output = neuralnetwork(input_data)
print(output.size())
print(torch.sum(output, dim=1))

torch.Size([1, 81])
tensor([-4.5524], grad_fn=<SumBackward1>)


# Agent
> __init__


*   state_size, action_size
*   memory_size, memory_size_min, batch_size, memory
*  discount_factor, epsilon, epsilon_min, epsilon_decay
*  learning_rate, loss, model, target_model, optimizer
* update_target_model()

> method


*   update_target_model(self)
*   get_action(self, state)
* append_sample(self, state, action, next_state, reward, exploded, done)
* train_model(self)





## Hyperparameter

In [None]:
# --- replay memory ---
BATCH_SIZE = 16         # transitions sampled for one training step
MEMORY_SIZE = 1_000     # capacity of the replay memory
MEMORY_SIZE_MIN = 50    # training starts once the memory holds more than this

# --- epsilon-greedy exploration and discounting ---
EPSILON = 0.9           # starting exploration probability
EPSILON_DECAY = 0.999   # factor applied to epsilon on each training step
EPSILON_MIN = 0.01      # epsilon stops decaying once it is not above this
DISCOUNT_FACTOR = 0.1   # weight of the next state's best value in the target

# --- optimisation ---
LEARNING_RATE = 1e-3    # learning rate passed to the optimizer

# --- target network ---
UPDATE_TIME = 20        # target network is synced every this many episodes

In [None]:
class Agent:
    """DQN agent: replay memory, epsilon-greedy policy and a target network.

    Boards are 2-D arrays in which ``9`` marks a tile that is still hidden;
    actions are row-major flat indices into that board.
    """

    def __init__(self, state_size, action_size):
        """Set up memory, exploration schedule, networks and optimizer.

        Args:
            state_size: stored on the agent; not used to build the network.
            action_size: number of actions, i.e. the network's output width.
        """
        self.state_size = state_size
        self.action_size = action_size

        # memory
        self.batch_size = BATCH_SIZE
        self.memory_size = MEMORY_SIZE
        self.memory_size_min = MEMORY_SIZE_MIN
        self.memory = deque(maxlen=self.memory_size)

        # epsilon
        self.epsilon = EPSILON
        self.epsilon_decay = EPSILON_DECAY
        self.epsilon_min = EPSILON_MIN
        self.discount_factor = DISCOUNT_FACTOR

        # cpu -> gpu
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Online network (model) and target network (target_model)
        self.learning_rate = LEARNING_RATE
        self.loss = nn.MSELoss()
        self.model = NeuralNet(self.action_size).to(self.device)
        self.target_model = NeuralNet(self.action_size).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)

        # The target network starts as an exact copy of the online network.
        self.update_time = UPDATE_TIME
        self.update_target_model()

    def update_target_model(self):
        """Copy the online network's parameters into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def get_action(self, state):
        """Pick a hidden tile to open with an epsilon-greedy policy.

        With probability ``epsilon`` a hidden tile is drawn uniformly at
        random; otherwise the hidden tile with the highest predicted value is
        chosen. The board width is taken from ``state`` itself, so the method
        is not tied to 9-column boards.

        Args:
            state: 2-D board array; ``9`` marks hidden tiles.

        Returns:
            The chosen action as a row-major flat index (``int``).

        Raises:
            ValueError: if the board has no hidden tile left.
        """
        board = np.asarray(state)
        hidden = np.flatnonzero(board == 9)
        if hidden.size == 0:
            raise ValueError("no hidden tile left to open")

        if np.random.rand() <= self.epsilon:
            # Explore: uniform choice among the tiles that can still be opened.
            return int(random.choice(hidden.tolist()))

        # Exploit: score every action, then rule out opened tiles by masking
        # them with -inf. A single argmax over the masked scores always lands
        # on a hidden tile, even when that tile holds the lowest score.
        board_tensor = torch.as_tensor(board, dtype=torch.float32, device=self.device)
        with torch.no_grad():
            q_value = self.model(board_tensor.unsqueeze(0).unsqueeze(0)).reshape(-1)
        hidden_mask = torch.as_tensor(board.reshape(-1) == 9, device=q_value.device)
        masked_q = q_value.masked_fill(~hidden_mask, float("-inf"))
        return int(torch.argmax(masked_q).item())

    def append_sample(self, state, action, next_state, reward, exploded, done):
        """Store one transition in the replay memory.

        ``state`` and ``next_state`` are copied so that later in-place changes
        to the caller's arrays cannot alter what is stored here.
        """
        self.memory.append(
            (np.array(state), action, np.array(next_state), reward, exploded, done)
        )

    def train_model(self):
        """Run one optimisation step on a random mini-batch from memory.

        Epsilon is decayed first (while it is above ``epsilon_min``), then
        ``batch_size`` transitions are sampled and the online network is
        regressed towards ``reward + (1 - done) * discount * max_a Q_target``.

        Raises:
            ValueError: from ``random.sample`` when the memory holds fewer
                than ``batch_size`` transitions.
        """
        self.model.train()

        # Epsilon update
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        mini_batch = random.sample(self.memory, self.batch_size)
        states, actions, next_states, rewards, _, dones = zip(*mini_batch)

        # Stack into single ndarrays first: building a tensor from a tuple of
        # arrays element by element is slow.
        states = torch.as_tensor(
            np.stack(states), dtype=torch.float32, device=self.device
        ).unsqueeze(1)
        next_states = torch.as_tensor(
            np.stack(next_states), dtype=torch.float32, device=self.device
        ).unsqueeze(1)
        actions = torch.as_tensor(np.asarray(actions), dtype=torch.int64, device=self.device)
        rewards = torch.as_tensor(np.asarray(rewards), dtype=torch.float32, device=self.device)
        dones = torch.as_tensor(np.asarray(dones), dtype=torch.float32, device=self.device)

        # Q(s, a) of the action actually taken. gather keeps the batch
        # dimension intact, so a batch of size 1 works as well.
        predicts = self.model(states)
        one_hot_pred = predicts.gather(1, actions.unsqueeze(1)).squeeze(1)

        # max_a' Q_target(s', a'); no gradient flows into the target network.
        with torch.no_grad():
            target_pred = self.target_model(next_states).max(dim=1).values

        # Bellman optimality target; terminal transitions keep only the reward.
        targets = rewards + (1.0 - dones) * self.discount_factor * target_pred
        loss = self.loss(one_hot_pred, targets)

        # Weight update
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

# main

In [None]:
# Build the pool of boards the agent is trained on.
num_boards = 10
boards = create_multiple_boards(num_boards)

# Any board from the pool supplies the dimensions used to size the agent.
board = boards[0]
agent = Agent(board.height * board.width, board.height * board.width)

# Number of episodes to play
EPISODES = 10000

# Per-episode logs: episode index, total score, 50-episode score average,
# number of actions taken, replay-memory length and win flag (1 win, 0 loss).
episodes, scores, score_avg = [], [], []
total_action, length_memory, win_list = [], [], []


for epi in range(EPISODES):
    done = False
    score = 0
    total_action_epi = []

    # A random board from the pool is used each episode
    # (use `env = board` instead to train on one fixed board).
    env = random.choice(boards)

    # Restart the game. restrict=True keeps the board's mine layout; switch to
    # False later to get a freshly randomised board on every reset.
    state = env.reset(restrict=True)
    while not done:
        # Choose an action from the current state and play it.
        action = agent.get_action(state)
        total_action_epi.append(action)
        next_state, reward, exploded, done = env.step(action)
        score += reward
        agent.append_sample(state, action, next_state, reward, exploded, done)

        # Train on one mini-batch once the memory exceeds its minimum length.
        if len(agent.memory) > agent.memory_size_min:
            agent.train_model()

        state = next_state

        if not done:
            continue

        # ---- end-of-episode bookkeeping ----
        win_list.append(0 if env.exploded else 1)

        # Sync the target network every agent.update_time episodes.
        if (epi % agent.update_time) == 0:
            agent.update_target_model()

        total_action.append(len(total_action_epi))
        episodes.append(epi)
        scores.append(score)
        score_avg.append(np.mean(scores[-50:]))
        length_memory.append(len(agent.memory))
        # Per-episode debug output (disabled):
        # print(f"episode: {epi} | epsilon: {agent.epsilon} | num_action: {total_action[-1]} | score: {scores[-1]} | length of memory: {length_memory[-1]}")
        # print(f"action_list: {total_action_epi}")
        # env.render()

        if (epi % 50) == 0:
            # The slice covers the most recent 50 episodes, or all of them
            # while fewer than 50 have been played.
            win_rate = np.mean(win_list[-50:])
            print(f"Episode: {epi} | Score: {np.mean(scores[-50:])} | Epsilon: {agent.epsilon:.2f} | Win Rate: {win_rate}")

Episode: 0 | Score: -10.0 | Epsilon: 0.90 | Win Rate: 0.0
Episode: 50 | Score: 4.72 | Epsilon: 0.74 | Win Rate: 0.0
Episode: 100 | Score: 1.4 | Epsilon: 0.60 | Win Rate: 0.0
Episode: 150 | Score: 9.7 | Epsilon: 0.44 | Win Rate: 0.0
Episode: 200 | Score: 12.48 | Epsilon: 0.31 | Win Rate: 0.02
Episode: 250 | Score: 23.98 | Epsilon: 0.18 | Win Rate: 0.06
Episode: 300 | Score: 26.44 | Epsilon: 0.11 | Win Rate: 0.24
Episode: 350 | Score: 48.14 | Epsilon: 0.05 | Win Rate: 0.54
Episode: 400 | Score: 48.04 | Epsilon: 0.03 | Win Rate: 0.62
Episode: 450 | Score: 57.4 | Epsilon: 0.01 | Win Rate: 0.82
Episode: 500 | Score: 58.2 | Epsilon: 0.01 | Win Rate: 0.9
Episode: 550 | Score: 59.74 | Epsilon: 0.01 | Win Rate: 0.94
Episode: 600 | Score: 63.22 | Epsilon: 0.01 | Win Rate: 0.94
Episode: 650 | Score: 60.22 | Epsilon: 0.01 | Win Rate: 0.84
Episode: 700 | Score: 56.54 | Epsilon: 0.01 | Win Rate: 0.76
Episode: 750 | Score: 58.48 | Epsilon: 0.01 | Win Rate: 0.74
Episode: 800 | Score: 48.2 | Epsilon: 0