RL policy based on the [SoloParkour: Constrained Reinforcement Learning for Visual Locomotion from Privileged Experience](https://arxiv.org/abs/2409.13678). 

# Flat Ground

## Test In Simulation

In [1]:
from Go2Py.robot.fsm import FSM
from Go2Py.robot.remote import KeyboardRemote, XBoxRemote
from Go2Py.robot.safety import SafetyHypervisor
from Go2Py.sim.mujoco import Go2Sim
from Go2Py.control.neuro_diff_sim import *
import torch

ModuleNotFoundError: No module named 'Go2Py'

In [2]:
from Go2Py.robot.model import FrictionModel
friction_model = None
Fs = np.zeros(12)
mu_v = np.zeros(12)
#mu_v[[2,5,8,11]] = np.array([0.2167, -0.0647, -0.0420, -0.0834])
#Fs[[2,5,8,11]] = np.array([1.5259, 1.2380, 0.8917, 2.2461])

#mu_v[[0,3,6,9]] = np.array([0., 0., 0., 0.])
#Fs[[0,3,6,9]] = np.array([1.5, 1.5, 1.5, 1.5])
#mu_v[[2,5,8,11]] = np.array([0., 0., 0., 0.])
#Fs[[2,5,8,11]] = np.array([1.5, 1.5, 1.5, 1.5])

friction_model = FrictionModel(Fs=1.5, mu_v=0.3)
#friction_model = FrictionModel(Fs=0., mu_v=0.)
#friction_model = FrictionModel(Fs=Fs, mu_v=mu_v)
robot = Go2Sim(dt = 0.001, friction_model=friction_model)

In [3]:
remote = KeyboardRemote()# XBoxRemote() # KeyboardRemote()
robot.sitDownReset()
safety_hypervisor = SafetyHypervisor(robot)

In [4]:
def getRemote(remote):
    commands = remote.getCommands()
    commands[0] *= 0.6
    commands[1] *= 0.6
    zero_commands_xy = np.linalg.norm(commands[:2]) <= 0.2
    zero_commands_yaw = np.abs(commands[2]) <= 0.2
    if zero_commands_xy:
        commands[:2] = np.zeros_like(commands[:2])
    if zero_commands_yaw:
        commands[2] = 0
    return commands

In [5]:
class NeuroDiffSimController:
    def __init__(self, robot, remote, checkpoint):
        self.remote = remote
        self.robot = robot
        self.policy = Policy(checkpoint)
        self.command_profile = CommandInterface()
        self.agent = NeuroDiffSimAgent(self.command_profile, self.robot)
        self.hist_data = {}

    def init(self):
        self.obs = self.agent.reset()
        self.policy_info = {}
        self.command_profile.yaw_vel_cmd = 0.0
        self.command_profile.x_vel_cmd = 0.0
        self.command_profile.y_vel_cmd = 0.0

    def update(self, robot, remote):
        if not hasattr(self, "obs"):
            self.init()
        commands = getRemote(remote)
        self.command_profile.yaw_vel_cmd = -commands[2]
        self.command_profile.x_vel_cmd = commands[1]
        self.command_profile.y_vel_cmd = -commands[0]

        self.obs = self.agent.get_obs()
        action = self.policy(self.obs, self.policy_info)
        _, self.ret, self.done, self.info = self.agent.step(action)
        for key, value in self.info.items():
            if key in self.hist_data:
                self.hist_data[key].append(value)
            else:
                self.hist_data[key] = [value]

In [None]:
robot.getJointStates()

In [None]:
from Go2Py import ASSETS_PATH
import os
checkpoint_path = os.path.join(ASSETS_PATH, 'checkpoints/neuro_diff_sim/neuro_diff_sim.pt')

controller = NeuroDiffSimController(robot, remote, checkpoint_path)
decimation = 20
fsm = FSM(robot, remote, safety_hypervisor, control_dT=decimation * robot.dt, user_controller_callback=controller.update)

# Slippage Analysis

In [8]:
contacts = []
feet_vels = []

while True:
    if remote.xbox_controller.digital_cmd[1]:
        break
    contact_state = robot.getFootContact()>15
    sites = ['FR_foot', 'FL_foot', 'RR_foot', 'RL_foot']
    feet_vel = [np.linalg.norm(robot.getFootVelInWorld(s)) for s in sites]
    contacts.append(contact_state)
    feet_vels.append(feet_vel)
    time.sleep(0.01)

feet_vels = np.stack(feet_vels)
contacts = np.stack(contacts)

In [None]:
import matplotlib.pyplot as plt
start = 300
end = 1200
plt.plot(contacts[start:end,0])
plt.plot(feet_vels[start:end,0])
plt.legend(['contact state', 'foot velocity'])
plt.grid(True)
plt.tight_layout()
plt.savefig('foot_slipping_fric0.2.png')

**To Do**
- Train a policy without any actuator friction and check the plots for friction 0.2 and 0.6 
- Do the same experiment for the walk-these-ways policy
- While testing the walk these ways, check the output of the adaptation module for various friction numbers, any correlation?

In [8]:
fsm.close()

## Foot Contanct Analysis

In [None]:
np.array(controller.hist_data["body_pos"])[0, 0, -1]

In [None]:
plt.plot(np.array(controller.hist_data["body_pos"])[:, 0, -1])

In [None]:
import matplotlib.pyplot as plt
# Assuming 'controller.hist_data["torques"]' is a dictionary with torque profiles
torques = np.array(controller.hist_data["body_linear_vel"])[:, 0, :, 0]

# Number of torque profiles
torque_nb = torques.shape[1]

# Number of rows needed for the grid, with 3 columns per row
n_cols = 3
n_rows = int(np.ceil(torque_nb / n_cols))

# Create the figure and axes for subplots
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))

