In [1]:
!pip install glfw
!pip install mujoco

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [2]:
import numpy as np
if not hasattr(np, 'bool8'):
    np.bool8 = np.bool_

  if not hasattr(np, 'bool8'):


In [4]:
import gym
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal
import matplotlib.pyplot as plt

# ------------------------------
# 1. GPU Configuration
# ------------------------------

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# ------------------------------
# 2. Define the PPO components for continuous action spaces
# ------------------------------

class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(ActorCritic, self).__init__()
        # Shared network layers
        self.shared = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU()
        )
        
        # Actor (policy) network
        self.mean = nn.Linear(256, action_dim)
        self.log_std = nn.Parameter(torch.zeros(action_dim))
        
        # # Critic (value) network
        # self.critic = nn.Linear(256, 1)

        # Critic (value) network - rename to value_layer
        self.value_layer = nn.Linear(256, 1)  # Renamed from "critic"
        
        # Action scale
        self.max_action = max_action
        
    def forward(self):
        # Not used directly
        raise NotImplementedError
    
    def actor(self, state):
        x = self.shared(state)
        mean = self.mean(x)
        log_std = self.log_std.expand_as(mean)
        # Clamp log_std to prevent very small or large values
        log_std = torch.clamp(log_std, -20, 2)
        std = torch.exp(log_std)
        
        return mean, std
        
    def critic(self, state):
        x = self.shared(state)
        value = self.value_layer(x)  # Use the renamed layer
        #value = self.critic(x)
        return value
    
    def get_action(self, state, deterministic=False):
        mean, std = self.actor(state)
        
        if deterministic:
            return self.max_action * torch.tanh(mean)
        
        dist = Normal(mean, std)
        action = dist.sample()
        # Tanh squashing for bounded actions
        action = self.max_action * torch.tanh(action)
        
        # Calculate log probability for the sampled action
        # We need to account for the tanh transformation when computing log probs
        log_prob = dist.log_prob(action).sum(dim=-1, keepdim=True)
        
        # Apply tanh to keep actions within bounds
        return action, log_prob
    
    def evaluate_actions(self, state, action):
        mean, std = self.actor(state)
        dist = Normal(mean, std)
        
        # Get log probability
        log_probs = dist.log_prob(action).sum(dim=-1, keepdim=True)
        
        # Get entropy for exploration
        entropy = dist.entropy().mean()
        
        # Get state value
        value = self.critic(state)
        
        return log_probs, entropy, value

# Simple buffer for PPO
class PPOBuffer:
    def __init__(self, capacity, state_dim, action_dim):
        self.states = np.zeros((capacity, state_dim), dtype=np.float32)
        self.actions = np.zeros((capacity, action_dim), dtype=np.float32)
        self.rewards = np.zeros((capacity, 1), dtype=np.float32)
        self.next_states = np.zeros((capacity, state_dim), dtype=np.float32)
        self.dones = np.zeros((capacity, 1), dtype=np.float32)
        self.log_probs = np.zeros((capacity, 1), dtype=np.float32)
        self.values = np.zeros((capacity, 1), dtype=np.float32)
        
        self.idx = 0
        self.size = 0
        self.capacity = capacity
    
    def store(self, state, action, reward, next_state, done, log_prob, value):
        self.states[self.idx] = state
        self.actions[self.idx] = action
        self.rewards[self.idx] = reward
        self.next_states[self.idx] = next_state
        self.dones[self.idx] = done
        self.log_probs[self.idx] = log_prob
        self.values[self.idx] = value
        
        self.idx = (self.idx + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)
    
    def get_all_data(self):
        return (
            self.states[:self.size],
            self.actions[:self.size],
            self.rewards[:self.size],
            self.next_states[:self.size],
            self.dones[:self.size],
            self.log_probs[:self.size],
            self.values[:self.size]
        )
    
    def clear(self):
        self.idx = 0
        self.size = 0

# -----------------------------------
# 3. Define wrappers for uncertainties (unchanged)
# -----------------------------------

# 3.1 Sensor/Observation Noise: add Gaussian noise to observations
class SensorNoiseWrapper(gym.ObservationWrapper):
    def __init__(self, env, noise_std=0.01):
        super(SensorNoiseWrapper, self).__init__(env)
        self.noise_std = noise_std

    def observation(self, observation):
        noise = np.random.normal(0, self.noise_std, size=observation.shape)
        return observation + noise

