In [None]:
# Демонстрация функционала эстимейторов на видео

In [None]:
import os, sys

In [None]:
base_dir = os.path.split(os.getcwd())[0]
os.chdir(os.path.join(base_dir))
sys.path.append("core")

In [None]:
import pickle as pk

import numpy as np
import cv2
import sys
import argparse
import time
import copy
import datetime
import yaml

import matplotlib.pyplot as plt

from core.utils import ARUCO_DICT, display_pose, f_left_x_04, f_right_x_04
from core.estimators import PoseSingle

In [None]:
from scipy.spatial.transform import Rotation as R

In [None]:
k_matrix = "calib/calibration_matrix_render_calib.npy"
d_coeff = "calib/distortion_coefficients_render_calib.npy"
config_filename = "jupyters/marker_poses.yaml"

edge_len = 0.7
marker_step = 10
n_markers = 100

type_ = "DICT_7X7_100"

if ARUCO_DICT.get(type_, None) is None:
    print(f"ArUCo tag type '{type_}' is not supported")
    sys.exit(0)

aruco_dict_type = ARUCO_DICT[type_]
calibration_matrix_path = k_matrix
distortion_coefficients_path = d_coeff

k = np.load(k_matrix)
d = np.load(d_coeff)


with open(config_filename, encoding='utf8') as f:
    cfg = yaml.load(f, Loader=yaml.FullLoader)

marker_poses = dict(
    zip([id for id in range(len(cfg['poses']))], 
        cfg['poses']))

In [None]:
# Загрузка истинных данных
with open('data/render_distance_long/report2.txt', 'r') as f:
    lines = f.readlines()

x_poses_front_gt = np.array([float(line.strip().replace(',','.')) for line in lines[::2]])

In [None]:
with open('data/render_distance_long/report1.txt', 'r') as f:
    lines = f.readlines()

x_poses_rear_gt = np.array([float(line.strip().replace(',','.')) for line in lines[::2]])

In [None]:
cap = cv2.VideoCapture('data/render_distance_long/front.mp4')

width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)/10)
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter('data/results/demo_front.mov', cv2.VideoWriter_fourcc('M','J','P','G'), 30, (width, height))

estimator_video_front = PoseSingle(
    aruco_dict_type,
    1,
    n_markers,
    marker_poses,
    edge_len,
    k,
    d,
    apply_kf = True,
    transition_coef = 50,
    observation_coef = 0.02,
    x_bias = 1.25,
    left_edge_weight_func  = f_left_x_04,
)

if (cap.isOpened()== False): 
  print("Error opening video stream or file")

x_poses_front = []

while(cap.isOpened()):
    ret, frame = cap.read()
    if ret == True:
        frame, pose, size = estimator_video_front(frame, True)
        output = frame
        if pose.shape!=():
            output = display_pose(output, pose[:-1,[3]])
            x_poses_front.append(pose[0][3])
        else:
            x_poses_front.append(-1)
            
        out.write(output)
    else: 
        break

cap.release()
out.release()

In [None]:
# График предсказаний

# На видео кран начинает двигаться не сразу. А в файле изменение координаты происходит с самого начала.
# Поэтому вручную подбираем момент начала движения на видео (от него прямо зависит сдвиг предсказаний)

begin_move = 80
x_poses_front = np.array(x_poses_front)
plt.figure()
plt.plot(np.arange(0,len(x_poses_front[begin_move:])), x_poses_front[begin_move:])
plt.plot(np.arange(0,len(x_poses_front_gt)), x_poses_front_gt)
# plt.xlim(xmin=164, xmax=225)
# plt.ylim(ymin=25, ymax=40)
plt.show()

In [None]:
begin_move = 80
end_move = 680
x_poses_front = np.array(x_poses_front)
plt.figure()
resid = x_poses_front_gt[:end_move] - x_poses_front[begin_move:end_move+begin_move]
plt.plot(np.arange(0,len(resid)), resid)
plt.grid(True)
plt.show()

In [None]:
cap = cv2.VideoCapture('data/render_distance_long/rear.mp4')

width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)/10)
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter('data/results/demo_rear.mov', cv2.VideoWriter_fourcc('M','J','P','G'), 30, (width, height))

estimator_video_rear = PoseSingle(
    aruco_dict_type,
    -1,
    n_markers,
    marker_poses,
    edge_len,
    k,
    d,
    apply_kf = True,
    transition_coef = 50,
    observation_coef = 0.02,
    x_bias = -1.5,
    right_edge_weight_func  = lambda x: np.tanh(10*(1-x-0.35)).clip(min=0),
)

if (cap.isOpened()== False): 
  print("Error opening video stream or file")

x_poses_rear = []

while(cap.isOpened()):
    ret, frame = cap.read()
    if ret == True:
        frame, pose, size = estimator_video_rear(frame, True)
        output = frame
        if pose.shape!=():
            output = display_pose(output, pose[:-1,[3]])
            x_poses_rear.append(pose[0][3])
        else:
            x_poses_rear.append(-1)
            
        out.write(output)
    else: 
        break

cap.release()
out.release()

In [None]:
begin_move = 90
x_poses_rear = np.array(x_poses_rear)
plt.plot(np.arange(0,len(x_poses_rear[begin_move:])), x_poses_rear[begin_move:])
plt.plot(np.arange(0,len(x_poses_rear_gt)), x_poses_rear_gt)

