# Dynamic Programming and Grid-World
- 강화 학습은 순차적으로 행동을 결정해야 하는 문제를 푸는 방법 중 하나
- Process
    - MDP 정의
    - 벨만 방정식의 계산
    - 최적 가치함수 + 최적 정책
- 즉,
    1. 순차적 행동 문제를 MDP로 전환
    2. 가치함수를 벨만 방정식으로 반복적으로 계산
    3. 최적 가치함수와 최적 정책을 찾는다.
- 강화학습 등장 전, 순차적 의사결정 문제를 푸는 방법론, 다이나믹 프로그래밍 학습

## 벨만 최적 방정식

$$v_{\ast}(s)=\max_{a}E\big[R_{t+1}+\gamma v_{\ast}(S_{t+1})\;\big|\;S_t=s,A_t=a\big]$$

## 동적 프로그래밍
- 리처드 벨만(Richard E. Bellman)이 1953년에 제시
- 동적이라는 얘기는 그 말이 가리키는 대상이 시간에 따라 변한다는 것을 의미
- 프로그래밍은 계획을 하는 것 자체를 의미.
- 여러 프로세스가 다단계로 이뤄지는 것
- 큰 문제를 바로 푸는 것이 아니라 작은 문제들을 풀어 나감

## Dynamic Programming 종류
- 정책 이터레이션 Policy Iteration
- 가치 이터레이션 Value Iteration

# 1. Policy Iteration

- Random Policy -> Optimal Policy
- 위를 위해 `평가`와 `발전`이 필요

## Policy Evaluation
- 정책 평가는 아래의 가치 함수로!
$$v_\pi(s)=E_\pi\big[\sum_{k=t}^{\infty}\gamma^{k-t}R_{k+1}\;\big|\;S_t=s\big]$$
- 물론 DP에서 환경에 대한 모든 정보를 알고 문제에 접근하기 때문에 위의 식을 계산할 수 있지만, 이는 비효율적이고 사실상 불가능
- DP의 효과, 문제를 최대한 작게 쪼개고 작은 문제에 저장된 값들을 서로 이용해 계산하는 방식을 이용!
- `벨만 기대 방정식`을 활용, 아래 식으로 가치 함수가 변형됨!
$$v_\pi(s)=E_\pi\big[R_{t+1}+\gamma v_\pi(S_{t+1})\;\big|\;S_t=s\big]$$
- 위 식을 컴퓨터가 계산 가능하도록 기댓값, 확률적인 부분을 합의 형태로 변환
$$v_\pi(s)=\sum_{a\in A}\pi(a|s)\big(r_{(s,a)}+\gamma v_\pi(s^\prime)\big)$$

## Policy Improvement
- 굉장히 많은 방법들이 존재
- 책에선 `Greedy Policy Improvement`를 소개
- 초기의 정책은 무작위로 설정 (ex> [0.25, 0.25, 0.25, 0.25])
- `q 함수`로 update
$$q_\pi(s,a)=E_\pi\big[R_{t+1}+\gamma v_\pi(S_{t+1})\;\big|\;S_t=s,A_t=a\big]$$
- 위를 컴퓨터가 계산 가능하게 변형
$$q_\pi(s,a)=r_{(s,a)}+\gamma v_\pi(s^\prime)$$
- 위 값을 기반으로 정책을 발전시킴
$$\pi^{\prime}(s)=\text{argmax}_{a\in A}q_\pi(s,a)$$

# 2. Value Iteration
- 정책 이터레이션은 명시적인 정책이 존재
- 정책이 독립적이므로 결정적인 정책이 아니라 어떠한 정책도 가능 (확률적!)
- 그러나, 최적 결정은 결정론적.
- 현재의 가치함수가 최적은 아니지만 최적이라고 가정,
- 가치함수에 대해 결정적인 형태의 정책을 적용한다면?
- DP를 활용, 여러번 연산을 반복하며 최적 가치함수에 수렴, 최적 정책을 구할 거라는 기대!
- 중요한 것,
    - 정책이 명시적으로 표현되는 것이 아님
    - 가치함수 안에 내재적(implicit)으로 포함돼있음

