<a href="https://colab.research.google.com/github/kuds/rl-drone/blob/main/RL%20Drone.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
!pip install mujoco

# Set up GPU rendering.
from google.colab import files
import distutils.util
import os
import subprocess
if subprocess.run('nvidia-smi').returncode:
  raise RuntimeError(
      'Cannot communicate with GPU. '
      'Make sure you are using a GPU Colab runtime. '
      'Go to the Runtime menu and select Choose runtime type.')

# Add an ICD config so that glvnd can pick up the Nvidia EGL driver.
# This is usually installed as part of an Nvidia driver package, but the Colab
# kernel doesn't install its driver via APT, and as a result the ICD is missing.
# (https://github.com/NVIDIA/libglvnd/blob/master/src/EGL/icd_enumeration.md)
NVIDIA_ICD_CONFIG_PATH = '/usr/share/glvnd/egl_vendor.d/10_nvidia.json'
if not os.path.exists(NVIDIA_ICD_CONFIG_PATH):
  with open(NVIDIA_ICD_CONFIG_PATH, 'w') as f:
    f.write("""{
    "file_format_version" : "1.0.0",
    "ICD" : {
        "library_path" : "libEGL_nvidia.so.0"
    }
}
""")

# Configure MuJoCo to use the EGL rendering backend (requires GPU)
print('Setting environment variable to use GPU rendering:')
%env MUJOCO_GL=egl

# Check if installation was succesful.
try:
  print('Checking that the installation succeeded:')
  import mujoco
  mujoco.MjModel.from_xml_string('<mujoco/>')
except Exception as e:
  raise e from RuntimeError(
      'Something went wrong during installation. Check the shell output above '
      'for more information.\n'
      'If using a hosted Colab runtime, make sure you enable GPU acceleration '
      'by going to the Runtime menu and selecting "Choose runtime type".')

print('Installation successful.')

# Other imports and helper functions
import time
import itertools
import numpy as np

# Graphics and plotting.
print('Installing mediapy:')
!command -v ffmpeg >/dev/null || (apt update && apt install -y ffmpeg)
!pip install -q mediapy
import mediapy as media
import matplotlib.pyplot as plt

# More legible printing from numpy.
np.set_printoptions(precision=3, suppress=True, linewidth=100)

from IPython.display import clear_output
clear_output()

In [6]:
!pip install robot_descriptions
!pip install gymnasium
!pip install stable-baselines3

Collecting robot_descriptions
  Downloading robot_descriptions-1.20.0-py3-none-any.whl.metadata (28 kB)
Downloading robot_descriptions-1.20.0-py3-none-any.whl (99 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/99.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.8/99.8 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: robot_descriptions
Successfully installed robot_descriptions-1.20.0
Collecting stable-baselines3
  Downloading stable_baselines3-2.7.0-py3-none-any.whl.metadata (4.8 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch<3.0,>=2.3->stable-baselines3)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch<3.0,>=2.3->stable-baselines3)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-c

In [7]:
import gymnasium
import mujoco
from stable_baselines3 import SAC
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.vec_env import VecVideoRecorder
from stable_baselines3.common.env_util import make_atari_env
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.callbacks import CallbackList
import numpy
import os
import csv
import torch
import pandas
import platform
from importlib.metadata import version
import matplotlib
import matplotlib.pyplot
from gymnasium import utils
from gymnasium.envs.mujoco import MujocoEnv
from gymnasium.spaces import Box

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.


In [8]:
print(f"Python Version: {platform.python_version()}")
print(f"Torch Version: {version('torch')}")
print(f"Is Cuda Available: {torch.cuda.is_available()}")
print(f"Cuda Version: {torch.version.cuda}")
print(f"Gymnasium Version: {version('gymnasium')}")
print(f"Numpy Version: {version('numpy')}")
print(f"Mujoco Version: {version('mujoco')}")
print(f"Stable-Baselines3 Version: {version('stable-baselines3')}")
print(f"Matplotlib Version: {version('matplotlib')}")

Python Version: 3.11.13
Torch Version: 2.6.0+cu124
Is Cuda Available: True
Cuda Version: 12.4
Gymnasium Version: 1.2.0
Numpy Version: 2.0.2
Mujoco Version: 3.3.4
Stable-Baselines3 Version: 2.7.0
Matplotlib Version: 3.10.0


