## Create dataset

Do all imports.

In [None]:
# For input/output
from pathlib import Path

# For numerical methods
import numpy as np

# For image processing and visualization of results
import cv2
import matplotlib.pyplot as plt
from matplotlib.patches import ConnectionPatch

# For optimization with symforce
import symforce
symforce.set_epsilon_to_symbol()
import symforce.symbolic as sf
from symforce.values import Values
from symforce.opt.factor import Factor
from symforce.opt.optimizer import Optimizer
from symforce.opt.noise_models import BarronNoiseModel
import sym

Function to print things nicely.

In [None]:
def myprint(M):
    """Print a scalar or an array with fixed-width, 4-decimal formatting.

    M may be a NumPy array, a NumPy scalar, or any array-like / plain Python
    number. It is converted with np.asarray first, so inputs that have no
    `.shape` attribute (e.g. a Python float) are accepted as well.
    """
    M = np.asarray(M)
    if M.shape:
        # Arrays: widen the line and format every float entry the same way
        with np.printoptions(linewidth=150, formatter={'float': lambda x: f'{x:10.4f}'}):
            print(M)
    else:
        # Scalars have an empty shape tuple: format the single value
        print(f'{M.item():10.4f}')

Functions to transform points between frames, project them into an image, and compute the reprojection error.

In [None]:
def apply_transform(R_inB_ofA, p_inB_ofA, p_inA):
    """Express points given in frame A in frame B.

    Parameters
    ----------
    R_inB_ofA : (3, 3) array, orientation of frame A in frame B
    p_inB_ofA : array with 3 elements, position of frame A in frame B
    p_inA     : (n, 3) array of points in frame A (n may be zero)

    Returns
    -------
    (n, 3) array whose i-th row is R_inB_ofA @ p_inA[i] + p_inB_ofA.
    """
    R_inB_ofA = np.asarray(R_inB_ofA, dtype=float)
    p_inB_ofA = np.asarray(p_inB_ofA, dtype=float).reshape(3)
    # reshape(-1, 3) keeps an empty input (shape (0,)) well-formed as (0, 3)
    p_inA = np.asarray(p_inA, dtype=float).reshape(-1, 3)
    # Row-wise form of R @ p + t, applied to all points at once
    return p_inA @ R_inB_ofA.T + p_inB_ofA

def project(K, R_inB_ofA, p_inB_ofA, p_inA, warn=True):
    """Project points given in frame A into the image of a camera at frame B.

    Parameters
    ----------
    K         : (3, 3) camera matrix
    R_inB_ofA : (3, 3) array, orientation of frame A in frame B
    p_inB_ofA : array with 3 elements, position of frame A in frame B
    p_inA     : (n, 3) array of points in frame A (n may be zero)
    warn      : if True, print a warning when any point has non-positive
                depth in frame B

    Returns
    -------
    (n, 2) array of image coordinates.
    """
    p_inB = apply_transform(R_inB_ofA, p_inB_ofA, p_inA)
    if warn and not np.all(p_inB[:, 2] > 0):
        print('WARNING: non-positive depths')
    # Each row is (K @ p) / depth; the 2:3 slice keeps a column for broadcasting
    q = (p_inB @ np.asarray(K, dtype=float).T) / p_inB[:, 2:3]
    return q[:, 0:2]

def projection_error(K, R_inB_ofA, p_inB_ofA, p_inA, b, warn=True):
    """Return the reprojection error (in pixels) of each point.

    Parameters are those of `project`, plus
    b : (n, 2) array of measured image coordinates (n may be zero)

    Returns
    -------
    (n,) array with the Euclidean distance between each projected point and
    the corresponding row of b.
    """
    b_pred = project(K, R_inB_ofA, p_inB_ofA, p_inA, warn=warn)
    # An empty measurement array (shape (0,)) is treated as (0, 2)
    b = np.asarray(b, dtype=float).reshape(-1, 2)
    return np.linalg.norm(b_pred - b, axis=1)