## 벨만 최적 방정식과 가치 이터레이션
- `벨만 기대 방정식`을 통해 전체 문제를 풀어서 나오는 답은? `현재 정책을 따라갔을 때 받을 참 보상`
    1. 가치함수를 현재 정책에 대한 가치함수라 가정
    2. 이를 반복적으로 계산
    3. 현재 정책에 대한 참 가치함수가 됨
$$v_\pi(s)=E_\pi\big[R_{t+1}+\gamma v_\pi(S_{t+1})\;\big|\;S_t=s\big]$$

- `벨만 최적 방정식`을 풀어 나오는 답은? `최적 가치함수`
    1. 가치함수를 최적 정책에 대한 가치함수라 가정
    2. 이를 반복적으로 계산
    3. 결국 최적 가치 정책에 대한 참 가치함수, 즉 최적 가치함수를 찾음
$$v_\ast(s)=\max_{a}E_\pi\big[R_{t+1}+\gamma v_\ast(S_{t+1})\;\big|\;S_t=s,A_t=a\big]$$

- 때문에 value iteration에서는 정책 발전이 필요없음!
    - 시작부터 최적의 정책이라고 가정했기 때문!

- 벨만 최적 방정식에서 보면 `max`를 취함. 즉, 업데이트 시 세밀한 정책의 값을 고려할 필요가 X
- 즉 현재 상태에서 가능한 $R_{t+1}+\gamma v_k(S_{t+1})$의 값들 중 최고의 값으로 업데이트하면 됨

$$v_{k+1}(s)=\max_{a\in A}\big(r_{(s,a)}+\gamma v_k(s^\prime)\big)$$

코드 출처: 
- https://github.com/rlcode/reinforcement-learning-kr-v2/tree/master/1-grid-world/1-policy-iteration
- https://github.com/rlcode/reinforcement-learning-kr-v2/blob/master/1-grid-world/2-value-iteration

In [None]:
import tkinter as tk # 내장 GUI module
from tkinter import Button
import time
import numpy as np
from PIL import ImageTk, Image
import os


PhotoImage = ImageTk.PhotoImage
UNIT = 100  # 픽셀 수
HEIGHT = 5  # 그리드월드 세로
WIDTH = 5  # 그리드월드 가로
TRANSITION_PROB = 1
POSSIBLE_ACTIONS = [0, 1, 2, 3]  # 좌, 우, 상, 하
ACTIONS = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # 좌표로 나타낸 행동
REWARDS = []

In [None]:
class Env:
    
    """ 한정된 예제만을 위한, 작은 클래스 """
    
    def __init__(self):
        self.transition_probability = TRANSITION_PROB
        self.width = WIDTH  # Width of Grid World
        self.height = HEIGHT  # Height of GridWorld
        self.reward = [[0] * WIDTH for _ in range(HEIGHT)]
        self.possible_actions = POSSIBLE_ACTIONS
        self.reward[2][2] = 1  # (2,2) 좌표 동그라미 위치에 보상 1
        self.reward[1][2] = -1  # (1,2) 좌표 세모 위치에 보상 -1
        self.reward[2][1] = -1  # (2,1) 좌표 세모 위치에 보상 -1
        self.all_state = []
        # 상태 좌표 생성
        for x in range(WIDTH):
            for y in range(HEIGHT):
                state = [x, y]
                self.all_state.append(state)

    def get_reward(self, state, action):
        next_state = self.state_after_action(state, action)
        return self.reward[next_state[0]][next_state[1]]

    def state_after_action(self, state, action_index):
        action = ACTIONS[action_index]
        return self.check_boundary([state[0] + action[0], state[1] + action[1]])

    @staticmethod
    def check_boundary(state):
        state[0] = (0 if state[0] < 0 else WIDTH - 1
                    if state[0] > WIDTH - 1 else state[0])
        state[1] = (0 if state[1] < 0 else HEIGHT - 1
                    if state[1] > HEIGHT - 1 else state[1])
        return state

    def get_transition_prob(self, state, action):
        return self.transition_probability

    def get_all_states(self):
        return self.all_state

