In [None]:
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import multiprocessing as mp
from functools import partial
import cv2
from typing import List
from utils.homography import HomographySIFT, HomographyResult
import torch
from utils.image_stitcher import make_gif_of_sequence



def load_files_from_dir(path):
    """Load every ``*.npz`` archive in ``path`` and merge them, ordered by time.

    Arrays stored under the same key in different archives are concatenated
    along their first axis.  If the directory yields no data, a single archive
    named like the directory with an ``.npz`` suffix is tried instead.

    Args:
        path: ``pathlib.Path`` of the directory holding the archives.

    Returns:
        dict mapping each key to a numpy array whose first axis is ordered by
        ascending ``'time_ns'``.

    Raises:
        FileNotFoundError: if neither the directory nor the ``.npz`` fallback
            provides any data (the underlying error is chained as the cause).
        KeyError: if the loaded data has no ``'time_ns'`` entry.
    """
    import warnings  # local import keeps the notebook's import cell untouched

    data = {}
    # Sorted so the load order (and progress bar) is deterministic.
    list_files = sorted(path.glob('*.npz'))
    for p in tqdm(list_files):
        try:
            # np.load is lazy for .npz archives: read every member while the
            # file is open so the handle is released afterwards and a
            # half-readable archive cannot leave the per-key lists with
            # different lengths.
            with np.load(p) as d:
                loaded = {k: d[k] for k in d.files}
        except Exception as exc:
            # Best effort: unreadable files are skipped, but no longer silently.
            warnings.warn(f'Skipping unreadable file {p}: {exc!r}')
            continue
        for k, v in loaded.items():
            data.setdefault(k, []).extend(v)
    if not data:
        try:
            with np.load(path.with_suffix('.npz')) as d:
                data = {k: d[k] for k in d.files}
        except Exception as exc:
            raise FileNotFoundError(f'No files found in {path}') from exc
    indices = np.argsort(data['time_ns'])
    data = {k: np.stack(v)[indices] for k, v in data.items()}
    return data


# Root folder that holds one recording location per camera.
path_to_files = Path('/home/navhomelinux/Downloads')

# The 'pan' recordings are used as the left view, the 'mono' ones as the right view.
left, right = (load_files_from_dir(path_to_files / camera) for camera in ('pan', 'mono'))

# Show which arrays each recording provides.
list(right)

Fix timing between stereo cameras: for each right-camera frame, select the left-camera frame whose timestamp is closest.

In [None]:
# For every right-camera timestamp, the index of the left-camera frame nearest in time.
# NOTE(review): assumes `time_ns` is a signed or float dtype; with an unsigned dtype the
# subtraction would wrap around -- confirm against the recordings.
diff = np.abs(left['time_ns'][:, None] - right['time_ns'][None, :]).argmin(axis=0)

plt.figure()
plt.plot(diff, label='before')

# Re-index every left-camera array so that entry i pairs with right-camera entry i.
left = {key: values[diff] for key, values in left.items()}

# Same nearest-frame lookup on the re-indexed data, drawn on top for comparison.
diff = np.abs(left['time_ns'][:, None] - right['time_ns'][None, :]).argmin(axis=0)
plt.plot(diff, label='after')
plt.legend()
plt.grid()
plt.show()

# diff = np.min(diff, axis=0)
# diff = diff / 1e9
# # diff = [(pan['time_ns'][p] - q) / 1e9 for p, q in zip(diff, mono['time_ns'])]
# plt.plot(diff[diff < 0.1])

# # [p for p in (diff / 1e9)]
# # 
# # for i in range(len(pan['time_ns'])):
# #     dict_of_connections.setdefault(pan['time_ns'][i], []).append(mono['time_ns'][i])

