In [None]:
import cv2
import numpy as np
import os
import re
from mmeval import EndPointError

import pickle

import matplotlib.pyplot as plt
from matplotlib.colors import hsv_to_rgb
from scipy.spatial import ConvexHull
from scipy.interpolate import griddata

from collections import defaultdict

from common.warp import forward_warp_bilinear
from common.kitti import load_kitti_flow, save_kitti_flow


In [None]:
def collect_sources(kitti_path: str):
    """Load ground-truth flow maps and frame pairs from a KITTI-style directory.

    ``<kitti_path>/flow_occ`` is scanned for ``NNNNNN_10.png`` files, each of
    which is loaded with ``load_kitti_flow``.  ``<kitti_path>/image_2`` is
    scanned for ``NNNNNN_FF.png`` files, each of which is read with
    ``cv2.imread``.

    Args:
        kitti_path: Directory containing the ``flow_occ`` and ``image_2``
            sub-directories.

    Returns:
        ``(gt_map, frame_map)``.  ``gt_map[index]`` is the ``(flow, valid)``
        pair returned by ``load_kitti_flow``; ``frame_map[index]`` is the
        ``(frame_10, frame_11)`` image pair.  Sequences for which frame 10 or
        frame 11 is missing (or could not be read) are reported with ``print``
        and left out of ``frame_map``.
    """
    flow_dir = os.path.join(kitti_path, "flow_occ")
    image_dir = os.path.join(kitti_path, "image_2")

    # --- ground-truth flow: one file per sequence, always frame 10 ---------
    gt_map: dict[int, tuple[np.ndarray, np.ndarray]] = {}
    gt_name = re.compile(r'^(\d{6})_10\.png$')
    for entry in os.listdir(flow_dir):
        hit = gt_name.match(entry)
        if hit is None:
            continue
        gt_flow, gt_valid = load_kitti_flow(os.path.join(flow_dir, entry))
        gt_map[int(hit.group(1))] = (gt_flow, gt_valid)

    # --- images: group every frame by its sequence index -------------------
    frames_by_sequence: dict[int, dict[int, np.ndarray]] = {}
    frame_name = re.compile(r'^(\d{6})_(\d{2})\.png$')
    for entry in os.listdir(image_dir):
        hit = frame_name.match(entry)
        if hit is None:
            continue
        sequence = int(hit.group(1))
        frame_number = int(hit.group(2))
        image = cv2.imread(os.path.join(image_dir, entry))
        frames_by_sequence.setdefault(sequence, {})[frame_number] = image

    # --- keep only sequences that have both frame 10 and frame 11 ----------
    frame_map: dict[int, tuple[np.ndarray, np.ndarray]] = {}
    for sequence, by_number in frames_by_sequence.items():
        first = by_number.get(10)
        second = by_number.get(11)
        if first is not None and second is not None:
            frame_map[sequence] = (first, second)
        else:
            print('Error', sequence)

    return gt_map, frame_map
            
def vector_field_to_rgb(vector_field, mask=None):
    """Render a 2D vector field as an image via an HSV colour wheel.

    Direction is mapped to hue and magnitude to saturation, where magnitude is
    normalised by the largest magnitude among the selected pixels.  Selected
    pixels get value 1.

    Args:
        vector_field: Array whose last axis holds the two vector components
            at indices 0 and 1.
        mask: Optional array with the same leading shape as ``vector_field``.
            Truthy entries select the pixels to draw; it is converted to
            ``bool`` so 0/1 integer masks work as well.  Unselected pixels get
            saturation 0 and value 0.

    Returns:
        The result of ``hsv_to_rgb`` applied to the stacked ``(H, S, V)``
        array (one colour triple per input pixel).
    """
    u = vector_field[..., 0]
    v = vector_field[..., 1]
    magnitude = np.sqrt(u**2 + v**2)
    angle = np.arctan2(v, u)

    if mask is not None:
        # A uint8 mask would otherwise trigger integer fancy indexing in
        # `magnitude[mask]` and turn `~mask` into 254/255 indices.
        mask = np.asarray(mask, dtype=bool)
        selected = magnitude[mask]
    else:
        selected = magnitude

    # Normalize magnitude
    peak = selected.max() if selected.size > 0 else 1.0
    if peak == 0:
        # All selected vectors are zero: avoid 0/0 -> NaN in the saturation.
        peak = 1.0
    saturation = magnitude / peak

    # Angle maps to hue
    hue = (angle + np.pi) / (2 * np.pi)
    # Magnitude maps to saturation; value is constant
    value = np.ones_like(saturation)

    if mask is not None:
        saturation[~mask] = 0.0
        value[~mask] = 0.0

    hsv = np.stack((hue, saturation, value), axis=-1)
    return hsv_to_rgb(hsv)

def convex_hull_from_mask(mask):
    """Return the convex-hull vertices of the non-zero entries of ``mask``.

    Coordinates are in array-index order, i.e. ``(row, col)`` for a 2D mask,
    and the vertices are returned in the order of ``ConvexHull.vertices``.
    """
    # One index tuple per row: shape (N, mask.ndim).
    nonzero_coords = np.transpose(np.nonzero(mask))
    vertex_ids = ConvexHull(nonzero_coords).vertices
    return nonzero_coords[vertex_ids]


