# Deep Q-Learning for Lunar Landing

https://gymnasium.farama.org  
https://gymnasium.farama.org/environments/box2d/lunar_lander

### Action Space
There are four discrete actions available:

0: 아무것도 하지 않음    
1: 왼쪽 방향 조정 엔진 발사   
2: 메인 엔진(아래로 추진) 발사    
3: 오른쪽 방향 조정 엔진 발사    

### Rewards
각 스텝(step)마다 보상이 주어지며,  
에피소드의 총 보상은 모든 스텝의 보상 합계.

#### 각 스텝에서의 보상 변화:
- 착륙 패드에 가까워질수록 보상 증가 (멀어질수록 감소)
- 속도가 느릴수록 보상 증가 (빠르면 감소 — 부드러운 착륙 유도)
- 기울어질수록(수평이 아닐수록) 보상 감소 → 똑바로 착지해야 함
- 다리가 지면에 닿으면,
  - 닿은 다리당 +10점
- 엔진 사용 페널티
  - 측면(좌/우) 엔진 작동 시: 매 프레임마다 −0.03점
  - 메인(아래) 엔진 작동 시: 매 프레임마다 −0.3점

#### 에피소드 종료 시 보상
- 추락하면: −100점  
- 안전하게 착륙하면: +100점

#### 성공 기준
- 에피소드 당 총 점수가 200점 이상이면 해당 에피소드를 성공(Solved)했다고 간주

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

### Importing the libraries

In [3]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [4]:
class Network(nn.Module):
    ''' 현재 상태(state)가 들어오면
        각 행동에 대한 점수(Q값)를 출력 '''

    def __init__(self, state_size, action_size, seed=42) -> None:
      super(Network, self).__init__()
      self.seed = torch.manual_seed(seed)
      self.fc1 = nn.Linear(state_size, 64) # 입력층 → 첫 번째 은닉층
      self.fc2 = nn.Linear(64, 64)
      self.fc3 = nn.Linear(64, action_size) # → 출력층

    # 순전파(Forward Propagation)
    def forward(self, state):
      x = self.fc1(state)
      x = F.relu(x) # ReLU 활성화 함수 (비선형성 추가)
      x = self.fc2(x)
      x = F.relu(x)
      return self.fc3(x)

## Part 2 - Training the AI

### Setting up the environment

In [5]:
import gymnasium as gym
env = gym.make('LunarLander-v3') # The Lunar Lander environment was upgraded to v3
state_shape = env.observation_space.shape
state_size = env.observation_space.shape[0] # 입력 뉴런 수
number_actions = env.action_space.n # 출력 뉴런 수
print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

State shape:  (8,)
State size:  8
Number of actions:  4


### Initializing the hyperparameters

- 학습률
  - 신경망이 얼마나 빠르게 업데이트되는지를 결정  
  - 너무 크면 발산, 너무 작으면 학습이 느려짐

- 미니배치
  - 한 번에 샘플링해서 학습에 사용하는 경험 수  
  - 일반적으로 64 또는 100 정도가 적당, Deep Q-Learning에 적합

- 할인율(gamma)
  - 미래 보상을 현재 가치로 얼마나 반영할지  
  - 0에 가까우면 현재 보상만 고려, 1에 가까우면 미래까지 고려  

- 리플레이 버퍼 크기
  - 에이전트가 저장할 수 있는 최대 경험 수  
  - 학습용 과거 데이터를 얼마나 오래 저장할것인가
  - 경험이 많을수록 샘플 다양성이 증가 → 학습 안정성 증가  

- 소프트 업데이트 계수(tau)
  - 타깃 네트워크를 천천히 업데이트하기 위한 계수. 보통 아주 작은 값 사용  

In [6]:
learning_rate = 5e-4 # 0.0005
minibatch_size = 100
gamma = 0.99 # discount factor
replay_buffer_size = int(1e5) # 10만, agent의 메모리 상태
tau = 1e-3 # 0.001, interpolation parameter

### Implementing Experience Replay

```
event = (state, action, reward, next_state, done)
```
- state: 현재 상태 (예: 8차원 벡터)
- action: 에이전트가 취한 행동 (0~3)
- reward: 그 행동을 통해 받은 보상
- next_state: 행동 결과로 도달한 다음 상태
- done: 에피소드 종료 여부 (True or False)

