## 받아야 하는 Observation 들!
### 로봇한테서 뜯어와야하는 값들
```
- base_ang_vel (torch.Tensor, optional): Base angular velocity (3,). Default: zeros
- base_rpy (torch.Tensor, optional): Base roll-pitch-yaw (3,). Default: zeros

- joint_pos (torch.Tensor, optional): Joint positions (12,). Default: zeros
- joint_vel (torch.Tensor, optional): Joint velocities (12,). Default: zeros
```

### 연산해서 만들어야되는 값들
```
- projected_gravity (torch.Tensor, optional): Projected gravity vector (3,). Default: [0, 0, -1]
- base_lin_vel (torch.Tensor, optional): Base linear velocity (3,). Default: zeros
```

### 기록해뒀다가 써야되는 값들
```
- actions (torch.Tensor, optional): Previous actions (12,). Default: zeros
```

### 제어타겟 
```
- velocity_commands (torch.Tensor, optional): Velocity commands (3,). Default: zeros
```

In [309]:
from curses.ascii import ctrl
from ucl.common import byte_print, decode_version, decode_sn, getVoltage, pretty_print_obj, lib_version
from ucl.lowState import lowState
from ucl.lowCmd import lowCmd
from ucl.unitreeConnection import unitreeConnection, LOW_WIFI_DEFAULTS, LOW_WIRED_DEFAULTS
from ucl.enums import GaitType, SpeedLevel, MotorModeLow
from ucl.complex import motorCmd, motorCmdArray
import time
import sys
import math
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
from datetime import datetime
import glob

import threading


# You can use one of the 3 Presets WIFI_DEFAULTS, LOW_CMD_DEFAULTS or HIGH_CMD_DEFAULTS.
# IF NONE OF THEM ARE WORKING YOU CAN DEFINE A CUSTOM ONE LIKE THIS:

class Robot :
    def __init__(self):
        ## Initialize connection ##
        print(f'Running lib version: {lib_version()}')
        self.conn = unitreeConnection(LOW_WIFI_DEFAULTS)
        self.conn.startRecv()
        ## == Initialize connection == ##

        ## instantiate lowlevel command and state objects ##
        self.lcmd = lowCmd()
        self.lstate = lowState()
        self.mCmdArr = motorCmdArray()


        # 로봇 제어에 사용되는 딕셔너리, lowcmd에서 이미 지정된 순서서
        self.d = {'FR_0':0, 'FR_1':1, 'FR_2':2,
                'FL_0':3, 'FL_1':4, 'FL_2':5,
                'RR_0':6, 'RR_1':7, 'RR_2':8,
                'RL_0':9, 'RL_1':10, 'RL_2':11 }

        # IsaacLab 에서 사용되는 Joint 순서, 제어시 사용함
        self.joint = [
            'FL_0', 'FL_1', 'FL_2', # FL_hip, FL_thigh, FL_calf
            'FR_0', 'FR_1', 'FR_2', # FR_hip, FR_thigh, FR_calf
            'RL_0', 'RL_1', 'RL_2', # RL_hip, RL_thigh, RL_calf
            'RR_0', 'RR_1', 'RR_2'  # RR_hip, RR_thigh, RR_calf
            ]
        
        # IsaacLab 에서 사용되는 기본 관절 위치
        self.default_joint_pos = np.array([
            0.1,   # FL_hip
            0.8,   # FL_thigh
            -1.5,  # FL_calf
            -0.1,  # FR_hip
            0.8,   # FR_thigh
            -1.5,  # FR_calf
            0.1,   # RL_hip
            1.0,   # RL_thigh
            -1.5,  # RL_calf
            -0.1,  # RR_hip
            1.0,   # RR_thigh
            -1.5   # RR_calf
        ])

        ## get initial state & print Log ##
        # Send empty command to tell the dog the receive port and initialize the connection
        cmd_bytes = self.lcmd.buildCmd(debug=False)
        self.conn.send(cmd_bytes)


        ## Robot Control Parameters
        # control HZ : 50Hz
        self.ctrldt = 1/50 # Ctrl loop HZ : 50Hz
        # PD Parameters
        self.kp = 8
        self.kd = 1
        # Motor Torque Limit
        self.tau = [20,10,10,20,10,10,20,10,10,20,10,10]

        ## Define Default Robot Status values
        # robot status
        self.qlist = np.zeros(12) #joint pos
        self.dqlist = np.zeros(12) #joint vel
        self.rpy = np.zeros(3) #base rpy
        self.gyroscope = np.zeros(3) #base ang vel
        self.accelerometer = np.zeros(3) #base lin acc
        self.quaternion = np.zeros(4) #base quat

        # Calculat'd
        self.base_lin_vel = np.zeros(3) # base lin vel
        self.projected_gravity = np.zeros(3) # projected gravity
        self.filtered_accel = np.zeros(3)

        # Policy rel.
        self.actions = np.zeros(12) # actions(Current Target)
        self.last_actions = np.zeros(12) # last actions(Previous Target)

        # Control Target
        self.velocity_commands = np.zeros(3) # velocity commands

        # Timestamp
        self.timestamp = time.time() # Initialize timestamp
        


    ## define a function to print log data
    # 나중에 안쓰는 메서드일듯
    def printLog(self):
        print(f'Cycles:\t\t\t{self.lstate.bms.cycle}')
        data = self.conn.getData()
        packet = data[-1] #Get Last Data
        print('+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=')
        self.lstate.parseData(packet)

        print(f'SN [{byte_print(self.lstate.SN)}]:\t{decode_sn(self.lstate.SN)}')
        print(f'Ver [{byte_print(self.lstate.version)}]:\t{decode_version(self.lstate.version)}')
        print(f'SOC:\t\t\t{self.lstate.bms.SOC} %')
        print(f'Overall Voltage:\t{getVoltage(self.lstate.bms.cell_vol)} mv') #something is still wrong here ?!
        print(f'Current:\t\t{self.lstate.bms.current} mA')
        print(f'Temps BQ:\t\t{self.lstate.bms.BQ_NTC[0]} °C, {self.lstate.bms.BQ_NTC[1]}°C')
        print(f'Temps MCU:\t\t{self.lstate.bms.MCU_NTC[0]} °C, {self.lstate.bms.MCU_NTC[1]}°C')
        print(f'FootForce:\t\t{self.lstate.footForce}')
        print(f'FootForceEst:\t\t{self.lstate.footForceEst}')
        print(f'IMU Temp:\t\t{self.lstate.imu.temperature}')
        print(f'MotorState FR_0 MODE:\t\t{self.lstate.motorState[self.d["FR_0"]].mode}')
        print('+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=')

    def retrive_data(self):
        """
        conn에서 최신 패킷(가장 마지막)을 받아 lstate에 저장
        이후 값 파싱해서 self.* 에 저장
        """
        data = self.conn.getData()
        self.lstate.parseData(data[-1])
        self.q_list = np.array([self.lstate.motorState[self.d[joint]].q for joint in self.joint]) # 현재 관절위치 (절대위치!!!!)
        self.q_list_rel = np.array(self.default_joint_pos) - self.q_list # 현재 관절위치 상대위치

        self.dq_list = np.array([self.lstate.motorState[self.d[joint]].dq for joint in self.joint])

        # process IMU data
        self.rpy = np.array(self.lstate.imu.rpy)
        self.quaternion = np.array(self.lstate.imu.quaternion)
        self.gyroscope = np.array(self.lstate.imu.gyroscope)
        self.accelerometer = np.array(self.lstate.imu.accelerometer)

        # Calculate Extra Values 
        self.projected_gravity = self.estim_projected_gravity()
        self.estimate_lin_vel()

    def estim_projected_gravity(self):
        """
        projected_gravity 계산 - Quaternion 우선 사용!
        """
        # Quaternion 사용
        w, x, y, z = self.quaternion
        return np.array([
            2 * (x * z - w * y),
            2 * (w * x + y * z),
            w**2 - x**2 - y**2 + z**2
        ])
    
        
    def estimate_lin_vel(self):
        """
        가속도 적분으로 base_lin_vel 추정 (비추천)
        
        Args:
            accelerometer: (3,) 현재 가속도
            quaternion: (w, x, y, z)
            dt: 시간 간격 (예: 0.02초)
            prev_velocity: 이전 속도
            prev_filtered_accel: 이전 필터링된 가속도
            alpha: 필터 계수
            decay: 드리프트 감쇠
        
        Returns:
            velocity: (3,) 추정 속도
            filtered_accel: (3,) 필터링된 가속도
        """

        ## Set Default Values
        self.alpha = 0.9
        self.decay = 0.999
        
        # Set Previous Values
        prev_velocity = np.array(self.base_lin_vel)
        prev_filtered_accel = np.array(self.filtered_accel)


        # 1. projected_gravity
        pg = self.estim_projected_gravity()
        


        # 2. 중력 제거
        linear_accel = np.array(self.accelerometer) - pg * 9.81
        
        # 3. 필터링
        filtered_accel = self.alpha * np.array(prev_filtered_accel) + (1 - self.alpha) * linear_accel
        
        # 4. 적분
        velocity = prev_velocity + filtered_accel * self.ctrldt 
        
        # 5. 드리프트 보정
        velocity *= self.decay
        velocity[2] = 0 # Z축속도 0 으로 고정
        
        # Write output to class variables
        self.base_lin_vel = velocity
        self.filtered_accel = filtered_accel


    """
    아래 메서드들 이미 retrive_data 에서 처리됨, 없애도될듯?
    """
    def get_joint_pos(self):
        """
        self.lstate에서 최신 패킷을 받아 12개 관절의 각도(q, radian) 리스트를 반환
        --> **joint_pos** @ IsaacLab

        Returns:
            q_list (np.ndarray): 12개 관절의 각도 리스트 (radian, joint 순서는 global 'joint' 리스트와 동일)
                        오류시 None 반환
        """
        self.q_list = np.array([self.lstate.motorState[self.d[joint]].q for joint in self.joint])
        return self.q_list
    
    def get_joint_vel(self):
        """
        self.lstate에서 최신 패킷을 받아 12개 관절의 속도(dq, radian/s) 리스트를 반환
        --> **joint_vel** @ IsaacLab
        """
        self.dq_list = np.array([self.lstate.motorState[self.d[joint]].dq for joint in self.joint])
        return self.dq_list

    def get_rpy(self):
        """
        self.lstate에서 최신 패킷을 받아 로봇의 imu RPY 각도(roll, pitch, yaw, radian) 리스트를 반환
        --> **base_rpy** @ IsaacLab
        """
        self.rpy = np.array(self.lstate.imu.rpy)
        return self.rpy
    
    def get_gyroscope(self):
        """
        self.lstate에서 최신 패킷을 받아 로봇의 imu 각속도(roll, pitch, yaw, radian/s) 리스트를 반환
        --> **base_gyroscope** @ IsaacLab
        """
        self.gyroscope = np.array(self.lstate.imu.gyroscope)
        return self.gyroscope

    def move_motor(self,target_pos):
        """
        관절들들을 `target_pos` 로 이동
        """
        joint = self.joint
        for i in range(len(joint)):
            self.mCmdArr.setMotorCmd(joint[i],  motorCmd(mode=MotorModeLow.Servo, q=target_pos[i], dq = 0, Kp = self.kp, Kd = self.kd, tau = self.tau[i]))
        
        self.lcmd.motorCmd = self.mCmdArr
        cmd_bytes = self.lcmd.buildCmd(debug=False)
        self.conn.send(cmd_bytes)
    
    def do_motion(self,init_pos,target_pos,duration,debug=False):
        """
        특정 관절을 `init_pos` 에서 `target_pos` 로 선형 보간하여 이동

        Args:
            init_pos (list): 초기 관절 위치
            target_pos (list): 타겟 관절 위치
            duration (float): 이동 시간 (초)
        """
        hz = 1/self.ctrldt
        motion_step = int(duration*hz)
        print(f"do motion while : {motion_step/hz} seconds")
        for i in range(motion_step):
            self.retrive_data()
            qDes = self.jointLinearInterpolation(init_pos,target_pos,i/motion_step)
            qDes = list(qDes)
            if debug:
                # print(f"target : {self.rad2deg(qDes,2)}")
                print(f"target : {qDes}")
                print(f"Step : {i}/{motion_step}")
            else:
                self.move_motor(qDes)
            time.sleep(self.ctrldt)

    ## 별도의 쓰레드에서 loop_func(루프문) 함수 실행
    def follow_target_pos(self):
        """
        별도의 쓰레드에서 loop_func(루프문) 함수 실행
        loop_func: 루프를 돌릴 함수. self를 인자로 받거나, 필요한 경우 *args, **kwargs로 인자 전달
        """

        def robot_control_loop():
            while True : 
                self.timestamp = time.time()
                self.retrive_data() # Refresh Data
                ## 
                # Do something HERE!
                ## 
                loop_delay = self.ctrldt - (time.time() - self.timestamp) # 루프 지연시간이 보상된 sleeptime
                time.sleep(loop_delay)


        self.control_thread = threading.Thread(target=robot_control_loop, daemon=True)
        self.control_thread.start()



    ## Utils
    def deg2rad(self,deg,digit=0):
        """
        numpy array 또는 list 형식의 각도를 radian으로 변환
        """
        deg = np.array(deg)
        if digit != 0:
            result = np.array([round(math.radians(d),digit) for d in deg])
        else:
            result = np.array([math.radians(d) for d in deg])
        return result
    
    def rad2deg(self,rad,digit=0):
        """
        numpy array 또는 list 형식의 각도를 도로 변환
        """
        rad = np.array(rad)
        if digit != 0:
            result = np.array([round(math.degrees(r),digit) for r in rad])
        else:
            result = np.array([math.degrees(r) for r in rad])
        return result


    ## Linear interpolation between two joint positions
    # Input : initPos(rate=0) ~~~~~~ targetPos(rate=1)
    # Output : interpolated_position =(p)
    def jointLinearInterpolation(self,initPos, targetPos, rate):
        rate = np.fmin(np.fmax(rate, 0.0), 1.0)
        p = initPos*(1-rate) + targetPos*rate
        return p