Load two images from video.

In [None]:
# Specify filename
video_filename = Path('../../tutorials/20240305_realdata_whatbreaks/video.MOV')

# Say what frames we want to read
# - index of first frame
i_frame_1 = 0
# - index of second frame
i_frame_2 = 30


def read_gray_frame(video_src, i_frame):
    """Seek an opened cv2.VideoCapture to frame i_frame and return that frame in grayscale.

    Raises RuntimeError if the frame cannot be read.
    """
    video_src.set(cv2.CAP_PROP_POS_FRAMES, i_frame)
    success, frame = video_src.read()
    if not success:
        raise RuntimeError(f'could not read frame {i_frame} from video')
    return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)


# Create a video reader
video_src = cv2.VideoCapture(str(video_filename))
if not video_src.isOpened():
    raise FileNotFoundError(f'could not open video: {video_filename}')

# Read both frames, and always release the reader afterwards
try:
    img1 = read_gray_frame(video_src, i_frame_1)
    img2 = read_gray_frame(video_src, i_frame_2)
finally:
    video_src.release()

Detection.

In [None]:
# Build the SIFT detector once and reuse it for both frames
sift = cv2.SIFT_create()

# Keypoints (pts) and descriptors (desc), first for image 1 and then for image 2
(pts1, desc1), (pts2, desc2) = (
    sift.detectAndCompute(image=img, mask=None) for img in (img1, img2)
)

Matching.

In [None]:
# Brute-force descriptor matcher that compares descriptors with the L2 norm.
# IMPORTANT: crossCheck must stay False for kNN matching.
bf = cv2.BFMatcher(normType=cv2.NORM_L2, crossCheck=False)

# Function to get good matches with ratio test
def get_good_matches(descA, descB, threshold=0.5):
    """Return the matches from descA to descB that pass the ratio test.

    For each descriptor in descA the two nearest descriptors in descB are
    found with the module-level matcher `bf`. The best match is kept when its
    distance is less than `threshold` times the distance of the second-best.

    Parameters
    ----------
    descA, descB : descriptor arrays; if either is None an empty list is returned
    threshold    : ratio-test threshold

    Returns
    -------
    List of the kept best matches, sorted by distance (smallest first).
    """
    # Nothing to match (presumably what the detector yields when it finds no features)
    if descA is None or descB is None:
        return []

    # Find the two best matches between descriptors
    matches = bf.knnMatch(descA, descB, k=2)

    # Find the subset of good matches
    good_matches = []
    for pair in matches:
        # A query may come back with fewer than two neighbours; the ratio
        # test is undefined for it, so skip it
        if len(pair) < 2:
            continue
        best, second = pair
        # Multiplied form of best/second < threshold: no division by zero
        # when the second-best distance is exactly 0
        if best.distance < threshold * second.distance:
            good_matches.append(best)

    # Return good matches, sorted by distance (smallest first)
    return sorted(good_matches, key=lambda m: m.distance)

# Ratio-test matches from image 1 (query) to image 2 (train)
matches = get_good_matches(descA=desc1, descB=desc2)
print(f'found {len(matches)} good matches')

Visualize all good matches.

In [None]:
# Create figure
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 10))

# Show images
ax1.imshow(img1, cmap='gray')
ax2.imshow(img2, cmap='gray')

# Locations of matched keypoints, one row per match
# (reshape keeps the arrays two-dimensional even if there are no matches)
xy1 = np.array([pts1[m.queryIdx].pt for m in matches], dtype=float).reshape(-1, 2)
xy2 = np.array([pts2[m.trainIdx].pt for m in matches], dtype=float).reshape(-1, 2)

# Draw line connecting keypoint in first image with keypoint in second image
for p1, p2 in zip(xy1, xy2):
    fig.add_artist(
        ConnectionPatch(
            tuple(p1), tuple(p2),
            'data', 'data',
            axesA=ax1, axesB=ax2,
            color='red',
            connectionstyle='arc3, rad=0.',
            linewidth=0.1,
        )
    )

