In [None]:
import gymnasium as gym
import numpy as np
import os
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback

class EnhancedTrainingLogger(BaseCallback):
    """Callback that checkpoints the model periodically and logs per-episode metrics.

    On every step it (1) saves the model once at least ``save_freq`` timesteps
    have elapsed since the previous checkpoint attempt and (2) accumulates the
    reward and absolute control cost of the environment at index 0. When that
    environment reports the end of an episode, one CSV row
    ``timestep,reward,distance,control_cost`` is appended to ``log_filepath``.

    Only index 0 of ``rewards``/``infos``/``dones`` is read, so with several
    parallel environments the remaining ones are ignored.

    Args:
        log_filepath: CSV file that is truncated at training start and then
            appended to after every finished episode.
        save_dir: Directory for checkpoints; created if missing.
        save_freq: Minimum number of timesteps between checkpoint attempts.
        model_prefix: File-name prefix for checkpoints; the current timestep
            count is appended to it.
    """

    def __init__(self, log_filepath: str = "efficient_cheetah_ppo_training2.txt",
                 save_dir: str = "model_checkpoints",
                 save_freq: int = 50000,
                 model_prefix: str = "efficient_cheetah_ppo2") -> None:
        super().__init__(verbose=0)
        self.log_filepath = log_filepath
        self.save_dir = save_dir
        self.save_freq = save_freq
        self.model_prefix = model_prefix
        # Running totals for the episode currently in progress.
        self.episode_total_reward = 0.0
        self.control_costs = 0.0
        # Timestep count at the most recent checkpoint attempt.
        self.last_save = 0

        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(save_dir, exist_ok=True)

    def _save_checkpoint(self) -> None:
        """Try to save the model; report (but do not raise) on failure."""
        model_path = os.path.join(self.save_dir,
                                  f"{self.model_prefix}_{self.num_timesteps}")
        try:
            self.model.save(model_path)
            print(f"Model saved at {model_path}")
        except Exception as e:
            # Best effort: a failed checkpoint must not abort training.
            print(f"Error saving model at timestep {self.num_timesteps}: {e}")
        finally:
            # Advance the marker even on failure; otherwise a persistent error
            # (e.g. a full disk) would trigger a save attempt and an error
            # message on every single step.
            self.last_save = self.num_timesteps

    def _on_step(self) -> bool:
        """Checkpoint if due, update episode totals and log finished episodes.

        Returns:
            Always ``True``.
        """
        # Save model at regular intervals
        if self.num_timesteps >= self.last_save + self.save_freq:
            self._save_checkpoint()

        # Fetch each value once; any of them may be absent from self.locals.
        rewards = self.locals.get("rewards")
        infos = self.locals.get("infos")
        dones = self.locals.get("dones")

        # Cast to a Python float so the running sum does not accumulate in a
        # possibly lower-precision array scalar type.
        current_reward = float(rewards[0]) if rewards is not None else 0.0
        environment_info = infos[0] if infos is not None else {}
        episode_done = bool(dones[0]) if dones is not None else False

        self.episode_total_reward += current_reward

        # Track control costs (magnitude only, regardless of sign)
        if "reward_ctrl" in environment_info:
            self.control_costs += float(abs(environment_info["reward_ctrl"]))

        if episode_done:
            distance_traveled = environment_info.get("x_position", 0)
            with open(self.log_filepath, "a", encoding="utf-8") as log_file:
                log_file.write(f"{self.num_timesteps},{self.episode_total_reward:.4f},{distance_traveled:.4f},{self.control_costs:.4f}\n")
            self.episode_total_reward = 0.0
            self.control_costs = 0.0
        return True

    def _on_training_start(self) -> None:
        """Start a fresh log file and reconcile the checkpoint marker."""
        # If this callback is reused and the timestep counter restarted below
        # the previous marker, checkpointing would otherwise stall until the
        # counter caught up with the stale value.
        self.last_save = min(self.last_save, self.num_timesteps)
        with open(self.log_filepath, "w", encoding="utf-8") as log_file:
            log_file.write("timestep,reward,distance,control_cost\n")

