# (1), (2)

In [1]:
!pip3 install gymnasium[classic_control]

Collecting gymnasium[classic_control]
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium[classic_control])
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m15.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0


## Import

In [2]:
import gym
import collections
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import itertools

## Hyperparameter 정의

In [3]:
#Hyperparameters
learning_rate = 0.001
gamma         = 0.98
buffer_limit  = 50000
batch_size    = 32

# GPU 사용을 위한 device 설정
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


  and should_run_async(code)


## ReplayBuffer 클래스 정의

In [4]:
## 에이전트의 경험을 저장하고 샘플링하는 역할
class ReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)    # 경험 저장 공간 설정

    def put(self, transition):
        self.buffer.append(transition)    # 새로운 경험인 transition을 저장 공간에 추가

    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)    # n(mini_batch의 크기)만큼 샘플링
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []

        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)   # 현재 상태(state)
            a_lst.append([a])   # 행동(action)
            r_lst.append([r])   # 보상(reward)
            s_prime_lst.append(s_prime)   # 다음 상태
            done_mask_lst.append([done_mask])   # 종료 여부

        # 텐서 형태로 반환 수행
        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
               torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
               torch.tensor(done_mask_lst)

    def size(self):
        return len(self.buffer)   # 저장 공간의 길이 반환

## Q-Network 클래스 정의

In [5]:
# DQN의 핵심 모델로, 현재 상태를 입력받아 각 행동에 대한 Q값 출력
class Qnet(nn.Module):
    def __init__(self):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(4, 128)    # 입력 - 상태 공간 크기(4)
        self.fc2 = nn.Linear(128, 128)  # 히든
        self.fc3 = nn.Linear(128, 2)    # 출력 - 행동 공간 크기(2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))   # 첫번째, 두번째 층에서 ReLU 활성 함수 적용
        x = self.fc3(x)   # 최종 출력값 - 행동별 Q값
        return x

    def sample_action(self, obs, epsilon):
        out = self.forward(obs)   # Q값 계산
        coin = random.random()    # 0과 1사이 난수 생성
        if coin < epsilon:
            return random.randint(0,1)    # 설정한 epsilon보다 난수가 작다면 랜덤 행동 선택
        else :
            return out.argmax().item()    # 아니라면 최대 Q값을 가진 행동 선택

## 학습 함수 정의

In [6]:
# 위에서 정의한 DQN 학습
# 경험을 샘플링하여 Q-Network와 Target Q-Network의 차이를 줄이는 것이 목적
def train(q, q_target, memory, optimizer):
    for i in range(10):
        s, a, r, s_prime, done_mask = memory.sample(batch_size)   # 경험을 batch_size만큼 샘플링

        # 데이터를 GPU로 이동
        s = s.to(device)
        a = a.to(device)
        r = r.to(device)
        s_prime = s_prime.to(device)
        done_mask = done_mask.to(device)

        q_out = q(s)    # Q값 계산
        q_a = q_out.gather(1, a)    # 선택한 행동에 대한 Q값 저장
        max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)    # 다음 상태 s_prime에 대해 가장 큰 Q값 계산
        target = r + gamma * max_q_prime * done_mask    # Bellman Equation
        loss = F.smooth_l1_loss(q_a, target)    # Loss 함수로 Huber Loss 사용

        optimizer.zero_grad()   # 이전 단계의 optimizer 가중치 초기화
        loss.backward()   # 역전파를 이용해 가중치 계산
        optimizer.step()    # 가중치 업데이트

## 메인 함수 정의 및 실행

