# LeLamp with Gymnasium and Mujoco

## Dependencies

In [10]:
%env MUJOCO_GL=egl

env: MUJOCO_GL=egl


In [11]:
import mujoco
import numpy as np

from gymnasium import utils
from gymnasium.envs.mujoco import MujocoEnv
from gymnasium.spaces import Box

## Configurations

In [12]:
DEFAULT_CAMERA_CONFIG = {
    "trackbodyid": -1,
    "distance": 4.0,
}

## Environment

In [25]:
MODEL_PATH = "../models/lelamp/scene.xml"

class LeLampEnv(MujocoEnv, utils.EzPickle):
    metadata = {
        "render_modes": [
            "human",
            "rgb_array",
            "depth_array",
            "rgbd_tuple"
        ],
        "render_fps": 20,
    }

    def __init__(
            self,
            forward_reward_weight=1.0,
            healthy_reward=5.0,
            terminate_when_unhealthy=True,
            healthy_z_range=(0.2, 1.0),
            reset_noise_scale=1e-2,
            **kwargs
    ):
        # Store reward parameters
        self._forward_reward_weight = forward_reward_weight
        self._healthy_reward = healthy_reward
        self._terminate_when_unhealthy = terminate_when_unhealthy
        self._healthy_z_range = healthy_z_range
        self._reset_noise_scale = reset_noise_scale

        # Initialize EzPickle
        utils.EzPickle.__init__(
            self,
            forward_reward_weight=forward_reward_weight,
            healthy_reward=healthy_reward,
            terminate_when_unhealthy=terminate_when_unhealthy,
            healthy_z_range=healthy_z_range,
            reset_noise_scale=reset_noise_scale,
            **kwargs
        )

        # Init the observation space
        obs_size = 5 + 5 + 6
        self.observation_space = Box(
            low=-np.inf, high=np.inf, shape=(obs_size,), dtype=np.float32
        )

        # Initialize the Mujoco environment
        MujocoEnv.__init__(
            self,
            MODEL_PATH,
            5,
            observation_space=self.observation_space,
            **kwargs
        )

        # Store previous COM position for velocity calculation
        self._prev_com = None

    def _calculate_observation_size(self):
        n_joints = self.model.nq - 7
        n_vel = self.model.nv - 6
        n_sensors = self.model.n_sensor

        return n_joints + n_vel + n_sensors

    def _get_obs(self):
        """Get current observation."""
        # Joint positions (excluding free joint if present)
        # You may need to adjust the slicing based on your model
        position = self.data.qpos[7:].copy()

        # Joint velocities (excluding free joint if present)  
        velocities = self.data.qvel[6:].copy()

        # Sensor data
        sensor_data = self.data.sensordata[:6].copy()

        return np.concatenate([position, velocities, sensor_data]).astype(np.float32)
    
    def reset_model(self):
        """Reset the model to a random state."""
        # Add noise to initial positions and velocities
        noise_low = -self._reset_noise_scale
        noise_high = self._reset_noise_scale
        
        # Add noise to initial joint positions
        qpos = self.init_qpos + self.np_random.uniform(
            low=noise_low,
            high=noise_high,
            size=self.model.nq,
        )

        # Add noise to initial joint velocities  
        qvel = self.np_random.uniform(
            low=noise_low,
            high=noise_high,
            size=self.model.nv,
        )

        # Set the state
        self.set_state(qpos, qvel)

        # Reset previous COM tracking
        self._prev_com = None

        return self._get_obs()
    
    def step(self, action):
        """Perform a step in the environment."""

        # Get body ID for the lamp
        body_id = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_BODY, "dc15_a01_dummy_assy_idle_asm")

        # Store COM
        com_before = self.data.subtree_com[body_id].copy()

        # Perform the action
        self.do_simulation(action, self.frame_skip)

        # Get the new COM position
        com_after = self.data.subtree_com[body_id].copy()

        # Calculate velocity (COM change over time)
        velocity = (com_after - com_before) / self.dt
        
        # Forward reward
        forward_reward = self._forward_reward_weight * velocity[0]

        # Healthy reward
        lamp_head_id = mujoco.mj_name2id(
            self.model,
            mujoco.mjtObj.mjOBJ_BODY,
            "lamp_head"
        )
        lamp_head_pos = self.data.xpos[lamp_head_id]

        min_z, max_z = self._healthy_z_range
        is_healthy = (min_z <= lamp_head_pos[2] <= max_z)
        healthy_reward = self._healthy_reward if is_healthy else 0.0

        # Calculate total reward
        reward = forward_reward + healthy_reward

        # Check if episode should terminate
        terminated = False
        if self._terminate_when_unhealthy and not is_healthy:
            terminated = True

        # Get observation
        observation = self._get_obs()

        # Render if in human mode
        if self.render_mode == "human":
            self.render()

                # Create info dict with metrics
        info = {
            'forward_reward': forward_reward,
            'reward_linvel': forward_reward,
            'reward_alive': healthy_reward,
            'x_position': com_after[0],
            'y_position': com_after[1],
            'distance_from_origin': np.linalg.norm(com_after),
            'x_velocity': velocity[0],
            'y_velocity': velocity[1],
        }

        # Return (observation, reward, terminated, truncated, info)
        return observation, reward, terminated, False, info

