In [1]:
import random
import sys
import numpy as np
import os
from IPython.display import clear_output
import time 
from itertools import combinations
from tqdm.auto import tqdm

In [2]:
class Grid:
    def __init__(self, gridsize, mine):
        self.size = gridsize
        
        self.mine = mine
        self.init_grid()
    
    def init_grid(self):
        self.closed = gridsize * gridsize  # 원래는 __init__에 있었음.
                                           # 그러다 보니 self.closed가 초기화가 안되어서
                                           # 음수로 내려가게 되어서 self.mine과 같아지는 경우가 없었음
        self.bomb = False
        self.success = False
        self.map = [['-' for _ in range(self.size)] for __ in range(self.size)]
        self.ans = [[0 for _ in range(self.size)] for __ in range(self.size)]
        
        # Game 생성
        numlist = [_ for _ in range (self.size * self.size)]
        s = random.sample(numlist, self.mine)
        self.mine_grid = s
        # 정답지 생성
        # mine이 있는 곳에 '*'로 대체
        for i in range(self.mine):
            row = (s[i] // self.size)
            col = (s[i] % self.size)
            self.ans[row][col] = '*'
        
        # mine 주위에 숫자 결정
        for i in range(self.size):
            for j in range(self.size):
                cnt = 0
                if self.ans[i][j] == 0:
                    for y in range(i-1, i+2):
                        for x in range(j-1, j+2):
                            if not (y<0 or x<0 or y>=self.size or x>=self.size):
                                if self.ans[y][x] == "*":
                                    cnt += 1
                    self.ans[i][j] = cnt

    def accessans(self, r, c):
        return self.ans[r][c]
    
    def accessmap(self, r, c):
        return self.map[r][c]
    
    def openmap(self, r, c):
        self.closed -= 1
        self.map[r][c] = self.ans[r][c]

        # 주위의 모든 0 open
        if self.map[r][c] == 0:
            for y in range(r-1, r+2):
                for x in range(c-1, c+2):
                    if not (y<0 or x<0 or y>=self.size or x>=self.size):
                        if (self.ans[y][x] == "*" or self.map[y][x] != '-'):
                            continue
                        elif not (y == r and x == c):
                            self.openmap(y, x)
        
        
        
        
        
        # mine을 open한 경우 False, 살아남았을 경우 True
        if self.ans[r][c] == '*':
            self.bomb = True
        else:
            self.bomb = False
            

        if self.bomb == False:
            if self.closed == self.mine:  # 같을 때도 self.success가 안먹힐때가 있어서 이렇게 만듬
                self.success = True
                return

        
    def printans(self):
        for i in range(self.size):
            for j in range(self.size):
                print(self.ans[i][j], end=" ")
            print()
        
    def printmap(self):
        for i in range(self.size):
            for j in range(self.size):
                print(self.map[i][j], end=" ")
            print()
        print()

In [3]:
class Value:
    def __init__(self, grid):
        self.grid = grid
    
    # 현재 grid로부터 Value 계산
    def __getvalue__(self):
        if self.grid.success: # 지뢰 제외한 모든 칸 오픈하면 100
            return 100
        elif self.grid.bomb: # 지뢰 밟으면 -100
            return -100
        else:
            return 1 # 그외는 0
        # else의 경우 return을 0으로 두면 안되는 것이 이렇게 될 경우 수렴을 안함
        # 길찾기의 경우 얼마나 도달했는지 자체를 모르는 케이스이기 때문에 0으로 두어도 되지만
        # 지뢰찾기의 경우 open한 이후에 죽지 않았을 경우 자체가
        # 정답에 가까워지기 때문.

In [4]:
class State:
    def __init__(self, grid):
        self.grid = grid
    
    # 총 state 수
    def num_states(self):
        mine = self.grid.mine
        size = self.grid.size
        return pow((mine + 2), size * size) # ['-', 0, ... 'n']^(블럭 수) 
    
    # 현재 state를 숫자로 표현해서 반환
    def __getstate__(self):
        if self.grid.bomb or self.grid.success:
            return -1 # Terminal state
        
        state = 0
        mine = self.grid.mine
        size = self.grid.size
        factor = size * size - 1
        
        # 현재 state 숫자 계산
        for r in self.grid.map:
            for rc in r:
                if rc == '-':
                    state += pow((mine + 2), factor)*(mine + 1)
                else:
                    state += pow((mine + 2), factor)*rc
                factor -= 1
        return state

In [5]:
class Actions:
    def __init__(self, grid):
        self.grid = grid
        self.action_dict = {}
        v = 0
        for i, r in enumerate(self.grid.map):
            for j, rc in enumerate(r):
                self.action_dict[(i, j)] = v
                v += 1
    
    # 현재 실행할 수 있는 actions 반환
    def get_actions(self):
        actions = []
        for i, r in enumerate(self.grid.map):
            for j, rc in enumerate(r):
                if rc == '-':
                     actions.append((i, j))
        return actions

In [6]:
class World:
    def __init__(self, gridsize, mine):
        self.grid = Grid(gridsize, mine)
        self.state = State(self.grid)
        self.value = Value(self.grid)
        
    def currentState(self):
        return self.state.__getstate__()
    
    def currentValue(self):
        return self.value.__getvalue__()
    
    def num_states(self):
        return self.state.num_states()
    
    def num_actions(self):
        return self.grid.size * self.grid.size
    
    def openmap(self, point):
        i, j = point
        self.grid.openmap(i, j)

In [10]:
class Agent:
    def __init__(self, world, alpha):
        self.world = world
        self.qvalues = np.empty(world.num_states() * world.num_actions(), float) # Q(S, A)
        self.actions = Actions(world.grid)
        self.alpha = alpha
        self.gamma = gamma
    def init_state(self):
        self.S = self.world.currentState()

    def is_terminal(self):
        if self.S == -1:
            return True
        else:
            return False
        
        
    def explore(self, policy, TF):
        S = self.S
        actions = self.actions.get_actions()
        num_action = len(actions)
        num_size = pow(self.world.grid.size,2)
        actions_1 = actions
       
        if TF:
            self.world.grid.printmap()
            print('Current State: ' + str(self.S))
            print()
            print('Available actions: ' + str(actions))
        
        if policy == 'random walk':
            a = random.sample(actions, 1)[0]
        elif policy == 'greedy q value':
            # pi(S) ==> argmax_a Q(S,a)
            
            if num_size == num_action:
                mine_grid = self.world.grid.mine_grid
                mine_grid.sort(reverse=True)
                # 초반에 무조건 지뢰 안밟도록 하는 방법
                # agent는 사실상 모른다.

                for i in mine_grid:
                    del actions_1[i]
                    
            q_values = []
            
            for action in actions:
                action_index = action[0]*gridsize+action[1]+1
                q_index = S * action_index
                q_value = self.qvalues[q_index]
                
                q_values.append(q_value)
            
            action = q_values.index(max(q_values))
            
            a = actions[action]
            
            
        self.world.openmap(a)
        S_next = self.world.currentState()
        R = self.world.currentValue()
        
        if TF:
            print('Taking action ' + str(a))
            print()
            self.world.grid.printmap()

            print('Next State: ' + str(S_next))
            print('Reward: ' + str(R))
            print('------------------------------------------------------------------')
        
        
        self.S = S_next
        return S, a, R, S_next

    def learn(self, S, A, R, S_next):
        a_index = self.actions.action_dict[A]+1 #action을 (0,0)을 선택할 경우 a가 0이 되어버려서 의미가 없어짐.
        # ex) a_index = 0, ==> q_index는 state에 상관없이 0. 이에 따라 index가 아니라 order사용
        q_index = S * a_index # (S, A) 의 인덱스
        
        # max_a' Q(s',a')을 구해야함
        
        actions_S_next = self.actions.get_actions()
        
        q_values = [] 
        for action in actions_S_next:
            action_index = action[0]*gridsize+action[1]+1
            q_index_next = S_next * action_index
            q_value = self.qvalues[q_index_next]
            q_values.append(q_value)
            
        max_q = max(q_values)
        
        # max_q = max_a' Q(s',a')를 이용하여 max_q값 도출
        
        self.qvalues[q_index] = self.qvalues[q_index]*(1-self.alpha)+self.alpha*(R+self.gamma*max_q)
        # Q(s,a) <- (1-alpha)*Q(s,a) +alpha*[R(s,a,s') + gamma * max_a' Q(s',a')]
        pass
        


In [105]:
# Params
gridsize = 3
num_mines = 2
num_episodes = 200000 # episode 개수
num_random = 0
action_interval_time = 0 # 1초 간격으로 world 탐색
alpha = 0.05
gamma = 0.01

# alpha, gamma조절하기 위한 parameter
check = 100
check_pr = 0.75   # check 개수 중에서 몇퍼센트가 맞아야하는가

In [106]:
world = World(gridsize, num_mines)
agent = Agent(world, alpha)

# train
R_list=[]
percentage = 0
TF = False 

# print를 하게 할 것인가 안할 것인가 
# 즉, learning할때는 TF = False
# 결과 자체를 보고 싶을 때는 TF = True
# 따라서 이 셀은 TF = False로...

for episode in tqdm(range(num_episodes)):
    world.grid.init_grid()
    agent.init_state()
    if TF:
        print('<---------------------------- Episode {} ----------------------------->'.format(episode))

    while True:
        if agent.is_terminal():
            break
        time.sleep(action_interval_time)
        S, A, R, S_next = agent.explore('greedy q value',TF)
        agent.learn(S, A, R, S_next)
    
    time.sleep(action_interval_time)
    if TF:
        clear_output(wait=True)
    
    if R == 100:
        R_list.append(episode)

# <--------------- 정확도 체크용----------------> #
        
#     if episode > check - 1 :
#         r_100 = len(R_list)
#         pr = r_100/check
#         if pr >= float(check_pr):
#             percentage += 1
#         if len(R_list) == 0:
#             pass
#         elif R_list[0] == (episode - check):

#             del R_list[0]
        
#     if float(percentage) >= (10000/check):
#         print(episode)
#         break


HBox(children=(FloatProgress(value=0.0, max=200000.0), HTML(value='')))




In [175]:
# 따라서 이 셀은 TF = True로...

TF = True
world.grid.init_grid()
agent.init_state()
while True:
    if agent.is_terminal():
        break
    time.sleep(action_interval_time)
    S, A, R, S_next = agent.explore('greedy q value',TF)
    if R == 100:
        print("성공")
        

- - - 
- - - 
- - - 

Current State: 262143

Available actions: [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2), (2, 0), (2, 1), (2, 2)]
Taking action (0, 2)

- 1 0 
- 2 0 
- 2 0 

Next State: 216632
Reward: 1
------------------------------------------------------------------
- 1 0 
- 2 0 
- 2 0 

Current State: 216632

Available actions: [(0, 0), (1, 0), (2, 0)]
Taking action (0, 0)

1 1 0 
- 2 0 
- 2 0 

Next State: -1
Reward: 100
------------------------------------------------------------------
성공