# 3.2 Motor Noise: add random noise to actions (torque outputs)
class MotorNoiseWrapper(gym.ActionWrapper):
    def __init__(self, env, noise_std=0.05):
        super(MotorNoiseWrapper, self).__init__(env)
        self.noise_std = noise_std

    def action(self, action):
        noise = np.random.normal(0, self.noise_std, size=action.shape)
        return np.clip(action + noise, self.action_space.low, self.action_space.high)

# 3.3 Leg Mass or Joint Stiffness Variation
class MassVariabilityWrapper(gym.Wrapper):
    def __init__(self, env, mass_variation_range=(0.8, 1.2)):
        super(MassVariabilityWrapper, self).__init__(env)
        self.mass_variation_range = mass_variation_range
        self.original_body_mass = None

    def reset(self, **kwargs):
        observation = self.env.reset(**kwargs)
        if isinstance(observation, tuple):
            observation = observation[0]  # Handle new env reset API
        
        # Store original masses if not stored already
        if self.original_body_mass is None and hasattr(self.env.unwrapped, 'model'):
            self.original_body_mass = self.env.unwrapped.model.body_mass.copy()
        
        # Apply random mass variations if model exists
        if hasattr(self.env.unwrapped, 'model') and self.original_body_mass is not None:
            # Focus on leg masses (depending on the specific model structure)
            # Indices 4-8 typically correspond to the leg parts in HalfCheetah
            leg_indices = range(4, 9)  # Adjust based on actual model
            
            for idx in leg_indices:
                if idx < len(self.original_body_mass):
                    variation = np.random.uniform(*self.mass_variation_range)
                    self.env.unwrapped.model.body_mass[idx] = self.original_body_mass[idx] * variation
        
        return observation

# 3.4 Random Drag or Terrain Resistance
class TerrainResistanceWrapper(gym.Wrapper):
    def __init__(self, env, drag_range=(0.0, 0.3)):
        super(TerrainResistanceWrapper, self).__init__(env)
        self.drag_range = drag_range
        self.current_drag = 0.0

    def reset(self, **kwargs):
        observation = self.env.reset(**kwargs)
        if isinstance(observation, tuple):
            observation = observation[0]  # Handle new env reset API
        
        # Set a new random drag coefficient for this episode
        self.current_drag = np.random.uniform(*self.drag_range)
        return observation

    def step(self, action):
        result = self.env.step(action)
        
        # Handle both the new_step_api (5 values) and old (4 values) cases
        if len(result) == 5:
            obs, reward, done, truncated, info = result
            done = done or truncated
        else:
            obs, reward, done, info = result
        
        # Apply drag by modifying velocity components if we can access them
        if hasattr(self.env.unwrapped, 'sim'):
            # Get current velocities
            qvel = self.env.unwrapped.sim.data.qvel.copy()
            
            # Apply drag to horizontal velocity (typically the first velocity component)
            if len(qvel) > 0:
                qvel[0] *= (1.0 - self.current_drag)
                
                # Update velocities in the simulation
                self.env.unwrapped.sim.data.qvel[:] = qvel
        
        if len(result) == 5:
            return obs, reward, done, truncated, info
        else:
            return obs, reward, done, info

# 3.5 External Force Pulses
class ExternalForceWrapper(gym.Wrapper):
    def __init__(self, env, force_magnitude_range=(-100.0, 100.0), pulse_probability=0.05):
        super(ExternalForceWrapper, self).__init__(env)
        self.force_magnitude_range = force_magnitude_range
        self.pulse_probability = pulse_probability

    def step(self, action):
        result = self.env.step(action)
        
        # Handle both return formats
        if len(result) == 5:
            obs, reward, done, truncated, info = result
            done = done or truncated
        else:
            obs, reward, done, info = result
        
        # Randomly apply force pulses
        if np.random.rand() < self.pulse_probability and hasattr(self.env.unwrapped, 'sim'):
            # Choose a random force magnitude
            force_magnitude = np.random.uniform(*self.force_magnitude_range)
            
            # Apply the force to the torso (typically body index 1)
            torso_idx = 1  # Adjust based on actual model
            if hasattr(self.env.unwrapped.sim, 'data'):
                # Create force vector [x, y, z] - mainly in x direction (forward/backward)
                force = np.array([force_magnitude, 0.0, 0.0])
                
                # Apply the force if the method exists
                if hasattr(self.env.unwrapped.sim, 'apply_force'):
                    self.env.unwrapped.sim.apply_force(force, torso_idx)
        
        if len(result) == 5:
            return obs, reward, done, truncated, info
        else:
            return obs, reward, done, info

