# 가치 기반 학습(Value-Based Learning)과 정책 기반 학습(Policy-Based Learning)
- **강화학습의 핵심 목표**: 환경과의 상호작용을 통해 기대되는 누적 보상(expected cumulative reward)을 최대화하는 정책(Policy)을 학습하는 것.
- 이는 에이전트가 어떤 상태에서 어떤 행동을 선택해야 할지를 학습하여 장기적으로 최적의 결과를 얻는 것을 의미합니다.

강화학습은 크게 가치 기반 학습과 정책 기반 학습으로 나뉩니다.

1. 가치 기반 학습 (Value-Based Learning)
  - 상태(state, state-action)의 가치 함수를 학습하고 이를 바탕으로 행동을 선택.
  - 대표적인 알고리즘: Q-Learning, SARSA.
  - Q-값(Q-Value): 특정 상태 $s$에서 행동 $a$를 취했을 때 기대되는 총 보상 $Q(s, a)$를 학습합니다.
  - 최적 정책은 다음과 같이 정의됩니다: $\pi^*$(s) = $argmax_a Q(s, a)$

2. 정책 기반 학습 (Policy-Based Learning)
  - 정책 함수 $\pi^*(a|s)$를 직접 학습합니다.
  - 대표적인 알고리즘: Actor-Critic.
  - 연속적인 행동 공간에서도 적용 가능하며, 직접 정책을 최적화합니다.

3. 가치 함수(Value Function)와 정책 함수(Policy Function)
- 가치 함수(Value Function): 상태나 상태-행동 쌍의 "가치"를 나타냅니다.
  - 상태 가치 함수 $V(s)$: 특정 상태 $(s$)의 가치. 지금부터 기대되는 return
  - 행동 가치 함수 $Q(s, a)$: 특정 상태 $(s$)에서 행동 $(a$)를 취했을 때의 가치. 지금 행동으로부터 기대되는 Return
- 정책 함수(Policy Function): 상태 $(s$)에서 행동 $(a$)를 선택할 확률을 나타냅니다.

4. 탐험과 활용 문제 (Exploration vs Exploitation)
- 탐험(Exploration): 새로운 정보를 탐색하기 위해 무작위 행동을 선택.
- 활용(Exploitation): 현재 학습된 최적 행동을 선택하여 보상을 극대화.
- 균형 유지가 중요하며, $epsilon$-탐욕적 정책($epsilon$-greedy)을 사용해 해결합니다.
- $epsilon$-탐욕적 정책: 초기에는 탐험을 많이하고 점차 줄여나가는 정책

# 정책 이터레이션
- **정책 이터레이션(Policy Iteration)**은 **가치 기반 학습(Value-Based Learning)**에 속함
- 정책 이터레이션은 주어진 정책 하에서 벨만 기대 방정식을 사용해 가치를 평가하고 개선
- 정책 이터레이션은 초기 정책으로 시작해서 점진적으로 개선하여 최적 정책에 도달하는 알고리즘
- 정책 이터레이션은 정책 평가와 정책 개선 단계를 번갈아 수행하면서 최적의 정책을 찾음

- 정책 평가(Policy Evaluation):
  - 현재 정책 하에 각 상태의 가치를 계산. 이 과정에서, 벨만 기대 방정식을 사용하여 모든 상태에 대해 가치 함수를 반복적으로 계산하며, 이는 현재 정책을 따랐을 때 각 상태에서 기대할 수 있는 장기적인 보상의 총합을 의미함

- 정책 개선(Policy Improvement):
  - 계산된 가치 함수 $V^\pi(s)$)를 바탕으로 정책을 업데이트.
정책이 더 이상 개선되지 않을 때(수렴할 때)까지 두 단계를 반복
상태 가치 함수를 사용하여 최적의 정책을 도출

 <img src="https://raw.githubusercontent.com/zoomKoding/zoomKoding.github.io/source/assets/_posts/RL2-12.png" width="500">


- Rule : 세모(-1)를 피해 동그라미(1)에 최단 경로로 도달하여 보상을 획득하는 것이 목적임
- 정책 평가(Evaluate) : 현재 정책에 따라 각 상태의 가치를 계산. 즉, 상태에서 가능한 모든 행동에 대한 기대 보상을 계산하고, 그 값을 바탕으로 상태의 가치를 업데이트
- 정책 개선(Improve) : 각 상태에서 가능한 행동 중, 가장 큰 보상(즉, 더 높은 상태 가치를 제공하는 행동)을 취할 확률이 더 높도록 정책 개선
Move : 현재까지의 정책을 기반으로 에이전트 이동 시작