In [None]:
class GraphicDisplay(tk.Tk):
    
    __img_path = "../img/"
    
    def __init__(self, agent, Env):
        super(GraphicDisplay, self).__init__()
        if isinstance(agent, ValueIteration):
            self.title('Value Iteration')
            self.evaluation_count = 0
        elif isinstance(agent, PolicyIteration):
            self.title('Policy Iteration')
            self.iteration_count = 0
        else:
            raise ValueError('`agent` must be `ValueIteration` or `PolicyIteration`.')                
        self.geometry('{0}x{1}'.format(HEIGHT * UNIT, HEIGHT * UNIT + 50))
        self.texts = []
        self.arrows = []
        self.env = Env()
        self.agent = agent
        self.improvement_count = 0
        self.is_moving = 0
        (self.up, self.down, self.left, self.right), self.shapes = self.load_images()
        self.canvas = self._build_canvas()
        self.text_reward(2, 2, "R : 1.0")
        self.text_reward(1, 2, "R : -1.0")
        self.text_reward(2, 1, "R : -1.0")
        
    @property
    def img_path(self):
        return self.__img_path
    
    @img_path.setter
    def img_path(self, path):
        self.__img_path = path
        
    def _get_button_methods(self):
        if isinstance(self.agent, PolicyIteration):
            command1 = self.evaluate_policy
            botton1 = 'Evaluate'
            command2 = self.improve_policy
            botton2 = 'Improve'
            command3 = self.move_by_policy
            botton3 = 'move'
            command4 = self.reset
            botton4 = 'reset'
        else:
            command1 = self.calculate_value
            botton1 = 'Calculate'
            command2 = self.print_optimal_policy
            botton2 = 'Print Policy'
            command3 = self.move_by_policy
            botton3 = 'move'
            command4 = self.clear
            botton4 = 'clear'
        return (
            (command1, command2, command3, command4),
            (button1,  button2,  button3,  button4),
        )

    def _build_canvas(self):
        canvas = tk.Canvas(
            self, bg='white', height=HEIGHT * UNIT, width=WIDTH * UNIT
        )
        # 버튼 초기화
        iteration_button = Button(
            self, text="Evaluate", command=self.evaluate_policy)
        iteration_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(
            WIDTH * UNIT * 0.13, HEIGHT * UNIT + 10, window=iteration_button)
        policy_button = Button(self, text="Improve",
                               command=self.improve_policy)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.37, HEIGHT * UNIT + 10,
                             window=policy_button)
        policy_button = Button(self, text="move", command=self.move_by_policy)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.62, HEIGHT * UNIT + 10,
                             window=policy_button)
        policy_button = Button(self, text="reset", command=self.reset)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.87, HEIGHT * UNIT + 10,
                             window=policy_button)

        # 그리드 생성
        for col in range(0, WIDTH * UNIT, UNIT):  # 0~400 by 80
            x0, y0, x1, y1 = col, 0, col, HEIGHT * UNIT
            canvas.create_line(x0, y0, x1, y1)
        for row in range(0, HEIGHT * UNIT, UNIT):  # 0~400 by 80
            x0, y0, x1, y1 = 0, row, HEIGHT * UNIT, row
            canvas.create_line(x0, y0, x1, y1)

        # 캔버스에 이미지 추가
        self.rectangle = canvas.create_image(50, 50, image=self.shapes[0])
        canvas.create_image(250, 150, image=self.shapes[1])
        canvas.create_image(150, 250, image=self.shapes[1])
        canvas.create_image(250, 250, image=self.shapes[2])

        canvas.pack()

        return canvas
    
    def _load_image(self, filename, size):
        return PhotoImage(
            Image.open(os.path.join(self.img_path, filename))
        ).resize(size)

    def load_images(self):
        up        = self._load_image('up.png', (13, 13))
        right     = self._load_image('right.png', (13, 13))
        left      = self._load_image('left.png', (13, 13))
        down      = self._load_image('down.png', (13, 13))
        rectangle = self._load_image('rectangle.png', (65, 65))
        triangle  = self._load_image('triangle.png', (65, 65))
        circle    = self._load_image('circle.png', (65, 65))
        return (up, down, left, right), (rectangle, triangle, circle)
    
    def text_value(self, row, col, contents, font='Helvetica', size=10,
                   style='normal', anchor="nw"):
        origin_x, origin_y = 85, 70
        x, y = origin_y + (UNIT * col), origin_x + (UNIT * row)
        font = (font, str(size), style)
        text = self.canvas.create_text(x, y, fill="black", text=contents,
                                       font=font, anchor=anchor)
        return self.texts.append(text)

    def text_reward(self, row, col, contents, font='Helvetica', size=10,
                    style='normal', anchor="nw"):
        origin_x, origin_y = 5, 5
        x, y = origin_y + (UNIT * col), origin_x + (UNIT * row)
        font = (font, str(size), style)
        text = self.canvas.create_text(x, y, fill="black", text=contents,
                                       font=font, anchor=anchor)
        return self.texts.append(text)

    def rectangle_move(self, action):
        base_action = np.array([0, 0])
        location = self.find_rectangle()
        self.render()
        if action == 0 and location[0] > 0:  # 상
            base_action[1] -= UNIT
        elif action == 1 and location[0] < HEIGHT - 1:  # 하
            base_action[1] += UNIT
        elif action == 2 and location[1] > 0:  # 좌
            base_action[0] -= UNIT
        elif action == 3 and location[1] < WIDTH - 1:  # 우
            base_action[0] += UNIT
        # move agent
        self.canvas.move(self.rectangle, base_action[0], base_action[1]) 

