<a href="https://colab.research.google.com/github/polyjuice06/machine_learning/blob/main/RL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [76]:
import numpy as np

LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3

MAP = [
        "SFFF",
        "FHFH",
        "FFFH",
        "HFFG"
    ]

class FrozenLakeEnv():

    """
    Winter is here. You and your friends were tossing around a frisbee at the park
    when you made a wild throw that left the frisbee out in the middle of the lake.
    The water is mostly frozen, but there are a few holes where the ice has melted.
    If you step into one of those holes, you'll fall into the freezing water.
    The surface is described using a grid like the following

        SFFF
        FHFH
        FFFH
        HFFG

    S : starting point, safe
    F : frozen surface, safe
    H : hole, fall to your doom
    G : goal, where the frisbee is located
    The episode ends when you reach the goal or fall in a hole.
    You receive a reward of 1 if you reach the goal, and zero otherwise.
    """

    def __init__(self, is_slippery):

        self.map = np.asarray(MAP, dtype='c')
        nrow, ncol = 4, 4
        self.nA = 4
        self.nS = nrow * ncol

        def to_s(row, col):
            return row * ncol + col

        def move(row, col, a):
            if a == 0:  # left
                col = max(col - 1, 0)
            elif a == 1:  # down
                row = min(row + 1, nrow - 1)
            elif a == 2:  # right
                col = min(col + 1, ncol - 1)
            elif a == 3:  # up
                row = max(row - 1, 0)
            return (row, col)

        mdp = list( )
        for i in range(self.nS):
            mdp.append([[], [], [], []])

        for row in range(nrow):
            for col in range(ncol):
                s = to_s(row, col)
                for a in range(4):
                    letter = self.map[row, col]
                    if letter in b'GH':
                        mdp[s][a].append([1.0, s, 0])
                    else:
                        if is_slippery:
                            for b in [(a - 1) % 4, a, (a + 1) % 4]:
                                newrow, newcol = move(row, col, b)
                                newstate = to_s(newrow, newcol)
                                newletter = self.map[newrow, newcol]
                                rew = float(newletter == b'G')
                                mdp[s][a].append((1.0 / 3.0, newstate, rew))
                        else:
                            newrow, newcol = move(row, col, a)
                            newstate = to_s(newrow, newcol)
                            newletter = self.map[newrow][newcol]
                            rew = float(newletter == b'G')
                            mdp[s][a].append([1.0, newstate, rew])

        self.MDP = mdp

In [93]:
# 교수님 구현
import numpy as np

def policy_iteration(env, gamma=0.99, theta=1e-8):
    V = np.zeros(env.nS)
    policy = np.ones([env.nS, env.nA]) / env.nA
    policy_stable = True
    
    while policy_stable:
        # Policy Evaluation
        while True:
            delta = 0
            new_V = np.zeros(env.nS)
            q = np.zeros([env.nS, env.nA]) / env.nA
            for s in range(env.nS):
                # 아래의 For문으로 각 action에 대한 q 값을 구함
                for a in range(env.nA):
                    for i in range(len(env.MDP[s][a])):
                        x = env.MDP[s][a][i]
                        q[s][a] += x[0] * (x[2] + gamma * V[x[1]])
                # deterministic, greedy한 action에 해당하는 q값이 new_V[s]가 됨
                new_V[s] = q[s][np.argmax(policy[s])]
            # 모든 s에 대해서 V 업데이트 완료, 업데이트 전/후 차이가 theta보다 작으면 수렴으로 판단
            difference = max(np.abs(new_V - V))
            delta = max(delta, difference)
            V = new_V
            if delta < theta:
                break
        # Policy Improvement
        differ = np.zeros(env.nS) # policy update가 되었는지 판단하기 위한 array
        for s in range(env.nS):
            old_action = np.argmax(policy[s])
            policy[s] = np.zeros(env.nA)
            new_action = np.argmax(q[s])
            policy[s][new_action] = 1 # policy update
            # update가 되었으면 False, 그대로이면 True
            if old_action == new_action:
                differ[s] = True
            else:
                differ[s] = False
        # 모든 state에서 update가 안 되었다면 (더 이상의 Update가 없다면) break
        # 아직 Update가 된다면 다시 Policy Evaluation으로 
        if np.all(differ) == True:
            break

    return policy, V

def value_iteration(env, gamma=0.99, theta=1e-8):
    V = np.zeros(env.nS)
    policy = np.ones([env.nS, env.nA]) / env.nA
    while True:
        delta = 0
        new_V = np.zeros(env.nS)
        for s in range(env.nS):
            policy[s] = np.zeros(env.nA) 
            q = np.zeros(env.nA)
            # state s에서, 4가지의 모든 action에 대해서 Q값을 구함
            for a in range(env.nA):
                for i in range(len(env.MDP[s][a])):
                    x = env.MDP[s][a][i]
                    q[a] += x[0] * (x[2] + gamma * V[x[1]])
            # new_V[s]를 max(q)로 update하며, argmax(q)가 action이 되고 policy도 update를 함
            new_V[s] = max(q)
            action = np.argmax(q)
            policy[s] = np.zeros(env.nA)
            policy[s][action] = 1
        # 모든 state에 대해서 for loop를 다 돌았다면 이러한 반복을 언제 멈출지 결정해야 함
        # 기존의 V와 update한 new_V 배열의 차이 중 가장 큰 값이 delta보다 크면 delta 업데이트
        # 그리고 기존의 V를 업데이트한 new_V로 넣어줌
        # 차이 delta가 theta보다 작다면 반복문 break
        difference = max(np.abs(V - new_V))
        delta = max(delta, difference)
        V = new_V
        if delta < theta:
            break
    return policy, V

