# In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
from minigrid.wrappers import FlatObsWrapper
import random
from collections import deque
from datetime import datetime
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter
 
# --- Environment and run length -------------------------------------------
env_name = "MiniGrid-Fetch-8x8-N3-v0"
episodes = 500  # number of training episodes

# --- Epsilon-greedy exploration --------------------------------------------
# epsilon is multiplied by epsilon_decay after every episode and is never
# allowed to drop below epsilon_min.
epsilon, epsilon_min, epsilon_decay = 1.0, 0.1, 0.85

# --- Q-learning and optimisation --------------------------------------------
gamma = 0.99            # discount factor applied to the bootstrapped value
learning_rate = 3e-3    # Adam step size
batch_size = 128        # transitions drawn per gradient step
memory_size = 100_000   # maximum number of transitions kept in replay memory
target_update_freq = 1  # sync the target network every N episodes
train_updates = 5       # NOTE(review): not referenced by the visible code
 
# Create the environment and wrap it with FlatObsWrapper in one expression
# (presumably the wrapper flattens observations into a 1-D vector; the code
# below only relies on len() working on the observation).
env = FlatObsWrapper(gym.make(env_name))

# Reset once to measure the observation length and read the number of
# discrete actions; both are used to size the Q-network.
state, info = env.reset()
state_shape, action_shape = len(state), env.action_space.n
print(f"State shape: {state_shape}, Action shape: {action_shape}")

# Each launch logs to its own timestamped TensorBoard directory.
log_dir = "".join(("runs/minigrid_", datetime.now().strftime("%Y%m%d-%H%M%S")))
writer = SummaryWriter(log_dir=log_dir)
 
