In [1]:
satellites_xml = """
<mujoco model="satellites">
  <compiler angle="degree"/>
  <option timestep="0.001" gravity="0 0 0"/>
  
  <default>
    <geom contype="1" conaffinity="1" density="1000" friction="0.5 0.005 0.0001"/>
  </default>
  
  <worldbody>
    <light diffuse=".5 .5 .5" pos="0 0 3" dir="0 0 -1"/>
    <camera pos="5 0 2" xyaxes="-1 0 0 0 0 1"/>
    
    <!-- Servicing Satellite -->
    <body name="servicer" pos="0 0 0">
      <joint name="servicer_free" type="free"/>
      <!-- Main body -->
      <geom name="servicer_body" type="cube" size="0.5" rgba="0.7 0.7 0.7 1"/>
      <!-- Docking port (protrusion) -->
      <geom name="servicer_port" type="cylinder" pos="0 0 0.5" size="0.1 0.2" rgba="1 0 0 1"/>
      <!-- Docking site (for reference) -->
      <site name="servicer_dock_site" pos="0 0 0.5" size="0.15" rgba="0 1 0 1"/>
    </body>
    
    <!-- Target Satellite -->
    <body name="target" pos="3 0 0">
      <joint name="target_free" type="free"/>
      <!-- Main body -->
      <geom name="target_body" type="cube" size="0.5" rgba="0.7 0.7 0.7 1"/>
      <!-- Docking cavity (receptacle) -->
      <geom name="target_cavity" type="cylinder" pos="0 0 -0.5" size="0.15 0.25" rgba="0 0 1 1"/>
      <!-- Docking site (for reference) -->
      <site name="target_dock_site" pos="0 0 -0.5" size="0.15" rgba="0 1 0 1"/>
    </body>
  </worldbody>
</mujoco>
"""


In [2]:
#Define dynamics
#Rigid Body dynamics mujoco shapes






#Define satellites shapes
#Define scenario and enviorment
#Shape Rewards
#Train


In [3]:
import os
import platform
# Set appropriate backend for macOS
if platform.system() == "Darwin":
    os.environ["MUJOCO_GL"] = "glfw"  # macOS native OpenGL context
else:
    os.environ["MUJOCO_GL"] = "osmesa"  # For Linux systems

import mujoco
import numpy as np
import time
import imageio.v2 as imageio
import matplotlib.pyplot as plt  # For debugging
import mediapy as media

# Create output directory
os.makedirs("output", exist_ok=True)

# XML model definition
sat_xml = """<mujoco model="satellites">
  <compiler angle="degree" />
  <option timestep="0.01" gravity="0 0 0" />
  <default>
    <!-- Default geom properties -->
    <geom contype="1" conaffinity="1" friction="0.1" density="1000"/>
  </default>

  <worldbody>
    <!-- Create a camera that follows the servicer -->
    <camera name="trackercam" mode="trackcom" target="servicer" pos="0 -15 0"/>
    <!-- Fixed camera -->
    <camera name="fixed" pos="0 -15 2" xyaxes="1 0 0 0 0 1"/>


    
    <!-- Lighting -->
    <light directional="true" diffuse=".8 .8 .8" specular=".2 .2 .2" pos="0 0 5" dir="0 0 -1"/>
    
    <!-- Servicer Satellite -->
    <body name="servicer" pos="0 2 0">
      <freejoint/>
      <!-- Main body (e.g., sphere) -->
      <geom type="box" size=".5 .5 .5" rgba="0.7 0.7 0.7 1"/>
      <!-- Docking port protrusion -->
      <geom name="servicer_dock_port" type="cylinder" pos="0 0 0.5" size="0.1 0.2" euler="0 0 0" rgba="1 0 0 1"/>
      <!-- Docking detection site -->
      <site name="servicer_dock_site" pos="0 0 0.5" size="0.15" rgba="0 1 0 0.5" />
    </body>

    <!-- Target Satellite -->
    <body name="target" pos="2 0 0">
      <freejoint/>
      <!-- Main body (e.g., sphere) -->
      <geom type="box" size=".5 .5 .5" rgba="0.7 0.7 0.7 1"/>
      <!-- Docking cavity (for receiving the docking port) -->
      <geom name="target_dock_cavity" type="cylinder" pos="0 0 -0.5" size="0.15 0.25" euler="0 0 0" rgba="0 0 1 1"/>
      <!-- Docking detection site -->
      <site name="target_dock_site" pos="0 0 -0.5" size="0.15" rgba="0 1 0 0.5" />
    </body>
  </worldbody>
  

  <visual>
    <global offwidth="1920" offheight="1080" />
  </visual>
</mujoco>
"""