In [9]:
rl_type = "SAC"
env_str = "BitCrazy"
log_dir = "./logs/{}".format(env_str)
name_prefix = "bit_crazy"

In [10]:
hyperparams = {
    "env_str": env_str,
    "rl_type": rl_type,
    "eval_freq": 25_000,
    "n_envs": 4,
    "min_force": 100.0,
    "total_timesteps": 1_500_000,
    "log_dir": log_dir,
    "episode_length": 1_000
}

In [11]:
csv_header = ["ball_velocity_x",
              "ball_velocity_y",
              "ball_velocity_z",
              "ball_accelerometer_x",
              "ball_accelerometer_y",
              "ball_accelerometer_z",
              "from_to_x1",
              "from_to_y1",
              "from_to_z1",
              "from_to_x2",
              "from_to_y2",
              "from_to_z2",
              "touch_sensor",
              "reward",
              "total_reward",
              "done"]

In [None]:
class VideoRecordCallback(BaseCallback):
    def __init__(
        self,
        save_path: str,
        video_length: int,
        save_freq: int = 5_000,
        name_prefix: str ="rl_model",
        verbose: int = 0):

        super().__init__(verbose)
        self.save_freq = save_freq
        self.video_length = video_length
        self.save_path = save_path
        self.name_prefix = name_prefix
        # Those variables will be accessible in the callback
        # (they are defined in the base class)
        # The RL model
        # self.model = None  # type: BaseAlgorithm
        # An alias for self.model.get_env(), the environment used for training
        # self.training_env # type: VecEnv
        # Number of time the callback was called
        # self.n_calls = 0  # type: int
        # num_timesteps = n_envs * n times env.step() was called
        # self.num_timesteps = 0  # type: int
        # local and global variables
        # self.locals = {}  # type: Dict[str, Any]
        # self.globals = {}  # type: Dict[str, Any]
        # The logger object, used to report things in the terminal
        # self.logger # type: stable_baselines3.common.logger.Logger
        # Sometimes, for event callback, it is useful
        # to have access to the parent object
        # self.parent = None  # type: Optional[BaseCallback]

    def _on_step(self) -> bool:
        if self.n_calls % self.save_freq == 0:

          name_prefix = f"{self.name_prefix}_{self.num_timesteps}"

          # Record video of the best model playing
          rec_val = make_vec_env(make_env, n_envs=1)
          rec_val = VecVideoRecorder(rec_val,
                                    self.save_path,
                                    video_length=self.video_length,
                                    record_video_trigger=lambda x: x == 0,
                                    name_prefix=name_prefix)

          obs = rec_val.reset()
          session_length = 0
          total_reward = 0.0
          csv_file_name = os.path.join(self.save_path, f"{name_prefix}.csv")
          with open(csv_file_name, 'w') as csvfile:
            csv_writer = csv.writer(csvfile, delimiter=',')
            csv_writer.writerow(csv_header)
            for _ in range(self.video_length):
              session_length += 1
              action, _states = self.model.predict(obs)
              obs, rewards, dones, info = rec_val.step(action)
              total_reward += rewards
              #print(info)
              row_data = numpy.concatenate([info[0]["ball_velocity"],
                                            info[0]["ball_accelerometer"],
                                            info[0]["ball_to_paddle"],
                                            [info[0]["touch_sensor"],
                                             rewards[0],
                                             total_reward[0],
                                             dones[0]]])
              row_data = numpy.round(row_data, decimals=4)
              csv_writer.writerow(row_data)
              rec_val.render()

              if dones:
                break

          print(f"Step: {self.num_timesteps} | Session Length: {session_length} |Total Bounces: {int(total_reward[0])}")

          rec_val.close()
        return True


In [12]:
import mujoco

# Loading a specific model description as an imported module.
from robot_descriptions import cf2_mj_description
model = mujoco.MjModel.from_xml_path(cf2_mj_description.MJCF_PATH)
print(cf2_mj_description.MJCF_PATH)

Cloning https://github.com/deepmind/mujoco_menagerie.git...


100%|██████████| 1972.0/1972.0 [00:33<00:00, 58.85it/s]

/root/.cache/robot_descriptions/mujoco_menagerie/bitcraze_crazyflie_2/cf2.xml





In [10]:
import xml.etree.ElementTree as ET

# --- Configuration ---
# The script will modify this file directly
filename = cf2_mj_description.MJCF_PATH