In [None]:
# Root directory of the local KITTI-style dataset; the notebook reads its
# flow_occ, image_2 and raw_annotations sub-directories.
kitti_path = r"./data_kitti"
# gt_map: index -> (flow, valid); frame_map: index -> (frame_10, frame_11).
gt_map, frame_map = collect_sources(kitti_path)

In [None]:
# For every ground-truth flow map: keep the flow only at the vertices of the
# valid region's convex hull, re-interpolate it over the hull interior, and
# show the original next to the reconstruction.
for index, (gt_flow, gt_valid) in gt_map.items():
    # Skip maps with too few valid pixels to give a meaningful hull.
    if np.count_nonzero(gt_valid) <= 100:
        continue

    flow_h, flow_w = gt_flow.shape[:2]

    # Hull vertices come back as (row, col); flip to (x, y) for cv2 / griddata.
    points = convex_hull_from_mask(gt_valid)[:, ::-1]
    flows = gt_flow[points[:, 1], points[:, 0]]

    # `points` are already the hull vertices in boundary order, so they can be
    # rasterised directly without building a second ConvexHull.
    mask = np.zeros((flow_h, flow_w), dtype=np.uint8)
    cv2.fillPoly(mask, [np.ascontiguousarray(points, dtype=np.int32)], 1)

    y, x = np.nonzero(mask)
    inside_points = np.column_stack((x, y)).astype(np.float32)

    # griddata yields NaN where it cannot interpolate; drop those pixels.
    interp_flows = griddata(points, flows, inside_points, method='cubic')
    valid_mask = ~np.isnan(interp_flows).any(axis=1)
    valid_y, valid_x = y[valid_mask], x[valid_mask]

    new_flows = np.zeros((flow_h, flow_w, 2), dtype=np.float32)
    new_flows[valid_y, valid_x] = interp_flows[valid_mask]

    new_mask = np.zeros((flow_h, flow_w), dtype=bool)
    new_mask[valid_y, valid_x] = True

    # NOTE(review): the dtype of the mask returned by load_kitti_flow is not
    # visible here; coerce to bool so it is used as a boolean mask either way.
    plt.imshow(vector_field_to_rgb(gt_flow, mask=np.asarray(gt_valid, dtype=bool)))
    plt.axis('off')
    plt.title(f"Ground-truth flow ({index})")
    plt.show()

    plt.imshow(vector_field_to_rgb(new_flows, mask=new_mask))
    plt.axis('off')
    plt.title(f"Flow re-interpolated from hull vertices ({index})")
    plt.show()

In [None]:
def interpolate_convex_hull(flow_h, flow_w, convex_hull_pairs, method):
    """Interpolate a dense flow field from sparse point correspondences.

    Each pair ``(p1, p2)`` contributes the flow vector ``p2 - p1`` at location
    ``p1`` (``(x, y)`` order).  The vectors are interpolated over the whole
    ``flow_h`` x ``flow_w`` pixel grid with ``scipy.interpolate.griddata``.

    Args:
        flow_h: Height of the output flow field in pixels.
        flow_w: Width of the output flow field in pixels.
        convex_hull_pairs: Sequence of ``((x1, y1), (x2, y2))`` pairs.
        method: Interpolation method passed to ``griddata``.

    Returns:
        ``(new_flows, new_mask)``: a float32 array of shape
        ``(flow_h, flow_w, 2)`` that is zero wherever no value could be
        interpolated, and a boolean array of shape ``(flow_h, flow_w)`` that is
        True where ``griddata`` produced a non-NaN flow.  Both are empty
        (all zero / all False) when fewer than three pairs are given or when
        the source points are degenerate (e.g. collinear) so that no
        triangulation exists.
    """
    # Local import: QhullError is not among the names the notebook imports.
    from scipy.spatial import QhullError

    new_flows = np.zeros((flow_h, flow_w, 2), dtype=np.float32)
    new_mask = np.zeros((flow_h, flow_w), dtype=bool)

    if len(convex_hull_pairs) < 3:
        return new_flows, new_mask

    points = np.array([p1 for p1, _ in convex_hull_pairs], dtype=np.float32)
    flows = np.array(
        [(p2[0] - p1[0], p2[1] - p1[1]) for p1, p2 in convex_hull_pairs],
        dtype=np.float32,
    )

    X, Y = np.meshgrid(np.arange(flow_w), np.arange(flow_h))

    try:
        interp_flows = griddata(points, flows, (X, Y), method=method)
    except QhullError:
        # Collinear / coincident points cannot be triangulated; treat this the
        # same way as "too few points" instead of crashing the caller.
        return new_flows, new_mask

    # griddata marks pixels it cannot interpolate (outside the hull) with NaN.
    valid_mask = ~np.isnan(interp_flows).any(axis=2)
    new_flows[valid_mask] = interp_flows[valid_mask]

    return new_flows, valid_mask