class EfficientCheetahEnv(gym.Wrapper):
    """Wrapper around ``HalfCheetah-v4`` that replaces the reward with a shaped one.

    Observations, termination flags and ``info`` are passed through unchanged;
    only the scalar reward returned by :meth:`step` is modified.
    """

    def __init__(self):
        super().__init__(gym.make("HalfCheetah-v4"))

    def step(self, action):
        """Step the wrapped environment and substitute the shaped reward.

        Returns:
            ``(observation, modified_reward, terminated, truncated, info)``.
        """
        observation, original_reward, terminated, truncated, info = self.env.step(action)
        modified_reward = self.ideal_reward_function(observation, action, original_reward, info)
        return observation, modified_reward, terminated, truncated, info

    def ideal_reward_function(self, observation, action, original_reward, info):
        """Combine the environment reward with several shaping terms.

        Args:
            observation: Observation vector; indices 1, 4, 7, 11 and 14 are read.
            action: Action that was applied; only its mean absolute value is used.
            original_reward: Reward returned by the wrapped environment.
            info: Step info dict (currently unused).

        Returns:
            The original reward plus posture, gait, energy, anti-inversion and
            stance terms.
        """
        # 1. Maintain core elements of original reward (forward progress minus control costs)
        original_reward_weight = 1.0

        # 2. Torso orientation - observation[1] is taken to be the torso pitch
        #    (rooty); NOTE(review): confirm against the environment's
        #    observation layout.
        torso_angle = observation[1]
        # Gaussian bump centred on a slight 0.2 rad lean; maximum 0.3.
        posture_reward = 0.3 * np.exp(-5 * (torso_angle - 0.2)**2)

        # 3. Coordinated leg movement
        back_thigh_vel = observation[11]  # Angular velocity of back thigh
        front_thigh_vel = observation[14]  # Angular velocity of front thigh
        # Largest (0.2) when the two velocities cancel, i.e. the thighs move in
        # opposite directions - but also when both are zero.
        gait_reward = 0.2 * np.exp(-2 * (back_thigh_vel + front_thigh_vel)**2)

        # 4. Energy efficiency - bonus in [0, 0.1] that shrinks as the mean
        #    absolute action approaches 1.
        energy_efficiency = 0.1 * (1.0 - min(1.0, np.mean(np.abs(action))))

        # 5. Anti-inversion component - prevent flipping.
        # cos(torso_angle) is negative once the torso is rotated past 90
        # degrees. The term must then be negative (a penalty in [-0.5, 0]);
        # the previous leading minus sign made it a bonus for being upside down.
        anti_inversion = 0.5 * min(0.0, np.cos(torso_angle))

        # 6. Foot positioning for push-off
        back_foot_angle = observation[4]   # bfoot angle
        front_foot_angle = observation[7]  # ffoot angle
        # Largest (0.2) when both foot joint angles are zero.
        stance_reward = 0.1 * (np.cos(back_foot_angle) + np.cos(front_foot_angle))

        # Combined reward
        reward = (posture_reward + gait_reward + energy_efficiency +
                  anti_inversion + stance_reward +
                  original_reward_weight * original_reward)

        return reward

def train_efficient_cheetah_with_ppo():
    """Train a PPO agent on the reward-shaped cheetah environment.

    Builds the environment, a default-hyperparameter ``MlpPolicy`` PPO model
    and the logging/checkpoint callback, trains for 1,000,000 timesteps and
    saves the final model as ``efficient_cheetah_ppo_final2``.

    Any exception raised along the way is reported on stdout rather than
    propagated. The environment is closed in every case.
    """
    # Defined before the try block so the finally clause can tell whether the
    # environment was ever created.
    cheetah_environment = None
    try:
        # Setup environment
        print("Creating efficient cheetah environment...")
        cheetah_environment = EfficientCheetahEnv()

        # Initialize PPO model with default hyperparameters
        print("Initializing PPO model...")
        training_model = PPO(
            policy="MlpPolicy",
            env=cheetah_environment,
            verbose=1
        )

        # Setup logger with checkpoint saving capability
        print("Setting up training logger...")
        progress_logger = EnhancedTrainingLogger(
            log_filepath="efficient_cheetah_ppo_training2.txt",
            save_dir="model_checkpoints",
            save_freq=50000,
            model_prefix="efficient_cheetah_ppo2"
        )

        # Train the model
        print("Starting training for 1,000,000 timesteps...")
        training_model.learn(total_timesteps=1000000, callback=progress_logger)

        # Save the final model
        print("Saving final trained model...")
        training_model.save("efficient_cheetah_ppo_final2")

        print("Training completed successfully!")
        print(f"Training logs saved to {progress_logger.log_filepath}")
        print(f"Model checkpoints saved in {progress_logger.save_dir}")

    except Exception as e:
        # Top-level boundary: report and continue so a notebook cell does not
        # end in a traceback; checkpoints written so far remain on disk.
        print(f"Training error occurred: {str(e)}")
        print("Training was interrupted, but intermediate models should be saved.")
    finally:
        # Release simulator resources whether training succeeded or failed;
        # without this they stay allocated for the lifetime of the process.
        if cheetah_environment is not None:
            cheetah_environment.close()

# Start training only when this code runs as the main program (a script or a
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    train_efficient_cheetah_with_ppo()

Creating efficient cheetah environment...


  logger.deprecation(


Initializing PPO model...
Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




Setting up training logger...
Starting training for 1,000,000 timesteps...
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1e+03    |
|    ep_rew_mean     | 341      |
| time/              |          |
|    fps             | 567      |
|    iterations      | 1        |
|    time_elapsed    | 3        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 1e+03       |
|    ep_rew_mean          | 356         |
| time/                   |             |
|    fps                  | 429         |
|    iterations           | 2           |
|    time_elapsed         | 9           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.011172911 |
|    clip_fraction        | 0.126       |
|    clip_range           | 0.2         |
|    entropy_loss         | -8.49      