In [0]:
%tensorflow_version 1.x

In [0]:
#installing dependencies
!apt-get -qq -y install libnvtoolsext1 > /dev/null
!ln -snf /usr/lib/x86_64-linux-gnu/libnvrtc-builtins.so.8.0 /usr/lib/x86_64-linux-gnu/libnvrtc-builtins.so
!apt-get -qq -y install xvfb freeglut3-dev ffmpeg> /dev/null
!pip -q install gym
!pip -q install pyglet
!pip -q install pyopengl
!pip -q install pyvirtualdisplay

In [0]:
import gym
import numpy as np
import math
from collections import deque
from keras.layers import Dense
from keras.optimizers import Adam 
from keras.models import Sequential

In [0]:
EPISODES = 400

In [0]:
#numpy 배열의 모든 데이터가 출력되도록 설정
np.set_printoptions(threshold=np.inf)

In [0]:
#gym.make('CartPole-v0') : Cart Pole 환경 설정
env = gym.make('CartPole-v0')

In [0]:
#env.reset() : Cart Pole 환경 초기화
obs = env.reset()
print('initial observation:', obs)

In [0]:
# 그래픽 출력 준비
from pyvirtualdisplay import Display
#Display(visible=0 (출력 안함 ), size=(1024 (이미지 가로), 768(이미지 세로)))
display = Display(visible=0, size=(1024, 768))
#이미지 출력 준비
display.start()

In [0]:
weight = 0.99
epsilon = 1.0
epsilon_min = 0.01
batch_size = 32
train_start = 1000

In [0]:
reply_buffer = deque(maxlen = 2000)

In [0]:
model = Sequential()
model.add(Dense(24, input_dim = 4, activation='relu', kernel_initializer='he_uniform'))
model.add(Dense(24, activation="relu", kernel_initializer="he_uniform"))
model.add(Dense(2, activation="linear", kernel_initializer="he_uniform"))
model.summary()

In [0]:
model.compile(loss="mse", optimizer=Adam(lr=0.001))

In [0]:
frameList = []

In [0]:
import random as pr

for episode in range(EPISODES):
    done = False
    total_reward = 0
    observation = env.reset()
    observation = np.reshape(observation, [1, 4])

    while not done:
        print("observation:", observation)
        if np.random.rand(1) <= epsilon:
            action = pr.choice([0, 1])
        else:
            q_value = model.predict(observation)
            action = np.argmax(q_value[0])
        print("action:", action)
        next_observation, reward, done, info = env.step(action)
        next_observation = np.reshape(next_observation, [1, 4])
        
        if done and total_reward < 199:
            reward -= 100
        reply_buffer.append((observation, action, reward, next_observation, done))

        total_reward += reward
        observation += next_observation
        if done:
            if total_reward < 199:
                total_reward += 100

            frameList.append(total_reward)
            print("=" * 100)
            print("episode:", episode, "/ total_reward:", total_reward)
            print("=" * 100)

In [0]:
#Cart Pole 게임의 각 장면의 이미지를 numpy 배열로 출력
#이미지는 rgb 값이 출력
env.render(mode = 'rgb_array')

In [0]:
#Q테이블을 생성
#np.zeros() : 0으로 초기화된 배열 생성
#np.zeros((1,1,6,3)) : 1,1,6,3인 4차원 배열 생성

#np.zeros((1,1,6,3)+(2,)) :1,1,6,3인 4차원 배열에 2칸씩 배열 추가
Q = np.zeros((1,1,6,3)+(2,))

Q

In [0]:
Q.shape

In [0]:
#observation의 최소값들의 리스트
LOW_BOUNDS=[-4.8,-0.5,-0.42,-0.88 ]
LOW_BOUNDS

In [0]:
#observation의 최대 값들의 리스트
HIGH_BOUNDS=[4.8,0.5,0.42,0.88]
HIGH_BOUNDS

