In [4]:
import json
import random
import signal
from collections import OrderedDict
from dataclasses import dataclass
import copy
import matplotlib.pyplot as plt
from scipy.spatial.transform import Rotation as R
import numpy as np
import subprocess
from operator import itemgetter, attrgetter
import re
import subprocess
import cv2
import traceback
import math
import os
import sys


@dataclass
class vector3:
    x: float = 0.0
    y: float = 0.0
    z: float = 0.0

    def __array__(self) -> np.ndarray:
        return np.array([self.x, self.y, self.z])

    def clamp(self, mmin, mmax):
        self.x = max(mmin, self.x)
        self.x = min(mmax, self.x)
        self.y = max(mmin, self.y)
        self.y = min(mmax, self.y)
        self.z = max(mmin, self.z)
        self.z = min(mmax, self.z)

    def mult_me(self, d):
        self.x *= d
        self.y *= d
        self.z *= d

    def normalize_me(self):
        if self.x == 0 and self.y == 0 and self.z == 0:
            return
        len = self.get_length()
        # print('1: ', self.x, ' ', self.y, ' ', self.z, ' ', len)
        self.x /= len
        self.y /= len
        self.z /= len
        # print('2: ', self.x, ' ', self.y, ' ', self.z, ' ', len)
        return vector3(self.x, self.y, self.z)

    def get_length(self):
        return np.sqrt(np.sum(np.power(np.array(self), 2)))

    def copy(self):
        return vector3(self.x, self.y, self.z)

    def round(self, d):
        self.x = round(self.x, d)
        self.y = round(self.y, d)
        self.z = round(self.z, d)

    def round_(self, d):
        return vector3(round(self.x, d), round(self.y, d), round(self.z, d))

    def get_rotated(self, tq):
        q = quat(self.x * tq.w + self.z * tq.y - self.y * tq.z,
                 self.y * tq.w + self.x * tq.z - self.z * tq.x,
                 self.z * tq.w + self.y * tq.x - self.x * tq.y,
                 self.x * tq.x + self.y * tq.y + self.z * tq.z)
        return vector3(tq.w * q.x + tq.x * q.w + tq.y * q.z - tq.z * q.y,
                       tq.w * q.y + tq.y * q.w + tq.z * q.x - tq.x * q.z,
                       tq.w * q.z + tq.z * q.w + tq.x * q.y - tq.y * q.x)

    def add_vector3(self, t):
        return vector3(round(self.x + t.x, 8), round(self.y + t.y, 8), round(self.z + t.z, 8))

    def sub_vector3(self, t):
        return vector3(round(self.x - t.x, 8), round(self.y - t.y, 8), round(self.z - t.z, 8))

    def get_dot(self, t):
        return self.x * t.x + self.y * t.y + self.z * t.z


@dataclass
class quat:
    x: float = 0.0
    y: float = 0.0
    z: float = 0.0
    w: float = 1.0

    def __array__(self) -> np.ndarray:
        return np.array([self.x, self.y, self.z, self.w])

    def mult_me_quat(self, q):
        tmp = self.copy()
        self.x = tmp.w * q.x + tmp.x * q.w + tmp.y * q.z - tmp.z * q.y
        self.y = tmp.w * q.y - tmp.x * q.z + tmp.y * q.w + tmp.z * q.x
        self.z = tmp.w * q.z + tmp.x * q.y - tmp.y * q.x + tmp.z * q.w
        self.w = tmp.w * q.w - tmp.x * q.x - tmp.y * q.y - tmp.z * q.z

    def mult_quat(self, q):
        tmp = self.copy()
        return quat(tmp.w * q.x + tmp.x * q.w + tmp.y * q.z - tmp.z * q.y,
                    tmp.w * q.y - tmp.x * q.z + tmp.y * q.w + tmp.z * q.x,
                    tmp.w * q.z + tmp.x * q.y - tmp.y * q.x + tmp.z * q.w,
                    tmp.w * q.w - tmp.x * q.x - tmp.y * q.y - tmp.z * q.z)

    def copy(self):
        return quat(self.x, self.y, self.z, self.w)

    '''
    def inverse(self):
        dot = quat_get_dot(self, self)
        self.x = -self.x
        self.y = -self.y
        self.z = -self.z
        self.mult_me(1 / dot)
    '''

    def round(self, d):
        self.x = round(self.x, d)
        self.y = round(self.y, d)
        self.z = round(self.z, d)
        self.w = round(self.w, d)

    def round_(self, d):
        return quat(round(self.x, d), round(self.y, d), round(self.z, d), round(self.w, d))

    def inverse(self):
        dot = quat_get_dot(self, self)
        self.x = -self.x
        self.y = -self.y
        self.z = -self.z
        self.x = self.x / dot
        self.y = self.y / dot
        self.z = self.z / dot
        self.w = self.w / dot


def quat_get_dot(self, t):
    return self.x * t.x + self.y * t.y + self.z * t.z + self.w * t.w


def get_dot_point(pt, t):
    return pt.get_dot(t)


def nomalize_point(pt):
    return pt.normalize_me()


def rotate_point(pt, pose):
    return pt.get_rotated(pose['orient'])


def transfer_point(pt, pose):
    r_pt = pt.get_rotated(pose['orient'])
    return r_pt.add_vector3(pose['position'])


