<a href="https://colab.research.google.com/github/seongheechoi/education/blob/main/DQN_practice.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# NOTE(review): numpy and gym are pinned. The cells below unpack env.step()
# into four values and use env.reset() as returning only the observation --
# presumably the reason gym is held at 0.25.2; confirm before upgrading.
!pip install numpy==1.23.5
!pip install gym==0.25.2
!pip install matplotlib
!pip install gym[classic_control]

목표: 알고리즘을 완성하고 높은 test score 성능 달성하기

수행 내용:
1. 아래 DQN 코드에 10개의 빈칸 라인을 채워 코드를 구동시키세요.
  - 빈칸 부분은 주석으로 표시되어 있으며 1줄의 라인을 채우시면 됩니다.
2. 필요한 경우, 더 높은 성능을 달성하기 위해 hyperparameter 조정, Q 네트워크 구조 변경 등을 적절히 수행하세요.

참고 사항:
- 최종 점수는 마지막 10번의 test score 평균으로 계산합니다.
- Acrobot 환경의 특성상 test score는 음수가 나오게 됩니다.

In [None]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import gym
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Hyperparameters for the whole run; every later cell reads them from here.
config = {
    "env_name": "Acrobot-v1",    # ** do not change the environment **
    "batch": 128,                # minibatch size
    "buffer_size": 10000,        # replay buffer capacity (in transitions)
    "lr": 0.001,                 # learning rate
    "gamma": 0.99,               # discount factor
    "epsilon_init": 0.1,         # initial epsilon (initial exploration rate)
    "epsilon_min": 0.001,        # minimum epsilon (final exploration rate)
    "epsilon_decay": 0.995,      # epsilon is multiplied by this after every episode
    "n_step": 50000,             # number of training steps (in time steps)
    "n_train_start": 8000,       # buffer warm-up length (in time steps)
    "target_update_freq": 100,   # target-Q update period (in time steps)
    "test_freq": 100,            # test period (in time steps)
}

# Two separate instances of the same environment: `env` is stepped by the
# training/warm-up loops, `test_env` only by test(), so a test episode never
# disturbs the training episode in progress.
env      = gym.make(config["env_name"])
test_env = gym.make(config["env_name"])
# Length of the observation vector and number of discrete actions; these size
# the input and output of the Q-network.
dState = env.observation_space.shape[0]
dAction = env.action_space.n

class Qnet(nn.Module):
    """Fully connected network mapping a state to one Q-value per action.

    Args:
        dState: number of input features (length of the state vector).
        dAction: number of discrete actions (number of output Q-values).
        dHidden: width of the two hidden layers.
    """

    def __init__(self, dState, dAction, dHidden=128):
        super().__init__()
        self.layers = nn.Sequential(
            # --- Q1: input layer ---
            nn.Linear(dState, dHidden),
            nn.ReLU(),
            # --- Q2: hidden layer ---
            nn.Linear(dHidden, dHidden),
            nn.ReLU(),
            # --- Q3: output layer ---
            # No activation: Q-values are unbounded real numbers.
            nn.Linear(dHidden, dAction),
        )

    def forward(self, x):
        """Return Q-values with the last dimension of size ``dAction``."""
        return self.layers(x)


# Online network (trained) and target network (used only for bootstrap targets).
Q = Qnet(dState=dState, dAction=dAction).to(DEVICE)
targetQ = Qnet(dState=dState, dAction=dAction).to(DEVICE)
# --- Q4: update the target Q network from Q ---
# Both networks are initialised independently at random, so copy the weights
# to make the first bootstrap targets consistent with the online network.
targetQ.load_state_dict(Q.state_dict())
# Only Q's parameters are optimised; targetQ changes solely through weight copies.
optimizerQ = optim.Adam(Q.parameters(), lr=config["lr"])

# Bounded FIFO of transitions: once full, the oldest entry is dropped on append.
replay_buffer = deque(maxlen=config["buffer_size"])

gamma = config["gamma"]
epsilon = config["epsilon_init"]

def getAction(state, dAction, epsilon, Q):
    """Choose an action epsilon-greedily.

    Args:
        state: a single observation as a NumPy array.
        dAction: number of discrete actions; valid actions are 0..dAction-1.
        epsilon: probability of taking a uniformly random action.
        Q: callable mapping a float state tensor to one Q-value per action.

    Returns:
        The chosen action index as a Python int.
    """
    with torch.no_grad():
        if (random.random() > epsilon):
            state = torch.from_numpy(state).float().to(DEVICE)
            # --- Q5: compute every Q-value for the state, pick the largest ---
            # .item() turns the 0-dim index tensor into a plain int for env.step().
            action = Q(state).argmax(dim=-1).item()
        else:
            # --- Q6: pick a random action ---
            action = random.randrange(dAction)
    return action

def test(env, Q, dAction, epsilon):
    """Play one full episode on ``env`` and return the summed reward.

    Actions come from getAction() with the given ``epsilon``; no transition is
    stored and no gradient is tracked.
    """
    total = 0.0
    with torch.no_grad():
        obs = env.reset()
        finished = False
        while not finished:
            chosen = getAction(obs, dAction, epsilon, Q)
            obs, reward, finished, _info = env.step(chosen)
            total += reward
    return total


