# Agent DQN with Experience replay and Fixed Q-target

- Created : **22/03/2025**
- Updated : **15/05/2025**

PoC using the DQN from exercise 2.7

## Modules

In [None]:
# Built-in and 3rd party modules
from collections import namedtuple
import random

import numpy as np
from unityagents import UnityEnvironment
import matplotlib.pyplot as plt
%matplotlib inline
import torch
from tqdm.notebook import trange

# Custom modules
from dqn_agent__exp_replay import DQNAgentExpReplay

## Device

In [None]:
# Pick the compute backend reported as available by torch:
# CUDA first, then Apple MPS, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Availabe device: {device}")

## Definitions

In [None]:
# Record of one episode:
#   - i_episode : episode number
#   - steps_to_resolution : step count stored for the episode
#   - score : episode score
#   - is_solution : True when the score reached the solution threshold
# The typename matches the variable name so that repr() and type names are consistent.
Episode_Stats = namedtuple(
    "Episode_Stats",
    field_names=["i_episode", "steps_to_resolution", "score", "is_solution"],
)

In [None]:
def plot_training_stats(
        episode_stats, mean_scores, shift_score_avgs, solution_threshold, scores_window_length, title=""):
    """
    Plot the scores, their averages and the steps recorded for solved episodes.

    The left axis shows the score of each episode, the average from the first
    episode and the sliding-window average. The right axis shows, for every
    episode flagged as a solution, its `steps_to_resolution` value.

    Args :
        - episode_stats (list): per-episode records exposing `i_episode`,
          `steps_to_resolution`, `score` and `is_solution`
        - mean_scores (list): averages plotted from episode 1, one value per episode
        - shift_score_avgs (list): sliding-window averages, the first value being
          plotted at episode `scores_window_length`
        - solution_threshold (float): score drawn as a horizontal dashed line
        - scores_window_length (int): length of the sliding window (used for the
          x offset and the legend label)
        - title (str): figure title
    """
    # plot the scores
    fig = plt.figure(figsize=(12, 6))
    ax = fig.add_subplot(111)

    scores = [episode.score for episode in episode_stats]
    ax.plot(np.arange(1, len(scores)+1), scores, color='b', label='Scores')
    ax.plot(
        np.arange(1, len(mean_scores)+1),
        mean_scores,
        color='y',
        linestyle='--',
        label='Score average from start'
        )
    ax.plot(
        np.arange(scores_window_length, scores_window_length + len(shift_score_avgs)),
        shift_score_avgs,
        color='r',
        label=f"{scores_window_length} episodes shift average"
        )
    ax.set_ylabel('Score')

    # Only the episodes flagged as a solution are shown on the second axis
    solved_episodes = [episode for episode in episode_stats if episode.is_solution]
    ax2 = ax.twinx()
    ax2.scatter(
        [episode.i_episode for episode in solved_episodes],
        [episode.steps_to_resolution for episode in solved_episodes],
        s=2,
        color='g',
        hatch="x",
        label='Steps to solution'
    )
    max_steps_to_resolution = max(
        (episode.steps_to_resolution for episode in solved_episodes), default=0
        )
    # Without any solved episode the upper limit would equal the lower one (0),
    # so the limits are left to matplotlib in that case.
    if max_steps_to_resolution > 0:
        ax2.set_ylim(0, int(1.1 * max_steps_to_resolution))
    ax2.set_ylabel('Steps')

    ax.axhline(y=solution_threshold, color='k', linestyle='--', label='Solution threshold')

    ax.set_xlabel('Episode #')
    plt.title(title)
    fig.legend()
    plt.show()