# Flatten the axes array for easy indexing (in case of multiple rows)
axes = axes.flatten()

# Plot each torque profile
for i in range(torque_nb):
    axes[i].plot(np.arange(torques.shape[0]) * robot.dt * decimation, torques[:, i])
    axes[i].set_title(f'Torque {i+1}')
    axes[i].set_xlabel('Time')
    axes[i].set_ylabel('Torque Value')
    axes[i].grid(True)

# Remove any empty subplots if torque_nb is not a multiple of 3
for j in range(torque_nb, len(axes)):
    fig.delaxes(axes[j])

# Adjust layout
plt.tight_layout()
plt.savefig("torque_profile.png")
plt.show()

In [None]:
import matplotlib.pyplot as plt
# Assuming 'controller.hist_data["torques"]' is a dictionary with torque profiles
torques = np.array(controller.hist_data["torques"])

# Number of torque profiles
torque_nb = torques.shape[1]

# Number of rows needed for the grid, with 3 columns per row
n_cols = 3
n_rows = int(np.ceil(torque_nb / n_cols))

# Create the figure and axes for subplots
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))

# Flatten the axes array for easy indexing (in case of multiple rows)
axes = axes.flatten()

# Plot each torque profile
for i in range(torque_nb):
    axes[i].plot(np.arange(torques.shape[0]) * robot.dt * decimation, torques[:, i])
    axes[i].set_title(f'Torque {i+1}')
    axes[i].set_xlabel('Time')
    axes[i].set_ylabel('Torque Value')
    axes[i].grid(True)

# Remove any empty subplots if torque_nb is not a multiple of 3
for j in range(torque_nb, len(axes)):
    fig.delaxes(axes[j])

# Adjust layout
plt.tight_layout()
plt.savefig("torque_profile.png")
plt.show()

In [None]:
# Extract the joint position data for the first joint over time
joint_pos = np.array(controller.hist_data["joint_vel"])[:, 0]

# Number of data points in joint_pos
n_data_points = len(joint_pos)

# Since you're plotting only one joint, no need for multiple subplots in this case.
# But to follow the grid requirement, we'll replicate the data across multiple subplots.
# For example, let's assume you want to visualize this data 9 times in a 3x3 grid.

n_cols = 3
n_rows = int(np.ceil(torque_nb / n_cols))

# Create the figure and axes for subplots
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))

# Flatten the axes array for easy indexing (in case of multiple rows)
axes = axes.flatten()

# Plot the same joint position data in every subplot (as per grid requirement)
for i in range(n_rows * n_cols):
    axes[i].plot(joint_pos[:, i])
    axes[i].set_title(f'Joint Position {i+1}')
    axes[i].set_xlabel('Time')
    axes[i].set_ylabel('Position Value')

