In [None]:
import os
import sys
import numpy as np
import random
import collections
from tqdm import tqdm

import gymnasium as gym

import torch
import torch.nn as nn
import torch.nn.init as init
import torch.nn.functional as F

import matplotlib.pyplot as plt

In [None]:
def seed_all(seed: int):
    """Seed the Python, NumPy and PyTorch global random number generators.

    Args:
        seed: integer seed passed unchanged to each generator.
    """
    # Same order as before: stdlib `random`, then NumPy, then PyTorch.
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

In [None]:
class DQNnet(nn.Module):
    """Fully connected Q-network: Linear layers with leaky-ReLU between them.

    The last layer is linear (no activation) so its outputs can be read as
    unbounded Q-values, one per action.
    """

    def __init__(self, state_dim, hidden_dim, action_dim):
        '''Args:
            state_dim  int: state data's last dimension
            hidden_dim  Sequence[int]: width of every hidden layer (list, tuple, ...);
                        may be empty, in which case the net is a single Linear layer
            action_dim  int: output action data's last dimension
        '''
        super(DQNnet, self).__init__()
        # Normalise to a list so tuples / other sequences can be concatenated below.
        hidden_dim = list(hidden_dim)
        self.num_layers = len(hidden_dim) + 1
        in_sizes = [state_dim] + hidden_dim
        out_sizes = hidden_dim + [action_dim]
        self.layers = nn.ModuleList(
            nn.Linear(in_channels, out_channels)
            for in_channels, out_channels in zip(in_sizes, out_sizes)
        )
        self.__init_parameters__()

    def forward(self, x):
        """Map a batch of states (..., state_dim) to Q-values (..., action_dim)."""
        last_idx = self.num_layers - 1
        for idx, layer in enumerate(self.layers):
            x = layer(x)
            # Every layer except the output layer is followed by leaky-ReLU.
            if idx < last_idx:
                x = F.leaky_relu(x)
        return x

    def __init_parameters__(self):
        """Xavier-initialise all weight matrices; biases keep PyTorch's default init."""
        for p in self.layers.parameters():
            # Only tensors with more than one dimension (weights) are re-initialised.
            if p.dim() > 1:
                init.xavier_uniform_(p)

In [None]:
class ReplayBuffer:
    '''Fixed-capacity FIFO store of (state, action, reward, next_state, done) transitions.'''

    def __init__(self, capacity):
        # A bounded deque drops the oldest transition once `capacity` is reached.
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        '''Append one transition to the buffer.'''
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def clear(self):
        '''Remove every stored transition.'''
        self.buffer.clear()

    def sample(self, sample_size):
        '''Draw `sample_size` distinct transitions uniformly at random.

        Returns:
            (states, actions, rewards, next_states, dones) where states and
            next_states are stacked into numpy arrays and the other three are
            tuples in the sampled order.
        '''
        batch = random.sample(self.buffer, sample_size)
        # Transpose the list of transitions into one column per field.
        states, actions, rewards, next_states, dones = zip(*batch)
        stacked_states = np.array(states)
        stacked_next_states = np.array(next_states)
        return stacked_states, actions, rewards, stacked_next_states, dones

    def size(self):
        '''Number of transitions currently stored.'''
        return len(self.buffer)
    
    