In [0]:
#observation을 입력 받아서 해당 observation을 Q의 어디에 저장할 지를 설정하는 함수
#state: observation
#debug: 데이터를 출력 할지 여부
def state_to_bucket(state, debug):
    #observation을 배열의 어디에 저장할 지 인덱스가 저장된 리스트
    bucket_indice = []
    
    for i in range(4):
        if debug:
            print("="*100)
            print("i=",i)
            print("state[{}]:{}".format(i,state[i]))
            print("LOW_BOUNDS[{}]:{}".format(i,LOW_BOUNDS[i]))
            print("HIGH_BOUNDS[{}]:{}".format(i,HIGH_BOUNDS[i]))
            print("Q.shape[{}]:{}".format(i,Q.shape[i]))
        #state의 i 가 LOW_BOUNDS[i] (최소값) 보다 작으면
        if state[i] <= LOW_BOUNDS[i]:
            bucket_index = 0 #index는 0
        #state의 i 가 HIGH_BOUNDS[i] (최대값) 보다 크면    
        elif state[i] >= HIGH_BOUNDS[i]:
            #Q.shape[i]: Q배열의 길이
           # Q.shape[i]-1 : Q배열의 길이 -1 (마지막 인덱스)
            bucket_index = Q.shape[i]-1 #Q배열의 길이 -1 (마지막 인덱스) 가 인덱스
        else:
            # 최대값(HIGH_BOUNDS[i])-최소값(LOW_BOUNDS[i])/Q배열 길이(Q.shape[i])
            scale = (HIGH_BOUNDS[i] - LOW_BOUNDS[i])/Q.shape[i]
            #state[i]에서 최소값 (LOW_BOUNDS[i]) 빼고 scale로 나눠주면 인덱스가 리턴됨
            bucket_index = int((state[i] - LOW_BOUNDS[i])/scale)
            if debug:
                print("scale:", scale)
                print("bucket_index:",bucket_index)
        #bucket_indice.append(bucket_index): index를 bucket_indice 에 추가
        bucket_indice.append(bucket_index)
        if debug:
            print("bucket_indice:",bucket_indice)
            print("="*100)
    #tuple(bucket_indice): bucket_indice 를 tuple타입 객체로 만들어서 리턴
    return tuple(bucket_indice)

In [0]:
#Cart Pole 게임 재시작
env.reset()

In [0]:
#env.step(1) :  Cart Pole을 오른쪽으로 이동
observation,reward,done,info=env.step(1)

observation

In [0]:
state_to_bucket(observation,True)

In [0]:
#Cart Pole을 5번 왼쪽으로 이동
for index in range(5):
    observation,reward,done,info=env.step(0)

observation

In [0]:
state_to_bucket(observation,True)

In [0]:
import random as pr

In [0]:
#random 하게 action을 선택할 최소 비율
MIN_EXPLORE_RATE = 0.01
#learning_rate의  최소 비율
MIN_LEARNING_RATE = 0.1

In [0]:
((10)/25)

In [0]:
math.log10((10)/25)

In [0]:
1.0 - math.log10((10)/25)

In [0]:
min(1, 1.0 - math.log10((10)/25))

In [0]:
max(MIN_EXPLORE_RATE, min(1, 1.0 - math.log10((10)/25)))

In [0]:
math.log10((100)/25)

In [0]:
min(1, 1.0 - math.log10((100)/25))

In [0]:
max(MIN_EXPLORE_RATE, min(1, 1.0 - math.log10((100)/25)))

In [0]:
math.log10((1000)/25)

In [0]:
max(MIN_EXPLORE_RATE, min(1, 1.0 - math.log10((1000)/25)))

In [0]:
min(1, 1.0 - math.log10((1000)/25))

In [0]:
def get_explore_rate(t):
    if t >= 24:
        return max(MIN_EXPLORE_RATE, min(1, 1.0 - math.log10((t+1)/25)))
    else:
        return 1.0

In [0]:
math.log10((10)/25)

In [0]:
min(0.5, 1.0 - math.log10((10)/25))

In [0]:
max(MIN_LEARNING_RATE, min(0.5, 1.0 - math.log10((10)/25)))

In [0]:
math.log10((100)/25)

