In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque

# Define the actor and critic networks
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        self.layer1 = nn.Linear(state_dim, 256)
        self.layer2 = nn.Linear(256, 256)
        self.output = nn.Linear(256, action_dim)
        self.max_action = max_action

    def forward(self, state):
        x = torch.relu(self.layer1(state))
        x = torch.relu(self.layer2(x))
        action = torch.tanh(self.output(x)) * self.max_action
        return action


class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.layer1 = nn.Linear(state_dim + action_dim, 256)
        self.layer2 = nn.Linear(256, 256)
        self.output = nn.Linear(256, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        q_value = self.output(x)
        return q_value


# Experience replay buffer
class ReplayBuffer:
    def __init__(self, max_size=100000):
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        indices = random.sample(range(len(self.buffer)), batch_size)
        states, actions, rewards, next_states, dones = zip(*[self.buffer[idx] for idx in indices])
        return (
            torch.stack(states),
            torch.stack(actions),
            torch.tensor(rewards).unsqueeze(1),
            torch.stack(next_states),
            torch.tensor(dones).unsqueeze(1)
        )


# DDPG training loop
def ddpg_train(actor, critic, target_actor, target_critic, replay_buffer, actor_optimizer, critic_optimizer, batch_size, gamma, tau):
    # Sample a mini-batch from the replay buffer
    states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

    # Compute target Q value
    with torch.no_grad():
        target_actions = target_actor(next_states)
        target_q_values = rewards + (1 - dones.int()) * gamma * target_critic(next_states, target_actions)

    # Update Critic
    critic_loss = nn.MSELoss()(critic(states, actions), target_q_values)
    critic_optimizer.zero_grad()
    critic_loss.backward()
    critic_optimizer.step()

    # Update Actor
    actor_loss = -critic(states, actor(states)).mean()
    actor_optimizer.zero_grad()
    actor_loss.backward()
    actor_optimizer.step()

    # Soft update target networks
    for target_param, param in zip(target_actor.parameters(), actor.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
    for target_param, param in zip(target_critic.parameters(), critic.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)


# Environment setup (simulating the linearized inverted pendulum)
def simulate_pendulum(state, action, dt=0.02, m=0.2, M=1.0, L=0.5, g=9.81):
    """
    Simulate one step of the inverted pendulum dynamics with Euler integration.
    """
    def inverted_pendulum_linear(state, u, m, M, l, g):
        A = torch.tensor([
            [0, 1, 0, 0],
            [0, 0, (m * g) / M, 0],
            [0, 0, 0, 1],
            [0, 0, ((M + m) * g) / (M * l), 0]
        ])
        B = torch.tensor([0, 1 / M, 0, 1 / (M * l)]).unsqueeze(1)
        state_dot = torch.matmul(A, state) + torch.matmul(B, u)
        return state_dot

    state_dot = inverted_pendulum_linear(state.unsqueeze(1), action, m, M, L, g)
    next_state = state + state_dot.squeeze(1) * dt
    reward = -((next_state[0] ** 2) + (next_state[2] ** 2))  # Minimize position and angle deviations
    done = torch.abs(next_state[2]) > np.pi / 2  # Done if pendulum falls over
    return next_state, reward, done


# Hyperparameters and initialization
state_dim = 4
action_dim = 1
max_action = 5.0  # Maximum force
gamma = 0.99
tau = 0.005
batch_size = 64
replay_buffer = ReplayBuffer()

# Initialize actor, critic, target networks, and optimizers
actor = Actor(state_dim, action_dim, max_action)
critic = Critic(state_dim, action_dim)
target_actor = Actor(state_dim, action_dim, max_action)
target_critic = Critic(state_dim, action_dim)
target_actor.load_state_dict(actor.state_dict())
target_critic.load_state_dict(critic.state_dict())
actor_optimizer = optim.Adam(actor.parameters(), lr=1e-3)
critic_optimizer = optim.Adam(critic.parameters(), lr=1e-3)

# DDPG Training Loop
num_episodes = 500
for episode in range(num_episodes):
    state = torch.tensor([0.0, 0.0, 0.1, 0.0]).unsqueeze(0)  # Initial state with small angle
    episode_reward = 0

    for t in range(200):
        action = actor(state).detach() + torch.normal(0, 0.1, size=(1, action_dim))  # Add noise for exploration
        next_state, reward, done = simulate_pendulum(state.squeeze(0), action)
        
        replay_buffer.add((state.squeeze(0), action, reward, next_state, done))
        
        state = next_state.unsqueeze(0)
        episode_reward += reward.item()
        
        if done:
            break

        # Train the agent if enough samples are available
        if len(replay_buffer.buffer) > batch_size:
            ddpg_train(actor, critic, target_actor, target_critic, replay_buffer, actor_optimizer, critic_optimizer, batch_size, gamma, tau)

    print(f"Episode {episode + 1}, Reward: {episode_reward}")


Episode 1, Reward: -15.759847288019955


RuntimeError: Tensors must have same number of dimensions: got 2 and 3