<a href="https://colab.research.google.com/github/leviathan-s/Reinforcement-Learning/blob/main/Cartpole_environment_DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import gym
import collections
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [2]:
# Hyperparameters
learning_rate = 0.0005
gamma = 0.98
buffer_limit = 50000
batch_size = 32

In [7]:
# 학습 효율을 상승시키기 위해 replaybuffer 클래스 정의
# 데이터 재사용으로 인한 학습 효율 증가 효과
# 학습 데이터간의 correlation 억제로 인한 학습 효율 증가 효과
class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)
        
    # 리플레이버퍼에 가장 최근에 들어온 데이터를 저장한다
    def put(self, transition):
        self.buffer.append(transition)


    # 버퍼에서 32개의 미니배치 데이터를 뽑아서 반환한다.
    # 트랜지션 데이터의 구성 (s,a,r,s_prime,done_mask)
    # 상태 s에서 액션 a를 하니 r의 보상을 받고 s_prime으로 이동하였다. done_mask : 종료상태 여부 표시
    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []

        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])

        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), torch.tensor(done_mask_lst)

    def size(self):
        return len(self.buffer)


In [28]:
class Qnet(nn.Module):
    def __init__(self):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(4,128)
        self.fc2 = nn.Linear(128,128)
        self.fc3 = nn.Linear(128,2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    # epsilon-greedy 방식으로 실제 실행할 액션을 선택한다
    def sample_action(self, obs, epsilon):
        out = self.forward(obs)
        coin = random.random()
        # epsilon의 확률만큼 랜덤액션 선택
        if coin < epsilon:
            return random.randint(0,1)

        # 1 - epsilon의 확률만큼 액션 중 가장 가치가 높은 액션 선택
        else:
            return out.argmax().item()

# 매 episode마다 호출되는 트레이닝 함수
def train(q, q_target, memory, optimizer):
    # 한 번의 train당 10회의 미니배치를 선택한다
    for i in range(10):
        s,a,r,s_prime,done_mask = memory.sample(batch_size)

        q_out = q(s)
        q_a = q_out.gather(1,a)
        max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)
        target = r + gamma*max_q_prime*done_mask
        loss = F.smooth_l1_loss(q_a, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def main():
    # 환경 선언
    env = gym.make("CartPole-v1")
    # Q Network
    q = Qnet()
    # Q Target Network
    q_target = Qnet()
    q_target.load_state_dict(q.state_dict()) # 가중치를 그대로 복사
    memory = ReplayBuffer()

    print_interval = 20
    score = 0.0
    optimizer = optim.Adam(q.parameters(), lr = learning_rate)
    # optimizer에는 q_target network의 파라미터는 집어넣지 않는다.

    # 총 천 회의 에피소드 학습을 진행한다
    for n_epi in range(1000):
        epsilon = max(0.01, 0.08-0.01*(n_epi/200))
        # epsilon 확률값을 8%에서 1%로 선형적으로 감소하게끔 한다
        s = env.reset() # 에피소드가 시작할 때마다 상태값 reset
        done = False

        while not done:
            a = q.sample_action(torch.from_numpy(s).float(),epsilon)
            s_prime, r, done, info = env.step(a)
            done_mask = 0.0 if done else 1.0
            memory.put((s,a,r/100.0, s_prime, done_mask)) # r값의 스케일이 너무 커서 100으로 나누어준다
            s = s_prime
            score += r
            if done:
                break

        # 리플레이 버퍼가 2000이상으로 충분히 커지면 학습을 시작한다
        # 초기 데이터가 많이 재사용되면 학습 효율이 감소하기 때문이다
        if memory.size() > 2000:
            train(q,q_target,memory, optimizer)


        if n_epi%print_interval==0 and n_epi!=0:
            q_target.load_state_dict(q.state_dict())
            print("n_episode : {}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format( \
                n_epi, score/print_interval, memory.size(), epsilon*100))
            score=0.0

    env.close()

main()

n_episode : 20, score : 10.1, n_buffer : 201, eps : 7.9%
n_episode : 40, score : 9.5, n_buffer : 391, eps : 7.8%
n_episode : 60, score : 9.7, n_buffer : 585, eps : 7.7%
n_episode : 80, score : 9.6, n_buffer : 776, eps : 7.6%
n_episode : 100, score : 9.8, n_buffer : 973, eps : 7.5%
n_episode : 120, score : 10.1, n_buffer : 1175, eps : 7.4%
n_episode : 140, score : 9.8, n_buffer : 1372, eps : 7.3%
n_episode : 160, score : 9.6, n_buffer : 1563, eps : 7.2%
n_episode : 180, score : 9.4, n_buffer : 1751, eps : 7.1%
n_episode : 200, score : 9.6, n_buffer : 1942, eps : 7.0%
n_episode : 220, score : 26.2, n_buffer : 2467, eps : 6.9%
n_episode : 240, score : 32.5, n_buffer : 3118, eps : 6.8%
n_episode : 260, score : 63.9, n_buffer : 4395, eps : 6.7%
n_episode : 280, score : 60.9, n_buffer : 5612, eps : 6.6%
n_episode : 300, score : 96.8, n_buffer : 7549, eps : 6.5%
n_episode : 320, score : 110.7, n_buffer : 9763, eps : 6.4%
n_episode : 340, score : 84.7, n_buffer : 11457, eps : 6.3%
n_episode : 