def move_point(pt, pose):
    return pt.add_vector3(pose)


def transfer_point_inverse(pt, pose):
    t = copy.deepcopy(pose)
    t['orient'].inverse()
    r_pt = pt.sub_vector3(t['position'])
    return r_pt.get_rotated(t['orient'])


def get_quat_from_euler(order, value):
    rt = R.from_euler(order, value, degrees=True)
    return quat(rt.as_quat()[0], rt.as_quat()[1], rt.as_quat()[2], rt.as_quat()[3])


def pose_apply(a, b):
    return {'position': transfer_point(a['position'], b), 'orient': b['orient'].mult_quat(a['orient'])}


def pose_apply_inverse(a, b):
    t = copy.deepcopy(b)
    t['orient'].inverse()
    tmp = a['position'].sub_vector3(t['position'])
    return {'position': tmp.get_rotated(t['orient']), 'orient': t['orient'].mult_quat(a['orient'])}


def get_euler_from_quat(order, q):
    rt = R.from_quat(np.array(q))
    return rt.as_euler(order, degrees=True)


def unit_vector(vector):
    """Returnstheunitvectorofthevector."""
    return vector / np.linalg.norm(vector)


def angle_between(v1, v2):
    """Returnstheangleinradiansbetweenvectors'v1'and'v2'::
    >>>angle_between((1,0,0),(0,1,0))
    1.5707963267948966
    >>>angle_between((1,0,0),(1,0,0))
    0.0
    >>>angle_between((1,0,0),(-1,0,0))
    3.141592653589793
    """
    v1_u = unit_vector(v1)
    v2_u = unit_vector(v2)
    return np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))

def rw_json_data(rw_mode, path, data):
    try:
        if rw_mode == READ:
            with open(path, 'r', encoding="utf-8") as rdata:
                json_data = json.load(rdata)
            return json_data
        elif rw_mode == WRITE:
            with open(path, 'w', encoding="utf-8") as wdata:
                json.dump(data, wdata, ensure_ascii=False, indent="\t")
        else:
            print('not support mode')
    except:
        print('exception')
        return ERROR


def rw_file_storage(rw_cmd, name, left_map, right_map):
    if rw_cmd == WRITE:
        print("WRITE parameters ......")
        cv_file = cv2.FileStorage(name, cv2.FILE_STORAGE_WRITE)
        cv_file.write("Left_Stereo_Map_x", left_map[0])
        cv_file.write("Left_Stereo_Map_y", left_map[1])
        cv_file.write("Right_Stereo_Map_x", right_map[0])
        cv_file.write("Right_Stereo_Map_y", right_map[1])
        cv_file.release()
    else:
        print("READ parameters ......")
        try:
            # FILE_STORAGE_READ
            cv_file = cv2.FileStorage(name, cv2.FILE_STORAGE_READ)
            # note we also have to specify the type to retrieve other wise we only get a
            # FileNode object back instead of a matrix
            left_map = (cv_file.getNode("Left_Stereo_Map_x").mat(), cv_file.getNode("Left_Stereo_Map_y").mat())
            right_map = (cv_file.getNode("Right_Stereo_Map_x").mat(), cv_file.getNode("Right_Stereo_Map_y").mat())

            cv_file.release()

            return DONE, left_map, right_map
        except:
            traceback.print_exc()
            return ERROR, NOT_SET, NOT_SET


def Rotate(src, degrees):
    if degrees == 90:
        dst = cv2.transpose(src)
        dst = cv2.flip(dst, 1)

    elif degrees == 180:
        dst = cv2.flip(src, -1)

    elif degrees == 270:
        dst = cv2.transpose(src)
        dst = cv2.flip(dst, 0)
    else:
        dst = NOT_SET
    return dst


def terminal_cmd(cmd_m, cmd_s):
    print('start ', terminal_cmd.__name__)
    try:
        result = subprocess.run([cmd_m, cmd_s], stdout=subprocess.PIPE).stdout.decode('utf-8')

        device_re = re.compile(b"Bus\s+(?P<bus>\d+)\s+Device\s+(?P<device>\d+).+ID\s(?P<id>\w+:\w+)\s(?P<tag>.+)$",
                               re.I)
        df = subprocess.check_output("lsusb")
        devices = []
        for i in df.split(b'\n'):
            if i:
                info = device_re.match(i)
                if info:
                    dinfo = info.groupdict()
                    dinfo['device'] = '/dev/bus/usb/%s/%s' % (dinfo.pop('bus'), dinfo.pop('device'))
                    devices.append(dinfo)
    except:
        print('exception')
        traceback.print_exc()
    else:
        print('done')
    finally:
        if DEBUG == ENABLE:
            print(devices)
    temp = result.split('\n\n')
    print("==================================================")
    ret_val = []
    for i in range(len(temp)):
        if SENSOR_NAME in temp[i]:
            ret_val.append(temp[i])
            print("add camera dev", temp[i])
        else:
            print("skipping camera", temp[i])
    print("==================================================")
    return ret_val



