In [None]:
# Install Gymnasium and ale-py into the kernel's environment (%pip targets the running kernel).
# NOTE(review): no versions are pinned, so a re-run may resolve to different releases — consider pinning.
%pip install gymnasium ale-py
# Gymnasium "atari" extra.
%pip install gymnasium[atari]
# Gymnasium "accept-rom-license" extra — presumably needed to obtain the Atari ROMs; verify it still exists for the installed Gymnasium version.
%pip install gymnasium[accept-rom-license]

In [None]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import ale_py
from collections import deque
import matplotlib.pyplot as plt
import base64
import random
from torch.optim.lr_scheduler import StepLR
import os

In [None]:
# One shared seed for every random number generator used in this notebook.
RANDOM_STATE = 10

# Python's stdlib RNG, NumPy's legacy global RNG, then PyTorch's global RNG.
# The three generators are independent, so the order of these calls is irrelevant.
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)
torch.manual_seed(RANDOM_STATE)


In [None]:
class Actor(nn.Module):
    """Policy network: turns a state vector into action probabilities.

    Architecture: ``state_size -> 256 -> 128 -> 64 -> action_size`` with ReLU
    after each hidden layer and a softmax over the last dimension of the output.
    """

    def __init__(self, state_size, action_size):
        """Create the four linear layers.

        Args:
            state_size: Width of the input state vector.
            action_size: Number of actions (width of the output distribution).
        """
        super().__init__()
        # Creation order is kept (fc1..fc4) so seeded initialisation and
        # state_dict keys stay the same.
        self.fc1 = nn.Linear(state_size, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, action_size)

    def forward(self, x):
        """Return action probabilities for ``x`` (softmax over the last dim)."""
        h1 = self.fc1(x).relu()
        h2 = self.fc2(h1).relu()
        h3 = self.fc3(h2).relu()
        # Cap the last hidden activations before the output layer; after ReLU
        # only the upper bound can ever be active.
        bounded = h3.clamp(-10, 10)
        logits = self.fc4(bounded)
        return logits.softmax(dim=-1)


In [None]:

class Critic(nn.Module):
    """State-value network: turns a state vector into a single scalar estimate.

    Architecture: ``state_size -> 256 -> 128 -> 64 -> 1`` with ReLU after each
    hidden layer and no activation on the output.
    """

    def __init__(self, state_size):
        """Create the four linear layers.

        Args:
            state_size: Width of the input state vector.
        """
        super().__init__()
        # Creation order is kept (fc1..fc4) so seeded initialisation and
        # state_dict keys stay the same.
        self.fc1 = nn.Linear(state_size, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, 1)

    def forward(self, x):
        """Return the value estimate for ``x``; the last dimension has size 1."""
        features = x
        for hidden_layer in (self.fc1, self.fc2, self.fc3):
            features = torch.relu(hidden_layer(features))
        return self.fc4(features)


In [None]:

class A2CAgent:
    """Actor-critic agent for an environment with a discrete action space.

    Observations are flattened to 1-D vectors and fed to an ``Actor`` (action
    probabilities) and a ``Critic`` (state value). Transitions are stored in a
    bounded buffer; after every episode one random mini-batch from that buffer
    is used for a single actor update and a single critic update.
    """

    def __init__(self, env, seed=None):
        """Build networks, optimizers, schedulers and bookkeeping for ``env``.

        Args:
            env: Environment exposing ``observation_space.shape``,
                ``action_space.n``, ``reset()`` returning ``(obs, info)`` and
                ``step()`` returning five values.
            seed: Optional seed forwarded once to ``env.reset(seed=...)``.
                ``None`` (the default) leaves the environment unseeded.
        """
        self.env = env
        # Gymnasium environments are seeded through reset(seed=...); merely
        # assigning something to ``env.seed`` does not seed anything.
        if seed is not None:
            self.env.reset(seed=seed)
        # Plain Python int so it can be used directly as a layer width.
        self.state_size = int(np.prod(env.observation_space.shape))
        self.action_size = env.action_space.n
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.actor = Actor(self.state_size, self.action_size).to(self.device)
        self.critic = Critic(self.state_size).to(self.device)

        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=0.0001)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=0.0001)

        # Learning rate schedulers: multiply the LR by 0.9 every 50 scheduler steps
        self.actor_scheduler = StepLR(self.actor_optimizer, step_size=50, gamma=0.9)
        self.critic_scheduler = StepLR(self.critic_optimizer, step_size=50, gamma=0.9)

        # Exploration and other parameters (optimal : epsilon = 0.45, epsilon_decay=0.96, lr = 0.0001, gamma = 0.97)
        self.gamma = 0.97          # Discount factor
        self.epsilon = 0.35        # Initial exploration probability
        self.epsilon_decay = 0.97  # Multiplicative decay applied once per episode
        self.epsilon_min = 0.01    # Exploration is no longer decayed once at or below this

        # Sliding window for tracking the last 10 scores
        self.scores_window = deque(maxlen=10)

        # Directory for saving models
        self.save_dir = './saved_models'
        os.makedirs(self.save_dir, exist_ok=True)

    def save_model(self, epoch, mean_score):
        """Save actor, critic and both optimizer states to ``self.save_dir``.

        Args:
            epoch: Epoch index, embedded in the file name and the checkpoint.
            mean_score: Mean score, embedded in the file name and the checkpoint.
        """
        model_path = os.path.join(self.save_dir, f"actor_critic_epoch_{epoch}_mean_score_{mean_score:.2f}.pth")
        print(f"Saving model to {model_path}")
        torch.save({
            'actor_state_dict': self.actor.state_dict(),
            'critic_state_dict': self.critic.state_dict(),
            'actor_optimizer_state_dict': self.actor_optimizer.state_dict(),
            'critic_optimizer_state_dict': self.critic_optimizer.state_dict(),
            # Stored as builtin numbers: NumPy scalars (e.g. from np.mean) are
            # rejected by torch.load's restricted (weights_only) unpickler.
            'epoch': int(epoch),
            'mean_score': float(mean_score)
        }, model_path)

    def load_model(self, model_path):
        """Load actor, critic and optimizer states from a saved checkpoint.

        Args:
            model_path: Path to a file written by ``save_model``.
        """
        # map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine.
        checkpoint = torch.load(model_path, map_location=self.device)
        self.actor.load_state_dict(checkpoint['actor_state_dict'])
        self.critic.load_state_dict(checkpoint['critic_state_dict'])
        # The optimizer states are restored too, so training can be resumed.
        self.actor_optimizer.load_state_dict(checkpoint['actor_optimizer_state_dict'])
        self.critic_optimizer.load_state_dict(checkpoint['critic_optimizer_state_dict'])
        print(f"Model loaded from {model_path}")

    def get_action(self, state, explore=True):
        """Choose an action for a flattened NumPy ``state``.

        Args:
            state: 1-D NumPy array holding the flattened observation.
            explore: When True (default), a uniformly random action is taken
                with probability ``self.epsilon``. When False, the action is
                always sampled from the actor's distribution.

        Returns:
            Index of the selected action.
        """
        if explore and np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)  # Random action for exploration
        state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        with torch.no_grad():
            probs = self.actor(state_tensor).cpu().numpy()[0]
        return np.random.choice(self.action_size, p=probs)

    def _update_networks(self, transitions):
        """Run one actor step and one critic step on a batch of transitions.

        Args:
            transitions: Sequence of ``(state, action, reward, next_state, done)``.

        Returns:
            Tuple ``(actor_loss, critic_loss)`` as Python floats.
        """
        states, actions, rewards, next_states, dones = zip(*transitions)
        states_tensor = torch.from_numpy(np.array(states)).float().to(self.device)
        next_states_tensor = torch.from_numpy(np.array(next_states)).float().to(self.device)
        actions_tensor = torch.tensor(actions, dtype=torch.long).to(self.device)
        rewards_tensor = torch.tensor(rewards, dtype=torch.float32).to(self.device)
        dones_tensor = torch.tensor(dones, dtype=torch.float32).to(self.device)

        # Drop the trailing size-1 dimension of the critic output. Without this,
        # (B, 1) values combined with (B,) rewards/dones broadcast to (B, B).
        values = self.critic(states_tensor).squeeze(-1)
        with torch.no_grad():
            next_values = self.critic(next_states_tensor).squeeze(-1)
        # One-step TD error; no bootstrapping from terminal states.
        advantages = rewards_tensor + self.gamma * next_values * (1 - dones_tensor) - values

        probs = self.actor(states_tensor)
        chosen_probs = probs.gather(1, actions_tensor.unsqueeze(1)).squeeze(1)
        # Floor the probability so log() cannot produce -inf.
        log_probs = torch.log(chosen_probs.clamp_min(1e-8))
        actor_loss = -(log_probs * advantages.detach()).mean()

        # Critic regresses towards the TD target (mean squared TD error).
        critic_loss = advantages.pow(2).mean()

        # Update actor and critic (no gradient clipping is applied).
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        return actor_loss.item(), critic_loss.item()

    def train(self, epochs=200, max_moves=5000, batch_size=128, render=False):
        """Train for up to ``epochs`` episodes.

        Training stops early, after saving a checkpoint, once the mean of the
        last 10 scores reaches 75.

        Args:
            epochs: Maximum number of episodes to play.
            max_moves: Maximum number of environment steps per episode.
            batch_size: Number of buffered transitions sampled per update; no
                update happens while the buffer holds fewer than this.
            render: Unused; kept for interface compatibility.

        Returns:
            List with the score of every episode played.
        """
        scores = []
        buffer = deque(maxlen=1000)  # Buffer for storing transitions
        for e in range(epochs):
            state, _ = self.env.reset()
            state = state.flatten()
            episode_over = False
            score = 0
            max_moves_remaining = max_moves

            while not episode_over and max_moves_remaining > 0:
                max_moves_remaining -= 1
                action = self.get_action(state)
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                next_state = next_state.flatten()

                # Only true termination disables bootstrapping; a truncated
                # episode still has a meaningful next-state value.
                buffer.append((state, action, reward, next_state, terminated))

                state = next_state
                score += reward
                # The episode is finished on termination OR truncation.
                episode_over = terminated or truncated

            # Update by batch; losses stay "n/a" until the buffer is large enough.
            actor_loss = critic_loss = 'n/a'
            if len(buffer) >= batch_size:
                actor_loss, critic_loss = self._update_networks(random.sample(buffer, batch_size))

            # Decay epsilon to reduce exploration
            if self.epsilon > self.epsilon_min:
                self.epsilon *= self.epsilon_decay

            # Add score to the sliding window and calculate the mean
            self.scores_window.append(score)
            mean_score = np.mean(self.scores_window)

            scores.append(score)
            print(f'Epoch: {e+1}/{epochs}, Score: {score}, Mean Score (Last 10): {mean_score:.2f}, Actor Loss: {actor_loss}, Critic Loss: {critic_loss}')

            # Check if the stopping condition is met (mean_score >= 75)
            if mean_score >= 75:
                self.save_model(e, mean_score)
                break

            # Step the schedulers to update the learning rate
            self.actor_scheduler.step()
            self.critic_scheduler.step()

        return scores

    def test(self, episodes=10, max_moves=5000, render=True):
        """Play ``episodes`` evaluation episodes and print each score.

        Actions are sampled from the actor only; epsilon exploration is off.

        Args:
            episodes: Number of episodes to play.
            max_moves: Maximum number of environment steps per episode.
            render: Unused; kept for interface compatibility.
        """
        for e in range(episodes):
            state, _ = self.env.reset()
            state = state.flatten()
            episode_over = False
            score = 0
            max_moves_remaining = max_moves
            while not episode_over and max_moves_remaining > 0:
                max_moves_remaining -= 1
                action = self.get_action(state, explore=False)
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                score += reward
                state = next_state.flatten()
                episode_over = terminated or truncated
            print(f'Test Episode: {e+1}/{episodes}, Score: {score}')

In [None]:
# After training, or when evaluating a previously saved checkpoint.
# NOTE(review): `agent` must already exist (created in an earlier cell) before this cell runs.
# The path is built from the agent's own save directory instead of a hard-coded absolute
# Kaggle path, so the cell also works outside /kaggle/working.
model_path = os.path.join(agent.save_dir, 'actor_critic_epoch_75_mean_score_75.60.pth')  # Saved checkpoint to load
agent.load_model(model_path)

# The agent can now be evaluated on new episodes.
print("Testing the agent...")
agent.test(episodes=20, render=False)