### 알고리즘
1. **초기화**:
   - 초기 정책 $(\pi_0$)를 무작위로 설정.
2. **반복**:
   - **정책 평가**: 현재 정책 $\pi_k$에 대한 가치 함수 $(V^{\pi_k}(s)$) 계산.
     
     $$
     ^\pi(s) = \sum_{a} \pi(a|s) \sum_{s'} P(s'|s, a) \big[ R(s, a, s') + \gamma V^\pi(s') \big]
     $$
  - **정책 개선**: 가치 함수 $(V^{\pi_k}(s)$)를 사용해 새로운 정책 $(\pi_{k+1}$) 도출.

     $$
     \pi_{k+1}(s) = \arg\max_a \sum_{s'} P(s'|s, a) \big[ R(s, a, s') + \gamma V^{\pi_k}(s') \big]
     $$
3. 정책이 수렴할 때 종료.

### 특징
- **장점**: 정책과 가치 함수를 동시에 업데이트하기 때문에 수렴 속도가 빠름.
- **단점**: 정책 평가 단계가 반복적으로 수행되므로 계산 비용이 클 수 있음.

In [None]:
# 코드 출처 : https://github.com/rlcode/reinforcement-learning-kr
# 정책 이터레이션 : 각각의 파일을 py 파일로 만들어 로컬에서 실행하세요!

# environment.py
# Grid World 환경 시뮬레이션 및 GUI를 제공하는 코드입니다.

import tkinter as tk  # GUI 생성을 위한 라이브러리
from tkinter import Button  # 버튼 위젯 사용
import time  # 시간 지연을 위한 라이브러리
import numpy as np  # 배열 및 수학 계산을 위한 라이브러리
from PIL import ImageTk, Image  # 이미지를 Tkinter에 사용하기 위한 라이브러리

PhotoImage = ImageTk.PhotoImage
UNIT = 100  # 그리드의 각 셀 크기 (100 x 100 픽셀)
HEIGHT = 5  # 그리드월드의 세로 길이 (셀 단위)
WIDTH = 5  # 그리드월드의 가로 길이 (셀 단위)
TRANSITION_PROB = 1  # 상태 전이 확률
POSSIBLE_ACTIONS = [0, 1, 2, 3]  # 가능한 행동: 상, 하, 좌, 우
ACTIONS = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # 행동을 좌표로 표현 (상, 하, 좌, 우)
REWARDS = []  # 보상 설정을 위한 리스트


