In [1]:
import gym
from collections import deque as dq
import random

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# NN를 학습시키기 위한 hyperparameter
learning_rate = 0.0005
batch_size = 32

# 감마는 할인율이라고 부르는 값으로, 미래가치에 대한 중요도를 조절합니다.
# 클수록 미래에 받을 보상에 더 큰 가치를 두는 것.
gamma = 0.98

buffer_limit = 50000

In [2]:
# 강화학습은 Training data set이라는게 따로 없다. Agent가 행동을 취하고 데이터셋을 쌓아나가야합니다.
# 그 데이터셋을 쌓기 위한 버퍼
class ReplayBuffer():
    def __init__(self):
        self.buffer = dq(maxlen=buffer_limit)
    
    # 버퍼에는 (state, action ,reward, nstate, done) 값이 들어갑니다.
    def put(self, transition):
        self.buffer.append(transition)
    
    # 샘플 함수를 만드는 이유는 버퍼에 쌓인 데이터셋에서 랜덤으로 학습을 시키기 위함입니다.
    # 그냥 연속해서 쌓인 n개의 데이터셋을 그대로 사용하면 데이터간의 상관관계가 너무 크기 때문에 학슴이 잘 안됩니다.
    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        
        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])

        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
               torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
               torch.tensor(done_mask_lst)
    
    def size(self):
        return len(self.buffer)

In [3]:
# cartpole의 state가 4개고 action은 2개이기 때문에 input 4, output 2인 NN생성
# 2층짜리 NN입니다. 임의로 설계했습니다.
class Qnet(nn.Module):
    def __init__(self):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
    
    # epsilon greedy 전략을 사용합니다.
    # 간단하게 설명하면 탐험이라는 개념을 통해서 가보지 않은 경로를 가볼 수 있게 해줍니다.
    def sample_action(self, observation, epsilon):
        out = self.forward(observation)
        coin = random.random()
        if coin < epsilon:
            return random.randint(0,1)
        else : 
            return out.argmax().item()

In [4]:
def train(q, q_target, memory, optimizer):
    for i in range(10):
        s,a,r,s_prime,done_mask = memory.sample(batch_size)
        
        # 벨만함수로부터 유도된 DQN 비용함수를 구현 학습시킵니다.
        q_out = q(s)
        q_a = q_out.gather(1,a)
        max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)
        target = r + gamma * max_q_prime * done_mask
        loss = F.smooth_l1_loss(q_a, target)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

In [12]:
import numpy as np

In [15]:
env = gym.make('CartPole-v1')

tensor([[ 0.5011,  0.5045],
        [-0.2605, -0.3426],
        [ 0.4312,  0.3976],
        [ 0.0009,  0.0116],
        [ 0.2957,  0.3419],
        [ 0.3485,  0.3526],
        [ 0.4578,  0.4493],
        [ 0.1231, -0.0405],
        [ 0.3229,  0.3123],
        [-0.3035, -0.1345],
        [ 0.4271,  0.4495],
        [ 0.4439,  0.4244],
        [ 0.4272,  0.4253],
        [ 0.4729,  0.4824],
        [ 0.5111,  0.5068],
        [ 0.4655,  0.4129],
        [ 0.2215,  0.2979],
        [ 0.1017,  0.0941],
        [ 0.2221,  0.0929],
        [ 0.1944,  0.3051],
        [ 0.4375,  0.4476],
        [ 0.4465,  0.4589],
        [-0.0619, -0.1982],
        [ 0.3953,  0.4003],
        [-0.1845, -0.2624],
        [-0.2134, -0.3198],
        [ 0.4349,  0.3959],
        [ 0.4640,  0.4376],
        [ 0.3767,  0.4795],
        [ 0.3531,  0.3739],
        [ 0.0128,  0.0234],
        [ 0.1802,  0.2161]], grad_fn=<AddmmBackward>)

In [6]:
# Double Deep Q Learning 개념
# target_net을 semi constant로 사용
q = Qnet()
q_target = Qnet()
q_target.load_state_dict(q.state_dict())
memory = ReplayBuffer()

print_interval = 20
score = 0.0  
optimizer = optim.Adam(q.parameters(), lr=learning_rate)

for n_epi in range(2000+1):
    epsilon = max(0.01, 0.08 - 0.01*(n_epi/200)) #Linear annealing from 8% to 1%
    s = env.reset()
    done = False

    while not done:
        a = q.sample_action(torch.from_numpy(s).float(), epsilon)      
        s_prime, r, done, info = env.step(a)
        done_mask = 0.0 if done else 1.0
        memory.put((s,a,r/100.0,s_prime, done_mask))
        s = s_prime

        score += r
        if done:
            break
    
    # 메모리가 어느정도 차야 random sample이 가능하기 때문에 일정 이상 차면 학습을 진행
    if memory.size()>2000:
        train(q, q_target, memory, optimizer)

    if n_epi%print_interval==0 and n_epi!=0:
        # 일정 주기마다 semi constant인 target-net도 업데이트.
        q_target.load_state_dict(q.state_dict())
        print("n_episode :{}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(
                                                        n_epi, score/print_interval, memory.size(), epsilon*100))
        score = 0.0
env.close()


n_episode :20, score : 9.9, n_buffer : 199, eps : 7.9%
n_episode :40, score : 9.5, n_buffer : 389, eps : 7.8%
n_episode :60, score : 9.8, n_buffer : 586, eps : 7.7%
n_episode :80, score : 10.1, n_buffer : 787, eps : 7.6%
n_episode :100, score : 10.1, n_buffer : 988, eps : 7.5%
n_episode :120, score : 9.7, n_buffer : 1182, eps : 7.4%
n_episode :140, score : 9.7, n_buffer : 1376, eps : 7.3%
n_episode :160, score : 9.4, n_buffer : 1565, eps : 7.2%
n_episode :180, score : 9.7, n_buffer : 1759, eps : 7.1%
n_episode :200, score : 9.8, n_buffer : 1956, eps : 7.0%
n_episode :220, score : 18.9, n_buffer : 2334, eps : 6.9%
n_episode :240, score : 18.3, n_buffer : 2700, eps : 6.8%
n_episode :260, score : 21.5, n_buffer : 3130, eps : 6.7%
n_episode :280, score : 20.5, n_buffer : 3540, eps : 6.6%
n_episode :300, score : 17.6, n_buffer : 3891, eps : 6.5%
n_episode :320, score : 19.5, n_buffer : 4281, eps : 6.4%
n_episode :340, score : 23.2, n_buffer : 4746, eps : 6.3%
n_episode :360, score : 23.4, n

In [7]:
torch.save(q.state_dict(), 'Cartpole_weight.pt')