In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import time
import copy

### 그림그리는 함수

In [None]:
def show_v_table_small(v_table, env):
    for i in range(env.reward.shape[0]):        
        print("+----------"*env.reward.shape[1])
        print("|", end="")
        for j in range(env.reward.shape[1]):
            print("{0:8.2f}  |".format(v_table[i,j]),end="")
        print()
    print("+----------"*env.reward.shape[1])

# V table 그리기    
def show_v_table(v_table, env):    
    for i in range(env.reward.shape[0]):        
        print("+-----------------"*env.reward.shape[1],end="")
        print("+")
        for k in range(3):
            print("|",end="")
            for j in range(env.reward.shape[1]):
                if k==0:
                    print("                 |",end="")
                if k==1:
                        print("   {0:8.2f}      |".format(v_table[i,j]),end="")
                if k==2:
                    print("                 |",end="")
            print()
    print("+-----------------"*env.reward.shape[1],end="")
    print("+")
    
# Q table 그리기
def show_q_table(q_table,env):
    for i in range(env.reward.shape[0]):
        print("+-----------------"*env.reward.shape[1],end="")
        print("+")
        for k in range(3):
            print("|",end="")
            for j in range(env.reward.shape[1]):
                if k==0:
                    print("{0:10.2f}       |".format(q_table[i,j,0]),end="")
                if k==1:
                    print("{0:6.2f}    {1:6.2f} |".format(q_table[i,j,3],q_table[i,j,1]),end="")
                if k==2:
                    print("{0:10.2f}       |".format(q_table[i,j,2]),end="")
            print()
    print("+-----------------"*env.reward.shape[1],end="")
    print("+")
    

# 정책 policy 화살표로 그리기
def show_q_table_arrow(q_table,env):
    for i in range(env.reward.shape[0]):        
        print("+-----------------"*env.reward.shape[1],end="")
        print("+")
        for k in range(3):
            print("|",end="")
            for j in range(env.reward.shape[1]):
                if k==0:
                    if np.max(q[i,j,:]) == q[i,j,0]:
                        print("        ↑       |",end="")
                    else:
                        print("                 |",end="")
                if k==1:                    
                    if np.max(q[i,j,:]) == q[i,j,1] and np.max(q[i,j,:]) == q[i,j,3]:
                        print("      ←  →     |",end="")
                    elif np.max(q[i,j,:]) == q[i,j,1]:
                        print("          →     |",end="")
                    elif np.max(q[i,j,:]) == q[i,j,3]:
                        print("      ←         |",end="")
                    else:
                        print("                 |",end="")
                if k==2:
                    if np.max(q[i,j,:]) == q[i,j,2]:
                        print("        ↓       |",end="")
                    else:
                        print("                 |",end="")
            print()
    print("+-----------------"*env.reward.shape[1],end="")
    print("+")    
    
# 정책 policy 화살표로 그리기
def show_policy_small(policy,env):
    for i in range(env.reward.shape[0]):        
        print("+----------"*env.reward.shape[1],end="")
        print("+")
        print("|", end="")
        for j in range(env.reward.shape[1]):
            if env.reward_list1[i][j] == "road":
                if policy[i,j] == 0:
                    print("   ↑     |",end="")
                elif policy[i,j] == 1:
                    print("   →     |",end="")
                elif policy[i,j] == 2:
                    print("   ↓     |",end="")
                elif policy[i,j] == 3:
                    print("   ←     |",end="")
            else:
                print("          |",end="")
        print()
    print("+----------"*env.reward.shape[1],end="")
    print("+")
    
# 정책 policy 화살표로 그리기
def show_policy(policy,env):
    for i in range(env.reward.shape[0]):        
        print("+-----------------"*env.reward.shape[1],end="")
        print("+")
        for k in range(3):
            print("|",end="")
            for j in range(env.reward.shape[1]):
                if k==0:
                    print("                 |",end="")
                if k==1:
                    if policy[i,j] == 0:
                        print("      ↑         |",end="")
                    elif policy[i,j] == 1:
                        print("      →         |",end="")
                    elif policy[i,j] == 2:
                        print("      ↓         |",end="")
                    elif policy[i,j] == 3:
                        print("      ←         |",end="")
                if k==2:
                    print("                 |",end="")
            print()
    print("+-----------------"*env.reward.shape[1],end="")
    print("+")

### Agent 구현

In [None]:
class Agent():
    
    # 1. 행동에 따른 에이전트의 좌표 이동(위, 오른쪽, 아래, 왼쪽) 
    action = np.array([[-1,0],[0,1],[1,0],[0,-1]])
    
    # 2. 각 행동별 선택확률
    select_action_pr = np.array([0.25,0.25,0.25,0.25])
    
    # 3. 에이전트의 초기 위치 저장
    def __init__(self):
        self.pos = (0,0)
    
    # 4. 에이전트의 위치 저장
    def set_pos(self,position):
        self.pos = position
        return self.pos
    
    # 5. 에이전트의 위치 불러오기
    def get_pos(self):
        return self.pos