# Draw red dot at each keypoint (one plot call per image instead of one per keypoint)
ax1.plot(xy1[:, 0], xy1[:, 1], 'r.', markersize=2)
ax2.plot(xy2[:, 0], xy2[:, 1], 'r.', markersize=2)

plt.show()

Define camera matrix.

In [None]:
# Camera matrix, one row per line. K[0, 0], K[1, 1], K[0, 2], K[1, 2] are used
# later in this notebook as fx, fy, cx, cy.
K = np.array([
    [1565.7702703272157, 0.0, 964.2389356041999],
    [0.0, 1562.3561924508267, 537.4247202074102],
    [0.0, 0.0, 1.0],
])

Get image coordinates of matches.

In [None]:
# Create a, b: image coordinates of every match in image 1 (a) and in image 2 (b),
# one row per match. The reshape keeps the shape (0, 2) when there are no matches,
# so that code indexing columns downstream still works.
a = np.array([pts1[m.queryIdx].pt for m in matches], dtype=float).reshape(-1, 2)
b = np.array([pts2[m.trainIdx].pt for m in matches], dtype=float).reshape(-1, 2)

## Two-view reconstruction with OpenCV

Estimate $R^B_A$ and $p^B_A$.

In [None]:
# Get solution (the same camera matrix and all-zero distortion coefficients
# are passed for both images)
(
    num_inliers_cv,
    E_cv,
    R_inB_ofA_cv,
    p_inB_ofA_cv_2d,
    mask_cv_2d,
) = cv2.recoverPose(a.copy(), b.copy(), K, np.zeros(4), K, np.zeros(4))

# The position and the mask come back as 2d arrays: flatten both to 1d
p_inB_ofA_cv = p_inB_ofA_cv_2d.flatten()
mask_cv = mask_cv_2d.flatten()

Estimate $p^A_1, \dotsc, p^A_n$.

In [None]:
# Projection matrices: identity pose for image A, OpenCV's estimated pose for image B
proj_A = K @ np.column_stack([np.eye(3), np.zeros(3)])
proj_B = K @ np.column_stack([R_inB_ofA_cv, p_inB_ofA_cv])

# Triangulate (the image points are passed with one column per point)
points = cv2.triangulatePoints(proj_A, proj_B, a.copy().T, b.copy().T)

# Normalize points by their last (homogeneous) coordinate
points = points / points[-1, :]

# Extract non-homogeneous coordinates, one row per point
p_inA_cv = points[:3, :].T

Report how many inliers OpenCV found.

In [None]:
# Inlier count returned by cv2.recoverPose, out of the number of mask entries
print(f'OpenCV found {num_inliers_cv} inliers out of {len(mask_cv)}')

Get subsets of points that were declared inliers and outliers by OpenCV.

In [None]:
# Boolean version of OpenCV's mask (True where the mask entry is non-zero).
# Boolean indexing keeps the column dimension even when a subset is empty,
# e.g. shape (0, 2) instead of (0,) when OpenCV declares no outliers.
inlier_mask_cv = mask_cv.astype(bool)

# Inliers
a_inliers_cv = a[inlier_mask_cv]
b_inliers_cv = b[inlier_mask_cv]
assert(len(a_inliers_cv) == np.count_nonzero(mask_cv))
assert(len(b_inliers_cv) == np.count_nonzero(mask_cv))

p_inA_inliers_cv = p_inA_cv[inlier_mask_cv]
assert(len(p_inA_inliers_cv) == np.count_nonzero(mask_cv))

# Outliers
a_outliers_cv = a[~inlier_mask_cv]
b_outliers_cv = b[~inlier_mask_cv]
p_inA_outliers_cv = p_inA_cv[~inlier_mask_cv]

Get reprojection errors.

In [None]:
# Errors in image A use the identity pose (the points are already expressed in
# frame A); errors in image B use OpenCV's estimate of the pose of A in B.