class QNetwork(nn.Module):
    """Fully connected network mapping a state vector to one value per action.

    Architecture: input_dim -> 128 -> 128 -> 64 -> output_dim, with a ReLU
    after every layer except the last.
    """

    def __init__(self, input_dim, output_dim):
        """Build the layer stack.

        Args:
            input_dim: number of features in one input vector.
            output_dim: number of outputs (one per action).
        """
        super().__init__()
        widths = (input_dim, 128, 128, 64)
        stack = []
        # Hidden layers: each Linear is followed by a ReLU. Layers are created
        # in forward order so parameter names stay net.0, net.2, net.4, net.6.
        for fan_in, fan_out in zip(widths, widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.ReLU())
        # Output layer has no activation: raw values are returned.
        stack.append(nn.Linear(widths[-1], output_dim))
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Return the network output for input batch ``x``."""
        return self.net(x)
 
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Two identically shaped networks: the online network that is trained, and a
# target network that starts as a copy and is kept in eval mode.
q_model, target_model = (
    QNetwork(state_shape, action_shape).to(device) for _ in range(2)
)
target_model.load_state_dict(q_model.state_dict())
target_model.eval()

# Bounded replay memory: the oldest transitions are dropped once full.
memory = deque(maxlen=memory_size)
loss_fn = nn.MSELoss()
optimizer = optim.Adam(q_model.parameters(), lr=learning_rate)
 
def store_transition(state, action, reward, next_state, done):
    """Append one experience tuple to the replay memory.

    Args:
        state: observation before the action.
        action: action that was taken.
        reward: reward received for the action.
        next_state: observation after the action.
        done: flag stored alongside the transition; it is later used to mask
            the bootstrapped value in the training target.
    """
    transition = (state, action, reward, next_state, done)
    memory.append(transition)
 
def sample_batch():
    """Draw a random minibatch of ``batch_size`` transitions from replay memory.

    Returns:
        A 5-tuple of tensors placed on ``device``: states (float32), actions
        (int64), rewards (float32), next states (float32) and done flags
        (bool). Each has ``batch_size`` entries along its first dimension.

    Raises:
        ValueError: propagated from ``random.sample`` when ``memory`` holds
            fewer than ``batch_size`` transitions.
    """
    batch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)

    def _to_tensor(values, dtype):
        # Go through one contiguous NumPy array first: calling torch.tensor
        # directly on a tuple of ndarrays converts element by element, which
        # is very slow and triggers a PyTorch UserWarning.
        return torch.tensor(np.asarray(values), dtype=dtype, device=device)

    return (
        _to_tensor(states, torch.float32),
        _to_tensor(actions, torch.int64),
        _to_tensor(rewards, torch.float32),
        _to_tensor(next_states, torch.float32),
        _to_tensor(dones, torch.bool),
    )
 
def epsilon_greedy_policy(state, epsilon):
    """Choose an action with an epsilon-greedy rule.

    Args:
        state: a single observation, convertible to a float32 tensor.
        epsilon: probability of picking a uniformly random action.

    Returns:
        int: a random action index in ``[0, action_shape - 1]`` with
        probability ``epsilon``; otherwise the index of the largest output of
        ``q_model`` for ``state``.
    """
    if random.random() < epsilon:
        return random.randint(0, action_shape - 1)

    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
    # Action selection needs no gradients; disabling autograd avoids building
    # a graph that would be thrown away on every environment step.
    with torch.no_grad():
        q_values = q_model(state_tensor)
    return torch.argmax(q_values, dim=1).item()
 
def train_step(global_step):
    """Run one gradient update of ``q_model`` on a sampled minibatch.

    Args:
        global_step: step index used as the x-coordinate of the
            ``Loss/train`` TensorBoard scalar.

    Returns:
        float | None: the loss value of this update, or ``None`` when the
        replay memory does not yet hold ``batch_size`` transitions (no update
        is performed in that case).
    """
    if len(memory) < batch_size:
        return None
    states, actions, rewards, next_states, dones = sample_batch()

    # TD target from the target network; no gradient flows through it.
    with torch.no_grad():
        next_q_values = target_model(next_states)
        max_next_q_values, _ = next_q_values.max(dim=1)
        # (~dones) zeroes the bootstrapped term for transitions flagged done.
        target_q = rewards + gamma * max_next_q_values * (~dones)

    current_q = q_model(states)
    # Squeeze only the gathered action dimension. A bare squeeze() would also
    # drop the batch dimension for a batch of one and yield a 0-d tensor that
    # no longer matches target_q's shape.
    selected_q = current_q.gather(1, actions.unsqueeze(1)).squeeze(1)

    loss = loss_fn(selected_q, target_q)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    loss_value = loss.item()
    writer.add_scalar("Loss/train", loss_value, global_step)
    return loss_value
 
# Training
reward_history = []  # total reward of every finished episode, in order
global_step = 0      # counts environment steps across all episodes

for episode in range(1, episodes + 1):
    state, info = env.reset()
    done = False
    episode_reward = 0

    while not done:
        action = epsilon_greedy_policy(state, epsilon)
        next_state, reward, terminated, truncated, info = env.step(action)
        # The episode loop ends on either signal ...
        done = terminated or truncated
        # ... but only `terminated` is stored as the done flag. train_step
        # uses the stored flag to zero the bootstrapped value, and a
        # truncation (e.g. a step limit) does not make next_state terminal,
        # so its value must still be bootstrapped.
        store_transition(state, action, reward, next_state, terminated)
        state = next_state
        episode_reward += reward

        # One optimisation attempt per environment step; returns None until
        # the replay memory holds at least one full batch.
        loss = train_step(global_step)
        global_step += 1

    # Decay exploration once per episode, bounded below by epsilon_min.
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    # Periodically copy the online weights into the target network.
    if episode % target_update_freq == 0:
        target_model.load_state_dict(q_model.state_dict())

    reward_history.append(episode_reward)
    writer.add_scalar("Reward/episode", episode_reward, episode)
    print(f"Episode {episode}/{episodes} — Reward: {episode_reward:.2f} — epsilon: {epsilon:.3f}")
 
# Final evaluation: one episode with purely greedy actions (no exploration)
state, info = env.reset()
done = False
total_reward = 0

# Inference only: disable autograd so no graph is built for each forward pass.
with torch.no_grad():
    while not done:
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
        action = torch.argmax(q_model(state_tensor), dim=1).item()
        state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        total_reward += reward
        # NOTE(review): env was created without a render_mode argument, so
        # this call may not display anything; confirm the intended mode.
        env.render()

print(f"Récompense d'évaluation : {total_reward:.2f}")
writer.add_scalar("Reward/final_evaluation", total_reward)
 
# Matplotlib chart of the total reward collected in each training episode
axes = plt.gca()
axes.plot(reward_history, label="Reward per Episode")
axes.set_title("Reward Progression")
axes.set_xlabel("Episode")
axes.set_ylabel("Reward")
axes.grid(True)
axes.legend()

# Save the figure to disk before showing it.
plt.savefig("reward_plot.png")
plt.show()

# Flush and close the TensorBoard writer now that all scalars are logged.
writer.close()
 
 