<a href="https://colab.research.google.com/github/gitHubAndyLee2020/OpenAI_Gym_RL_Algorithms_Database/blob/main/DQN_Module.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### DQN

> About

- Trains an evaluation network that predicts, for a given state, the expected future reward of each action (called the Q-value)
- Has a copy of the evaluation network called target network. The target network is updated periodically, and it is used to stabilize training

> Pro

- Stability in Training; fewer oscillations and less divergence during training

> Con

- Sample Inefficiency; needs large set of training data to learn effectively

```
class Net():
  def __init__(self):
    - Initialize neural network

  def forward(self, x):
    - Forward the input through the neural network
    - Return one Q-value per action (a vector whose length is the number of actions); these are value estimates, not probabilities
```

```
class DQN():
  def __init__(self):
    - Initialize two Net; one for evaluation network, and one for target network
    - Initialize memory of length memory capacity that stores (state, action, reward, next state)
    - Initialize optimizer and loss function for the evaluation network

  def choose_action(self, state):
    - If random number is less than or equal to some ε-greedy policy value, choose the action generated by the evaluation network. Otherwise, choose the action randomly
    - This encourages exploration by occasionally trying novel actions

  def store_transition(self, state, action, reward, next_state):
    - Create transition object, which is a collection of state, action, reward, and next state
    - Store the transition in the memory; if the memory is full, start replacing it from the earliest item in the current memory

  def learn(self):
    - Every Q Network Iteration steps, copy the weights of the evaluation network into the target network
    - On every call (including the steps where the target network was just synced), train the evaluation network as follows
    - First, select a batch of random transitions from the memory
    - Then Q-Eval is calculated by getting the potential reward of taking an action using the evaluation network
    - Similarly, Q-Target is calculated as batch reward + Gamma * Q-Next, where Q-Next is the largest Q-value the target network predicts for the next state; this combines the immediate reward of the current action with the discounted value of the best next action; the target network is used to provide a more stable target that doesn't change with every update of the evaluation network
    - The loss value is calculated between Q-Eval and Q-Target, which informs how much the evaluation network is off from the ideal target
    - Backpropagation adjusts the weights of the evaluation network
```

```
def reward_func(env, x, x_dot, theta, theta_dot):
  - Custom reward function, adjust to the environment as needed
```

```
def main():
  - Loop the training for some amount of epochs
  - For each training loop, collect transitions from the environment from the start
  - Once the number of stored transitions reaches the memory capacity, start training the model
  - If the agent is done in the environment, break out of the data collection and training loop
```

### Example Code - CartPole-v0

In [None]:
# Import the required libraries for pytorch, numpy, gym and matplotlib
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym
import matplotlib.pyplot as plt
import copy

# Hyper-parameters shared by the network, the agent and the training loop

# Number of transitions drawn from the replay buffer per training step
BATCH_SIZE = 128
# Learning rate handed to the optimizer
LR = 0.01
# Discount factor applied to the next-state value
GAMMA = 0.9
# Threshold of the ε-greedy policy used when choosing an action
EPISILO = 0.9
# Number of transitions the replay buffer can hold
MEMORY_CAPACITY = 2000
# Number of learning steps between two target-network updates
Q_NETWORK_ITERATION = 100

# Create the gym environment (CartPole)
env = gym.make("CartPole-v0")
# Work on the base environment so that attributes such as x_threshold
# (read by reward_func) are directly accessible
env = env.unwrapped
NUM_ACTIONS = env.action_space.n  # Number of possible actions
NUM_STATES = env.observation_space.shape[0]  # Number of state features
# Shape of a sampled action: 0 marks a scalar action, anything else is the
# array shape an action has to be reshaped to. The sample may be a NumPy
# integer rather than a Python int, so both count as scalar, and sample()
# has to be *called* before its result's .shape can be read.
_sample_action = env.action_space.sample()
ENV_A_SHAPE = 0 if isinstance(_sample_action, (int, np.integer)) else _sample_action.shape