In [0]:
min(0.5, 1.0 - math.log10((100)/25))

In [0]:
max(MIN_LEARNING_RATE, min(0.5, 1.0 - math.log10((100)/25)))

In [0]:
math.log10((1000)/25)

In [0]:
min(0.5, 1.0 - math.log10((1000)/25))

In [0]:
max(MIN_LEARNING_RATE, min(0.5, 1.0 - math.log10((1000)/25)))

In [0]:
def get_learning_rate(t):
    if t >= 24:
         return max(MIN_LEARNING_RATE, min(0.5, 1.0 - math.log10((t+1)/25)))
    else:
         return 1.0

In [0]:
import random as pr

def select_action(state, explore_rate):
    # Select a random action
    if np.random.rand(1) < explore_rate:
        #pr.choice([0,1]) : 0, 1 중에 숫자 하나를 랜덤하게 선택해서 리턴
        action = pr.choice([0,1])
    else:
        #np.argmax(Q[state]) : Q배열에서 state 번째에 최대값 리턴 
        action = np.argmax(Q[state])
    return action

In [0]:
weight=0.99

frameList=[]
#2000 번 반복해서 Q테이블 학습
for episode in range(2000):
    print("="*100)
    print("episode:",episode)
    #env.reset() : 게임을 재시작
    observation=env.reset() 
    
    #200 번 반복해서 게임 진행 
    for frame in range(200):
        print("="*100) 
        print("frame:",frame)
        print("="*100) 
        #get_learning_rate(episode) : learning rate 를 계산해서 리턴
        learning_rate = get_learning_rate(episode)
        #get_explore_rate(episode): explore rate 를 계산해서 리턴
        explore_rate = get_explore_rate(episode)
        #state_to_bucket(observation,False) : observation의 인덱스를 리턴 False: 데이터는 출력 안함
        #observation의 action 이 Q테이블의 어디에 저장 되 있는지 인덱스가 리턴
        #state_0 : observation의 action 정보가 저장된 인덱스
        state_0 = state_to_bucket(observation,False)
        print("state_0:",state_0)
        #0, 1 중에서 action을 선택해서 리턴
        action = select_action(state_0, explore_rate)
        print("action:",action)         

        #env.step(action): Cart Pole을 action 방향으로 이동
        observation, reward, done, info = env.step(action)
        print("observation:",observation)
        print("reward:",reward)
        print("done:",done)
        print("info:",info)
        #done이 True면 게임 종료
        if done==True:
            frameList.append(frame)
            break;

        # state_to_bucket(observation, False): 새로운 observe 의 action이 저장된 인덱스를 리턴받음
        state = state_to_bucket(observation, False)
        print("state:",state)      
        # np.amax(Q[state]): 새로운 observe 의 action의 최대값을 리턴받음
        best_q = np.amax(Q[state])
       
        #Q[state_0 + (action,)]) : action의 기존값
        #((1-learning_rate)*Q[state_0 + (action,)]) : action의 기존값에 1-learning_rate 곱함
        
        #best_q : 새로운 action의 최대값
        #learning_rate*(reward+weight*best_q) : reward+weight*best_q에 learning_rate를 곱함
        Q[state_0 + (action,)]= ((1-learning_rate)*Q[state_0 + (action,)])+learning_rate*(reward+weight*best_q)
        # state (새로운 Q배열 인덱스) 를 state_0 (기존 Q배열 인덱스) 대입
        state_0 = state      
        
        print("="*100)
        
    print("="*100)    
    

In [0]:
Q

In [0]:
frameList

In [0]:
#animationFrame: Cart Pole 게임의 각 장면의 이미지의 RGB 값을 저장할 리스트
animationFrame = []

#env.reset() : 게임을 재시작
observation=env.reset()   
#env.render(mode = 'rgb_array') :Cart Pole 게임의 각 장면의 이미지를 numpy 배열로 출력
#                                이미지는 rgb 값이 출력