class GraphicDisplay(tk.Tk):
    """그리드월드 시뮬레이션을 GUI로 시각화하는 클래스"""
    def __init__(self, agent):
        super(GraphicDisplay, self).__init__()
        self.title('Policy Iteration')  # 창 제목 설정
        self.geometry('{0}x{1}'.format(HEIGHT * UNIT, HEIGHT * UNIT + 50))  # 창 크기 설정
        self.texts = []  # 캔버스 텍스트 저장
        self.arrows = []  # 캔버스 화살표 저장
        self.env = Env()  # 환경 객체 생성
        self.agent = agent  # 에이전트 객체 (정책 이터레이션 수행)
        self.evaluation_count = 0  # 평가 횟수
        self.improvement_count = 0  # 개선 횟수
        self.is_moving = 0  # 이동 여부 플래그
        # 이미지 로드
        (self.up, self.down, self.left, self.right), self.shapes = self.load_images()
        # 캔버스 생성
        self.canvas = self._build_canvas()
        # 보상 텍스트 추가
        self.text_reward(2, 2, "R : 1.0")
        self.text_reward(1, 2, "R : -1.0")
        self.text_reward(2, 1, "R : -1.0")

    def _build_canvas(self):
        """GUI 캔버스 및 버튼을 생성하는 메서드"""
        canvas = tk.Canvas(self, bg='white',
                           height=HEIGHT * UNIT,
                           width=WIDTH * UNIT)
        # "Evaluate" 버튼 추가
        iteration_button = Button(self, text="Evaluate",
                                  command=self.evaluate_policy)
        iteration_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.13, HEIGHT * UNIT + 10,
                             window=iteration_button)
        # "Improve" 버튼 추가
        policy_button = Button(self, text="Improve",
                               command=self.improve_policy)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.37, HEIGHT * UNIT + 10,
                             window=policy_button)
        # "Move" 버튼 추가
        policy_button = Button(self, text="move", command=self.move_by_policy)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.62, HEIGHT * UNIT + 10,
                             window=policy_button)
        # "Reset" 버튼 추가
        policy_button = Button(self, text="reset", command=self.reset)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.87, HEIGHT * UNIT + 10,
                             window=policy_button)

        # 그리드 라인 생성
        for col in range(0, WIDTH * UNIT, UNIT):  # 세로선
            x0, y0, x1, y1 = col, 0, col, HEIGHT * UNIT
            canvas.create_line(x0, y0, x1, y1)
        for row in range(0, HEIGHT * UNIT, UNIT):  # 가로선
            x0, y0, x1, y1 = 0, row, HEIGHT * UNIT, row
            canvas.create_line(x0, y0, x1, y1)

        # 캔버스에 초기 이미지 추가
        self.rectangle = canvas.create_image(50, 50, image=self.shapes[0])  # 이동 객체
        canvas.create_image(250, 150, image=self.shapes[1])  # 장애물
        canvas.create_image(150, 250, image=self.shapes[1])  # 장애물
        canvas.create_image(250, 250, image=self.shapes[2])  # 목표 지점

        canvas.pack()

        return canvas

    def load_images(self):
        """이미지 파일을 로드하고 크기를 조정하여 반환"""
        up = PhotoImage(Image.open("../img/up.png").resize((13, 13)))
        right = PhotoImage(Image.open("../img/right.png").resize((13, 13)))
        left = PhotoImage(Image.open("../img/left.png").resize((13, 13)))
        down = PhotoImage(Image.open("../img/down.png").resize((13, 13)))
        rectangle = PhotoImage(Image.open("../img/rectangle.png").resize((65, 65)))
        triangle = PhotoImage(Image.open("../img/triangle.png").resize((65, 65)))
        circle = PhotoImage(Image.open("../img/circle.png").resize((65, 65)))
        return (up, down, left, right), (rectangle, triangle, circle)

    def reset(self):
        """환경을 초기 상태로 리셋"""
        if self.is_moving == 0:
            self.evaluation_count = 0
            self.improvement_count = 0
            # 기존 텍스트 및 화살표 삭제
            for i in self.texts:
                self.canvas.delete(i)
            for i in self.arrows:
                self.canvas.delete(i)
            # 에이전트 초기화
            self.agent.value_table = [[0.0] * WIDTH for _ in range(HEIGHT)]
            self.agent.policy_table = ([[[0.25, 0.25, 0.25, 0.25]] * WIDTH
                                        for _ in range(HEIGHT)])
            self.agent.policy_table[2][2] = []  # 목표 상태
            # 이동 객체 초기 위치로 이동
            x, y = self.canvas.coords(self.rectangle)
            self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)

    def text_value(self, row, col, contents, font='Helvetica', size=10,
                   style='normal', anchor="nw"):
        """지정된 셀에 값을 표시하는 텍스트 추가"""
        origin_x, origin_y = 85, 70
        x, y = origin_y + (UNIT * col), origin_x + (UNIT * row)
        font = (font, str(size), style)
        text = self.canvas.create_text(x, y, fill="black", text=contents,
                                       font=font, anchor=anchor)
        return self.texts.append(text)

    def text_reward(self, row, col, contents, font='Helvetica', size=10,
                    style='normal', anchor="nw"):
        """지정된 셀에 보상을 표시하는 텍스트 추가"""
        origin_x, origin_y = 5, 5
        x, y = origin_y + (UNIT * col), origin_x + (UNIT * row)
        font = (font, str(size), style)
        text = self.canvas.create_text(x, y, fill="black", text=contents,
                                       font=font, anchor=anchor)
        return self.texts.append(text)

    def rectangle_move(self, action):
        """현재 행동에 따라 이동 객체를 움직임"""
        base_action = np.array([0, 0])
        location = self.find_rectangle()  # 현재 위치 확인
        self.render()
        if action == 0 and location[0] > 0:  # 상
            base_action[1] -= UNIT
        elif action == 1 and location[0] < HEIGHT - 1:  # 하
            base_action[1] += UNIT
        elif action == 2 and location[1] > 0:  # 좌
            base_action[0] -= UNIT
        elif action == 3 and location[1] < WIDTH - 1:  # 우
            base_action[0] += UNIT
        # 이동
        self.canvas.move(self.rectangle, base_action[0], base_action[1])

    def find_rectangle(self):
        """이동 객체의 현재 위치를 반환"""
        temp = self.canvas.coords(self.rectangle)
        x = (temp[0] / 100) - 0.5
        y = (temp[1] / 100) - 0.5
        return int(y), int(x)

    def move_by_policy(self):
        """정책에 따라 이동 객체를 이동"""
        if self.improvement_count != 0 and self.is_moving != 1:
            self.is_moving = 1

            # 초기 위치로 이동
            x, y = self.canvas.coords(self.rectangle)
            self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)

            # 정책에 따라 이동
            x, y = self.find_rectangle()
            while len(self.agent.policy_table[x][y]) != 0:
                self.after(100,
                           self.rectangle_move(self.agent.get_action([x, y])))
                x, y = self.find_rectangle()
            self.is_moving = 0

    def draw_one_arrow(self, col, row, policy):
        """정책에 따라 해당 셀에 화살표를 그림"""
        if col == 2 and row == 2:  # 목표 지점에서는 표시 안 함
            return

        if policy[0] > 0:  # 상
            origin_x, origin_y = 50 + (UNIT * row), 10 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.up))
        if policy[1] > 0:  # 하
            origin_x, origin_y = 50 + (UNIT * row), 90 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.down))
        if policy[2] > 0:  # 좌
            origin_x, origin_y = 10 + (UNIT * row), 50 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.left))
        if policy[3] > 0:  # 우
            origin_x, origin_y = 90 + (UNIT * row), 50 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.right))

    def draw_from_policy(self, policy_table):
        """정책 테이블에 따라 모든 셀에 화살표를 그림"""
        for i in range(HEIGHT):
            for j in range(WIDTH):
                self.draw_one_arrow(i, j, policy_table[i][j])

    def print_value_table(self, value_table):
        """값 함수 테이블을 화면에 표시"""
        for i in range(WIDTH):
            for j in range(HEIGHT):
                self.text_value(i, j, value_table[i][j])

    def render(self):
        """캔버스를 다시 렌더링"""
        time.sleep(0.1)
        self.canvas.tag_raise(self.rectangle)
        self.update()

    def evaluate_policy(self):
        """정책 평가 버튼 클릭 시 실행"""
        self.evaluation_count += 1
        # 기존 텍스트 삭제
        for i in self.texts:
            self.canvas.delete(i)
        self.agent.policy_evaluation()  # 정책 평가 수행
        self.print_value_table(self.agent.value_table)

    def improve_policy(self):
        """정책 개선 버튼 클릭 시 실행"""
        self.improvement_count += 1
        # 기존 화살표 삭제
        for i in self.arrows:
            self.canvas.delete(i)
        self.agent.policy_improvement()  # 정책 개선 수행
        self.draw_from_policy(self.agent.policy_table)