def init_data_array(cam_dev):
    print(cam_dev_list)
    camera_info_array = []
    for i in range(len(cam_dev_list)):
        cam_info = cam_dev_list[i].split('\n\t')
        for cam_id, dev_name in enumerate(cam_info):
            if 'dev' in dev_name:
                # print(dev_name)
                camera_info_array.append({'idx': cam_id,
                            'port': dev_name,
                            'display': {'width': CAP_PROP_FRAME_WIDTH, 'height': CAP_PROP_FRAME_HEIGHT},

                            'blobs': [],
                            'med_blobs': [],

                            'distorted_2d': [],
                            'undistorted_2d': [],

                            'cam_cal': {'cameraK': cameraK, 'dist_coeff': distCoeff},

                            'detect_status': [NOT_SET, 0, 0],

                            'track_cal': {'data': [], 'recording': {'name': NOT_SET}},

                            'D_R_T': {'rvecs': NOT_SET, 'tvecs': NOT_SET},
                            'D_R_T_A': [],
                            'RER': {'C_R_T': {'rvecs': NOT_SET, 'tvecs': NOT_SET}}})

    
    print('camera info', camera_info_array)

    return camera_info_array


def init_coord_json(file):
    print('start ', init_coord_json.__name__)
    try:
        json_file = open(''.join(['jsons/specs/', f'{file}']))
        jsonObject = json.load(json_file)
        model_points = jsonObject.get('TrackedObject').get('ModelPoints')
        pts = [0 for i in range(len(model_points))]
        for data in model_points:
            idx = data.split('Point')[1]
            x = model_points.get(data)[0]
            y = model_points.get(data)[1]
            z = model_points.get(data)[2]
            u = model_points.get(data)[3]
            v = model_points.get(data)[4]
            w = model_points.get(data)[5]
            r1 = model_points.get(data)[6]
            r2 = model_points.get(data)[7]
            r3 = model_points.get(data)[8]
            pts[int(idx)] = {'idx': idx,
                             'pos': [x, y, z],
                             'dir': [u, v, w],
                             'res': [r1, r2, r3],
                             'pair_xy': [],
                             'remake_3d': []}

            print(''.join(['{ .pos = {{', f'{x}', ',', f'{y}', ',', f'{z}',
                           ' }}, .dir={{', f'{u}', ',', f'{v}', ',', f'{w}', ' }}, .pattern=', f'{idx}', '},']))
    except:
        print('exception')
        traceback.print_exc()
    finally:
        print('done')
    return pts

def view_camera_infos(frame, text, x, y):
    cv2.putText(frame, text,
                (x, y),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), lineType=cv2.LINE_AA)

def find_center(frame, led_num, X, Y, W, H, blobs):
    x_sum = 0
    t_sum = 0
    y_sum = 0
    m_count = 0
    g_c_x = 0
    g_c_y = 0

    ret_blobs = copy.deepcopy(blobs)

    for y in range(Y, Y + H):
        for x in range(X, X + W):
            if frame[y][x] >= CV_MID_THRESHOLD:
                x_sum += x * frame[y][x]
                t_sum += frame[y][x]
                m_count += 1

    for x in range(X, X + W):
        for y in range(Y, Y + H):
            if frame[y][x] >= CV_MID_THRESHOLD:
                y_sum += y * frame[y][x]

    if t_sum != 0:
        g_c_x = x_sum / t_sum
        g_c_y = y_sum / t_sum

    # print('led ', led_num, ' x ', g_c_x, ' y ', g_c_y)

    if g_c_x == 0 or g_c_y == 0:
        return ERROR

    if len(ret_blobs) > 0:
        detect = 0
        for i, datas in enumerate(ret_blobs):
            led = datas['idx']
            if led == led_num:
                ret_blobs[i] = {'idx': led_num, 'cx': g_c_x, 'cy': g_c_y}
                detect = 1
                break
        if detect == 0:
            ret_blobs.append({'idx': led_num, 'cx': g_c_x, 'cy': g_c_y})
    else:
        ret_blobs.append({'idx': led_num, 'cx': g_c_x, 'cy': g_c_y})

    return DONE, ret_blobs


