# PPO Training for Piper Robot in Google Colab

This notebook sets up the environment and trains a PPO agent for the Piper robot arm grasping task using MuJoCo simulation.

In [None]:
# Install MuJoCo and other dependencies
!apt-get update
!apt-get install -y \
    libgl1-mesa-dev \
    libgl1-mesa-glx \
    libglew-dev \
    libosmesa6-dev \
    libegl1-mesa-dev \
    software-properties-common \
    patchelf \
    libglfw3-dev

# Install MuJoCo
!pip install mujoco

# Install other Python dependencies
!pip install torch>=2.0.0
!pip install numpy==1.26.4
!pip install scipy>=1.7.0
!pip install gymnasium==0.28.1
!pip install imageio
!pip install imageio[ffmpeg]
!pip install imageio[pyav]
!pip install tyro>=0.5.0
!pip install tqdm>=4.60.0

In [None]:
# Clone the repository
# Replace with your actual GitHub repository URL
!git clone https://github.com/wzzzzq/mujoco_sim2real.git
%cd mujoco_sim2real

# Verify the clone was successful
!ls -la

In [None]:
# Mount Google Drive to save models and logs.
# `google.colab` is imported here, so this cell only works inside a Colab runtime.
# Drive is mounted at /content/drive; the directory created below lives under it.
from google.colab import drive
drive.mount('/content/drive')

# Create directories for saving models.
# `-p` makes this succeed even when the directory already exists, so the cell is re-runnable.
!mkdir -p /content/drive/MyDrive/ppo_training_runs

In [None]:
# Import required libraries
import os
import sys
import torch
import numpy as np
import gymnasium as gym

# Set MuJoCo to use EGL for headless rendering (required for Colab).
# This must happen before the project modules below are imported.
os.environ['MUJOCO_GL'] = 'egl'

# Add the project directory to Python path
# Adjust this path based on where you uploaded/extracted your files
project_path = '/content/mujoco_sim2real'  # or your actual path
if os.path.exists(project_path):
    # Guard against duplicate entries when this cell is re-run.
    if project_path not in sys.path:
        sys.path.append(project_path)
    os.chdir(project_path)
else:
    # Best effort: the imports below can still succeed if the current working
    # directory already contains the project modules.
    print(f"Project path {project_path} not found. Please adjust the path.")

from single_piper_on_desk_env import PiperEnv
from ppo_rgb import PPOArgs, train
import tyro

# Check if GPU is available
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")

# Smoke-test the environment: construct it, reset once, report its spaces.
print("Testing PiperEnv...")
try:
    env = PiperEnv(render_mode=None)  # Disable rendering to avoid GLFW issues in headless environments
    try:
        obs, info = env.reset()
        print(f"Observation space: {env.observation_space}")
        print(f"Action space: {env.action_space}")
        print(f"Initial observation keys: {obs.keys()}")
    finally:
        # Always release the environment, even when reset() or the reporting above fails.
        env.close()
    print("Environment test successful!")
except Exception as e:
    print(f"Environment test failed: {e}")
    print("Please check that all required files are uploaded and paths are correct.")
    # Show the full traceback, matching the error reporting of the training/evaluation cells.
    import traceback
    traceback.print_exc()

In [None]:
# Build the PPO configuration: start from the project defaults, then apply overrides
ppo_args = PPOArgs()

# Colab-friendly settings (adjust based on your needs), applied in the listed order
colab_overrides = {
    "total_timesteps": 1000000,  # Shorter for Colab demo, increase for real training
    "num_envs": 4,  # Fewer environments for Colab
    "num_eval_envs": 2,
    "num_steps": 80,
    "num_minibatches": 4,
    "learning_rate": 3e-4,
    "track": False,  # Disable wandb tracking
    "save_model": True,
    "cuda": torch.cuda.is_available(),  # Use GPU if available
    # Optional: fixed random seed for reproducibility
    "seed": 42,
    "torch_deterministic": True,
}
for option_name, option_value in colab_overrides.items():
    setattr(ppo_args, option_name, option_value)

# Echo the settings that matter most before the long-running call
config_summary = (
    "PPO Configuration:",
    f"Total timesteps: {ppo_args.total_timesteps}",
    f"Number of environments: {ppo_args.num_envs}",
    f"Learning rate: {ppo_args.learning_rate}",
    f"Using CUDA: {ppo_args.cuda}",
)
for summary_line in config_summary:
    print(summary_line)

# Start training; failures are reported with a traceback instead of aborting the notebook
print("Starting PPO training...")
try:
    train(args=ppo_args)
except Exception as training_error:
    print(f"Training failed with error: {training_error}")
    import traceback
    traceback.print_exc()
else:
    print("Training completed successfully!")

In [None]:
# Test the trained policy by re-running the project's `train` entry point in evaluation mode
print("Testing the trained policy...")