## Test Environment

In [14]:
env = LeLampEnv()

obs, info = env.reset()

obs, info

(array([-0.00998175, -0.00189848,  0.00720532, -0.00654438, -0.0050232 ,
         0.00258104, -0.00840592, -0.0038323 , -0.00013813,  0.00987807,
        -0.01093672,  0.02229953,  0.09236985, -0.00599872,  0.00309877,
         0.00786889]),
 {})

In [15]:
# Test single step
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
print(f"✓ Step successful. Reward: {reward:.4f}")
print(f"  Terminated: {terminated}, Truncated: {truncated}")
print(f"  Info: {info}")

✓ Step successful. Reward: 4.9962
  Terminated: False, Truncated: False
  Info: {'forward_reward': np.float64(-0.003756489504710605), 'reward_linvel': np.float64(-0.003756489504710605), 'reward_alive': 5.0, 'x_position': np.float64(0.006174889324609028), 'y_position': np.float64(0.011154097514273383), 'distance_from_origin': np.float64(0.10917730677951851), 'x_velocity': np.float64(-0.003756489504710605), 'y_velocity': np.float64(-0.014283630241475574)}


In [21]:
import mediapy as media

def render_env_video(max_steps=100, fps=20):
    """Collect frames and show as video in Jupyter."""
    env = LeLampEnv(render_mode="rgb_array")
    obs, info = env.reset()
    
    frames = []
    rewards = []
    
    # Collect frames
    for i in range(max_steps):
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)
        
        # Get frame and store
        frame = env.render()
        if frame is not None:
            frames.append(frame)
            rewards.append(reward)
        
        if terminated or truncated:
            print(f"Episode ended at step {i+1}")
            break
    
    env.close()
    
    # Show video using media.show_video
    if frames:
        print(f"Collected {len(frames)} frames")
        print(f"Total reward: {sum(rewards):.3f}")
        media.show_video(frames, fps=fps)
    else:
        print("No frames collected!")

# Just run this in a Jupyter cell:
render_env_video()


Episode ended at step 11
Collected 11 frames
Total reward: 51.592


0
This browser does not support the video tag.


# Train with PPO

In [26]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor

In [27]:
# Create environment
env = LeLampEnv()

# Check environment is valid
check_env(env)
print("✓ Environment validation passed")

# Wrap environment for monitoring
env = Monitor(env, "./logs/")

✓ Environment validation passed


In [28]:
# Create model
model = PPO(
    "MlpPolicy",
    env,
    verbose=1,
    tensorboard_log="./tensorboard_logs/",
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=64,
    n_epochs=10,
    clip_range=0.2,
)

Using cuda device
Wrapping the env in a DummyVecEnv.




In [29]:
# Set up callbacks
eval_env = Monitor(LeLampEnv())
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path="./models/",
    log_path="./logs/",
    eval_freq=10000,
    deterministic=True,
    render=False,
)

In [30]:
checkpoint_callback = CheckpointCallback(
    save_freq=50000,
    save_path="./checkpoints/",
    name_prefix="lelamp_ppo",
)

In [None]:
# Train the model
model.learn(
    total_timesteps=1000000,  # 1M steps
    callback=[eval_callback, checkpoint_callback],
    progress_bar=True,
)

Logging to ./tensorboard_logs/PPO_1


---------------------------------
| rollout/           |          |
|    ep_len_mean     | 7.16     |
|    ep_rew_mean     | 30.7     |
| time/              |          |
|    fps             | 1057     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 8.61       |
|    ep_rew_mean          | 37.7       |
| time/                   |            |
|    fps                  | 853        |
|    iterations           | 2          |
|    time_elapsed         | 4          |
|    total_timesteps      | 4096       |
| train/                  |            |
|    approx_kl            | 0.02072424 |
|    clip_fraction        | 0.25       |
|    clip_range           | 0.2        |
|    entropy_loss         | -7.1       |
|    explained_variance   | 0.0081     |
|    learning_rate        | 0.0003     |
|   

In [None]:
model.save("lelamp_ppo_final")