# Session 04 DQN - Assignment


In deep Q-learning, we use a neural network to approximate the Q-value function. The state is given as an input to the neural network. 
The output of the neural network represents the (estimated) Q-values of all possible actions. Using an argmax, we choose the action corresponding to the highest Q-value.


To train the Q network, we sample a batch of stored experiences from the replay memory. An experience is a tuple of (state, action, reward, next_state).
We input the state into the Q network and get the estimated Q-values. For the Q network to adjust the weights, it needs to have an idea of how accurate these predicted Q-values are.
However, this is a reinforcement learning problem, so no ground-truth target value is available. The solution is to estimate the target value with a second neural network, called the target network. The target network takes the next state as input and predicts the Q-values of all possible actions from that state.
Now we can compute the labels $y$ to train the Q network (also called the policy network): $y = R(s, a) + \gamma \max_{a'} Q_{\text{target}}(s', a')$. The difference $y - Q(s, a)$ is the temporal-difference error that training drives towards zero.
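
As an illustration of how the labels are computed for a whole batch, here is a minimal NumPy sketch. The function name `td_targets` and the example numbers are hypothetical; `next_q_values` stands for the target network's output on the next states, and `dones` marks terminal transitions, for which no future return is added.

```python
import numpy as np

def td_targets(rewards, next_q_values, dones, gamma=0.95):
    """Compute y = r + gamma * max_a' Q_target(s', a') for a batch.

    rewards:       shape (batch,)
    next_q_values: shape (batch, n_actions), target-network output for s'
    dones:         shape (batch,), True where s' is terminal
    """
    rewards = np.asarray(rewards, dtype=np.float64)
    next_q_values = np.asarray(next_q_values, dtype=np.float64)
    not_done = 1.0 - np.asarray(dones, dtype=np.float64)
    # Terminal transitions have no future return, so the bootstrap term is masked out.
    return rewards + gamma * not_done * next_q_values.max(axis=1)

# Hypothetical batch of two transitions with three actions each.
y = td_targets(
    rewards=[-1.0, 100.0],
    next_q_values=[[0.5, 2.0, 1.0], [3.0, 0.0, 1.0]],
    dones=[False, True],
    gamma=0.5,
)
```

Only the Q-value of the action that was actually taken is replaced by $y$; the other outputs keep their predicted values so that they contribute no error to the loss.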

The Q network can now be trained with the MSE loss between $y$ and its own prediction $Q(s, a)$. It's important to know that the target network starts as an exact copy of the policy network and that the weights of the target network are kept frozen while the Q network is trained.

After a certain number of Q-network updates, we copy its weights to the target network.
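
The update schedule can be sketched without any deep learning library. In the toy example below, `ToyNetwork` is a hypothetical stand-in that only mimics the `get_weights`/`set_weights` interface of a Keras model, and the "gradient step" is a placeholder that nudges the weights.

```python
import numpy as np

class ToyNetwork:
    """Stand-in for a Keras model: it only stores a list of weight arrays."""

    def __init__(self, weights):
        self._weights = [np.array(w, dtype=np.float64) for w in weights]

    def get_weights(self):
        return [w.copy() for w in self._weights]

    def set_weights(self, weights):
        self._weights = [np.array(w, dtype=np.float64) for w in weights]

def train_with_periodic_sync(q_net, target_net, n_updates, sync_every):
    """Run n_updates fake training steps, syncing the target every sync_every steps."""
    sync_steps = []
    for step in range(1, n_updates + 1):
        # Placeholder for a real gradient step: nudge every weight a little.
        q_net.set_weights([w + 0.1 for w in q_net.get_weights()])
        if step % sync_every == 0:
            # Hard update: the target network becomes an exact copy again.
            target_net.set_weights(q_net.get_weights())
            sync_steps.append(step)
    return sync_steps

q_net = ToyNetwork([[0.0, 0.0]])
target_net = ToyNetwork(q_net.get_weights())  # starts as an exact copy
synced_at = train_with_periodic_sync(q_net, target_net, n_updates=25, sync_every=10)
```

Between two synchronizations the target network lags behind the Q network, which keeps the labels stable while the Q network changes.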

For more detailed information: https://www.analyticsvidhya.com/blog/2019/04/introduction-deep-q-learning-python/



In [5]:
import gymnasium as gym
import random
import numpy as np   
import matplotlib.pyplot as plt

# Import Tensorflow libraries

import tensorflow as tf
from keras.api.models import Sequential, load_model
from keras.api.layers import Activation, Dense, Dropout, BatchNormalization, InputLayer
from keras.api.optimizers import Adam
from keras.api.losses import MeanSquaredError

from IPython.display import HTML

from collections import deque

import threading

gpus = tf.config.list_physical_devices('GPU')
if gpus: tf.config.experimental.set_memory_growth(gpus[0], True)

In [None]:
class CustomDQNAgent:
    def __init__(
            self,
            state_size,
            action_size,
            learning_rate = 0.001,
            epsilon = 1):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
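        # Build two separate networks. Assigning the same model object to both
        # would make the target network change on every training step.
        self.q_network = self._build_model()
        self.target_network = self._build_model()
        self.target_network.set_weights(self.q_network.get_weights())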

        self.epsilon = epsilon

    def _build_model(self):
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(optimizer=Adam(learning_rate=self.learning_rate), loss=MeanSquaredError())
        return model
    
    def act(self, state):
        if random.random() <= self.epsilon:
            # random.randint needs two bounds; randrange(n) samples from 0..n-1
            return random.randrange(self.action_size)
        else:
            q_values = self.q_network.predict(state, verbose=0)
            return np.argmax(q_values[0])
        
    

In [6]:
class CustomDQNAgent:
    def __init__(
            self,
            state_size,
            action_size,
            memory_minlen=250,
            memory_maxlen=2000,
            gamma=0.95,
            epsilon=1.0,
            epsilon_min=0.01,
            epsilon_decay=0.999,
            model: Sequential | None = None,
            target_update_freq=10
        ):
        self.state_size = state_size
        self.action_size = action_size
        self.memory_minlen = memory_minlen
        self.memory = deque(maxlen=memory_maxlen)
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Exploration rate
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.model = model if model else self._build_model()  # Q-Network
        self.target_model = self._build_model()  # Target-Network
        self.update_target_network()  # Initialize target model weights
        self.target_update_freq = target_update_freq
        self.train_steps = 0  # To track when to update the target network
        self.lock = threading.Lock()  # Ensure thread safety for model updates

    def _build_model(self):
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(optimizer=Adam(0.0003), loss=MeanSquaredError())
        return model

    def update_target_network(self):
        """Update the target network weights to match the Q-network."""
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.randint(self.action_size)  # Random action
        q_values = self.model.predict(state, verbose=0)
        return np.argmax(q_values[0])  # Best action

    def replay(self, batch_size):
        if len(self.memory) < max(self.memory_minlen, batch_size):
            return  # Wait for enough samples
        # Sample batch_size transitions (not memory_minlen) from the replay memory
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # Use the target network for stable Q-value estimation
                target = reward + self.gamma * np.amax(self.target_model.predict(next_state, verbose=0)[0])
            # Keep the predicted Q-values and overwrite only the action that was taken
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target
            with self.lock:  # Lock during model training
                self.model.fit(state, target_f, batch_size=batch_size, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        
        # Update target network periodically
        self.train_steps += 1
        if self.train_steps % self.target_update_freq == 0:
            self.update_target_network()


## MountainCar-v0

A car is on a one-dimensional track, positioned between two "mountains". The goal is to drive up the mountain on the right; however, the car's engine is not strong enough to scale the mountain in a single pass. Therefore, the only way to succeed is to drive back and forth to build up momentum.
The agent (a car) starts at the bottom of a valley. In any given state, the agent may choose to accelerate to the left, accelerate to the right, or not accelerate at all.

<img src="./NotebookImages/MountainCart.gif">

For a description of the state vector, the action space and the episode termination, have a look at: https://github.com/openai/gym/blob/master/gym/envs/classic_control/mountain_car.py

- Implement a DQN to solve this environment.
- Try to minimize the total number of steps per episode needed to reach the flag.
- You are allowed to tweak the reward function. For example, giving an extra reward for getting closer to the flag.
- Modify the DQN implementation into a deep SARSA implementation. Compare the deep SARSA to the DQN implementation.
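
To make the difference between the two algorithms concrete, the sketch below compares their training targets for a single transition. The function names and numbers are hypothetical. DQN bootstraps from the best next action (off-policy), while deep SARSA bootstraps from the action the agent actually takes in the next state (on-policy), so that action has to be stored with the experience.

```python
import numpy as np

def dqn_target(reward, next_q, done, gamma=0.95):
    """Off-policy target: bootstrap from the best next action."""
    return reward if done else reward + gamma * float(np.max(next_q))

def sarsa_target(reward, next_q, next_action, done, gamma=0.95):
    """On-policy target: bootstrap from the action actually taken in s'."""
    return reward if done else reward + gamma * float(next_q[next_action])

# Hypothetical Q-values of the next state for the three MountainCar actions.
next_q = np.array([1.0, 4.0, 2.0])

y_dqn = dqn_target(-1.0, next_q, done=False, gamma=0.5)
# Suppose the epsilon-greedy policy picked action 2 in the next state.
y_sarsa = sarsa_target(-1.0, next_q, next_action=2, done=False, gamma=0.5)
```

The two targets coincide whenever the next action is the greedy one, so the algorithms differ mostly while epsilon is still large.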

In [None]:
env = gym.make('MountainCar-v0', render_mode="human")
env.metadata["render_fps"] = 0
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

#loaded_model = load_model("saved_models/mountain_car_model_v1.keras")

agent = CustomDQNAgent(state_size, action_size)

def training_thread_fn():
    batch_size = 32
    while True:
        agent.replay(batch_size)

training_thread = threading.Thread(target=training_thread_fn, daemon=True)
training_thread.start()

num_episodes = 1000
try:
    for episode in range(num_episodes):
        state, _ = env.reset()
        previous_x_position = state[0]

        state = np.reshape(state, [1, state_size])

        total_reward = 0
        episode_length = 0

        for t in range(1000):
            action = agent.act(state)

            # Gymnasium returns separate terminated and truncated flags
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated  # True only when the flag is reached

            if done:
                reward = max(100, 300 - episode_length)
            else:
                velocity = abs(next_state[1])
                if velocity < 0.009:
                    reward = -1
                else:
                    position_shift = (next_state[0] - previous_x_position)
                    reward = (abs(position_shift) + velocity)

            total_reward += reward
            episode_length += 1

            previous_x_position = next_state[0]

            next_state = np.reshape(next_state, [1, state_size])

            agent.remember(state, action, reward, next_state, done)

            state = next_state

            # Stop on success or when the environment's time limit is hit
            if terminated or truncated:
                break
        
        print(f"episode: {episode}, episode length: {episode_length}, reward: {total_reward}, epsilon: {agent.epsilon}")
finally:
    env.close()
    agent.model.save("saved_models/mountain_car_model_v1.keras")

In [None]:
env = gym.make('MountainCar-v0', render_mode="human")
env.metadata["render_fps"] = 0
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

loaded_model_name = "mountain_car_model_v1"

agent = CustomDQNAgent(
    state_size=state_size,
    action_size=action_size,
    epsilon=0.00,
    epsilon_min=0.00,
    model=load_model(f"saved_models/{loaded_model_name}.keras"))

num_episodes = 100
try:

    for episode in range(num_episodes):
        state, _ = env.reset()
        state = np.reshape(state, [1, state_size])
        episode_length = 0
        total_reward = 0

        for t in range(1000):

            action = agent.act(state)

            next_state, reward, terminated, truncated, _ = env.step(action)

            total_reward += reward
            episode_length += 1

            state = np.reshape(next_state, [1, state_size])

            # Stop when the flag is reached or the time limit is hit
            if terminated or truncated:
                break
        
        print(f"episode: {episode}, episode length: {episode_length}, reward: {total_reward}")
finally:
    env.close()

## LunarLander-v2

The landing pad is always at coordinates (0,0). The coordinates are the first two numbers in the state vector. The reward for moving from the top of the screen to the landing pad with zero speed is about 100..140 points. If the lander moves away from the landing pad, it loses that reward again. The episode finishes if the lander crashes or comes to rest, receiving an additional -100 or +100 points. Each leg ground contact is +10. Firing the main engine is -0.3 points each frame. Solved is 200 points. Landing outside the landing pad is possible. Fuel is infinite, so an agent can learn to fly and then land on its first attempt. Four discrete actions are available: do nothing, fire left orientation engine, fire main engine, fire right orientation engine.
For more information about this environment, see: https://github.com/openai/gym/blob/master/gym/envs/box2d/lunar_lander.py

<img src="./NotebookImages/LunarLander.gif">

- Implement a DQN to solve this environment. LunarLander-v2 defines "solving" as getting an average reward of 200 over 100 consecutive trials.
- Try to minimize the number of episodes it takes to solve the environment.
- How would you tweak the reward function for the LunarLander to make a quicker descent?
- Modify the DQN implementation into a deep SARSA implementation. Compare the deep SARSA to the DQN implementation.
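
The "solved" criterion above can be tracked with a fixed-size window. The following is a minimal sketch; the class name `SolvedTracker` is hypothetical, and the small window of 3 is used only to keep the example short. The average counts only once the window is full, so a few lucky early episodes cannot trigger the criterion.

```python
from collections import deque

class SolvedTracker:
    """Tracks the rolling average reward over the last `window` episodes."""

    def __init__(self, window=100, threshold=200.0):
        self.rewards = deque(maxlen=window)
        self.threshold = threshold

    def add(self, episode_reward):
        """Record one episode and report whether the environment counts as solved."""
        self.rewards.append(episode_reward)
        return self.is_solved()

    def average(self):
        return sum(self.rewards) / len(self.rewards) if self.rewards else 0.0

    def is_solved(self):
        # Require a full window before comparing against the threshold.
        full = len(self.rewards) == self.rewards.maxlen
        return full and self.average() >= self.threshold

tracker = SolvedTracker(window=3, threshold=200.0)
results = [tracker.add(r) for r in [250.0, 250.0, 90.0, 260.0]]
```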

In [7]:
# Create the environment
env = gym.make('LunarLander-v2')#, render_mode="human")
#env.metadata["render_fps"] = 0
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

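# The loaded model is not used below (model=None), and loading fails if the file does not exist yet
#loaded_model = load_model("saved_models/lunar_lander_model_v1.keras")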

agent = CustomDQNAgent(
    state_size=state_size,
    action_size=action_size,
    memory_minlen=32,
    memory_maxlen=5000,
    gamma=0.99,
    epsilon=1,
    epsilon_decay=0.999,
    epsilon_min=0.01,
    model=None,
    target_update_freq=10
)

def training_thread_fn():
    batch_size = 32
    while True:
        agent.replay(batch_size)

training_thread = threading.Thread(target=training_thread_fn, daemon=True)
training_thread.start()

reward_history_maxlen = 100

reward_history = deque(maxlen=reward_history_maxlen) # last 100 episodes

num_episodes = 1000
try:
    # Count episodes from 1 so the printed episode number is human-readable
    for episode in range(1, num_episodes + 1):
        state, _ = env.reset()

        state = np.reshape(state, [1, state_size])

        total_reward = 0
        episode_length = 0

        for t in range(500):
            action = agent.act(state)

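            # Gymnasium returns separate terminated and truncated flags
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated  # True only when the lander crashed or came to rest

            total_reward += reward
            episode_length += 1

            next_state = np.reshape(next_state, [1, state_size])

            agent.remember(state, action, reward, next_state, done)

            state = next_state

            if terminated or truncated:
                break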

        reward_history.append(total_reward)

        # Average over the episodes actually recorded so far
        average_reward_recent_history = sum(reward_history) / len(reward_history)
        
        print(f"episode: {episode}, episode length: {episode_length}, reward: {total_reward}, epsilon: {agent.epsilon}")

        # Solved only once a full window of episodes averages at least 200
        if len(reward_history) == reward_history_maxlen and average_reward_recent_history >= 200:
            print(f"on episode {episode}, the average reward on the last {reward_history_maxlen} episodes, exceeded 200 with: {average_reward_recent_history}")
            break

        #agent.replay(batch_size=32)
finally:
    env.close()
    agent.model.save("saved_models/lunar_lander_model_v1.keras")




episode: 1, episode length: 57, reward: -136.359107978066, epsilon: 0.3
episode: 2, episode length: 79, reward: -134.4467858381095, epsilon: 0.3
episode: 3, episode length: 65, reward: -141.9631122977795, epsilon: 0.3
episode: 4, episode length: 73, reward: -97.79481893157897, epsilon: 0.3
episode: 5, episode length: 79, reward: -153.9429480572283, epsilon: 0.3
episode: 6, episode length: 94, reward: -213.48779438578987, epsilon: 0.3


KeyboardInterrupt: 

In [None]:
env = gym.make('LunarLander-v2', render_mode="human")
env.metadata["render_fps"] = 60
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

loaded_model_name = "lunar_lander_model_v1"

agent = CustomDQNAgent(
    state_size=state_size,
    action_size=action_size,
    epsilon=0.00,
    epsilon_min=0.05,
    model=load_model(f"saved_models/{loaded_model_name}.keras"))

num_episodes = 100
try:

    for episode in range(num_episodes):
        state, _ = env.reset()
        state = np.reshape(state, [1, state_size])
        episode_length = 0
        total_reward = 0

        for t in range(500):

            action = agent.act(state)

            next_state, reward, terminated, truncated, _ = env.step(action)

            total_reward += reward
            episode_length += 1

            state = np.reshape(next_state, [1, state_size])

            # Stop when the lander has crashed or landed, or the time limit is hit
            if terminated or truncated:
                break
        
        print(f"episode: {episode}, episode length: {episode_length}, reward: {total_reward}")
finally:
    env.close()

## OPTIONAL: CarRacing-v0


Description of the environment:

Easiest continuous control task to learn from pixels, a top-down racing environment. Discrete control is reasonable in this environment as well; on/off discretisation is fine. The state consists of 96x96 pixels. The reward is -0.1 every frame and +1000/N for every track tile visited, where N is the total number of tiles in the track. For example, if you have finished in 732 frames, your reward is 1000 - 0.1*732 = 926.8 points. The episode finishes when all tiles are visited. Some indicators are shown at the bottom of the window and in the state RGB buffer. From left to right: true speed, four ABS sensors, steering wheel position, gyroscope.

<img src="./NotebookImages/CarRacing.gif">

Solve this environment with Deep Q-learning. 
- Skip the first 60 frames of an episode until the zooming has stopped and the car is ready to be controlled.
- Crop each state (=image) in such a way that the indicators are removed.
- It might be useful to convert the images to grayscale.
- You might want to take a couple of consecutive images as one state. 
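
The preprocessing hints above can be sketched with plain NumPy. Everything here is an assumption for illustration: the names `preprocess` and `FrameStack`, the crop of 12 rows at the bottom (check in your own frames where the indicator bar starts), the standard luminance weights for the grayscale conversion, and the stack size of 4 frames.

```python
from collections import deque

import numpy as np

def preprocess(frame, crop_bottom=12):
    """Crop the indicator bar and convert an RGB frame to grayscale in [0, 1]."""
    cropped = frame[: frame.shape[0] - crop_bottom]
    # Weighted sum over the colour channels (standard luminance weights).
    weights = np.array([0.299, 0.587, 0.114], dtype=np.float32)
    gray = cropped.astype(np.float32) @ weights
    return gray / 255.0

class FrameStack:
    """Keeps the last n preprocessed frames and exposes them as one state."""

    def __init__(self, n_frames=4):
        self.frames = deque(maxlen=n_frames)

    def reset(self, frame):
        # At the start of an episode, fill the stack with copies of the first frame.
        processed = preprocess(frame)
        for _ in range(self.frames.maxlen):
            self.frames.append(processed)
        return self.state()

    def push(self, frame):
        self.frames.append(preprocess(frame))
        return self.state()

    def state(self):
        # Shape: (height, width, n_frames), usable as input for a CNN.
        return np.stack(list(self.frames), axis=-1)

# Hypothetical all-white 96x96 RGB frame standing in for an environment observation.
frame = np.full((96, 96, 3), 255, dtype=np.uint8)
stack = FrameStack(n_frames=4)
state = stack.reset(frame)
```

Stacking consecutive frames lets the network infer speed and direction, which a single image does not contain.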

The action space can for example look like this:
```
self.actionSpace = [(-1, 1, 0.2), (0, 1, 0.2), (1, 1, 0.2),
                    (-1, 1, 0), (0, 1, 0), (1, 1, 0),
                    (-1, 0, 0.2), (0, 0, 0.2), (1, 0, 0.2),
                    (-1, 0, 0), (0, 0, 0), (1, 0, 0)]
```
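
With such a list, the Q network has one output per entry, and the chosen index is mapped back to a continuous (steering, gas, brake) action before it is passed to the environment. A minimal sketch, in which `ACTION_SPACE` and `to_continuous` are hypothetical names:

```python
import numpy as np

# Each entry is (steering, gas, brake), matching the list above.
ACTION_SPACE = [
    (-1, 1, 0.2), (0, 1, 0.2), (1, 1, 0.2),
    (-1, 1, 0), (0, 1, 0), (1, 1, 0),
    (-1, 0, 0.2), (0, 0, 0.2), (1, 0, 0.2),
    (-1, 0, 0), (0, 0, 0), (1, 0, 0),
]

def to_continuous(action_index):
    """Map a discrete action index to a (steering, gas, brake) array."""
    return np.array(ACTION_SPACE[action_index], dtype=np.float32)

# Index 4 means: steer straight, full gas, no brake.
straight_full_gas = to_continuous(4)
```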