#animationFrame.append : 이미지를 animationFrame에 추가
animationFrame.append(env.render(mode = 'rgb_array'))
#200 번 반복해서 게임 진행 
for frame in range(200):
    print("="*100) 
    print("frame:",frame)
    print("="*100) 
    #state_to_bucket(observation,False) : observation의 인덱스를 리턴 False: 데이터는 출력 안함
    #observation의 action 이 Q테이블의 어디에 저장 되 있는지 인덱스가 리턴
    #state_0 : observation의 action 정보가 저장된 인덱스
    state_0 = state_to_bucket(observation,False)
    print("state_0:",state_0)
    #np.amax(Q[state_0]) :  Q 테이블 state_0 인덱스의 최대값이 리턴됨
    #최대값을 변수 m에 저장
    m = np.amax(Q[state_0])
    print("Q[state_0]:",Q[state_0])
    print("m:",m)
    #m이 0이면 (Q 테이블 state_0 인덱스에 action정보가 저장되어 있지 않음)
    if m==0:
        #pr.choice([0,1]) : 0,1 중하나를 임의로 선택
        action=pr.choice([0,1])        
    else:
        #np.argmax(Q[state_0]): Q[state_0] 의 최대값을 리턴
        action=np.argmax(Q[state_0])

    print("action:",action) 

    #env.step(action): Cart Pole을 action 방향으로 이동
    observation, reward, done, info = env.step(action)
    print("observation:",observation)
    print("reward:",reward)
    print("done:",done)
    print("info:",info)
    #done이 True면 게임 종료
    if done==True:
        break;

    #env.render(mode = 'rgb_array') :Cart Pole 게임의 각 장면의 이미지를 numpy 배열로 출력
    #                                이미지는 rgb 값이 출력

    #animationFrame.append : 이미지를 animationFrame에 추가
    animationFrame.append(env.render(mode = 'rgb_array'))
    # state_to_bucket(observation, False): 새로운 observe 의 action이 저장된 인덱스를 리턴받음
    state = state_to_bucket(observation, False)
    print("state:",state)      
    #Setting up for the next iteration
    state_0 = state      

print("="*100)    
    

In [0]:
import matplotlib.pyplot as plt
import matplotlib.animation
import numpy as np
from IPython.display import HTML

In [0]:
#animationFrame 에 저장된 이미지의 개수 조회
len(animationFrame)

In [0]:
#이미지의 가로 크기를 8  세로 크기를 5로  설정
#plt.figure(figsize=(이미지 가로, 이미지 세로)) : 
plt.figure(figsize=(8, 5))

#animationFrame[0] : Cart Pole 게임의 첫번째 이미지의 RGB 배열
#plt.imshow(animationFrame[0]) : Cart Pole 게임의 첫번째 이미지를 그림으로 변환해서 patch에 대입
patch = plt.imshow(animationFrame[0])

#애니메이션 진행할때 마다 호출하는 함수 애니메이션의 각 장면을 출력
def animate(index):
    #pach의 이미지를 바꿈
    #animationFrame[index] : animationFrame 의 index 번째 이미지로 바꿈
    patch.set_data(animationFrame[index])

#plt.gcf() : 이미지를 그릴 객체
#animate : 애니메이션을 그리기 위해서 호출하는 함수 애니메이션의 각 화면을 리턴
#len(animationFrame): animationFrame 에 저장된 이미지의 개수
#interval = 50 : 0.5초 마다 animate 함수를 호출해서 애니메시션 실행

#애니메이션을 실행 할 객체
#matplotlib.animation.FuncAnimation(애니메이션을 실행할 객체,애니메이션의 각 장면을 리턴할 함수, 
#             frames=애니매이션에서 그릴 이미지 개수, 
#             interval = 애니메이션 각 장면을 리턴할 함수를 호출하는 시간 (단위 1/1000 ))
ani = matplotlib.animation.FuncAnimation(plt.gcf(), animate, frames=len(animationFrame), interval = 100)

#ani.to_jshtml() : 애니메니션을 실행하고 각 결과를 html태그로 변환해서 리턴
#HTML() : 애니메이션을 화면에 출력
HTML(ani.to_jshtml())