In [None]:
import gymnasium as gym
import numpy as np
import torch as th
import matplotlib.pyplot as plt

from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy

from torch.nn import functional as F

In [None]:
import torch

# Report whether PyTorch can see a CUDA device and, if so, which one.
if not torch.cuda.is_available():
    print("CUDA is not available")
else:
    print("CUDA is available")
    num_cuda_devices = torch.cuda.device_count()
    print("Number of CUDA devices:", num_cuda_devices)
    if num_cuda_devices <= 0:
        print("No CUDA devices found despite CUDA being available")
    else:
        # Only the first device (index 0) is described.
        print("CUDA device name:", torch.cuda.get_device_name(0))
        print("CUDA device capability:", torch.cuda.get_device_capability(0))

In [None]:
# Environment used for training and for the `env.render()` calls in later cells.
env = gym.make(
    "LunarLander-v2",
    render_mode="rgb_array",
)

In [None]:
# Baseline DQN agent with an MLP policy (hidden layers given by `net_arch`).
dqn_model = DQN(
    policy="MlpPolicy",
    env=env,
    # Optimisation
    learning_rate=6.3e-4,
    batch_size=128,
    gamma=0.99,
    # Replay buffer and update schedule
    buffer_size=50000,
    learning_starts=0,
    train_freq=4,
    gradient_steps=-1,
    target_update_interval=250,
    # Epsilon-greedy exploration schedule
    exploration_fraction=0.12,
    exploration_final_eps=0.1,
    # Network architecture, logging verbosity, reproducibility
    policy_kwargs=dict(net_arch=[256, 256]),
    verbose=1,
    seed=2,
)

In [None]:
# Score the agent before any call to `learn`, over 20 greedy episodes.
mean_reward, std_reward = evaluate_policy(
    dqn_model, dqn_model.get_env(), n_eval_episodes=20, deterministic=True
)

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

In [None]:
# Train the baseline DQN agent for 100k environment steps.
dqn_model.learn(total_timesteps=100_000, log_interval=10)

In [None]:
# Same evaluation as before training, now on the trained agent.
mean_reward, std_reward = evaluate_policy(
    model=dqn_model,
    env=dqn_model.get_env(),
    n_eval_episodes=20,
    deterministic=True,
)

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

### Visualize Q Values

In [None]:
def get_q_values(model: DQN, obs: np.ndarray) -> np.ndarray:
    """
    Retrieve Q-values for a given observation.

    :param model: a DQN model
    :param obs: a single (un-batched) observation that belongs to the
        model's observation space
    :return: 1-D array holding one Q-value per discrete action
    :raises AssertionError: if ``obs`` is not contained in the observation
        space, or if the network output does not have one entry per action
    """
    # Use the space stored on the model so this also works when no env is attached.
    assert model.observation_space.contains(obs), f"Invalid observation of shape {obs.shape}: {obs}"

    # The Q-network expects a batch, so add a leading batch dimension of size 1.
    obs_tensor = th.as_tensor(obs, dtype=th.float32, device=model.device).unsqueeze(0)
    with th.no_grad():
        # Call the module itself (not `.forward`) so registered hooks still run,
        # and drop only the batch dimension that was added above.
        q_values = model.q_net(obs_tensor).squeeze(0).cpu().numpy()

    # Derive the expected size from the action space instead of hard-coding 4.
    n_actions = int(model.action_space.n)
    assert isinstance(q_values, np.ndarray), "The returned q_values is not a numpy array"
    assert q_values.shape == (n_actions,), f"Wrong shape: ({n_actions},) was expected but got {q_values.shape}"

    return q_values

In [None]:
# Start an episode and inspect the observation and the env's spaces.
obs, _ = env.reset()
print("obs.shape:", obs.shape)

print("env.action_space:", env.action_space)
print("observation_space_shape:", env.observation_space.shape)


In [None]:
# Show the current rendered frame with the axes hidden.
plt.axis("off")
plt.imshow(env.render())

In [None]:
# Human-readable label for each discrete action, indexed by action id.
action_str = [
    "Nothing",       # 0
    "Left Engine",   # 1
    "Main Engine",   # 2
    "Right Engine",  # 3
]

In [None]:
# Reset again, keeping the first observation as `initial_state`,
# then dump a few attributes of the environment.
initial_state, _ = env.reset()
print(env.step, "\n")
print(env.action_space, "\n")
print(env.metadata, "\n")
print(env.observation_space, "\n")

In [None]:
# Q-values the trained DQN assigns to `initial_state`, one per action.
q_values = get_q_values(dqn_model, initial_state)
print(q_values)
# Unpack in action-id order (same order as `action_str`).
q_value_nothing = q_values[0]
q_value_left = q_values[1]
q_value_main = q_values[2]
q_value_right = q_values[3]

# Report all four actions; the main-engine value used to be computed but never shown.
print(
    f"Q-value of the initial state nothing={q_value_nothing:.2f} left={q_value_left:.2f} "
    f"main={q_value_main:.2f} right={q_value_right:.2f}"
)