# Adjust layout
plt.tight_layout()
plt.savefig("joint_position_profile.png")
plt.show()

In [None]:
import matplotlib.pyplot as plt
# Assuming 'controller.hist_data["foot_contact_forces_mag"]' is a dictionary with foot contact force magnitudes
foot_contact_forces_mag = np.array(controller.hist_data["foot_contact_forces_mag"])

# Number of feet (foot_nb)
foot_nb = foot_contact_forces_mag.shape[1]

# Number of rows needed for the grid, with 3 columns per row
n_cols = 3
n_rows = int(np.ceil(foot_nb / n_cols))

# Create the figure and axes for subplots
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))

# Flatten the axes array for easy indexing (in case of multiple rows)
axes = axes.flatten()

# Plot each foot's contact force magnitude
for i in range(foot_nb):
    axes[i].plot(foot_contact_forces_mag[:, i])
    axes[i].set_title(f'Foot {i+1} Contact Force Magnitude')
    axes[i].set_xlabel('Time')
    axes[i].set_ylabel('Force Magnitude')

# Remove any empty subplots if foot_nb is not a multiple of 3
for j in range(foot_nb, len(axes)):
    fig.delaxes(axes[j])

# Adjust layout
plt.tight_layout()
plt.savefig("foot_contact_profile.png")
plt.show()

In [None]:
# Extract the joint acceleration data for the first joint over time
joint_acc = np.array(controller.hist_data["joint_acc"])[:, 0]

# Number of data points in joint_acc
n_data_points = len(joint_acc)

# Number of feet (foot_nb)
foot_nb = joint_acc.shape[1]

# Number of rows needed for the grid, with 3 columns per row
n_cols = 3
n_rows = int(np.ceil(foot_nb / n_cols))

# Create the figure and axes for subplots
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))

# Flatten the axes array for easy indexing
axes = axes.flatten()

# Plot the same joint acceleration data in every subplot (as per grid requirement)
for i in range(n_rows * n_cols):
    axes[i].plot(joint_acc[:, i])
    axes[i].set_title(f'Joint Acceleration {i+1}')
    axes[i].set_xlabel('Time')
    axes[i].set_ylabel('Acceleration Value')

# Adjust layout to prevent overlap
plt.tight_layout()
plt.show()

In [None]:
# Extract the joint jerk data over time
joint_jerk = np.array(controller.hist_data["joint_jerk"])[:, 0]

# Number of data points in joint_jerk
n_data_points = len(joint_jerk)

# Number of joints (assuming the second dimension corresponds to joints)
num_joints = joint_jerk.shape[1]

# Number of columns per row in the subplot grid
n_cols = 3
# Number of rows needed for the grid
n_rows = int(np.ceil(num_joints / n_cols))

# Create the figure and axes for subplots
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))

# Flatten the axes array for easy indexing
axes = axes.flatten()

# Plot the joint jerk data for each joint
for i in range(num_joints):
    axes[i].plot(joint_jerk[:, i])
    axes[i].set_title(f'Joint Jerk {i+1}')
    axes[i].set_xlabel('Time')
    axes[i].set_ylabel('Jerk Value')

# Hide any unused subplots
for i in range(num_joints, len(axes)):
    fig.delaxes(axes[i])

# Adjust layout to prevent overlap
plt.tight_layout()
plt.show()


In [None]:
# Extract the foot contact rate data over time
foot_contact_rate = np.array(controller.hist_data["foot_contact_rate"])[:, 0]

# Number of data points in foot_contact_rate
n_data_points = foot_contact_rate.shape[0]

# Number of feet (assuming the second dimension corresponds to feet)
num_feet = foot_contact_rate.shape[1]

# Number of columns per row in the subplot grid
n_cols = 3
# Number of rows needed for the grid
n_rows = int(np.ceil(num_feet / n_cols))

# Create the figure and axes for subplots
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 5 * n_rows))

# Flatten the axes array for easy indexing
axes = axes.flatten()