class DQN_agent:
    """Epsilon-greedy DQN agent holding an online and a target Q-network.

    Modes:
        'naive'  -- vanilla DQN target: r + gamma * max_a Q_target(s', a).
        other    -- (the notebook uses 'double') Double DQN target: the online
                    network picks a* = argmax_a Q_online(s', a) and the target
                    network evaluates it: r + gamma * Q_target(s', a*).

    In both modes actions are chosen epsilon-greedily from the online network.
    """

    def __init__(self, state_dim, hidden_dim, action_dim, lr, gamma, epsilon, target_update_frequency, device, mode='naive'):
        """Build both networks, the Adam optimizer and the bookkeeping state.

        Args:
            state_dim: size of one observation vector.
            hidden_dim: hidden layer widths forwarded to DQNnet.
            action_dim: number of discrete actions.
            lr: Adam learning rate for the online network.
            gamma: discount factor.
            epsilon: probability of taking a uniformly random action.
            target_update_frequency: the target net is re-synced whenever the
                update counter is a multiple of this value.
            device: torch device both networks live on.
            mode: 'naive' for DQN; anything else selects Double DQN targets.
        """
        # Construction order (target first, then online) is kept so a seeded run
        # draws the same initial weights for the online network as before.
        self.target_DQN_net = DQNnet(state_dim, hidden_dim, action_dim).to(device)
        self.DQN_net = DQNnet(state_dim, hidden_dim, action_dim).to(device)
        # Start with identical networks so the first bootstrapped targets are
        # consistent with the online estimates.
        self.target_DQN_net.load_state_dict(self.DQN_net.state_dict())

        self.action_dim = action_dim
        self.mode = mode
        self.optimizer = torch.optim.Adam(self.DQN_net.parameters(), lr=lr)
        self.gamma = gamma  # discount factor
        self.epsilon = epsilon  # epsilon-greedy rate
        self.target_update_frequency = target_update_frequency
        self.count = 0  # counter for step num
        self.device = device

    def take_action(self, state):
        """Return an action index for a single (un-batched) state, epsilon-greedily."""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.action_dim)
        # Add a leading batch dimension: (state_dim,) -> (1, state_dim).
        state = torch.as_tensor(np.asarray(state), dtype=torch.float32, device=self.device).unsqueeze(0)
        # Pure inference: no autograd graph is needed.
        with torch.no_grad():
            return torch.argmax(self.DQN_net(state)).item()

    def update(self, transition_dict):
        """Run one gradient step on a batch of transitions.

        Args:
            transition_dict: mapping with keys 'states', 'actions', 'rewards',
                'next_states' and 'dones', each holding one entry per sampled
                transition.
        """
        def as_column(key, dtype):
            # Convert a per-transition sequence into a (batch, 1) tensor.
            values = torch.as_tensor(np.asarray(transition_dict[key]), dtype=dtype, device=self.device)
            return values.view(-1, 1)

        states = torch.as_tensor(np.asarray(transition_dict['states']), dtype=torch.float32, device=self.device)
        next_states = torch.as_tensor(np.asarray(transition_dict['next_states']), dtype=torch.float32, device=self.device)
        actions = as_column('actions', torch.int64)  # gather() requires int64 indices
        rewards = as_column('rewards', torch.float32)
        dones = as_column('dones', torch.float32)

        # Targets are constants w.r.t. the loss, so build them without autograd.
        with torch.no_grad():
            target_next_q = self.target_DQN_net(next_states)
            if self.mode == 'naive':
                next_q_values = target_next_q.max(dim=1, keepdim=True)[0]
            else:
                # Double DQN: select with the online net, evaluate with the target net.
                next_actions = self.DQN_net(next_states).argmax(dim=1, keepdim=True)
                next_q_values = target_next_q.gather(1, next_actions)
            # (1 - dones) removes the bootstrap term for terminal transitions.
            q_targets = rewards + self.gamma * next_q_values * (1 - dones)

        q_values = self.DQN_net(states).gather(1, actions)  # Q(s,a)
        dqn_loss = F.mse_loss(q_values, q_targets)
        self.optimizer.zero_grad()
        dqn_loss.backward()
        self.optimizer.step()

        # Periodically copy the online weights into the target network.
        if self.count % self.target_update_frequency == 0:
            self.target_DQN_net.load_state_dict(self.DQN_net.state_dict())
        self.count += 1

    def save(self, output_dir):
        """Write the online network's state_dict to `<output_dir>/<mode>_DQN_net.pth`.

        The directory is created if it does not exist.
        """
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f"{self.mode}_DQN_net.pth")
        # A state_dict (plain tensors) is saved rather than the pickled module.
        torch.save(self.DQN_net.state_dict(), output_path)

    def load(self, model_dir):
        """Load weights from `<model_dir>/<mode>_DQN_net.pth` into both networks.

        Weights are copied into the existing modules, so the optimizer keeps
        pointing at the live online-network parameters.
        """
        model_path = os.path.join(model_dir, f"{self.mode}_DQN_net.pth")
        checkpoint = torch.load(model_path, map_location=self.device)
        # Checkpoints written by the earlier version of save() held a whole module.
        if isinstance(checkpoint, nn.Module):
            checkpoint = checkpoint.state_dict()
        self.DQN_net.load_state_dict(checkpoint)
        self.target_DQN_net.load_state_dict(checkpoint)