# 3.6 Variable Contact Softness
class VariableContactWrapper(gym.Wrapper):
    def __init__(self, env, stiffness_range=(1.0, 10.0), damping_range=(0.1, 1.0)):
        super(VariableContactWrapper, self).__init__(env)
        self.stiffness_range = stiffness_range
        self.damping_range = damping_range
        self.original_stiffness = None
        self.original_damping = None

    def reset(self, **kwargs):
        observation = self.env.reset(**kwargs)
        if isinstance(observation, tuple):
            observation = observation[0]  # Handle new env reset API
        
        # Store original parameters if not stored already
        if self.original_stiffness is None and hasattr(self.env.unwrapped, 'model'):
            if hasattr(self.env.unwrapped.model, 'geom_kp'):
                self.original_stiffness = self.env.unwrapped.model.geom_kp.copy()
            if hasattr(self.env.unwrapped.model, 'geom_kd'):
                self.original_damping = self.env.unwrapped.model.geom_kd.copy()
        
        # Apply random contact parameters
        if hasattr(self.env.unwrapped, 'model'):
            # Adjust contact stiffness
            if hasattr(self.env.unwrapped.model, 'geom_kp') and self.original_stiffness is not None:
                stiffness_factor = np.random.uniform(*self.stiffness_range)
                # Apply to ground contact geoms (usually the first few indices)
                ground_indices = [0]  # Typically the first geom is the ground
                for idx in ground_indices:
                    if idx < len(self.original_stiffness):
                        self.env.unwrapped.model.geom_kp[idx] = self.original_stiffness[idx] * stiffness_factor
            
            # Adjust contact damping
            if hasattr(self.env.unwrapped.model, 'geom_kd') and self.original_damping is not None:
                damping_factor = np.random.uniform(*self.damping_range)
                for idx in ground_indices:
                    if idx < len(self.original_damping):
                        self.env.unwrapped.model.geom_kd[idx] = self.original_damping[idx] * damping_factor
        
        return observation

# 3.7 Unified Aleatoric Disruption
class DisruptionWrapper(gym.Wrapper):
    def __init__(self, env, lambda_rate=0.1, intensity_scale=0.05):
        super(DisruptionWrapper, self).__init__(env)
        self.lambda_rate = lambda_rate
        # Calculate intensity vector proportional to observation space shape
        state_dim = env.observation_space.shape[0]
        self.intensity_vector = np.ones(state_dim) * intensity_scale

    def reset(self, **kwargs):
        observation = self.env.reset(**kwargs)
        if isinstance(observation, tuple):
            observation = observation[0]  # Handle new env reset API
        return observation

    def step(self, action):
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, done, truncated, info = result
            done = done or truncated
        else:
            obs, reward, done, info = result

        # Apply disruption with probability λ
        if np.random.rand() < self.lambda_rate:
            noise = np.random.uniform(-self.intensity_vector, self.intensity_vector)
            obs = obs + noise

        if len(result) == 5:
            return obs, reward, done, truncated, info
        else:
            return obs, reward, done, info

# -----------------------------------------
# 4. Training function for the PPO agent with GPU support
# -----------------------------------------