# Plot the foot contact rate data for each foot
for i in range(num_feet):
    axes[i].plot(foot_contact_rate[:, i])
    axes[i].set_title(f'Foot Contact Rate {i+1}')
    axes[i].set_xlabel('Time')
    axes[i].set_ylabel('Contact Rate')

# Hide any unused subplots
for i in range(num_feet, len(axes)):
    fig.delaxes(axes[i])

# Adjust layout to prevent overlap
plt.tight_layout()
plt.show()


## Test on Real Robot (ToDo)

In [1]:
from Go2Py.robot.fsm import FSM
from Go2Py.robot.remote import XBoxRemote
from Go2Py.robot.safety import SafetyHypervisor
from Go2Py.control.neuro_diff_sim import *
from Go2Py.robot.remote import UnitreeRemote

pygame 2.6.1 (SDL 2.28.4, Python 3.8.19)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
from Go2Py.robot.interface import GO2Real
import numpy as np
robot = GO2Real(mode='lowlevel')

In [3]:
robot.getForwardVecInBody()

array([[ 0.24270522],
       [ 0.96971647],
       [-0.02727886]])

In [4]:
robot.getIMU()

{'accel': array([0.8762778 , 0.05147533, 9.44871712]),
 'gyro': array([-0.00532632, -0.00852211, -0.00745685]),
 'quat': array([ 0.01176891, -0.05038279,  0.0318548 ,  0.99815261]),
 'temp': 79.0}

In [4]:
remote = UnitreeRemote(robot) # XBoxRemote() # KeyboardRemote()
safety_hypervisor = SafetyHypervisor(robot)

In [5]:
robot.getJointStates()

{'q': [0.4028351306915283,
  -0.10426139831542969,
  -1.5858192443847656,
  -0.7083656787872314,
  -0.026054739952087402,
  -1.6221204996109009,
  0.5959703922271729,
  -0.08419299125671387,
  -1.6552941799163818,
  -0.5171579122543335,
  0.06316781044006348,
  -1.6382335424423218],
 'dq': [0.023253142833709717,
  0.06588390469551086,
  0.02022012509405613,
  -0.015502095222473145,
  0.01937761902809143,
  0.0,
  -0.07751047611236572,
  -0.023253142833709717,
  -0.006066037341952324,
  -0.03875523805618286,
  0.03875523805618286,
  -0.04246226325631142],
 'tau_est': [-0.14842969179153442,
  0.12369140982627869,
  -0.1422451138496399,
  -0.22264453768730164,
  -0.17316797375679016,
  0.23707520961761475,
  0.0,
  0.049476563930511475,
  -0.09483008086681366,
  0.024738281965255737,
  -0.049476563930511475,
  0.0],
 'temperature': [28.0,
  27.0,
  28.0,
  27.0,
  27.0,
  28.0,
  29.0,
  28.0,
  28.0,
  29.0,
  28.0,
  28.0]}

Make sure the robot can take commands from python. The next cell should make the joints free to move (no damping).

In [21]:
import numpy as np
import time
start_time = time.time()

while time.time()-start_time < 30:
    q = np.zeros(12) 
    dq = np.zeros(12)
    kp = np.ones(12)*0.0
    kd = np.ones(12)*0.0
    tau = np.zeros(12)
    tau[0] = 0.0
    robot.setCommands(q, dq, kp, kd, tau)
    time.sleep(0.02)

1747061548.557423 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7400 failed with retcode -1
1747061548.557467 [0]        tev: ddsi_udp_conn_write to udp/192.168.123.18:7410 failed with retcode -1
1747061548.657277 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7400 failed with retcode -1
1747061548.657328 [0]        tev: ddsi_udp_conn_write to udp/192.168.123.18:7410 failed with retcode -1
1747061548.657392 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7400 failed with retcode -1
1747061548.657421 [0]        tev: ddsi_udp_conn_write to udp/192.168.123.18:7410 failed with retcode -1
1747061548.757510 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7400 failed with retcode -1
1747061548.757546 [0]        tev: ddsi_udp_conn_write to udp/192.168.123.18:7410 failed with retcode -1
1747061548.857665 [0]        tev: ddsi_udp_conn_write to udp/239.255.0.1:7400 failed with retcode -1
1747061548.857703 [0]        tev: ddsi_udp_conn_write to udp/192.168.123.18:741