plt.show()

In [None]:
begin_move = 90
end_move = 610
x_poses_rear = np.array(x_poses_rear)
plt.figure()
resid = x_poses_rear_gt[:end_move] - x_poses_rear[begin_move:end_move+begin_move]
plt.plot(np.arange(0,len(resid)), resid)
plt.grid(True)
plt.ylim(ymin=-1, ymax=1)
plt.show()

In [None]:
# Среднее
begin_move_front = 80
begin_move_rear = 90
mean_x = np.mean([np.array(x_poses_rear[begin_move_rear:600+begin_move_rear]), np.array(x_poses_front[begin_move_front:600+begin_move_front])], axis=0)

In [None]:
plt.plot(np.arange(0,len(mean_x)), mean_x)
plt.plot(np.arange(0,len(x_poses_front_gt)), x_poses_front_gt)
plt.xlim(xmin=164, xmax=225)
plt.ylim(ymin=25, ymax=40)
plt.show()

In [None]:
plt.figure()
resid = x_poses_front_gt[:600] - mean_x[:600]
plt.plot(np.arange(0,len(resid)), resid)
plt.grid(True)
plt.show()

In [None]:
# Тест с обновленными краевыми функциями БЕЗ фильтра калмана

In [None]:
cap = cv2.VideoCapture('data/render_distance_long/front.mp4')

width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)/10)
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter('data/results/demo_front_nokf.mov', cv2.VideoWriter_fourcc('M','J','P','G'), 30, (width,height))

estimator_video_front = PoseSingle(
    aruco_dict_type,
    1,
    n_markers,
    marker_poses,
    edge_len,
    k,
    d,
    apply_kf = False,
    transition_coef = 50,
    observation_coef = 0.02,
    x_bias = 1.25,
    left_edge_weight_func  = f_left_x_04,
)

if (cap.isOpened()== False): 
  print("Error opening video stream or file")

x_poses_front = []

while(cap.isOpened()):
    ret, frame = cap.read()
    if ret == True:
        frame, pose, size = estimator_video_front(frame, True)
        output = frame
        if pose.shape!=():
            output = display_pose(output, pose[:-1,[3]])
            x_poses_front.append(pose[0][3])
        else:
            x_poses_front.append(-1)
            
        out.write(output)
    else: 
        break

cap.release()
out.release()

In [None]:
begin_move = 80
x_poses_front = np.array(x_poses_front)
plt.figure()
plt.plot(np.arange(0,len(x_poses_front[begin_move:])), x_poses_front[begin_move:])
plt.plot(np.arange(0,len(x_poses_front_gt)), x_poses_front_gt)
plt.show()

In [None]:
begin_move = 80
end_move = 680
x_poses_front = np.array(x_poses_front)
plt.figure()
resid = x_poses_front_gt[:end_move] - x_poses_front[begin_move:end_move+begin_move]
plt.plot(np.arange(0,len(resid)), resid)
plt.grid(True)
plt.ylim(ymin=-1, ymax=1)
plt.show()

In [None]:
cap = cv2.VideoCapture('data/render_distance_long/rear.mp4')

width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)/10)
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter('data/results/demo_rear_nokf.mov', cv2.VideoWriter_fourcc('M','J','P','G'), 30, (width,height))

estimator_video_rear = PoseSingle(
    aruco_dict_type,
    -1,
    n_markers,
    marker_poses,
    edge_len,
    k,
    d,
    apply_kf = False,
    transition_coef = 50,
    observation_coef = 0.02,
    x_bias = -1.5,
    right_edge_weight_func  = lambda x: np.tanh(10*(1-x-0.35)).clip(min=0),
)

if (cap.isOpened()== False): 
  print("Error opening video stream or file")

x_poses_rear = []

while(cap.isOpened()):
    ret, frame = cap.read()
    if ret == True:
        frame, pose, size = estimator_video_rear(frame, True)
        output = frame
        if pose.shape!=():
            output = display_pose(output, pose[:-1,[3]])
            x_poses_rear.append(pose[0][3])
        else:
            x_poses_rear.append(-1)
            
        out.write(output)
    else: 
        break

cap.release()
out.release()

In [None]:
begin_move = 90
x_poses_rear = np.array(x_poses_rear)
plt.plot(np.arange(0,len(x_poses_rear[begin_move:])), x_poses_rear[begin_move:])
plt.plot(np.arange(0,len(x_poses_rear_gt)), x_poses_rear_gt)

plt.show()

In [None]:
begin_move = 90
end_move = 610
x_poses_rear = np.array(x_poses_rear)
plt.figure()
resid = x_poses_rear_gt[:end_move] - x_poses_rear[begin_move:end_move+begin_move]
plt.plot(np.arange(0,len(resid)), resid)
plt.grid(True)
plt.ylim(ymin=-1, ymax=1)
plt.show()

In [None]:
# Среднее
begin_move_front = 80
begin_move_rear = 90
mean_x = np.mean([np.array(x_poses_rear[begin_move_rear:600+begin_move_rear]), np.array(x_poses_front[begin_move_front:600+begin_move_front])], axis=0)

In [None]:
plt.figure()
resid = x_poses_front_gt[:600] - mean_x[:600]
plt.plot(np.arange(0,len(resid)), resid)
plt.grid(True)
plt.ylim(ymin=-1, ymax=1)
plt.show()