## Estudo de DDPG com Pendulum-v1 do Gym/Gymnasium e PyTorch

O `Pendulum-v1` é um **ambiente de simulação clássico** de aprendizado por reforço (Reinforcement Learning), incluído na biblioteca **Gym** (ou **Gymnasium**, sua versão mais atualizada).

Ele simula um **pêndulo invertido** preso por uma junta sem atrito, e o objetivo é **mantê-lo em pé (posição vertical)** aplicando torques na base.

---  

### Outros ambientes

| Ambiente                | Ação         | Complexidade | Observação                            |
|-------------------------|--------------|--------------|----------------------------------------|
| Pendulum-v1             | 1D contínua  | Baixa        | Simples, ótimo para DDPG inicial       |
| MountainCarContinuous-v0| 1D contínua  | Média        | Exploração importante                  |
| Reacher-v2              | 2D contínua  | Média        | Braço robótico                         |
| LunarLanderContinuous-v2| 2D contínua  | Média        | Controle com física realista           |
| HalfCheetah-v2          | N-dim contínua| Alta        | Benchmark clássico para RL contínuo    |
| Hopper-v2               | N-dim contínua| Alta        | Controle de salto com estabilidade     |
| Walker2d-v2             | N-dim contínua| Muito Alta  | Locomoção bípede                       |
| Ant-v2                  | N-dim contínua| Muito Alta  | Locomoção em 4 patas                   |
| Humanoid-v2             | N-dim contínua| Muito Alta  | Controle complexo de corpo humanoide   |  

---

### Variações e Extensões do DDPG

| Variação / Extensão     | Descrição breve                                                                 |
|--------------------------|---------------------------------------------------------------------------------|
| **TD3**                 | Usa dois críticos e atraso na atualização do ator para maior estabilidade.     |
| **SAC**                 | Algoritmo off-policy com entropia máxima para melhor exploração.                |
| **HER + DDPG**          | Usa experiências com metas alternativas em tarefas com sparse reward.          |
| **PER + DDPG**          | Priorização de experiências mais informativas na amostragem do replay buffer.  |
| **MADDPG**              | Extensão multiagente para ambientes com múltiplos agentes interativos.         |
| **Distributional DDPG**| Modela a distribuição do retorno em vez do valor esperado, melhorando precisão. |
| **Meta-DDPG / MAML-DDPG** | Aplica aprendizado de meta-política para adaptação rápida a novas tarefas.    |
| **Noisy DDPG**          | Introduz ruído nos pesos da rede para promover exploração eficiente.            |

---



In [None]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn # Ver stable_baselines3 (com "MlpPolicy") parece mais simples
import torch.optim as optim
import random
from collections import deque

# Configuração de dispositivo (GPU via cuda(Compute Unified Device Architecture) ou CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Replay Buffer - armazena interações do agente com o ambiente
class ReplayBuffer:
    def __init__(self, max_size=100000):
        self.buffer = deque(maxlen=max_size)

    def add(self, transition):
        self.buffer.append(transition)

    def sample(self, batch_size):
        samples = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*samples)
        return map(lambda x: torch.FloatTensor(x).to(device), [states, actions, rewards, next_states, dones])

    def size(self):
        return len(self.buffer)

# Redes
# Ator (Actor): rede neural (nn) que decide ações dado o estado, retorna action (tensor) a = μ(s)
# Funções de ativação ReLU (Rectified Linear Unit) e tanh (Tangente Hiperbólica)
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, action_bound):
        super(Actor, self).__init__()
        self.action_bound = action_bound
        self.model = nn.Sequential(
            nn.Linear(state_dim, 256), nn.ReLU(),
            nn.Linear(256, 256), nn.ReLU(),
            nn.Linear(256, action_dim), nn.Tanh()
        )

    def forward(self, state):
        return self.model(state) * self.action_bound


# Crítico (Critic): rede neural (nn) que avalia a qualidade da ação, retorna Q-value (escalar) Q(s, a)
# Funçoes de ativação ReLU (Rectified Linear Unit)
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(state_dim + action_dim, 256), nn.ReLU(),
            nn.Linear(256, 256), nn.ReLU(),
            nn.Linear(256, 1)
        )

    def forward(self, state, action):
        return self.model(torch.cat([state, action], 1))