# The greedy policy picks the action with the largest Q-value.
action = np.argmax(q_values)

print(f"Action taken by the greedy policy in the initial state: {action_str[action]}")

In [None]:
# Value of the greedy action: the largest entry of the current `q_values`.
initial_q_value = np.max(q_values)
print(initial_q_value)

In [None]:
# Show the observation before and after flattening it to 1-D.
print(obs.shape, obs, sep="\n")
obs = obs.flatten()
print(obs.shape, obs, sep="\n")

In [None]:
from IPython.display import clear_output

# Roll out one greedy episode, rendering each frame and printing the Q-values.
episode_rewards = []
done = False
i = 0

# The env was last reset when `initial_state` was created, so that is the
# observation matching its current state. The older `obs` comes from an
# earlier reset and would make the first Q-values and action refer to a
# different start state.
obs = initial_state

while not done:
    i += 1

    # Clear the previous figure
    clear_output(wait=True)

    # Display current state
    plt.imshow(env.render())
    plt.show()

    # Retrieve q-value
    q_values = get_q_values(dqn_model, obs)

    # Take greedy-action
    action, _ = dqn_model.predict(obs, deterministic=True)

    # All four values use the same two-decimal format.
    print(f"Q-value of the current state \nnothing={q_values[0]:.2f} \nleft={q_values[1]:.2f} \nmain={q_values[2]:.2f} \nright={q_values[3]:.2f}")
    print(f"Action: {action_str[action]}")

    obs, reward, terminated, truncated, info = env.step(action)

    # The episode ends on either termination or truncation.
    done = terminated or truncated

    episode_rewards.append(reward)

In [None]:
# Discounted return from the first step of the episode:
#   G_0 = sum_t gamma**t * r_t
# The reward at step t is discounted by gamma**t, so iterate in chronological
# order. Pairing the exponent with the reversed list would discount the first
# reward the most and the last one not at all.
sum_discounted_rewards = 0
for i, reward in enumerate(episode_rewards):
    sum_discounted_rewards += reward * (dqn_model.gamma ** i)

print(f"Sum of discounted rewards: {sum_discounted_rewards:.2f}")

In [None]:
from stable_baselines3.common.buffers import ReplayBuffer

class DoubleDQN(DQN):
    """
    DQN variant whose TD target follows Double Q-learning: the online network
    selects the next action and the target network evaluates it.
    """

    def train(self, gradient_steps: int, batch_size: int = 100) -> None:
        """
        Run ``gradient_steps`` optimisation steps on mini-batches sampled from the replay buffer.

        :param gradient_steps: number of gradient updates to perform
        :param batch_size: number of transitions sampled per update
        """
        self.policy.set_training_mode(True)
        self._update_learning_rate(self.policy.optimizer)

        losses = []
        for _ in range(gradient_steps):
            # Sample replay buffer
            replay_data = self.replay_buffer.sample(batch_size, env=self._vec_normalize_env)

            with th.no_grad():
                # Compute the next Q-values using the target network
                next_q_values = self.q_net_target(replay_data.next_observations)
                # Compute q-values for the next observation using the online q net
                next_q_values_online = self.q_net(replay_data.next_observations)
                # Select action with online network; keepdim gives a (batch, 1) index for gather
                next_actions_online = next_q_values_online.argmax(dim=1, keepdim=True)
                # Evaluate the selected actions with the target network.
                # Keep the result as a (batch, 1) column: squeezing it to (batch,)
                # would broadcast against column-shaped rewards/dones into a
                # (batch, batch) target and corrupt the loss.
                next_q_values = th.gather(next_q_values, dim=1, index=next_actions_online)
                # 1-step TD target
                target_q_values = replay_data.rewards + (1 - replay_data.dones) * self.gamma * next_q_values

            # Get current Q-values estimates
            current_q_values = self.q_net(replay_data.observations)
            # Retrieve the q-values for the actions from the replay buffer
            current_q_values = th.gather(current_q_values, dim=1, index=replay_data.actions.long())

            # Fail loudly instead of letting the loss broadcast mismatched shapes.
            assert current_q_values.shape == target_q_values.shape, (
                f"Shape mismatch: current {tuple(current_q_values.shape)} vs target {tuple(target_q_values.shape)}"
            )

            # Compute loss (Huber loss)
            loss = F.smooth_l1_loss(current_q_values, target_q_values)

            losses.append(loss.item())

            # Optimize the policy
            self.policy.optimizer.zero_grad()
            loss.backward()
            # Clip the gradient norm before the optimiser step
            th.nn.utils.clip_grad_norm_(self.policy.parameters(), self.max_grad_norm)
            self.policy.optimizer.step()

        self._n_updates += gradient_steps

        self.logger.record("train/n_updates", self._n_updates, exclude="tensorboard")
        self.logger.record("train/loss", np.mean(losses))