class Env:
    def __init__(self):
        self.transition_probability = TRANSITION_PROB
        self.width = WIDTH
        self.height = HEIGHT
        self.reward = [[0] * WIDTH for _ in range(HEIGHT)]
        self.possible_actions = POSSIBLE_ACTIONS
        self.reward[2][2] = 1  # (2,2) 좌표 동그라미 위치에 보상 1
        self.reward[1][2] = -1  # (1,2) 좌표 세모 위치에 보상 -1
        self.reward[2][1] = -1  # (2,1) 좌표 세모 위치에 보상 -1
        self.all_state = []

        for x in range(WIDTH):
            for y in range(HEIGHT):
                state = [x, y]
                self.all_state.append(state)

    def get_reward(self, state, action):
        next_state = self.state_after_action(state, action)
        return self.reward[next_state[0]][next_state[1]]

    def state_after_action(self, state, action_index):
        action = ACTIONS[action_index]
        return self.check_boundary([state[0] + action[0], state[1] + action[1]])

    @staticmethod
    def check_boundary(state):
        state[0] = (0 if state[0] < 0 else WIDTH - 1
                    if state[0] > WIDTH - 1 else state[0])
        state[1] = (0 if state[1] < 0 else HEIGHT - 1
                    if state[1] > HEIGHT - 1 else state[1])
        return state

    def get_transition_prob(self, state, action):
        return self.transition_probability

    def get_all_states(self):
        return self.all_state

In [None]:
# policy_iteration
import random
from environment import GraphicDisplay, Env