In [94]:
while True:
    mode = int(input("1.Not Slippery, 2.Slippery: "))
    if mode == 1:
        env = FrozenLakeEnv(is_slippery=False)
        break
    elif mode == 2:
        env = FrozenLakeEnv(is_slippery=True)
        break
    else:
        print("잘못 입력했습니다.")

# 환경 state 개수 및 action 개수
#print(env.nS, env.nA)

while True:
    mode = int(input("1.Policy Iteration, 2.Value Iteration: "))
    if mode == 1:
        policy, V = policy_iteration(env)
        break
    elif mode == 2:
        policy, V = value_iteration(env)
        break
    else:
        print("잘못 입력했습니다.")
print()

# print( env.MDP[0][1] )
# state 0 에서 action 1 을 선택했을 때 [상태 이동 확률, 도착 state, reward]

print("Optimal State-Value Function:")
for i in range(len(V)):
    if i>0 and i%4==0:
        print()
    print('{0:0.3f}'.format(V[i]), end="\t")
print("\n")

print("Optimal Policy [LEFT, DOWN, RIGHT, UP]:")
action = {0:"LEFT", 1:"DOWN", 2:"RIGHT", 3:"UP"}
for i in range(len(policy)):
    if i>0 and i%4==0:
        print()
    print(policy[i], end='    ')
print()

1.Not Slippery, 2.Slippery: 1
1.Policy Iteration, 2.Value Iteration: 1

Optimal State-Value Function:
0.951	0.961	0.970	0.961	
0.961	0.000	0.980	0.000	
0.970	0.980	0.990	0.000	
0.000	0.990	1.000	0.000	

Optimal Policy [LEFT, DOWN, RIGHT, UP]:
[0. 1. 0. 0.]    [0. 0. 1. 0.]    [0. 1. 0. 0.]    [1. 0. 0. 0.]    
[0. 1. 0. 0.]    [1. 0. 0. 0.]    [0. 1. 0. 0.]    [1. 0. 0. 0.]    
[0. 0. 1. 0.]    [0. 1. 0. 0.]    [0. 1. 0. 0.]    [1. 0. 0. 0.]    
[1. 0. 0. 0.]    [0. 0. 1. 0.]    [0. 0. 1. 0.]    [1. 0. 0. 0.]    


In [None]:
#내 구현
import numpy as np
def policy_iteration(env, gamma=0.99, theta=1e-8):
    V = np.zeros(env.nS,env.vA)
    policy = np.zeros(env.nS,env.vA,dtype = 'int')
    x = 0
    flag = True
    while flag and x < 100:
      V1,y,error=np.zeros(env.nS),0,1
      while error > theta and y < 100:
        new_value_function = np.zeros(env.nS)
        for i in range(env.nS):
          a = policy[i]
          transitions = env.MDP[i][a]
          for transition in transitions:
            prob, nextS, reward = transition
            new_value_function[i] += prob*(reward + gamma*V1[nextS])
        error = np.max(np.abs(new_value_function - V1))
        V1 = new_value_function
      V = V1
      new_policy = np.zeros(env.nS, dtype='int')
      for state in range(env.nS):
        Qs = np.zeros(env.nA)
        for a in range(env.nA):
          transitions = env.MDP[state][a]
          for transition in transitions:
            prob, nextS, reward = transition
            Qs[a] += prob*(reward + gamma*V[nextS])
        max_as = np.where(Qs==Qs.max())
        max_as = max_as[0]
        new_policy[state] = max_as[0]
      diff_policy = new_policy-policy
      if np.linalg.norm(diff_policy)==0: flag = False
      policy = new_policy
      x+=1
    return policy,V

def value_iteration(env, gamma=0.99, theta=1e-8):
    V = np.zeros(env.nS)
    policy = np.ones([env.nS, env.nA]) / env.nA
    error = 1
    while error > theta:
      new_value_function = np.zeros(env.nS)
      for s in range(env.nS):
        Qs = np.zeros(env.nA)
        for a in range(env.nA):
          transitions = env.MDP[s][a]
          for transition in transitions:
            prob, nextS, reward = transition
            Qs[a] += prob*(reward + gamma*V[nextS])
        new_value_function[s] = max(Qs)
      diff_vf = new_value_function-V
      V = new_value_function
      error = np.linalg.norm(diff_vf)

    for s in range(env.nS):
      Qs = np.zeros(env.nA)
      for a in range(env.nA):
        transitions = env.MDP[s][a]
        for transition in transitions:
          prob, nextS, reward = transition
          Qs[a] += prob*(reward + gamma*V[nextS])
        max_as = np.where(Qs==Qs.max())
        max_as = max_as[0]
      policy[s] = max_as[0]
    return policy, V