In [1]:
import sys
sys.path.append("/usr/local/lib/python3.8/site-packages")
from FR3Py.interfaces import FR3Real

robot = FR3Real()

Interface Running...


In [3]:
import numpy as np

from tvcbf_control.board_fr3_controller import BoardCBFQPController
from tvcbf_control.board_evasive_controller import BoardEvasiveController
from tvcbf_control.board_expand_controller import BoardExpandController
from tvcbf_control.state_estimation_quat import StateEstimatorQuaternion

In [4]:
control_interval = 10
controller = BoardExpandController()
time_interval = control_interval * 1e-3
observer = StateEstimatorQuaternion(dt=time_interval)

In [5]:
# initial guess for observers
pos_mean_tm1 = np.array([[1.0], [0.0], [1.0], [0.8], [0.0], [0.0]])
pos_covariance_tm1 = np.diag([1e-6, 1e-6, 1e-6, 2e-5, 2e-5, 2e-5])
ori_mean_tm1 = np.array([0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0])[:, np.newaxis]
ori_covariance_tm1 = np.diag([1e-6, 1e-6, 1e-6, 1e-6, 5e-5, 5e-5, 5e-5])

In [6]:
state = robot.get_state()
q, dq = state['q'], state['dq']

fake_pos = np.ones(3)
fake_vel = np.zeros(3)
fake_quat = np.array([0, 0, 0, 1])

dq_target, _info = controller.controller(
    q,
    dq,
    fake_pos,
    fake_quat,
    fake_vel,
    fake_pos,
    fake_quat,
    fake_vel,
    time_interval,
)

In [7]:
import rospy
import tf
import time
import threading
from scipy.spatial.transform import Rotation
import pickle
from copy import deepcopy

rospy.init_node("tf_listener_node")

In [8]:
def tf_listener_thread():

    def get_T(p, R):
        T = np.eye(4)
        T[:3, :3] = R
        T[:3, 3] = p

        return T

    global board_pos
    global board_ori

    camera_to_base_transform = np.load("/home/bolun/camera_to_grey_base_transform.npy")

    listener = tf.TransformListener()

    # the frame where the data originated
    target_frame = "/camera_color_optical_frame"

    # the frame where the data should be transformed
    source_frame = "/wall"

    listener.waitForTransform(target_frame, source_frame, rospy.Time(), rospy.Duration(4.0))

    # Main loop to continuously update the pose
    while not rospy.is_shutdown():
        try:
            (trans, quat) = listener.lookupTransform(target_frame, source_frame, rospy.Time(0))
            # print(trans, quat)
            
            pos = np.array(trans)
            Rot = Rotation.from_quat(quat).as_matrix()

            T = camera_to_base_transform @ get_T(pos, Rot)
            board_pos = T[:3, 3]
            board_rot = T[:3, :3]
            board_ori = Rotation.from_matrix(board_rot).as_quat()

            rospy.sleep(0.001)  # Adjust the sleep duration as needed
        except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException):
            rospy.logwarn("Error occurred while retrieving TF transform.") 

# Initialize the global pose variable
board_pos = None
board_ori = None

# Create and start the thread
tf_thread = threading.Thread(target=tf_listener_thread)
tf_thread.start()

In [9]:
board_ori, board_pos

(array([ 0.62212911, -0.13069444, -0.16045386,  0.7550688 ]),
 array([ 1.04914565,  1.50178269, -0.13161204]))

In [10]:
start_time = time.time()
history = []

for i in range(1000000):
    current_time = time.time() - start_time
    state = robot.get_state()
    q, dq = state['q'], state['dq']
    
    if i % control_interval == 0:
        if board_ori[3] < 0:
            _board_ori = -board_ori
        else:
            _board_ori = board_ori

        pos_mean_t, pos_covariance_t = observer.position_estimation(
                pos_mean_tm1, pos_covariance_tm1, board_pos[:, np.newaxis]
        )
        # ori_mean_t is in [qx, qy, qz, qw, ωx, ωy, ωz]
        ori_mean_t, ori_covariance_t = observer.orientation_estimation(
            ori_mean_tm1, ori_covariance_tm1, _board_ori[:, np.newaxis]
        )

        # updated estimation
        pos_mean_tm1 = deepcopy(pos_mean_t)
        pos_covariance_tm1 = deepcopy(pos_covariance_t)
        ori_mean_tm1 = deepcopy(ori_mean_t)
        ori_covariance_tm1 = deepcopy(ori_covariance_t)
        
        # get estimated configuration of the board
        est_board_pos = pos_mean_t[:3, 0]
        est_board_vel = pos_mean_t[3:, 0]
        est_board_ori = ori_mean_t[:4, 0]

        # get the estimated next configuration of the board
        next_board_pos = observer.get_next_position(pos_mean_t)[:, 0]
        next_board_vel = pos_mean_t[3:, 0]
        next_board_ori = observer.get_next_orientation(ori_mean_t)[:4, 0]
        
        dq_target, _info = controller.controller(
            q,
            dq,
            est_board_pos,
            est_board_ori,
            est_board_vel,
            next_board_pos,
            next_board_ori,
            next_board_vel,
            time_interval,
        )

        data = {
            'time': deepcopy(current_time),
            'board_pos': deepcopy(board_pos),
            'board_ori': deepcopy(_board_ori),
            'est_board_pos': deepcopy(est_board_pos),
            'est_board_ori': deepcopy(est_board_ori),
            'next_board_pos': deepcopy(next_board_pos),
            'next_board_ori': deepcopy(next_board_ori),
            'dq_target': deepcopy(dq_target),
            "pos_mean_tm1": deepcopy(pos_mean_tm1),
            "pos_covariance_tm1": deepcopy(pos_covariance_tm1),
            "ori_mean_tm1": deepcopy(ori_mean_tm1),
            "ori_covariance_tm1": deepcopy(ori_covariance_tm1),
        }
        history.append({**data, **deepcopy(_info)})
        
    robot.send_joint_command(dq_target[:7])
    time.sleep(1/1000)

KeyboardInterrupt: 

In [11]:
# save data to pickle file
with open("../data/data_board_0615_11_wb.pkl", "wb") as f:
    pickle.dump(history, f)