In [1]:
import os
import cv2 
import time
import json
import math
import copy
import torch
import numpy as np
import kornia as K
import kornia.feature as KF
from vidstab import VidStab


from lightglue import LightGlue, SuperPoint
from lightglue.utils import load_image, rbd
import CSRansac

In [2]:
os.environ['KMP_DUPLICATE_LIB_OK']='True'

In [3]:
device = "cuda" if torch.cuda.is_available() else "cpu"

extractor = SuperPoint(max_num_keypoints=2048).eval().to(device)  # load the extractor
#matcher = LightGlue(features='superpoint', depth_confidence=0.9, width_confidence=0.95).eval().to(device)
matcher = LightGlue(features='superpoint', depth_confidence=-1, width_confidence=-1).eval().to(device)
#matcher.compile(mode='reduce-overhead')

In [4]:
print(torch.__version__)
print(device)

2.1.2
cuda


In [5]:
def match_lightglue(img0, img1):
    img0 = load_image(img0)
    img1 = load_image(img1)

    # extract local features
    feats0 = extractor.extract(img0.to(device))  # auto-resize the image, disable with resize=None
    feats1 = extractor.extract(img1.to(device))
    
    # match the features
    matches01 = matcher({'image0': feats0, 'image1': feats1})
    feats0, feats1, matches01 = [rbd(x) for x in [feats0, feats1, matches01]]  # remove batch dimension
    
    # get results
    kpts0 = feats0["keypoints"]
    kpts1 = feats1["keypoints"]
    matches = matches01['matches']  # indices with shape (K,2)
    points0 = kpts0[matches[..., 0]]  # coordinates in img0, shape (K,2)
    points1 = kpts1[matches[..., 1]]  # coordinates in img1, shape (K,2)
        
    return {
        "points0": points0,
        "points1": points1,
    }

In [6]:
def load_and_preprocess_image(image_path):
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    image = cv2.resize(image, (640, 480))  # 필요한 경우 이미지 크기 조정
    image = K.image_to_tensor(image, False).float() / 255.0
    image = image.to(device)
    return image

In [7]:
stabilizer = VidStab()

def matching_keypoints(target_img, video_img, stabilizing=False):
    # 이미지를 불러옴
    img0 = load_image(target_img, grayscale=True)
    if stabilizing == True:
        img1 = cv2.imread(video_img)
        img1 = stabilizer.stabilize_frame(img1)
        img1 = load_image(img1, grayscale=True)
    else:
        img1 = load_image(video_img , grayscale=True)

    # extract local features
    feats0 = extractor.extract(img0.to(device))  # auto-resize the image, disable with resize=None
    feats1 = extractor.extract(img1.to(device))

    # match the features
    matches01 = matcher({'image0': feats0, 'image1': feats1})
    feats0, feats1, matches01 = [rbd(x) for x in [feats0, feats1, matches01]]  # remove batch dimension

    # get results
    kpts0 = feats0["keypoints"]
    kpts1 = feats1["keypoints"]
    matches = matches01['matches']  # indices with shape (K,2)
    points0 = kpts0[matches[..., 0]]  # coordinates in img0, shape (K,2)
    points1 = kpts1[matches[..., 1]]  # coordinates in img1, shape (K,2)

    return {
        "points0": points0,
        "points1": points1,
    }

In [17]:
def get_errors(coord_list, float_origin_coordinate, len_coord, len_videos=1000):
    misannotate_error = 0
    pixel_error = 0
    
    for index in range(len_videos):
        for i in range(len_coord):
            try:
                origin_x = float_origin_coordinate[index][i][0]
                origin_y = float_origin_coordinate[index][i][1]
                
                _coord = coord_list[index][i]
                
                x = _coord[0][0]
                y = _coord[0][1]
                
                x = x / 640
                y = y / 480
                
                x = round(x, 4)
                y = round(y, 4)
                
                distance = math.sqrt((origin_x - x)**2 + (origin_y - y)**2)
                
                if distance > 0.1:
                    misannotate_error += 1
                
                if distance > pixel_error:
                    pixel_error = distance
            except:
                pass
                
    return misannotate_error, pixel_error

## Dataset 전처리

In [9]:
aircraft_datasets = "D:/AMARA/"

labels = os.path.join(aircraft_datasets + "label/")
video_dir = os.path.join(aircraft_datasets, "video/")
output_dir = os.path.join(aircraft_datasets, "frames_from_video/")
# stabilized_video_dir = os.path.join(aircraft_datasets, "stabilized_video")
# stabilized_frame_dir = os.path.join(aircraft_datasets, "stabilized_frame")
target_image_dir = os.path.join(aircraft_datasets, "image/")

In [10]:
# 배열 초기화
origin_coordinate_1 = []
origin_coordinate_2 = []
origin_coordinate_3 = []
origin_coordinate_4 = []
origin_coordinate_5 = []
float_origin_coordinate_1 = []
float_origin_coordinate_2 = []
float_origin_coordinate_3 = []
float_origin_coordinate_4 = []
float_origin_coordinate_5 = []

# 배열 리스트 생성
origin_coordinates = [
    origin_coordinate_1,
    origin_coordinate_2,
    origin_coordinate_3,
    origin_coordinate_4,
    origin_coordinate_5
]

float_origin_coordinates = [
    float_origin_coordinate_1,
    float_origin_coordinate_2,
    float_origin_coordinate_3,
    float_origin_coordinate_4,
    float_origin_coordinate_5
]

# 원점 좌표값 불러오기
label_index = 0
for label in os.listdir(labels):
    label_path = os.path.join(labels, label)
    for label_file in os.listdir(label_path):
        with open(os.path.join(label_path, label_file), "r") as f:
            json_file = json.load(f)
            coord = json_file["annotationList"]
            _coord = copy.deepcopy(coord)
            _coord = _coord[1:-1]
            
            # 현재 반복에 해당하는 배열에 요소 추가
            float_origin_coordinates[label_index].append(_coord)
            
            for i in range(len(coord)):
                coord[i][0] = coord[i][0] * 640
                coord[i][1] = coord[i][1] * 480
            coord = coord[1:-1]
            origin_coordinates[label_index].append(coord)
    
    label_index += 1
    if label_index >= 5:
        break

# # 각 배열 확인 (출력 예시)
# print("origin_coordinate_1:", origin_coordinate_1)
# print("origin_coordinate_2:", origin_coordinate_2)
# print("origin_coordinate_3:", origin_coordinate_3)
# print("origin_coordinate_4:", origin_coordinate_4)
# print("origin_coordinate_5:", origin_coordinate_5)
# print("float_origin_coordinate_1:", float_origin_coordinate_1)
# print("float_origin_coordinate_2:", float_origin_coordinate_2)
# print("float_origin_coordinate_3:", float_origin_coordinate_3)
# print("float_origin_coordinate_4:", float_origin_coordinate_4)
# print("float_origin_coordinate_5:", float_origin_coordinate_5)