# Find the latest checkpoint among the final/best checkpoints of every run directory.
# The glob is relative to the current working directory.
import glob
checkpoint_files = glob.glob("runs/*/final_ckpt.pt") + glob.glob("runs/*/best_ckpt.pt")
if checkpoint_files:
    # "Latest" is decided by filesystem change time.
    latest_checkpoint = max(checkpoint_files, key=os.path.getctime)
    print(f"Found checkpoint: {latest_checkpoint}")

    # Set up evaluation arguments
    eval_args = PPOArgs()
    eval_args.evaluate = True
    eval_args.checkpoint = latest_checkpoint
    eval_args.num_envs = 1  # Single environment for evaluation
    eval_args.num_eval_envs = 1
    # render_mode is deliberately left at the PPOArgs default (the value the training
    # cell ran with). Forcing "human" needs an on-screen window, which a headless
    # Colab runtime (MUJOCO_GL=egl) cannot provide.
    # NOTE(review): set eval_args.render_mode = "human" only on a machine with a display.
    eval_args.cuda = torch.cuda.is_available()

    print("Starting evaluation...")
    try:
        train(args=eval_args)
        print("Evaluation completed!")
    except Exception as e:
        print(f"Evaluation failed: {e}")
        import traceback
        traceback.print_exc()
else:
    print("No checkpoint files found. Please check the runs/ directory.")

# Alternative: Manual testing with a few episodes
print("\nManual testing with a few episodes...")


def _obs_to_tensors(obs, device, add_batch_dim=False):
    """Convert an environment observation dict into torch tensors on `device`.

    Only the keys the agent consumes are kept: "rgb" and "wrist_cam" become
    uint8 tensors, "state" becomes a float32 tensor. Any other key is dropped.

    Args:
        obs: Mapping from observation name to array-like data.
        device: Target torch device for the created tensors.
        add_batch_dim: When True, a leading dimension of size 1 is added to
            every tensor.

    Returns:
        A new dict containing the converted tensors.
    """
    dtype_by_key = {
        "rgb": torch.uint8,
        "wrist_cam": torch.uint8,
        "state": torch.float32,
    }
    tensors = {}
    for key, value in obs.items():
        if key not in dtype_by_key:
            continue
        tensor = torch.tensor(value, dtype=dtype_by_key[key], device=device)
        tensors[key] = tensor.unsqueeze(0) if add_batch_dim else tensor
    return tensors


try:
    from ppo_rgb import Agent, NatureCNN
    import torch

    # Load the trained model
    if checkpoint_files:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Create environment and agent
        env = PiperEnv(render_mode=None)  # No rendering for faster testing
        try:
            sample_obs, _ = env.reset()

            # NOTE(review): the sample observation is passed without a batch
            # dimension (as before), while the rollout below feeds batched
            # observations - confirm which shape Agent expects at construction.
            agent = Agent(env, sample_obs=_obs_to_tensors(sample_obs, device))
            # Move the parameters to the same device as the observations; without
            # this, a GPU runtime mixes CPU weights with CUDA inputs.
            agent = agent.to(device)
            agent.load_state_dict(torch.load(latest_checkpoint, map_location=device))
            agent.eval()

            # Test for a few episodes
            num_test_episodes = 3
            max_steps_per_episode = 200  # Limit steps per episode
            for episode in range(num_test_episodes):
                obs, _ = env.reset()
                done = False
                episode_reward = 0
                step_count = 0

                while not done and step_count < max_steps_per_episode:
                    batched_obs = _obs_to_tensors(obs, device, add_batch_dim=True)

                    with torch.no_grad():
                        action = agent.get_action(batched_obs, deterministic=True)
                        # Drop the batch dimension and hand the env a NumPy action.
                        action = action.squeeze(0).cpu().numpy()

                    obs, reward, terminated, truncated, info = env.step(action)
                    episode_reward += reward
                    step_count += 1
                    done = terminated or truncated

                print(f"Episode {episode + 1}: Reward = {episode_reward:.3f}, Steps = {step_count}")
        finally:
            # Release the environment even when loading or the rollout fails.
            env.close()
        print("Manual testing completed!")
    else:
        print("No checkpoint available for manual testing.")

except Exception as e:
    print(f"Manual testing failed: {e}")
    import traceback
    traceback.print_exc()

In [None]:
# After training, you can evaluate the trained model
print("Training completed! Here are some next steps:")

# 1. Check the saved models
!ls -la runs/

# 2. Copy models to Google Drive
!cp -r runs/ /content/drive/MyDrive/ppo_training_runs/

print("\nDon't forget to:")
print("- Download your trained models from Google Drive")
print("- Adjust hyperparameters for better performance")
print("- Consider using more timesteps for real training")