In [None]:
from FingerPy.robot.interface import TeensyUDPBridge

In [None]:
bridge = TeensyUDPBridge()

In [22]:
from FingerMsgs import FingerCmd, FingerState
from cyclonedds.domain import DomainParticipant
from cyclonedds.topic import Topic
from cyclonedds.pub import DataWriter
from cyclonedds.sub import DataReader
import numpy as np

In [1]:
from dataclasses import dataclass

@dataclass
class RobotConfig:
    DoF: int
    robot_name: str
    state_fields: list
    command_fields: list
    current2Torque: float
    gear_ratio: float
    max_torque: float

config = RobotConfig
config.DoF = 3
config.robot_name = 'finger1'

config.state_fields = [
                       'q_hip',   'q_thigh',   'q_knee',
                       'dq_hip',  'dq_thigh',  'dq_knee', 
                       'tau_hip', 'tau_thigh', 'tau_knee'
                      ]

config.command_fields = [
                         'tau_ff_hip', 'tau_ff_thigh',  'tau_ff_knee',
                         'q_des_hip',  'q_des_thigh',   'q_des_knee', 
                         'dq_des_hip', 'dq_des_thigh',  'dq_des_knee', 
                         'kp_hip',     'kp_thigh',      'kp_knee',
                         'kd_hip',     'kd_thigh',      'kd_knee'
                         ]

config.current2Torque = 1.0
config.gear_ratio = 9
config.max_torque = 20.0

In [None]:
def get_last_msg(reader, topic_type):
    """ """
    last_msg = reader.take()

    if last_msg:
        while True:
            a = reader.take()
            if not a:
                break
            else:
                last_msg = a
    if last_msg:
        msg = last_msg[0]
        if type(msg) == topic_type:
            return msg
        else:
            return None

    else:
        return None

import numpy as np

class UDPCycloneDDSBridge:
    def __init__(self, udp_bridge: TeensyUDPBridge, cfg: RobotConfig):

        self.cfg = cfg
        self.state_topic_name = f'{cfg.robot_name}_robot_state'
        self.cmd_topic_name = f'{cfg.robot_name}_robot_cmd'
        self.q_idx = []
        self.dq_idx = []
        self.tau_idx = []
        self.kp_idx = []
        self.kd_idx = []
        self.q_des_idx = []
        self.dq_des_idx = []
        self.tau_ff_idx = []
        
        for i, state_name in enumerate(cfg.state_fields):
            if state_name.split('_')[0] == 'q':
                self.q_idx.append(i)
            elif state_name.split('_')[0] == 'dq':
                self.dq_idx.append(i)
            elif state_name.split('_')[0] == 'tau':
                self.tau_idx.append(i)

        for i, command_name in enumerate(cfg.command_fields):
            if command_name.split('_')[0] == 'kp':
                self.kp_idx.append(i)
            elif command_name.split('_')[0] == 'kd':
                self.kd_idx.append(i)
            elif command_name.split('_')[0] == 'q':
                self.q_des_idx.append(i)
            elif command_name.split('_')[0] == 'dq':
                self.dq_des_idx.append(i)
            elif command_name.split('_')[0] == 'tau':
                self.tau_ff_idx.append(i)

        self.udp_bridge = udp_bridge
        self.participant = DomainParticipant()
        self.cmd_topic = Topic(self.participant,self.cmd_topic_name, FingerCmd)
        self.state_topic = Topic(self.participant,self.state_topic_name, FingerState)
        self.cmd_reader = DataReader(self.participant, self.cmd_topic)
        self.state_writer = DataWriter(self.participant, self.state_topic)
        self.udp_cmd = np.zeros(len(cfg.command_fields))

    def forwardUDPStates2DDS(self):
        stamp, teensy_state = self.udp_bridge.getLatestState()
        if teensy_state is not None:
            state = np.array(teensy_state)
            q = (state[self.q_idx]/self.cfg.gear_ratio).tolist()
            dq = (state[self.dq_idx]/self.cfg.gear_ratio).tolist()
            tau = (state[self.tau_idx]*self.cfg.current2Torque * self.cfg.gear_ratio).tolist()
            state_msg = FingerState(q=q, dq=dq, tau=tau)
            self.state_writer.write(state_msg)

    def forwardDDSCmds2UDP(self):
        cmd_msg = get_last_msg(self.cmd_reader, FingerCmd)
        if cmd_msg is not None:
            self.udp_cmd[self.q_des_idx] = cmd_msg.q_des
            self.udp_cmd[self.dq_des_idx] = cmd_msg.dq_des
            self.udp_cmd[self.kp_idx] = cmd_msg.kp
            self.udp_cmd[self.kd_idx] = cmd_msg.kv
            tau_ff = np.clip(cmd_msg.tau_ff, -self.cfg.max_torque, self.cfg.max_torque)
            current = (np.array(tau_ff)/self.cfg.gear_ratio)/self.cfg.current2Torque
            self.udp_cmd[self.tau_ff_idx] = current
            self.udp_bridge.sendCommand(self.teensy_cmd)

In [None]:
dds_bridge = UDPCycloneDDSBridge('finger', 3, bridge)

In [None]:
import time
dt = 0.001
while True: 
    start=time.time()
    dds_bridge.forwardUDPStates2DDS()
    dds_bridge.forwardDDSCmds2UDP()
    while time.time()-start<dt:
        time.sleep(0.0002)