def train_ppo(env_fn, episodes=200, steps_per_update=2048, epochs=10, batch_size=64, gamma=0.99, 
              lr=3e-4, eps_clip=0.2, value_coef=0.5, entropy_coef=0.01):
    env = env_fn()
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]
    max_action = float(env.action_space.high[0])
    
    # Initialize actor-critic network and move to GPU
    model = ActorCritic(state_dim, action_dim, max_action).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    # Initialize buffer
    buffer = PPOBuffer(steps_per_update, state_dim, action_dim)
    
    rewards_history = []
    running_reward = 0
    
    # Start training
    total_steps = 0
    episodes_completed = 0
    
    while episodes_completed < episodes:
        state = env.reset()
        if isinstance(state, tuple):
            state = state[0]  # Handle new env reset API
        
        episode_reward = 0
        done = False
        
        # Episode loop
        while not done:
            # Convert state to tensor
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
            
            # Select action
            with torch.no_grad():
                action, log_prob = model.get_action(state_tensor)
                value = model.critic(state_tensor)
            
            # Convert to numpy and take step
            action_np = action.cpu().numpy().flatten()
            
            # Take action in environment
            step_result = env.step(action_np)
            if len(step_result) == 5:
                next_state, reward, done, truncated, _ = step_result
                done = done or truncated
            else:
                next_state, reward, done, _ = step_result
            
            # Store transition in buffer
            buffer.store(
                state, 
                action_np, 
                reward, 
                next_state, 
                float(done), 
                log_prob.cpu().numpy()[0], 
                value.cpu().numpy()[0]
            )
            
            state = next_state
            episode_reward += reward
            total_steps += 1
            
            # Update if buffer is full
            if buffer.size == steps_per_update:
                # Get all data from buffer
                states, actions, rewards, next_states, dones, old_log_probs, old_values = buffer.get_all_data()
                
                # Convert to tensors and move to device
                states = torch.FloatTensor(states).to(device)
                actions = torch.FloatTensor(actions).to(device)
                old_log_probs = torch.FloatTensor(old_log_probs).to(device)
                old_values = torch.FloatTensor(old_values).to(device)
                
                # Compute returns and advantages
                returns = compute_gae(
                    rewards, 
                    dones, 
                    old_values, 
                    next_states, 
                    model, 
                    gamma=gamma, 
                    lambda_gae=0.95
                )
                returns = torch.FloatTensor(returns).to(device)
                
                # Normalize returns (optional but helps with training)
                advantages = returns - old_values
                advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
                
                # Update policy for multiple epochs
                for _ in range(epochs):
                    # Generate random indices
                    indices = torch.randperm(steps_per_update)
                    
                    # Create mini-batches
                    for start_idx in range(0, steps_per_update, batch_size):
                        # Get mini-batch indices
                        idx = indices[start_idx:min(start_idx + batch_size, steps_per_update)]
                        
                        # Get mini-batch data
                        mb_states = states[idx]
                        mb_actions = actions[idx]
                        mb_old_log_probs = old_log_probs[idx]
                        mb_returns = returns[idx]
                        mb_advantages = advantages[idx]
                        
                        # Get current log probs and values
                        new_log_probs, entropy, values = model.evaluate_actions(mb_states, mb_actions)
                        
                        # Calculate ratio (π_θ / π_θ_old)
                        ratio = torch.exp(new_log_probs - mb_old_log_probs)
                        
                        # PPO update
                        surr1 = ratio * mb_advantages
                        surr2 = torch.clamp(ratio, 1.0 - eps_clip, 1.0 + eps_clip) * mb_advantages
                        
                        # Calculate actor loss
                        actor_loss = -torch.min(surr1, surr2).mean()
                        
                        # Calculate critic loss
                        critic_loss = nn.MSELoss()(values, mb_returns)
                        
                        # Calculate total loss
                        loss = actor_loss + value_coef * critic_loss - entropy_coef * entropy
                        
                        # Update network
                        optimizer.zero_grad()
                        loss.backward()
                        # Optional: clip gradients for stability
                        nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
                        optimizer.step()
                
                # Clear buffer after update
                buffer.clear()
        
        # End of episode processing
        episodes_completed += 1
        rewards_history.append(episode_reward)
        
        # Exponential moving average for smoother reporting
        running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward if episodes_completed > 1 else episode_reward
        
        if episodes_completed % 10 == 0:
            print(f"Episode {episodes_completed}, Reward: {episode_reward:.2f}, Avg Reward: {running_reward:.2f}")
    
    env.close()
    return rewards_history

# Helper function for PPO's Generalized Advantage Estimation
def compute_gae(rewards, dones, values, next_states, model, gamma=0.99, lambda_gae=0.95):
    returns = []
    gae = 0
    
    # Move the model to evaluation mode for inference
    with torch.no_grad():
        # Get next state values for the last state
        next_state = torch.FloatTensor(next_states[-1]).unsqueeze(0).to(device)
        next_value = model.critic(next_state).cpu().numpy()[0, 0]
    
    # Append next value to values for easier computation
    values = np.append(values, next_value)
    
    # Compute returns with GAE
    for step in reversed(range(len(rewards))):
        if step == len(rewards) - 1:
            next_non_terminal = 1.0 - dones[step]
            next_return = next_value
        else:
            next_non_terminal = 1.0 - dones[step]
            next_return = returns[0]
        
        # Calculate delta and GAE
        delta = rewards[step] + gamma * values[step + 1] * next_non_terminal - values[step]
        gae = delta + gamma * lambda_gae * next_non_terminal * gae
        
        # Insert at the beginning (as we're going backwards)
        returns.insert(0, gae + values[step])
    
    return np.array(returns)