class PolicyIteration:
    def __init__(self, env):
        # 환경 객체를 받아서 초기화합니다.
        self.env = env
        # 각 상태에 대한 가치 함수를 2차원 리스트로 초기화합니다. 모든 값을 0으로 설정합니다.
        self.value_table = [[0.0] * env.width for _ in range(env.height)]
        # 모든 가능한 행동에 대해 동일한 확률을 가지는 정책 테이블을 초기화합니다.
        self.policy_table = [[[0.25, 0.25, 0.25, 0.25]] * env.width for _ in range(env.height)]
        # 특정 상태 (여기서는 [2, 2])는 종료 상태로 설정합니다. 해당 상태의 정책은 비어 있습니다.
        self.policy_table[2][2] = []
        # 감가율을 설정합니다. 이는 미래 보상의 현재 가치를 결정합니다.
        self.discount_factor = 0.9

    def policy_evaluation(self):
        # 다음 가치 함수를 저장할 임시 테이블을 초기화합니다.
        next_value_table = [[0.00] * self.env.width for _ in range(self.env.height)]
        # 모든 상태에 대해 가치 함수를 업데이트합니다.
        for state in self.env.get_all_states():
            value = 0.0
            # 종료 상태의 가치는 0입니다.
            if state == [2, 2]:
                next_value_table[state[0]][state[1]] = value
                continue

            # 현재 정책에 따른 가치 함수 업데이트 (벨만 기대 방정식 활용)
            for action in self.env.possible_actions:
                next_state = self.env.state_after_action(state, action)
                reward = self.env.get_reward(state, action)
                next_value = self.get_value(next_state)
                value += (self.get_policy(state)[action] * (reward + self.discount_factor * next_value))
                if state == [1, 0]:
                    print((self.get_policy(state)[action],(reward , self.discount_factor , next_value)))
                    print(next_state)
            next_value_table[state[0]][state[1]] = round(value, 4)

        # 업데이트된 가치 테이블로 현재 가치 테이블을 교체합니다.
        self.value_table = next_value_table

    def policy_improvement(self):
        # 정책을 개선하기 위한 임시 정책 테이블을 생성합니다.
        next_policy = self.policy_table
        for state in self.env.get_all_states():
            if state == [2, 2]:
                continue
            value = -99999
            max_index = []
            result = [0.0, 0.0, 0.0, 0.0]  # 각 행동에 대한 최적 정책 확률을 저장합니다.

            # 각 행동에 대해 '보상 + 할인된 가치 함수'를 계산하여 최적의 행동을 찾습니다.
            for index, action in enumerate(self.env.possible_actions):
                next_state = self.env.state_after_action(state, action)
                reward = self.env.get_reward(state, action)
                next_value = self.get_value(next_state)
                temp = reward + self.discount_factor * next_value

                # 최대 보상을 제공하는 행동(들)을 찾습니다.
                if temp == value:
                    max_index.append(index)
                elif temp > value:
                    value = temp
                    max_index.clear()
                    max_index.append(index)

            # 최적의 행동들에 대해 동일한 확률을 할당합니다.
            prob = 1 / len(max_index)
            for index in max_index:
                result[index] = prob

            next_policy[state[0]][state[1]] = result

        # 업데이트된 정책 테이블로 현재 정책 테이블을 교체합니다.
        self.policy_table = next_policy
        print(next_policy)

    def get_action(self, state):
        # 현재 정책에 따라 무작위로 행동을 선택합니다.
        random_pick = random.randrange(100) / 100
        policy = self.get_policy(state)
        policy_sum = 0.0
        for index, value in enumerate(policy):
            policy_sum += value
            if random_pick < policy_sum:
                return index

    def get_policy(self, state):
        # 특정 상태의 정책을 반환합니다. 종료 상태라면 0을 반환합니다.
        if state == [2, 2]:
            return 0.0
        return self.policy_table[state[0]][state[1]]

    def get_value(self, state):
        # 가치 테이블에서 특정 상태의 가치를 반환합니다.
        return round(self.value_table[state[0]][state[1]], 2)

if __name__ == "__main__":
    env = Env()
    policy_iteration = PolicyIteration(env)
    grid_world = GraphicDisplay(policy_iteration)
    grid_world.mainloop()

# 가치 이터레이션
- 가치 이터레이션(Value Iteration)은 가치 기반 학습(Value-Based Learning)에 속함
- 가치 이터레이션은 각 반복에서 직접적으로 최적의 가치 함수를 추정하고, 이를 바탕으로 최적의 정책을 결정함.
- 이 방법은 정책 평가와 정책 개선 단계를 하나로 합쳐 더 효율적으로 계산할 수 있게 해줌.
- 가치 함수 업데이트:
  - 모든 상태에 대해 가능한 모든 행동의 결과를 고려하여, 각 상태의 가치 함수를 업데이트함
  - 벨만 최적 방정식을 사용하여, 각 상태에서 가능한 행동 중에서 최대의 기대 가치를 제공하는 행동을 통해 가치 함수를 업데이트함.
  - 이렇게 계산된 가치 함수를 기반으로, 각 상태에서 가능한 행동들 중에서 가장 - 높은 가치를 제공하는 행동을 선택하여 최적의 정책을 결정함.