In [7]:
class ReplayMemory(object):
  ''' 에이전트가 이전에 겪었던 경험들을 저장하고 샘플링하여 학습 '''

  def __init__(self, capacity):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.capacity = capacity # 버퍼의 최대 크기 (저장 가능한 최대 경험 수)
    self.memory = [] # 실제로 경험(transition)을 저장할 리스트


  def push(self, event):
    ''' 에이전트가 경험한 (state, action, reward, next_state, done)
      같은 정보를 메모리에 저장 '''

    # # 새로운 경험(transition)을 메모리에 추가
    self.memory.append(event)

    # 메모리가 capacity를 초과하면, 가장 오래된 경험을 삭제하여 공간 확보
    if len(self.memory) > self.capacity:
      del self.memory[0]


  def sample(self, batch_size):
    ''' 메모리 버퍼에서 경험을 무작위로 선택 '''

    experiences = random.sample(self.memory, k = batch_size)

    # 상태(state) 배열을 numpy → float 텐서로 변환, 디바이스에 올림
    states = torch.from_numpy(
        np.vstack([e[0] for e in experiences if e is not None])
    ).float().to(self.device)

    # 행동(action) 배열을 numpy → long 텐서로 변환 (정수형), 디바이스에 올림
    actions = torch.from_numpy(
        np.vstack([e[1] for e in experiences if e is not None])
    ).long().to(self.device)

    rewards = torch.from_numpy(
        np.vstack([e[2] for e in experiences if e is not None])
    ).float().to(self.device)

    next_states = torch.from_numpy(
        np.vstack([e[3] for e in experiences if e is not None])
    ).float().to(self.device)

    # 종료 여부(done) 배열을 numpy → uint8 → float 텐서로 변환, 디바이스에 올림
    # done은 True/False이므로 bool -> int(0/1) -> float 변환 필요 : PyTorch는 bool → float 변환을 직접 지원 X
    dones = torch.from_numpy(
        np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8)
    ).float().to(self.device)

    # 샘플링된 배치 반환 (순서는 보통 네트워크 학습 순서에 맞춰 정함)
    return states, next_states, actions, rewards, dones

### Implementing the DQN class

In [8]:
class Agent():
  ''' 에이전트 클래스 정의: 실제로 행동하고 학습하는 AI 객체 '''

  def __init__(self, state_size, action_size):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.state_size = state_size # 관찰(입력) 상태의 차원 수 (LunarLander: 8)
    self.action_size = action_size # 행동의 수 (4가지)

    # 로컬 Q-네트워크 (현재 상태에서 행동을 예측)
    self.local_qnetwork = Network(state_size, action_size).to(self.device)

    # 타깃 Q-네트워크 (학습 안정성을 위한 목표 네트워크)
    # 일정 시간 간격마다 local_qnetwork의 가중치를 복사받아 학습을 안정화
    self.target_qnetwork = Network(state_size, action_size).to(self.device)

    # 로컬 Q-네트워크의 가중치를 학습시킬 도구 (가중치를 얼마나 어떻게 업데이트할지 결정)
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = learning_rate)

    # 과거 경험을 저장하고 샘플링하는 버퍼
    self.memory = ReplayMemory(replay_buffer_size)
    # 몇 번의 스텝이 지났는지 추적 (업데이트 주기를 조절에 사용)
    self.t_step = 0


  def step(self, state, action, reward, next_state, done):
    ''' 1. 경험을 저장
        2. 특정 조건(4 스텝마다, 메모리 충분히 쌓이면)에서 학습을 시작 '''

    self.memory.push((state, action, reward, next_state, done)) # ReplayMemory에 새 경험 하나를 저장

    self.t_step = (self.t_step + 1) % 4 # 4스텝마다 학습하도록 제한 (효율/안정성)
    if self.t_step == 0:
      if len(self.memory.memory) > minibatch_size: # ReplayMemory 클래스의 인스턴스 안에있는 경험 리스트 ([])
        experiences = self.memory.sample(100) # 메모리에서 100개 무작위로 샘플링, Tensor 형태
        self.learn(experiences, gamma)


  def act(self, state, epsilon = 0.):
    ''' 에이전트가 현재 상태(state)를 보고, 무슨 행동을 할지 선택 (ε-greedy) '''

    # .unsqueeze(0): 배치 형태로 신경망에 넣기 위해
    # 배치 형태의 상태 텐서 1개 → 신경망 입력에 최적화된 상태
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

    # 신경망을 추론 모드로 전환
    # dropout이나 batchnorm 같은 층은 훈련과 추론 시 다르게 작동
    self.local_qnetwork.eval()

    with torch.no_grad(): # 추론 중 불필요한 기울기 계산 방지 (메모리 낭비 방지, 속도 향상)

      # state를 Q-네트워크에 입력
      # 상태 s에서 각 행동 a에 대한 예측된 Q값 [Q(s, a₀), Q(s, a₁), Q(s, a₂), Q(s, a₃)]
      action_values = self.local_qnetwork(state)


    # 훈련 모드로 전환 (다시 학습 가능한 상태)
    self.local_qnetwork.train()

    # ε-greedy
    if random.random() > epsilon:
      return np.argmax(action_values.cpu().data.numpy()) # 가장 Q값이 큰 행동 선택 (탐욕적 선택)
    else:
      return random.choice(np.arange(self.action_size)) # 무작위 행동 (탐색)


  def learn(self, experiences, gamma):
    ''' 샘플링된 미니배치 경험을 기반으로 신경망의 Q값을 학습시키는 과정 '''
    states, next_states, actions, rewards, dones = experiences

    # Q-learning
    # 다음 상태(next_states)에서 취할 수 있는 Q값들 중 가장 큰 값 구하기
    # 미니배치 내 모든 다음 상태 s'에 대해, 가장 큰 Q(s', a')값을 구해서 column 벡터로 정리
    next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)

    # Q-learning
    q_targets = rewards + gamma * next_q_targets * (1 - dones)

    # 상태 s에서 선택했던 행동 a에 대한 Q값 예측
    q_expected = self.local_qnetwork(states).gather(1, actions)

    # 예측된 Q값과 목표 Q값의 차이
    loss = F.mse_loss(q_expected, q_targets)

    self.optimizer.zero_grad() # 이전 단계의 기울기(gradient)를 초기화
    loss.backward() # 손실을 기준으로 역전파 수행

    # 계산된 그래디언트를 기반으로 파라미터를 한 스텝 업데이트
    # 모델이 더 나은 Q값을 예측할 수 있도록 학습
    self.optimizer.step()

    # 로컬 Q 네트워크에서 학습한 내용을 → 타깃 네트워크에 조금씩 반영
    self.soft_update(self.local_qnetwork, self.target_qnetwork, tau)


  def soft_update(self, local_model, target_model, tau):
    ''' 강화학습의 안정화 장치
        타겟 네트워크를 매번 로컬 네트워크로 완전히 덮어쓰기 하면 학습이 불안정하고 발산할 수 있음
        - 타겟 네트워크(target_model)의 파라미터를
          로컬 네트워크(local_model)의 파라미터 방향으로 조금씩 이동시킴
        - local_model은 학습 중 계속 업데이트됨 (매 스텝마다)
          target_model은 안정성 확보를 위해 천천히 따라오게 함 '''

    for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
      target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)

