<a href="https://colab.research.google.com/github/slcnvly/deep-learning-practice/blob/master/DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
!apt-get update > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

!pip install gym pyvirtualdisplay imageio imageio-ffmpeg > /dev/null 2>&1

In [21]:
import gymnasium as gym
import collections
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import imageio
from pyvirtualdisplay import Display

In [22]:
display = Display(visible=0, size=(1400, 900))
display.start()

def save_dqn_gif(model, env_name='CartPole-v1', filename="cartpole_result.gif"):
    import torch  # 혹시 몰라 import 추가

    # 모델이 지금 GPU에 있는지 CPU에 있는지 스스로 확인하게 함
    # 사용자가 device를 뭘로 설정했든 상관없이 모델이 있는 곳으로 따라감
    target_device = next(model.parameters()).device

    model.eval() # 평가 모드

    env = gym.make(env_name, render_mode='rgb_array')
    state, _ = env.reset()
    done = False

    frames = []
    step_count = 0
    max_steps = 500

    print(f"[{filename}] GIF 생성 시작... (모델 위치: {target_device})")

    while not done and step_count < max_steps:
        frames.append(env.render())

        # 입력 데이터를 모델이 있는 곳(target_device)으로 강제 이동
        state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(target_device)

        with torch.no_grad():
            q_values = model(state_tensor)

        action = q_values.argmax().item()

        next_state, reward, done, truncated, _ = env.step(action)
        state = next_state
        step_count += 1

    env.close()
    imageio.mimsave(filename, frames, fps=30)
    print(f"✅ 저장 완료! {filename}을 다운로드 받으세요.")

In [23]:
# 하이퍼파라미터 설정
learning_rate = 0.0005
gamma         = 0.98
buffer_limit  = 50000
batch_size    = 32

# 1. Experience Replay Buffer
class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)

    def put(self, transition):
        S, A, R, S_prime, Done = transition
        self.buffer.append((S, A, R, S_prime, Done))

    def sample(self, n):
        # 버퍼에서 n개의 데이터를 무작위로 추출하여 s, a, r, s_prime, done_mask 형태의 텐서로 반환하는 로직
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_lst = [], [], [], [], []
        for transition in mini_batch:
            s, a, r, s_prime, done = transition
            s_lst.append(s) # s는 이미 여러개의 값(ex. 위치, 속도, 각도, 각속도)을 가진 리스트
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask = 0.0 if done else 1.0
            done_lst.append([done_mask]) # 안전을 위해 그냥 형태 통일

        return torch.tensor(np.array(s_lst), dtype=torch.float), \
           torch.tensor(a_lst), \
           torch.tensor(np.array(r_lst), dtype=torch.float), \
           torch.tensor(np.array(s_prime_lst), dtype=torch.float), \
           torch.tensor(np.array(done_lst), dtype=torch.float)
        # 리스트->numpy->tensor의 변환 이유는 numpy 배열의 메모리 저장 방식(연속적)의 수혜를 받기 위해, 이후 딥러닝 라이브러리 호환성을 위해 tensor로 변환

    def size(self):
        return len(self.buffer)

In [24]:
# 2. Q-네트워크 (Q-Network)
class Qnet(nn.Module):
    def __init__(self):
        super(Qnet, self).__init__()
        #신경망의 층(Linear Layer)을 정의
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    def sample_action(self, obs, epsilon):
        # epsilon-greedy 방식으로 행동을 선택
        coin = random.random()
        if coin < epsilon:
          return random.randint(0,1)
        else:
          out = self.forward(obs)
          return out.argmax().item() # 신경망의 결과는 각 행동들의 Q함수 --> 따라서 argmax를 이용해 행동 자체를 뽑아냄

In [25]:
# 3. 학습 로직 (Train Function)
def train(q, q_target, memory, optimizer):
    # 1. 메모리에서 배치를 샘플링
    S, A, R, S_prime, Done = memory.sample(batch_size)
    # 2. 타겟 y_i 계산 (Target Network 사용) --> Moving target 문제를 해결하기 위해..
    q_target_out = q_target(S_prime)
    # 3. 현재 Q값 계산 (Q-Network 사용)
    q_out = q(S)
    # 4. Loss 계산 및 역전파(Backpropagation)
    q_a = q_out.gather(1, A) # gather함수 --> 1-dim(열 방향)의 행동 A값 인덱스의 q값만 추출하는 코드
    max_q_prime = q_target_out.max(1)[0].unsqueeze(1) # max(1)은 두가지 반환 --> [0]에서 최대값, [1]에서 argmax / unsqueeze를 통해 q_a와 차원을 맞춤([batchsize]1차원 --> [batchsize, 1]로)
    target = R + gamma * max_q_prime * Done
    loss = F.smooth_l1_loss(q_a, target) # L = 1/2 * (Target - current_Q)^2 의 식으로 계산
    optimizer.zero_grad()
    loss.backward() # L의 gradients 계산
    torch.nn.utils.clip_grad_norm_(q.parameters(), 1.0) # 안정장치 - norm이 1.0을 넘어가는 보상에 대해서 1.0으로 고
    optimizer.step() # Q_net의 가중치 학습

