In [None]:
# If in Colab, run this cell to install flybody. Don't forget to select GPU!!
# Otherwise, if running notebook locally, skip this cell.

import os
import subprocess
if subprocess.run('nvidia-smi').returncode:
  raise RuntimeError(
      'Cannot communicate with GPU. '
      'Make sure you are using a GPU Colab runtime. '
      'Go to the Runtime menu and select Choose runtime type.')

# Add an ICD config so that glvnd can pick up the Nvidia EGL driver.
# This is usually installed as part of an Nvidia driver package, but the Colab
# kernel doesn't install its driver via APT, and as a result the ICD is missing.
# (https://github.com/NVIDIA/libglvnd/blob/master/src/EGL/icd_enumeration.md)
NVIDIA_ICD_CONFIG_PATH = '/usr/share/glvnd/egl_vendor.d/10_nvidia.json'
if not os.path.exists(NVIDIA_ICD_CONFIG_PATH):
  with open(NVIDIA_ICD_CONFIG_PATH, 'w') as f:
    f.write("""{
    "file_format_version" : "1.0.0",
    "ICD" : {
        "library_path" : "libEGL_nvidia.so.0"
    }
}
""")

!pip install --upgrade pip
!python -m pip install git+https://github.com/TuragaLab/flybody.git

# Configure MuJoCo to use the EGL rendering backend (requires GPU).
%env MUJOCO_GL=egl

In [None]:
import os

import numpy as np
import matplotlib.pyplot as plt
import PIL.ImageDraw

from dm_control import mujoco
from dm_control import mjcf
from dm_control.mujoco.wrapper.mjbindings import enums
from dm_control.mujoco.wrapper.mjbindings import mjlib

import flybody
from flybody.fly_envs import walk_on_ball
from flybody.utils import display_video, any_substr_in_str

In [None]:
# Frame size and camera name-to-idx mapping for rendering.
frame_size = {'width': 640, 'height': 480}
cameras = {'track1': 0, 'track2': 1, 'track3': 2,
           'back': 3, 'side': 4, 'bottom': 5, 'hero': 6,
           'eye_right': 7, 'eye_left': 8}

In [None]:
flybody_path = os.path.dirname(flybody.__file__)
xml_path = os.path.join(flybody_path, 'fruitfly/assets/fruitfly.xml')

physics = mjcf.Physics.from_xml_path(xml_path)  # Load and compile.

print('# of bodies:', physics.model.nbody)
print('# of degrees of freedom:', physics.model.nv)
print('# of joints:', physics.model.njnt)
print('# of actuators:', physics.model.nu)
print("fly's mass (gr):", physics.model.body_subtreemass[1])

In [None]:
physics.data.qpos

In [None]:
pixels = physics.render(camera_id=cameras['hero'], **frame_size)
PIL.Image.fromarray(pixels)

In [None]:
pixels = physics.render(camera_id=cameras['bottom'], **frame_size)
PIL.Image.fromarray(pixels)

In [None]:
scene_option = mujoco.wrapper.core.MjvOption()
scene_option.geomgroup[1] = 0  # Hide external meshes.
scene_option.geomgroup[3] = 1  # Make fluid-interactions wing ellipsoids visible (orange).
scene_option.geomgroup[4] = 1  # Make collision geoms visible (blue).
pixels = physics.render(camera_id=cameras['side'], **frame_size, scene_option=scene_option)
PIL.Image.fromarray(pixels)

In [None]:

xml_path_floor = os.path.join(flybody_path, 'fruitfly/assets/floor.xml')
physics = mjcf.Physics.from_xml_path(xml_path_floor)

pixels = physics.render(camera_id=cameras['side'], **frame_size)
PIL.Image.fromarray(pixels)

In [None]:
# Construct new root joint quaternion: 90 degree rotation around z-axis.
angle = np.pi / 2
quat = np.array([np.cos(angle/2), 0, 0, np.sin(angle/2)])
# Write into qpos and update other related quantities in mjData.
with physics.reset_context():
    physics.named.data.qpos[3:7] = quat
pixels = physics.render(camera_id=cameras['side'], **frame_size)
PIL.Image.fromarray(pixels)

In [None]:
wing_joints = []
folded_wing_angles = []
# Loop over all model joints.
for i in range(physics.model.njnt):
    joint_name = physics.model.id2name(i, 'joint')
    # If wing joint, store joint name and reference angle.
    if 'wing' in joint_name:
        wing_joints.append(joint_name)
        folded_wing_angles.append(
            physics.named.model.qpos_spring[joint_name].item())

wing_joints, folded_wing_angles

In [None]:
with physics.reset_context():
    physics.named.data.qpos[wing_joints] = folded_wing_angles

pixels = physics.render(camera_id=cameras['side'], **frame_size)
PIL.Image.fromarray(pixels)