### Environment 구현

In [None]:
class Environment():
    
    # 1. 미로밖(절벽), 길, 목적지와 보상 설정
    cliff = -3
    road = -1
    goal = 1
    
    # 2. 목적지 좌표 설정
    goal_position = [2,2]
    
    # 3. 보상 리스트 숫자
    reward_list = [[road,road,road],
                   [road,road,road],
                   [road,road,goal]]
    
    # 4. 보상 리스트 문자
    reward_list1 = [["road","road","road"],
                    ["road","road","road"],
                    ["road","road","goal"]]
    
    # 5. 보상 리스트를 array로 설정
    def __init__(self):
        self.reward = np.asarray(self.reward_list)    

    # 6. 선택된 에이전트의 행동 결과 반환 (미로밖일 경우 이전 좌표로 다시 복귀)
    def move(self, agent, action):
        
        done = False
        
        # 6.1 행동에 따른 좌표 구하기
        new_pos = agent.pos + agent.action[action]
        
        # 6.2 현재좌표가 목적지 인지확인
        if self.reward_list1[agent.pos[0]][agent.pos[1]] == "goal":
            reward = self.goal
            observation = agent.set_pos(agent.pos)
            done = True
        # 6.3 이동 후 좌표가 미로 밖인 확인    
        elif new_pos[0] < 0 or new_pos[0] >= self.reward.shape[0] or new_pos[1] < 0 or new_pos[1] >= self.reward.shape[1]:
            reward = self.cliff
            observation = agent.set_pos(agent.pos)
            done = True
        # 6.4 이동 후 좌표가 길이라면
        else:
            observation = agent.set_pos(new_pos)
            reward = self.reward[observation[0],observation[1]]
            
        return observation, reward, done

### 재귀적으로 상태 가치함수를 계산하는 함수

In [None]:
# 상태 가치 계산
def state_value_function(env,agent,G,max_step,now_step):
    
    # 1. 감가율 설정
    gamma = 0.9
    
# 2. 현재 위치가 도착지점인지 확인
    if env.reward_list1[agent.pos[0]][agent.pos[1]] == "goal":
        return env.goal
    
# 3. 마지막 상태는 보상만 계산
    if (max_step == now_step):
        pos1 = agent.get_pos()
        
        # 3.1 가능한 모든 행동의 보상을 계산
        for i in range(len(agent.action)):
            agent.set_pos(pos1)
            observation, reward, done = env.move(agent,i)
            G += agent.select_action_pr[i] * reward
            
        return G
    
    # 4. 현재 상태의 보상을 계산한 후 다음 step으로 이동
    else:
        
        # 4.1현재 위치 저장
        pos1 = agent.get_pos()
        
        # 4.2 현재 위치에서 가능한 모든 행동을 조사한 후 이동
        for i in range(len(agent.action)):
            observation, reward, done = env.move(agent,i)      
            # 4.2.1 현재 상태에서 보상을 계산
            G += agent.select_action_pr[i] * reward

            # 4.2.2 이동 후 위치 확인 : 미로밖, 벽, 구멍인 경우 이동전 좌표로 다시 이동
            if done == True:
                if observation[0] < 0 or observation[0] >= env.reward.shape[0] or observation[1] < 0 or observation[1] >= env.reward.shape[1]:
                    agent.set_pos(pos1)

            # 4.2.3 다음 step을 계산
            next_v = state_value_function(env, agent, 0, max_step, now_step+1)
            G += agent.select_action_pr[i] * gamma * next_v

            # 4.2.4 현재 위치를 복구
            agent.set_pos(pos1)

        return G


### 𝜖−𝑔𝑟𝑒𝑒𝑑𝑦, 𝑔𝑟𝑒𝑒𝑑𝑦  함수 작성

In [None]:
#  𝜖−𝑔𝑟𝑒𝑒𝑑𝑦, 𝑔𝑟𝑒𝑒𝑑𝑦  함수 작성
def e_greedy(Q_table,agent,epsilon):
    pos = agent.get_pos()
    greedy_action = np.argmax(Q_table[pos[0],pos[1],:])
    pr = np.zeros(4)
    for i in range(len(agent.action)):
        if i == greedy_action:
            pr[i] = 1 - epsilon + epsilon/len(agent.action)
        else:
            pr[i] = epsilon / len(agent.action)

    return np.random.choice(range(0,len(agent.action)), p=pr)    

def greedy(Q_table,agent,epsilon):
    pos = agent.get_pos()
    return np.argmax(Q_table[pos[0],pos[1],:])   

### TD(0) coltrol : Q-learning

In [None]:
#  TD(0) contro : Q-learning

# 환경, 에이전트를 초기화
env = Environment()
agent = Agent()
gamma = 0.9
np.random.seed(0)