# Load the model
model = mujoco.MjModel.from_xml_string(sat_xml)

# Create simulation data
data = mujoco.MjData(model)

# Set initial conditions
# Servicer position and velocity
data.qpos[0:3] = np.array([0, 0, 0])      # x, y, z position
data.qpos[3:7] = np.array([1, 0, 0, 0])   # quaternion orientation
data.qvel[0:3] = np.array([1, 0, 1])      # velocity

# Target position and velocity (7 DOF offset for the second free joint)
target_offset = 7
data.qpos[target_offset:target_offset+3] = np.array([0, 2, 0])  # position
data.qpos[target_offset+3:target_offset+7] = np.array([1, 0, 0, 0])  # orientation
data.qvel[target_offset:target_offset+3] = np.array([0, 0, 1])  # velocity
# Servicer position and orientation (facing +Y)
data.qpos[0:3] = np.array([0, 0, 0])      # x, y, z position
data.qpos[3:7] = np.array([1, 0, 0, 0])    # identity quaternion (facing +Y)
data.qvel[0:6] = 0                         # zero linear and angular velocity

# Target position and orientation (facing -Y)
target_offset = 7
data.qpos[target_offset:target_offset+3] = np.array([0, 2, 0])         # position
data.qpos[target_offset+3:target_offset+7] = np.array([0, 0, 1, 0])    # 180° yaw (facing -Y)
data.qvel[target_offset:target_offset+6] = 0                           # zero velocity

# Simulation parameters
duration = 15.0  # seconds
dt = model.opt.timestep
fps = 30  # Frames per second for rendering
render_every = int(1.0 / (dt * fps))  # How many simulation steps between renders

# Setup for recording
record = True
video_path = os.path.join("output", "satellite_simulation.mp4")
width, height = 640, 480

# Option 1: Interactive viewer without recording
def run_interactive():
    # Initialize the viewer
    with mujoco.viewer.launch_passive(model, data) as viewer:
        # Initial sync
        viewer.sync()
        
        # Run simulation with rendering
        step_count = 0
        start_time = time.time()
        sim_time = 0
        
        while sim_time < duration:
            # Step the simulation
            mujoco.mj_step(model, data)
            sim_time += dt
            step_count += 1
            
            # Only render at the specified FPS
            if step_count % render_every == 0:
                # Update visualization
                viewer.sync()
                
                # Optional: add time delay to watch in real-time
                elapsed = time.time() - start_time
                if elapsed < sim_time:
                    time.sleep(sim_time - elapsed)
        
        # Keep the viewer open for a moment to see the final state
        time.sleep(1.0)

# Option 2: Render to offscreen buffer and save video
duration = 10  # (seconds)
framerate = 60  # (Hz)
frames = []
scene_option = mujoco.MjvOption()
scene_option.flags[mujoco.mjtVisFlag.mjVIS_JOINT] = True
#mujoco.mj_resetData(model, data)
with mujoco.Renderer(model) as renderer:

  while data.time < duration:
    mujoco.mj_step(model, data)
    if len(frames) < data.time * framerate:
      renderer.update_scene(data, scene_option=scene_option)
      pixels = renderer.render()
      frames.append(pixels)

media.show_video(frames, fps=framerate)

# Choose which method to use


print("Simulation complete!")



0
This browser does not support the video tag.


Simulation complete!


In [4]:
import gym
from gym import spaces
import numpy as np
import mujoco
import os
import platform

# Setup rendering backend
if platform.system() == "Darwin":
    os.environ["MUJOCO_GL"] = "glfw"
else:
    os.environ["MUJOCO_GL"] = "osmesa"