In [None]:
def _shaped_reward(reward, next_state, terminated):
    """Return the shaped reward stored in the replay buffer (MountainCar-specific).

    A large bonus is given when the episode terminates (goal reached); otherwise
    the step penalty of -1 is softened by a term proportional to the squared
    velocity (next_state[1]) while the car moves to the right.
    """
    # `reward == 0` is kept from the original condition; MountainCar-v0 itself
    # signals the goal through `terminated`, its reward being -1 on every step.
    if terminated or reward == 0:
        return 1000
    velocity = next_state[1]
    if velocity > 0:
        return -1 + 4000 * velocity * velocity
    return -1


def train(agent: DQN_agent, buffer: ReplayBuffer, env: gym.Env, epochs: int, sample_size: int, maxstep: int = 1000, minimal_size: int = 0, save_dir = "model"):
    """Train `agent` on `env` and return the un-shaped return of every episode.

    Args:
        agent: agent providing take_action / update / save.
        buffer: replay buffer that receives every transition.
        env: Gymnasium environment (reward shaping assumes MountainCar observations).
        epochs: total number of training episodes.
        sample_size: mini-batch size drawn from the buffer for each update.
        maxstep: hard cap on steps per episode.
        minimal_size: updates start only once the buffer holds more than this
            many transitions.
        save_dir: directory where the best-so-far model is saved; created if missing.

    Returns:
        List with one entry per episode: the sum of the environment's own rewards.
    """
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    return_list = []
    num_bar = 20
    best_return = float('-inf')
    # Spread the episodes over the progress bars without losing the remainder.
    base, extra = divmod(max(epochs, 0), num_bar)
    finished = 0
    for bar_idx in range(num_bar):
        bar_episodes = base + (1 if bar_idx < extra else 0)
        if bar_episodes == 0:
            continue
        with tqdm(total=bar_episodes, desc=f'Iteration {bar_idx+1}') as pbar:
            for _ in range(bar_episodes):
                episode_return = 0
                state, _ = env.reset()
                # do actions to get an episode train
                for _ in range(maxstep):
                    action = agent.take_action(state)
                    next_state, reward, terminated, truncated, _ = env.step(action)
                    virtual_reward = _shaped_reward(reward, next_state, terminated)
                    # Only true termination is stored as `done`: a time-limit
                    # truncation must still bootstrap from next_state.
                    buffer.add(state, action, virtual_reward, next_state, terminated)
                    state = next_state
                    episode_return += reward
                    # for stability, only when buffer size > minimal_size, agent will update parameters
                    # (and never before a full mini-batch can be drawn)
                    if buffer.size() > minimal_size and buffer.size() >= sample_size:
                        states, actions, rewards, next_states, dones = buffer.sample(sample_size)
                        transition_dict = {
                            'states': states,
                            'actions': actions,
                            'rewards': rewards,
                            'next_states': next_states,
                            'dones': dones
                        }
                        agent.update(transition_dict)
                    # Stop stepping once the episode is over for either reason.
                    if terminated or truncated:
                        break
                # Keep a checkpoint of the best episode seen so far.
                if best_return < episode_return:
                    agent.save(save_dir)
                    best_return = episode_return
                finished += 1
                return_list.append(episode_return)
                pbar.set_postfix({'episode': f'{finished}', 'return': f'{episode_return}'})
                pbar.update(1)
    return return_list

