## 10.2 정책 순환법을 이용하는 강화학습의 사례
정책 순환법에 대한 실습으로 얼어붙은 호수(Frozen Lake)를 예제로 다룹니다.

### 10.2.1 Gym을 이용한 강화학습 환경 구성하기

In [3]:
import gym
# Create the FrozenLake-v1 environment with slipping turned off
# (is_slippery=False); the rest of this notebook uses it as the global `flake`.
flake = gym.make("FrozenLake-v1", is_slippery=False)

In [4]:
# Step counter; NOTE(review): not read anywhere in the visible cells.
k = 0
# Start a new episode and keep the state returned by reset().
new_s = flake.reset()
# Draw the board for the freshly reset environment.
flake.render()


[41mS[0mFFF
FHFH
FFFH
HFFG


In [5]:
# Take up to three sampled actions, drawing the board after each one.
for _ in range(3):
    a_k = flake.action_space.sample()
    # step() hands back the next state, the reward, the episode-finished
    # flag and an info object.
    s, r, done, info = flake.step(a_k)
    flake.render()
    if done:
        # Stop early once the environment reports the episode as finished.
        break

  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Down)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Down)
SFFF
FHFH
FFFH
[41mH[0mFFG


### 10.2.2 무작위 행동에 따른 상태, 보상, 그리고 종료 여부 관찰하기

In [7]:
import pandas as pd

def run(N_Iter = 100, render_flag=False):
    """
    Play one episode of the global ``flake`` environment with sampled actions.

    Every step k is logged as one row: the state ``S``, its grid position
    ``S:(x,y)``, the reward ``R`` and ``done`` flag received on arriving in
    that state, and the action ``A`` / ``A:name`` taken from it. The last
    row keeps the placeholder action (0, "") because no action follows it.

    Args:
        N_Iter: maximum number of steps to take.
        render_flag: if True, call ``flake.render()`` after the reset and
            after every step.

    Returns:
        The step log as a DataFrame (index named 'k') if the episode ended
        within ``N_Iter`` steps, otherwise None.
    """
    new_s = flake.reset()
    if render_flag: flake.render()

    Actions = ["Left", "Down", "Right", "Up"]
    # Collect rows in a plain list and build the DataFrame once at the end:
    # DataFrame.append was removed in pandas 2.0, and growing a frame row by
    # row copies the whole frame on every step.
    rows = [{"S": new_s, "S:(x,y)": (0, 0), "R": 0.0, "done": False,
             "A": 0, "A:name": ""}]
    for _ in range(N_Iter):
        a_k = flake.action_space.sample()
        # The action belongs to the row of the state it was taken from.
        rows[-1]["A"] = a_k
        rows[-1]["A:name"] = Actions[a_k]
        s, r, done, info = flake.step(a_k)
        if render_flag: flake.render()
        # Decode the flat state index as (column, row) on a 4-wide grid.
        rows.append({"S": s, "S:(x,y)": (s % 4, s // 4), "R": r, "done": done,
                     "A": 0, "A:name": ""})
        if done:
            buff_df = pd.DataFrame(rows)
            buff_df.index.name = 'k'
            return buff_df
    return None

run(10)

Unnamed: 0_level_0,S,"S:(x,y)",R,done,A,A:name
k,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
0,0,"(0, 0)",0.0,False,1,Down
1,4,"(0, 1)",0.0,False,2,Right
2,5,"(1, 1)",0.0,True,0,


### 10.2.3 반환값 구하기

- 현재 결과에 대해 감가상각을 고려한 미래 보상들을 합친 반환값 G[k]를 구해봅니다.

In [11]:
import numpy as np
def calc_g(r, factor = 0.9):
    """
    Compute the discounted return G[k] for every step of one episode.

    Args:
        r: sequence of rewards, where r[k] is the reward logged at step k.
        factor: discount applied to the return of the following step.

    Returns:
        A float array with one more entry than ``r[1:]``, where
        G[k] = r[k+1] + factor * G[k+1] and the last entry is 0.0.
    """
    # Shift the rewards one step to the left so slot k holds r[k+1]; the
    # extra trailing slot is the final return, which is fixed at 0.0.
    returns = np.append(np.copy(r[1:]), 0.0)
    following_return = 0
    # Sweep backwards starting from the second-to-last slot.
    for k in reversed(range(len(returns) - 1)):
        returns[k] += factor * following_return
        following_return = returns[k]
    return returns

- 이제 구한 반환값을 DataFrame에 추가해 봅니다. 

In [12]:
def get_g(N_Iter=10):
    """
    Run one episode with ``run`` and attach the discounted returns to its log.

    Args:
        N_Iter: step limit handed to ``run``.

    Returns:
        The episode DataFrame with an extra 'G' column computed by
        ``calc_g`` from the 'R' column, or None (after printing a hint)
        when ``run`` returned None.
    """
    episode_df = run(N_Iter)
    # Guard clause: nothing to annotate when run() gave no log back.
    if episode_df is None:
        print('Try more iterations for each run')
        return None
    episode_df['G'] = calc_g(episode_df.R.values)
    return episode_df

get_g()

Unnamed: 0_level_0,S,"S:(x,y)",R,done,A,A:name,G
k,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
0,0,"(0, 0)",0.0,False,2,Right,0.0
1,1,"(1, 0)",0.0,False,2,Right,0.0
2,2,"(2, 0)",0.0,False,0,Left,0.0
3,1,"(1, 0)",0.0,False,2,Right,0.0
4,2,"(2, 0)",0.0,False,3,Up,0.0
5,2,"(2, 0)",0.0,False,1,Down,0.0
6,6,"(2, 1)",0.0,False,2,Right,0.0
7,7,"(3, 1)",0.0,True,0,,0.0


### 10.2.4 가치함수 구하기 

In [13]:
def get_g_many(N_Epochs=5, N_Iter=50):
    """
    Run several episodes via ``get_g`` and stack their logs into one frame.

    Episodes for which ``get_g`` returns None are skipped. Each episode keeps
    its own step index 'k', so index values repeat across episodes.

    Args:
        N_Epochs: number of episodes to attempt.
        N_Iter: step limit handed to ``get_g`` for every episode.

    Returns:
        The concatenated episode logs as one DataFrame, or None if no
        episode produced a log.
    """
    # Gather the usable episodes first and concatenate once. This replaces
    # DataFrame.append (removed in pandas 2.0) and fixes the case where the
    # first episode returned None: gbuff_df then stayed None and the next
    # successful episode crashed on None.append.
    episodes = []
    for _ in range(N_Epochs):
        buff_df = get_g(N_Iter)
        if buff_df is not None:
            episodes.append(buff_df)
    if not episodes:
        return None
    return pd.concat(episodes)

get_g_many()

Unnamed: 0_level_0,S,"S:(x,y)",R,done,A,A:name,G
k,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
0,0,"(0, 0)",0.0,False,2,Right,0.0
1,1,"(1, 0)",0.0,False,1,Down,0.0
2,5,"(1, 1)",0.0,True,0,,0.0
0,0,"(0, 0)",0.0,False,2,Right,0.0
1,1,"(1, 0)",0.0,False,1,Down,0.0
2,5,"(1, 1)",0.0,True,0,,0.0
0,0,"(0, 0)",0.0,False,3,Up,0.0
1,0,"(0, 0)",0.0,False,0,Left,0.0
2,0,"(0, 0)",0.0,False,2,Right,0.0
3,1,"(1, 0)",0.0,False,2,Right,0.0


In [14]:
# Estimate the state-value function: V[s] is the average of all returns G
# logged in rows where the state column S equals s.
gbuff_df = get_g_many(100)
V = np.zeros(flake.observation_space.n)
# N_V[s]: how many G samples went into V[s]
N_V = np.zeros_like(V)
for s in range(len(V)):
    Gs_all = gbuff_df.loc[gbuff_df.S == s, "G"].values
    N_V[s] = Gs_all.size
    # States that never show up keep V[s] = 0 rather than averaging nothing.
    if Gs_all.size:
        V[s] = Gs_all.mean()

V_df = pd.DataFrame({"V": V, "No of Gs": N_V}).rename_axis("s")
V_df

Unnamed: 0_level_0,V,No of Gs
s,Unnamed: 1_level_1,Unnamed: 2_level_1
0,0.006094,358.0
1,0.008927,141.0
2,0.029064,69.0
3,0.041107,26.0
4,0.008732,136.0
5,0.0,75.0
6,0.042882,17.0
7,0.0,7.0
8,0.018326,72.0
9,0.028929,28.0


### 10.2.5 행동가치함수 구하기

In [16]:
# Estimate the action-value function: Q[s, a] is the average of all returns G
# logged in rows where the state is s and the action is a.
gbuff_df = get_g_many(100)
Q = np.zeros((flake.observation_space.n, flake.action_space.n))
# N_Q[s, a]: how many G samples went into Q[s, a]
N_Q = np.zeros_like(Q)
S_list = []
A_list = []
# np.ndindex visits (s, a) with a varying fastest, matching the order in
# which Q is flattened below.
for s, a in np.ndindex(*Q.shape):
    Gs_all = gbuff_df.loc[(gbuff_df.S == s) & (gbuff_df.A == a), "G"].values
    if Gs_all.size:
        Q[s, a] = Gs_all.mean()
        N_Q[s, a] = Gs_all.size
    S_list.append(s)
    A_list.append(a)

# Present the table with an (S, A) MultiIndex, one row per state-action pair.
SA_df = pd.DataFrame({"S": S_list, "A": A_list})
Q_df = pd.DataFrame(
    {"Q": Q.ravel(), "No of Gs": N_Q.ravel()},
    index=pd.MultiIndex.from_frame(SA_df),
)
Q_df

Unnamed: 0_level_0,Unnamed: 1_level_0,Q,No of Gs
S,A,Unnamed: 2_level_1,Unnamed: 3_level_1
0,0,0.009344,89.0
0,1,0.011071,83.0
0,2,0.000000,69.0
0,3,0.011808,91.0
1,0,0.000000,29.0
...,...,...,...
14,3,0.000000,2.0
15,0,0.000000,1.0
15,1,0.000000,0.0
15,2,0.000000,0.0


### 10.2.6 새로운 정책 구하기

In [17]:
# Greedy policy: for every state (row of Q) take the action (column) whose
# estimated Q value is largest.
PI = Q.argmax(axis=1)
PI

array([3, 0, 0, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 1, 2, 0])

### 10.2.7 새로운 정책 사용하기

In [18]:
def run_with_PI(PI=None, N_Iter = 100, render_flag=False):
    """
    Play one episode of the global ``flake`` environment, following a policy.

    Every step k is logged as one row: the state ``S``, its grid position
    ``S:(x,y)``, the reward ``R`` and ``done`` flag received on arriving in
    that state, and the action ``A`` / ``A:name`` taken from it. The last
    row keeps the placeholder action (0, "") because no action follows it.

    Args:
        PI: policy indexed by state, giving the action to take in that
            state; if None, actions are sampled from the action space.
        N_Iter: maximum number of steps to take.
        render_flag: if True, call ``flake.render()`` after the reset and
            after every step.

    Returns:
        The step log as a DataFrame (index named 'k') if the episode ended
        within ``N_Iter`` steps, otherwise None.
    """
    s = flake.reset()
    if render_flag: flake.render()

    Actions = ["Left", "Down", "Right", "Up"]
    # Collect rows in a plain list and build the DataFrame once at the end:
    # DataFrame.append was removed in pandas 2.0, and growing a frame row by
    # row copies the whole frame on every step.
    rows = [{"S": s, "S:(x,y)": (0, 0), "R": 0.0, "done": False,
             "A": 0, "A:name": ""}]
    for _ in range(N_Iter):
        if PI is not None:
            a_k = PI[s]
        else:
            a_k = flake.action_space.sample()
        # The action belongs to the row of the state it was taken from.
        rows[-1]["A"] = a_k
        rows[-1]["A:name"] = Actions[a_k]
        s, r, done, info = flake.step(a_k)
        if render_flag: flake.render()
        # Decode the flat state index as (column, row) on a 4-wide grid.
        rows.append({"S": s, "S:(x,y)": (s % 4, s // 4), "R": r, "done": done,
                     "A": 0, "A:name": ""})
        if done:
            buff_df = pd.DataFrame(rows)
            buff_df.index.name = 'k'
            return buff_df
    return None

run_with_PI(PI=PI, render_flag=True)


[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFF

Unnamed: 0_level_0,S,"S:(x,y)",R,done,A,A:name
k,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
0,0,"(0, 0)",0.0,False,3,Up
1,0,"(0, 0)",0.0,False,3,Up
2,0,"(0, 0)",0.0,False,3,Up
3,0,"(0, 0)",0.0,False,3,Up
4,0,"(0, 0)",0.0,False,3,Up
...,...,...,...,...,...,...
96,0,"(0, 0)",0.0,False,3,Up
97,0,"(0, 0)",0.0,False,3,Up
98,0,"(0, 0)",0.0,False,3,Up
99,0,"(0, 0)",0.0,False,3,Up


---
### 10.2.8 전체 코드

In [20]:
# File: ex10_1_rl_policy_iter.py

# 1. Gym을 이용한 강화학습 환경 구성하기
import gym
flake = gym.make("FrozenLake-v1", is_slippery=False)

# Start a new episode and draw the initial board.
new_s = flake.reset()
flake.render()

# Take up to three sampled actions, drawing the board after each step and
# stopping early once the environment reports the episode as done.
for _ in range(3):
    a_k = flake.action_space.sample()
    s, r, done, info = flake.step(a_k)
    flake.render()
    if done:
        break

# 2. 무작위 행동에 따른 상태, 보상, 그리고 종료 여부 관찰하기
import pandas as pd

def run(N_Iter = 100, render_flag=False):
    """
    Play one episode of the global ``flake`` environment with sampled actions.

    Every step k is logged as one row: the state ``S``, its grid position
    ``S:(x,y)``, the reward ``R`` and ``done`` flag received on arriving in
    that state, and the action ``A`` / ``A:name`` taken from it. The last
    row keeps the placeholder action (0, "") because no action follows it.

    Args:
        N_Iter: maximum number of steps to take.
        render_flag: if True, call ``flake.render()`` after the reset and
            after every step.

    Returns:
        The step log as a DataFrame (index named 'k') if the episode ended
        within ``N_Iter`` steps, otherwise None.
    """
    new_s = flake.reset()
    if render_flag: flake.render()

    Actions = ["Left", "Down", "Right", "Up"]
    # Collect rows in a plain list and build the DataFrame once at the end:
    # DataFrame.append was removed in pandas 2.0, and growing a frame row by
    # row copies the whole frame on every step.
    rows = [{"S": new_s, "S:(x,y)": (0, 0), "R": 0.0, "done": False,
             "A": 0, "A:name": ""}]
    for _ in range(N_Iter):
        a_k = flake.action_space.sample()
        # The action belongs to the row of the state it was taken from.
        rows[-1]["A"] = a_k
        rows[-1]["A:name"] = Actions[a_k]
        s, r, done, info = flake.step(a_k)
        if render_flag: flake.render()
        # Decode the flat state index as (column, row) on a 4-wide grid.
        rows.append({"S": s, "S:(x,y)": (s % 4, s // 4), "R": r, "done": done,
                     "A": 0, "A:name": ""})
        if done:
            buff_df = pd.DataFrame(rows)
            buff_df.index.name = 'k'
            return buff_df
    return None

run(10)

# 3. 반환값 구하기
import numpy as np
def calc_g(r, factor = 0.9):
    """
    Compute the discounted return G[k] for every step of one episode.

    Args:
        r: sequence of rewards, where r[k] is the reward logged at step k.
        factor: discount applied to the return of the following step.

    Returns:
        A float array with one more entry than ``r[1:]``, where
        G[k] = r[k+1] + factor * G[k+1] and the last entry is 0.0.
    """
    # Shift the rewards one step to the left so slot k holds r[k+1]; the
    # extra trailing slot is the final return, which is fixed at 0.0.
    returns = np.append(np.copy(r[1:]), 0.0)
    following_return = 0
    # Sweep backwards starting from the second-to-last slot.
    for k in reversed(range(len(returns) - 1)):
        returns[k] += factor * following_return
        following_return = returns[k]
    return returns

# Quick check: log one episode and compute its returns from the R column.
# NOTE(review): run() returns None when the episode does not finish within
# the step limit, in which case the next line would fail on `.R`.
buff_df = run(100)    
calc_g(buff_df.R.values)

def get_g(N_Iter=50):
    """
    Run one episode with ``run`` and attach the discounted returns to its log.

    Args:
        N_Iter: step limit handed to ``run``.

    Returns:
        The episode DataFrame with an extra 'G' column computed by
        ``calc_g`` from the 'R' column, or None (after printing a hint)
        when ``run`` returned None.
    """
    episode_df = run(N_Iter)
    # Guard clause: nothing to annotate when run() gave no log back.
    if episode_df is None:
        print('Try more iterations for each run')
        return None
    episode_df['G'] = calc_g(episode_df.R.values)
    return episode_df

get_g()

# 4. 가치함수 구하기
def get_g_many(N_Epochs=5, N_Iter=50):
    """
    Run several episodes via ``get_g`` and stack their logs into one frame.

    Episodes for which ``get_g`` returns None are skipped. Each episode keeps
    its own step index 'k', so index values repeat across episodes.

    Args:
        N_Epochs: number of episodes to attempt.
        N_Iter: step limit handed to ``get_g`` for every episode.

    Returns:
        The concatenated episode logs as one DataFrame, or None if no
        episode produced a log.
    """
    # Gather the usable episodes first and concatenate once. This replaces
    # DataFrame.append (removed in pandas 2.0) and fixes the case where the
    # first episode returned None: gbuff_df then stayed None and the next
    # successful episode crashed on None.append.
    episodes = []
    for _ in range(N_Epochs):
        buff_df = get_g(N_Iter)
        if buff_df is not None:
            episodes.append(buff_df)
    if not episodes:
        return None
    return pd.concat(episodes)

get_g_many()

# Estimate the state-value function: V[s] is the average of all returns G
# logged in rows where the state column S equals s.
gbuff_df = get_g_many(100)
V = np.zeros(flake.observation_space.n)
# N_V[s]: how many G samples went into V[s]
N_V = np.zeros_like(V)
for s in range(len(V)):
    Gs_all = gbuff_df.loc[gbuff_df.S == s, "G"].values
    N_V[s] = Gs_all.size
    # States that never show up keep V[s] = 0 rather than averaging nothing.
    if Gs_all.size:
        V[s] = Gs_all.mean()

V_df = pd.DataFrame({"V": V, "No of Gs": N_V}).rename_axis("s")
V_df

# 5. 행동가치함수 구하기
# Estimate the action-value function: Q[s, a] is the average of all returns G
# logged in rows where the state is s and the action is a.
gbuff_df = get_g_many(100)
Q = np.zeros((flake.observation_space.n, flake.action_space.n))
# N_Q[s, a]: how many G samples went into Q[s, a]
N_Q = np.zeros_like(Q)
S_list = []
A_list = []
# np.ndindex visits (s, a) with a varying fastest, matching the order in
# which Q is flattened below.
for s, a in np.ndindex(*Q.shape):
    Gs_all = gbuff_df.loc[(gbuff_df.S == s) & (gbuff_df.A == a), "G"].values
    if Gs_all.size:
        Q[s, a] = Gs_all.mean()
        N_Q[s, a] = Gs_all.size
    S_list.append(s)
    A_list.append(a)

# Present the table with an (S, A) MultiIndex, one row per state-action pair.
SA_df = pd.DataFrame({"S": S_list, "A": A_list})
Q_df = pd.DataFrame(
    {"Q": Q.ravel(), "No of Gs": N_Q.ravel()},
    index=pd.MultiIndex.from_frame(SA_df),
)
Q_df

# 6. 새로운 정책 구하기
# Greedy policy: for every state (row of Q) take the action (column) whose
# estimated Q value is largest, then show it laid out as a 4x4 array.
PI = Q.argmax(axis=1)
PI.reshape(4, 4)

# 7. 새로운 정책 사용하기 
def run_with_PI(PI=None, N_Iter = 100, render_flag=False):
    """
    Play one episode of the global ``flake`` environment, following a policy.

    Every step k is logged as one row: the state ``S``, its grid position
    ``S:(x,y)``, the reward ``R`` and ``done`` flag received on arriving in
    that state, and the action ``A`` / ``A:name`` taken from it. The last
    row keeps the placeholder action (0, "") because no action follows it.

    Args:
        PI: policy indexed by state, giving the action to take in that
            state; if None, actions are sampled from the action space.
        N_Iter: maximum number of steps to take.
        render_flag: if True, call ``flake.render()`` after the reset and
            after every step.

    Returns:
        The step log as a DataFrame (index named 'k') if the episode ended
        within ``N_Iter`` steps, otherwise None.
    """
    s = flake.reset()
    if render_flag: flake.render()

    Actions = ["Left", "Down", "Right", "Up"]
    # Collect rows in a plain list and build the DataFrame once at the end:
    # DataFrame.append was removed in pandas 2.0, and growing a frame row by
    # row copies the whole frame on every step.
    rows = [{"S": s, "S:(x,y)": (0, 0), "R": 0.0, "done": False,
             "A": 0, "A:name": ""}]
    for _ in range(N_Iter):
        if PI is not None:
            a_k = PI[s]
        else:
            a_k = flake.action_space.sample()
        # The action belongs to the row of the state it was taken from.
        rows[-1]["A"] = a_k
        rows[-1]["A:name"] = Actions[a_k]
        s, r, done, info = flake.step(a_k)
        if render_flag: flake.render()
        # Decode the flat state index as (column, row) on a 4-wide grid.
        rows.append({"S": s, "S:(x,y)": (s % 4, s // 4), "R": r, "done": done,
                     "A": 0, "A:name": ""})
        if done:
            buff_df = pd.DataFrame(rows)
            buff_df.index.name = 'k'
            return buff_df
    return None

run_with_PI(PI=PI, N_Iter=1, render_flag=True)