- 가치 이터레이션은 일반적으로 정책 이터레이션보다 더 빠르게 수렴할 수 있으며, 계산 과정이 좀 더 단순함

 <img src="https://raw.githubusercontent.com/zoomKoding/zoomKoding.github.io/source/assets/_posts/RL2-14.png" width="700">


### 알고리즘
1. **초기화**:
   - $V(s)$를 임의의 값으로 초기화 (예: $V(s) = 0$).
2. **반복**:
   - 벨만 최적 방정식을 사용해 \(V(s)\)를 업데이트:

     $$
     V(s) \leftarrow \max_a \sum_{s'} P(s'|s, a) \big[ R(s, a, s') + \gamma V(s') \big]
     $$

   - $V(s)$가 수렴할 때까지 반복.

3. 최적 가치 함수 $V^*(s)$를 바탕으로 최적 정책 계산:

$$
     \pi^*(s) = \arg\max_a \sum_{s'} P(s'|s, a) \big[ R(s, a, s') + \gamma V^*(s') \big]
$$

### 특징
- **장점**: 정책 평가와 정책 개선 단계를 한 번에 수행하므로 계산이 간단하고 직관적.
- **단점**: 더 많은 반복이 필요할 수 있음.


value_iteration.py 파일 실행
- Rule : 세모(-1)를 피해 동그라미(1)에 최단 경로로 도달하여 보상을 획득하는 것이 목적임
- 계산(Calculate) : 각 상태에 대해 최적의 행동을 결정하기 위한 가치를 계산하고 업데이트
- Print Policy : 현재 추정된 최적의 정책을 시각적으로 표시
- Move : 현재까지의 정책을 기반으로 에이전트 이동 시작
- Clear : 최초 실행 상태로 초기화

In [None]:
# 코드 출처 : https://github.com/rlcode/reinforcement-learning-kr
# 가치 이터레이션 : 각각의 파일을 py 파일로 만들어 로컬에서 실행하세요!

# environment.py
import tkinter as tk
import time
import numpy as np
import random
from PIL import ImageTk, Image

PhotoImage = ImageTk.PhotoImage
UNIT = 100  # 픽셀 수
HEIGHT = 5  # 그리드월드 세로
WIDTH = 5  # 그리드월드 가로
TRANSITION_PROB = 1
POSSIBLE_ACTIONS = [0, 1, 2, 3]  # 상, 하, 좌, 우
ACTIONS = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # 좌표로 나타낸 행동
REWARDS = []