In [None]:
def train_dqn_agent(
        agent,
        env,
        brain_name,
        eps_scheduler,
        eps_start=1.0,
        n_episodes=2000,
        solution_threshold=13,
        avg_window_length_scores=100,
        print_stats_each_n_episode=25,
        window_stability=1,
        max_timesteps=300
        ):
    """
    Deep Q-Learning agent training function.

    Runs up to `n_episodes` episodes. Training stops early as soon as the mean
    of the last `window_stability` sliding-window averages reaches
    `solution_threshold`; the agent is then saved to 'checkpoint_solved.pth'.
    Otherwise the agent is saved to 'checkpoint_last_episode.pth' after the
    last episode.

    Args :
        - agent: object exposing `train()`, `act(state, eps=...)`,
          `step(state, action, reward, next_state, done)` and `save(path)`
        - env: environment exposing `reset(train_mode=True)` and `step(action)`,
          whose results are indexed by `brain_name`
        - brain_name (str): key selecting the brain in the environment results
        - eps_scheduler (callable): called after each episode as
          `eps_scheduler(i_episode, eps)`; returns the epsilon of the next episode
        - eps_start (float): starting value of epsilon, for epsilon-greedy action selection
        - n_episodes (int): maximum number of training episodes
        - solution_threshold (float): score from which an episode counts as solved
        - avg_window_length_scores (int): number of last scores in the sliding average
        - print_stats_each_n_episode (int): statistics are printed every that many episodes
        - window_stability (int): number of last sliding averages used in the stop check
        - max_timesteps (int): maximum number of timesteps per episode

    Returns :
        - tuple (stats_episodes, score_avgs, shift_score_avgs): the per-episode
          `Episode_Stats` records, the average of all scores after each episode
          and the sliding-window averages (available from episode
          `avg_window_length_scores` onwards)
    """
    scores = []             # list containing scores from each episode
    score_avgs = []         # average of all the scores since the first episode
    shift_score_avgs = []   # average of the last `avg_window_length_scores` scores
    stats_episodes = []

    eps = eps_start
    solved = False
    last_solved = 0
    last_solved_score = None
    solved_episode_count = 0

    agent.train()
    for i_episode in trange(1, n_episodes+1, desc="Training", unit="episode", leave=False):
        # Reset the environment
        env_info = env.reset(train_mode=True)[brain_name]
        # Initial state
        state = env_info.vector_observations[0]

        score = 0               # episode score
        steps_to_resolution = 0 # first step at which the score reached the threshold (0 = never)
        # Loop for each episode
        for i_step in range(1, max_timesteps+1):
            # Agent chooses action
            action = agent.act(state, eps=eps)

            # Apply action to environment and get environment evolution as
            # experience : next state, reward and done
            env_info = env.step(action)[brain_name]
            next_state = env_info.vector_observations[0]
            reward = env_info.rewards[0]
            done = env_info.local_done[0]

            score += reward
            if score >= solution_threshold and steps_to_resolution == 0:
                steps_to_resolution = i_step

            # Agent learns from the experience
            agent.step(state, action, reward, next_state, done)

            # Move to the next state
            state = next_state

            # Check if episode is done (a.k.a terminal state)
            if done:
                break

        # Save episode stats
        is_solution = score >= solution_threshold
        stats_episodes.append(
            Episode_Stats(i_episode, steps_to_resolution, score, is_solution)
            )
        # Save episode score
        scores.append(score)
        score_avgs.append(np.mean(scores))

        if is_solution:
            last_solved = i_episode
            # Kept so that the report shows the score of the solved episode
            # rather than the score of whichever episode is printed
            last_solved_score = score
            solved_episode_count += 1

        # Process mean of scores when enough scores
        if i_episode >= avg_window_length_scores:
            shift_score_avgs.append(np.mean(scores[-avg_window_length_scores:]))

            # Print stats regularly
            if i_episode % print_stats_each_n_episode == 0:
                print(f'\rEpisode {i_episode} | Mean scores {score_avgs[-1]:.2f}'
                      f'| {avg_window_length_scores} shift score average: {shift_score_avgs[-1]:.2f}'
                      f'| Environment solved {solved_episode_count} time(s).',
                      end=" ")
                if last_solved > 0:
                    print(f'Last solution at episode {last_solved}th with score {last_solved_score}.', end="")

            # Check the stability of the solution
            if np.mean(shift_score_avgs[-window_stability:]) >= solution_threshold:
                print(f'\n>> Environment solved in {i_episode} episodes!\tAverage Score: {shift_score_avgs[-1]:.2f}')
                agent.save('checkpoint_solved.pth')
                solved = True
                break

        # Update epsilon once per episode; the scheduler receives the episode
        # number (not the index of the last step of the episode)
        eps = eps_scheduler(i_episode, eps)

    # Save the last episode if environment not solved
    if not solved:
        agent.save('checkpoint_last_episode.pth')

    return stats_episodes, score_avgs, shift_score_avgs

## Initialisations

In [None]:
# Set SEED to an integer (e.g. 71) to seed the random generators below;
# with None no seeding call is made.
SEED = None

if SEED is not None:
    np.random.seed(SEED)
    random.seed(SEED)
    torch.manual_seed(SEED)

In [None]:
# Start the Unity environment from the "Banana.app" build
env = UnityEnvironment(file_name="Banana.app")

# get the default brain : its name is the key used to index the results
# returned by env.reset() and env.step()
brain_name = env.brain_names[0]
brain = env.brains[brain_name]

# Reset in training mode to obtain a first environment info
env_info = env.reset(train_mode=True)[brain_name]

# number of agents in the environment
print('Number of agents:', len(env_info.agents))

In [None]:
# Agent neural network settings
model_parameters = dict(fc1_units=64, fc2_units=64)

# Training and test hyperparameters
# Epsilon : minimum value and multiplicative decay factor
eps_end, eps_decay = 0.01, 0.995

n_episodes_train = 2000
# max number of timesteps per episode then done, resulting from the
# "Take Random Actions in the Environment" test
max_timesteps = 300
window_stability = 2
# Project success constraints
avg_window_length_scores = 100
solution_threshold = 13
print_stats_each_n_episode = 25