In [7]:
## T4 - 8분
def main():
    env = gym.make('CartPole-v1')   # 환경 생성
    q = Qnet().to(device)    # 새로운 Q-Network 생성
    q_target = Qnet().to(device)   # Target Q-Network 생성
    q_target.load_state_dict(q.state_dict())    # 가중치 초기화
    memory = ReplayBuffer()   # ReplayBuffer 초기화

    print_interval = 50   # print 할 에피소드 시점 설정
    score = 0.0   # 점수 초기화
    optimizer = optim.Adam(q.parameters(), lr=learning_rate)    # Adam optimizer 설정

    for n_epi in range(5000):    # 에피소드 반복 횟수(10000->5000)
        epsilon = max(0.005, 0.08 - 0.01*(n_epi/200))  # Linear annealing from 8% to 1%
        s = env.reset()   # gym의 새로운 버전에서 env.reset()의 반환값이 변경됨
        s = torch.tensor(s, dtype=torch.float).to(device)  # 상태를 GPU로 이동
        done = False    # 종료조건

        while not done:
            a = q.sample_action(s, epsilon)  # GPU 텐서를 사용하여 행동 샘플링
            s_prime, r, done, info = env.step(a)    # gym의 새로운 버전에서 반환값이 4개로 변경됨
            s_prime = torch.tensor(s_prime, dtype=torch.float).to(device)  # 다음 상태를 GPU로 이동
            done_mask = 0.0 if done else 1.0    # 선택한 행동을 수행함에 따라 얻은 결과
            memory.put((s.cpu().numpy(), a, r / 100.0, s_prime.cpu().numpy(), done_mask))    # 경험 저장
            s = s_prime   # 다음 상태로 이동

            score += r    # 얻은 보상을 점수에 추가
            if done:
                break

        ## 일정 크기(2000) 이상 ReplayBuffer가 채워졌을 경우 학습 수행
        if memory.size()>2000:
            train(q, q_target, memory, optimizer)

        ## 학습 결과 출력
        if n_epi%print_interval==0 and n_epi!=0:
            q_target.load_state_dict(q.state_dict())    # Target Q-Network 가중치 업데이트
            print("n_episode :{}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(
                                                            n_epi, score/print_interval, memory.size(), epsilon*100))
            score = 0.0   # 점수 초기화
    env.close()   # 환경 종료

if __name__ == '__main__':
    main()

  deprecation(
  deprecation(
  if not isinstance(terminated, (bool, np.bool8)):


n_episode :50, score : 13.8, n_buffer : 688, eps : 7.8%
n_episode :100, score : 12.8, n_buffer : 1327, eps : 7.5%
n_episode :150, score : 13.0, n_buffer : 1978, eps : 7.3%


  return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \


n_episode :200, score : 17.1, n_buffer : 2834, eps : 7.0%
n_episode :250, score : 20.8, n_buffer : 3874, eps : 6.8%
n_episode :300, score : 23.8, n_buffer : 5062, eps : 6.5%
n_episode :350, score : 20.3, n_buffer : 6075, eps : 6.2%
n_episode :400, score : 39.9, n_buffer : 8068, eps : 6.0%
n_episode :450, score : 46.4, n_buffer : 10388, eps : 5.8%
n_episode :500, score : 47.9, n_buffer : 12783, eps : 5.5%
n_episode :550, score : 97.5, n_buffer : 17660, eps : 5.3%
n_episode :600, score : 123.3, n_buffer : 23825, eps : 5.0%
n_episode :650, score : 137.8, n_buffer : 30714, eps : 4.8%
n_episode :700, score : 161.0, n_buffer : 38766, eps : 4.5%
n_episode :750, score : 148.4, n_buffer : 46186, eps : 4.2%
n_episode :800, score : 173.3, n_buffer : 50000, eps : 4.0%
n_episode :850, score : 187.9, n_buffer : 50000, eps : 3.8%
n_episode :900, score : 183.0, n_buffer : 50000, eps : 3.5%
n_episode :950, score : 176.7, n_buffer : 50000, eps : 3.2%
n_episode :1000, score : 162.4, n_buffer : 50000, eps

# (3)

In [8]:
## 연속적인 transition을 입력
class SequentialReplayBuffer():
    def __init__(self):
        self.buffer = collections.deque(maxlen=buffer_limit)    # 경험 저장 공간 설정

    def put(self, transition):
        self.buffer.append(transition)    # 새로운 경험인 transition을 저장 공간에 추가

    def sample(self, n):
        if len(self.buffer) < n:    # 데이터 부족 시 예외 처리
            raise ValueError("Buffer does not contain enough transitions.")

        # 연속된 transition 가져오기
        idx = random.randint(0, len(self.buffer) - n)  # 시작 인덱스 설정
        mini_batch = list(itertools.islice(self.buffer, idx, idx + n))

        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []

        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)   # 현재 상태(state)
            a_lst.append([a])   # 행동(action)
            r_lst.append([r])   # 보상(reward)
            s_prime_lst.append(s_prime)   # 다음 상태
            done_mask_lst.append([done_mask])   # 종료 여부

        # 텐서 형태로 반환 수행
        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
               torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
               torch.tensor(done_mask_lst)

    def size(self):
        return len(self.buffer)   # 저장 공간의 길이 반환

  and should_run_async(code)


In [9]:
# T4 - 10분
def main_v2():
    env = gym.make('CartPole-v1')   # 환경 생성
    q = Qnet().to(device)    # 새로운 Q-Network 생성
    q_target = Qnet().to(device)   # Target Q-Network 생성
    q_target.load_state_dict(q.state_dict())    # 가중치 초기화
    memory = SequentialReplayBuffer()   # ReplayBuffer -> SequentialReplayBuffer 변경

    print_interval = 50   # print 할 에피소드 시점 설정
    score = 0.0   # 점수 초기화
    optimizer = optim.Adam(q.parameters(), lr=learning_rate)    # Adam optimizer 설정

    for n_epi in range(5000):    # 에피소드 반복 횟수(10000->5000)
        epsilon = max(0.005, 0.08 - 0.01*(n_epi/200))  # Linear annealing from 8% to 1%
        s = env.reset()   # gym의 새로운 버전에서 env.reset()의 반환값이 변경됨
        s = torch.tensor(s, dtype=torch.float).to(device)  # 상태를 GPU로 이동
        done = False    # 종료조건

        while not done:
            a = q.sample_action(s, epsilon)  # GPU 텐서를 사용하여 행동 샘플링
            s_prime, r, done, info = env.step(a)    # gym의 새로운 버전에서 반환값이 4개로 변경됨
            s_prime = torch.tensor(s_prime, dtype=torch.float).to(device)  # 다음 상태를 GPU로 이동
            done_mask = 0.0 if done else 1.0    # 선택한 행동을 수행함에 따라 얻은 결과
            memory.put((s.cpu().numpy(), a, r / 100.0, s_prime.cpu().numpy(), done_mask))    # 경험 저장
            s = s_prime   # 다음 상태로 이동

            score += r    # 얻은 보상을 점수에 추가
            if done:
                break

        ## 일정 크기(2000) 이상 ReplayBuffer가 채워졌을 경우 학습 수행
        if memory.size()>2000:
            train(q, q_target, memory, optimizer)

        ## 학습 결과 출력
        if n_epi%print_interval==0 and n_epi!=0:
            q_target.load_state_dict(q.state_dict())    # Target Q-Network 가중치 업데이트
            print("n_episode :{}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(
                                                            n_epi, score/print_interval, memory.size(), epsilon*100))
            score = 0.0   # 점수 초기화
    env.close()   # 환경 종료