In [27]:
# 4. 메인 루프 (Main Loop)
def main():
    env = gym.make('CartPole-v1')
    q = Qnet()
    q_target = Qnet()
    q_target.load_state_dict(q.state_dict()) # 가중치 동기화
    memory = ReplayBuffer()

    optimizer = optim.Adam(q.parameters(), lr=learning_rate)
    score = 0.0

    for n_epi in range(10000):
        epsilon = max(0.01, 0.3 - 0.3*(n_epi/2000)) # Linear annealing(선형 감쇠) --> 학습을 해 나갈수록 입실론 값을 줄여나감
        s, _ = env.reset() # env.reset() --> 환경을 초기 상태로 reset + 첫번째 state을 반환
        done = False

        while not done:
            # 에이전트가 행동을 선택하고 환경과 상호작용
            action = q.sample_action(torch.from_numpy(s).float(), epsilon)
            s_prime, r, done, _, _ = env.step(action)
            memory.put((s, action, r/100.0, s_prime, done)) # 딥러닝은 보통 입출력이 -1~1 일때 최적화가 가장 잘 작동.. normalization을 통해 학습의 안정화를 꾀함
            # 학습 조건이 맞으면 train(q, q_target, memory, optimizer) 호출
            if memory.size() > 2000:
                train(q, q_target, memory, optimizer)

            s = s_prime
            score += r
            if done:
                break

        # 일정 주기마다 Target Network를 업데이트(가중치 복사)

        if n_epi % 20 == 0 and n_epi != 0:
            # 학습 중 평균 점수 (탐험 때문에 낮을 수 있음)
            avg_score = score / 20

            # 잠깐 입실론 0으로 설정하고 1판 테스트 (진짜 실력 측정)
            test_score = 0
            s_test, _ = env.reset()
            done_test = False
            test_step_count = 0
            max_test_steps = 500
            while not done_test:
                # 입실론 0.0 (무조건 Greedy)
                a_test = q.sample_action(torch.from_numpy(s_test).float(), 0.0)
                s_test, r_test, done_test, _, _ = env.step(a_test)
                test_score += r_test
                test_step_count += 1
                # 무한루프 방지 - 너무 오래 버티면 강제로 끝내기
                if test_step_count >= max_test_steps:
                    break

            # 결과 출력
            print(f"Episode: {n_epi} | Train Score: {avg_score:.1f} | Real Test: {test_score:.1f} | Eps: {epsilon:.2f}")

            # 모델 저장 및 타겟 업데이트
            q_target.load_state_dict(q.state_dict())
            if test_score >= 500.0:
                print(f"mission complete on Episode {n_epi}! terminate learning.")
                break
            score = 0.0

    env.close()
    save_dqn_gif(q, filename="final_success.gif")

if __name__ == '__main__':
    main()

Episode: 20 | Train Score: 61.0 | Real Test: 35.0 | Eps: 0.30
Episode: 40 | Train Score: 41.2 | Real Test: 10.0 | Eps: 0.29
Episode: 60 | Train Score: 10.8 | Real Test: 10.0 | Eps: 0.29
Episode: 80 | Train Score: 11.8 | Real Test: 9.0 | Eps: 0.29
Episode: 100 | Train Score: 12.2 | Real Test: 11.0 | Eps: 0.28
Episode: 120 | Train Score: 12.9 | Real Test: 13.0 | Eps: 0.28
Episode: 140 | Train Score: 20.6 | Real Test: 10.0 | Eps: 0.28
Episode: 160 | Train Score: 61.1 | Real Test: 100.0 | Eps: 0.28
Episode: 180 | Train Score: 58.0 | Real Test: 153.0 | Eps: 0.27
Episode: 200 | Train Score: 203.8 | Real Test: 203.0 | Eps: 0.27
Episode: 220 | Train Score: 150.3 | Real Test: 159.0 | Eps: 0.27
Episode: 240 | Train Score: 137.7 | Real Test: 121.0 | Eps: 0.26
Episode: 260 | Train Score: 117.3 | Real Test: 110.0 | Eps: 0.26
Episode: 280 | Train Score: 109.8 | Real Test: 98.0 | Eps: 0.26
Episode: 300 | Train Score: 99.5 | Real Test: 93.0 | Eps: 0.26
Episode: 320 | Train Score: 123.8 | Real Test: 22