In [23]:
class PolicyEnv(GraphicDisplay):
    
    def __init__(self, agent, Env):
        super(PolicyEnv, agent, Env)

In [None]:
class GraphicDisplay(tk.Tk):    

    


    def reset(self):
        if self.is_moving == 0:
            self.evaluation_count = 0
            self.improvement_count = 0
            for i in self.texts:
                self.canvas.delete(i)

            for i in self.arrows:
                self.canvas.delete(i)
            self.agent.value_table = [[0.0] * WIDTH for _ in range(HEIGHT)]
            self.agent.policy_table = ([[[0.25, 0.25, 0.25, 0.25]] * WIDTH
                                        for _ in range(HEIGHT)])
            self.agent.policy_table[2][2] = []
            x, y = self.canvas.coords(self.rectangle)
            self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)



    def find_rectangle(self):
        temp = self.canvas.coords(self.rectangle)
        x = (temp[0] / 100) - 0.5
        y = (temp[1] / 100) - 0.5
        return int(y), int(x)

    def move_by_policy(self):
        if self.improvement_count != 0 and self.is_moving != 1:
            self.is_moving = 1

            x, y = self.canvas.coords(self.rectangle)
            self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)

            x, y = self.find_rectangle()
            while len(self.agent.policy_table[x][y]) != 0:
                self.after(100,
                           self.rectangle_move(self.agent.get_action([x, y])))
                x, y = self.find_rectangle()
            self.is_moving = 0

    def draw_one_arrow(self, col, row, policy):
        if col == 2 and row == 2:
            return

        if policy[0] > 0:  # up
            origin_x, origin_y = 50 + (UNIT * row), 10 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.up))
        if policy[1] > 0:  # down
            origin_x, origin_y = 50 + (UNIT * row), 90 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.down))
        if policy[2] > 0:  # left
            origin_x, origin_y = 10 + (UNIT * row), 50 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.left))
        if policy[3] > 0:  # right
            origin_x, origin_y = 90 + (UNIT * row), 50 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.right))

    def draw_from_policy(self, policy_table):
        for i in range(HEIGHT):
            for j in range(WIDTH):
                self.draw_one_arrow(i, j, policy_table[i][j])

    def print_value_table(self, value_table):
        for i in range(WIDTH):
            for j in range(HEIGHT):
                self.text_value(i, j, round(value_table[i][j], 2))

    def render(self):
        time.sleep(0.1)
        self.canvas.tag_raise(self.rectangle)
        self.update()

    def evaluate_policy(self):
        self.evaluation_count += 1
        for i in self.texts:
            self.canvas.delete(i)
        self.agent.policy_evaluation()
        self.print_value_table(self.agent.value_table)

    def improve_policy(self):
        self.improvement_count += 1
        for i in self.arrows:
            self.canvas.delete(i)
        self.agent.policy_improvement()
        self.draw_from_policy(self.agent.policy_table)