# Define the neural network architecture for approximating Q-function
class Net(nn.Module):
    """Fully-connected Q-network: state features in, one value per action out.

    Layout: NUM_STATES -> 50 -> 30 -> NUM_ACTIONS with ReLU between layers.
    The weights of every layer are re-drawn from a normal distribution
    (mean 0, std 0.1) right after the layer is created.
    """

    def __init__(self):
        super().__init__()
        # Layers are built (and re-initialised) strictly one after another
        self.fc1 = self._linear(NUM_STATES, 50)
        self.fc2 = self._linear(50, 30)
        self.out = self._linear(30, NUM_ACTIONS)

    @staticmethod
    def _linear(in_features, out_features):
        """Create a linear layer and overwrite its weights with N(0, 0.1) samples."""
        layer = nn.Linear(in_features, out_features)
        layer.weight.data.normal_(0, 0.1)
        return layer

    def forward(self, x):
        """Return the Q-value of every action for the given batch of states."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        return self.out(hidden)

# Define the DQN algorithm
class DQN():
    """DQN agent: an evaluation network, a target network and a replay buffer.

    Every replay-buffer row is laid out as
    ``[state (NUM_STATES values), action, reward, next_state (NUM_STATES values)]``.
    """

    def __init__(self):
        self.eval_net, self.target_net = Net(), Net()  # Initialize Q and target networks

        self.learn_step_counter = 0  # Counts learn() calls, for target updating
        self.memory_counter = 0  # Counts stored transitions
        # Initialize memory (state, action, reward, next_state)
        self.memory = np.zeros((MEMORY_CAPACITY, NUM_STATES * 2 + 2))
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)  # Adam optimizer
        self.loss_func = nn.MSELoss()  # Mean squared error loss

    def choose_action(self, state):
        """Pick an action for ``state`` with an ε-greedy policy.

        With probability EPISILO the action with the largest predicted Q-value
        is returned, otherwise a uniformly random action.
        """
        state = torch.unsqueeze(torch.FloatTensor(state), 0)  # Add a batch dimension
        # rand() is uniform on [0, 1), so this branch really is taken with
        # probability EPISILO (a normal draw such as randn() would not be)
        if np.random.rand() <= EPISILO:
            with torch.no_grad():  # Acting does not need gradients
                action_value = self.eval_net(state)
            action = torch.max(action_value, 1)[1].numpy()
            action = action[0] if ENV_A_SHAPE == 0 else action.reshape(ENV_A_SHAPE)
        else:  # Random policy
            action = np.random.randint(0, NUM_ACTIONS)
            # randint returns a plain int here, which has no reshape(); wrap it first
            action = action if ENV_A_SHAPE == 0 else np.asarray(action).reshape(ENV_A_SHAPE)
        return action

    def store_transition(self, state, action, reward, next_state):
        """Write (s, a, r, s_) into the buffer, overwriting the oldest row once full."""
        transition = np.hstack((state, [action, reward], next_state))  # Stack horizontally
        index = self.memory_counter % MEMORY_CAPACITY  # Ring-buffer position
        self.memory[index, :] = transition
        self.memory_counter += 1

    def learn(self):
        """Run one optimisation step of the evaluation network on a random mini-batch.

        Does nothing while the buffer is still empty.
        """
        # Only rows that have actually been written are valid training samples
        filled = min(self.memory_counter, MEMORY_CAPACITY)
        if filled == 0:
            return

        # Update the target network every Q_NETWORK_ITERATION times
        if self.learn_step_counter % Q_NETWORK_ITERATION == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1

        # Sample a mini-batch (with replacement) from the filled part of memory
        sample_index = np.random.choice(filled, BATCH_SIZE)
        batch_memory = self.memory[sample_index, :]
        batch_state = torch.FloatTensor(batch_memory[:, :NUM_STATES])
        batch_action = torch.LongTensor(batch_memory[:, NUM_STATES:NUM_STATES + 1].astype(int))
        batch_reward = torch.FloatTensor(batch_memory[:, NUM_STATES + 1:NUM_STATES + 2])
        batch_next_state = torch.FloatTensor(batch_memory[:, -NUM_STATES:])

        # Q-value of the action that was taken vs. reward + discounted best next value.
        # NOTE(review): rows carry no episode-termination flag, so the target
        # bootstraps from next_state even when that state ended the episode.
        q_eval = self.eval_net(batch_state).gather(1, batch_action)
        q_next = self.target_net(batch_next_state).detach()  # No gradient through the target
        q_target = batch_reward + GAMMA * q_next.max(1)[0].view(BATCH_SIZE, 1)
        loss = self.loss_func(q_eval, q_target)

        # Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

# Custom reward function for CartPole environment
def reward_func(env, x, x_dot, theta, theta_dot):
    """Shaped reward built from the cart position and the pole angle.

    Each term is 0.5 when its quantity is exactly zero and -0.5 when its
    magnitude equals the matching environment threshold; the reward is the sum
    of both terms. ``x_dot`` and ``theta_dot`` are accepted but not used.
    """
    x_limit = env.x_threshold
    angle_limit = env.theta_threshold_radians
    position_term = (x_limit - abs(x)) / x_limit - 0.5
    angle_term = (angle_limit - abs(theta)) / angle_limit - 0.5
    return position_term + angle_term

# Main loop
def main():
    """Train a DQN agent on the environment and live-plot the reward of every episode.

    Transitions are collected from the first step on; learning starts once the
    replay buffer has been filled.
    """
    dqn = DQN()
    episodes = 400
    print("Collecting Experience....")
    reward_list = []
    plt.ion()  # Turn on interactive mode for matplotlib
    _, ax = plt.subplots()
    for i in range(episodes):
        # Depending on the gym version reset() returns either the observation
        # or an (observation, info) pair
        reset_result = env.reset()
        state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
        ep_reward = 0  # Initialize episode reward
        while True:
            env.render()  # Render the environment
            action = dqn.choose_action(state)  # Choose action
            # step() returns 5 values (terminated and truncated are separate)
            # or 4 values (a single done flag) depending on the gym version
            step_result = env.step(action)
            if len(step_result) == 5:
                next_state, _, terminated, truncated, _ = step_result
                done = terminated or truncated
            else:
                next_state, _, done, _ = step_result
            x, x_dot, theta, theta_dot = next_state  # Components of next state
            reward = reward_func(env, x, x_dot, theta, theta_dot)  # Compute custom reward
            dqn.store_transition(state, action, reward, next_state)  # Store transition
            ep_reward += reward  # Add reward to total episode reward

            if dqn.memory_counter >= MEMORY_CAPACITY:  # If enough memory is collected
                dqn.learn()  # Start learning
                if done:  # If episode is done
                    print(f"episode: {i} , the episode reward is {round(ep_reward, 3)}")
            if done:  # If episode is done
                break
            state = next_state  # Update state

        # Track the total reward of the episode, not just its final step
        reward_list.append(ep_reward)
        ax.clear()  # Redraw a single curve instead of stacking one line per episode
        ax.set_xlim(0, episodes)  # Keep every episode inside the visible range
        ax.plot(reward_list, 'g-', label='total_loss')  # Plotting
        plt.pause(0.001)  # Pause to update plot

# Run the training loop only when this file/cell is executed as the main
# program, not when it is imported as a module
if __name__ == '__main__':
    main()

### Example Code (Optimized) - CartPole-v0

In [None]:
# Import required libraries
import argparse
import pickle
from collections import namedtuple
from itertools import count
import os, time
import numpy as np
import matplotlib.pyplot as plt
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal, Categorical
from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler
from tensorboardX import SummaryWriter

# Set hyperparameters and environment variables
seed = 1
render = False
num_episodes = 2000

# Initialize the CartPole environment and set the state and action spaces
env = gym.make('CartPole-v0').unwrapped
num_state = env.observation_space.shape[0]
num_action = env.action_space.n

# Seed every random source that is used, for reproducibility
torch.manual_seed(seed)
np.random.seed(seed)  # select_action draws its exploration decisions from NumPy
if hasattr(env, 'seed'):
    env.seed(seed)  # gym versions that still provide Env.seed()
else:
    env.reset(seed=seed)  # gym versions that seed through reset() instead

# Define the structure of a transition in experience replay
Transition = namedtuple('Transition', ['state', 'action', 'reward', 'next_state'])

# Define the neural network for Q-Learning
class Net(nn.Module):
    """Q-network with a single hidden layer of 100 ReLU units.

    Maps a batch of states (num_state features each) to one value per action
    (num_action outputs).
    """

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(num_state, 100)  # Input -> hidden
        self.fc2 = nn.Linear(100, num_action)  # Hidden -> one value per action

    def forward(self, x):
        """Return the action values predicted for ``x``."""
        hidden = self.fc1(x)
        hidden = F.relu(hidden)
        return self.fc2(hidden)

# Define the DQN agent
class DQN():
    """DQN agent with a fixed-size replay memory and a target network.

    ``act_net`` is the network being trained and used for acting;
    ``target_net`` supplies the bootstrap targets and is re-synchronised with
    ``act_net`` every 100 optimisation steps.
    """

    # Class-level defaults (shared by all instances until overwritten)
    capacity = 8000  # Number of transitions kept in memory
    learning_rate = 1e-3
    memory_count = 0  # Total number of transitions stored so far
    batch_size = 256
    gamma = 0.995  # Discount factor
    update_count = 0  # Number of optimisation steps performed so far

    def __init__(self):
        self.target_net, self.act_net = Net(), Net()
        # Start both networks from the same weights so that the first targets
        # are not produced by an unrelated random network
        self.target_net.load_state_dict(self.act_net.state_dict())
        self.memory = [None]*self.capacity
        self.optimizer = optim.Adam(self.act_net.parameters(), self.learning_rate)
        self.loss_func = nn.MSELoss()
        self.writer = SummaryWriter('./DQN/logs')

    # Policy: Select action
    def select_action(self,state):
        """Return a random action with probability 0.1, else the greedy action."""
        if np.random.rand(1) >= 0.9:
            return np.random.choice(range(num_action), 1).item()
        state = torch.tensor(state, dtype=torch.float).unsqueeze(0)
        with torch.no_grad():  # Acting does not need gradients
            value = self.act_net(state)
        return torch.max(value, 1)[1].item()

    # Store transitions for experience replay
    def store_transition(self,transition):
        """Store ``transition``, overwriting the oldest entry once memory is full."""
        index = self.memory_count % self.capacity
        self.memory[index] = transition
        self.memory_count += 1

    # Update Q-values and policy
    def update(self):
        """Run one pass of mini-batch updates over the whole replay memory.

        Only prints a message and returns while the memory is not yet full.
        """
        if self.memory_count < self.capacity:
            print("Memory Buff is too less")
            return

        # Stack into NumPy arrays first: building a tensor straight from a
        # list of arrays is very slow
        state = torch.tensor(np.array([t.state for t in self.memory]), dtype=torch.float)
        action = torch.tensor([t.action for t in self.memory], dtype=torch.long).view(-1, 1)
        reward = torch.tensor([t.reward for t in self.memory], dtype=torch.float)
        next_state = torch.tensor(np.array([t.next_state for t in self.memory]), dtype=torch.float)

        # Normalise rewards over the whole memory.
        # NOTE(review): if every stored reward is identical this maps all of
        # them to 0, and transitions carry no episode-termination flag, so the
        # target below also bootstraps from terminal next states.
        reward = (reward - reward.mean()) / (reward.std() + 1e-7)
        with torch.no_grad():
            target_v = reward + self.gamma * self.target_net(next_state).max(1)[0]

        batches = BatchSampler(SubsetRandomSampler(range(len(self.memory))), batch_size=self.batch_size, drop_last=False)
        for index in batches:
            # Forward only the sampled rows instead of the entire memory
            v = self.act_net(state[index]).gather(1, action[index])
            loss = self.loss_func(v, target_v[index].unsqueeze(1))
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            self.writer.add_scalar('loss/value_loss', loss.item(), self.update_count)
            self.update_count += 1
            if self.update_count % 100 == 0:
                self.target_net.load_state_dict(self.act_net.state_dict())

# Main function to train the agent
def main():
    """Train the DQN agent for ``num_episodes`` episodes.

    Every step is stored as a transition; at the end of each episode the
    episode length is logged and the agent is updated.
    """
    agent = DQN()
    try:
        for i_ep in range(num_episodes):
            # Depending on the gym version reset() returns either the
            # observation or an (observation, info) pair
            reset_result = env.reset()
            state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
            if render: env.render()
            for t in range(10000):
                action = agent.select_action(state)
                # step() returns 5 values (terminated and truncated are
                # separate) or 4 values (a single done flag) depending on the
                # gym version
                step_result = env.step(action)
                if len(step_result) == 5:
                    next_state, reward, terminated, truncated, _ = step_result
                    done = terminated or truncated
                else:
                    next_state, reward, done, _ = step_result
                if render: env.render()
                transition = Transition(state, action, reward, next_state)
                agent.store_transition(transition)
                state = next_state
                if done or t >= 9999:
                    agent.writer.add_scalar('live/finish_step', t+1, global_step=i_ep)
                    agent.update()
                    if i_ep % 10 == 0:
                        print("episodes {}, step is {} ".format(i_ep, t))
                    break
    finally:
        # Flush pending log events even if training is interrupted
        agent.writer.close()

# Entry point of the script: start training only when this file/cell is run
# as the main program, not when it is imported as a module
if __name__ == '__main__':
    main()