In [None]:
# Rig and scene assumptions consumed by the field-of-view estimates in the following cells.
distance_between_centers_of_cameras = 0.1  # meters -- TODO(review): the original note was only a run of question marks; confirm this baseline
distance_from_ground = 50 # meters
focal_length = 9.8 * 1e-3 # meters
size_of_pixel = 17 * 1e-6 # meters
image_width = 336 # pixels
image_height = 256 # pixels

delta_distance_from_ground = 3 # meters; added to / subtracted from distance_from_ground for the min/max estimates

In [None]:
# Full opening angle of the sensor along each axis: 2 * atan(sensor_extent / (2 * f)).
fov_width_rad = 2 * np.arctan(image_width * size_of_pixel / (2 * focal_length))
fov_height_rad = 2 * np.arctan(image_height * size_of_pixel / (2 * focal_length))

# The same angles in degrees, for display.
fov_width_degree = fov_width_rad * 180 / np.pi
fov_height_degree = fov_height_rad * 180 / np.pi

# Extent covered at distance_from_ground.
fov_width_meters = 2 * distance_from_ground * np.tan(fov_width_rad / 2)
fov_height_meters = 2 * distance_from_ground * np.tan(fov_height_rad / 2)

print(
    f"FOV width: {fov_width_degree:.1f}deg, {fov_width_meters:.1f}m",
    f"FOV height: {fov_height_degree:.1f}deg, {fov_height_meters:.1f}m",
    sep="\n",
)


In [None]:
# Extent covered by ONE pixel at distance_from_ground.
# The whole sensor covers 2 * d * tan(fov / 2) with tan(fov / 2) = N * p / (2 * f),
# i.e. d * N * p / f for N pixels of size p.  A single pixel therefore covers
# d * p / f -- there is no factor of 2 here (that factor cancels against the
# half-angle), which keeps these values equal to fov_*_meters / image_*.
fov_single_pixel_width_meters = distance_from_ground * size_of_pixel / focal_length
fov_single_pixel_height_meters = distance_from_ground * size_of_pixel / focal_length
print(f"FOV single pixel width: {fov_single_pixel_width_meters:.2g}m")
print(f"FOV single pixel height: {fov_single_pixel_height_meters:.2g}m")

# Same per-pixel extent when the distance is off by -/+ delta_distance_from_ground.
min_fov_change = (distance_from_ground - delta_distance_from_ground) * size_of_pixel / focal_length
max_fov_change = (distance_from_ground + delta_distance_from_ground) * size_of_pixel / focal_length
print(f"Min FOV change: {min_fov_change:.2g}m")
print(f"Max FOV change: {max_fov_change:.2g}m")

In [None]:
# Camera baseline expressed in units of the single-pixel extent computed above.
distance_between_cameras_in_pixels = (
    distance_between_centers_of_cameras / fov_single_pixel_width_meters
)
print(f"Distance between cameras in pixels: {distance_between_cameras_in_pixels:.2g} pixels")

In [None]:
# Show frame 0 of both cameras next to each other.
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15, 5))

# Left panel.
ax[0].imshow(left['frames'][0])
ax[0].set_title('Left camera')

# Right panel.
ax[1].imshow(right['frames'][0])
ax[1].set_title('Right camera')

plt.show()

In [None]:
# Use cv2 to find the homography between the frames

