### DQN Learning @ lunar-lander-v2 environment

code reference: https://goodboychan.github.io/python/reinforcement_learning/pytorch/udacity/2021/05/07/DQN-LunarLander.html

In [1]:
import torch
import torch.nn as nn
import torch.functional as F
import torch.optim as optim
import numpy as np
import random
from collections import namedtuple, deque

In [2]:
# Define Hyperparameters
BUFFER_SIZE = 10000
BATCH_SIZE = 64
GAMMA = 0.99
TAU=0.001
LR=5e-4
UPDATE_EVERY=4

In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")

In [6]:
class ReplayBuffer:
  def __init__(self, action_size, buffer_size, batch_size):
    self.action_size = action_size
    self.memory = deque(maxlen=buffer_size)
    self.batch_size = batch_size
    self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

  def add(self, state, action, reward, next_state, done):
    e = self.experience(state, action, reward, next_state, done)
    self.memory.append(e)
  
  def sample(self):
    experiences = random.sample(self.memory, k=self.batch_size)
    states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float().to(device)
    actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).float().to(device)
    rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
    next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float().to(device)
    dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(device)
    return (states, actions, rewards, next_states, dones)
  
  def __len__(self):
    return len(self.memory)

In [7]:
# Define QNetwork
# https://tigris-data-science.tistory.com/entry/PyTorch-modeltrain-vs-modeleval-vs-torchnograd
class QNetwork(nn.Module):
  def __init__(self, state_size, action_size):
    super(QNetwork, self).__init__()
    self.fc1 = nn.Linear(state_size, 64)
    self.fc2 = nn.Linear(64, 64)
    self.fc3 = nn.Linear(64, action_size)
  
  def forward(self, state):
    x = F.relu(self.fc1(state))
    x = F.relu(self.fc2(x))
    x = self.fc3(x)
    return x

In [8]:
class Agent():
  def __init__(self, state_size, action_size):
    self.state_size = state_size
    self.action_size = action_size

    self.qnetwork_local = QNetwork(state_size, action_size).to(device)
    self.qnetwork_target = QNetwork(state_size, action_size).to(device)
    self.optimizer = optim.AdamW(self.qnetwork_local.parameters(), lr=LR)
    self.memory = ReplayBuffer(action_size, BUFFER_SIZE, BATCH_SIZE)
    self.t_step = 0
  
  def step(self, state, action, reward, next_state, done):
    self.memory.add(state, action, reward, next_state, done)
    self.t_step = (self.t_step + 1) % UPDATE_EVERY
    if self.t_step == 0: 
      if len(self.memory) > BATCH_SIZE:
        experiences = self.memory.sample() # sample from replay buffer and learn
        self.learn(experiences, GAMMA)
  
  def act(self, state, eps=0.):
    state = torch.from_numpy(state).float().unsqueeze(0).to(device)
    self.qnetwork_local.eval() # Set the model to evaluation mode
    with torch.no_grad():
      action_values = self.qnetwork_local(state) # action values for each possible action
    self.qnetwork_local.train() # Set the model to training mode
    if random.random() > eps: # epsilon-greedy action
      return np.argmax(action_values.cpu().data.numpy()) # for probabilty of epsilon, select the action with the highest Q value
    else:
      return random.choice(np.arange(self.action_size)) # for probabilty of 1-epsilon, select a random action
    
  def learn(self, experiences, gamma): # most important part of the DQN
    states, actions, rewards, next_states, dones = experiences
    # Compute and minimize the loss
    ### Get the max predicted Q values (for next states) from target model
    Q_targets_next = self.qnetwork_target(next_states).detach().max(1)[0].unsqueeze(1)

    Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))
    Q_expected = self.qnetwork_local(states).gather(1, actions)

    ### Loss calculation
    loss = F.mse_loss(Q_expected, Q_targets)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

    # Soft update
    # θ_target = τ*θ_local + (1 - τ)*θ_target
    for target_param, local_param in zip(self.qnetwork_target.parameters(), self.qnetwork_local.parameters()):
      target_param.data.copy_(TAU*local_param.data + (1.0-TAU)*target_param.data)

In [10]:
import gymnasium as gym

env = gym.make(
    "LunarLander-v2", render_mode="human", continuous=False, gravity=-10.0, enable_wind=True, wind_power=15.0, turbulence_power=1.5
)

def deep_q_learning(env, n_episodes=2000, max_t=1000, eps_start=1.0, eps_end=0.01, eps_decay=0.995):
    """Deep Q-Learning.
    
    Params
    ======
        n_episodes (int): maximum number of training episodes
        max_t (int): maximum number of timesteps per episode
        eps_start (float): starting value of epsilon, for epsilon-greedy action selection
        eps_end (float): minimum value of epsilon
        eps_decay (float): multiplicative factor (per episode) for decreasing epsilon
    """
    scores = []                        # list containing scores from each episode
    scores_window = deque(maxlen=100)  # last 100 scores
    eps = eps_start                    # initialize epsilon
    for i_episode in range(1, n_episodes+1):
        state = env.reset()
        score = 0
        for t in range(max_t):
            action = agent.act(state, eps)
            next_state, reward, done, _ = env.step(action)
            agent.step(state, action, reward, next_state, done)
            state = next_state
            score += reward
            if done:
                break 
        scores_window.append(score)       # save most recent score
        scores.append(score)              # save most recent score
        eps = max(eps_end, eps_decay*eps) # decrease epsilon
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")
        if i_episode % 100 == 0:
            print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
        if np.mean(scores_window)>=200.0:
            print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode-100, np.mean(scores_window)))
            torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')
            break
    return scores

agent = Agent(state_size=8, action_size=4)
scores = deep_q_learning(env)

TypeError: expected np.ndarray (got tuple)