In [0]:
import gym
import numpy as np
from gym.envs.registration import register
import math
import tensorflow as tf

In [0]:
tf.enable_eager_execution()

## 카트폴 게임

In [0]:
#installing dependencies
!apt-get -qq -y install libnvtoolsext1 > /dev/null
!ln -snf /usr/lib/x86_64-linux-gnu/libnvrtc-builtins.so.8.0 /usr/lib/x86_64-linux-gnu/libnvrtc-builtins.so
!apt-get -qq -y install xvfb freeglut3-dev ffmpeg> /dev/null
!pip -q install gym
!pip -q install pyglet
!pip -q install pyopengl
!pip -q install pyvirtualdisplay

In [0]:
import gym
env = gym.make("CartPole-v0")

In [0]:
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1024, 768))
display.start()

In [0]:
np.set_printoptions(threshold=np.inf)

## weight 모델 생성

In [0]:
initializer = tf.contrib.layers.xavier_initializer()

w1 = tf.Variable(initializer([4, 16]))
w2 = tf.Variable(initializer([16, 2]))

## Observation을 활용한 모델 적용

In [0]:
observation = env.reset()
hypothesis1 = tf.matmul(np.array(np.reshape(observation, [-1, 4]), dtype="float32"), w1)
hypothesis1 = tf.nn.relu(hypothesis1, 0)
hypothesis2 = tf.matmul(hypothesis1, w2)
hypothesis2

## 위의 작업을 함수로 정의

In [0]:
def predict(observation):
    hypothesis1 = tf.matmul(np.array(np.reshape(observation, [-1, 4]), dtype="float32"), w1)
    hypothesis1 = tf.nn.relu(hypothesis1, 0)
    hypothesis2 = tf.matmul(hypothesis1, w2)
    return hypothesis2

## 연관성이 적은 데이터를 활용하여 학습시키기 위해 데이터에서 일부만 선택해서 학습을 진행

In [0]:
x_stack = np.empty(0).reshape(0, 4)
x_stack = np.vstack([x_stack, observation])
x_stack = np.vstack([x_stack, observation * 3])
x_stack

In [0]:
import random
from collections import deque
REPLAY_MEMORY=50000

MAX_EPISODE = 5000

# minimum epsilon for epsilon greedy
MIN_E = 0.0
# epsilon will be `MIN_E` at `EPSILON_DECAYING_EPISODE`
EPSILON_DECAYING_EPISODE = MAX_EPISODE * 0.01

In [0]:
def annealing_epsilon(episode: int, min_e: float, max_e: float, target_episode: int) -> float:
    """Return an linearly annealed epsilon
    Epsilon will decrease over time until it reaches `target_episode`
         (epsilon)
             |
    max_e ---|\
             | \
             |  \
             |   \
    min_e ---|____\_______________(episode)
                  |
                 target_episode
     slope = (min_e - max_e) / (target_episode)
     intercept = max_e
     e = slope * episode + intercept
    Args:
        episode (int): Current episode
        min_e (float): Minimum epsilon
        max_e (float): Maximum epsilon
        target_episode (int): epsilon becomes the `min_e` at `target_episode`
    Returns:
        float: epsilon between `min_e` and `max_e`
    """
 
    slope = (min_e - max_e) / (target_episode)
    intercept = max_e
 
    return max(min_e, slope * episode + intercept)

In [0]:
#w값을 자동으로 수정하는 Optimizer 객체를 생성
optimizer = tf.train.AdamOptimizer(0.001)