In [310]:
robot = Robot()

Running lib version: 0.2


In [311]:
robot.retrive_data() # 먼저 self.lstate 에 최신패킷 넣고

# 그다음 값 파싱해서 쓸것
print("default_joint_pos : ",robot.rad2deg(robot.default_joint_pos,2))
print("joint_pos : ",robot.rad2deg(robot.get_joint_pos(),2))
print("joint_vel : ",robot.rad2deg(robot.get_joint_vel(),2))
print("rpy : ",robot.rad2deg(robot.get_rpy(),2))
print("gyroscope : ",robot.rad2deg(robot.get_rpy(),2))

default_joint_pos :  [  5.73  45.84 -85.94  -5.73  45.84 -85.94   5.73  57.3  -85.94  -5.73
  57.3  -85.94]
joint_pos :  [  22.57   66.39 -159.17  -21.2    66.87 -157.21   19.84   68.04 -158.17
  -17.49   66.9  -164.14]
joint_vel :  [ 0.57  0.56  0.31 -0.02  0.07  0.11 -0.39 -0.19  0.08 -0.05 -0.01  0.02]
rpy :  [ -0.46  -0.02 -13.31]
gyroscope :  [ -0.46  -0.02 -13.31]


In [308]:
robot.retrive_data()
print("joint_pos : ",robot.rad2deg(robot.get_joint_pos(),2))