def find_homography(left, right):
    """Estimate the homography that maps ``left`` onto ``right`` from ORB matches.

    Both frames are min-max rescaled to 0..255 and converted to ``uint8``
    copies before feature detection; the arrays passed in are left untouched.

    Args:
        left: source frame (numpy array).
        right: destination frame (numpy array).

    Returns:
        ``(M, mask)`` as returned by ``cv2.findHomography`` with RANSAC and a
        reprojection threshold of 5.0.

    Raises:
        ValueError: if no descriptors are found in one of the frames or fewer
            than 4 matches are available (a homography needs at least 4 point
            pairs).
    """
    # dst=None makes OpenCV allocate the output.  Handing the input in as dst
    # lets OpenCV write the rescaled values back into the caller's array, which
    # for a frame taken from the dataset silently rescales that frame.
    left = cv2.normalize(left, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
    right = cv2.normalize(right, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
    orb = cv2.ORB_create()
    kp1, des1 = orb.detectAndCompute(left, None)
    kp2, des2 = orb.detectAndCompute(right, None)
    if des1 is None or des2 is None:
        raise ValueError('No ORB descriptors found in at least one of the frames')
    # Hamming distance suits ORB's binary descriptors; crossCheck keeps mutual best matches only.
    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
    matches = bf.match(des1, des2)
    if len(matches) < 4:
        raise ValueError(f'Need at least 4 matches to estimate a homography, got {len(matches)}')
    src_pts = np.float32([kp1[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
    dst_pts = np.float32([kp2[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)
    M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, 5.0)
    return M, mask
find_homography(left['frames'][0], right['frames'][0])[0]

In [None]:
def warp_image(img, M):
    """Warp ``img`` with the 3x3 matrix ``M``, keeping the input's width and height."""
    height, width = img.shape[:2]
    return cv2.warpPerspective(img, M, (width, height))

def _calc_loss(M, left, right):
    """Mean absolute difference between warped left frames and right frames.

    Args:
        M: 3x3 matrix applied to every left frame via ``warp_image``.
        left: mapping with a ``'frames'`` sequence of images.
        right: mapping with a ``'frames'`` sequence, paired by index with ``left``.

    Returns:
        The per-frame mean absolute difference, averaged over all frames.

    Raises:
        ZeroDivisionError: if ``left['frames']`` is empty.
    """
    n_frames = len(left['frames'])
    loss = 0.0
    for idx in range(n_frames):
        # Subtract in float: with unsigned integer frames the difference would
        # wrap around (e.g. 0 - 1 -> 255 for uint8) and inflate the loss.  The
        # residual plot at the end of the notebook casts the same way.
        estimated_right = warp_image(left['frames'][idx], M).astype(float)
        target = right['frames'][idx].astype(float)
        loss += np.mean(np.abs(estimated_right - target))
    loss /= n_frames
    return loss



# Brute-force search over matrices that are the identity plus a translation.
NW = 100
NH = 100
array_distances_w = np.linspace(0.9, 2, NW)
array_distances_h = np.linspace(5.5, 7, NH)

# One 3x3 identity per grid node, laid out as (NH, NW, 3, 3).
M_permutations = np.tile(np.eye(3), (NH, NW, 1, 1))
# Entry [0, 2] varies along the first grid axis, entry [1, 2] along the second.
M_permutations[..., 0, 2] = array_distances_h[:, None]
M_permutations[..., 1, 2] = array_distances_w[None, :]
# Flatten the grid so the pool can map over a plain sequence of matrices.
M_permutations = M_permutations.reshape(-1, 3, 3)

# Fix the data arguments (shallow dict copies) so only the matrix is left to map over.
# NOTE(review): the partial carries both datasets; presumably it is serialised together
# with the tasks sent to the workers -- worth checking if this cell is slow.
func = partial(_calc_loss, left=dict(left), right=dict(right))

with mp.Pool(processes=mp.cpu_count()) as pool:
    # Alternative chunk size that was tried: len(M_permutations) // mp.cpu_count()
    ret_val = list(
        tqdm(
            pool.imap(func, M_permutations, chunksize=64),
            total=len(M_permutations),
        )
    )

# Lowest loss and the matrix that produced it.
print(min(ret_val))
print(M_permutations[np.argmin(ret_val)])


In [None]:
# Loss per candidate matrix, in the flattened order of M_permutations.
# NOTE(review): assumes ret_val still holds the grid-search losses; a later cell
# reuses the name ret_val for a different kind of result.
plt.plot(ret_val)

In [None]:
# Use the GPU when one is available; otherwise fall back to the CPU instead of
# failing on the first `.to('cuda')`.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

with torch.inference_mode():
    homography = HomographySIFT().to(DEVICE, dtype=torch.float32)
    # Put the two cameras on a new leading axis, then swap it with the frame axis
    # so that x[k] holds the k-th left/right pair.
    x = np.concatenate([left['frames'][None], right['frames'][None]], axis=0).transpose(1, 0, 2, 3)
    # Min-max scaling over the whole stack.  The in-place subtraction only touches
    # the freshly concatenated array, not the original frames.
    x -= x.min()
    x = x/ x.max()
    x = torch.from_numpy(x)
    # One pair per call; each result is moved back to the CPU right away.
    ret_val: List[HomographyResult] = [homography(x=x_.to(DEVICE, dtype=torch.float32), m=None, mask=None, verbose=False).cpu() for x_ in tqdm(torch.split(x, 1, dim=0))]
# Stack the per-pair homographies along the first axis.
h = np.concatenate([p.homography for p in ret_val], 0)

In [None]:
# make_gif_of_sequence([p.warped[0, 1] for p in ret_val], 'homography.gif')

In [None]:
# One scatter plot per entry (i, j) of the estimated homographies, outliers removed.
for i, j in np.ndindex(3, 3):
    d = h[:, i, j]
    std = np.std(d)
    # Keep samples within one standard deviation of the mean.  The comparison is
    # inclusive so that an entry that is constant across frames (std == 0) keeps
    # its samples; a strict '<' would discard all of them and leave an empty
    # plot with a NaN mean.
    d = d[np.abs(d - np.mean(d)) <= 1 * std]
    plt.figure()
    plt.scatter(range(len(d)), d, label='y', s=2)
    plt.plot(np.mean(d) * np.ones_like(d), label='mean', c='r', linewidth=1)
    # Optional guide lines at mean -/+ 1 and integer y ticks (disabled):
    # plt.plot(np.mean(d) * np.ones_like(d)-1,  '--', c='black', linewidth=1)
    # plt.plot(np.mean(d) * np.ones_like(d)+1,  '--', c='black', linewidth=1)
    # plt.yticks(np.arange(np.floor(d.min()), np.ceil(d.max()), 1))
    plt.title(f'Homography {i}, {j}')
    plt.grid()
    plt.show()
    # Release the figure so the nine plots do not accumulate in memory.
    plt.close()

In [None]:
def warp_image(img, M):
    """Apply the 3x3 matrix ``M`` to ``img``; the output keeps the input's width and height."""
    height, width = img.shape[:2]
    output_size = (width, height)  # OpenCV expects (width, height)
    return cv2.warpPerspective(img, M, output_size)


def make_mask(img, M):
    """Warp an all-ones image shaped like ``img`` with ``M`` to mark the covered area.

    Warped values above 0.5 are set to 1; all other values are returned as the
    warp produced them.
    """
    height, width = img.shape[:2]
    coverage = cv2.warpPerspective(np.ones_like(img), M, (width, height))
    np.putmask(coverage, coverage > 0.5, 1)
    return coverage


# Fixed left-to-right homography used for this check.
# NOTE(review): where these numbers come from is not shown in this cell; presumably
# they were copied from an earlier estimation run -- confirm.
M = np.array(
    [
        [ 1.00253339e+00,  1.38941996e-02, -4.26872392e+00],
        [-1.38602545e-02,  9.98299525e-01, -4.89828861e-01],
        [ 1.72839530e-05, -1.02908181e-05,  1.00000000e+00],
    ],
    dtype=float,
)

# Signed residual between the right frame and the warped left frame, computed in float.
ret = right['frames'][0].astype(float)
ret -= warp_image(left['frames'][0], M).astype(float)

# Blank out the pixels where the warped all-ones mask is zero.
np.putmask(ret, make_mask(left['frames'][0], M) == 0, np.nan)

plt.matshow(ret)
plt.colorbar()