# Warm-up: step the environment config["n_train_start"] times and store every
# transition in replay_buffer. No gradient update happens here; actions come
# from getAction() with the current epsilon and the still-untrained Q.
state = env.reset()
for t in range(1, config["n_train_start"]+1):
    action = getAction(state, dAction, epsilon, Q)
    nextState, reward, done, _info = env.step(action)
    transition = (state, action, reward, nextState, done)
    replay_buffer.append(transition)
    state = nextState
    # Start a fresh episode as soon as the current one ends.
    if done:
        state = env.reset()

# Return of the training episode in progress, plus per-run logs: finished
# episode returns, periodic test returns, per-step loss and mean Q estimate.
score = 0.0
scores = []
test_scores = []
losses = []
qs = []

state = env.reset()
for t in range(1, config["n_step"]+1):
    # Take one epsilon-greedy step and store the transition.
    action = getAction(state, dAction, epsilon, Q)
    nextState, reward, done, _info = env.step(action)
    transition = (state, action, reward, nextState, done)
    replay_buffer.append(transition)

    score += reward
    state = nextState
    if done:
        state = env.reset()
        scores.append(score)
        score = 0.0
        # Epsilon decays once per finished episode, never below epsilon_min.
        epsilon = max(config["epsilon_min"], epsilon*config["epsilon_decay"])

    # Sample a minibatch and turn each field (state, action, ...) into one
    # float tensor stacked along a new leading batch dimension.
    transitions = random.sample(replay_buffer, config["batch"])
    batch = []
    for item in zip(*transitions):
        item = torch.from_numpy(np.stack(item)).float().to(DEVICE)
        batch.append(item)
    states, actions, rewards, nextStates, dones = batch
    # Add a trailing dimension so these line up with the (batch, 1) Q columns;
    # gather() additionally needs integer indices.
    actions = actions.unsqueeze(dim=-1).long()
    rewards = rewards.unsqueeze(dim=-1)
    dones = dones.unsqueeze(dim=-1)

    # --- Q7: compute estimateQs ---
    # Q-value of the action that was actually taken in each sampled state.
    estimateQs = Q(states).gather(1, actions)

    with torch.no_grad():
        # --- Q8: compute nextTargetQs ---
        # Largest target-network Q-value in each next state, kept as (batch, 1).
        nextTargetQs = targetQ(nextStates).max(dim=1, keepdim=True)[0]
        # --- Q9: compute targetQs ---
        # (1 - dones) removes the bootstrap term for transitions that ended
        # an episode.
        targetQs = rewards + gamma * (1.0 - dones) * nextTargetQs

    # --- Q10: compute the MSE loss objective ---
    loss = F.mse_loss(estimateQs, targetQs)

    optimizerQ.zero_grad()
    loss.backward()
    optimizerQ.step()
    losses.append(loss.detach().cpu().item())
    qs.append(estimateQs.detach().mean().cpu().item())

    # Periodically copy the online weights into the target network.
    if t % config["target_update_freq"] == 0:
        targetQ.load_state_dict(Q.state_dict())

    # Periodically play one evaluation episode at the minimum epsilon.
    if t % config["test_freq"] == 0:
        test_score = test(test_env, Q, dAction, epsilon=config["epsilon_min"])
        test_scores.append(test_score)
        print(f'[time:{t}]test score: {test_score}, train loss: {losses[-1]:.3f}, epsilon: {epsilon:.3f}')


# Final score = mean of the last ten test scores; print it and save the same
# value to result.csv.
final_score = np.array(test_scores[-10:]).mean()
print(f'최종 점수: {final_score}')
with open("result.csv", "w") as result_file:
    result_file.write(str(final_score))

[time:100]test score: -500.0, train loss: 0.002, epsilon: 0.100
[time:200]test score: -500.0, train loss: 0.011, epsilon: 0.100
[time:300]test score: -500.0, train loss: 0.005, epsilon: 0.100
[time:400]test score: -500.0, train loss: 0.006, epsilon: 0.100
[time:500]test score: -500.0, train loss: 0.066, epsilon: 0.100
[time:600]test score: -500.0, train loss: 0.180, epsilon: 0.100
[time:700]test score: -500.0, train loss: 0.019, epsilon: 0.100
[time:800]test score: -500.0, train loss: 0.128, epsilon: 0.100
[time:900]test score: -500.0, train loss: 0.010, epsilon: 0.099
[time:1000]test score: -267.0, train loss: 0.025, epsilon: 0.099
[time:1100]test score: -481.0, train loss: 0.028, epsilon: 0.099
[time:1200]test score: -500.0, train loss: 1.140, epsilon: 0.099
[time:1300]test score: -500.0, train loss: 0.023, epsilon: 0.099
[time:1400]test score: -242.0, train loss: 0.018, epsilon: 0.099
[time:1500]test score: -500.0, train loss: 0.031, epsilon: 0.099
[time:1600]test score: -500.0, tra

In [None]:
import os
# Run the external `elice_grade` command on result.csv through the shell.
# The exit status returned by os.system() is not checked.
os.system("elice_grade result.csv")