robot.get_joint_pos()


IndexError: list index out of range

In [None]:
robot.retrive_data()
robot.default_joint_pos
init = robot.get_joint_pos()
init


# target : [0.10259079174200696, 0.8020568057696025, -1.5082515637079876, -0.1020708256562551, 0.802230801264445, -1.5080478302637736, 0.10216408030192059, 1.0011433211962382, -1.508274170557658, -0.10127957280476889, 1.001268468697866, -1.5092029507954916]


array([ 0.48861876,  1.10852087, -2.73773456, -0.41062385,  1.13462019,
       -2.70717454,  0.42461205,  1.17149818, -2.74112558, -0.29193592,
        1.1902703 , -2.88044262])

In [289]:
robot.retrive_data()
init = robot.get_joint_pos()
robot.tau = [8,4,4,8,4,4,8,4,4,8,4,4]
robot.do_motion(init,robot.default_joint_pos,1,debug=False)
robot.do_motion(robot.default_joint_pos,robot.default_joint_pos,5,debug=False)

do motion while : 1.0 seconds
do motion while : 5.0 seconds


In [260]:
while True : 
    robot.retrive_data()
    robot.move_motor(init)
    time.sleep(0.1)

KeyboardInterrupt: 

In [203]:
a = robot.get_joint_pos()
robot.tau = float(10)