n_episodes_test = 500

## Training

In [None]:
# Instantiate the agent
agent = DQNAgentExpReplay(
    state_size=brain.vector_observation_space_size,
    action_size=brain.vector_action_space_size,
    model_parameters=model_parameters,
    device=device
    )


def eps_scheduler(i_episode, eps):
    """
    Return the next epsilon : multiplicative decay bounded below by `eps_end`.

    Args :
        - i_episode (int): counter supplied by the training loop; not used by
          this schedule, kept so that other schedules can depend on it
        - eps (float): current epsilon
    """
    return max(eps_end, eps_decay * eps)


# max_timesteps is passed explicitly so that the value configured with the
# other hyperparameters is the one actually used by the training loop
stats_episodes, score_avgs, shift_score_avgs = train_dqn_agent(
    agent,
    env,
    brain_name,
    eps_scheduler,
    eps_start=1.0,
    n_episodes=n_episodes_train,
    avg_window_length_scores=avg_window_length_scores,
    solution_threshold=solution_threshold,
    window_stability=window_stability,
    print_stats_each_n_episode=print_stats_each_n_episode,
    max_timesteps=max_timesteps
)

In [None]:
# Draw the curves collected during training
plot_training_stats(
    episode_stats=stats_episodes,
    mean_scores=score_avgs,
    shift_score_avgs=shift_score_avgs,
    solution_threshold=solution_threshold,
    scores_window_length=avg_window_length_scores,
    title="Training : scores and score averages",
    )

## Test

In [None]:
# When True, a new agent is built and its state is read from the checkpoint
# instead of reusing the agent object left by the training cell.
reload = True
if reload:
    agent = DQNAgentExpReplay(
        device=device,
        model_parameters=model_parameters,
        action_size=brain.vector_action_space_size,
        state_size=brain.vector_observation_space_size,
        )
    # NOTE(review): train_dqn_agent writes this file only when its stability
    # check passed; otherwise it writes 'checkpoint_last_episode.pth'.
    agent.load('checkpoint_solved.pth')
agent.eval()

In [None]:
# Reset the environment with train_mode=False before running the test episodes
env_info = env.reset(train_mode=False)[brain_name]

In [None]:
# The live progress line is printed for one episode out of `print_every_n_episodes`
print_every_n_episodes = 1

success = False     # set to True when the stability check below passes
score_avg = 0

scores = []
stats_episodes = []
score_avgs = []
shift_score_avgs = []
for i_episode in trange(1, n_episodes_test+1):
    env_info = env.reset(train_mode=False)[brain_name]
    state = env_info.vector_observations[0]

    score = 0
    done = False
    i_step = 0                  # number of steps played in the episode
    steps_to_resolution = 0     # first step at which the score reached the threshold (0 = never)
    show_progress = print_every_n_episodes > 0 and i_episode % print_every_n_episodes == 0
    while not done:
        # Agent chooses action
        action = agent.act(state)

        # Apply action to environment
        env_info = env.step(action)[brain_name]

        # Get environment evolution as experience : next state, reward and done
        next_state = env_info.vector_observations[0]
        reward = env_info.rewards[0]
        done = env_info.local_done[0]
        state = next_state

        # Update the score
        score += reward
        i_step += 1
        # Same definition as in train_dqn_agent, so that the "Steps to solution"
        # points of the plots are comparable between training and test
        if score >= solution_threshold and steps_to_resolution == 0:
            steps_to_resolution = i_step

        if show_progress:
            print(f"\rEpisode #{i_episode} : Score = {int(score)} in {i_step} steps| Score avg = {score_avg:.2f}", end="")

    # Save episode stats
    stats_episodes.append(
            Episode_Stats(i_episode, steps_to_resolution, score, score >= solution_threshold)
            )
    scores.append(score)
    score_avg = np.mean(scores)
    score_avgs.append(score_avg)

    if i_episode >= avg_window_length_scores:
        shift_score_avgs.append(np.mean(scores[-avg_window_length_scores:]))
        # Stop as soon as the mean of the last sliding averages reaches the threshold
        if np.mean(shift_score_avgs[-window_stability:]) >= solution_threshold:
            print(f"\rEpisode #{i_episode} : Score = {int(score)} in {i_step} steps| Score avg = {score_avgs[-1]:.2f}", end="")
            success = True
            break

In [None]:
# Draw the curves collected during the test episodes
plot_training_stats(
    episode_stats=stats_episodes,
    mean_scores=score_avgs,
    shift_score_avgs=shift_score_avgs,
    solution_threshold=solution_threshold,
    scores_window_length=avg_window_length_scores,
    title="Testing : scores and score averages",
    )

## End: closing the Unity environment

In [None]:
# Close the environment opened at the beginning of the notebook
env.close()