In [0]:
#큐 객체 생성
dis = 0.99
replay_buffer=deque()
frameList = []
costList=[]
#5000번 반복해서 실행
for step in range(5001):
    print("="*100)
    print("step:",step)
    print("="*100)
    #게임 재시작
    #env.reset() : 게임을 재시작
    observation=env.reset() 
    #e 값을 리턴 받음
    e=annealing_epsilon(step, MIN_E, 1.0, EPSILON_DECAYING_EPISODE)
    
    
    for frame in range(200):
        print("="*100)
        predQ=predict(observation)
        #np.random.rand(1): 난수를 0~1 사이 난수 1개 생성
        #난수가 e보다 작으면 
        if np.random.rand(1)<e:
            #env.action_space.sample(): 0,1,2,3 중 하나의 액션 랜덤하게 리턴
            action=env.action_space.sample()
        else:
            #predQ (예측한 Q값) 에서 가장 큰값을 가진 action을 선택
            action=np.argmax(predQ)

        print("action:",action)

        #env.step(action): action을 실행
        next_observation, reward, done, info=env.step(action)
                
        if done:
            #게임이 끝나면 reward는 -1
            reward=-1
            #게임이 끝나면 frame의 수를 저장
            frameList.append([step,frame])

        print("="*100)            
        print("next_observation:",next_observation)
        print("reward:",reward)
        print("done:",done)
        print("info:",info)
        print("="*100)
        #버퍼에 데이터 저장            
        replay_buffer.append((observation,action,reward,next_observation,done))
        #버퍼에 저장된 데이터의 개수가  REPLAY_MEMORY(50000) 초과이면
        if len(replay_buffer) >REPLAY_MEMORY:
            #맨처음에 저장된 데이터 삭제
            replay_buffer.popleft()
        
        observation=next_observation
        #게임이 끝나면 (done==True) 종료
        if done==True:
            break

    print("="*100)            
    print("CarPole Done!!: step:",step,":frame:",frame)
    print("="*100)            

    if step%10==0:
        #64번 반복
        for index in range(64):
            #random.sample(replay_buffer,10): reply_buffer에서 랜덤하게 10개의 데이터 선택
            minibatch=random.sample(replay_buffer,10)
            ###np.empty(0): 비어 있는 배열 생성
            #reshape(0,4) : 비어있는 배열을 4칸으로 설정
            x_stack=np.empty(0).reshape(0,4)
            ###np.empty(0): 비어 있는 배열 생성
            #reshape(0,2) : 비어있는 배열을 2칸으로 설정
            y_stack=np.empty(0).reshape(0,2)

            #minibatch에서 데이터를 꺼내서 대입
            for batch_observation,batch_action,batch_reward,batch_next_observation,batch_done in minibatch:
                #batch_observation 을 예측
                predQ=predict(batch_observation)
                #predQ를 numpy배열로 변환
                Q=predQ.numpy()
                #Qs의 action번째에 reward+dis*np.max(nextQ) 저장
                #*~1 는 -2의 1승을 곱함 
                #batch_done  True는 1이므로 *-2 곱하고

                #*~1 는 -2의 0승을 곱함 
                #batch_done False는 0 이므로 *-1 곱

                #batch_done이 False면 값이 작아짐
                Q[0,batch_action]=batch_reward+dis*np.max(predict(batch_next_observation))*~batch_done
                #Q를 텐서플로우가 cost를 계산할 수있는 Tensor 타입으로 변환
                Q=tf.convert_to_tensor(Q, np.float32)      
                # print("="*100) 
                # print("batch_observation:",batch_observation)           
                # print("batch_next_observation:",batch_next_observation)
                # print("batch_reward:",batch_reward)
                # print("batch_done:",batch_done)
                # print("predQ:",predQ)
                # print("Q:",Q)
                # print("="*100)  
                #np.vstack([x_stack, batch_observation]): x_stack에 batch_observation 추가
                x_stack=np.vstack([x_stack, batch_observation])
                ##np.vstack([y_stack,Q]): y_stack에 Q 추가
                y_stack=np.vstack([y_stack,Q])
                
                
                # print("="*100)
                # print("x_stack:",x_stack)
                # print("y_stack:",y_stack)
                # print("="*100)

            with tf.GradientTape() as tape:
                #predict(x_stack): x_stack을 예측
                predQ=predict(x_stack)
                #predQ의 실제값이 저장된 y_stack과 차를 계산
                # tf.reduce_mean(): 평균을 계산 합니다.
                #tf.square: 제곱을 계산합니다
                cost=tf.reduce_mean(tf.square(y_stack-predQ))
                #cost,w0,w1,b0,b1 를 이용하여 cost가 0이 되는 w1,b1의 기울기, w0,b0의 기울기를 계산해서 grads에 대입 
                # -> Backpropagation  실행
                grads=tape.gradient(cost, [w1,w2])
    
                #grads에 저장된 w0,b0의 기울기와 w1,b1 기울기를 w0,b0와 w1,b1 에서 빼주고 새로운
                #w0,b0와 w1,b1로 업데이트
                optimizer.apply_gradients(grads_and_vars=zip(grads,[w1,w2]))
                print("="*100)
                print("cost:",cost)
                costList.append(cost.numpy())
                print("="*100)            

In [0]:
#cost를 그래프로 그림
plt.plot(range(len(costList)), costList, marker='.', c='red', label="cost")

In [0]:
import matplotlib.pyplot as plt
import matplotlib.animation
import numpy as np
from IPython.display import HTML

In [0]:
#animationFrame: Cart Pole 게임의 각 장면의 이미지의 RGB 값을 저장할 리스트
animationFrame = []

#env.reset() : 게임을 재시작
observation=env.reset()   
#env.render(mode = 'rgb_array') :Cart Pole 게임의 각 장면의 이미지를 numpy 배열로 출력
#                                이미지는 rgb 값이 출력

#animationFrame.append : 이미지를 animationFrame에 추가
animationFrame.append(env.render(mode = 'rgb_array'))
#200 번 반복해서 게임 진행 
for frame in range(200):
    print("="*100) 
    print("frame:",frame)
    print("="*100) 
    
    predQ=predict(observation)
    print("predQ:",predQ)
    #np.amax(Q[state_0]) :  Q 테이블 state_0 인덱스의 최대값이 리턴됨
    #최대값을 변수 m에 저장
    m = np.amax(predQ)
    print("m:",m)
    #m이 0이면 (Q 테이블 state_0 인덱스에 action정보가 저장되어 있지 않음)
    if m==0:
        #pr.choice([0,1]) : 0,1 중하나를 임의로 선택
        action=pr.choice([0,1])        
    else:
        #np.argmax(Q[state_0]): Q[state_0] 의 최대값을 리턴
        action=np.argmax(predQ)

    print("action:",action) 

    #env.step(action): Cart Pole을 action 방향으로 이동
    new_observation, reward, done, info = env.step(action)
    #env.step(action): action을 실행
    next_observation, reward, done, info=env.step(action)
    
    print("next_observation:",next_observation)
    print("reward:",reward)
    print("done:",done)
    print("info:",info)
    #done이 True면 게임 종료
    if done==True:
        break;

    #env.render(mode = 'rgb_array') :Cart Pole 게임의 각 장면의 이미지를 numpy 배열로 출력
    #                                이미지는 rgb 값이 출력

    #animationFrame.append : 이미지를 animationFrame에 추가
    animationFrame.append(env.render(mode = 'rgb_array'))
    observation=next_observation
print("="*100)    
    

## 카트폴 게임 이미지를 저장하고, 보여주기

In [0]:
def animate(index):
    patch.set_data(animationFrame[index])

In [0]:
len(animationFrame)

In [0]:
plt.figure(figsize=(8, 5))
patch = plt.imshow(animationFrame[0])
ani = matplotlib.animation.FuncAnimation(plt.gcf(), 
                                         animate, 
                                         frames=len(animationFrame),
                                         interval = 50)

HTML(ani.to_jshtml())