In [11]:
print(len(origin_coordinates[0][0]))
print(len(float_origin_coordinates[0][0]))

368
368


In [12]:
# 원본 이미지 경로를 저장할 리스트
len_1 = []
len_2 = []
len_3 = []
len_4 = []
len_5 = []

lengths = [
    len_1,
    len_2,
    len_3,
    len_4,
    len_5
]

i = 0

index = 0
for videos in os.listdir(output_dir):
    image_path = os.path.join(output_dir, videos)
    for image_file in os.listdir(image_path):
        _path = os.path.join(image_path, image_file)
        for image in os.listdir(_path):
            lengths[index].append(os.path.join(_path, image))
    
    index += 1
    if index >= 5:
        break

# lengths 리스트의 길이 반환
print(len(lengths[0]))
print(len(lengths[1]))
print(len(lengths[2]))
print(len(lengths[3]))
print(len(lengths[4]))

34965
23565
34965
75890
75890


In [13]:
# 원본 이미지 경로를 저장할 리스트
image_1 = []
image_2 = []
image_3 = []
image_4 = []
image_5 = []

images = [
    image_1,
    image_2,
    image_3,
    image_4,
    image_5
]

i = 0

index = 0
for videos in os.listdir(output_dir):
    image_path = os.path.join(output_dir, videos)
    for image_file in os.listdir(image_path):
        _path = os.path.join(image_path, image_file)
        list = []
        for image in os.listdir(_path):
            list.append(os.path.join(_path, image))
            # 현재 반복에 해당하는 배열에 요소 추가
        images[index].append(list)
    
    index += 1
    if index >= 5:
        break

# images 리스트의 길이 반환
# num_images = len(images)
# print(f"총 이미지 수: {num_images}")

In [14]:

print(len(images[0]))
print(len(images[1]))
print(len(images[2]))
print(len(images[3]))
print(len(images[4]))
print(images[0][0])
print(len(images[0][0]))