In [20]:
"""def getRemote(remote):
    commands = remote.getCommands()
    commands[0] *= 0.6
    commands[1] *= 0.6
    zero_commands = np.logical_and(
        np.linalg.norm(commands[:2]) <= 0.2,
        np.abs(commands[2]) <= 0.2
    )
    if zero_commands:
        commands = np.zeros_like(commands)
    return commands"""
def getRemote(remote):
    vy = -robot.getRemoteState().lx
    vx = robot.getRemoteState().ly
    omega = -robot.getRemoteState().rx
    commands = [vy, vx, omega]
    commands[0] *= 0.6
    commands[1] *= 0.6
    zero_commands = np.logical_and(
        np.linalg.norm(commands[:2]) <= 0.2,
        np.abs(commands[2]) <= 0.2
    )
    if zero_commands:
        commands = np.zeros_like(commands)
    return commands

In [8]:
class NeuroDiffSimController:
    def __init__(self, robot, remote, checkpoint):
        self.remote = remote
        self.robot = robot
        self.policy = Policy(checkpoint)
        self.command_profile = CommandInterface()
        self.agent = NeuroDiffSimAgent(self.command_profile, self.robot)
        self.hist_data = {}

    def init(self):
        self.obs = self.agent.reset()
        self.policy_info = {}
        self.command_profile.yaw_vel_cmd = 0.0
        self.command_profile.x_vel_cmd = 0.0
        self.command_profile.y_vel_cmd = 0.0

    def update(self, robot, remote):
        if not hasattr(self, "obs"):
            self.init()
        commands = getRemote(remote)
        self.command_profile.yaw_vel_cmd = -commands[2]
        self.command_profile.x_vel_cmd = commands[1]
        self.command_profile.y_vel_cmd = -commands[0]

        self.obs = self.agent.get_obs()
        action = self.policy(self.obs, self.policy_info)
        _, self.ret, self.done, self.info = self.agent.step(action)
        for key, value in self.info.items():
            if key in self.hist_data:
                self.hist_data[key].append(value)
            else:
                self.hist_data[key] = [value]

In [9]:
from Go2Py import ASSETS_PATH 
import os
checkpoint_path = os.path.join(ASSETS_PATH, 'checkpoints/neuro_diff_sim/neuro_diff_sim.pt')

controller = NeuroDiffSimController(robot, remote, checkpoint_path)

Sequential(
  (0): Linear(in_features=305, out_features=256, bias=True)
  (1): ELU(alpha=1.0)
  (2): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
  (3): Linear(in_features=256, out_features=128, bias=True)
  (4): ELU(alpha=1.0)
  (5): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
  (6): Linear(in_features=128, out_features=64, bias=True)
  (7): ELU(alpha=1.0)
  (8): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  (9): Linear(in_features=64, out_features=12, bias=True)
  (10): Identity()
)
Parameter containing:
tensor([-1., -1., -1., -1., -1., -1., -1., -1., -1., -1., -1., -1.],
       requires_grad=True)


  agent_init, _, _, self.obs_rms, _, _ = torch.load(checkpoint_path, map_location="cpu")


Exported model has been tested with ONNXRuntime, and the result looks good!
p_gains: [20. 20. 20. 20. 20. 20. 20. 20. 20. 20. 20. 20.]




In [12]:
fsm = FSM(robot, remote, safety_hypervisor, control_dT=1./50., user_controller_callback=controller.update)

In [19]:
fsm.close()

In [15]:
N=0

In [16]:
N+=1
from copy import deepcopy
import time
actuatornet_dataset = []

start_time = time.time()
while time.time() - start_time < 120:
    tic = time.time()
    state = robot.getJointStates()
    imu_data = robot.getIMU()
    
    actuatornet_dataset.append([deepcopy(robot.latest_command), deepcopy(state), deepcopy(imu_data)])
    while time.time()-tic<0.02:
        time.sleep(0.0001)


KeyboardInterrupt: 

In [17]:
import pickle
with open(f'/home/Go2py/examples/data/actuatorNet/an-joints-data.pkl', 'wb') as f:
    pickle.dump(actuatornet_dataset, f)

In [18]:
N

1