if __name__ == '__main__':
    main_v2()

  deprecation(
  deprecation(


n_episode :50, score : 9.9, n_buffer : 494, eps : 7.8%
n_episode :100, score : 9.8, n_buffer : 984, eps : 7.5%
n_episode :150, score : 9.5, n_buffer : 1457, eps : 7.3%
n_episode :200, score : 10.0, n_buffer : 1956, eps : 7.0%
n_episode :250, score : 23.0, n_buffer : 3108, eps : 6.8%
n_episode :300, score : 35.5, n_buffer : 4885, eps : 6.5%
n_episode :350, score : 90.3, n_buffer : 9399, eps : 6.2%
n_episode :400, score : 185.9, n_buffer : 18696, eps : 6.0%
n_episode :450, score : 192.7, n_buffer : 28330, eps : 5.8%
n_episode :500, score : 148.3, n_buffer : 35744, eps : 5.5%
n_episode :550, score : 303.0, n_buffer : 50000, eps : 5.3%
n_episode :600, score : 229.1, n_buffer : 50000, eps : 5.0%
n_episode :650, score : 238.3, n_buffer : 50000, eps : 4.8%
n_episode :700, score : 269.3, n_buffer : 50000, eps : 4.5%
n_episode :750, score : 253.7, n_buffer : 50000, eps : 4.2%
n_episode :800, score : 239.7, n_buffer : 50000, eps : 4.0%
n_episode :850, score : 278.8, n_buffer : 50000, eps : 3.8%


### 실험결과<br />

큰 차이는 없었지만, 그럼에도 다소 성능이 떨어지고 안정성이 부족해보임을 확인할 수 있음.<br />샘플의 다양성이 부족하고 에폭이 많아지면 과적합 위험성이 존재


# (4)

In [10]:
def train_v2(q, memory, optimizer):
    for i in range(10):
        s, a, r, s_prime, done_mask = memory.sample(batch_size)

        # 데이터를 GPU로 이동
        s = s.to(device)
        a = a.to(device)
        r = r.to(device)
        s_prime = s_prime.to(device)
        done_mask = done_mask.to(device)

        q_out = q(s)
        q_a = q_out.gather(1, a)  # 선택한 행동에 대한 Q값
        max_q_prime = q(s_prime).max(1)[0].unsqueeze(1)  # 동일 Q-network로 다음 상태 Q값 계산
        target = r + gamma * max_q_prime * done_mask  # Bellman Equation
        loss = F.smooth_l1_loss(q_a, target)  # Huber Loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

In [11]:
# T4 - 7분
def main_v3():
    env = gym.make('CartPole-v1')   # 환경 생성
    q = Qnet().to(device)    # 새로운 Q-Network 생성
    memory = ReplayBuffer()   # ReplayBuffer 초기화

    print_interval = 50   # print 할 에피소드 시점 설정
    score = 0.0   # 점수 초기화
    optimizer = optim.Adam(q.parameters(), lr=learning_rate)    # Adam optimizer 설정

    for n_epi in range(5000):    # 에피소드 반복 횟수(10000->5000)
        epsilon = max(0.005, 0.08 - 0.01*(n_epi/200))  # Linear annealing from 8% to 1%
        s = env.reset()   # gym의 새로운 버전에서 env.reset()의 반환값이 변경됨
        s = torch.tensor(s, dtype=torch.float).to(device)  # 상태를 GPU로 이동
        done = False    # 종료조건

        while not done:
            a = q.sample_action(s, epsilon)  # GPU 텐서를 사용하여 행동 샘플링
            step_result = env.step(a)  # Step API 대응
            if len(step_result) == 4:
                s_prime, r, done, info = step_result
            else:
                s_prime, r, terminated, truncated, info = step_result
                done = terminated or truncated

            s_prime = torch.tensor(s_prime, dtype=torch.float).to(device)  # 다음 상태를 GPU로 이동
            done_mask = 0.0 if done else 1.0    # 선택한 행동을 수행함에 따라 얻은 결과
            memory.put((s.cpu().numpy(), a, r / 100.0, s_prime.cpu().numpy(), done_mask))    # 경험 저장
            s = s_prime   # 다음 상태로 이동

            score += r    # 얻은 보상을 점수에 추가
            if done:
                break

        ## 일정 크기(2000) 이상 ReplayBuffer가 채워졌을 경우 학습 수행
        if memory.size()>2000:
            train_v2(q, memory, optimizer)

        ## 학습 결과 출력
        if n_epi%print_interval==0 and n_epi!=0:
            print("n_episode :{}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(
                                                            n_epi, score/print_interval, memory.size(), epsilon*100))
            score = 0.0   # 점수 초기화
    env.close()   # 환경 종료

if __name__ == '__main__':
    main_v3()

n_episode :50, score : 10.4, n_buffer : 522, eps : 7.8%
n_episode :100, score : 10.7, n_buffer : 1056, eps : 7.5%
n_episode :150, score : 10.6, n_buffer : 1588, eps : 7.3%
n_episode :200, score : 10.9, n_buffer : 2131, eps : 7.0%
n_episode :250, score : 33.9, n_buffer : 3828, eps : 6.8%
n_episode :300, score : 50.7, n_buffer : 6365, eps : 6.5%
n_episode :350, score : 52.0, n_buffer : 8963, eps : 6.2%
n_episode :400, score : 77.5, n_buffer : 12839, eps : 6.0%
n_episode :450, score : 83.0, n_buffer : 16989, eps : 5.8%
n_episode :500, score : 125.0, n_buffer : 23239, eps : 5.5%
n_episode :550, score : 131.5, n_buffer : 29815, eps : 5.3%
n_episode :600, score : 142.7, n_buffer : 36951, eps : 5.0%
n_episode :650, score : 155.7, n_buffer : 44736, eps : 4.8%
n_episode :700, score : 187.5, n_buffer : 50000, eps : 4.5%
n_episode :750, score : 198.7, n_buffer : 50000, eps : 4.2%
n_episode :800, score : 191.5, n_buffer : 50000, eps : 4.0%
n_episode :850, score : 112.7, n_buffer : 50000, eps : 3.8

### 실험결과<br />

단일 네트워크를 사용해 실행시간이 줄어들었지만(10분->7분), 성능이 비교적 매우 떨어지는 것을 확인할 수 있음.

# (5)

In [12]:
# T4 - 7분
def main_v4():
    env = gym.make('CartPole-v1')   # 환경 생성
    q = Qnet().to(device)    # 새로운 Q-Network 생성
    memory = SequentialReplayBuffer()   # ReplayBuffer -> SequentialReplayBuffer 변경

    print_interval = 50   # print 할 에피소드 시점 설정
    score = 0.0   # 점수 초기화
    optimizer = optim.Adam(q.parameters(), lr=learning_rate)    # Adam optimizer 설정

    for n_epi in range(5000):    # 에피소드 반복 횟수(10000->5000)
        epsilon = max(0.005, 0.08 - 0.01*(n_epi/200))  # Linear annealing from 8% to 1%
        s = env.reset()   # gym의 새로운 버전에서 env.reset()의 반환값이 변경됨
        s = torch.tensor(s, dtype=torch.float).to(device)  # 상태를 GPU로 이동
        done = False    # 종료조건

        while not done:
            a = q.sample_action(s, epsilon)  # GPU 텐서를 사용하여 행동 샘플링
            step_result = env.step(a)  # Step API 대응
            if len(step_result) == 4:
                s_prime, r, done, info = step_result
            else:
                s_prime, r, terminated, truncated, info = step_result
                done = terminated or truncated

            s_prime = torch.tensor(s_prime, dtype=torch.float).to(device)  # 다음 상태를 GPU로 이동
            done_mask = 0.0 if done else 1.0    # 선택한 행동을 수행함에 따라 얻은 결과
            memory.put((s.cpu().numpy(), a, r / 100.0, s_prime.cpu().numpy(), done_mask))    # 경험 저장
            s = s_prime   # 다음 상태로 이동

            score += r    # 얻은 보상을 점수에 추가
            if done:
                break

        ## 일정 크기(2000) 이상 ReplayBuffer가 채워졌을 경우 학습 수행
        if memory.size()>2000:
            train_v2(q, memory, optimizer)

        ## 학습 결과 출력
        if n_epi%print_interval==0 and n_epi!=0:
            print("n_episode :{}, score : {:.1f}, n_buffer : {}, eps : {:.1f}%".format(
                                                            n_epi, score/print_interval, memory.size(), epsilon*100))
            score = 0.0   # 점수 초기화
    env.close()   # 환경 종료

if __name__ == '__main__':
    main_v4()

n_episode :50, score : 10.0, n_buffer : 502, eps : 7.8%
n_episode :100, score : 9.6, n_buffer : 981, eps : 7.5%
n_episode :150, score : 9.7, n_buffer : 1468, eps : 7.3%
n_episode :200, score : 9.8, n_buffer : 1959, eps : 7.0%
n_episode :250, score : 19.2, n_buffer : 2917, eps : 6.8%
n_episode :300, score : 49.6, n_buffer : 5398, eps : 6.5%
n_episode :350, score : 52.6, n_buffer : 8028, eps : 6.2%
n_episode :400, score : 62.0, n_buffer : 11127, eps : 6.0%
n_episode :450, score : 67.3, n_buffer : 14492, eps : 5.8%
n_episode :500, score : 69.6, n_buffer : 17970, eps : 5.5%
n_episode :550, score : 68.6, n_buffer : 21398, eps : 5.3%
n_episode :600, score : 94.0, n_buffer : 26098, eps : 5.0%
n_episode :650, score : 130.7, n_buffer : 32634, eps : 4.8%
n_episode :700, score : 119.3, n_buffer : 38598, eps : 4.5%
n_episode :750, score : 145.1, n_buffer : 45853, eps : 4.2%
n_episode :800, score : 140.4, n_buffer : 50000, eps : 4.0%
n_episode :850, score : 130.3, n_buffer : 50000, eps : 3.8%
n_epi

### 실험결과<br />

naive DQN은 구조가 간단하지만, 기존 DQN 코드보다 성능이 훨씬 떨어지는 것을 알 수 있었음.



# (6)

 <table>
  <thead>
    <tr>
      <th><b>모델/변경 사항</b></th>
      <th><b>특징</b></th>
      <th><b>결론</b></th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><b>(2) 원래의 DQN</b></td>
      <td>안정적인 학습과 빠른 수렴 가능<br />Hyperparameter 튜닝이 필요</td>
      <td>가장 균형 잡힌 성능을 제공</td>
    </tr>
    <tr>
      <td><b>(3) 연속 transition 샘플링</b></td>
      <td>샘플 다양성 부족, 과적합 가능성</td>
      <td>특정 환경에서 유용할 수 있지만, 일반적인 안정성은 떨어짐</td>
    </tr>
    <tr>
      <td><b>(4) Target Network 미사용</b></td>
      <td>구조가 간단<br />학습 불안정성 증가, Q-value 발산 가능성</td>
      <td>Target Network 없이 안정적인 학습은 어려움</td>
    </tr>
    <tr>
      <td><b>(5) naive DQN</b></td>
      <td>위 두 가지 단점 결합: 과적합 및 불안정성</td>
      <td>초보적인 실험에서는 유용하나 실제 성능은 가장 낮음</td>
    </tr>
  </tbody>
</table>