# Agente
class DDPGAgent:
    def __init__(self, state_dim, action_dim, action_bound):
        self.actor = Actor(state_dim, action_dim, action_bound).to(device)
        self.target_actor = Actor(state_dim, action_dim, action_bound).to(device)
        self.critic = Critic(state_dim, action_dim).to(device)
        self.target_critic = Critic(state_dim, action_dim).to(device)

        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic.load_state_dict(self.critic.state_dict())

        # Atualiza pesos com Adam (Otimizador) e respectivos lr (learning rate - taxa de aprendizado)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)

        self.buffer = ReplayBuffer()
        self.gamma = 0.99
        # taxa de atualização suave
        self.tau = 0.005
        self.action_bound = action_bound

    def act(self, state, noise=0.1):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        action = self.actor(state).detach().cpu().numpy()[0]
        action += noise * np.random.randn(*action.shape)
        return np.clip(action, -self.action_bound, self.action_bound)

    def update(self, batch_size=64):
        if self.buffer.size() < batch_size:
            return

        states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)

        # Atualiza crítico
        with torch.no_grad():
            target_actions = self.target_actor(next_states)
            target_q = self.target_critic(next_states, target_actions)
            y = rewards.unsqueeze(1) + self.gamma * (1 - dones.unsqueeze(1)) * target_q

        q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(q, y)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Atualiza ator
        actor_loss = -self.critic(states, self.actor(states)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Atualização suave dos alvos (soft update)
        # Actor
        for param, target_param in zip(self.actor.parameters(), self.target_actor.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
        # Critic
        for param, target_param in zip(self.critic.parameters(), self.target_critic.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

# Treinamento
env = gym.make("Pendulum-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_bound = float(env.action_space.high[0])

agent = DDPGAgent(state_dim, action_dim, action_bound)
# Inicialize antes do loop de episódios
rewards = []
episodes = 100
for ep in range(episodes):
    state, _ = env.reset()
    episode_reward = 0
    for step in range(200):
        action = agent.act(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        agent.buffer.add((state, action, reward, next_state, float(done)))
        agent.update()
        state = next_state
        episode_reward += reward
        if done:
            break

    rewards.append(episode_reward)  # <- Aqui adicionamos a recompensa final do episódio
    print(f"Episode {ep + 1}: Reward = {episode_reward:.2f}")

    torch.save(agent.actor.state_dict(), 'ddpg_actor.pth')
    torch.save(agent.critic.state_dict(), 'ddpg_critic.pth')






In [None]:
# Gráfico com e sem suavização
import matplotlib.pyplot as plt

def moving_average(data, window_size=10):
    return [sum(data[i-window_size:i])/window_size if i >= window_size else data[i] for i in range(len(data))]

plt.figure(figsize=(10, 5))
plt.plot(rewards, label='Recompensa (original)', alpha=0.3)
plt.plot(moving_average(rewards, window_size=10), label='Média móvel (janela=10)', color='orange', linewidth=2)
plt.xlabel("Episódio")
plt.ylabel("Recompensa total")
plt.title("Desempenho do agente DDPG no Pendulum")
plt.legend()
plt.grid(True)
# Salvar o gráfico como imagem PNG
plt.savefig(f"recompensas_ddpg_{episodes}.png")

plt.show()



In [None]:
# Carregar os pesos e gerar um vídeo Versão2
import gymnasium as gym
import torch
import torch.nn as nn
import numpy as np
import cv2

# Corrigir tipo bool8 se necessário
if not hasattr(np, 'bool8'):
    np.bool8 = np.bool_

# Criar o ambiente com modo de renderização
env = gym.make("Pendulum-v1", render_mode="rgb_array")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_bound = env.action_space.high[0]

# Redes
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, action_bound):
        super(Actor, self).__init__()
        self.action_bound = action_bound
        self.model = nn.Sequential(
            nn.Linear(state_dim, 256), nn.ReLU(),
            nn.Linear(256, 256), nn.ReLU(),
            nn.Linear(256, action_dim), nn.Tanh()
        )

    def forward(self, state):
        return self.model(state) * self.action_bound

class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(state_dim + action_dim, 256), nn.ReLU(),
            nn.Linear(256, 256), nn.ReLU(),
            nn.Linear(256, 1)
        )

    def forward(self, state, action):
        return self.model(torch.cat([state, action], 1))


class DDPGAgent:
    def __init__(self, state_dim, action_dim, action_bound):
        self.actor = Actor(state_dim, action_dim, action_bound)
        self.critic = Critic(state_dim, action_dim)

agent = DDPGAgent(state_dim, action_dim, action_bound)
agent.actor.load_state_dict(torch.load("ddpg_actor.pth"))
agent.critic.load_state_dict(torch.load("ddpg_critic.pth"))


def generate_video():
    frames = []
    state, _ = env.reset()
    done = False

    while not done:
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        action = float(agent.actor(state_tensor).detach().numpy()[0])
        next_state, reward, terminated, truncated, _ = env.step(np.array([action]))
        done = terminated or truncated

        frame = env.render()
        frames.append(frame)
        state = next_state

    height, width, _ = frames[0].shape
    writer = cv2.VideoWriter(f"pendulum_video_{episodes}.mp4", cv2.VideoWriter_fourcc(*'mp4v'), 30, (width, height))

    for frame in frames:
        # frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        # writer.write(frame_bgr)
        writer.write(frame)
    writer.release()




generate_video()