In [None]:
n_steps = 150
frames = []
for i in range(n_steps):
    with physics.reset_context():
        wing_angles = np.array(folded_wing_angles) * np.sin(np.pi/2 * i/n_steps)
        physics.named.data.qpos[wing_joints] = wing_angles
    pixels = physics.render(camera_id=cameras['back'], **frame_size)
    frames.append(pixels)

display_video(frames)

In [None]:
def get_leg_actuator_names(leg):
    return [f'{joint}_{leg}' 
            for joint in ['coxa', 'femur', 'tibia', 'tarsus', 'tarsus2']]

# Names of leg actuators.
leg_actuators_L1 = get_leg_actuator_names('T1_left')
leg_actuators_R1 = get_leg_actuator_names('T1_right')
leg_actuators_L2 = get_leg_actuator_names('T2_left')

# Amplitudes of joint motion for front and middle legs.
amplitude_front = 0.5 * np.array([1, -1, 2, 1, 1])
amplitude_mid = 0.5 * np.array([0.5, -0.5, -2, 1, 2])

In [None]:
n_steps = 100
physics_to_ctrl_ratio = 10
frames = []

# Reset physics to initial default state.
physics.reset()

# Let wings fold.
for i in range(n_steps):
    pixels = physics.render(camera_id=cameras['hero'], **frame_size)
    frames.append(pixels)
    for _ in range(physics_to_ctrl_ratio):
        physics.step()

# Twist head.
for i in range(n_steps):
    pixels = physics.render(camera_id=cameras['hero'], **frame_size)
    frames.append(pixels)
    physics.named.data.ctrl['head_twist'] = 1.5 * np.sin(2*np.pi * i/n_steps)
    for _ in range(physics_to_ctrl_ratio):
        physics.step()

# Move middle right leg.
for i in range(n_steps+50):
    if i <= n_steps:
        physics.named.data.ctrl[leg_actuators_L2] = amplitude_mid * np.sin(np.pi * i/n_steps)
    for _ in range(physics_to_ctrl_ratio):
        physics.step()
    pixels = physics.render(camera_id=cameras['hero'], **frame_size)
    frames.append(pixels)

# Activate middle leg adhision to prevent slipping when front legs are lifted later.
physics.named.data.ctrl[['adhere_claw_T2_right', 'adhere_claw_T2_left']] = 1.

# Lift fronts legs with lag.
for i in range(n_steps+100):
    left_angle = np.pi * i/n_steps
    right_angle = np.pi * i/n_steps - np.pi/5
    if left_angle <= np.pi:
        physics.named.data.ctrl[leg_actuators_L1] = amplitude_front * np.sin(left_angle)
    if 0 < right_angle <= np.pi:
        physics.named.data.ctrl[leg_actuators_R1] = amplitude_front * np.sin(right_angle)
    for _ in range(physics_to_ctrl_ratio):
        physics.step()
    pixels = physics.render(camera_id=cameras['hero'], **frame_size)
    frames.append(pixels)

display_video(frames)

In [None]:

wing_act_indices = []  # Force actuators.
body_act_indices = []  # Position actuators.
# Loop over all actuators.
for i in range(physics.model.nu):
    name = physics.model.id2name(i, 'actuator')
    # Store wing actuator indices and rest of indices separately.
    if 'wing' in name:
        wing_act_indices.append(i)
    else:
        body_act_indices.append(i)

print(wing_act_indices)
print(body_act_indices)

In [None]:
n_body_actions = len(body_act_indices)
n_wing_actions = len(wing_act_indices)

n_steps = 300
physics_to_ctrl_ratio = 10
frames = []

# Reset simulatiomn to initial default state.
physics.reset()

for i in range(n_steps):
    pixels = physics.render(camera_id=cameras['side'], **frame_size)
    frames.append(pixels)
    physics.named.data.ctrl[body_act_indices] = np.random.uniform(-.3, .3, n_body_actions)
    physics.named.data.ctrl[wing_act_indices] = np.random.uniform(-.005, .005, n_wing_actions)
    for _ in range(physics_to_ctrl_ratio):
        physics.step()

display_video(frames)

In [None]:
env = walk_on_ball()

In [None]:
env.observation_spec()

In [None]:
env.action_spec()

In [None]:
timestep = env.reset()

pixels = env.physics.render(camera_id=cameras['track1'], **frame_size)
PIL.Image.fromarray(pixels)

In [None]:
n_actions = env.action_spec().shape[0]

def random_action_policy(observation):
    del observation  # Not used by dummy policy.
    random_action = np.random.uniform(-.5, .5, n_actions)
    return random_action

frames = []
rewards = []

# Environment loop.
timestep = env.reset()
for _ in range(200):
    
    action = random_action_policy(timestep.observation)
    timestep = env.step(action)
    rewards.append(timestep.reward)

    pixels = env.physics.render(camera_id=cameras['track1'], **frame_size)
    frames.append(pixels)

display_video(frames)

In [None]:
plt.plot(rewards)
plt.xlabel('timestep')
plt.ylabel('reward')