# 모든 𝑠∈𝑆,𝑎∈𝐴(𝑆)에 대해 초기화:
# 𝑄(𝑠,𝑎)←임의의 값
Q_table = np.random.rand(env.reward.shape[0], env.reward.shape[1],len(agent.action))

# Q(𝑡𝑒𝑟𝑚𝑖𝑛𝑎𝑙−𝑠𝑡𝑎𝑡𝑒,𝑎)=0
Q_table[2,2,:] = 0

max_episode = 10000
max_step = 100

print("start TD(0) control : Q-learning")
alpha = 0.1
epsilon = 0.8

# 각 에피소드에 대해 반복 :
for epi in tqdm(range(max_episode)):
    dleta = 0
    # S 를 초기화
    i = 0
    j = 0
    agent.set_pos([i,j])

    # 에피소드의 각 스텝에 대해 반복 :
    for k in range(max_step):
        # s 에서 행동 정책(Behavior policy)으로 행동 a를 선택 ( 예 : ε-greedy
        pos = agent.get_pos()
        action = e_greedy(Q_table,agent,epsilon)
        # 행동 a 를 취한 후 보상 r과 다음 상태 s^'를 관측
        observation, reward, done = env.move(agent, action)

        # s^' 에서 타깃 정책(Target policy)으로 행동 a^'를 선택 ( 예 : greedy)

        next_action = greedy(Q_table,agent,epsilon)
        
        # Q(s,a)←Q(s,a) + α[r+𝛾  maxa'𝑄(s',a')−𝑄(s,a)] 
        Q_table[pos[0],pos[1],action] += alpha * (reward + gamma * Q_table[observation[0], observation[1],next_action] - Q_table[pos[0],pos[1],action])
        # s가 마지막 상태라면 종료
        if done == True:
            break
        
# 학습된 정책에서 최적 행동 추출
optimal_policy = np.zeros((env.reward.shape[0], env.reward.shape[1]))
for i in range(env.reward.shape[0]):
    for j in range(env.reward.shape[1]):
        optimal_policy[i,j] = np.argmax(Q_table[i,j,:])

print("Q-learning : Q(s,a)")
show_q_table(np.round(Q_table,2),env)
print("Q-learning :optimal policy")
show_policy(optimal_policy,env)


start TD(0) control : Q-learning


100%|██████████████████████████████████████████████████████████████████████████| 10000/10000 [00:01<00:00, 5120.79it/s]


Q-learning : Q(s,a)
+-----------------+-----------------+-----------------+
|      1.10       |      2.56       |      4.17       |
|  1.10      4.56 |  3.10      6.17 |  4.55      4.17 |
|      4.56       |      6.18       |      7.97       |
+-----------------+-----------------+-----------------+
|      3.10       |      4.56       |      6.17       |
|  2.56      6.18 |  4.56      7.98 |  6.18      5.97 |
|      6.18       |      7.97       |      9.97       |
+-----------------+-----------------+-----------------+
|      4.56       |      6.18       |      9.97       |
|  4.17      7.97 |  6.17      9.97 |  9.97      9.97 |
|      4.17       |      5.97       |      9.97       |
+-----------------+-----------------+-----------------+
Q-learning :optimal policy
+-----------------+-----------------+-----------------+
|                 |                 |                 |
|      ↓         |      ↓         |      ↓         |
|                 |                 |                 |
+---

In [None]:
class Environment():
    
    # 1. 미로밖(절벽), 길, 목적지와 보상 설정
    cliff = -3
    road = -1
    goal = 1
    
    # 2. 목적지 좌표 설정
    goal_position = [2,2]
    
    # 3. 보상 리스트 숫자
    reward_list = [[road,road,road],
                   [road,road,road],
                   [road,road,goal]]
    
    # 4. 보상 리스트 문자
    reward_list1 = [["road","road","road"],
                    ["road","road","road"],
                    ["road","road","goal"]]
    
    # 5. 보상 리스트를 array로 설정
    def __init__(self):
        self.reward = np.asarray(self.reward_list)    

    # 6. 선택된 에이전트의 행동 결과 반환 (미로밖일 경우 이전 좌표로 다시 복귀)
    def move(self, agent, action):
        
        done = False
        
        # 6.1 행동에 따른 좌표 구하기
        new_pos = agent.pos + agent.action[action]
        
        # 6.2 현재좌표가 목적지 인지확인
        if self.reward_list1[agent.pos[0]][agent.pos[1]] == "goal":
            reward = self.goal
            observation = agent.set_pos(agent.pos)
            done = True
        # 6.3 이동 후 좌표가 미로 밖인 확인    
        elif new_pos[0] < 0 or new_pos[0] >= self.reward.shape[0] or new_pos[1] < 0 or new_pos[1] >= self.reward.shape[1]:
            reward = self.cliff
            observation = agent.set_pos(agent.pos)
            done = True
        # 6.4 이동 후 좌표가 길이라면
        else:
            observation = agent.set_pos(new_pos)
            reward = self.reward[observation[0],observation[1]]
            
        return observation, reward, done