class GraphicDisplay(tk.Tk):
    def __init__(self, value_iteration):
        super(GraphicDisplay, self).__init__()
        self.title('Value Iteration')
        self.geometry('{0}x{1}'.format(HEIGHT * UNIT, HEIGHT * UNIT + 50))
        self.texts = []
        self.arrows = []
        self.env = Env()
        self.agent = value_iteration
        self.iteration_count = 0
        self.improvement_count = 0
        self.is_moving = 0
        (self.up, self.down, self.left,
         self.right), self.shapes = self.load_images()
        self.canvas = self._build_canvas()
        self.text_reward(2, 2, "R : 1.0")
        self.text_reward(1, 2, "R : -1.0")
        self.text_reward(2, 1, "R : -1.0")

    def _build_canvas(self):
        canvas = tk.Canvas(self, bg='white',
                           height=HEIGHT * UNIT,
                           width=WIDTH * UNIT)
        # 버튼 초기화
        iteration_button = tk.Button(self, text="Calculate",
                                     command=self.calculate_value)
        iteration_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.13, (HEIGHT * UNIT) + 10,
                             window=iteration_button)

        policy_button = tk.Button(self, text="Print Policy",
                                  command=self.print_optimal_policy)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.37, (HEIGHT * UNIT) + 10,
                             window=policy_button)

        policy_button = tk.Button(self, text="Move",
                                  command=self.move_by_policy)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.62, (HEIGHT * UNIT) + 10,
                             window=policy_button)

        policy_button = tk.Button(self, text="Clear", command=self.clear)
        policy_button.configure(width=10, activebackground="#33B5E5")
        canvas.create_window(WIDTH * UNIT * 0.87, (HEIGHT * UNIT) + 10,
                             window=policy_button)

        # 그리드 생성
        for col in range(0, WIDTH * UNIT, UNIT):  # 0~400 by 80
            x0, y0, x1, y1 = col, 0, col, HEIGHT * UNIT
            canvas.create_line(x0, y0, x1, y1)
        for row in range(0, HEIGHT * UNIT, UNIT):  # 0~400 by 80
            x0, y0, x1, y1 = 0, row, HEIGHT * UNIT, row
            canvas.create_line(x0, y0, x1, y1)

        # 캔버스에 이미지 추가
        self.rectangle = canvas.create_image(50, 50, image=self.shapes[0])
        canvas.create_image(250, 150, image=self.shapes[1])
        canvas.create_image(150, 250, image=self.shapes[1])
        canvas.create_image(250, 250, image=self.shapes[2])

        canvas.pack()

        return canvas

    def load_images(self):
        PhotoImage = ImageTk.PhotoImage
        up = PhotoImage(Image.open("../img/up.png").resize((13, 13)))
        right = PhotoImage(Image.open("../img/right.png").resize((13, 13)))
        left = PhotoImage(Image.open("../img/left.png").resize((13, 13)))
        down = PhotoImage(Image.open("../img/down.png").resize((13, 13)))
        rectangle = PhotoImage(
            Image.open("../img/rectangle.png").resize((65, 65)))
        triangle = PhotoImage(
            Image.open("../img/triangle.png").resize((65, 65)))
        circle = PhotoImage(Image.open("../img/circle.png").resize((65, 65)))
        return (up, down, left, right), (rectangle, triangle, circle)

    def clear(self):

        if self.is_moving == 0:
            self.iteration_count = 0
            self.improvement_count = 0
            for i in self.texts:
                self.canvas.delete(i)

            for i in self.arrows:
                self.canvas.delete(i)

            self.agent.value_table = [[0.0] * WIDTH for _ in range(HEIGHT)]

            x, y = self.canvas.coords(self.rectangle)
            self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)

    def reset(self):
        self.update()
        time.sleep(0.5)
        self.canvas.delete(self.rectangle)
        return self.canvas.coords(self.rectangle)

    def text_value(self, row, col, contents, font='Helvetica', size=12,
                   style='normal', anchor="nw"):
        origin_x, origin_y = 85, 70
        x, y = origin_y + (UNIT * col), origin_x + (UNIT * row)
        font = (font, str(size), style)
        text = self.canvas.create_text(x, y, fill="black", text=contents,
                                       font=font, anchor=anchor)
        return self.texts.append(text)

    def text_reward(self, row, col, contents, font='Helvetica', size=12,
                    style='normal', anchor="nw"):
        origin_x, origin_y = 5, 5
        x, y = origin_y + (UNIT * col), origin_x + (UNIT * row)
        font = (font, str(size), style)
        text = self.canvas.create_text(x, y, fill="black", text=contents,
                                       font=font, anchor=anchor)
        return self.texts.append(text)

    def rectangle_move(self, action):
        base_action = np.array([0, 0])
        location = self.find_rectangle()
        self.render()
        if action == 0 and location[0] > 0:  # up
            base_action[1] -= UNIT
        elif action == 1 and location[0] < HEIGHT - 1:  # down
            base_action[1] += UNIT
        elif action == 2 and location[1] > 0:  # left
            base_action[0] -= UNIT
        elif action == 3 and location[1] < WIDTH - 1:  # right
            base_action[0] += UNIT

        self.canvas.move(self.rectangle, base_action[0],
                         base_action[1])  # move agent

    def find_rectangle(self):
        temp = self.canvas.coords(self.rectangle)
        x = (temp[0] / 100) - 0.5
        y = (temp[1] / 100) - 0.5
        return int(y), int(x)

    def move_by_policy(self):

        if self.improvement_count != 0 and self.is_moving != 1:
            self.is_moving = 1
            x, y = self.canvas.coords(self.rectangle)
            self.canvas.move(self.rectangle, UNIT / 2 - x, UNIT / 2 - y)

            x, y = self.find_rectangle()
            while len(self.agent.get_action([x, y])) != 0:
                action = random.sample(self.agent.get_action([x, y]), 1)[0]
                self.after(100, self.rectangle_move(action))
                x, y = self.find_rectangle()
            self.is_moving = 0

    def draw_one_arrow(self, col, row, action):
        if col == 2 and row == 2:
            return
        if action == 0:  # up
            origin_x, origin_y = 50 + (UNIT * row), 10 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.up))
        elif action == 1:  # down
            origin_x, origin_y = 50 + (UNIT * row), 90 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.down))
        elif action == 3:  # right
            origin_x, origin_y = 90 + (UNIT * row), 50 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.right))
        elif action == 2:  # left
            origin_x, origin_y = 10 + (UNIT * row), 50 + (UNIT * col)
            self.arrows.append(self.canvas.create_image(origin_x, origin_y,
                                                        image=self.left))

    def draw_from_values(self, state, action_list):
        i = state[0]
        j = state[1]
        for action in action_list:
            self.draw_one_arrow(i, j, action)

    def print_values(self, values):
        for i in range(WIDTH):
            for j in range(HEIGHT):
                self.text_value(i, j, values[i][j])

    def render(self):
        time.sleep(0.1)
        self.canvas.tag_raise(self.rectangle)
        self.update()

    def calculate_value(self):
        self.iteration_count += 1
        for i in self.texts:
            self.canvas.delete(i)
        self.agent.value_iteration()
        self.print_values(self.agent.value_table)

    def print_optimal_policy(self):
        self.improvement_count += 1
        for i in self.arrows:
            self.canvas.delete(i)
        for state in self.env.get_all_states():
            action = self.agent.get_action(state)
            self.draw_from_values(state, action)


