# 📸 Data Preprocessing

This notebook details the steps taken to preprocess the data for the project. The raw image pairs are stored in the folder `data/raw` and the cleaned data will be stored in the folder `data/processed`. Each sample is a pair of images that capture the same scene.

## Setup

---

Let's enable autoreload and import the necessary dependencies.

In [None]:
%reload_ext autoreload
%autoreload 2

In [None]:
# Autoroot
import autorootcwd

In [None]:
# Imports
from typing import List, Optional, Tuple, Union

import cv2 as cv
import numpy as np
from matplotlib import pyplot as plt
from tqdm import tqdm

## Alignment (Dummy Images)

---

Due to the different setup in the cameras (different lenses with different focal lengths), the images for an image pair are not exactly aligned. For this reason, we have to come up with an automated way of aligning the images. A classical computer vision pipeline to achieve this is the following:

* Extract meaningful features from both images (called **keypoints**)
* Match the features from both images
* Transform, warp and crop the larger image for alignment.

In [None]:
# Load image example
train = cv.imread("imgs/cinestill-800t.jpg")
train = cv.cvtColor(train, cv.COLOR_BGR2RGB)  # from BGR to RGB

# Apply some basic transformations (crop + rotation)
height, width = train.shape[:2]
M = cv.getRotationMatrix2D((width // 2, height // 2), 10, 1.0)
query = cv.warpAffine(train, M, (width, height))  # Rotate
query = query[500:2000, 500:3000]  # Crop

# Save grayscale version
train_gray = cv.cvtColor(train, cv.COLOR_RGB2GRAY)
query_gray = cv.cvtColor(query, cv.COLOR_RGB2GRAY)

fig, axs = plt.subplots(ncols=2, figsize=(10, 5))
axs[0].imshow(train)
axs[1].imshow(query);

We will follow the naming convention of a `train` and `query` image for this notebook. The `train` image is a complex scene which includes the `query` image. Hence, for our pipeline the digital image would be the `train` image while the film equivalent would be the `query` image.

### Feature Extraction

Nice, we have our dummy images. First, we have to find relevant keypoints in the images that we will later match on. We will start by exploring different methods to find the keypoints and descriptors of the images.

* **SIFT** (Scale-Invariant Feature Transform) [[Paper](), [Wikipedia](https://en.wikipedia.org/wiki/Scale-invariant_feature_transform), [Tutorial](https://docs.opencv.org/4.x/da/df5/tutorial_py_sift_intro.html)]
* **ORB** (Oriented FAST and Rotated BRIEF) [[Paper](), [Wikipedia](), [Tutorial](https://docs.opencv.org/4.x/d1/d89/tutorial_py_orb.html)]

**SIFT Algorithm**

In [None]:
# Create SIFT object and detect keypoints
sift = cv.SIFT_create()
kp = sift.detect(train_gray, None)

# Draw image w/ keypoints
plt.imshow(cv.drawKeypoints(train_gray, kp, None, color=(0, 255, 0)));

**ORB Algorithm**

In [None]:
# Initiate ORB detector
orb = cv.ORB_create()
kp = orb.detect(train_gray, None)

# Draw image w/ keypoints
plt.imshow(cv.drawKeypoints(train_gray, kp, None, color=(0, 255, 0)));

Let's define a function that will return the keypoints and descriptors of an image for different methods and with the option of only searching over an area in the image.

In [None]:
def extract_features(
    img: np.ndarray, method: str = "orb", mask: Union[np.ndarray, None] = None, **kwargs
) -> Tuple[List[cv.KeyPoint], np.ndarray]:
    """
    Extract features from an image using a given method.
    Currently supports SIFT and ORB.

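    Args:
        img (np.ndarray): RGB image to extract features from
        method (str, optional): Feature extraction method ("sift" or "orb"). Defaults to "orb".
        mask (np.ndarray, optional): 8-bit mask restricting where keypoints are detected
        **kwargs: Additional arguments for the extractor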

    Returns:
        Tuple[List[cv.KeyPoint], np.ndarray]: Tuple containing the keypoints and descriptors
    """
    # Convert image to grayscale
    gray = cv.cvtColor(img, cv.COLOR_RGB2GRAY)

    # Create the extractor
    if method == "sift":
        extractor = cv.SIFT_create(**kwargs)
    elif method == "orb":
        extractor = cv.ORB_create(**kwargs)
    else:
        raise ValueError("Invalid method")

    # Extract features
    kp, des = extractor.detectAndCompute(gray, mask)

    return kp, des

Nice, let's try to match the features between the original image and its crop. We will start by extracting the keypoints and descriptors of both images.

In [None]:
# Extract keypoints and descriptors from images
train_kp, train_ds = extract_features(train, method="orb")

# Plot
_, axs = plt.subplots(ncols=2, figsize=(15, 5))
axs[0].imshow(train)
axs[1].imshow(
    cv.drawKeypoints(cv.cvtColor(train, cv.COLOR_RGB2GRAY), train_kp, None, color=(0, 255, 0))
)

print(
    f"Found {len(train_kp)} keypoints in original image (each descriptor has {train_ds.shape[1]} features)"
)

In [None]:
# Extract keypoints and descriptors from images
query_kp, query_ds = extract_features(query, method="orb")

# Plot
_, axs = plt.subplots(ncols=2, figsize=(15, 5))
axs[0].imshow(query)
axs[1].imshow(
    cv.drawKeypoints(cv.cvtColor(query, cv.COLOR_RGB2GRAY), query_kp, None, color=(0, 255, 0))
)

print(
    f"Found {len(query_kp)} keypoints in query image (each descriptor has {query_ds.shape[1]} features)"
)

Nice, this worked. However, we can already see that the keypoints of the train and query images are unlikely to overlap much, because ORB detects them in different regions of each image. For now, we will ignore this and continue with the matching.
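
One way to steer the detector towards the region that both images share is the `mask` argument of `extract_features`, which is passed on to `detectAndCompute`. As a minimal sketch (the helper name `rectangular_mask` and the image size are made up for illustration), such a mask is an 8-bit single-channel array whose non-zero pixels mark where keypoints may be detected:

```python
import numpy as np


def rectangular_mask(shape, top, bottom, left, right):
    """Build a uint8 mask that is 255 inside the given rectangle and 0 elsewhere."""
    height, width = shape[:2]
    mask = np.zeros((height, width), dtype=np.uint8)
    mask[top:bottom, left:right] = 255
    return mask


# Hypothetical image shape (height, width, channels), not the real image size
mask = rectangular_mask((400, 600, 3), top=50, bottom=200, left=50, right=300)
print(mask.shape, mask.dtype, int(np.count_nonzero(mask)))
```

Calling `extract_features(train, mask=mask)` would then restrict detection to that rectangle. Whether this helps depends on knowing roughly where the query sits inside the train image.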

### Matching

For each image we have $n$ **keypoints**, each described by a descriptor in $d$ dimensions, and we wish to find "high-quality" matches between the two sets. OpenCV defines two matching algorithms:

* **Brute-Force Matcher** [[Tutorial]()]
* **FLANN Matcher** [[Tutorial]()]
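
To make the idea concrete, here is a rough NumPy sketch of what a brute-force 2-nearest-neighbour match followed by a ratio test does. It assumes packed binary descriptors like the ones ORB produces (32 bytes each by default) and therefore compares them with the Hamming distance. The function names and the random toy descriptors are made up for illustration; in the notebook itself OpenCV's matchers do this work.

```python
import numpy as np


def hamming_distances(query_ds, train_ds):
    """Pairwise Hamming distances between packed binary descriptors (uint8 arrays)."""
    # XOR every query row with every train row, then count the differing bits
    xor = np.bitwise_xor(query_ds[:, None, :], train_ds[None, :, :])
    return np.unpackbits(xor, axis=2).sum(axis=2)


def ratio_test_matches(query_ds, train_ds, ratio=0.7):
    """Return (query_idx, train_idx) pairs whose best match clearly beats the second best."""
    dist = hamming_distances(query_ds, train_ds)
    order = np.argsort(dist, axis=1)[:, :2]  # indices of the two nearest neighbours
    matches = []
    for q, (best, second) in enumerate(order):
        if dist[q, best] < ratio * dist[q, second]:
            matches.append((q, int(best)))
    return matches


# Toy data: 50 random "train" descriptors, 3 of them reused as slightly noisy queries
rng = np.random.default_rng(0)
toy_train_ds = rng.integers(0, 256, size=(50, 32), dtype=np.uint8)
toy_query_ds = toy_train_ds[[3, 17, 42]].copy()
toy_query_ds[:, 0] ^= 1  # flip one bit in each query descriptor

toy_matches = ratio_test_matches(toy_query_ds, toy_train_ds)
print(toy_matches)
```

The ratio test keeps a match only when the nearest neighbour is much closer than the second nearest, which filters out ambiguous keypoints.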

**Brute-Force Matcher**

In [None]:
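# Initialise brute-force matcher (default norm is L2; for binary ORB descriptors
# cv.BFMatcher(cv.NORM_HAMMING) is usually the better choice)
bf = cv.BFMatcher()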

# Match the descriptors
matches = bf.knnMatch(query_ds, train_ds, k=2)

# Apply ratio test
good = []
for m, n in matches:
    if m.distance < 0.7 * n.distance:
        good.append([m])
matches = good

# Display the best matching points
_, ax = plt.subplots(figsize=(15, 10))
ax.imshow(cv.drawMatchesKnn(query, query_kp, train, train_kp, good, None, flags=2))

# Print total number of matching points between the training and query images
print("\nNumber of Matching Keypoints Between The Training and Query Images: ", len(matches))

**FLANN**

In [None]:
# Initialise FLANN matcher
FLANN_INDEX_KDTREE = 1
FLANN_INDEX_LSH = 6
index_params = dict(algorithm=FLANN_INDEX_LSH, table_number=6, key_size=12, multi_probe_level=1)
search_params = dict(checks=50)

flann = cv.FlannBasedMatcher(index_params, search_params)

# Match the descriptors
matches = flann.knnMatch(query_ds, train_ds, k=2)

# Apply ratio test (FLANN with LSH may return fewer than two neighbours)
good = []
for pair in matches:
    if len(pair) == 2 and pair[0].distance < 0.5 * pair[1].distance:
        good.append([pair[0]])
matches = good

# Need to draw only good matches, so create a mask
matchesMask = [[0, 0] for i in range(len(matches))]

draw_params = dict(
    matchColor=(0, 255, 0), singlePointColor=(255, 0, 0), matchesMask=matchesMask, flags=0
)

# Display the best matching points
_, ax = plt.subplots(figsize=(15, 10))
ax.imshow(cv.drawMatchesKnn(query, query_kp, train, train_kp, good, None, flags=2))

# Print total number of matching points between the training and query images
print("\nNumber of Matching Keypoints Between The Training and Query Images: ", len(matches))

The matching with default parameters is not very good. We will define another helper function to match features and then one that combines the feature extraction and matching, so that we can experiment with both.

In [None]:
def match_features(query_ds, train_ds, method="flann", **kwargs):
    """
    Match features between two sets of keypoints and descriptors.
    Currently supports brute-force and FLANN.

    Args:
        query_ds (np.ndarray): Descriptors of query image
        train_ds (np.ndarray): Descriptors of train image
        method (str, optional): Matching method ("bf" or "flann"). Defaults to "flann".
        **kwargs: Additional arguments for the matcher

    Returns:
        List[List[cv.DMatch]]: Matches that pass the ratio test, each wrapped in a single-element list
    """
    # Create the matcher
    if method == "bf":
        matcher = cv.BFMatcher(**kwargs)
    elif method == "flann":
        matcher = cv.FlannBasedMatcher(**kwargs)
    else:
        raise ValueError("Invalid method")

    # Match the descriptors
    matches = matcher.knnMatch(query_ds, train_ds, k=2)

    # Apply ratio test (FLANN with LSH may return fewer than two neighbours)
    good_matches = []
    for pair in matches:
        if len(pair) == 2 and pair[0].distance < 0.7 * pair[1].distance:
            good_matches.append([pair[0]])

    return good_matches

Let's start with a baseline configuration of the ORB feature detector and the FLANN matcher.

In [None]:
# Baseline
query_kp, query_ds = extract_features(query)
train_kp, train_ds = extract_features(train)
orb_flann_kwargs = dict(
    indexParams=dict(algorithm=6, table_number=6, key_size=12, multi_probe_level=1),
    searchParams=dict(checks=50),
)
matches = match_features(query_ds, train_ds, method="flann", **orb_flann_kwargs)

fig, ax = plt.subplots(figsize=(15, 10))
ax.imshow(cv.drawMatchesKnn(query, query_kp, train, train_kp, matches, None, flags=2));

Not too bad. Let's try to align the images.

### Alignment

Finally, we want to align the two images based on the matched keypoints. To do so, we estimate a **homography** matrix, which we use to warp the train (large) image so that it can be cropped to the region covered by the query image.
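
A homography is a 3x3 matrix that maps a point $(x, y)$ of one image onto the other image via homogeneous coordinates. The sketch below uses a made-up matrix (not one estimated from our images) to show what applying it to a set of points looks like in plain NumPy. For 2D points this is the same operation that `cv.perspectiveTransform` performs.

```python
import numpy as np


def apply_homography(H, points):
    """Map an (N, 2) array of (x, y) points through a 3x3 homography matrix."""
    points = np.asarray(points, dtype=np.float64)
    homogeneous = np.hstack([points, np.ones((len(points), 1))])  # (x, y) -> (x, y, 1)
    mapped = homogeneous @ H.T
    return mapped[:, :2] / mapped[:, 2:3]  # divide by the third coordinate


# Hypothetical homography: scale by 0.5, then shift by (10, 20)
H_example = np.array(
    [
        [0.5, 0.0, 10.0],
        [0.0, 0.5, 20.0],
        [0.0, 0.0, 1.0],
    ]
)
corners = np.array([[0, 0], [100, 0], [100, 200], [0, 200]])
mapped_corners = apply_homography(H_example, corners)
print(mapped_corners)
```

A general homography also has non-zero entries in its last row, which is what lets it model perspective changes between the two cameras.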

In [None]:
# Find keypoints
query_pts = np.float32([query_kp[m[0].queryIdx].pt for m in matches]).reshape(-1, 1, 2)  # query
train_pts = np.float32([train_kp[m[0].trainIdx].pt for m in matches]).reshape(-1, 1, 2)  # train

# Estimate homography matrix
M, mask = cv.findHomography(train_pts, query_pts, cv.RANSAC, 5.0)
matchesMask = mask.ravel().tolist()

# Find the perspective transformation
h, w = query_gray.shape  # query
pts = np.float32([[0, 0], [0, h - 1], [w - 1, h - 1], [w - 1, 0]]).reshape(-1, 1, 2)
dst = cv.perspectiveTransform(pts, M)

# Align train image
aligned_train = cv.warpPerspective(train, M, (w, h))[0:h, 0:w]

_, axs = plt.subplots(ncols=3, figsize=(15, 10))
axs[0].imshow(train)
axs[1].imshow(query)
axs[2].imshow(aligned_train)

We can combine all of the above steps (feature extraction, matching and alignment) into a single function:

In [None]:
def transform_image(
    query: np.ndarray, train: np.ndarray, query_kp: List, train_kp: List, matches: List
) -> np.ndarray:
    """
    Aligns two images using a homography matrix estimated from keypoint matches.

    Args:
        query (np.ndarray): The query image
        train (np.ndarray): The train image
        query_kp (List): Keypoints of the query image
        train_kp (List): Keypoints of the train image
        matches (List): List of matches

    Returns:
        Transformed train image
    """
    # Find keypoints
    query_pts = np.float32([query_kp[m[0].queryIdx].pt for m in matches]).reshape(
        -1, 1, 2
    )  # query
    train_pts = np.float32([train_kp[m[0].trainIdx].pt for m in matches]).reshape(
        -1, 1, 2
    )  # train

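    # Estimate homography matrix (maps train coordinates to query coordinates)
    M, _ = cv.findHomography(train_pts, query_pts, cv.RANSAC, 5.0)
    if M is None:
        raise ValueError("Could not estimate a homography from the given matches")

    # Warp the train image into the query frame
    h, w = query.shape[:2]  # query
    transformed_train = cv.warpPerspective(train, M, (w, h))[0:h, 0:w]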

    return transformed_train

In [None]:
def align_images(
    query: np.ndarray,
    train: np.ndarray,
    mask: Union[np.ndarray, None] = None,
    extract_method: str = "orb",
    match_method: str = "bf",
    extract_kwargs: dict = {},
    match_kwargs: dict = {},
    transform_kwargs: dict = {},
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Aligns the train image (complex scene, expected to include the query image)
    to the query image using a pipeline of feature extraction, matching and homography.

    For our dataset, the train image is the digital image and the query image
    is the film image.

    Args:
        query (np.ndarray): Query image
        train (np.ndarray): Train image
        mask (np.ndarray, optional): Mask for feature extraction
        extract_method (str, optional): Feature extraction method. Defaults to "orb".
        match_method (str, optional): Feature matching method. Defaults to "bf".
        extract_kwargs (dict, optional): Additional arguments for feature extraction. Defaults to {}.
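        match_kwargs (dict, optional): Additional arguments for feature matching. Defaults to {}.
        transform_kwargs (dict, optional): Additional arguments passed to `transform_image`. Defaults to {}.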

    Returns:
        Tuple of aligned images (query, aligned_train)
    """
    # Extract features
    query_kp, query_ds = extract_features(query, method=extract_method, **extract_kwargs)
    train_kp, train_ds = extract_features(
        train, method=extract_method, mask=mask, **extract_kwargs
    )

    # Match features
    matches = match_features(query_ds, train_ds, method=match_method, **match_kwargs)

    # Align images
    aligned_train = transform_image(query, train, query_kp, train_kp, matches, **transform_kwargs)

    return query, aligned_train

In [None]:
# Align images
fig, axs = plt.subplots(ncols=2, figsize=(15, 10))
query, aligned_train = align_images(
    query=query,
    train=train,
    extract_method="orb",
    match_method="bf",
)

axs[0].imshow(aligned_train)
axs[1].imshow(query);

In [None]:
# Align images
fig, axs = plt.subplots(ncols=2, figsize=(15, 10))
query, aligned_train = align_images(
    query=query,
    train=train,
    extract_method="orb",
    match_method="flann",
    match_kwargs=orb_flann_kwargs,
)

axs[0].imshow(aligned_train)
axs[1].imshow(query);

Both tested combinations (ORB with brute-force matching and ORB with FLANN) are able to align the images. However, the example is relatively simple, as the query image is just a rotated crop of the train image. We will have to use our dataset of image pairs to tune the hyper-parameters and find the best combination of methods.
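
Such a comparison could be organised roughly as follows. This is only a sketch: `compare_configurations`, `toy_align` and `toy_score` are hypothetical names, and the toy stand-ins exist solely so that the block runs on its own. In the notebook, `align_fn` would be `align_images` and `score_fn` some image-similarity measure of our choosing.

```python
from statistics import mean


def compare_configurations(pairs, configs, align_fn, score_fn):
    """Score each configuration over all image pairs and count failed alignments."""
    results = []
    for config in configs:
        scores, failures = [], 0
        for query, train in pairs:
            try:
                aligned_query, aligned_train = align_fn(query=query, train=train, **config)
                scores.append(score_fn(aligned_query, aligned_train))
            except Exception:
                failures += 1  # e.g. too few matches to estimate a homography
        results.append(
            {
                "config": config,
                "mean_score": mean(scores) if scores else None,
                "failures": failures,
            }
        )
    return results


# Toy stand-ins (assumptions for illustration only)
def toy_align(query, train, extract_method, match_method):
    if (extract_method, match_method) == ("orb", "flann"):
        raise ValueError("not enough matches")
    return query, train


def toy_score(query, aligned_train):
    return -abs(query - aligned_train)  # higher is better


configs = [
    dict(extract_method="orb", match_method="bf"),
    dict(extract_method="orb", match_method="flann"),
]
results = compare_configurations([(1.0, 1.0), (2.0, 4.0)], configs, toy_align, toy_score)
for row in results:
    print(row)
```

Each configuration is a plain dictionary of keyword arguments, so extra settings such as `match_kwargs` for FLANN can be added per configuration without changing the loop.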

## Alignment (Actual Data)

---

Nice, next let's test out the alignment pipeline on our dataset. We will use the `load_image_pair` and `align_images` utility functions.

### Example Image

In [None]:
from src.utils.load import load_image_pair

# Load example image
film, digital, meta = load_image_pair(13, processing_state="raw", as_array=True)

print(f"Digital: {digital.shape}, Film: {film.shape}")
_, axs = plt.subplots(ncols=2, figsize=(15, 10))
axs[0].imshow(digital)
axs[0].set_title("Digital")
axs[1].imshow(film)
axs[1].set_title("Film");

In [None]:
# Extract keypoints and descriptors from images
digital_kp, digital_ds = extract_features(digital, method="orb")
film_kp, film_ds = extract_features(film, method="orb")

# Plot
_, axs = plt.subplots(nrows=2, ncols=2, figsize=(30, 20))
axs[0, 0].imshow(digital)
axs[0, 1].imshow(
    cv.drawKeypoints(cv.cvtColor(digital, cv.COLOR_RGB2GRAY), digital_kp, None, color=(0, 255, 0))
)
axs[1, 0].imshow(film)
axs[1, 1].imshow(
    cv.drawKeypoints(cv.cvtColor(film, cv.COLOR_RGB2GRAY), film_kp, None, color=(0, 255, 0))
)

print(
    f"Found {len(digital_kp)} keypoints in digital image (each descriptor has {digital_ds.shape[1]} features)"
)
print(
    f"Found {len(film_kp)} keypoints in film image (each descriptor has {film_ds.shape[1]} features)"
)

In [None]:
# Match the descriptors
orb_flann_kwargs = dict(
    indexParams=dict(algorithm=6, table_number=6, key_size=12, multi_probe_level=1),
    searchParams=dict(checks=50),
)
matches = match_features(film_ds, digital_ds, method="flann", **orb_flann_kwargs)

fig, ax = plt.subplots(figsize=(15, 10))
ax.imshow(cv.drawMatchesKnn(film, film_kp, digital, digital_kp, matches, None, flags=2));

In [None]:
# Transform train image (digital)
aligned_digital = transform_image(film, digital, film_kp, digital_kp, matches)

_, ax = plt.subplots(figsize=(15, 10))
ax.imshow(aligned_digital);

Looks nice!
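
A visual check is a good start, but a number can make it easier to compare pairs. One possible choice (our own assumption here, not something the pipeline above computes) is the normalized cross-correlation between the two aligned images, which is insensitive to global brightness and contrast differences. The sketch assumes both inputs are grayscale arrays of the same size and uses random toy data rather than our images:

```python
import numpy as np


def normalized_cross_correlation(a, b):
    """Normalized cross-correlation of two equally sized images, in [-1, 1]."""
    a = np.asarray(a, dtype=np.float64).ravel()
    b = np.asarray(b, dtype=np.float64).ravel()
    a = a - a.mean()
    b = b - b.mean()
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return float(a @ b / denom) if denom > 0 else 0.0


# Toy data: same content with different exposure vs. the same content misaligned
rng = np.random.default_rng(0)
image = rng.random((64, 64))
ncc_same = normalized_cross_correlation(image, 2.0 * image + 0.1)
ncc_shifted = normalized_cross_correlation(image, np.roll(image, 5, axis=1))
print(ncc_same, ncc_shifted)
```

A well-aligned pair should score noticeably higher than a misaligned one, although film and digital renderings of a scene differ enough that the absolute value should be interpreted with care.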

In [None]:
from src.utils.preprocess import keypoint_align

# Align images
aligned_film, aligned_digital = keypoint_align(
    query=film,
    train=digital,
    extract_method="orb",
    match_method="bf",
)

fig, axs = plt.subplots(ncols=2, figsize=(15, 10))
axs[0].imshow(aligned_film)
axs[1].imshow(aligned_digital);

### All Data

In [None]:
from src.utils.load import load_metadata

# Load metadata
meta = load_metadata()

# Get all image indices
image_indices = list(meta.keys())

print(f"There are {len(meta)} images in the dataset")

In [None]:
# Align all images
for i, idx in enumerate(image_indices):
    # Load image pair (digital and film)
    film, digital, _ = load_image_pair(idx, processing_state="raw", as_array=True)

    # Initialise figure
    fig, axs = plt.subplots(ncols=2, figsize=(15, 5))
    fig.suptitle(f"Image Pair {idx}", fontsize=16)

    try:
        # Align images
        _, aligned_digital = keypoint_align(
            query=film,
            train=digital,
            extract_method="sift",
            match_method="flann",
        )
        axs[0].imshow(aligned_digital)
        axs[0].set_title("Aligned Digital Image")
    except Exception as e:
        axs[0].imshow(np.zeros_like(digital))
        axs[0].set_title(f"Failed to Align Digital Image ({e})")

    axs[1].imshow(film)
    axs[1].set_title("Film Image")

    plt.show()

In [None]:
# Test a different configuration for the failed alignments
failed_indices = [9, 11, 40]  # Fails with SIFT + FLANN

for i, idx in enumerate(failed_indices):
    # Load image pair (digital and film)
    film, digital, _ = load_image_pair(idx, processing_state="raw", as_array=True)

    # Initialise figure
    fig, axs = plt.subplots(ncols=2, figsize=(15, 5))
    fig.suptitle(f"Image Pair {idx}", fontsize=16)

    try:
        # Align images
        _, aligned_digital = align_images(
            query=film,
            train=digital,
            extract_method="sift",
            match_method="flann",
            extract_kwargs=dict(nfeatures=1000),
            match_kwargs=dict(
                indexParams=dict(algorithm=1, trees=10), searchParams=dict(checks=100)
            ),
        )
        axs[0].imshow(aligned_digital)
        axs[0].set_title("Aligned Digital Image")
    except Exception as e:
        axs[0].imshow(np.zeros_like(digital))
        axs[0].set_title(f"Failed to Align Digital Image ({e})")

    axs[1].imshow(film)
    axs[1].set_title("Film Image")

    plt.show()