# Inverted Pendulum Control Challenge

- Observation Space: Front-facing camera images of the cart-pendulum system (RGB images)
- Control Space: Scalar acceleration applied to the cart (measured in m/s²)
- Expert Data: Trajectories of expert demonstrations, each containing:
    - 5 seconds of data
    - Sampled at 10Hz (every 0.1s)
    - Each sample is a pair of (image, control input)
    - Total of 50 pairs per trajectory
- Number of Expert Trajectories: 100

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import os
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
from env import PendulumEnv
import time

# Expert Dataset Class
This is already implemented for you.

You can modify the transforms being applied to the images

In [None]:
# Loading the expert data
class PendulumDataset(Dataset):
    """Dataset of (image, control) samples gathered from expert trajectories.

    Every ``trajectory_*.npy`` file found in ``data_dir`` is loaded once at
    construction time and flattened into ``self.samples``, a list of
    ``(image_path, control)`` pairs. The images themselves are only read
    from disk when a sample is requested through ``__getitem__``.
    """

    def __init__(self, data_dir: str, transform=None):
        """
        Args:
            data_dir (str): Directory containing the expert trajectories
            transform: Optional transforms to apply to the images
        """
        self.data_dir = data_dir
        self.transform = transform
        self.samples = []

        # os.listdir() returns entries in arbitrary order; sort the file names
        # so the sample order (and hence any seeded random_split of this
        # dataset) is the same on every run and every machine.
        trajectory_files = sorted(
            f
            for f in os.listdir(data_dir)
            if f.startswith("trajectory_") and f.endswith(".npy")
        )

        for traj_file in trajectory_files:
            traj_path = os.path.join(data_dir, traj_file)
            # NOTE: allow_pickle=True unpickles arbitrary Python objects;
            # only point this at trajectory files you trust.
            traj_data = np.load(traj_path, allow_pickle=True)

            # Each entry holds the path of an image and its control input
            self.samples.extend(
                (entry["image_path"], entry["control"]) for entry in traj_data
            )

    def __len__(self):
        """Return number of data points"""
        return len(self.samples)

    def __getitem__(self, idx):
        """Get the idx-th data point as an (image, control) pair.

        The image is loaded from its stored path, converted to RGB and passed
        through ``self.transform`` when one was given. The control input is
        returned as a float32 tensor.
        """
        img_path, control = self.samples[idx]

        # Open inside a context manager so the file handle is released as
        # soon as the RGB copy has been made.
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        # Compare against None explicitly: a transform object may define its
        # own truthiness (e.g. an empty container-like transform).
        if self.transform is not None:
            image = self.transform(image)

        # Convert control to tensor
        control = torch.tensor(control, dtype=torch.float32)

        return image, control

# The pendulum controller
This controller should take in the image input and output the scalar control input (the acceleration to be applied to the cart) that keeps the pendulum upright.

You will need to implement an architecture for your controller and implement its forward pass

In [None]:
# The model for the controller
class PendulumController(nn.Module):
    """Image-to-control policy network (architecture still to be filled in)."""

    def __init__(self):
        super().__init__()
        # TODO: Add your layers here

    def forward(self, x):
        """Placeholder forward pass: return the first element of ``x`` as a 0-dim tensor."""
        # TODO: Implement forward pass here
        # Returning dummy control here, for now
        first_value = x.reshape(-1)[0]
        return first_value

# Training loop
This function should implement the main training loop.

You will need to implement this function.

In [None]:
# Main training loop
def train_controller(model: PendulumController, data_loader: DataLoader, num_epochs: int, learning_rate: float):
    """
    Fit the pendulum controller to the expert data.

    Currently a placeholder: no optimisation is performed and every epoch
    reports an infinite loss.

    Args:
        model: The neural network model
        data_loader: DataLoader containing the training data
        num_epochs: Number of training epochs
        learning_rate: Learning rate for the optimizer

    Returns:
        List with one training loss value per epoch
    """
    # TODO: Implement the training loop here
    placeholder_losses = [np.inf] * num_epochs
    return placeholder_losses

# Evaluation
This function computes the metric that you want to optimize on a given dataset.