# ------------------------------------------------------
# 5. Create environment functions for each uncertainty (unchanged)
# ------------------------------------------------------

def make_base_env():
    # Base environment (no uncertainty)
    env = gym.make('HalfCheetah-v4')
    return env

def make_sensor_noise_env():
    env = gym.make('HalfCheetah-v4')
    env = SensorNoiseWrapper(env, noise_std=0.01)
    return env

def make_motor_noise_env():
    env = gym.make('HalfCheetah-v4')
    env = MotorNoiseWrapper(env, noise_std=0.05)
    return env

def make_mass_variability_env():
    env = gym.make('HalfCheetah-v4')
    env = MassVariabilityWrapper(env, mass_variation_range=(0.8, 1.2))
    return env

def make_terrain_resistance_env():
    env = gym.make('HalfCheetah-v4')
    env = TerrainResistanceWrapper(env, drag_range=(0.0, 0.3))
    return env

def make_external_force_env():
    env = gym.make('HalfCheetah-v4')
    env = ExternalForceWrapper(env, force_magnitude_range=(-100.0, 100.0), pulse_probability=0.05)
    return env

def make_variable_contact_env():
    env = gym.make('HalfCheetah-v4')
    env = VariableContactWrapper(env, stiffness_range=(1.0, 10.0), damping_range=(0.1, 1.0))
    return env

def make_disruption_env():
    env = gym.make('HalfCheetah-v4')
    env = DisruptionWrapper(env, lambda_rate=0.1, intensity_scale=0.05)
    return env

# -------------------------------------------------
# 6. Run experiments and collect performance metrics
# -------------------------------------------------

# Define the uncertainty experiments
experiments = {
    "Base": make_base_env,
    "Sensor Noise": make_sensor_noise_env,
    "Motor Noise": make_motor_noise_env,
    "Mass Variability": make_mass_variability_env,
    "Terrain Resistance": make_terrain_resistance_env,
    "External Force": make_external_force_env, 
    "Variable Contact": make_variable_contact_env,
    "Disruption Model": make_disruption_env
}

results = {}
episodes = 300  # Adjust as needed for HalfCheetah

for key, env_fn in experiments.items():
    print(f"\n{'='*50}")
    print(f"Training with {key} uncertainty:")
    print(f"{'='*50}")
    rewards = train_ppo(env_fn, episodes=episodes)
    results[key] = rewards
    
    # Save intermediate results after each experiment (in case of crash)
    np.save(f"rewards_ppo_{key.replace(' ', '_').lower()}.npy", np.array(rewards))

# --------------------------------------
# 7. Plot the training performance curves with improved visualization
# --------------------------------------

plt.figure(figsize=(12, 8))

# Apply smoothing for clearer visualization
window_size = 20
colors = plt.cm.viridis(np.linspace(0, 1, len(experiments)))

for i, (key, rewards) in enumerate(results.items()):
    # Compute rolling average for smoother curves
    smoothed_rewards = np.convolve(rewards, np.ones(window_size)/window_size, mode='valid')
    
    # Plot both raw (light) and smoothed (dark) curves
    plt.plot(rewards, alpha=0.3, color=colors[i])
    plt.plot(range(window_size-1, len(rewards)), smoothed_rewards, 
             label=key, linewidth=2, color=colors[i])

plt.xlabel("Episode", fontsize=14)
plt.ylabel("Reward", fontsize=14)
plt.title("PPO Performance with Different Aleatoric Uncertainties in HalfCheetah", fontsize=16)
plt.legend(fontsize=12)
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Save the final plot
plt.savefig("halfcheetah_ppo_uncertainty_performance.png", dpi=300)
plt.show()

# Also save all results together
np.save("ppo_all_results.npy", results)

Using device: cuda

Training with Base uncertainty:


  if not isinstance(terminated, (bool, np.bool8)):


TypeError: can't convert cuda:0 device type tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.