95
95
95
95
95
['D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0000.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0001.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0002.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0003.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0004.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0005.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0006.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0007.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0008.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0009.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInstallation_00001_01\\frame_0010.jpg', 'D:/AMARA/frames_from_video/video_1\\FuelPumpInst

In [15]:
# stabilized_images = [[] for i in range(len(origin_coordinate))]
# i = 0

# # output_dir 내의 모든 폴더에 대한 반복
# for folder_name in os.listdir(stabilized_frame_path):
#     folder_path = os.path.join(stabilized_frame_path, folder_name)
    
#     for name in os.listdir(folder_path):
#         filename = os.path.join(folder_path, name)
#         stabilized_images[i].append(filename)
        
#     i = i + 1
    
# # images 리스트의 길이 반환
# print(len(stabilized_images[0]))

## 에러 평가 코드(인접 프레임)

In [19]:
epochs = 8

for video_type in range(0, 5):
    # 각종 평가 수치들
    misannotate_errors = []
    pixel_errors = []
    failed_inliers = 0
    zero_inliers = 0
    inlier_rates = 0
    
    for epoch in range(epochs):
        len_videos = len(origin_coordinates[video_type]) # 동영상의 개수
        coord_list = [[] for _ in range(len_videos)]
        
        for video_index in range(len_videos):
            len_coord = len(origin_coordinates[video_type][video_index])
            len_images = len(images[video_type][video_index])
            
            # 이미지의 길이만큼
            _images = images[video_type][video_index]
            
            coord_list[video_index] = [[] for _ in range(len_images)]
            
            img0 = _images[0]
            
            for i in range(len_images):
                x = origin_coordinates[video_type][video_index][i][0]
                y = origin_coordinates[video_type][video_index][i][1]
                
                if i != len_images - 1:
                    img1 = _images[i + 1]
                    
                    # LightGlue
                    results_lightglue = matching_keypoints(img0, img1, stabilizing=False)
                    target_keypoint = results_lightglue["points0"].cpu().numpy()
                    frame_keypoint = results_lightglue["points1"].cpu().numpy()
                    if len(target_keypoint) < 6:
                        failed_inliers += 1
                        continue

                    homography, mask = CSRansac.csransac(target_keypoint, frame_keypoint)
                    if mask <= 0.3:
                        if mask == 0:
                            zero_inliers += 1
                        failed_inliers += 1
                    
                    inlier_rates += mask
                    projected_pts = CSRansac.perspective_transform(np.array([x, y]), homography)

                    coord_list[video_index][i].append(projected_pts)
                    
                    img0 = img1
                    
                    
        #에러 측정            
        # disappear_error = 0
        misannotate_error, pixel_error = get_errors(coord_list, float_origin_coordinates[video_type], len_coord)
                
        #print("disappear_error:", disappear_error)
        misannotate_errors.append(misannotate_error)
        pixel_errors.append(pixel_error)
        
    
    
    error1 = sum(misannotate_errors) / len(misannotate_errors)
    error2 = sum(pixel_errors) / len(pixel_errors)
    error3 = inlier_rates / (len(lengths[video_type]) * epochs)

    print("failed_inliers:", failed_inliers)
    print("zero_inliers:", zero_inliers)
    print("misannotate_error:", error1)
    print("pixel_error:", error2)
    print("inlier_rate:", error3)
    print()

failed_inliers: 17706
zero_inliers: 0
misannotate_error: 0.5
pixel_error: 0.10467461282488003
inlier_rate: 0.8322376545899601

failed_inliers: 26297
zero_inliers: 0
misannotate_error: 1.625
pixel_error: 0.17317687807706017
inlier_rate: 0.6788313461599008

failed_inliers: 16492
zero_inliers: 0
misannotate_error: 0.25
pixel_error: 0.08616144873087536
inlier_rate: 0.8354989200464383

failed_inliers: 68391
zero_inliers: 0
misannotate_error: 19.375
pixel_error: 0.2882594250587717
inlier_rate: 0.7371604779475519

failed_inliers: 68290
zero_inliers: 0
misannotate_error: 24.875
pixel_error: 0.33891311796159945
inlier_rate: 0.7378704326116212



In [16]:
#원본 이미지를 기준으로 호모그래피 행렬을 구하고, 호모그래피 행렬을 이용하여 특징점의 좌표를 변환하는 코드
epochs = 2

for index in range(0, 5):
    # 각종 평가 수치들
    misannotate_errors = []
    pixel_errors = []
    failed_inliers = 0
    zero_inliers = 0
    inlier_rates = 0
    
    len_coord = len(origin_coordinates[index])
    coord_list = [[] for _i in range(len(origin_coordinates[index]))]
    
    # 10번 반복하여 측정한 에러를 구함
    for k in range(epochs):
        # 좌표의 개수(동영상의 개수)만큼 반복
        for i in range(len_coord):
            _images = images[index][i]
            _len_images = len(_images)
            
            x = origin_coordinates[index][i][0]
            y = origin_coordinates[index][i][1]
            
            # 두 번째 차원의 리스트 초기화
            coord_list[i] = [[] for _ in range(_len_images)]

            img0 = _images[0] # 첫 번째 이미지를 target 이미지로 설정
            
            for j in range(_len_images):
                if j != _len_images - 1:
                    img1 = _images[j+1]

                    # LightGlue
                    results_lightglue = matching_keypoints(img0, img1, stabilizing=False)
                    target_keypoint = results_lightglue["points0"].cpu().numpy()
                    frame_keypoint = results_lightglue["points1"].cpu().numpy()
                    if len(target_keypoint) < 6:
                        failed_inliers += 1
                        continue

                    homography, mask = CSRansac.csransac(target_keypoint, frame_keypoint)
                    if mask <= 0.3:
                        if mask == 0:
                            zero_inliers += 1
                        failed_inliers += 1
                    
                    inlier_rates += mask
                    projected_pts = CSRansac.perspective_transform(np.array([x, y]), homography)

                    coord_list[i][j].append(projected_pts)
                    
                    img0 = img1
                    x = projected_pts[0]
                    y = projected_pts[1]
                    
        #에러 측정            
        # disappear_error = 0
        misannotate_error, pixel_error = get_errors(coord_list, float_origin_coordinates[index], len_coord)
                
        #print("disappear_error:", disappear_error)
        misannotate_errors.append(misannotate_error)
        pixel_errors.append(pixel_error)
        
    
    
    error1 = sum(misannotate_errors) / len(misannotate_errors)
    error2 = sum(pixel_errors) / len(pixel_errors)
    error3 = inlier_rates / (len(lengths[index]) * epochs)

    print("failed_inliers:", failed_inliers)
    print("zero_inliers:", zero_inliers)
    print("misannotate_error:", error1)
    print("pixel_error:", error2)
    print("inlier_rate:", error3)
    print()

failed_inliers: 22178
zero_inliers: 0
misannotate_error: 0.5
pixel_error: 0.0961069490830834
inlier_rate: 0.8316822044444913

failed_inliers: 32372
zero_inliers: 0
misannotate_error: 0.4
pixel_error: 0.10667310724099914
inlier_rate: 0.6831353971849062

failed_inliers: 20529
zero_inliers: 0
misannotate_error: 0.1
pixel_error: 0.08448434681565134
inlier_rate: 0.8357186765942877

failed_inliers: 85901
zero_inliers: 0
misannotate_error: 3.9
pixel_error: 0.14904979391949064
inlier_rate: 0.7363167489959468

failed_inliers: 85145
zero_inliers: 0
misannotate_error: 8.0
pixel_error: 0.21190722618754756
inlier_rate: 0.738144709490343



## 에러 평가 코드(인접 + 칼만 필터)

[320.0, 186.41601599999998]


In [26]:

#원본 이미지를 기준으로 호모그래피 행렬을 구하고, 호모그래피 행렬을 이용하여 특징점의 좌표를 변환하는 코드
epochs = 2

# 칼만 필터 초기화
kalman = cv2.KalmanFilter(8, 4)
kalman.measurementMatrix = np.eye(4, 8, dtype=np.float32)
kalman.transitionMatrix = np.eye(8, dtype=np.float32)
kalman.processNoiseCov = np.eye(8, dtype=np.float32) * 0.01
kalman.measurementNoiseCov = np.eye(4, dtype=np.float32) * 0.1
kalman.errorCovPost = np.eye(8, dtype=np.float32)

for index in range(0, 5):
    # 각종 평가 수치들
    misannotate_errors = []
    pixel_errors = []
    failed_inliers = 0
    zero_inliers = 0
    inlier_rates = 0
    
    len_coord = len(origin_coordinates[index])
    coord_list = [[] for _i in range(len(origin_coordinates[index]))]
    
    # 10번 반복하여 측정한 에러를 구함
    for k in range(epochs):
        len_videos = len(origin_coordinates[index]) # 동영상의 개수
        
        # 좌표의 개수(동영상의 개수)만큼 반복
        for i in range(len_coord):
            _images = images[index][i]
            _len_images = len(_images)
            
            coord = origin_coordinates[index][i][0]
            
            x = coord[0]
            y = coord[1]
            
            # 두 번째 차원의 리스트 초기화
            coord_list[i] = [[] for _ in range(_len_images)]

            img0 = _images[0] # 첫 번째 이미지를 target 이미지로 설정
            
            for j in range(_len_images):
                if j != _len_images - 1:
                    img1 = _images[j+1]

                    # LightGlue
                    results_lightglue = matching_keypoints(img0, img1, stabilizing=False)
                    target_keypoint = results_lightglue["points0"].cpu().numpy()
                    frame_keypoint = results_lightglue["points1"].cpu().numpy()
                    if len(target_keypoint) < 6:
                        failed_inliers += 1
                        continue

                    homography, mask = CSRansac.csransac(target_keypoint, frame_keypoint)
                    if mask <= 0.3:
                        if mask == 0:
                            zero_inliers += 1
                        failed_inliers += 1
                    
                    inlier_rates += mask
                    measured = np.array([[x, y, 0, 0]], dtype=np.float32).T
                    kalman.correct(measured)
                    prediction = kalman.predict()
                    projected_pts = (prediction[0][0], prediction[1][0])

                    projected_pts = CSRansac.perspective_transform(np.array([projected_pts[0], projected_pts[1]]), homography)
                    
                    coord_list[i][j].append(projected_pts)
                    
                    img0 = img1
                    x = projected_pts[0]
                    y = projected_pts[1]
                    
        #에러 측정            
        # disappear_error = 0
        misannotate_error, pixel_error = get_errors(coord_list, float_origin_coordinates[index], len_coord, len_videos)
                
        #print("disappear_error:", disappear_error)
        # print("misannotate_error:", misannotate_error)
        # print("pixel_error:", pixel_error)

        #disappear_errors.append(disappear_error)
        misannotate_errors.append(misannotate_error)
        pixel_errors.append(pixel_error)
        
    
    
    error1 = sum(misannotate_errors) / len(misannotate_errors)
    error2 = sum(pixel_errors) / len(pixel_errors)
    error3 = inlier_rates / (len(lengths[index]) * epochs)

    print("failed_inliers:", failed_inliers)
    print("zero_inliers:", zero_inliers)
    print("misannotate_error:", error1)
    print("pixel_error:", error2)
    print("inlier_rate:", error3)
    print()

failed_inliers: 4372
zero_inliers: 0
misannotate_error: 3455.5
pixel_error: 0.5444986288444487
inlier_rate: 0.8321170809648124

failed_inliers: 6649
zero_inliers: 0
misannotate_error: 4047.5
pixel_error: 0.9575493710338165
inlier_rate: 0.6768006848552878

failed_inliers: 4155
zero_inliers: 0
misannotate_error: 6327.5
pixel_error: 0.5289352947736141
inlier_rate: 0.8344927590082998

failed_inliers: 17186
zero_inliers: 0
misannotate_error: 5560.0
pixel_error: 0.6141193664858373
inlier_rate: 0.7379516195819851

failed_inliers: 17165
zero_inliers: 0
misannotate_error: 6844.0
pixel_error: 0.5372420134952917
inlier_rate: 0.7366455974471953



## 에러 평가 코드 (LOFTR + 칼만 필터)

In [28]:
import cv2
import numpy as np
import torch

# 칼만 필터 초기화
kalman = cv2.KalmanFilter(8, 4)
kalman.measurementMatrix = np.eye(4, 8, dtype=np.float32)
kalman.transitionMatrix = np.eye(8, dtype=np.float32)
kalman.processNoiseCov = np.eye(8, dtype=np.float32) * 0.01
kalman.measurementNoiseCov = np.eye(4, dtype=np.float32) * 0.1
kalman.errorCovPost = np.eye(8, dtype=np.float32)

# LoFTR 모델 초기화
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loftr = KF.LoFTR(pretrained='outdoor').to(device)

# Lucas-Kanade optical flow 파라미터
lk_params = dict(winSize=(15, 15), maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

epochs = 1

for video_type in range(3, 5):
    misannotate_errors = []
    pixel_errors = []
    failed_inliers = 0
    zero_inliers = 0
    inlier_rates = 0
    
    for epoch in range(epochs):
        len_videos = len(origin_coordinates[video_type])
        coord_list = [[] for _ in range(len_videos)]
        
        for video_index in range(len_videos):
            len_coord = len(origin_coordinates[video_type][video_index])
            len_images = len(images[video_type][video_index])
            _images = images[video_type][video_index]
                
            coord_list[video_index] = [[] for _ in range(len_images)]
            
            img0 = _images[0]
            prev_gray = cv2.imread(img0, cv2.IMREAD_GRAYSCALE)
            
            image = load_and_preprocess_image(img0)
            with torch.no_grad():
                input_dict = {"image0": image, "image1": image}
                correspondences = loftr(input_dict)
            
            prev_points = correspondences['keypoints0'].cpu().numpy()
            prev_points = prev_points.reshape(-1, 1, 2)
            
            for i in range(len_images):
                x = origin_coordinates[video_type][video_index][i][0]
                y = origin_coordinates[video_type][video_index][i][1]
            
                if i != len_images - 1:
                    img1 = _images[i + 1]
                    gray = cv2.imread(img1, cv2.IMREAD_GRAYSCALE)
                    next_points, status, error = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_points, None, **lk_params)
                    good_old = prev_points[status == 1]
                    good_new = next_points[status == 1]

                    if len(good_new) >= 4:
                        H, mask = cv2.findHomography(good_old, good_new, cv2.RANSAC, 5.0)
                            
                        if H is not None:
                            inliers = mask.ravel().tolist().count(1)
                            total_points = len(mask)
                            inlier_rate = inliers / total_points
                            if inlier_rate <= 0.3:
                                failed_inliers += 1
                            inlier_rates += inlier_rate
                                
                            points = np.array([[x, y]], dtype='float32')
                            projected_points = cv2.perspectiveTransform(np.array([points]), H)
                            measured = np.array([[projected_points[0][0][0], projected_points[0][0][1], 0, 0]], dtype=np.float32).T
                            kalman.correct(measured)
                            prediction = kalman.predict()
                            predicted_points = (prediction[0][0], prediction[1][0])
                    else:
                        failed_inliers += 1       
                
                projected_pts = tuple(projected_points[0][0])
                coord_list[video_index][i].append(projected_pts)
                
                prev_gray = gray
                prev_points = good_new.reshape(-1, 1, 2)
        
        misannotate_error, pixel_error = get_errors(coord_list, float_origin_coordinates[video_type], len_coord, len_videos)
                
        misannotate_errors.append(misannotate_error)
        pixel_errors.append(pixel_error)
        
    error1 = sum(misannotate_errors) / len(misannotate_errors)
    error2 = sum(pixel_errors) / len(pixel_errors)
    error3 = inlier_rates / (len(lengths[video_type]) * epochs)

    print("failed_inliers:", failed_inliers)
    print("zero_inliers:", zero_inliers)
    print("misannotate_error:", error1)
    print("pixel_error:", error2)
    print("inlier_rate:", error3)
    print()


failed_inliers: 1
zero_inliers: 0
misannotate_error: 48760.0
pixel_error: 0.7287079745073197
inlier_rate: 0.9966143044570095



KeyboardInterrupt: 

0.4117537

In [25]:
len(float_origin_coordinates[3])

95

In [15]:
# 칼만 필터 초기화
kalman = cv2.KalmanFilter(8, 4)
kalman.measurementMatrix = np.eye(4, 8, dtype=np.float32)
kalman.transitionMatrix = np.eye(8, dtype=np.float32)
kalman.processNoiseCov = np.eye(8, dtype=np.float32) * 0.01
kalman.measurementNoiseCov = np.eye(4, dtype=np.float32) * 0.1
kalman.errorCovPost = np.eye(8, dtype=np.float32)


# LoFTR 모델 초기화
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loftr = KF.LoFTR(pretrained='outdoor').to(device)

# Lucas-Kanade optical flow 파라미터
lk_params = dict(winSize=(15, 15), maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

epochs = 1

for index in range(5):
    # 각종 평가 수치들
    misannotate_errors = []
    pixel_errors = []
    failed_inliers = 0
    zero_inliers = 0
    inlier_rates = 0
    
    
    len_videos = len(origin_coordinates[index])
    coord_list = [[] for _i in range(len(origin_coordinates[index]))]
    
    # 10번 반복하여 측정한 에러를 구함
    for k in range(epochs):
        for vid_index in range(len_videos):
            len_coord = len(origin_coordinates[index][vid_index])
            # 좌표의 개수(동영상의 개수)만큼 반복
            for i in range(len_coord):
                try:
                    _images = images[index][i]
                except:
                    print(i)
                    print(len(images[index]))
                    print(len(origin_coordinates[index]))
                    print(len_coord)
                _len_images = len(_images)
                
                x = origin_coordinates[index][vid_index][i][0]
                y = origin_coordinates[index][vid_index][i][1]
                
                # 두 번째 차원의 리스트 초기화
                coord_list[i] = [[] for _ in range(_len_images)]

                img0 = _images[0] # 첫 번째 이미지를 target 이미지로 설정
                prev_gray = cv2.imread(img0, cv2.IMREAD_GRAYSCALE)
                
                #LOFTR
                image = load_and_preprocess_image(img0)

                # 특징점 추출
                with torch.no_grad():
                    input_dict = {"image0": image, "image1": image}
                    correspondences = loftr(input_dict)

                # 특징점 시각화
                prev_points = correspondences['keypoints0'].cpu().numpy()
                prev_points = prev_points.reshape(-1, 1, 2)
                
                for j in range(_len_images):
                    if j != _len_images - 1:
                        img1 = _images[j+1]
                        
                        
                        # 첫 프레임에서 특징점 찾기
                        gray = cv2.imread(img1, cv2.IMREAD_GRAYSCALE)
                        next_points, status, error = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_points, None, **lk_params)
                        good_old = prev_points[status == 1]
                        good_new = next_points[status == 1]

                        if len(good_new) >= 4:
                            H, mask = cv2.findHomography(good_old, good_new, cv2.RANSAC, 5.0)
                                
                            if H is not None:
                                inliers = mask.ravel().tolist().count(1)
                                total_points = len(mask)
                                inlier_rate = inliers / total_points
                                if inlier_rate <= 0.3:
                                    failed_inliers += 1
                                inlier_rates += inlier_rate
                                    
                                # 원본 이미지의 좌표를 변환
                                points = np.array([[x, y]], dtype='float32')
                                projected_points = cv2.perspectiveTransform(np.array([points]), H)
                                # 칼만 필터 업데이트
                                measured = np.array([[projected_points[0][0][0], projected_points[0][0][1], 0, 0]], dtype=np.float32).T
                                kalman.correct(measured)
                                prediction = kalman.predict()
                                predicted_points = (prediction[0][0], prediction[1][0])
                        else:
                            failed_inliers += 1       
                        

                        projected_pts = tuple(projected_points[0][0])
                        coord_list[i][j].append(projected_pts)
                        
                        prev_gray = gray
                        prev_points = good_new.reshape(-1, 1, 2)
                    
                    
                    
        #에러 측정            
        # disappear_error = 0
        misannotate_error, pixel_error = get_errors(coord_list, float_origin_coordinates[index], len_coord, len_videos)
                
        #print("disappear_error:", disappear_error)
        # print("misannotate_error:", misannotate_error)
        # print("pixel_error:", pixel_error)

        #disappear_errors.append(disappear_error)
        misannotate_errors.append(misannotate_error)
        pixel_errors.append(pixel_error)
        
    
    
    error1 = sum(misannotate_errors) / len(misannotate_errors)
    error2 = sum(pixel_errors) / len(pixel_errors)
    error3 = inlier_rates / (len(lengths[index]) * epochs)

    print("failed_inliers:", failed_inliers)
    print("zero_inliers:", zero_inliers)
    print("misannotate_error:", error1)
    print("pixel_error:", error2)
    print("inlier_rate:", error3)
    print()

95
95
95
368


IndexError: list assignment index out of range

## 저널용 비디오 생성

In [15]:
# import os

target_images = []
target_image_dir = "jernal_targer_image"

videos = []
videos_dir = "jernal_videos"

for image in os.listdir(target_image_dir):
    target_images.append(os.path.join(target_image_dir, image))
    
for video in os.listdir(videos_dir):
    video_path = os.path.join(videos_dir, video)
    video = []
    for frame in os.listdir(video_path):
        video.append(os.path.join(video_path, frame))
        
    videos.append(video)


print(target_images)
print(videos)

['jernal_targer_image\\WheelAndTireAssemblyRemoval_00007_01.png', 'jernal_targer_image\\WheelAndTireAssemblyRemoval_00007_02.png', 'jernal_targer_image\\WheelAndTireAssemblyRemoval_00007_03.png', 'jernal_targer_image\\WheelAndTireAssemblyRemoval_00007_04.png', 'jernal_targer_image\\WheelAndTireAssemblyRemoval_00007_05.png']
[['jernal_videos\\WheelAndTireAssemblyRemoval_00007_01\\frame_0000.jpg', 'jernal_videos\\WheelAndTireAssemblyRemoval_00007_01\\frame_0001.jpg', 'jernal_videos\\WheelAndTireAssemblyRemoval_00007_01\\frame_0002.jpg', 'jernal_videos\\WheelAndTireAssemblyRemoval_00007_01\\frame_0003.jpg', 'jernal_videos\\WheelAndTireAssemblyRemoval_00007_01\\frame_0004.jpg', 'jernal_videos\\WheelAndTireAssemblyRemoval_00007_01\\frame_0005.jpg', 'jernal_videos\\WheelAndTireAssemblyRemoval_00007_01\\frame_0006.jpg', 'jernal_videos\\WheelAndTireAssemblyRemoval_00007_01\\frame_0007.jpg', 'jernal_videos\\WheelAndTireAssemblyRemoval_00007_01\\frame_0008.jpg', 'jernal_videos\\WheelAndTireAssem

In [16]:
coordinates = [[0.5000092, 0.5369453], [0.5000096, 0.5369451000000001], [0.4271423, 0.4999952], [0.4205159, 0.5372209], [0.4270934, 0.5285428]]

In [34]:
# 칼만 필터 초기화
kalman = cv2.KalmanFilter(8, 4)
kalman.measurementMatrix = np.eye(4, 8, dtype=np.float32)
kalman.transitionMatrix = np.eye(8, dtype=np.float32)
kalman.processNoiseCov = np.eye(8, dtype=np.float32) * 0.01
kalman.measurementNoiseCov = np.eye(4, dtype=np.float32) * 0.1
kalman.errorCovPost = np.eye(8, dtype=np.float32)

# LoFTR 모델 초기화
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loftr = KF.LoFTR(pretrained='outdoor').to(device)

# Lucas-Kanade optical flow 파라미터
lk_params = dict(winSize=(15, 15), maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

for i in range(3, 5):
    # fourcc = cv2.VideoWriter_fourcc(*'DIVX')
    # out = cv2.VideoWriter('annotated_kalman_' + str(i) + '.mp4', fourcc, 30, (640, 480))
    
    img0 = target_images[i]  # 첫 번째 이미지를 target 이미지로 설정
    prev_gray = cv2.imread(img0, cv2.IMREAD_GRAYSCALE)
    prev_gray = cv2.resize(prev_gray, (640, 480))
    
    # LOFTR
    image = load_and_preprocess_image(img0)
    
    # 특징점 추출
    with torch.no_grad():
        input_dict = {"image0": image, "image1": image}
        correspondences = loftr(input_dict)

    prev_points = correspondences['keypoints0'].cpu().numpy()
    prev_points = prev_points.reshape(-1, 1, 2)
    
    _len_images = len(videos[i])
    
    
    
    for j in range(_len_images):
        if j != _len_images - 1:
            img1 = videos[i][j]
            
            # 첫 프레임에서 특징점 찾기
            _frame = cv2.imread(img1)
            gray = cv2.cvtColor(_frame, cv2.COLOR_BGR2GRAY)
            gray = cv2.resize(gray, (640, 480))
            next_points, status, error = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_points, None, **lk_params)
            good_old = prev_points[status == 1]
            good_new = next_points[status == 1]

            if len(good_new) >= 4:
                H, mask = cv2.findHomography(good_old, good_new, cv2.RANSAC, 5.0)
                    
                if H is not None:
                    inliers = mask.ravel().tolist().count(1)
                    total_points = len(mask)
                    inlier_rate = inliers / total_points
                    if inlier_rate <= 0.3:
                        cv2.imshow("frame", _frame)
                        continue
                    
                    # 원본 이미지의 좌표를 변환
                    x, y = float_origin_coordinates[i][92][j]
                    x = x * 640
                    y = y * 480
                    
                    points = np.array([[x, y]], dtype='float32')
                    projected_points = cv2.perspectiveTransform(np.array([points]), H)
                    # 칼만 필터 업데이트
                    measured = np.array([[projected_points[0][0][0], projected_points[0][0][1], 0, 0]], dtype=np.float32).T
                    kalman.correct(measured)
                    prediction = kalman.predict()
                    predicted_points = (prediction[0][0], prediction[1][0])
                    points = np.array([[predicted_points[0], predicted_points[1]]], dtype='float32')
                    
                    cv2.circle(_frame, (int(predicted_points[0]), int(predicted_points[1])), 15, (0, 0, 255), -1)
                    cv2.circle(_frame, (int(predicted_points[0]), int(predicted_points[1])), 3, (0, 0, 0), -1)
                    
                    cv2.imshow("frame", _frame)

                    # out.write(_frame)
                    
                    key = cv2.waitKey(5)
                    if key == 27:
                        break
                    
                prev_gray = gray
                prev_points = good_new.reshape(-1, 1, 2)
        
    # out.release()

cv2.destroyAllWindows()


In [37]:
# Lightglue
for i in range (3, 5):    
    fourcc = cv2.VideoWriter_fourcc(*'DIVX')
    out = cv2.VideoWriter('annotated_lightglue_' + str(i) + '.mp4', fourcc, 30, (640, 480))
    
    x, y = coordinates[i]
    x = x * 1280
    y = y * 960
    
    img0 = target_images[i]
    
    # 각 프레임 처리
    for frame in range(len(videos[i])):
        _frame = cv2.imread(videos[i][frame])
        img1 = videos[i][frame]
        
        
        # LightGlue
        results_lightglue = matching_keypoints(img0, img1, stabilizing=False)
        target_keypoint = results_lightglue["points0"].cpu().numpy()
        frame_keypoint = results_lightglue["points1"].cpu().numpy()

        homography, mask = CSRansac.csransac(target_keypoint, frame_keypoint)
        
        x, y = CSRansac.perspective_transform(np.array([x, y]), homography)

        cv2.circle(_frame, (int(x), int(y)), 15, (0, 0, 255), -1)
        cv2.circle(_frame, (int(x), int(y)), 3, (0, 0, 0), -1)
        #cv2.imshow('frame', _frame)
        
        img0 = img1
        
        out.write(_frame)
        
        # key = cv2.waitKey(5)
        # if key == 27:
        #     break
        
    out.release()
    # cap.release()

cv2.destroyAllWindows()


## 저널용 속도 측정

In [62]:
# Lightglue
for i in range (len(target_images)):    
    fourcc = cv2.VideoWriter_fourcc(*'DIVX')
    out = cv2.VideoWriter('annotated_lightglue_' + str(i) + '.mp4', fourcc, 30, (640, 480))
    
    x, y = coordinates[i]
    x = x * 640
    y = y * 480
    
    frame_count = 0
    total_time = 0
    
    img0 = target_images[i]
    
    start_time = time.time()
    
    # 각 프레임 처리
    for frame in range(len(videos[i])):
        # _frame = cv2.imread(videos[i][frame])
        img1 = videos[i][frame]
        
        # LightGlue
        results_lightglue = matching_keypoints(img0, img1, stabilizing=False)
        target_keypoint = results_lightglue["points0"].cpu().numpy()
        frame_keypoint = results_lightglue["points1"].cpu().numpy()

        homography, mask = CSRansac.csransac(target_keypoint, frame_keypoint)
        projected_pts = CSRansac.perspective_transform(np.array([x, y]), homography)
        #cv2.imshow('frame', _frame)
        
        img0 = img1
        
        # key = cv2.waitKey(5)
        # if key == 27:
        #     break
        
        frame_count += 1
        
    end_time = time.time()
    
    total_time += end_time - start_time   
    average_time = frame_count / total_time
    print("FPS : ", average_time)
    

FPS :  10.208562032768482
FPS :  10.457685857382842
FPS :  10.715528074720735
FPS :  10.490097274481045
FPS :  10.46868641100674


In [None]:
# 칼만 필터 초기화
kalman = cv2.KalmanFilter(8, 4)
kalman.measurementMatrix = np.eye(4, 8, dtype=np.float32)
kalman.transitionMatrix = np.eye(8, dtype=np.float32)
kalman.processNoiseCov = np.eye(8, dtype=np.float32) * 0.01
kalman.measurementNoiseCov = np.eye(4, dtype=np.float32) * 0.1
kalman.errorCovPost = np.eye(8, dtype=np.float32)


# LoFTR 모델 초기화
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loftr = KF.LoFTR(pretrained='outdoor').to(device)

# Lucas-Kanade optical flow 파라미터
lk_params = dict(winSize=(15, 15), maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

    
for frame in range(len(target_images)):
    frame_count = 0
    total_time = 0

    start_time = time.time()

    img0 = target_images[frame] # 첫 번째 이미지를 target 이미지로 설정
    prev_gray = cv2.imread(img0, cv2.IMREAD_GRAYSCALE)
    
    #LOFTR
    image = load_and_preprocess_image(img0)
    

    # 특징점 추출
    with torch.no_grad():
        input_dict = {"image0": image, "image1": image}
        correspondences = loftr(input_dict)

    prev_points = correspondences['keypoints0'].cpu().numpy()
    prev_points = prev_points.reshape(-1, 1, 2)
    
    _len_images = len(videos[i][frame])
    
    for j in range(len(videos[i][frame])):
        coord = coordinates[frame][j]
        x = coord[0] * 640
        y = coord[1] * 480
        
        if j != _len_images - 1:
            img1 = videos[j][frame]
            
            # 첫 프레임에서 특징점 찾기
            gray = cv2.imread(img1, cv2.IMREAD_GRAYSCALE)
            next_points, status, error = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_points, None, **lk_params)
            good_old = prev_points[status == 1]
            good_new = next_points[status == 1]

            if len(good_new) >= 4:
                H, mask = cv2.findHomography(good_old, good_new, cv2.RANSAC, 5.0)
                    
                if H is not None:
                    # 원본 이미지의 좌표를 변환
                    points = np.array([[x, y]], dtype='float32')
                    projected_points = cv2.perspectiveTransform(np.array([points]), H)
                    # 칼만 필터 업데이트
                    measured = np.array([[projected_points[0][0][0], projected_points[0][0][1], 0, 0]], dtype=np.float32).T
                    kalman.correct(measured)
                    prediction = kalman.predict()
                    predicted_points = (prediction[0][0], prediction[1][0])  
            
            prev_gray = gray
            prev_points = good_new.reshape(-1, 1, 2)
        
        end_time = time.time()
        frame_processing_time = end_time - start_time
        
        total_time += frame_processing_time
        frame_count += 1
        
        
    average_time = frame_count / total_time
    print("FPS : ", average_time)
                    
                    
                    

In [43]:
# 칼만 필터 초기화
kalman = cv2.KalmanFilter(8, 4)
kalman.measurementMatrix = np.eye(4, 8, dtype=np.float32)
kalman.transitionMatrix = np.eye(8, dtype=np.float32)
kalman.processNoiseCov = np.eye(8, dtype=np.float32) * 0.01
kalman.measurementNoiseCov = np.eye(4, dtype=np.float32) * 0.1
kalman.errorCovPost = np.eye(8, dtype=np.float32)

# LoFTR 모델 초기화
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loftr = KF.LoFTR(pretrained='outdoor').to(device)

# Lucas-Kanade optical flow 파라미터
lk_params = dict(winSize=(15, 15), maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

for i in range(3, 5):
    # fourcc = cv2.VideoWriter_fourcc(*'DIVX')
    # out = cv2.VideoWriter('annotated_kalman_' + str(i) + '.mp4', fourcc, 30, (640, 480))
    
    img0 = target_images[i]  # 첫 번째 이미지를 target 이미지로 설정
    prev_gray = cv2.imread(img0, cv2.IMREAD_GRAYSCALE)
    prev_gray = cv2.resize(prev_gray, (640, 480))
    
    # LOFTR
    image = load_and_preprocess_image(img0)
    
    # 특징점 추출
    with torch.no_grad():
        input_dict = {"image0": image, "image1": image}
        correspondences = loftr(input_dict)

    prev_points = correspondences['keypoints0'].cpu().numpy()
    prev_points = prev_points.reshape(-1, 1, 2)
    
    _len_images = len(videos[i])
    
    # 원본 이미지의 좌표를 변환
    x, y = float_origin_coordinates[i][92][0]
    x = x * 640
    y = y * 480
    points = np.array([[x, y]], dtype='float32')
    
    for j in range(_len_images):
        if j != _len_images - 1:
            img1 = videos[i][j]
            
            # 첫 프레임에서 특징점 찾기
            _frame = cv2.imread(img1)
            gray = cv2.cvtColor(_frame, cv2.COLOR_BGR2GRAY)
            gray = cv2.resize(gray, (640, 480))
            next_points, status, error = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_points, None, **lk_params)
            good_old = prev_points[status == 1]
            good_new = next_points[status == 1]

            if len(good_new) >= 4:
                H, mask = cv2.findHomography(good_old, good_new, cv2.RANSAC, 5.0)
                    
                if H is not None:
                    inliers = mask.ravel().tolist().count(1)
                    total_points = len(mask)
                    inlier_rate = inliers / total_points
                    if inlier_rate <= 0.3:
                        cv2.imshow("frame", _frame)
                        continue
                    
                    
                    projected_points = cv2.perspectiveTransform(np.array([points]), H)
                    # 칼만 필터 업데이트
                    measured = np.array([[projected_points[0][0][0], projected_points[0][0][1], 0, 0]], dtype=np.float32).T
                    kalman.correct(measured)
                    prediction = kalman.predict()
                    predicted_points = (prediction[0][0], prediction[1][0])
                    
                    cv2.circle(_frame, (int(predicted_points[0]), int(predicted_points[1])), 15, (0, 0, 255), -1)
                    cv2.circle(_frame, (int(predicted_points[0]), int(predicted_points[1])), 3, (0, 0, 0), -1)
                    
                    cv2.imshow("frame", _frame)
                    
                    points = np.array([[predicted_points[0], predicted_points[1]]], dtype='float32')

                    # out.write(_frame)
                    
                    key = cv2.waitKey(5)
                    if key == 27:
                        break
                    
                prev_gray = gray
                prev_points = good_new.reshape(-1, 1, 2)
        
    # out.release()

cv2.destroyAllWindows()


In [36]:
# 칼만 필터 초기화
kalman = cv2.KalmanFilter(8, 4)
kalman.measurementMatrix = np.eye(4, 8, dtype=np.float32)
kalman.transitionMatrix = np.eye(8, dtype=np.float32)
kalman.processNoiseCov = np.eye(8, dtype=np.float32) * 0.01
kalman.measurementNoiseCov = np.eye(4, dtype=np.float32) * 0.1
kalman.errorCovPost = np.eye(8, dtype=np.float32)

# LoFTR 모델 초기화
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loftr = KF.LoFTR(pretrained='outdoor').to(device)

# Lucas-Kanade optical flow 파라미터
lk_params = dict(winSize=(15, 15), maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

for i in range(0, 5):
    # fourcc = cv2.VideoWriter_fourcc(*'DIVX')
    # out = cv2.VideoWriter('annotated_kalman_' + str(i) + '.mp4', fourcc, 30, (640, 480))
    
    img0 = target_images[i]  # 첫 번째 이미지를 target 이미지로 설정
    prev_gray = cv2.imread(img0, cv2.IMREAD_GRAYSCALE)
    prev_gray = cv2.resize(prev_gray, (640, 480))
    
    # LOFTR
    image = load_and_preprocess_image(img0)
    
    # 특징점 추출
    with torch.no_grad():
        input_dict = {"image0": image, "image1": image}
        correspondences = loftr(input_dict)

    prev_points = correspondences['keypoints0'].cpu().numpy()
    prev_points = prev_points.reshape(-1, 1, 2)
    
    _len_images = len(videos[i])
    
    # 원본 이미지의 좌표를 변환
    x, y = float_origin_coordinates[i][92][0]
    x = x * 640
    y = y * 480
    points = np.array([[x, y]], dtype='float32')
    
    for j in range(_len_images):
        if j != _len_images - 1:
            img1 = videos[i][j]
            
            # 첫 프레임에서 특징점 찾기
            _frame = cv2.imread(img1)
            gray = cv2.cvtColor(_frame, cv2.COLOR_BGR2GRAY)
            gray = cv2.resize(gray, (640, 480))
            next_points, status, error = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_points, None, **lk_params)
            good_old = prev_points[status == 1]
            good_new = next_points[status == 1]

            if len(good_new) >= 4:
                H, mask = cv2.findHomography(good_old, good_new, cv2.RANSAC, 5.0)
                    
                if H is not None:
                    inliers = mask.ravel().tolist().count(1)
                    total_points = len(mask)
                    inlier_rate = inliers / total_points
                    if inlier_rate <= 0.3:
                        cv2.imshow("frame", _frame)
                        continue
                    
                    
                    projected_points = cv2.perspectiveTransform(np.array([points]), H)
                    # 칼만 필터 업데이트
                    measured = np.array([[projected_points[0][0][0], projected_points[0][0][1], 0, 0]], dtype=np.float32).T
                    kalman.correct(measured)
                    prediction = kalman.predict()
                    predicted_points = (prediction[0][0], prediction[1][0])
                    
                    predicted_points = (projected_points[0][0][0], projected_points[0][0][1])
                    cv2.circle(_frame, (int(predicted_points[0]), int(predicted_points[1])), 15, (0, 0, 255), -1)
                    cv2.circle(_frame, (int(predicted_points[0]), int(predicted_points[1])), 3, (0, 0, 0), -1)
                    
                    cv2.imshow("frame", _frame)
                    
                    if i == 3 or i == 4:
                        points = np.array([[predicted_points[0], predicted_points[1]]], dtype='float32')

                    # out.write(_frame)
                    
                    key = cv2.waitKey(5)
                    if key == 27:
                        break
                    
                prev_gray = gray
                prev_points = good_new.reshape(-1, 1, 2)
        
    # out.release()

cv2.destroyAllWindows()


In [29]:
import cv2
import numpy as np
import torch
from numpy.linalg import inv

# EKF 관련 파라미터 초기화
dt = 1.0
A = np.array([[1, dt, 0, 0],
              [0, 1, 0, 0],
              [0, 0, 1, dt],
              [0, 0, 0, 1]])
Q = 0.1 * np.eye(4)
R = np.array([[0, 0],
              [0, 0]])
x_0 = np.array([320, 0, 240, 0])
P_0 = 10 * np.eye(4)

# Jacobian 계산 함수
def jacobian_H(x):
    return np.array([[1, 0, 0, 0],
                     [0, 0, 1, 0]])

# 비선형 시스템 모델의 예측 함수
def f(x):
    return A @ x

# 비선형 측정 모델의 예측 함수
def h(x):
    return np.array([x[0], x[2]])

# EKF 알고리즘 함수
def ekf_predict(x, P):
    x_pred = f(x)
    P_pred = A @ P @ A.T + Q
    return x_pred, P_pred

def ekf_update(x_pred, P_pred, z):
    H = jacobian_H(x_pred)
    K = P_pred @ H.T @ inv(H @ P_pred @ H.T + R)
    y = z - h(x_pred)
    x_upd = x_pred + K @ y
    P_upd = P_pred - K @ H @ P_pred
    return x_upd, P_upd

# LoFTR 모델 초기화
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loftr = KF.LoFTR(pretrained='outdoor').to(device)

# Lucas-Kanade optical flow 파라미터
lk_params = dict(winSize=(15, 15), maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

for i in range(0, 5):
    img0 = target_images[i]  # 첫 번째 이미지를 target 이미지로 설정
    prev_gray = cv2.imread(img0, cv2.IMREAD_GRAYSCALE)
    prev_gray = cv2.resize(prev_gray, (640, 480))
    
    # LOFTR
    image = load_and_preprocess_image(img0)
    
    # 특징점 추출
    with torch.no_grad():
        input_dict = {"image0": image, "image1": image}
        correspondences = loftr(input_dict)

    prev_points = correspondences['keypoints0'].cpu().numpy()
    prev_points = prev_points.reshape(-1, 1, 2)
    
    _len_images = len(videos[i])
    
    # 원본 이미지의 좌표를 변환
    x, y = float_origin_coordinates[i][92][0]
    x = x * 640
    y = y * 480
    points = np.array([[x, y]], dtype='float32')

    x_esti, P = x_0, P_0
    
    for j in range(_len_images):
        coord = float_origin_coordinates[i][92][j]
        x = coord[0] * 640
        y = coord[1] * 480
        points = np.array([[x, y]], dtype='float32')
        if j != _len_images - 1:
            img1 = videos[i][j]
            
            # 첫 프레임에서 특징점 찾기
            _frame = cv2.imread(img1)
            gray = cv2.cvtColor(_frame, cv2.COLOR_BGR2GRAY)
            gray = cv2.resize(gray, (640, 480))
            next_points, status, error = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_points, None, **lk_params)
            good_old = prev_points[status == 1]
            good_new = next_points[status == 1]

            if len(good_new) >= 4:
                H, mask = cv2.findHomography(good_old, good_new, cv2.RANSAC, 5.0)
                    
                if H is not None:
                    inliers = mask.ravel().tolist().count(1)
                    total_points = len(mask)
                    inlier_rate = inliers / total_points
                    if inlier_rate <= 0.3:
                        cv2.imshow("frame", _frame)
                        continue
                    
                    projected_points = cv2.perspectiveTransform(np.array([points]), H)
                    z_meas = np.array([projected_points[0][0][0], projected_points[0][0][1]])

                    # EKF 예측
                    x_pred, P_pred = ekf_predict(x_esti, P)

                    # EKF 업데이트
                    x_esti, P = ekf_update(x_pred, P_pred, z_meas)
                    
                    predicted_points = (x_esti[0], x_esti[2])
                    
                    cv2.circle(_frame, (int(predicted_points[0]), int(predicted_points[1])), 15, (0, 0, 255), -1)
                    cv2.circle(_frame, (int(predicted_points[0]), int(predicted_points[1])), 3, (0, 0, 0), -1)
                    
                    cv2.imshow("frame", _frame)
                    
                    points = np.array([[predicted_points[0], predicted_points[1]]], dtype='float32')

                    key = cv2.waitKey(5)
                    if key == 27:
                        break
                    
                prev_gray = gray
                prev_points = good_new.reshape(-1, 1, 2)

cv2.destroyAllWindows()