# For inliers
e_a_inliers_cv = projection_error(
    K=K, R_inB_ofA=np.eye(3), p_inB_ofA=np.zeros(3),
    p_inA=p_inA_inliers_cv, b=a_inliers_cv,
)
e_b_inliers_cv = projection_error(
    K=K, R_inB_ofA=R_inB_ofA_cv, p_inB_ofA=p_inB_ofA_cv,
    p_inA=p_inA_inliers_cv, b=b_inliers_cv,
)

# For outliers
e_a_outliers_cv = projection_error(
    K=K, R_inB_ofA=np.eye(3), p_inB_ofA=np.zeros(3),
    p_inA=p_inA_outliers_cv, b=a_outliers_cv,
)
e_b_outliers_cv = projection_error(
    K=K, R_inB_ofA=R_inB_ofA_cv, p_inB_ofA=p_inB_ofA_cv,
    p_inA=p_inA_outliers_cv, b=b_outliers_cv,
)

## Two-view reconstruction with your code

### Initial guess

Your implementation of two-view reconstruction.

In [None]:
def skew(v):
    """Return the 3x3 skew-symmetric matrix S built from v, so that S @ w == np.cross(v, w).

    v must be a NumPy array of shape (3,); anything else fails an assertion.
    """
    assert(type(v) == np.ndarray)
    assert(v.shape == (3,))
    x, y, z = v
    rows = [
        [0., -z, y],
        [z, 0., -x],
        [-y, x, 0.],
    ]
    return np.array(rows)

def _cross_matrix(v):
    """Return the 3x3 matrix S such that S @ w == np.cross(v, w)."""
    return np.array([[0., -v[2], v[1]],
                     [v[2], 0., -v[0]],
                     [-v[1], v[0], 0.]])

def twoview_triangulate(alpha, beta, R_inB_ofA, p_inB_ofA):
    """Triangulate points from their normalized coordinates in two images.

    INPUTS (alpha, beta, R_inB_ofA, p_inB_ofA)
     alpha        normalized coordinates of points in image A (n x 3)
     beta         normalized coordinates of points in image B (n x 3)
     R_inB_ofA    orientation of frame A in frame B
     p_inB_ofA    position of frame A in frame B

    OUTPUTS (p_inA, p_inB, mask)
     p_inA        triangulated points in frame A (n x 3)
     p_inB        triangulated points in frame B (n x 3)
     mask         1d array of length equal to number of triangulated points,
                  with a "1" for each point that has positive depth in both
                  frames and with a "0" otherwise

    Each point is the least-squares solution of the two constraints "p_inA is
    parallel to alpha" and "R_inB_ofA @ p_inA + p_inB_ofA is parallel to beta".
    """
    alpha = np.asarray(alpha, dtype=float).reshape(-1, 3)
    beta = np.asarray(beta, dtype=float).reshape(-1, 3)
    if len(alpha) != len(beta):
        raise ValueError('alpha and beta must have the same number of points')
    R_inB_ofA = np.asarray(R_inB_ofA, dtype=float)
    p_inB_ofA = np.asarray(p_inB_ofA, dtype=float).reshape(3)

    p_inA = np.zeros((len(alpha), 3))
    for i, (alpha_i, beta_i) in enumerate(zip(alpha, beta)):
        S_beta = _cross_matrix(beta_i)
        # alpha_i x p = 0  and  beta_i x (R p + t) = 0, stacked as A p = rhs
        A = np.vstack([_cross_matrix(alpha_i), S_beta @ R_inB_ofA])
        rhs = np.concatenate([np.zeros(3), -S_beta @ p_inB_ofA])
        p_inA[i] = np.linalg.lstsq(A, rhs, rcond=None)[0]

    p_inB = p_inA @ R_inB_ofA.T + p_inB_ofA
    mask = ((p_inA[:, 2] > 0) & (p_inB[:, 2] > 0)).astype(int)
    return p_inA, p_inB, mask