class SatelliteEnv(gym.Env):
    def __init__(self):
        super().__init__()

        self.model = mujoco.MjModel.from_xml_string(sat_xml)
        self.data = mujoco.MjData(self.model)

        self.obs_dim = 6  
        self.action_dim = 3  #force on servicer

        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.obs_dim,), dtype=np.float32)
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(self.action_dim,), dtype=np.float32)
        self.max_episode_steps = 500  # or any desired number
        self.current_step = 0


    def reset(self, seed=None, options=None):
        if seed is not None:
            self.np_random, _ = gym.utils.seeding.np_random(seed)

        mujoco.mj_resetData(self.model, self.data)

        '''# Initialize state
        self.data.qpos[:3] = [0, 0, 0]
        self.data.qpos[3:7] = [1, 0, 0, 0]
        self.data.qvel[:3] = [0, 0, 0]

        offset = 7
        self.data.qpos[offset:offset+3] = [0, 0, 3]
        self.data.qpos[offset+3:offset+7] = [1, 0, 0, 0]
        self.data.qvel[offset:offset+3] = [0, 0, 0]'''
         # Randomize servicer state
        self.data.qpos[:3] = self.np_random.uniform(low=-0.1, high=0.1, size=3)  # small initial position noise
        self.data.qpos[3:7] = [1, 0, 0, 0]  # identity quaternion
        self.data.qvel[:3] = self.np_random.uniform(low=-0.01, high=0.01, size=3)  # small initial velocity noise

    # Randomize target state
        offset = 7
        self.data.qpos[offset:offset+3] = self.np_random.uniform(low=[-1.5, -1.5, 0.0], high=[1.5, 1.5, 5.5])  # random position near (0,0,3)
        self.data.qpos[offset+3:offset+7] = [1, 0, 0, 0]  # identity quaternion
        self.data.qvel[offset:offset+3] = self.np_random.uniform(low=-0.01, high=0.01, size=3)  # small initial velocity noise
        self.current_step = 0

        return self._get_obs(), {}
    def _dock_error(self):
        sid1 = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_SITE, "servicer_dock_site")
        sid2 = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_SITE, "target_dock_site")
        return self.data.site_xpos[sid2] - self.data.site_xpos[sid1]

    def step(self, action):
        action = np.clip(action, -1.0, 1.0)
        self.data.qvel[:3] = action

        mujoco.mj_step(self.model, self.data)

        obs = self._get_obs()
        dock_dist = np.linalg.norm(self._dock_error())

        '''
        relative_velocity = np.linalg.norm()
        reward = -dock_dist
        if np.linalg.norm(obs[:3]) < 0.01:
            reward = 1
        terminated = np.linalg.norm(obs[:3]) < 0.01  #Success
        '''
        rel_pos = dock_dist
        rel_pos = self.data.qpos[7:10] - self.data.qpos[0:3]
        rel_vel = self.data.qvel[0:3] - self.data.qvel[6:9]
        rel_ang_vel = np.linalg.norm(self.data.qvel[3:6] - self.data.qvel[9:12])

    # Unit vector toward target
        norm_rel_pos = np.linalg.norm(rel_pos)
        if norm_rel_pos > 1e-6:
            direction = rel_pos / norm_rel_pos
            velocity_toward_target = np.dot(rel_vel, direction)
        else:
            velocity_toward_target = 0.0

    # Reward: encourage motion toward target, discourage distance
        reward = velocity_toward_target - 0.1 * dock_dist - rel_ang_vel

        terminated = dock_dist < 0.01 and np.linalg.norm(rel_vel) < 1
        self.current_step += 1
        truncated = self.current_step >= self.max_episode_steps


        return obs, reward, terminated, truncated, {}
    def _dock_error(self):
        sid1 = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_SITE, "servicer_dock_site")
        sid2 = mujoco.mj_name2id(self.model, mujoco.mjtObj.mjOBJ_SITE, "target_dock_site")
        return self.data.site_xpos[sid2] - self.data.site_xpos[sid1]

    def _get_obs(self):
        pos1 = self.data.qpos[:3]
        pos2 = self.data.qpos[7:10]
        vel1 = self.data.qvel[:3]
        vel2 = self.data.qvel[7:10]
        return np.concatenate([pos2 - pos1, vel2 - vel1], dtype=np.float32)

    def render_video(self, duration=10, framerate=60):
        """
        Renders a video of the environment for the specified duration
        
        Args:
            duration (float): Duration in seconds
            framerate (int): Frames per second
            
        Returns:
            mediapy video object
        """
        # Reset environment to start a fresh simulation
        self.reset()
        
        frames = []
        scene_option = mujoco.MjvOption()
        scene_option.flags[mujoco.mjtVisFlag.mjVIS_JOINT] = True
        
        # Create renderer
        with mujoco.Renderer(model=self.model, height=480, width=640) as renderer:
            # Run simulation and collect frames
            while self.data.time < duration:
                mujoco.mj_step(self.model, self.data)
                
                if len(frames) < self.data.time * framerate:
                    renderer.update_scene(self.data, scene_option=scene_option)
                    pixels = renderer.render()
                    frames.append(pixels)
        
        return media.show_video(frames, fps=framerate)
        

    def close(self):
        if hasattr(self, "viewer"):
            self.viewer.close()