def interpolate_convex_hull_with_homography(flow_h, flow_w, convex_hull_pairs):
    """Build a dense flow field from a homography fitted to point pairs.

    A homography mapping every ``p1`` to its ``p2`` is estimated with
    ``cv2.findHomography`` (``method=0``).  For every pixel inside the convex
    hull of the ``p1`` points the flow is the displacement of that pixel under
    the homography.

    Args:
        flow_h: Height of the output flow field in pixels.
        flow_w: Width of the output flow field in pixels.
        convex_hull_pairs: Sequence of ``((x1, y1), (x2, y2))`` pairs.

    Returns:
        ``(new_flows, new_mask)``: a float32 array of shape
        ``(flow_h, flow_w, 2)`` that is zero outside the hull, and a boolean
        array of shape ``(flow_h, flow_w)`` marking the hull interior.  Both
        are empty (all zero / all False) when fewer than four pairs are given,
        when no homography is found, or when the hull covers no pixel.
    """
    new_flows = np.zeros((flow_h, flow_w, 2), dtype=np.float32)
    # Boolean from the start so every return path yields the same mask dtype.
    new_mask = np.zeros((flow_h, flow_w), dtype=bool)

    if len(convex_hull_pairs) < 4:
        return new_flows, new_mask

    src_pts = np.array([p1 for p1, _ in convex_hull_pairs], dtype=np.float32)
    dst_pts = np.array([p2 for _, p2 in convex_hull_pairs], dtype=np.float32)

    H, _ = cv2.findHomography(src_pts, dst_pts, method=0)
    if H is None:
        return new_flows, new_mask

    # Rasterise the convex hull of the source points.  Vertices are rounded to
    # the nearest pixel rather than truncated toward zero.
    hull_raster = np.zeros((flow_h, flow_w), dtype=np.uint8)
    hull_polygon = cv2.convexHull(np.rint(src_pts).astype(np.int32))
    cv2.fillConvexPoly(hull_raster, hull_polygon, 1)
    new_mask = hull_raster.astype(bool)

    # Pixel coordinates inside the hull, in row-major order.
    ys, xs = np.nonzero(new_mask)
    if ys.size == 0:
        return new_flows, new_mask
    inside_coords = np.column_stack((xs, ys)).astype(np.float32)

    # Flow = where the homography sends each pixel minus where it started.
    warped_coords = cv2.perspectiveTransform(
        inside_coords.reshape(-1, 1, 2), H
    ).reshape(-1, 2)
    new_flows[ys, xs] = warped_coords - inside_coords

    return new_flows, new_mask


In [None]:
# REDO INTERPOLATION
# Regenerate the ground-truth flow PNG of every raw annotation file that holds
# only convex-hull annotations.

raw_annotation_dir = os.path.join(kitti_path, 'raw_annotations')
flow_occ_dir = os.path.join(kitti_path, 'flow_occ')
# Sorted so the processing order -- and the `info` / `convex_hull_pairs` values
# left behind after the loop -- are the same on every run.
for filename in sorted(os.listdir(raw_annotation_dir)):
    try:
        index = int(os.path.splitext(filename)[0])
    except ValueError:
        # Not a per-frame annotation file (name is not a frame index).
        continue

    # NOTE(review): pickle.load can execute arbitrary code from the file; only
    # run this on annotation files from a trusted source.
    with open(os.path.join(raw_annotation_dir, filename), 'rb') as f:
        info = pickle.load(f)

    convex_hull_pairs = info['convex_hull_annotations']
    if len(info['annotations']) == 0 and len(convex_hull_pairs) >= 3:
        flow, valid = interpolate_convex_hull_with_homography(384, 1280, convex_hull_pairs)
        # flow, valid = interpolate_convex_hull(384, 1280, convex_hull_pairs, 'cubic')
        if not np.any(valid):
            # The interpolation produced nothing (e.g. the homography variant
            # needs at least four pairs); do not overwrite the existing
            # ground truth with an empty flow map.
            continue
        gt_filename = os.path.join(flow_occ_dir, f'{index:06d}_10.png')
        save_kitti_flow(gt_filename, flow, valid)
    

In [None]:
# Compare linear and cubic interpolation on the annotation left in
# `convex_hull_pairs` by the regeneration loop (the last file it processed).
linear_flow, linear_mask = interpolate_convex_hull(384, 1280, convex_hull_pairs, 'linear')
cubic_flow, cubic_mask = interpolate_convex_hull(384, 1280, convex_hull_pairs, 'cubic')

plt.imshow(vector_field_to_rgb(linear_flow, mask=linear_mask))
plt.axis('off')
plt.title("Linear interpolation")
plt.show()

plt.imshow(vector_field_to_rgb(cubic_flow, mask=cubic_mask))
plt.axis('off')
plt.title("Cubic interpolation")
plt.show()

# Per-pixel magnitude of the difference between the two results.  imshow is
# used instead of pcolor so the map has the same orientation as the figures
# above (pcolor draws row 0 at the bottom) and renders quickly at this size.
plt.imshow(np.linalg.norm(linear_flow - cubic_flow, axis=2))
plt.colorbar(label="difference magnitude")
plt.axis('off')
plt.title("Linear vs. cubic: flow difference magnitude")
plt.show()