This is optional, but feel free to implement a metric here that you can use for evaluation

In [None]:
# Compute error on specified dataset
def evaluate_controller(model: PendulumController, data_loader: DataLoader):
    """
    Measure how well the trained controller does on held-out data.

    Currently a placeholder: no evaluation is performed and an infinite
    error is reported.

    Args:
        model: The trained neural network model
        data_loader: DataLoader containing the test data

    Returns:
        Error on the test set
    """
    # TODO(optional): Implement the evaluation function
    return np.inf

# Main block
This block of code combines all the components to train a controller, and optionally evaluate it.

In [None]:
# Main script: build the dataset, train a controller, plot the training loss,
# evaluate on the held-out split and save the weights to disk.

# Set random seed for reproducibility (seeds both the torch and numpy global RNGs)
torch.manual_seed(42)
np.random.seed(42)

# TODO: Add any image transformations as needed
# Currently the only step converts each loaded image to a tensor.
transform = transforms.Compose([
    transforms.ToTensor(),
])

# Create datasets
data_dir = './pendulum_data'
dataset = PendulumDataset(data_dir=data_dir, transform=transform)

# Split into train and test sets (80/20 split)
# test_size is computed as the remainder so the two sizes always sum to len(dataset)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Create data loaders (only the training loader shuffles its samples)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

# Initialize the model
model = PendulumController()

# Train the model; train_losses is the list returned by train_controller
train_losses = train_controller(model, train_loader, num_epochs=20, learning_rate=1e-3)

# Plot training loss
plt.figure(figsize=(10, 5))
plt.plot(train_losses)
plt.title('Training Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.grid(True)
plt.show()

# Evaluate on test set
test_loss = evaluate_controller(model, test_loader)
print(f"Loss on test set is {test_loss}")

# Save the model weights (state dict only); the simulation cell reloads this file
torch.save(model.state_dict(), 'pendulum_controller.pth')

# Test the learned controller online
The code blocks below use the trained controller to control the pendulum online in the environment.

In [None]:
# Test the learned policy in simulation
def test_controller_in_sim(model, num_episodes=5, max_steps=300):
    """
    Test the trained controller in the simulation environment

    Each episode resets the environment and feeds the current observation
    through the model to obtain a scalar action, until either the
    environment reports ``done`` (episode failed) or ``max_steps`` steps have
    been taken (episode passed). Progress is printed; nothing is returned.

    Args:
        model: Trained PendulumController model
        num_episodes: Number of episodes to run
        max_steps: Maximum steps per episode
    """
    # Move the model to the available device and switch to inference mode
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    # TODO: Define the same transform used during training
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])

    # Create the environment
    env = PendulumEnv()

    # try/finally guarantees the environment is closed even if the model or
    # the simulator raises part-way through an episode.
    try:
        for episode in range(num_episodes):
            print(f"Episode {episode+1}/{num_episodes}", flush=True)

            # Reset environment
            observation = env.reset()
            for step in range(max_steps):
                # Process the observation: add a batch dimension of 1
                img_tensor = transform(observation).unsqueeze(0).to(device)

                # Get action from policy (no gradients needed at inference)
                with torch.no_grad():
                    action = model(img_tensor).item()

                # Take a step in the environment
                observation, done = env.step(action)
                env.render()

                if done:
                    # The environment ended the episode early: count as failure
                    print(f"Episode ended after {step+1} steps", flush=True)
                    print(f"Episode {episode+1} failed", flush=True)
                    break
            else:
                # Loop ran to max_steps without the environment reporting done
                print(f"Episode {episode+1} passed", flush=True)
            time.sleep(1)  # Pause between episodes
    finally:
        env.close()

In [None]:
# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Build a fresh controller and restore the trained weights if a checkpoint exists
model = PendulumController()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
checkpoint_path = "pendulum_controller.pth"
if os.path.exists(checkpoint_path):
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
else:
    # Make the fallback explicit instead of silently testing random weights
    print(f"Warning: '{checkpoint_path}' not found; testing an untrained controller", flush=True)

# Test in simulation
test_controller_in_sim(model)