In [None]:
def evaluate(agent: DQN_agent, env: gym.Env, test_num: int, max_steps: int = 200):
    """Run `test_num` evaluation episodes and summarise them.

    Args:
        agent: agent whose take_action chooses the actions.
        env: Gymnasium environment; it is closed before returning.
        test_num: number of episodes to run.
        max_steps: hard cap on steps per episode (default 200, as before).

    Returns:
        (total_count, avg_reward): number of episodes that ended with
        `terminated` set, and the mean episode return (0.0 when test_num <= 0).
    """
    total_reward = 0
    total_count = 0
    try:
        for _ in range(test_num):
            epoch_reward = 0
            state, _info = env.reset()
            # Kept from the original; presumably only has an effect when the
            # env was created with a render_mode -- TODO confirm.
            env.render()
            for _ in range(max_steps):
                action = agent.take_action(state)
                state, reward, terminated, truncated, _info = env.step(action)
                epoch_reward += reward
                if terminated:
                    total_count += 1
                # Stop on termination or on time-limit truncation.
                if terminated or truncated:
                    break
            total_reward += epoch_reward
    finally:
        # Release the environment even if an episode raised.
        env.close()

    avg_reward = total_reward / test_num if test_num > 0 else 0.0
    return total_count, avg_reward

In [None]:
# --- Environment ---
env_name = 'MountainCar-v0'
env = gym.make(env_name)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

# --- Experiment setup ---
mode_list = ['naive', 'double']
epochs = 100      # total training episodes per mode
test_num = 10     # evaluation episodes per mode
seed = 42

# --- Network / optimisation ---
lr = 1e-2
hidden_dim = [16, 4]
gamma = 0.9       # discount factor
epsilon = 0.01    # epsilon-greedy exploration rate
target_update_frequency = 10

# --- Replay buffer / rollout ---
buffer_size = 200
minimal_size = 50  # updates start once the buffer holds more than this
maxstep = 1000     # per-episode step cap used by train()
sample_size = 20   # mini-batch size
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Make the stochastic parts repeatable: Python/NumPy/PyTorch RNGs and the
# environment's own RNG (seeded once; later reset() calls continue that stream).
seed_all(seed)
env.reset(seed=seed)

In [None]:
# Per-mode results: training returns are filled here, evaluation rewards by the
# evaluation cell further down.
returns_list = {}
eval_reward_list = {}

# Train one fresh agent (with its own replay buffer) per mode on the shared env.
for mode_name in mode_list:
    print("Now we starts to train {} mode".format(mode_name))
    replay_buffer = ReplayBuffer(buffer_size)
    agent = DQN_agent(
        state_dim,
        hidden_dim,
        action_dim,
        lr,
        gamma,
        epsilon,
        target_update_frequency,
        device,
        mode=mode_name,
    )
    returns = train(
        agent,
        replay_buffer,
        env,
        epochs,
        sample_size,
        maxstep=maxstep,
        minimal_size=minimal_size,
    )
    returns_list[mode_name] = returns

# Save one return-vs-episode curve per mode, clearing the figure in between.
for mode_name in mode_list:
    episodes_list = range(len(returns_list[mode_name]))
    fig_name = f'{mode_name}_compare'
    plt.plot(episodes_list, returns_list[mode_name], label=mode_name)
    plt.gca().set(
        xlabel='Episodes',
        ylabel='Returns',
        title=f'{mode_name} DQN algorithm performance',
    )
    plt.savefig(fig_name)
    plt.clf()

In [None]:
# Evaluate the best checkpoint of every mode on a fresh environment.
# Success counts are kept per mode so the final report covers all modes,
# not just the last one evaluated.
eval_count_list = {}
for mode_name in mode_list:
    agent = DQN_agent(state_dim, hidden_dim, action_dim, lr, gamma, epsilon,
                target_update_frequency, device, mode=mode_name)
    # Reads "model/<mode>_DQN_net.pth", the file written by agent.save during training.
    agent.load("model")
    # evaluate the agent (evaluate() closes eval_env when it finishes)
    eval_env = gym.make(env_name)
    total_count, avg_reward = evaluate(agent, eval_env, test_num)
    eval_count_list[mode_name] = total_count
    eval_reward_list[mode_name] = avg_reward
print(eval_count_list, eval_reward_list)