In [None]:
# 50Hz 0~20sec
# length = 1001

# data to collect
    # 1. Simulation
    #    - x, y, z, yaw (+roll, pitch?)
    # 2. Arduino
    #    - IMU Data 
    # 3. Realsense Camera
    #    - TPS coefficients
    #    - x, y, yaw
    #    - video data
    # 4. Egocentric Camera
    #    - video data

## Check camera indices

In [None]:
import cv2
camera_idx = []
for idx in range(30):
    cap = cv2.VideoCapture(idx)
    if cap.isOpened():
        camera_idx.append(idx)
        cap.release()
    else : 
        pass

print(camera_idx)

In [None]:
import cv2, time

#check camera using index
cap = cv2.VideoCapture(4)

fps = cap.get(cv2.CAP_PROP_FPS)
delay = int(1000/fps)

now = time.time()
while time.time() < now+5:
    ret,frame = cap.read()
    if not ret: break
    cv2.imshow('Camera', frame)
    if cv2.waitKey(delay) == 27:
        break
cap.release()
cv2.destroyAllWindows()


## Select Variables

In [None]:
TRAJECTORY_NUMBER = 31
REALSENSE_CAMERA_NUMBER = 12
EGOCENTRIC_CAMERA_NUMBER = 4
IMU_USB_NUMBER = 1
SNAPBOT_USB_NUMBER = 0

## Instantiate Snapbot

In [None]:
from realworld_func.class_xm430 import *

snapbot = xm430('SNAPBOT', USB_NUM=SNAPBOT_USB_NUMBER)
snapbot.connect()
leg_idx = [15, 16, 17, 18, 19, 20, 21, 22]
snapbot.IDX_LIST = leg_idx
snapbot.set_delaytime([30])
snapbot.set_pidgains(20, 0, 0)
snapbot.set_maxtorque([2000])
snapbot.set_goalspeed([1500])
snapbot.set_torque([1])

## Initialize Snapbot

In [None]:
snapbot.set_goalpos([2048, 2048, 2048, 2048, 2048, 2048, 2048, 2048])
print(snapbot.get_currpos())
print("INITIALIZE POSITION")

## Test Snapbot

In [None]:
import os
from utils.utils_real import run_snapbot

poses = np.load(os.path.join(os.getcwd(), "trajectory", "qpos_dlpg_%d.npy" % TRAJECTORY_NUMBER))
run_snapbot(poses[:300],snapbot,50,100)

## Timer

In [None]:
import cv2, os, serial
import numpy as np
from time import time, localtime
from classes.timer import Timer
from utils.utils_tps import get_tps_mat
from utils.utils_real import run_snapbot_single
from imutils.video import WebcamVideoStream

cwd = os.getcwd()
tm = localtime(time())
Hz = 25


# input data path
TRAJECTORY_PATH = os.path.join(cwd, "trajectory", "qpos_dlpg_%d.npy" % TRAJECTORY_NUMBER)
traj = np.load(TRAJECTORY_PATH)
traj_time = (traj.shape[0]-1) / Hz


# output data path
DATA_FOLDER = os.path.join(cwd, "data")
DATA_FOLDER_TIME = os.path.join(DATA_FOLDER, "%d%02d%02d-%02d:%02d:%02d" % (tm.tm_year, tm.tm_mon, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec))
RS_CAMERA_FOLDER = os.path.join(DATA_FOLDER_TIME, "realsense_camera")
EGO_CAMERA_FOLDER = os.path.join(DATA_FOLDER_TIME, "egocentric_camera")

if not os.path.isdir(DATA_FOLDER): os.mkdir(DATA_FOLDER)
os.mkdir(DATA_FOLDER_TIME)
os.mkdir(RS_CAMERA_FOLDER)
os.mkdir(EGO_CAMERA_FOLDER)
with open(os.path.join(DATA_FOLDER_TIME, "traj_number.txt"), "w") as file:
    file.writelines("Trajectory number: %d" % TRAJECTORY_NUMBER)


time_data = []
idx = 0
idx_snapbot = 0


ser = serial.Serial('/dev/ttyUSB{}'.format(IMU_USB_NUMBER), 115200, timeout=1)
imu_data = []


tps_coef = get_tps_mat()
np.save(os.path.join(DATA_FOLDER_TIME, "tps_coef.npy"), tps_coef)


rs_video = WebcamVideoStream(src=REALSENSE_CAMERA_NUMBER).start()
rs_frame_width = rs_video.frame.shape[1]
rs_frame_height = rs_video.frame.shape[0]
rs_size = (rs_frame_width, rs_frame_height)
rs_result = cv2.VideoWriter(os.path.join(RS_CAMERA_FOLDER, "rs.mp4"),
            cv2.VideoWriter_fourcc('m','p','4','v'), Hz, rs_size)


ego_video = WebcamVideoStream(src=EGOCENTRIC_CAMERA_NUMBER).start()
ego_frame_width = ego_video.frame.shape[1]
ego_frame_height = ego_video.frame.shape[0]
ego_size = (ego_frame_width, ego_frame_height)
ego_result = cv2.VideoWriter(os.path.join(EGO_CAMERA_FOLDER, "ego.mp4"),
            cv2.VideoWriter_fourcc('m','p','4','v'), Hz, ego_size)


tmr_plot = Timer(_name='Plot',_HZ=Hz,_MAX_SEC=traj_time,_VERBOSE=True)
tmr_plot.start()
while tmr_plot.is_notfinished():
    if tmr_plot.do_run():
        # Time data
        time_data.append(tmr_plot.sec_elps)

        # Run Snapbot
        pos = traj[idx_snapbot]
        run_snapbot_single(pos,snapbot)
        idx_snapbot += 1

        # Read IMU data
        line = ser.readline()
        data = line.decode('unicode_escape')
        imu_data.append(data)

        # Realsense camera video
        rs_frame = rs_video.frame
        rs_result.write(rs_frame) # save .mp4 video
        # RS_IMAGE_PATH = os.path.join(RS_CAMERA_FOLDER, str(idx) + '.jpg')
        # cv2.imwrite(RS_IMAGE_PATH, rs_frame) # save .jpg images

        # Egocentric camera video
        ego_frame = ego_video.frame
        ego_frame_flip = cv2.flip(ego_frame, -1)
        ego_result.write(ego_frame_flip) # save .mp4 video
        # EGO_IMAGE_PATH = os.path.join(EGO_CAMERA_FOLDER, str(idx) + '.jpg')
        # cv2.imwrite(EGO_IMAGE_PATH, ego_frame_flip) # save .jpg images

        idx += 1


np.save(os.path.join(DATA_FOLDER_TIME, "time.npy"), time_data)
np.save(os.path.join(DATA_FOLDER_TIME, "imu.npy"), imu_data)
rs_video.stop()
rs_result.release()
ego_video.stop()
ego_result.release()
cv2.destroyAllWindows()


In [None]:
tps_check = np.load(os.path.join(DATA_FOLDER_TIME, "tps_coef.npy"))
time_check = np.load(os.path.join(DATA_FOLDER_TIME, "time.npy"))
imu_check = np.load(os.path.join(DATA_FOLDER_TIME, "imu.npy"))

print(tps_check, "\n")
print(time_check.shape, "\n")
print(imu_check[-5:])

## Close Snapbot

In [None]:
snapbot.set_torque([0])
snapbot.close()