# 1. Parse the existing XML file
try:
    tree = ET.parse(filename)
    root = tree.getroot()
except FileNotFoundError:
    print(f"Error: The file '{filename}' was not found.")
    exit()
except ET.ParseError:
    print(f"Error: The file '{filename}' is not a valid XML file.")
    exit()

# 2. Find both the <worldbody> and <sensor> parent nodes
worldbody_node = root.find('worldbody')
sensor_node = root.find('sensor')

# 3. Validate that both parent nodes were found
if worldbody_node is not None and sensor_node is not None:
    changes_made = False
    print("Found <worldbody> and <sensor> nodes. Checking for elements...")

    # --- Action 1: Check for and add the paddle_body ---
    # The XPath "./body[@name='paddle_body']" finds a body element with a specific name
    if worldbody_node.find("./body[@name='paddle_body']") is None:
        print(" -> Body 'paddle_body' not found. Adding it...")
        body_attributes = {'name': 'paddle_body', 'pos': '0 0 .5'}
        body_node = ET.SubElement(worldbody_node, 'body', attrib=body_attributes)

        site_attributes = {
            'name': 'bounce_sensor',
            'pos': '0 0 0',
            'size': '0.25 0.25 0.1',
            'type': 'box',
            'rgba': '1 0 0 .15'
        }
        ET.SubElement(body_node, 'site', attrib=site_attributes)
        changes_made = True
    else:
        print(" -> Body 'paddle_body' already exists. Skipping.")

    # --- Action 2: Check for and add the touch_sensor ---
    if sensor_node.find("./touch[@name='touch_sensor']") is None:
        print(" -> Sensor 'touch_sensor' not found. Adding it...")
        touch_attributes = {'name': 'touch_sensor', 'site': 'bounce_sensor'}
        ET.SubElement(sensor_node, 'touch', attrib=touch_attributes)
        changes_made = True
    else:
        print(" -> Sensor 'touch_sensor' already exists. Skipping.")

    # 4. Write to the file ONLY if changes were made
    if changes_made:
        ET.indent(tree, space="  ", level=0)
        tree.write(filename, encoding='utf-8', xml_declaration=True)
        print(f"\n✅ Successfully applied updates to '{filename}'.")
    else:
        print(f"\n✅ No updates needed. All elements already exist in '{filename}'.")

else:
    # This block runs if one or both parent nodes are missing
    print("❌ Error: Could not find both <worldbody> and <sensor> nodes. No changes were made.")
    if worldbody_node is None:
        print(" -> Missing <worldbody> node.")
    if sensor_node is None:
        print(" -> Missing <sensor> node.")

Found <worldbody> and <sensor> nodes. Checking for elements...
 -> Body 'paddle_body' already exists. Skipping.
 -> Sensor 'touch_sensor' already exists. Skipping.

✅ No updates needed. All elements already exist in '/root/.cache/robot_descriptions/mujoco_menagerie/bitcraze_crazyflie_2/cf2.xml'.


In [17]:
import gymnasium as gym
import numpy as np
import mujoco
import xml.etree.ElementTree as ET
from gymnasium.spaces import Box

# Import the path to the CF2 model from the library
from robot_descriptions.cf2_mj_description import MJCF_PATH as CF2_PATH