def simple_solvePNP(cam_id, frame, blob_array):
    model_points = []
    image_points = []
    led_ids = []
    interationsCount = 100
    confidence = 0.99

    for blobs in blob_array:
        led_num = int(blobs['idx'])
        # if DEBUG == ENABLE:
        #     print('idx:', led_num, ' added 3d', leds_dic['pts'][led_num]['pos'], ' remake: ',
        #           leds_dic['target_pts'][led_num]['remake_3d'],
        #           ' 2d', [blobs['cx'], blobs['cy']])

        model_points.append(leds_dic['pts'][led_num]['pos'])
        led_ids.append(led_num)
        image_points.append([blobs['cx'], blobs['cy']])

    model_points_len = len(model_points)
    image_points_len = len(image_points)

    # check assertion
    if model_points_len != image_points_len:
        print("assertion len is not equal")
        return ERROR

    if model_points_len < 4 or image_points_len < 4:
        print("assertion < 4: ")
        return ERROR

    camera_k = leds_dic['cam_info'][cam_id]['cam_cal']['cameraK']
    dist_coeff = leds_dic['cam_info'][cam_id]['cam_cal']['dist_coeff']

    list_2d_distorted = np.zeros((image_points_len, 1, 2), dtype=np.float64)
    for i in range(image_points_len):
        list_2d_distorted[i] = image_points[i]

    points3D = np.array(model_points)

    list_2d_undistorted = cv2.fisheye.undistortPoints(list_2d_distorted, camera_k, dist_coeff)
    leds_dic['cam_info'][cam_id]['distorted_2d'] = copy.deepcopy(list_2d_distorted)
    leds_dic['cam_info'][cam_id]['undistorted_2d'] = copy.deepcopy(list_2d_undistorted)

    if DO_UNDISTORT == ENABLE:
        temp_points2D = []
        for u_data in list_2d_undistorted:
            temp_points2D.append([u_data[0][0], u_data[0][1]])
        points2D = np.array(temp_points2D)
        temp_camera_k = cameraK
        temp_dist_coeff = distCoeff
    else:
        points2D = np.array(image_points)
        temp_camera_k = leds_dic['cam_info'][cam_id]['cam_cal']['cameraK']
        temp_dist_coeff = leds_dic['cam_info'][cam_id]['cam_cal']['dist_coeff']

    if DO_SOLVEPNP_RANSAC == ENABLE:
        success, rvecs, tvecs, inliers = cv2.solvePnPRansac(points3D, points2D,
                                                            temp_camera_k,
                                                            temp_dist_coeff,
                                                            useExtrinsicGuess=True,
                                                            iterationsCount=interationsCount,
                                                            confidence=confidence,
                                                            reprojectionError=1.0,
                                                            flags=cv2.SOLVEPNP_ITERATIVE)

    else:
        success, rvecs, tvecs = cv2.solvePnP(points3D, points2D,
                                             temp_camera_k,
                                             temp_dist_coeff,
                                             flags=cv2.SOLVEPNP_AP3P)
        # length = len(points2D)
        # for i in range(length):
        #     inliers = np.array(
        #         [i for i in range(length)]).reshape(
        #         length, 1)

    # ret, RER, TRER, candidate_array, except_inlier = cal_RER_px(led_ids, frame,
    #                                                             points3D, points2D,
    #                                                             inliers,
    #                                                             rvecs,
    #                                                             tvecs,
    #                                                             temp_camera_k,
    #                                                             temp_dist_coeff, DONE)

    # if ret == SUCCESS:
    #     #####
    #     leds_dic['cam_info'][cam_id]['D_R_T_A'].append({'rvecs': rvecs, 'tvecs': tvecs})
    #     #####
    #     leds_dic['cam_info'][cam_id]['RER']['C_R_T'] = {'rvecs': rvecs, 'tvecs': tvecs}
    #     return SUCCESS, candidate_array
    # else:
    #     return ERROR, candidate_array

    leds_dic['cam_info'][cam_id]['D_R_T_A'].append({'rvecs': rvecs, 'tvecs': tvecs})
    #####
    leds_dic['cam_info'][cam_id]['RER']['C_R_T'] = {'rvecs': rvecs, 'tvecs': tvecs}
    return SUCCESS, blob_array


def cal_RER_px(led_ids, frame, points3D, points2D, inliers, rvecs, tvecs, camera_k, dist_coeff, status):
    # Compute re-projection error.
    blob_array = []
    points2D_reproj = cv2.projectPoints(points3D, rvecs,
                                        tvecs, camera_k, dist_coeff)[0].squeeze(1)
    # print('points2D_reproj\n', points2D_reproj, '\npoints2D\n', points2D, '\n inliers: ', inliers)
    assert (points2D_reproj.shape == points2D.shape)
    error = (points2D_reproj - points2D)[inliers]  # Compute error only over inliers.
    # print('error', error)
    rmse = 0.0
    dis = 0.0
    led_except = -1
    except_inlier = -1
    led_dis = []

    for idx, error_data in enumerate(error[:, 0]):
        rmse += np.power(error_data[0], 2) + np.power(error_data[1], 2)
        temp_dis = np.power(error_data[0], 2) + np.power(error_data[1], 2)
        led_dis.append(temp_dis)

        if status == NOT_SET:
            if temp_dis > dis:
                dis = temp_dis
                led_except = led_ids[idx]
                except_inlier = idx

        # print('led_num: ', led_ids[idx], ' dis:', '%0.18f' % temp_dis, ' : ',
        #       points2D_reproj[idx][0], ' ', points2D_reproj[idx][1],
        #       ' vs ', points2D[idx][0], ' ', points2D[idx][1])

        if temp_dis > 100:
            return ERROR, 0, 0, blob_array, except_inlier

    trmse = float(rmse - dis)
    # print('trmse : ', trmse, ' rmse : ', rmse)
    if inliers is None:
        return ERROR, -1, -1, blob_array, except_inlier
    RER = round(np.sqrt(rmse) / len(inliers), 18)
    if status == NOT_SET:
        TRER = round(np.sqrt(trmse) / (len(inliers) - 1), 18)
        if led_except == -1:
            return ERROR, RER, TRER, blob_array, except_inlier
    else:
        TRER = round(np.sqrt(trmse) / (len(inliers)), 18)

    for i, idx in enumerate(led_ids):
        if idx != led_except:
            blob_array.append({'idx': led_ids[i],
                               'cx': points2D[i][0], 'cy': points2D[i][1], 'area': 0})

    return SUCCESS, RER, TRER, blob_array, except_inlier