class Env:
    def __init__(self):
        self.transition_probability = TRANSITION_PROB
        self.width = WIDTH  # Width of Grid World
        self.height = HEIGHT  # Height of GridWorld
        self.reward = [[0] * WIDTH for _ in range(HEIGHT)]
        self.possible_actions = POSSIBLE_ACTIONS
        self.reward[2][2] = 1  # reward 1 for circle
        self.reward[1][2] = -1  # reward -1 for triangle
        self.reward[2][1] = -1  # reward -1 for triangle
        self.all_state = []

        for x in range(WIDTH):
            for y in range(HEIGHT):
                state = [x, y]
                self.all_state.append(state)

    def get_reward(self, state, action):
        next_state = self.state_after_action(state, action)
        return self.reward[next_state[0]][next_state[1]]

    def state_after_action(self, state, action_index):
        action = ACTIONS[action_index]
        return self.check_boundary([state[0] + action[0], state[1] + action[1]])

    @staticmethod
    def check_boundary(state):
        state[0] = (0 if state[0] < 0 else WIDTH - 1
        if state[0] > WIDTH - 1 else state[0])
        state[1] = (0 if state[1] < 0 else HEIGHT - 1
        if state[1] > HEIGHT - 1 else state[1])
        return state

    def get_transition_prob(self, state, action):
        return self.transition_probability

    def get_all_states(self):
        return self.all_state

In [None]:
# value_iteration.py
# -*- coding: utf-8 -*-
from environment import GraphicDisplay, Env

class ValueIteration:
    def __init__(self, env):
        # 환경 객체 생성
        self.env = env
        # 가치 함수를 2차원 리스트로 초기화
        self.value_table = [[0.0] * env.width for _ in range(env.height)]
        # 감가율
        self.discount_factor = 0.9

    # 가치 이터레이션
    # 벨만 최적 방정식을 통해 다음 가치 함수 계산
    def value_iteration(self):
        next_value_table = [[0.0] * self.env.width for _ in
                            range(self.env.height)]
        for state in self.env.get_all_states():
            if state == [2, 2]:
                next_value_table[state[0]][state[1]] = 0.0
                continue
            # 가치 함수를 위한 빈 리스트
            value_list = []

            # 가능한 모든 행동에 대해 계산
            for action in self.env.possible_actions:
                next_state = self.env.state_after_action(state, action)
                reward = self.env.get_reward(state, action)
                next_value = self.get_value(next_state)
                value_list.append((reward + self.discount_factor * next_value))
            # 최댓값을 다음 가치 함수로 대입
            next_value_table[state[0]][state[1]] = round(max(value_list), 2)
        self.value_table = next_value_table

    # 현재 가치 함수로부터 행동을 반환
    def get_action(self, state):
        action_list = []
        max_value = -99999

        if state == [2, 2]:
            return []

        # 모든 행동에 대해 큐함수 (보상 + (감가율 * 다음 상태 가치함수))를 계산
        # 최대 큐 함수를 가진 행동(복수일 경우 여러 개)을 반환
        for action in self.env.possible_actions:

            next_state = self.env.state_after_action(state, action)
            reward = self.env.get_reward(state, action)
            next_value = self.get_value(next_state)
            value = (reward + self.discount_factor * next_value)

            if value > max_value:
                action_list.clear()
                action_list.append(action)
                max_value = value
            elif value == max_value:
                action_list.append(action)

        return action_list

    def get_value(self, state):
        return round(self.value_table[state[0]][state[1]], 2)

if __name__ == "__main__":
    env = Env()
    value_iteration = ValueIteration(env)
    grid_world = GraphicDisplay(value_iteration)
    grid_world.mainloop()