class DroneHoverEnv(gym.Env):
    """
    Custom Gymnasium environment for the drone hovering task.
    This version loads the drone model from `robot_descriptions`
    and programmatically builds the training scene.
    """
    def __init__(self, frame_skip=5):
        super().__init__()

        # --- 1. Load and Modify the XML Scene ---
        # Parse the base robot XML file from the library
        tree = ET.parse(CF2_PATH)
        root = tree.getroot()

        # Find the worldbody to add scene elements
        worldbody = root.find('worldbody')
        if worldbody is None:
            raise ValueError("'<worldbody>' tag not found in the XML.")

        # Add a floor and a light to the scene
        ET.SubElement(worldbody, 'geom', attrib={
            'name': 'floor', 'size': '0 0 0.05', 'type': 'plane',
            'rgba': '0.2 0.3 0.4 1', 'material': '' # Basic material
        })
        ET.SubElement(worldbody, 'light', attrib={
            'pos': '0 0 1.5', 'dir': '0 0 -1', 'directional': 'true'
        })

        # Add the target paddle body and sensor site
        body_node = ET.SubElement(worldbody, 'body', attrib={
            'name': 'paddle_body', 'pos': '0 0 0.5'
        })
        ET.SubElement(body_node, 'site', attrib={
            'name': 'bounce_sensor', 'pos': '0 0 0', 'size': '0.25 0.25 0.1',
            'type': 'box', 'rgba': '1 0 0 .15'
        })

        # Find the sensor section and add the touch sensor
        sensor_node = root.find('sensor')
        if sensor_node is None:
            sensor_node = ET.SubElement(root, 'sensor') # Create if it doesn't exist

        ET.SubElement(sensor_node, 'touch', attrib={
            'name': 'touch_sensor', 'site': 'bounce_sensor'
        })

        # --- 2. Load the MuJoCo Model from the modified XML string ---
        xml_string = ET.tostring(root, encoding='unicode')
        self.model = mujoco.MjModel.from_xml_string(xml_string)
        self.data = mujoco.MjData(self.model)
        self.frame_skip = frame_skip

        # --- 3. Define Spaces and Target (same as before) ---
        self.action_space = Box(
            low=np.array([0.0, -1.0, -1.0, -1.0]),
            high=np.array([0.35, 1.0, 1.0, 1.0]),
            dtype=np.float32
        )
        self.observation_space = Box(
            low=-np.inf, high=np.inf, shape=(16,), dtype=np.float64
        )
        self.target_pos = np.array([0.0, 0.0, 0.5])
        self.drone_body_id = self.model.body('cf2').id
        self.gyro_sensor_id = self.model.sensor('body_gyro').id


    def _get_obs(self):
        """Constructs the observation vector from the simulation data."""
        drone_pos = self.data.qpos[:3]
        drone_quat = self.data.qpos[3:7]
        drone_lin_vel = self.data.qvel[:3]
        drone_ang_vel = self.data.sensor(self.gyro_sensor_id).data
        vec_to_target = self.target_pos - drone_pos

        return np.concatenate([
            drone_pos, drone_quat, vec_to_target,
            drone_lin_vel, drone_ang_vel
        ]).astype(np.float64)


    def step(self, action):
        """Applies an action, steps the simulation, and calculates the reward."""
        self.data.ctrl[:] = action
        for _ in range(self.frame_skip):
            mujoco.mj_step(self.model, self.data)

        drone_pos = self.data.qpos[:3]
        distance_to_target = np.linalg.norm(drone_pos - self.target_pos)

        reward = -distance_to_target
        reward -= 0.01 * np.square(action).sum()

        terminated = False
        truncated = distance_to_target > 2.0 or drone_pos[2] < 0.05

        observation = self._get_obs()
        info = {'distance_to_target': distance_to_target}

        return observation, reward, terminated, truncated, info

    def reset(self, seed=None, options=None):
        """Resets the environment to an initial state."""
        super().reset(seed=seed)
        mujoco.mj_resetData(self.model, self.data)
        noise = self.np_random.uniform(low=-0.1, high=0.1, size=3)
        self.data.qpos[:3] += noise

        return self._get_obs(), {}

