-
Notifications
You must be signed in to change notification settings - Fork 1
dgm_head
This tutorial outlines how to program a robot using dynamic graph manager and what is called dynamic graph head (DGH). With DGH it's possible to write controllers fully in python.
The idea behind DGH is to give the minimal tools for python to talk with the dynamic graph manager control process. Such a minimal example looks like this:
import time
from robot_properties_solo.solo8wrapper import Solo8Config
import dynamic_graph_manager_cpp_bindings
# Create the dgm communication to the control process.
head = dynamic_graph_manager_cpp_bindings.DGMHead(Solo8Config.dgm_yaml_path)
# Get a reference to the joint velocities.
joint_velocities = head.get_sensor('joint_velocities')
D = 0.05
dt = 0.001
next_time = time.time() + dt
# In your control loop:
while (True):
if time.time() >= next_time:
next_time += dt
###
# Get the latest measurements from the shared memory.
head.read()
###
# Compute joint torques using D controller.
tau = -D * joint_velocities
head.set_control('ctrl_joint_torques', tau)
###
# Write the results into shared memory again.
head.write()
time.sleep(0.0001)
At the point of writing, the blmc main DG application has to be started as root user. Therefore, also the python script needs to be started as root user to get access to the shared memory.
Steps to start a script might look like this then:
- In a first terminal, start the main dgm program for your robot, for example solo12 setup in new york
$ sudo -s # Stay root user
$ source ~/workspace/install/setup.bash
$ source /opt/openrobots/setup.bash
# For the setup in New York
$ ros2 run solo dg_main_solo12 dgm_parameters_solo12_nyu.yaml
- Run calibration or other setup procedures.
$ ros2 service call /dynamic_graph_manager/calibrate_joint_position mim_msgs/srv/JointCalibration
-
Do not start the DG process. This is as only one control process should be active at a single time and we intend to use DGH
-
In a new terminal, do the following
$ sudo -s # Stay root user
$ source ~/workspace/install/setup.bash
$ source /opt/openrobots/setup.bash
$ taskset 0x10 ipython # Start the ipython terminal and pin it to CPU #4
# From within ipython, run your script then using `run your_script.py`
If you want to have an interactive prompt from ipython
, it's necessary to run the controller off the main thread in a different thread.
For managing different controllers, running the controller on a separate thread and doing data logging, you can use the ThreadHead
implemented in the dynamic_graph_head
repo. One of the benefits of using the thread_head infrastructure is also that switching from simulation to the real robot is easy.
An example to run a PD controller, which holds the current position, is given below (taken from here](https://github.com/machines-in-motion/dynamic_graph_head/blob/main/examples/solo12_real_pd.py)):
import numpy as np
from dynamic_graph_head import ThreadHead, HoldPDController
import time
import dynamic_graph_manager_cpp_bindings
from robot_properties_solo.solo12wrapper import Solo12Config
print("Finished imports")
###
# Create the dgm communication and instantiate the controllers.
head = dynamic_graph_manager_cpp_bindings.DGMHead(Solo12Config.dgm_yaml_path)
head.read()
print(head.get_sensor('slider_positions'))
# Create the controllers.
hold_pd_controller = HoldPDController(head, 3., 0.05, with_sliders=True)
thread_head = ThreadHead(
0.001, # Run controller at 1000 Hz.
hold_pd_controller, # Safety controller
head,
[]
)
# Start the parallel processing.
thread_head.start()
print("Finished controller setup")
To change the setup to run in simulation, you need to change the way the head
and thread_head
is constructed.
from bullet_utils.env import BulletEnvWithGround
from robot_properties_solo.solo12wrapper import Solo12Robot, Solo12Config
from dynamic_graph_head import ThreadHead, SimHead, SimVicon, HoldPDController
# Simulated robot.
bullet_env = BulletEnvWithGround()
robot = Solo12Robot()
bullet_env.add_robot(robot)
# Head for the simulated robot.
head = SimHead(robot, vicon_name='solo12')
thread_head = ThreadHead(
0.001, # Run controller at 1000 Hz.
hold_pd_controller, # Safety controller
head,
[],
bullet_env # Environment to step.
)
The main change here is that the head
object is now constructed from a SimHead
. The SimHead
talks to the simulator to read and write values. In addition, we need to pass the bullet_env
to the ThreadHead
constructor. This causes the simulation to step whenever the control is stepping.
In the examples so far we used the HoldPDController
. This is a controller provided by the Dynamic Graph Head implementation. It is possible to write custom controllers. Such controllers must have two methods - a warmup
and run
method. The simplest controller sending only zero torques would be defined as this:
class ZeroTorquesController:
def __init__(self, head):
self.head = head
# Zero the commands.
self.tau = np.zeros_like(head.get_sensor("joint_positions"))
def warmup(self, thread_head):
pass
def run(self, thread_head):
self.head.set_control("ctrl_joint_torques", self.tau)
and we instantiate and use this controller as follows:
# [...]
zero_ctrl = ZeroTorquesController(head)
# [...]
thread_head.swich_controllers(zero_ctrl)
Where the last command showed how to switch from one controller to the next one using the thread_head.swich_controllers
method.
When are the methods warmup
and run
called? The warmup
method gets called whenever the thread_head is switching to the current controller. In the HoldPDController
we use the warmup
method for instance to make a copy of the current joint positions to hold while the controller is running. Another common example for the warmup
method is when using it with a motion capturing system like Vicon. There, when activating the controller, you often want to use the current position as zero position (bias the vicon).
The run
method is called during each control cycle. This is where the actual control of the controller is happening. The run
method should consume the sensor values from the head
object and then set the controls on the head
object as well.
When constructing the ThreadHead
object, we are passing a safety controller as second argument. This safety controller is the default controller when no other controller gets applied (it runs when the robot gets started). In addition, whenever there is a python exception detected during calling warmup
or run
of the controller, the ThreadHead
infrastructure automatically switches to the safety controller. This ensures that there is always a controller running.
The ThreadHead
object comes with a build in data logging functionality. This functionality allows to record data from the current running controller and store the data to disk. In particular, the ThreadHead
writes data in the Machines In Motion Data Storage Format (.mds). Using the python tools from the Machines in Motion Data Utils, the format can be either read from Python or visualized using the data log viewer here (at the point of writing only works on Chrome).
The way to start the data logging is by calling the thread_head.start_logging()
. This will log by default for 30 seconds to a filename based on the start time of the logging. To stop the logging before the 30 seconds are over, you can call the thread_head.stop_logging()
method. Alternatively, you can provide a shorter logging duration to the thread_head.start_logging(5)
method, where the example here logs the data for 5
seconds.
The data logging tool in ThreadHead
always logs the current active controller (the one you switched to via thread_head.switch_controllers
). The logging works as follows: When calling thread_head.start_logging()
, ThreadHead
loops over all the fields on the current controller and checks if the attribute contains an integer, floating point number of an np.array
vector. If that's the case, these attributes are stored after each call to run
(so at each control cycle).
Common mistake: A common mistake is to start the data logging and switching directly to another controller on the command line. The idea is to catch all the output from the controller. As it turns out, this is not a good idea. When switching the controller and if data logging is happening, ThreadHead
will terminate the data logging. Therefore, the correct way to switch to the new controller and start logging right away is to switch to the new controller first and then start the data logging.
In the examples so far, we specified the control frequency at 1000 Hz. However, it is possible to run at a lower control frequency like 500 Hz easily by switching the first argument dt
to the ThreadHead
constructor:
thread_head = ThreadHead(
0.002, # Run controller at 500 Hz.
hold_pd_controller, # Safety controller
head,
[],
bullet_env # Environment to step.
)
Note: At the point of writing, the data storage does not take the dt
into account / no timing information is saved. Especially, when plotting the data using the plotting tool, the data is displayed on the time/x-axis as if it was recorded with 1000 Hz.
The use of Vicon is supported in ThreadHead
by defining a vicon object in the utils section of the constructor. This will cause the vicon data to be read before the controller runs in the main control loop.
In simulation, the Vicon support is defined as follows:
from dynamic_graph_head import SimVicon
head = SimHead(robot, vicon_name='solo12')
thread_head = ThreadHead(
0.001, # dt.
hold_pd_controller,
head, # Heads to read / write from.
[ # Utils.
('vicon', SimVicon(['solo12/solo12']))
],
bullet_env # Environment to step.
)
Note how the vicon name matches in the SimHead
definition to the on in the SimVicon
construction. On the real robot, the vicion is constructed like this:
from dynamic_graph_head import Vicon
thread_head = ThreadHead(
0.001, # dt.
hold_pd_controller,
head, # Heads to read / write from.
[ # Utils.
('vicon', Vicon('172.24.117.119:801', ['solo12/solo12']))
]
)
The constructed vicon object is then available from thread_head.vicon
either from the terminal or when warmup/running the controller.
It is important to check if the controller and ThreadHead
infrastructure runs in the given dt
time budget. To check for the time it takes to run the utils, main controller and data logging, you can use the build in thread_head.plot_timing()
method.
Out of the box ThreadHead supports controlling multiple robots at once. The idea is that each robot maps to a head
object. It is then possible to run either a single controller that controls all head
s/robots at the same time or to write one controller per robot.
For the simulation case, instantiating the ThreadHead
would look like this then:
head0 = SimHead(robot0)
head1 = SimHead(robot1)
hold_ctrl0 = HoldPDController(head0, 3., 0.05, with_sliders=True)
hold_ctrl1 = HoldPDController(head1, 3., 0.05, with_sliders=True)
thread_head = ThreadHead(
0.001, # dt.
[hold_ctrl0, hold_ctrl1],
[head0, head1], # Heads to read / write from.
[],
bullet_env # Environment to step.
)
Note how we pass two controllers for the safety controller and two heads to update from to the ThreadHead
constructor.
All our open source software are licensed against the BSD 3-clause license.
Copyright 2016-2020, New York University and Max Planck Gesellschaft.
- Home Page
- Contribute to the wiki
- Logo
- Introduction
- Our Codebase
- Build Our Codebase
- Build tools introduction
- Build chain tutorials
- Dependencies
- Building our software stack
- Troubleshooting
- Robot Tutorials
- Programming
- ODRI Robots
- MicroDrivers
- Solo12
- Bolt
- NYUFinger
- Kuka
- Debugging