def cal_iqr_func(arr):
    Q1 = np.percentile(arr, 25)
    Q3 = np.percentile(arr, 75)

    IQR = Q3 - Q1

    outlier_step = 1.5 * IQR

    lower_bound = Q1 - outlier_step
    upper_bound = Q3 + outlier_step

    mask = np.where((arr > upper_bound) | (arr < lower_bound))

    # print(f"cal_iqr_func!!!!!! lower_bound = {lower_bound} upper_bound ={upper_bound} mask = {mask}")

    return mask


def detect_outliers(idx, blob_array, remove_index_array):
    temp_x = np.array(cal_iqr_func(blob_array[0]))
    temp_y = np.array(cal_iqr_func(blob_array[1]))

    for x in temp_x:
        for xx in x:
            if xx in remove_index_array:
                continue
            else:
                remove_index_array.append(xx)
    for y in temp_y:
        for yy in y:
            if yy in remove_index_array:
                continue
            else:
                remove_index_array.append(yy)

    remove_index_array.sort()

    # print("detect_outliers!!!!!!!!!!!!!!!!!!!!!!!!!!!! remove_index_array", remove_index_array)


def median_blobs(cam_id, blob_array, rt_array):
    blob_cnt = len(blob_array)
    if blob_cnt == 0:
        print('blob_cnt is 0')
        return ERROR
    blob_length = len(blob_array[0])

    med_blobs_array = []
    remove_index_array = []
    med_rt_array = []
    print('cam_id:', cam_id, ' blob_cnt:', blob_cnt)

    for i in range(blob_length):
        med_xy = [[], [], []]
        for ii in range(blob_cnt):
            med_xy[0].append(blob_array[ii][i]['cx'])
            med_xy[1].append(blob_array[ii][i]['cy'])
            # med_xy[2].append(blob_array[ii][i]['area'])
        detect_outliers(blob_array[ii][i]['idx'], med_xy, remove_index_array)

    r_len = len(remove_index_array)
    print(f"median_blobs!!!!! remove_index_array length={r_len}")

    for i in range(blob_length):
        med_xy = [[], [], []]
        for ii in range(blob_cnt):
            med_xy[0].append(blob_array[ii][i]['cx'])
            med_xy[1].append(blob_array[ii][i]['cy'])
            # med_xy[2].append(blob_array[ii][i]['area'])
        # tempx=med_xy[0]
        # print(f"original med_xy[0] = {tempx}")
        count = 0
        for index in remove_index_array:
            med_xy[0].pop(index - count)
            med_xy[1].pop(index - count)
            # med_xy[2].pop(index - count)
            count += 1
        # tempx=med_xy[0]
        # print(f"after pop med_xy[0] = {tempx}")

        mean_med_x = np.mean(med_xy[0])
        mean_med_y = np.mean(med_xy[1])

        med_blobs_array.append({'idx': blob_array[ii][i]['idx'],
                                'cx': mean_med_x,
                                'cy': mean_med_y})

    if rt_array != NOT_SET:
        count = 0
        for index in remove_index_array:
            rt_array.pop(index - count)
            count += 1
        for i in range(len(rt_array)):
            rvt = [[], [], []]
            for x in rt_array[i]['rvecs'][0]:
                rvt[0].append(x)
            for y in rt_array[i]['rvecs'][1]:
                rvt[1].append(y)
            for z in rt_array[i]['rvecs'][2]:
                rvt[2].append(z)
            tvt = [[], [], []]
            for x in rt_array[i]['tvecs'][0]:
                tvt[0].append(x)
            for y in rt_array[i]['tvecs'][1]:
                tvt[1].append(y)
            for z in rt_array[i]['tvecs'][2]:
                tvt[2].append(z)

        mean_rvt = []
        mean_tvt = []
        for i in range(0, 3):
            mean_rvt.append(np.mean(rvt[i]))
            mean_tvt.append(np.mean(tvt[i]))

        med_rt_array = {'rvecs': np.array([[mean_rvt[0]], [mean_rvt[1]], [mean_rvt[2]]], dtype=np.float64),
                        'tvecs': np.array([[mean_tvt[0]], [mean_tvt[1]], [mean_tvt[2]]], dtype=np.float64)}

        len_rt_array = len(rt_array)
    #     print(f"rt_array_len = {len_rt_array}")
    #     print(f"med_rt_array = {med_rt_array}")
    # print(f"med_blobs_array = {med_blobs_array}")

    blob_array = med_blobs_array

    return blob_array, med_rt_array


leds_dic = {}

# Default
CAP_PROP_FRAME_WIDTH = 1920
CAP_PROP_FRAME_HEIGHT = 1080

# Defining the dimensions of checkerboard
CHECKERBOARD = (7, 4)
# Termination criteria for refining the detected corners
CRITERIA = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
cameraK = np.eye(3).astype(np.float64)
distCoeff = np.zeros((4, 1)).astype(np.float64)

ENABLE = 1
DISABLE = 0

DONE = 'DONE'
NOT_SET = 'NOT_SET'

READ = 0
WRITE = 1

ERROR = -1
SUCCESS = 1

DEBUG = ENABLE

SENSOR_NAME = "Droidcam"

EXTERNAL_TOOL_CALIBRATION = 'calibration_json'
RECTIFY_MAP = "improved_params2.xml"
CAM_DELAY = 1
USE_EXTERNAL_TOOL_CALIBRAION = DISABLE
RER_MAX = 100

