In [51]:
import gymnasium as gym
import ale_py
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
import time

In [52]:
class QNetwork(nn.Module):
    def __init__(self, input_shape, num_actions):
        super(QNetwork, self).__init__()

        # Definición de las capas convolucionales
        self.conv1 = nn.Conv2d(input_shape[0], 32, kernel_size=3, stride=1, padding=1)  # Primer bloque convolucional
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)  # Segundo bloque convolucional
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)  # Tercer bloque convolucional

        # Definición de la capa totalmente conectada
        # El tamaño de entrada de la capa FC depende del tamaño de la salida de la última capa convolucional
        self.fc1 = nn.Linear(64 * 7 * 7, 512)  # Ajusta esto según el tamaño de la salida
        self.fc2 = nn.Linear(512, num_actions)

    def forward(self, x):
        print(f"Input shape: {x.shape}")  # Imprimir la forma de la entrada

        # Aplicar las capas convolucionales
        x = torch.relu(self.conv1(x))
        print(f"After conv1: {x.shape}")
        x = torch.relu(self.conv2(x))
        print(f"After conv2: {x.shape}")
        x = torch.relu(self.conv3(x))
        print(f"After conv3: {x.shape}")

        # Aplanar la salida de las capas convolucionales antes de pasar a las capas FC
        x = x.view(x.size(0), -1)  # Aplanar (flatten)
        print(f"After flatten: {x.shape}")

        # Pasar por la primera capa totalmente conectada
        x = torch.relu(self.fc1(x))
        print(f"After fc1: {x.shape}")

        # Pasar por la segunda capa totalmente conectada (salida final)
        return self.fc2(x)

In [53]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)
    
    def size(self):
        return len(self.buffer)

In [54]:
class DQNAgent:
    def __init__(self, env, buffer_size=10000, batch_size=32, gamma=0.99, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.1, learning_rate=1e-4):
        self.env = env
        self.buffer = ReplayBuffer(buffer_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.learning_rate = learning_rate
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.input_shape = env.observation_space.shape
        self.num_actions = env.action_space.n
        
        # Red neuronal para aproximar Q y la red objetivo
        self.q_network = QNetwork(self.input_shape, self.num_actions).to(self.device)
        self.target_network = QNetwork(self.input_shape, self.num_actions).to(self.device)
        self.target_network.load_state_dict(self.q_network.state_dict())
        
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.learning_rate)

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return self.env.action_space.sample()  # Acción aleatoria
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)
        q_values = self.q_network(state)
        return torch.argmax(q_values).item()

    def update_epsilon(self):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
    
    def learn(self):
        if self.buffer.size() < self.batch_size:
            return

        batch = self.buffer.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.tensor(np.array(states), dtype=torch.float32).to(self.device)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32).to(self.device)
        actions = torch.tensor(actions, dtype=torch.int64).to(self.device)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(self.device)
        dones = torch.tensor(dones, dtype=torch.bool).to(self.device)

        # Calcular Q-valor objetivo
        with torch.no_grad():
            next_q_values = self.target_network(next_states)
            next_q_value = next_q_values.max(1)[0]
            target_q_values = rewards + (self.gamma * next_q_value * ~dones)

        # Q-valor actual
        q_values = self.q_network(states)
        q_value = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)

        # Calcular la pérdida
        loss = nn.MSELoss()(q_value, target_q_values)

        # Actualizar el modelo
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        self.target_network.load_state_dict(self.q_network.state_dict())

In [55]:
def main():
    env = gym.make("ALE/Breakout-v5", render_mode="rgb_array")
    env = gym.wrappers.ResizeObservation(env, (16, 16))  # Reducir resolución a 84x84 píxeles

    agent = DQNAgent(env)

    total_episodes = 1000
    for episode in range(1, total_episodes + 1):
        state, _ = env.reset()
        state = np.array(state)

        done = False
        total_reward = 0
        while not done:
            action = agent.act(state)
            next_state, reward, done, _, _ = env.step(action)
            next_state = np.array(next_state)

            agent.buffer.push(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward

            agent.learn()

        # Actualizar la red objetivo cada ciertos episodios
        if episode % 10 == 0:
            agent.update_target_network()

        # Actualizar epsilon
        agent.update_epsilon()

        print(f"Ep {episode}/{total_episodes}, Total Reward: {total_reward}, Epsilon: {agent.epsilon}")

    env.close()

In [56]:
if __name__ == "__main__":
    main()

Input shape: torch.Size([32, 16, 16, 3])
After conv1: torch.Size([32, 32, 16, 3])
After conv2: torch.Size([32, 64, 16, 3])
After conv3: torch.Size([32, 64, 16, 3])
After flatten: torch.Size([32, 3072])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x3072 and 3136x512)