def get_transformation(alpha):
    """Return the 3x3 matrix that normalizes a set of image points.

    INPUTS (alpha)
     alpha        normalized image coordinates (n x 3); the third coordinate
                  of every row is assumed to be 1

    OUTPUTS (T)
     T            transformation matrix (3 x 3)

    The transformation...

     alpha_n = np.array([T @ alpha_i for alpha_i in alpha])

    ...produces normalized image coordinates whose centroid is zero and whose
    mean distance from the centroid is sqrt(2).

    Similarly, the transformation...

      E = T_beta.T @ E_n @ T_alpha

    ...recovers the estimate E of the essential matrix, given the estimate E_n
    of the essential matrix that was computed using the transformed points
    alpha_n and beta_n (corresponding to the transformations T_alpha and
    T_beta).

    Raises ValueError if alpha is not a non-empty (n x 3) array or if all
    points coincide (the scale would be undefined).
    """
    alpha = np.asarray(alpha, dtype=float)
    if alpha.ndim != 2 or alpha.shape[1] != 3 or len(alpha) == 0:
        raise ValueError('alpha must be a non-empty (n x 3) array')
    xy = alpha[:, 0:2]
    centroid = xy.mean(axis=0)
    mean_dist = np.linalg.norm(xy - centroid, axis=1).mean()
    if not mean_dist > 0:
        raise ValueError('all points coincide; cannot normalize')
    s = np.sqrt(2) / mean_dist
    # Shift the centroid to the origin, then scale
    return np.array([[s, 0., -s * centroid[0]],
                     [0., s, -s * centroid[1]],
                     [0., 0., 1.]])

def twoview(a, b, K):
    """Two-view reconstruction from matched image points (normalized 8-point algorithm).

    INPUTS (a, b, K)
     a            image coordinates of points in image A (n x 2)
     b            image coordinates of points in image B (n x 2)
     K            camera matrix

    OUTPUTS (E, R_inB_ofA, p_inB_ofA, p_inA)
     E            estimate of essential matrix, returned as
                  skew(p_inB_ofA) @ R_inB_ofA so that beta.T @ E @ alpha = 0
     R_inB_ofA    estimate of orientation of frame A in frame B
     p_inB_ofA    estimate of position of frame A in frame B (unit length)
     p_inA        estimate of triangulated position of points in frame A (n x 3)

    All matches are used (there is no outlier rejection here).
    Raises ValueError if a and b differ in length or hold fewer than 8 matches.
    """
    a = np.asarray(a, dtype=float).reshape(-1, 2)
    b = np.asarray(b, dtype=float).reshape(-1, 2)
    if len(a) != len(b):
        raise ValueError('a and b must have the same number of points')
    if len(a) < 8:
        raise ValueError('need at least 8 matches for two-view reconstruction')

    # Normalized (homogeneous) image coordinates: alpha_i = inv(K) @ [a_i, 1]
    K_inv = np.linalg.inv(np.asarray(K, dtype=float))
    alpha = np.column_stack([a, np.ones(len(a))]) @ K_inv.T
    beta = np.column_stack([b, np.ones(len(b))]) @ K_inv.T

    # Condition the points before solving the linear system
    T_alpha = get_transformation(alpha)
    T_beta = get_transformation(beta)
    alpha_n = alpha @ T_alpha.T
    beta_n = beta @ T_beta.T

    # Row i is kron(beta_n[i], alpha_n[i]), so M @ E_n.flatten() stacks the
    # epipolar constraints beta_n[i] @ E_n @ alpha_n[i] = 0
    M = np.einsum('ij,ik->ijk', beta_n, alpha_n).reshape(-1, 9)
    # The full decomposition is only needed when M has fewer than 9 rows
    # (otherwise the reduced one already contains all 9 right singular vectors)
    _, _, Vt = np.linalg.svd(M, full_matrices=len(M) < 9)
    E_n = Vt[-1].reshape(3, 3)

    # Undo the conditioning
    E = T_beta.T @ E_n @ T_alpha

    # Decompose E = skew(t) @ R. Flipping signs makes U and Vt proper rotations
    # (E is only defined up to sign, so this is allowed).
    U, _, Vt = np.linalg.svd(E)
    if np.linalg.det(U) < 0:
        U = -U
    if np.linalg.det(Vt) < 0:
        Vt = -Vt
    W = np.array([[0., -1., 0.],
                  [1., 0., 0.],
                  [0., 0., 1.]])
    candidates = [
        (U @ W @ Vt, U[:, 2]),
        (U @ W @ Vt, -U[:, 2]),
        (U @ W.T @ Vt, U[:, 2]),
        (U @ W.T @ Vt, -U[:, 2]),
    ]

    # Keep the candidate that puts the most points in front of both cameras
    best = None
    for R_candidate, p_candidate in candidates:
        p_inA_candidate, _, mask = twoview_triangulate(alpha, beta, R_candidate, p_candidate)
        num_in_front = np.count_nonzero(mask)
        if best is None or num_in_front > best[0]:
            best = (num_in_front, R_candidate, p_candidate, p_inA_candidate)
    _, R_inB_ofA, p_inB_ofA, p_inA = best

    # Report E with the sign that is consistent with the chosen pose
    E = _cross_matrix(p_inB_ofA) @ R_inB_ofA

    return E, R_inB_ofA, p_inB_ofA, p_inA