from stable_baselines3 import PPO

env = SatelliteEnv()
#model = PPO("MlpPolicy", env, verbose=1)
#model.learn(total_timesteps=100_000)

#model.save("ppo_satellite_docking")

In [5]:
def train_and_visualize():
    from stable_baselines3 import PPO
    from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize

    
    # Create and train the model
    env = SatelliteEnv()
    env = DummyVecEnv([lambda: SatelliteEnv()])  # wraps in a VecEnv

# Then apply VecNormalize
    env = VecNormalize(env, norm_obs=True, norm_reward=True)
    model = PPO("MlpPolicy", env, verbose=1)
    model.learn(total_timesteps=10_000_000)
    model.save("ppo_satellite_docking")
    
    # Create a new environment for visualization
    vis_env = SatelliteEnv()
    vis_env = DummyVecEnv([lambda: SatelliteEnv()])  # wraps in a VecEnv

# Then apply VecNormalize
    vis_env = VecNormalize(env, norm_obs=True, norm_reward=True)

    
    #record a simulation with the trained policy
    obs= vis_env.reset()
    
    # Reset data for visualization
    vis_env.reset()
    
    # Render video of trained agent
    print("Rendering video...")
    
    # Create frames with policy actions
    frames = []
    scene_option = mujoco.MjvOption()
    scene_option.flags[mujoco.mjtVisFlag.mjVIS_JOINT] = True
    duration = 10
    framerate = 60
    '''
    with mujoco.Renderer(model=vis_env.model, height=480, width=640) as renderer:
        while vis_env.data.time < duration:
            # Get action from the trained policy
            action, _ = model.predict(obs, deterministic=True)
            
            # Take step in environment
            obs, reward, terminated, truncated, _ = vis_env.step(action)
            print("Reward",reward)
            
            # Render frame if needed
            if len(frames) < vis_env.data.time * framerate:
                renderer.update_scene(vis_env.data, scene_option=scene_option)
                pixels = renderer.render()
                frames.append(pixels)
                
            if terminated or truncated:
                break
    '''
    # Unwrap to get the raw env
    raw_env = vis_env.venv.envs[0]  # unwrap DummyVecEnv -> SatelliteEnv

    with mujoco.Renderer(model=raw_env.model, height=480, width=640) as renderer:
        while raw_env.data.time < duration:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, _ = vis_env.step(action)
            print("Reward", reward)

            if len(frames) < raw_env.data.time * framerate:
                renderer.update_scene(raw_env.data, scene_option=scene_option)
                pixels = renderer.render()
                frames.append(pixels)

            if terminated:
                break

    # Display the video
    return media.show_video(frames, fps=framerate)

if __name__ == "__main__":
    train_and_visualize()



