In [3]:
import gymnasium as gym
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Define the PPO agent
class PPOAgent(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PPOAgent, self).__init__()
        self.policy_network = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )
        self.value_network = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, state):
        policy_output = self.policy_network(state)
        value_output = self.value_network(state)
        return policy_output, value_output

# Define the priority network
class PriorityNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PriorityNetwork, self).__init__()
        self.priority_network = nn.Sequential(
            nn.Linear(state_dim + action_dim + 1 + state_dim + 1, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, experience):
        priority_output = self.priority_network(experience)
        return priority_output

# Define the PPO trainer
class PPOTrainer:

    def __init__(self, agent, priority_network, gamma, lambda_, epsilon, c1, c2):
        self.agent = agent
        self.priority_network = priority_network
        self.gamma = gamma
        self.lambda_ = lambda_
        self.epsilon = epsilon
        self.c1 = c1
        self.c2 = c2

    def train(self, batch_size, epochs):
        for epoch in range(epochs):
            # Sample a batch of experiences from the replay buffer
            batch_experiences = self.sample_batch(batch_size)

            # Compute the TD-error for each experience in the batch
            td_errors = []
            for experience in batch_experiences:
                state, action, reward, next_state, done = experience
                td_error = reward + self.gamma * self.agent.value_network(next_state) - self.agent.value_network(state)
                td_errors.append(td_error)

            # Train the priority network
            self.priority_network.train()
            priority_optimizer = optim.Adam(self.priority_network.parameters(), lr=0.001)
            priority_loss_fn = nn.MSELoss()
            for experience, td_error in zip(batch_experiences, td_errors):
                priority_optimizer.zero_grad()
                priority_output = self.priority_network(experience)
                loss = priority_loss_fn(priority_output, torch.tensor(td_error))
                loss.backward()
                priority_optimizer.step()

            # Train the PPO agent
            self.agent.train()
            policy_optimizer = optim.Adam(self.agent.policy_network.parameters(), lr=0.001)
            value_optimizer = optim.Adam(self.agent.value_network.parameters(), lr=0.001)
            for experience in batch_experiences:
                state, action, reward, next_state, done = experience
                policy_optimizer.zero_grad()
                value_optimizer.zero_grad()
                policy_output, value_output = self.agent(state)
                policy_loss = -torch.log(policy_output[action]) * reward
                value_loss = (value_output - reward) ** 2
                loss = policy_loss + value_loss
                loss.backward()
                policy_optimizer.step()
                value_optimizer.step()

    def sample_batch(self, batch_size):
        # Sample a batch of experiences from the replay buffer
        # This is a placeholder for the actual sampling logic
        batch_experiences = []
        for _ in range(batch_size):
            batch_experiences.append(np.random.rand(5))  # state, action, reward, next_state, done
        return batch_experiences

# Create the Gym Car2D environment
env =  gym.make("CarRacing-v2")

In [4]:
# Create the PPO agent and priority network
agent = PPOAgent(state_dim=env.observation_space.shape[0], action_dim=env.action_space.shape[0])
priority_network = PriorityNetwork(state_dim=env.observation_space.shape[0], action_dim=env.action_space.shape[0])

# Create the PPO trainer
trainer = PPOTrainer(agent, priority_network, gamma=0.99, lambda_=0.95, epsilon=0.1, c1=0.5, c2=0.01)

# Train the PPO agent
trainer.train(batch_size=32, epochs=1000)

TypeError: linear(): argument 'input' (position 1) must be Tensor, not numpy.float64

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributions as distributions
import gymnasium as gym
import numpy as np

# Define the PPO agent
class PPOAgent(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PPOAgent, self).__init__()
        self.policy_network = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim*2)  # output mean and std
        )
        self.value_network = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, state):
        state = state.view(-1, 96*3)
        policy_output = self.policy_network(state)
        value_output = self.value_network(state)
        return policy_output, value_output

    def sample_action(self, policy_output):
        mean, log_std = policy_output.chunk(2, dim=-1)
        std = torch.exp(log_std)
        dist = distributions.Normal(mean, std)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action, log_prob

# Define the priority network
class PriorityNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PriorityNetwork, self).__init__()
        self.priority_network = nn.Sequential(
            nn.Linear(state_dim + action_dim + 1 + state_dim + 1, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, experience):
        priority_output = self.priority_network(experience)
        return priority_output

# Define the PPO trainer
class PPOTrainer:
    def __init__(self, agent, priority_network, gamma, lambda_, epsilon, c1, c2):
        self.agent = agent
        self.priority_network = priority_network
        self.gamma = gamma
        self.lambda_ = lambda_
        self.epsilon = epsilon
        self.c1 = c1
        self.c2 = c2
        self.replay_buffer = []

    def train(self, batch_size, epochs):
        env = gym.make("CarRacing-v2")
        for episode in range(epochs):
            state, _ = env.reset()
            done = False
            rewards = 0
            while not done:
                state_tensor = torch.tensor(state, dtype=torch.float32)
                policy_output, _ = self.agent(state_tensor)
                action, log_prob = self.agent.sample_action(policy_output)
                next_state, reward, done, _, _ = env.step(action.numpy())
                rewards += reward
                self.replay_buffer.append([state, action.numpy(), reward, next_state, done])
                state = next_state
            print(f"Episode {episode+1}, Reward: {rewards}")
            self.update_policy(batch_size)

    def update_policy(self, batch_size):
        batch = np.random.choice(self.replay_buffer, batch_size)
        states = torch.tensor([x[0] for x in batch], dtype=torch.float32)
        actions = torch.tensor([x[1] for x in batch], dtype=torch.float32)
        rewards = torch.tensor([x[2] for x in batch], dtype=torch.float32)
        next_states = torch.tensor([x[3] for x in batch], dtype=torch.float32)
        dones = torch.tensor([x[4] for x in batch], dtype=torch.bool)
        for _ in range(5):
            policy_loss = 0
            value_loss = 0
            for i in range(batch_size):
                policy_output, value_output = self.agent(states[i])
                action, log_prob = self.agent.sample_action(policy_output)
                policy_loss += -log_prob * rewards[i]
                value_loss += (value_output - rewards[i]) ** 2
            policy_loss /= batch_size
            value_loss /= batch_size
            self.agent.policy_network.zero_grad()
            self.agent.value_network.zero_grad()
            policy_loss.backward()
            value_loss.backward()
            self.agent.policy_network.step()
            self.agent.value_network.step()


# Create the PPO agent and priority network
agent = PPOAgent(state_dim=96*3, action_dim=3)
priority_network = PriorityNetwork(state_dim=96*3, action_dim=3)

# Create the PPO trainer
trainer = PPOTrainer(agent, priority_network, gamma=0.99, lambda_=0.95, epsilon=0.1, c1=0.5, c2=0.01)

# Train the PPO agent
trainer.train(batch_size=32, epochs=1000)

ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()