Apply your code for two-view reconstruction.

In [None]:
# Estimate the essential matrix, the pose of A in B, and the points in frame A
E, R_inB_ofA, p_inB_ofA, p_inA = twoview(a=a, b=b, K=K)

# Reprojection errors: image A uses the identity pose, image B the estimated pose
e_a = projection_error(
    K=K, R_inB_ofA=np.eye(3), p_inB_ofA=np.zeros(3), p_inA=p_inA, b=a,
)
e_b = projection_error(
    K=K, R_inB_ofA=R_inB_ofA, p_inB_ofA=p_inB_ofA, p_inA=p_inA, b=b,
)

### Optimization

Write a symbolic function that projects a point into the image.

In [None]:
def sf_projection(
    T_inC_ofW: sf.Pose3,
    p_inW: sf.V3,
    fx: sf.Scalar,
    fy: sf.Scalar,
    cx: sf.Scalar,
    cy: sf.Scalar,
    epsilon: sf.Scalar,
) -> sf.V2:
    """Symbolically project the point p_inW into the image of a camera with pose T_inC_ofW.

    fx, fy, cx, cy are the camera parameters used in the projection. The depth
    is clamped from below by epsilon.
    """
    point_inC = T_inC_ofW * p_inW
    # Clamp the depth so that points with non-positive depth produce a large
    # but finite error
    depth = sf.Max(point_inC[2], epsilon)
    u = fx * (point_inC[0] / depth) + cx
    v = fy * (point_inC[1] / depth) + cy
    return sf.V2(u, v)

Write a symbolic function that computes the difference between a projected point and an image point.

In [None]:
def sf_projection_residual(
    T_inC_ofW: sf.Pose3,
    p_inW: sf.V3,
    q: sf.V2,
    fx: sf.Scalar,
    fy: sf.Scalar,
    cx: sf.Scalar,
    cy: sf.Scalar,
    epsilon: sf.Scalar,  
) -> sf.V2:
    """Symbolic, robustly whitened difference between the projection of p_inW and the image point q."""
    # Reprojection error: where the point projects minus where it was observed
    predicted = sf_projection(T_inC_ofW, p_inW, fx, fy, cx, cy, epsilon)
    reprojection_error = sf.V2(predicted - q)

    # "Noise model" that implements the Geman-McClure robust loss function
    robust_loss = BarronNoiseModel(
        alpha=-2,
        scalar_information=1,
        x_epsilon=epsilon,
        alpha_epsilon=epsilon,
    )

    # The "whitened residual" is the one that corresponds to the robust loss
    return robust_loss.whiten_norm(reprojection_error)