Using cpu device
-----------------------------
| time/              |      |
|    fps             | 6282 |
|    iterations      | 1    |
|    time_elapsed    | 0    |
|    total_timesteps | 2048 |
-----------------------------
-----------------------------------------
| time/                   |             |
|    fps                  | 4420        |
|    iterations           | 2           |
|    time_elapsed         | 0           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.012196979 |
|    clip_fraction        | 0.145       |
|    clip_range           | 0.2         |
|    entropy_loss         | -4.27       |
|    explained_variance   | -0.011      |
|    learning_rate        | 0.0003      |
|    loss                 | 0.17        |
|    n_updates            | 10          |
|    policy_gradient_loss | -0.0135     |
|    std                  | 1           |
|    value_loss           | 0.297       |
-----------------

KeyboardInterrupt: 

In [4]:
from stable_baselines3 import PPO
import time

# Load trained model
#model = PPO.load(".zip")

# Create env (same as during training)
env = SatelliteEnv()

obs = env.reset()[0]  # for gym>=0.26
done = False

# Optional: real-time viewer
env.render()

while not done:
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    env.render()
    time.sleep(env.model.opt.timestep)  # slow down for visibility

env.close()
import imageio

frames = []
obs = env.reset()[0]
done = False

while not done:
    frame = env.render(mode="rgb_array")  # requires proper render implementation
    frames.append(frame)
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)

# Save as video
imageio.mimsave("docking_test.mp4", frames, fps=30)


AttributeError: module 'mujoco' has no attribute 'viewer'

In [12]:
from stable_baselines3 import PPO
import time
import imageio
import numpy as np

# Assuming you have the SatelliteEnv class defined elsewhere
# and it's imported properly at the top of your script

# Create env
env = SatelliteEnv()

# For demonstration, we'll use random actions
# But you can uncomment the model loading part if you have a trained model
# model = PPO.load("ppo_satellite_docking")

# Option 1: Real-time visualization (using random actions if no model available)
def visualize_realtime():
    obs = env.reset()  # No need for [0] with regular gym
    done = False
    
    # If you have a trained model:
    # while not done:
    #     action, _ = model.predict(obs, deterministic=True)
    #     obs, reward, done, info = env.step(action)
    #     env.render()
    #     time.sleep(env.model.opt.timestep)  # slow down for visibility
    
    # For testing without a model (random actions):
    for _ in range(200):  # Run for 200 steps or until done
        action = env.action_space.sample()  # Random action
        obs, reward, done, info = env.step(action)
        env.render()
        time.sleep(env.model.opt.timestep)
        if done:
            break
    
    env.close()

# Option 2: Save as video
def create_video():
    frames = []
    obs = env.reset()  # No need for [0] with regular gym
    done = False
    
    # If you have a trained model:
    # while not done:
    #     action, _ = model.predict(obs, deterministic=True)
    #     obs, reward, done, info = env.step(action)
    #     frame = env.render(mode="rgb_array")
    #     frames.append(frame)
    
    # For testing without a model (random actions):
    for _ in range(200):  # Run for 200 steps or until done
        action = env.action_space.sample()  # Random action
        obs, reward, terminated,truncated, info = env.step(action)
        frame = env.render(mode="rgb_array")
        frames.append(frame)
        if done:
            break
    
    # Convert frames to uint8 if they aren't already
    frames = [frame.astype(np.uint8) if frame.dtype != np.uint8 else frame for frame in frames]
    
    # Save as video
    imageio.mimsave("docking_test.mp4", frames, fps=30)
    print("Video saved as docking_test.mp4")
def create_video(env, steps=200, filename="docking_test.mp4"):
    frames = []
    obs = env.reset()
    done = False
    
    for _ in range(steps):  # Run for specified steps or until done
        action = env.action_space.sample()  # Random action
        obs, reward, done,truncated, info = env.step(action)
        frame = env.render(mode="rgb_array")
        frames.append(frame)
        if done:
            break
    
    # Convert frames to uint8 if they aren't already
    frames = [frame.astype(np.uint8) if frame.dtype != np.uint8 else frame for frame in frames]
    
    # Save as video
    imageio.mimsave(filename, frames, fps=30)
    print(f"Video saved as {filename}")
    
    env.close()


create_video(env)

AttributeError: 'SatelliteEnv' object has no attribute 'viewer'