robot.do_motion(a,init,1,debug=False)

In [135]:
while True : 
    robot.retrive_data() # 먼저 self.lstate 에 최신패킷 넣고
    print(robot.base_lin_vel)
    time.sleep(robot.ctrldt)




[ 5.45478473e-05 -1.18308958e-04  0.00000000e+00]
[ 8.84588790e-05 -2.42537908e-04  0.00000000e+00]
[ 0.00013328 -0.00031131  0.        ]
[ 0.0002191  -0.00035352  0.        ]
[ 0.00031269 -0.00031947  0.        ]
[ 0.00040826 -0.00027965  0.        ]
[ 0.00050574 -0.00026095  0.        ]
[ 0.00061261 -0.00015834  0.        ]
[ 0.0007161  -0.00010238  0.        ]
[ 8.20987774e-04 -4.25420707e-05  0.00000000e+00]
[ 9.70361076e-04 -2.48418546e-05  0.00000000e+00]
[1.13124552e-03 3.90344055e-05 0.00000000e+00]
[1.26902059e-03 7.22212930e-05 0.00000000e+00]
[1.42659088e-03 7.58638492e-05 0.00000000e+00]
[1.56627567e-03 6.99332313e-06 0.00000000e+00]
[ 1.71549536e-03 -3.66980115e-05  0.00000000e+00]
[ 1.83391572e-03 -6.04470342e-05  0.00000000e+00]
[ 1.96977953e-03 -5.12484429e-05  0.00000000e+00]
[ 2.02854868e-03 -3.32564959e-05  0.00000000e+00]
[ 2.09406747e-03 -2.42729018e-05  0.00000000e+00]
[ 2.15375481e-03 -6.70238607e-06  0.00000000e+00]
[2.21536038e-03 4.72518245e-05 0.00000000e+00]

KeyboardInterrupt: 

In [None]:
robot.gyroscope = robot.deg2rad([0.0462,  0.0558,  2.6461])
robot.gyroscope = [0.0462,  0.0558,  2.6461]

robot.calc_projected_gravity('rpy')

# result : array([-8.08011361e-04,  4.27703094e-03, -9.99990527e-01])
# GT : 0.0558, -0.0461, -0.9974

rpy :  [-0.004277045372873545, -0.0008080114494077861, 0.16981452703475952]


array([-8.08011361e-04,  4.27703094e-03, -9.99990527e-01])

array([-0.00166241,  0.00469371, -0.9999876 ])