### Initializing the DQN agent

In [9]:
agent = Agent(state_size, number_actions)

### Training the DQN agent

In [10]:
number_episodes = 2000 # 최대 훈련 에피소드 수
maximum_number_timesteps_per_episode = 1000 # 에피소드 하나에서 최대로 실행할 수 있는 타임스텝 수. 고착 상태 방지
epsilon_starting_value  = 1.0 # 탐색 비율의 시작값. 초반에는 거의 무작위로 행동
epsilon_ending_value  = 0.01 # 탐색 비율의 최소값. 후반에는 대부분 최고의 Q값 행동만 선택
epsilon_decay_value  = 0.995 # 감소율. 매 에피소드마다
epsilon = epsilon_starting_value # 초기화
scores_on_100_episodes = deque(maxlen = 100) # 	최근 100 에피소드의 점수 기록. 평균: AI의 성능이 얼마나 향상됐는지 판단


for episode in range(1, number_episodes + 1):

  # 우주선이 공중에 떠 있는 상태로 초기화
  # state는 8차원 상태 벡터 (위치, 속도, 각도, 다리 접촉 여부)
  state, _ = env.reset()

  score = 0 # 누적 보상 초기화

  # 최대 1000 타임스텝 동안 에이전트가 착륙 시도
  for t in range(maximum_number_timesteps_per_episode):
    action = agent.act(state, epsilon) # 현재 상태에 따라 행동을 선택 (epsilon 기반)
    next_state, reward, done, _, _ = env.step(action)
    agent.step(state, action, reward, next_state, done) # 경험 저장, 미니배치 학습

    state = next_state # 상태 업데이트 (다음 스텝을 위한 상태)
    score += reward # reward 누적

    # 우주선이 착륙 or 추락 등으로 종료
    if done:
      break

  scores_on_100_episodes.append(score) # 최근 100회 점수 기록 → 평균 추적

  # 엡실론 감소 (>= 0.01)
  epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)

  # 실시간 진행 상황
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(scores_on_100_episodes)), end = "")

  # 100 에피소드마다 줄 바꿈
  if episode % 100 == 0:
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(scores_on_100_episodes)))

  # 조기 종료 조건 : 평균 점수 ≥ 200이면 환경 해결로 판단
  if np.mean(scores_on_100_episodes) >= 200.0:
    print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, np.mean(scores_on_100_episodes)))

    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth') # 모델 저장
    break

Episode 100	Average Score: -158.32
Episode 200	Average Score: -98.37
Episode 300	Average Score: -50.46
Episode 400	Average Score: 19.65
Episode 500	Average Score: 81.15
Episode 600	Average Score: 173.72
Episode 700	Average Score: 198.18
Episode 702	Average Score: 200.20
Environment solved in 602 episodes!	Average Score: 200.20


## Part 3 - Visualizing the results

In [11]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env_name):
    env = gym.make(env_name, render_mode='rgb_array')
    state, _ = env.reset()
    done = False
    frames = []
    while not done:
        frame = env.render()
        frames.append(frame)
        action = agent.act(state)
        state, reward, done, _, _ = env.step(action.item())
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, 'LunarLander-v3')

def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()