In [33]:
class DroneHoverEnv(MujocoEnv, utils.EzPickle):
    metadata = {
        "render_modes": [
            "human",
            "rgb_array",
            "depth_array",
        ],
        "render_fps": 100,
    }

    # set default episode_len for truncate episodes
    def __init__(self, episode_len=1_000, **kwargs):
        utils.EzPickle.__init__(self, **kwargs)

        self.total_reward = 0
        self.previous_touch_value = 0
        # change shape of observation to your observation space size
        observation_space = Box(low=-numpy.inf, high=np.inf, shape=(12,), dtype=numpy.float64)
        # load your MJCF model with env and choose frames count between actions
        MujocoEnv.__init__(
            self,
            cf2_mj_description.MJCF_PATH,
            frame_skip=5,
            observation_space=observation_space,
            **kwargs
        )
        self.step_number = 0
        self.previous_position = None
        self.previous_velocity = None
        self.previous_touch_value = None
        self.episode_len = episode_len

    # determine the reward depending on observation or other properties of the simulation
    def step(self, a):
        reward = 0.0
        self.do_simulation(a, self.frame_skip)
        self.step_number += 1

        # Bounce Detection (CRITICAL)
        current_touch_value = self.data.sensor("touch_sensor").data[0]

        # print(current_velocity_value)
        # print(current_force_value)
        #print(current_fromto_value)

        self.previous_position = np.copy(current_touch_value)
        if current_touch_value >= 0:  # Check for transition from no contact to contact
            self.total_reward += 1
            reward = 1.0


        obs = self._get_obs()
        # Check if the drone has fallen below the paddle (assuming paddle is at z=0.5)
        # You might need to adjust this condition based on your model and desired behavior
        drone_z_position = self.data.body("body").xpos[2] # Assuming 'body' is the main body of the drone
        done = bool(not np.isfinite(obs).all() or (drone_z_position < 0.4)) # Adjusted condition

        truncated = self.step_number > self.episode_len
        info = {"touch_sensor": current_touch_value,
                "ball_velocity": self.data.joint("ball_slide_x").qvel, # Assuming these are the correct joint names
                "ball_accelerometer": self.data.sensor("accelerometer").data, # Assuming this is the correct sensor name
                "ball_to_paddle": self.data.geom("ball").xpos - self.data.geom("bounce_sensor").xpos # Assuming 'ball' and 'bounce_sensor' are the correct geom names
                }


        return obs, reward, done, truncated, info

    # define what should happen when the model is reset (at the beginning of each episode)
    def reset_model(self, seed=None):
        self.step_number = 0
        self.total_reward = 0

        # for example, noise is added to positions and velocities
        qpos = self.init_qpos + self.np_random.uniform(
            size=self.model.nq, low=-0.01, high=0.01
        )
        qvel = self.init_qvel + self.np_random.uniform(
            size=self.model.nv, low=-0.01, high=0.01
        )
        self.set_state(qpos, qvel)
        return self._get_obs()

    # determine what should be added to the observation
    # for example, the velocities and positions of various joints can be obtained through their names, as stated here
    def _get_obs(self):
        # Access joints through self.model.joint
        #print("Available joint names in the model:")
        #for joint in self.model.joint:
        #    print(joint.name)

        # Assuming these are the correct joint names based on the model
        obs = numpy.concatenate((numpy.array(self.data.actuator("body_thrust").qvel),
                                 numpy.array(self.data.actuator("body_thrust").qpos),
                                 numpy.array(self.data.actuator("x_moment").qvel),
                                 numpy.array(self.data.actuator("x_moment").qpos),
                                 numpy.array(self.data.actuator("y_moment").qvel),
                                 numpy.array(self.data.actuator("y_moment").qpos),
                                 numpy.array(self.data.actuator("z_moment").qvel),
                                 numpy.array(self.data.actuator("z_moment").qpos)), axis=0)
        return obs

In [18]:
env = DroneHoverEnv()
print("Observation Space Size: ", env.observation_space.shape)
print('Actions Space: ', env.action_space)
env.close()

ValueError: Error: Error opening file 'assets/cf2_0.obj': No such file or directory

In [25]:
def make_env():
  env = DroneHoverEnv(render_mode="rgb_array",
                      episode_len=hyperparams["episode_length"])
  check_env(env)
  return env

In [26]:
# Create Training environment
env = make_vec_env(make_env,
                   n_envs=hyperparams["n_envs"],
                   monitor_dir=os.path.join(log_dir, "monitor"))

# Create Evaluation environment
env_val = make_vec_env(make_env, n_envs=1)

eval_callback = EvalCallback(env_val,
                             best_model_save_path=log_dir,
                             log_path=log_dir,
                             render=False,
                             deterministic=True,
                             n_eval_episodes=20,
                             eval_freq=hyperparams["eval_freq"])

video_record_callback = VideoRecordCallback(
    save_path=os.path.join(log_dir, "videos"),
    video_length=10_000,
    save_freq=hyperparams["eval_freq"],
    name_prefix=name_prefix)

# Create the callback list
callbackList = CallbackList([video_record_callback,
                             eval_callback])

# learning with tensorboard logging and saving model
model = SAC("MlpPolicy",
            env,
            verbose=0,
            tensorboard_log=os.path.join(log_dir, "tensorboard"))

model.learn(total_timesteps=hyperparams["total_timesteps"],
            callback=callbackList,
            progress_bar=False)

# Save the model
model.save(os.path.join(log_dir, "final_model"))

mean_reward, std_reward = evaluate_policy(model, env)
print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")

env.close()
env_val.close()

TypeError: The reset() method must accept a `seed` parameter

In [15]:
make_env()

NameError: name 'make_env' is not defined

In [None]:
|