In [None]:
from torch.nn import functional as F

from stable_baselines3.common.callbacks import BaseCallback


class MonitorQValueCallback(BaseCallback):
    """
    Callback to monitor the evolution of the q-value
    for the initial state.
    It allows to artificially over-estimate a q-value for initial states.

    :param sample_interval: record the max q-value every ``sample_interval`` calls of ``_on_step``
    :param n_samples: number of initial observations sampled for monitoring
    """

    def __init__(self, sample_interval: int = 2500, n_samples: int = 512):
        super().__init__()
        self.timesteps = []
        self.max_q_values = []
        self.sample_interval = sample_interval
        # Throwaway env used only to draw initial observations; close it
        # afterwards so its resources are not leaked.
        env = gym.make("LunarLander-v2")
        try:
            # Sample initial states that will be used to monitor the estimated q-value
            self.start_obs = np.array([env.reset()[0] for _ in range(n_samples)])
        finally:
            env.close()

    def _on_training_start(self) -> None:
        """Push the q-value of action index 0 towards 100 on the sampled start states."""
        # Create overestimation
        obs = th.tensor(self.start_obs, device=self.model.device).float()
        # Artificially high target for the chosen action in every start state
        target_q_values = th.ones((len(obs), 1), device=self.model.device).float() * 100
        # Index of the over-estimated action: 0 for every sample.
        # Built once, since it does not change between iterations.
        action_index = th.zeros((len(obs), 1), device=self.model.device).long()

        for _ in range(100):
            # Get current Q-values estimates
            current_q_values = self.model.q_net(obs)

            # Keep only the q-value of action 0
            current_q_values = th.gather(current_q_values, dim=1, index=action_index)

            loss = F.mse_loss(current_q_values, target_q_values)

            # Optimize the policy
            self.model.policy.optimizer.zero_grad()
            loss.backward()
            self.model.policy.optimizer.step()

    def _on_step(self) -> bool:
        """Every ``sample_interval`` calls, record the max q-value over the start states."""
        # Sample q-values
        if self.n_calls % self.sample_interval == 0:
            # Monitor estimated q-values using current model
            obs = th.tensor(self.start_obs, device=self.model.device).float()
            with th.no_grad():
                q_values = self.model.q_net(obs).cpu().numpy()

            # Compute the maximum once and reuse it for logging and history
            max_q_value = float(q_values.max())
            self.logger.record("train/max_q_value", max_q_value)
            self.timesteps.append(self.num_timesteps)
            self.max_q_values.append(max_q_value)
        # Returning True lets training continue
        return True

In [None]:
# Build a monitoring callback (default sampling interval spelled out) and show its repr.
monitor_dqn_value_cb = MonitorQValueCallback(sample_interval=2500)
print(monitor_dqn_value_cb)

In [None]:
# Double DQN agent, configured with the same hyper-parameters as `dqn_model`.
double_q = DoubleDQN(
    policy="MlpPolicy",
    env=env,
    # Optimisation
    learning_rate=6.3e-4,
    batch_size=128,
    gamma=0.99,
    # Replay buffer and update schedule
    buffer_size=50000,
    learning_starts=0,
    train_freq=4,
    gradient_steps=-1,
    target_update_interval=250,
    # Epsilon-greedy exploration schedule
    exploration_fraction=0.12,
    exploration_final_eps=0.1,
    # Network architecture, logging verbosity, reproducibility
    policy_kwargs=dict(net_arch=[256, 256]),
    verbose=1,
    seed=2,
)

In [None]:
# Q-value monitor attached to the Double DQN run.
monitor_double_q_value_cb = MonitorQValueCallback(sample_interval=2500)

In [None]:
# Fresh Q-value monitor attached to the DQN run (replaces any earlier instance).
monitor_dqn_value_cb = MonitorQValueCallback(sample_interval=2500)

In [None]:
# NOTE(review): `dqn_model` was already trained by the earlier `learn` call, so
# this run continues from a trained agent while `double_q` starts from scratch.
# Confirm that this is the intended comparison.
dqn_model.learn(total_timesteps=100_000, callback=monitor_dqn_value_cb, log_interval=10)

In [None]:
# Train Double DQN for 100k steps while recording its max q-value on the start states.
double_q.learn(total_timesteps=100_000, callback=monitor_double_q_value_cb, log_interval=10)

In [None]:
# Compare how the monitored max q-value evolves for DQN and Double DQN.
plt.figure(figsize=(6, 3), dpi=150)
plt.title("Evolution of max q-value for start states over time")
plt.plot(monitor_dqn_value_cb.timesteps, monitor_dqn_value_cb.max_q_values, label="DQN", color="pink")
plt.plot(monitor_double_q_value_cb.timesteps, monitor_double_q_value_cb.max_q_values, label="Double DQN", color="purple")
# Label both axes so the figure can be read on its own.
plt.xlabel("Timesteps")
plt.ylabel("Max q-value over start states")
plt.legend()