Create one more residual to fix the scale so that the distance between frame $A$ and frame $B$ is close to one.

In [None]:
def sf_scale_residual(
    T_inC_ofW: sf.Pose3,
    epsilon: sf.Scalar,
) -> sf.V1:
    """Symbolic residual that is zero when the translation of T_inC_ofW has unit length.

    NOTE(review): epsilon is not referenced explicitly in the body. Presumably
    the default epsilon of norm() resolves to this symbol because of the
    symforce.set_epsilon_to_symbol() call at import time -- confirm.
    """
    return sf.V1(T_inC_ofW.t.norm() - 1)

Create initial values for optimization.

In [None]:
# Everything the factors refer to: the two poses, the camera parameters,
# epsilon, and (filled in below) one entry per match
initial_values = Values(
    T_inA_ofA=sym.Pose3(R=sym.Rot3.from_rotation_matrix(np.eye(3)), t=np.zeros(3)),
    T_inB_ofA=sym.Pose3(R=sym.Rot3.from_rotation_matrix(R_inB_ofA), t=p_inB_ofA),
    matches=[],
    fx=K[0, 0],
    fy=K[1, 1],
    cx=K[0, 2],
    cy=K[1, 2],
    epsilon=sym.epsilon,
)

# One entry per match: its two image points and the initial guess for the 3d point
initial_values['matches'].extend(
    Values(a=a_i, b=b_i, p_inA=p_inA_i)
    for a_i, b_i, p_inA_i in zip(a, b, p_inA)
)

Create factors for optimization.

In [None]:
# Keys that every projection factor needs after its pose, point and image point
camera_keys = ['fx', 'fy', 'cx', 'cy', 'epsilon']

# Start with the single factor that fixes the scale
factors = [
    Factor(residual=sf_scale_residual, keys=['T_inB_ofA', 'epsilon']),
]

# Then, for every match, one projection factor in image A followed by one in image B
for i_match in range(len(initial_values['matches'])):
    point_key = f'matches[{i_match}].p_inA'
    views = (
        ('T_inA_ofA', f'matches[{i_match}].a'),
        ('T_inB_ofA', f'matches[{i_match}].b'),
    )
    for pose_key, image_point_key in views:
        factors.append(Factor(
            residual=sf_projection_residual,
            keys=[pose_key, point_key, image_point_key, *camera_keys],
        ))

Create optimizer.

In [None]:
# Optimize over the pose of A in B and over every triangulated point
optimized_keys = ['T_inB_ofA'] + [
    f'matches[{i_match}].p_inA'
    for i_match in range(len(initial_values['matches']))
]

optimizer_params = Optimizer.Params(
    iterations=100,
    use_diagonal_damping=True,      # <-- this makes a big difference (try turning it off!)
)
optimizer = Optimizer(
    factors=factors,
    optimized_keys=optimized_keys,
    debug_stats=True,
    params=optimizer_params,
)

Run optimizer.

In [None]:
# Run the optimization, starting from the initial values
result = optimizer.optimize(initial_values)
# Stop here unless the optimizer reports success
assert(result.status == Optimizer.Status.SUCCESS)

Get results.

In [None]:
# Optimized pose of A in B as a homogeneous matrix, split into rotation and translation
T_inB_ofA_sf = result.optimized_values['T_inB_ofA'].to_homogenous_matrix()
R_inB_ofA_sf, p_inB_ofA_sf = T_inB_ofA_sf[:3, :3], T_inB_ofA_sf[:3, 3]

# Optimized points in frame A, one row per match
optimized_matches = result.optimized_values['matches']
p_inA_sf = np.array([optimized_match['p_inA'] for optimized_match in optimized_matches])

Compute reprojection error from results.