DO_ESTIMATE_POSE = ENABLE
DO_SOLVEPNP_REFINE = DISABLE
DO_UNDISTORT = DISABLE
DO_SOLVEPNP_RANSAC = ENABLE
USE_PRINT_FRAME = DISABLE
PRINT_FRAME_INFOS = DISABLE
RT_TEST = ENABLE

ORIGIN = 'rifts2_left.json'
JSON_FILE = 'stereo_json'


In [5]:
# init camera data array
import sys
for p in sys.path:
    print(p)
cam_dev_list = terminal_cmd('v4l2-ctl', '--list-devices')
leds_dic['cam_info'] = init_data_array(cam_dev_list)
leds_dic['pts'] = init_coord_json(ORIGIN)


/home/rangkast.jeong/Project/OpenCV_APP/API_TEST
/usr/lib/python38.zip
/usr/lib/python3.8
/usr/lib/python3.8/lib-dynload

/home/rangkast.jeong/.local/lib/python3.8/site-packages
/usr/local/lib/python3.8/dist-packages
/usr/local/lib/python3.8/dist-packages/PyBluez-0.23-py3.8-linux-x86_64.egg
/usr/lib/python3/dist-packages
start  terminal_cmd
done
[{'id': b'8087:8000', 'tag': b'Intel Corp. ', 'device': "/dev/bus/usb/b'002'/b'002'"}, {'id': b'1d6b:0002', 'tag': b'Linux Foundation 2.0 root hub', 'device': "/dev/bus/usb/b'002'/b'001'"}, {'id': b'8087:8008', 'tag': b'Intel Corp. ', 'device': "/dev/bus/usb/b'001'/b'002'"}, {'id': b'1d6b:0002', 'tag': b'Linux Foundation 2.0 root hub', 'device': "/dev/bus/usb/b'001'/b'001'"}, {'id': b'2833:0211', 'tag': b' ', 'device': "/dev/bus/usb/b'004'/b'004'"}, {'id': b'2833:0211', 'tag': b' ', 'device': "/dev/bus/usb/b'004'/b'003'"}, {'id': b'2109:0820', 'tag': b'VIA Labs, Inc. USB3.1 Hub             ', 'device': "/dev/bus/usb/b'004'/b'002'"}, {'id': b'1d

In [6]:
# camera setting test
cap1_name = leds_dic['cam_info'][0]['port']
cap1 = cv2.VideoCapture(cap1_name)
cap2_name = leds_dic['cam_info'][1]['port']
cap2 = cv2.VideoCapture(cap2_name)

width1 = cap1.get(cv2.CAP_PROP_FRAME_WIDTH)
height1 = cap1.get(cv2.CAP_PROP_FRAME_HEIGHT)
print('cap1 size: %d, %d' % (width1, height1))
leds_dic['cam_info'][0]['display']['width'] = width1
leds_dic['cam_info'][0]['display']['height'] = height1


width2 = cap2.get(cv2.CAP_PROP_FRAME_WIDTH)
height2 = cap2.get(cv2.CAP_PROP_FRAME_HEIGHT)
print('cap2 size: %d, %d' % (width2, height2))
leds_dic['cam_info'][1]['display']['width'] = width2
leds_dic['cam_info'][1]['display']['height'] = height2

if not cap1.isOpened() or not cap2.isOpened():
    sys.exit()

while True:
    ret1, frame1 = cap1.read()
    ret2, frame2 = cap2.read()
    if not ret1 or not ret2:
        break
    if cv2.waitKey(1) & 0xFF == 27:  # Esc pressed
        break
    imgL = Rotate(frame1, 270)
    imgR = Rotate(frame2, 270)
    view_camera_infos(imgL, f'{cap1_name}', 30, 35)
    cv2.circle(imgL, (int(height1 / 2), int(width1 / 2)), 3, color=(0, 0, 255),
                thickness=-1)
    cv2.imshow('left camera', imgL)
    view_camera_infos(imgR, f'{cap2_name}', 30, 35)
    cv2.circle(imgR, (int(height2 / 2), int(width2 / 2)), 3, color=(0, 0, 255),
                thickness=-1)
    cv2.imshow("right camera", imgR)

    cv2.waitKey(CAM_DELAY)

cap1.release()
cap2.release()
cv2.destroyAllWindows()

cap1 size: 1736, 2312
cap2 size: 1736, 2312


In [7]:
# multi tracker test (PHONE)
import time
from datetime import datetime, date, time, timedelta

trackerTypes = ['BOOSTING', 'MIL', 'KCF', 'TLD', 'MEDIANFLOW', 'GOTURN', 'MOSSE', 'CSRT']
def createTrackerByName(trackerType):
    # Create a tracker based on tracker name
    if trackerType == trackerTypes[0]:
        tracker = cv2.legacy.TrackerBoosting_create()
    elif trackerType == trackerTypes[1]:
        tracker = cv2.legacy.TrackerMIL_create()
    elif trackerType == trackerTypes[2]:
        tracker = cv2.legacy.TrackerKCF_create()
    elif trackerType == trackerTypes[3]:
        tracker = cv2.legacy.TrackerTLD_create()
    elif trackerType == trackerTypes[4]:
        tracker = cv2.legacy.TrackerMedianFlow_create()
    elif trackerType == trackerTypes[5]:
        tracker = cv2.legacy.TrackerGOTURN_create()
    elif trackerType == trackerTypes[6]:
        tracker = cv2.TrackerMOSSE_create()
    elif trackerType == trackerTypes[7]:
        tracker = cv2.legacy.TrackerCSRT_create()
    else:
        tracker = None
        print('Incorrect tracker name')
        print('Available trackers are:')
        for t in trackerTypes:
            print(t)

    return tracker


## Select boxes
CV_FINDCONTOUR_LVL = 140
CV_THRESHOLD = 170
CV_MIN_THRESHOLD = 170
CV_MID_THRESHOLD = 190
CV_MAX_THRESHOLD = 255
bboxes = []
blobs = []

for cam_id in range(len(leds_dic['cam_info'])):
    print('try to open:', leds_dic['cam_info'][cam_id]['port'])
    video_src = leds_dic['cam_info'][cam_id]['port']

    width = int(leds_dic['cam_info'][cam_id]['display']['width'])
    height = int(leds_dic['cam_info'][cam_id]['display']['height'])

    cap = cv2.VideoCapture(video_src)    
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)


    print('width ' , leds_dic['cam_info'][cam_id]['display']['width'])
    print('height ' , leds_dic['cam_info'][cam_id]['display']['height'])

    # cap.set(cv2.CAP_PROP_FORMAT, cv2.CV_64FC1)

    while True:
        ret, frame = cap.read()
        if not ret:
            print('Cannot read video file')
            break
        # frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame = Rotate(frame, 270)
        img_draw = frame.copy()

        if cv2.waitKey(1) == ord('a'):
            view_camera_infos(frame, 'drag led area and press space bar',
                                30, 35)
            cv2.imshow('MultiTracker', frame)
            bbox = cv2.selectROI('MultiTracker', frame)
            print("Press q to quit selecting boxes and start tracking")
            print("Press any other key to select next object")
            view_camera_infos(frame, 'press led numer',
                                30, 70)
            cv2.imshow('MultiTracker', frame)
            while True:
                # ToDo 수정해야 함
                key = cv2.waitKey(1) & 0xff
                if key in range(48, 58):  # 0~9 숫자 입력   ---⑥
                    IDX = key - 48  # 선택한 숫자로 트랙커 인덱스 수정
                    print('led num ', IDX)
                    bboxes.append({'idx': IDX, 'bbox': bbox})
                    break
                elif cv2.waitKey(1) == ord('q'):
                    bboxes.clear()
                    break

        elif cv2.waitKey(1) == ord('n'):
            break

        elif cv2.waitKey(1) & 0xFF == 27:  # Esc pressed
            cap.release()
            cv2.destroyAllWindows()
            break

        if len(bboxes) > 0:
            for i, data in enumerate(bboxes):
                (x, y, w, h) = data['bbox']
                IDX = data['idx']
                cv2.rectangle(img_draw, (int(x), int(y)), (int(x + w), int(y + h)), (0, 255, 0), 2, 1)
                view_camera_infos(img_draw, ''.join([f'{IDX}']), int(x), int(y) - 10)
                view_camera_infos(img_draw, ''.join(['[', f'{IDX}', '] '
                                                        , f' {x}'
                                                        , f' {y}']), 30, 35 + i * 30)

        view_camera_infos(img_draw, ''.join(['Cam[', f'{cam_id}', '] ', f'{video_src}']),
                            height - 250, 35)

        cv2.circle(img_draw, (int(height / 2), int(width / 2)), 3, color=(0, 0, 255),
                    thickness=-1)

        cv2.imshow('MultiTracker', img_draw)

    # end while
    print('Selected bounding boxes {}'.format(bboxes))

    # Specify the tracker type
    trackerType = "CSRT"

    # Create MultiTracker object
    multiTracker = cv2.legacy.MultiTracker_create()

    tracker_start = 0
    recording_start = 0

    # Process video and track objects
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break

        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        frame = Rotate(frame, 270)
        img_gray = frame.copy()

        # Initialize MultiTracker
        if tracker_start == 0:
            for i, data in enumerate(bboxes):
                multiTracker.add(createTrackerByName(trackerType), img_gray, data['bbox'])

        tracker_start = 1

        # get updated location of objects in subsequent frames
        qq, boxes = multiTracker.update(img_gray)

        # draw tracked objects
        for i, newbox in enumerate(boxes):
            p1 = (int(newbox[0]), int(newbox[1]))
            p2 = (int(newbox[0] + newbox[2]), int(newbox[1] + newbox[3]))
            cv2.rectangle(img_gray, p1, p2, 255, 2, 1)
            IDX = bboxes[i]['idx']
            view_camera_infos(img_gray, ''.join([f'{IDX}']), int(newbox[0]), int(newbox[1]) - 10)
        
        # add graysum find center
        for i, newbox in enumerate(boxes):
            IDX = bboxes[i]['idx']
            ret, new_blobs = find_center(img_gray, IDX, int(newbox[0]), int(newbox[1]),
                                            int(newbox[2]), int(newbox[3]), blobs)
            if ret == DONE:
                blobs = new_blobs

        KEY = cv2.waitKey(CAM_DELAY)

        # ToDo
        if ret == DONE:
            ret_status, min_blob = simple_solvePNP(cam_id, frame, blobs)
            if ret_status == SUCCESS:
                leds_dic['cam_info'][cam_id]['blobs'] = min_blob
                leds_dic['cam_info'][cam_id]['med_blobs'].append(min_blob)

                cv2.rectangle(img_gray, (10, 10), (height - 10, width - 10), 255, 2)
                cam_ori = R.from_rotvec(leds_dic['cam_info'][cam_id]['RER']['C_R_T']['rvecs'].reshape(3)).as_quat()
                cam_ori_euler = np.round_(get_euler_from_quat('xyz', cam_ori), 3)
                cam_ori_quat = np.round_(get_quat_from_euler('xyz', cam_ori_euler), 8)
                cam_pos = leds_dic['cam_info'][cam_id]['RER']['C_R_T']['tvecs'].reshape(3)
                cv2.putText(img_gray, ''.join(['rot 'f'{cam_ori_euler}']),
                            (20, 35),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, 255, lineType=cv2.LINE_AA)
                cv2.putText(img_gray, ''.join(['quat 'f'{cam_ori_quat}']),
                            (20, 65),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, 255, lineType=cv2.LINE_AA)
                cv2.putText(img_gray, ''.join(['pos 'f'{cam_pos}']),
                            (20, 95),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, 255, lineType=cv2.LINE_AA)
            stacked = len(leds_dic['cam_info'][cam_id]['med_blobs'])
            cv2.putText(img_gray, ''.join([f'{stacked}', ' data stacked']),
                        (20, 125),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, 255, lineType=cv2.LINE_AA)


             # quit on ESC button
            if KEY & 0xFF == 27:  # Esc pressed
                cap.release()
                cv2.destroyAllWindows()
                break
            elif KEY == ord('e'):
                break
            elif KEY == ord('s'):
                if recording_start == 0:
                    # w = round(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                    # h = round(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                    fps = cap.get(cv2.CAP_PROP_FPS)
                    # print('w', w, 'h', h, 'fps', fps)
                    # fourcc  val 받아오기, *는 문자를 풀어쓰는 방식, *'DIVX' == 'D', 'I', 'V', 'X'
                    delay = round(1000 / fps)
                    now = datetime.datetime.now()
                    recording_file_name = ''.join([f'{now}', '.avi'])
                    print('recording start', ' ', recording_file_name)
                    fourcc = cv2.VideoWriter_fourcc(*'XVID')
                    recording_out = cv2.VideoWriter(recording_file_name, fourcc, fps,
                                                    (CAP_PROP_FRAME_WIDTH, CAP_PROP_FRAME_HEIGHT))
                    recording_start = 1

                    leds_dic['cam_info'][cam_id]['track_cal']['recording'] = {'name': recording_file_name}

            elif KEY == ord('c'):
                blob_array, rt_array = median_blobs(cam_id,
                                                    leds_dic['cam_info'][cam_id]['med_blobs'],
                                                    leds_dic['cam_info'][cam_id]['D_R_T_A'])

                leds_dic['cam_info'][cam_id]['track_cal']['data'] = {'idx': cam_id, 'blobs': blob_array,
                                                                     'R_T': rt_array}

                leds_dic['cam_info'][cam_id]['med_blobs'].clear()
                leds_dic['cam_info'][cam_id]['D_R_T_A'].clear()

                print(leds_dic['cam_info'][cam_id]['track_cal'])

                if recording_start == 1:
                    recording_out.release()
                    recording_start = 0
                break

            # print current stacked data
            for i, track_data in enumerate(leds_dic['cam_info'][cam_id]['track_cal']['data']):
                track_r = R.from_rotvec(track_data['R_T']['rvecs'].reshape(3)).as_quat()
                track_r_euler = np.round_(get_euler_from_quat('xyz', track_r), 3)
                track_t = track_data['R_T']['tvecs'].reshape(3)
                cv2.putText(img_gray, ''.join(['R 'f'{track_r_euler}',
                                               ' T 'f'{track_t}']),
                            (CAP_PROP_FRAME_WIDTH - 400, 35 + i * 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.3, 255, lineType=cv2.LINE_AA)

            if recording_start == 1:
                recording_out.write(frame)

            # show frame
            cv2.imshow('MultiTracker', img_gray)

            # release camera frame
        if recording_start == 1:
            recording_out.release()

        cap.release()
        cv2.destroyAllWindows()

        bboxes.clear()
        blobs.clear()


try to open: /dev/video0
width  1736.0
height  2312.0
Select a ROI and then press SPACE or ENTER button!
Cancel the selection process by pressing c button!
Press q to quit selecting boxes and start tracking
Press any other key to select next object
led num  0
Select a ROI and then press SPACE or ENTER button!
Cancel the selection process by pressing c button!
Press q to quit selecting boxes and start tracking
Press any other key to select next object
led num  1
Select a ROI and then press SPACE or ENTER button!
Cancel the selection process by pressing c button!
Press q to quit selecting boxes and start tracking
Press any other key to select next object
led num  2
Select a ROI and then press SPACE or ENTER button!
Cancel the selection process by pressing c button!
Press q to quit selecting boxes and start tracking
Press any other key to select next object
led num  3
Selected bounding boxes [{'idx': 0, 'bbox': (1467, 834, 30, 33)}, {'idx': 1, 'bbox': (1408, 844, 38, 43)}, {'idx': 2, 'bbo