## Import the necessary packages

In [5]:
import torch  
import gymnasium as gym
import ale_py
import numpy as np  
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import matplotlib.pyplot as plt
import pandas as pd
import cv2

# Parameters

In [6]:
# Hyperparameters
GAMMA = 0.99
LR = 1e-4
ENTROPY_BETA = 0.01
BATCH_SIZE = 5
HIDDEN_SIZE = 512

# Simulation parameters
max_epochs=1000
max_episodes=20

env = gym.make("ALE/Pong-v5")
n_actions = env.action_space.n

## Definition of the Actor and Critic classes

In [7]:
class Actor(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(Actor, self).__init__()
        self.network = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, HIDDEN_SIZE),
            nn.ReLU(),
            nn.Linear(HIDDEN_SIZE, n_actions),
            nn.Softmax(dim=1)
        )
    
    def forward(self, state):
        state = state / 255.0
        return self.network(state)

class Critic(nn.Module):
    def __init__(self, input_shape):
        super(Critic, self).__init__()
        self.network = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, HIDDEN_SIZE),
            nn.ReLU(),
            nn.Linear(HIDDEN_SIZE, 1)
        )
    
    def forward(self, state):
        state = state / 255.0
        return self.network(state)



# Definition of A2C agents

In [None]:
def preprocess_state(state):
    state = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)  # Convertir en niveaux de gris
    state = cv2.resize(state, (84, 84))  # Redimensionner à 84x84
    state = np.expand_dims(state, axis=0)  # Ajouter la dimension du canal
    state = state.astype(np.float32) / 255.0  # Normaliser entre 0 et 1
    return torch.tensor(state, dtype=torch.float32).unsqueeze(0)  # Ajouter une dimension batch

def advantage_actor_critic(env, max_epochs, max_episodes, learning_rate, gamma):
    actor = Actor((1, 84, 84), n_actions)
    critic = Critic((1, 84, 84))
    actor_optimizer = optim.Adam(actor.parameters(), lr=learning_rate)
    critic_optimizer = optim.Adam(critic.parameters(), lr=learning_rate)
    
    all_rewards = []
    total_entropy = 0
    
    for episode in range(max_episodes):
        log_probas, values, rewards = [], [], []
        state, _ = env.reset()
        state = preprocess_state(state)
        
        for epoch in range(max_epochs):
            #print(state.shape)
            value = critic(state).squeeze().detach().numpy()
            policy = actor(state)
            policy_np = policy.detach().numpy().squeeze()
            action = np.random.choice(n_actions, p=policy_np)
            log_proba = torch.log(policy.squeeze(0)[action])
            entropy = -np.sum(policy_np * np.log(policy_np + 1e-10))
            
            next_state, reward, done, _, _ = env.step(action)
            next_state = preprocess_state(next_state)
            
            values.append(value)
            log_probas.append(log_proba)
            rewards.append(reward)
            total_entropy += entropy
            
            if done or epoch == max_epochs - 1:
                q_value = critic(next_state).squeeze().detach().numpy()
                sum_rewards = np.sum(rewards)
                all_rewards.append(sum_rewards)
                
                if episode % 10 == 0:
                    print(f"Episode: {episode}, total reward: {sum_rewards}")
                break
            
            state = next_state
        print(f"values type: {type(values)}, example: {values[:5]}")
        values = torch.tensor(np.array(values), dtype=torch.float32)
        q_values = []
        q_value = 0  # Initialisation correcte
        for reward in reversed(rewards):
            q_value = reward + gamma * q_value
            q_values.insert(0, q_value)
        
        q_values = torch.FloatTensor(q_values)
        log_probas = torch.stack(log_probas)
        
        advantage = q_values - values
        actor_loss = (-log_probas * advantage).mean()
        critic_loss = 0.5 * advantage.pow(2).mean()
        actor_critic_loss = actor_loss + critic_loss - 0.001 * total_entropy
        
        actor_optimizer.zero_grad()
        critic_optimizer.zero_grad()
        actor_critic_loss.backward()
        actor_optimizer.step()
        critic_optimizer.step()
    
    smoothed_rewards = pd.Series(all_rewards).rolling(10).mean()
    return all_rewards, smoothed_rewards

## Train our agent in the Pong environnment

In [9]:
all_rewards, smoothed_rewards = advantage_actor_critic(env, max_epochs, max_episodes, learning_rate=LR, gamma=GAMMA)

Episode: 0, total reward: -20.0
values type: <class 'list'>, example: [array(0.0329124, dtype=float32), array(0.03292063, dtype=float32), array(0.03292017, dtype=float32), array(0.03291988, dtype=float32), array(0.03291979, dtype=float32)]
930
values type: <class 'list'>, example: [array(0.0329124, dtype=float32), array(0.03292063, dtype=float32), array(0.03291965, dtype=float32), array(0.03291988, dtype=float32), array(0.0329197, dtype=float32)]
912
values type: <class 'list'>, example: [array(0.0329124, dtype=float32), array(0.03291959, dtype=float32), array(0.03292081, dtype=float32), array(0.03292076, dtype=float32), array(0.03292103, dtype=float32)]
1000


KeyboardInterrupt: 

In [53]:
plt.plot(all_rewards)
plt.plot(smoothed_rewards)
plt.plot()
plt.xlabel('episode number')
plt.ylabel('reward')
plt.show()

: 