In [1]:
import numpy as np

# Monte-Carlo Reinforcement Learning

In [2]:
class GridWorld():
    def __init__(self, n_row=4, n_col=4):
        self.end_row = n_row - 1
        self.end_col = n_col - 1
        self.x = 0
        self.y = 0
    
    def step(self, a):
        if a==0:
            self.north()
        elif a==1:
            self.south()
        elif a==2:
            self.west()
        elif a==3:
            self.east()

        reward = -1
        done = (self.x == self.end_row and self.y == self.end_col)
        return (self.x, self.y), reward, done

    def north(self):
        self.x = max(self.x - 1, 0)
      
    def south(self):
        self.x = min(self.x + 1, self.end_row)
      
    def west(self):
        self.y = max(self.y - 1, 0)
  
    def east(self):
        self.y = min(self.y + 1, self.end_col)
      
    def reset(self):
        self.x = 0
        self.y = 0
        return (self.x, self.y)

In [21]:
class Agent():
    def __init__(self):
        self.alpha = 0.0001  # 업데이트 비율
        #self.alpha = 1.0  # 업데이트 비율
        self.v_table = np.zeros((4, 4))  # 상태 가치 테이블
        self.pi = [0.25, 0.25, 0.25, 0.25]

    def select_action(self, s):
        action = np.random.choice(4, p=self.pi)
        return action

    def update_table(self, history, gamma):
        # 테이블의 값을 업데이트 한다
        cum_reward = 0
        for transition in history[::-1]:  # history의 뒤쪽부터 차례로 리턴 계산
            s, action, reward, s_prime = transition
            x, y = s
            # 몬테 카를로 업데이트. v(s) ← (1 − α) * v(s) + α * G
            self.v_table[x, y] = (1 - self.alpha) * self.v_table[x, y] + self.alpha * cum_reward
            cum_reward = reward + gamma * cum_reward

In [22]:
env = GridWorld()  # 환경
agent = Agent()  # 에이전트
gamma = 1.0  # 감쇄인자, 미래에 받을 보상이라도 현재와 동일한 가중치를 적용

In [23]:
for k in range(50000):
    done = False
    history = []
    # 에피소드 1회 진행
    s = env.reset()
    while not done:
        action = agent.select_action(s)
        s_prime, reward, done = env.step(action)
        history.append((s, action, reward, s_prime))
        # 상태 변경
        s = s_prime

    agent.update_table(history, gamma)

In [24]:
np.round(agent.v_table, 2)

array([[-58.46, -56.45, -52.76, -51.56],
       [-56.51, -53.64, -47.96, -44.45],
       [-53.11, -48.85, -39.73, -29.3 ],
       [-51.08, -43.97, -28.78,   0.  ]])

# Monte Carlo Control

In [25]:
class GridWorld():
    def __init__(self, n_row=7, n_col=7):
        self.end_row = n_row - 1
        self.end_col = n_col - 1
        self.x = 0
        self.y = 0
        self.wall = {(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (6, 2),
                     (0, 4), (2, 4), (3, 4), (4, 4), (5, 4), (6, 4)}
    
    def step(self, a):
        if a==0:
            self.north()
        elif a==1:
            self.south()
        elif a==2:
            self.west()
        elif a==3:
            self.east()

        reward = -1
        done = (self.x == self.end_row and self.y == self.end_col)
        return (self.x, self.y), reward, done

    def north(self):
        if (max(self.x - 1, 0), self.y) not in self.wall:
            self.x = max(self.x - 1, 0)
      
    def south(self):
        if (min(self.x + 1, self.end_row), self.y) not in self.wall:
            self.x = min(self.x + 1, self.end_row)
      
    def west(self):
        if (self.x, max(self.y - 1, 0)) not in self.wall:
            self.y = max(self.y - 1, 0)
  
    def east(self):
        if (self.x, min(self.y + 1, self.end_col)) not in self.wall:
            self.y = min(self.y + 1, self.end_col)
      
    def reset(self):
        self.x = 0
        self.y = 0
        return (self.x, self.y)

In [28]:
class MCAgent():
    def __init__(self):
        self.q_table = np.zeros((7, 7, 4)) # q-table 
        self.alpha = 0.01

    def select_action(self, s, epsilon=0.0):
        x, y = s
        dice = np.random.random()
        if dice < epsilon:
            action = np.random.randint(0, 4) # 랜덤 선택
        else:
            action_val = self.q_table[x,y,:] # 가장 확률이 높은 경우 선택
            action = np.argmax(action_val)
        return action
        
    def update_table(self, history, gamma):
        # q 테이블의 값을 업데이트 한다
        cum_reward = 0
        for transition in history[::-1]:
            s, a, r, s_prime = transition
            x,y = s
            # 몬테 카를로 업데이트. q(s, a) ← (1 − α) * q(s, a) + α * G
            self.q_table[x, y, a] = (1 - self.alpha) * self.q_table[x, y, a] + self.alpha * cum_reward
            cum_reward = r + gamma * cum_reward

    def show_table(self):
        q_lst = self.q_table.tolist()
        data = np.zeros((7, 7))
        for row_idx in range(len(q_lst)):
            row = q_lst[row_idx]
            for col_idx in range(len(row)):
                col = row[col_idx]
                action = np.argmax(col) if min(col) != 0 else -1
                data[row_idx, col_idx] = action
        return data

In [29]:
env = GridWorld()  # 환경
agent = MCAgent()  # 에이전트
gamma = 1.0  # 감쇄인자
epsilon = 0.9  # 탐험 비율

In [30]:
for k in range(50000):
    done = False
    history = []
    # env 초기화
    s = env.reset()
    # 에피소드 1회 진행
    while not done:
        a = agent.select_action(s, epsilon)
        s_prime, r, done = env.step(a)
        history.append((s, a, r, s_prime))
        s = s_prime
    # 에피소드 종료 후 테이블 업데이트
    agent.update_table(history, gamma)
    # decaying epsilon
    epsilon = max(epsilon - 0.001, 0.1)

In [31]:
agent.show_table()

array([[ 1.,  2., -1.,  1., -1.,  3.,  1.],
       [ 1.,  2., -1.,  3.,  3.,  1.,  1.],
       [ 1.,  1., -1.,  0., -1.,  1.,  1.],
       [ 1.,  1., -1.,  0., -1.,  1.,  1.],
       [ 3.,  1., -1.,  0., -1.,  1.,  1.],
       [ 3.,  3.,  3.,  0., -1.,  1.,  1.],
       [ 0.,  0., -1.,  0., -1.,  3., -1.]])