In [None]:
# Image A uses the identity pose; image B uses the optimized pose of A in B
e_a_sf = projection_error(
    K=K, R_inB_ofA=np.eye(3), p_inB_ofA=np.zeros(3), p_inA=p_inA_sf, b=a,
)
e_b_sf = projection_error(
    K=K, R_inB_ofA=R_inB_ofA_sf, p_inB_ofA=p_inB_ofA_sf, p_inA=p_inA_sf, b=b,
)

## Visualize results

Create function to show error statistics.

In [None]:
def show_stats(e_a, e_b):
    """Print count, mean, std, max and min of the errors in image A and in image B.

    e_a, e_b are 1d arrays of errors. An empty array is reported as having no
    data instead of raising (np.max and np.min are undefined on empty input).
    """
    for label, e in (('A', e_a), ('B', e_b)):
        e = np.asarray(e)
        if e.size == 0:
            print(f'{label} ({0:5d}) : (mean, std, max, min) = (no data)')
            continue
        print(f'{label} ({len(e):5d}) : (mean, std, max, min) = ({np.mean(e):4.2f}, {np.std(e):4.2f}, {np.max(e):4.2f}, {np.min(e):4.2f})')

Estimates.

In [None]:
# Print the rotation and then the translation of each estimate, under its heading
print('ESTIMATES')
estimates = (
    ('\nCV', R_inB_ofA_cv, p_inB_ofA_cv),
    ('\nYOURS (initial guess)', R_inB_ofA, p_inB_ofA),
    ('\nYOURS (optimized)', R_inB_ofA_sf, p_inB_ofA_sf),
)
for heading, R_estimate, p_estimate in estimates:
    print(heading)
    myprint(R_estimate)
    myprint(p_estimate)

Reprojection errors.

In [None]:
print('REPROJECTION ERRORS')
print('\nCV (inliers)')
show_stats(e_a_inliers_cv, e_b_inliers_cv)
print('\nCV (outliers)')
show_stats(e_a_outliers_cv, e_b_outliers_cv)
print('\nYOURS (initial guess)')
show_stats(e_a, e_b)
print('\nYOURS (optimized)')
show_stats(e_a_sf, e_b_sf)

# Statistics restricted to the errors below a threshold (in pixels)
threshold = 0.5
# This must be an f-string, otherwise "{threshold}" is printed literally
print(f'\nYOURS (optimized, with statistics only over errors less than {threshold} px)')
show_stats(
    e_a_sf[e_a_sf < threshold],
    e_b_sf[e_b_sf < threshold],
)

Histogram of reprojection errors.

In [None]:
fig, ax = plt.subplots(3, 2, figsize=(10, 5), sharex=True)
bins = np.linspace(0, 5, 50)

# One row of panels per method (left: image A, right: image B). Each entry is
# (legend label, errors in image A, errors in image B).
histogram_rows = [
    [
        ('inliers (cv)', e_a_inliers_cv, e_b_inliers_cv),
        ('outliers (cv)', e_a_outliers_cv, e_b_outliers_cv),
    ],
    [('yours (initial)', e_a, e_b)],
    [('yours (optimized)', e_a_sf, e_b_sf)],
]
for i_row, series in enumerate(histogram_rows):
    for label, errors_a, errors_b in series:
        ax[i_row, 0].hist(errors_a, bins, alpha=0.5, label=label)
        ax[i_row, 1].hist(errors_b, bins, alpha=0.5, label=label)
    ax[i_row, 0].legend()
    ax[i_row, 1].legend()

# Column titles on the top row, axis labels on the bottom row
ax[0, 0].set_title('A')
ax[0, 1].set_title('B')
ax[2, 0].set_xlabel('error (pixels)')
ax[2, 1].set_xlabel('error (pixels)')
plt.show()

For more information on robust cost functions, see (for example) this paper:

[K. MacTavish and T. D. Barfoot, "At all Costs: A Comparison of Robust Cost Functions for Camera Correspondence Outliers," 2015 12th Conference on Computer and Robot Vision, Halifax, NS, Canada, 2015, pp. 62-69, doi: 10.1109/CRV.2015.52](https://doi.org/10.1109/CRV.2015.52)