In [20]:
# [Cell 1] - Set up paths
import os
from google.colab import drive
drive.mount('/content/drive')

# !unzip "/content/drive/MyDrive/cv-22928-2025-a-project.zip" -d "/content/"

# Root of the mounted Google Drive; every dataset path below is built from it.
BASE_PATH = "/content/drive/MyDrive"
train_folder = "cv-22928-2025-a-project/train"
project_folder = os.path.join(BASE_PATH, train_folder)
test_folder = os.path.join(BASE_PATH, "cv-22928-2025-a-project/test_images")

# Report (without raising) whether the training directory is reachable.
if not os.path.exists(project_folder):
    print(f"ERROR: Could not find project directory at: {project_folder}")
    print(f"Current working directory: {os.getcwd()}")
    print(f"Please check if the path '{project_folder}' is correct")
else:
    print(f"Successfully found project directory at: {project_folder}")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Successfully found project directory at: /content/drive/MyDrive/cv-22928-2025-a-project/train


In [10]:

def test_model(test_csv, submission_path, estimate_fundamental_matrix):

    # Verify test.csv exists
    if not os.path.exists(test_csv):
        print(f"Error: test.csv not found at {test_csv}")
        return

    # Read test pairs
    try:
        with open(test_csv) as f:
            reader = csv.reader(f, delimiter=',')
            next(reader)  # Skip header
            test_samples = list(reader)
    except Exception as e:
        print(f"Error reading test.csv: {e}")
        return

    # Initialize dictionary to store results
    F_dict = {}

    # Process all test samples
    for row in tqdm(test_samples, desc='Processing test samples'):
        sample_id, batch_id, image_1_id, image_2_id = row

        # Update image paths
        img1_path = os.path.join(src, 'test_images', batch_id, f'{image_1_id}.jpg')
        img2_path = os.path.join(src, 'test_images', batch_id, f'{image_2_id}.jpg')

        # Process image pair
        F = estimate_fundamental_matrix(img1_path, img2_path)
        F_dict[sample_id] = F if F is not None else np.zeros((3, 3))


    # Create submission file
    with open(submission_path, 'w') as f:
        f.write('sample_id,fundamental_matrix\n')
        for sample_id, F in F_dict.items():
            f.write(f'{sample_id},{flatten_matrix(F)}\n')

    print(f"Submission saved to: {submission_path}")

In [2]:

!pip install tqdm
!pip install torch torchvision

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [21]:
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
import torchvision.transforms as transforms
import cv2
import numpy as np

# Prefer the GPU when PyTorch reports one, otherwise fall back to the CPU.
# NOTE(review): no visible cell moves the model or the batches to `device` —
# confirm whether training on the CPU is intended.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


class FundamentalMatrixNet(nn.Module):
    """Siamese CNN that regresses a 3x3 matrix from a pair of images.

    Both images pass through the same three-layer convolutional stack. The two
    flattened feature maps are concatenated and fed to a small fully connected
    head that predicts nine matrix entries plus a scalar 0/1 gate which
    multiplies the whole matrix.

    Args:
        image_size: Spatial size of the inputs, either a single int for square
            images or a ``(height, width)`` pair. The default of 128 gives the
            same layer sizes as the original hard-coded
            ``256 * 32 * 32 * 32`` (== ``2 * 256 * 128 * 128``).
    """

    def __init__(self, image_size=128):
        super().__init__()

        try:
            height, width = image_size
        except TypeError:
            # A single number means a square input.
            height = width = image_size

        # Convolutional trunk shared by both images. Every layer is a 3x3
        # convolution with stride 1 and padding 1, so the spatial size is
        # preserved and only the channel count grows (3 -> 64 -> 128 -> 256).
        self.model = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU())
        # fc1 consumes the concatenation of two flattened 256 x H x W maps.
        self.fc1 = nn.Linear(2 * 256 * height * width, 32)
        self.fc_confidence = nn.Linear(32, 1)  # Output is a scalar
        self.fc2 = nn.Linear(32, 3 * 3)  # Output is a 3x3 matrix

    def forward(self, x1, x2):
        """Predict one gated 3x3 matrix per image pair.

        Args:
            x1: First images, shaped ``(batch, 3, height, width)``.
            x2: Second images, same shape as ``x1``.

        Returns:
            Tensor of shape ``(batch, 3, 3)``. Each matrix is multiplied by
            its own gate value (0, 1, or 0.5 when the gate logit is exactly 0).
        """
        # Pass through the shared layers for both images
        x1 = self.model(x1)
        x2 = self.model(x2)

        # Flatten the outputs from conv layers
        x1 = x1.view(x1.size(0), -1)
        x2 = x2.view(x2.size(0), -1)

        # Combine features from both images
        x = torch.cat((x1, x2), dim=1)

        # Fully connected layers
        x = torch.relu(self.fc1(x))
        F = self.fc2(x)
        confidence = self.fc_confidence(x)

        # torch.heaviside needs `values` on the same device and with the same
        # dtype as its input; new_tensor inherits both from `confidence`.
        # NOTE(review): a step function has zero gradient almost everywhere,
        # so fc_confidence cannot learn through this gate — consider a sigmoid
        # if the gate is meant to be trained.
        step_at_zero = confidence.new_tensor([0.5])
        confidence = torch.heaviside(confidence, step_at_zero)

        # Reshape to (batch, 3, 3) and give the gate shape (batch, 1, 1) so it
        # scales each sample's matrix. A (batch, 1) gate would be broadcast
        # against the row axis and fail for batch sizes other than 1 or 3.
        F = F.view(-1, 3, 3) * confidence.view(-1, 1, 1)
        return F

In [12]:

class ImagePairDataset(Dataset):
    """Dataset yielding ``(img1, img2, F)`` triples for matrix regression.

    Args:
        image_pairs: Sequence of ``(path_1, path_2)`` image file paths.
        fundamental_matrices: Sequence indexed like ``image_pairs`` holding
            the target matrix of each pair (anything ``torch.tensor`` accepts).
        transform: Optional callable applied to each image tensor after it
            has been scaled to ``[0, 1]``.
        image_size: ``(width, height)`` handed to ``cv2.resize``. Defaults to
            ``(128, 128)``.
    """

    def __init__(self, image_pairs, fundamental_matrices, transform=None, image_size=(128, 128)):
        self.image_pairs = image_pairs
        self.fundamental_matrices = fundamental_matrices
        self.transform = transform
        self.image_size = tuple(image_size)

    def __len__(self):
        """Number of image pairs."""
        return len(self.image_pairs)

    def _load_image(self, path):
        """Read one image as a float32 RGB tensor in CxHxW layout."""
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cvtColor.
            raise OSError(f"Could not read image (missing or undecodable file): {path}")

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, self.image_size)
        image = np.transpose(image, (2, 0, 1))  # Change to CxHxW
        image = torch.tensor(image, dtype=torch.float32) / 255.0

        if self.transform:
            image = self.transform(image)
        return image

    def __getitem__(self, idx):
        """Return ``(img1, img2, F)`` for pair ``idx``.

        Raises:
            OSError: If either image file cannot be read.
        """
        path_1, path_2 = self.image_pairs[idx]
        F = self.fundamental_matrices[idx]

        img1 = self._load_image(path_1)
        img2 = self._load_image(path_2)

        F = torch.tensor(F, dtype=torch.float32)
        return img1, img2, F

In [13]:
from collections import namedtuple
import csv

# Brute-force descriptor matcher using the L2 norm. With crossCheck=True a pair
# (i, j) is only reported when i and j are each other's best match.
bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=True)

# Shared SIFT detector, used as the default detector by the helpers below.
# NOTE(review): the large negative contrast/edge thresholds look intended to
# switch off SIFT's keypoint filtering — confirm against the OpenCV docs.
sift_detector = cv2.SIFT_create(nfeatures=5000, contrastThreshold=-10000, edgeThreshold=-10000)

def ComputeErrorForOneExample(q_gt, T_gt, q, T, scale, eps=1e-15):
    '''Return the (rotation, translation) error of a single predicted pose.

    The rotation error is the angle between the two quaternions, in degrees.
    The translation error is the distance between the ground-truth translation
    and the prediction after the prediction has been rescaled to the
    ground-truth length, both multiplied by the scene's `scale`. ComputeMaa
    later thresholds both errors to obtain the mean Average Accuracy.'''

    # Unit-normalise both quaternions; eps guards against a zero norm.
    unit_gt = q_gt / (np.linalg.norm(q_gt) + eps)
    unit_pred = q / (np.linalg.norm(q) + eps)

    # 1 - <q, q_gt>^2 does not depend on the quaternion sign; it is clamped
    # from below at eps.
    squared_cosine = np.sum(unit_pred * unit_gt)**2
    loss_q = np.maximum(eps, (1.0 - squared_cosine))
    err_q = np.arccos(1 - 2 * loss_q)

    # Apply the scaling factor for this scene; the prediction is first brought
    # to the ground-truth magnitude because only its direction is compared.
    gt_translation = T_gt * scale
    pred_translation = T * np.linalg.norm(T_gt) * scale / (np.linalg.norm(T) + eps)

    # The sign of the predicted translation is ambiguous: keep the better one.
    distance_same_sign = np.linalg.norm(gt_translation - pred_translation)
    distance_flipped = np.linalg.norm(gt_translation + pred_translation)
    err_t = min(distance_same_sign, distance_flipped)

    return err_q * 180 / np.pi, err_t

def ExtractSiftFeatures(image, detector, num_features):
    '''Detect keypoints and descriptors on an RGB image.

    The image is converted to grayscale, run through `detector`, and only the
    first `num_features` keypoints and descriptors are returned.'''

    grayscale = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    keypoints, descriptors = detector.detectAndCompute(grayscale, None)
    top_keypoints = keypoints[:num_features]
    top_descriptors = descriptors[:num_features]
    return top_keypoints, top_descriptors

def ArrayFromCvKps(kps):
    '''Collect the `.pt` coordinates of OpenCV keypoints into a numpy array, one row per keypoint.'''

    coordinates = []
    for keypoint in kps:
        coordinates.append(keypoint.pt)
    return np.array(coordinates)

def get_inlier(path_img_1, path_img_2, inlier_mask, sift_detector=sift_detector):
    '''Return the keypoint coordinates of the inlier matches between two images.

    Both images are read from disk, up to 2000 SIFT features are extracted
    from each, and the descriptors are matched with the module-level `bf`
    matcher. `inlier_mask` is paired with the matches in the order `bf.match`
    returns them; only matches whose mask entry is truthy are kept.

    Returns two arrays (via ArrayFromCvKps): the kept keypoint coordinates in
    image 1 and the corresponding ones in image 2.'''
    rgb_1 = cv2.cvtColor(cv2.imread(path_img_1), cv2.COLOR_BGR2RGB)
    keypoints_1, descriptors_1 = ExtractSiftFeatures(rgb_1, sift_detector, 2000)

    rgb_2 = cv2.cvtColor(cv2.imread(path_img_2), cv2.COLOR_BGR2RGB)
    keypoints_2, descriptors_2 = ExtractSiftFeatures(rgb_2, sift_detector, 2000)

    # Each row is (index into image-1 keypoints, index into image-2 keypoints).
    index_pairs = np.array([[match.queryIdx, match.trainIdx]
                            for match in bf.match(descriptors_1, descriptors_2)])

    # Keep only the rows flagged as inliers by the caller-supplied mask.
    kept_pairs = np.array([pair for pair, keep in zip(index_pairs, inlier_mask) if keep])

    points_1 = [keypoints_1[pair[0]] for pair in kept_pairs]
    points_2 = [keypoints_2[pair[1]] for pair in kept_pairs]
    return ArrayFromCvKps(points_1), ArrayFromCvKps(points_2)

def QuaternionFromMatrix(matrix):
    '''Transform a rotation matrix into a quaternion.

    Args:
        matrix: Array-like whose top-left 3x3 block is the rotation matrix
            (lists and non-float64 arrays are accepted).

    Returns:
        numpy array of four values ordered (w, x, y, z) with w >= 0.
    '''

    # np.asarray converts only when needed. The previous
    # np.array(..., copy=False) raises ValueError on NumPy >= 2 whenever a
    # copy is required, e.g. for list or integer input.
    M = np.asarray(matrix, dtype=np.float64)[:4, :4]
    m00 = M[0, 0]
    m01 = M[0, 1]
    m02 = M[0, 2]
    m10 = M[1, 0]
    m11 = M[1, 1]
    m12 = M[1, 2]
    m20 = M[2, 0]
    m21 = M[2, 1]
    m22 = M[2, 2]

    # Only the lower triangle is filled in: np.linalg.eigh reads just that
    # half of a symmetric matrix by default.
    K = np.array([[m00 - m11 - m22, 0.0, 0.0, 0.0],
              [m01 + m10, m11 - m00 - m22, 0.0, 0.0],
              [m02 + m20, m12 + m21, m22 - m00 - m11, 0.0],
              [m21 - m12, m02 - m20, m10 - m01, m00 + m11 + m22]])
    K /= 3.0

    # The quaternion is the eigenvector of K that corresponds to the largest eigenvalue.
    # K is laid out in (x, y, z, w) order, hence the reordering to (w, x, y, z).
    w, V = np.linalg.eigh(K)
    q = V[[3, 0, 1, 2], np.argmax(w)]

    # q and -q describe the same rotation; pick the one with non-negative w.
    if q[0] < 0:
        q = -q

    return q

def calc_accuracy(calib_dict_img_1, calib_dict_img_2, path_img_1, path_img_2, inlier_mask,
                   scale, F, sift_detector=sift_detector, eps=1e-15):
    '''Score a predicted fundamental matrix against the ground-truth relative pose.

    The inlier keypoints of the image pair are recomputed with get_inlier, the
    pose (R, T) is recovered from F with ComputeEssentialMatrix, and the result
    is compared with the relative pose derived from the two calibration
    records (each providing K, R and T).

    Returns the pair produced by ComputeErrorForOneExample: the rotation error
    in degrees and the translation error.'''
    kp_1, kp_2 = get_inlier(path_img_1, path_img_2, inlier_mask, sift_detector)

    # Predicted relative pose.
    _, R_pred, T_pred = ComputeEssentialMatrix(F, calib_dict_img_1.K, calib_dict_img_2.K, kp_1, kp_2)
    q_pred = QuaternionFromMatrix(R_pred)
    T_pred = T_pred.flatten()

    # Ground-truth relative pose between the two cameras.
    R1_gt = calib_dict_img_1.R
    T1_gt = calib_dict_img_1.T.reshape((3, 1))
    R2_gt = calib_dict_img_2.R
    T2_gt = calib_dict_img_2.T.reshape((3, 1))
    dR_gt = np.dot(R2_gt, R1_gt.T)
    dT_gt = (T2_gt - np.dot(dR_gt, T1_gt)).flatten()
    q_gt = QuaternionFromMatrix(dR_gt)
    q_gt = q_gt / (np.linalg.norm(q_gt) + eps)

    # Compare prediction and ground truth for this single example.
    rotation_error, translation_error = ComputeErrorForOneExample(q_gt, dT_gt, q_pred, T_pred, scale=scale)
    return rotation_error, translation_error

def NormalizeKeypoints(keypoints, K):
    '''Convert pixel coordinates to normalised camera coordinates.

    Subtracts the principal point (K[0, 2], K[1, 2]) from every keypoint and
    divides by the focal lengths (K[0, 0], K[1, 1]). No other entry of K is
    used. Returns a new array; the input is not modified.'''
    principal_point = np.array([[K[0, 2], K[1, 2]]])
    focal_lengths = np.array([[K[0, 0], K[1, 1]]])
    return (keypoints - principal_point) / focal_lengths


def ComputeEssentialMatrix(F, K1, K2, kp1, kp2):
    '''Derive the Essential matrix and relative pose from a Fundamental matrix.

    E is computed as K2^T * F * K1 (cast to float64). The keypoints are
    normalised with their respective intrinsics and handed to OpenCV's
    recoverPose, which resolves the cheirality check. Note that participants
    are asked to estimate F, i.e. without relying on known intrinsics.

    Returns (E, R, T).'''

    # Warning! Old versions of OpenCV's RANSAC could return multiple F matrices, encoded as a single matrix size 6x3 or 9x3, rather than 3x3.
    # We do not account for this here, as the modern RANSACs do not do this:
    # https://opencv.org/evaluating-opencvs-new-ransacs
    assert F.shape[0] == 3, 'Malformed F?'

    essential = (K2.T @ F @ K1).astype(np.float64)

    # Use OpenCV's recoverPose to solve the cheirality check:
    # https://docs.opencv.org/4.5.4/d9/d0c/group__calib3d.html#gadb7d2dfcc184c1d2f496d8639f4371c0
    normalized_1 = NormalizeKeypoints(kp1, K1)
    normalized_2 = NormalizeKeypoints(kp2, K2)
    _, rotation, translation, _ = cv2.recoverPose(essential, normalized_1, normalized_2)

    return essential, rotation, translation

def LoadCalibration(filename):
    '''Load calibration data (ground truth) from the csv file.

    The first row is skipped as a header and blank rows are ignored. Every
    other row must provide the camera id in column 1 and whitespace-separated
    numbers in columns 2 (K, 9 values), 3 (R, 9 values) and 4 (T).

    Args:
        filename: Path of the calibration csv file.

    Returns:
        dict mapping camera id to a ``Gt(K, R, T)`` namedtuple, where K and R
        are 3x3 numpy arrays and T is a 1-D numpy array.
    '''
    Gt = namedtuple('Gt', ['K', 'R', 'T'])

    def parse_floats(text):
        # split() without an argument tolerates repeated and trailing spaces,
        # which split(' ') would turn into empty, unparsable tokens.
        return np.array([float(v) for v in text.split()])

    calib_dict = {}
    with open(filename, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=',')
        next(reader, None)  # Skip header (no-op on an empty file).
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines.
                continue

            camera_id = row[1]
            K = parse_floats(row[2]).reshape([3, 3])
            R = parse_floats(row[3]).reshape([3, 3])
            T = parse_floats(row[4])
            calib_dict[camera_id] = Gt(K=K, R=R, T=T)

    return calib_dict

def ComputeMaa(err_q, err_t,thresholds_q = np.linspace(1, 10, 10), thresholds_t = np.geomspace(0.2, 5, 10)):
    '''Compute the mean Average Accuracy at different thresholds, for one scene.

    The rotation and translation thresholds are walked in pairs. For each pair
    the fraction of examples below both thresholds, below the rotation
    threshold only, and below the translation threshold only is recorded.

    Returns (mean of the joint accuracies, joint accuracies, rotation-only
    accuracies, translation-only accuracies); the last three are numpy arrays
    with one entry per threshold pair.'''

    assert len(err_q) == len(err_t)

    rotation_errors = np.array(err_q)
    translation_errors = np.array(err_t)

    acc, acc_q, acc_t = [], [], []
    for th_q, th_t in zip(thresholds_q, thresholds_t):
        rotation_ok = rotation_errors < th_q
        translation_ok = translation_errors < th_t
        acc.append(np.bitwise_and(rotation_ok, translation_ok).sum() / len(err_q))
        acc_q.append(rotation_ok.sum() / len(err_q))
        acc_t.append(translation_ok.sum() / len(err_t))
    return np.mean(acc), np.array(acc), np.array(acc_q), np.array(acc_t)

In [14]:
import glob
import pandas as pd
import csv

# One sub-directory of the training folder per scene.
class_folders = glob.glob(project_folder + "/*/")
print(class_folders)


# Per-scene scaling factors: column 1 is the lookup key (matched against the
# scene directory name below), column 2 the factor.
scaling_dict = {}
with open(f'{project_folder}/scaling_factors.csv') as f:
    reader = csv.reader(f, delimiter=',')
    for i, row in enumerate(reader):
        # Skip header.
        if i == 0:
            continue
        scaling_dict[row[1]] = float(row[2])

dataframes = []
calib_dict = {}
# Loop through the CSV files and read them into DataFrames
for folder in class_folders:
    calib_dict[folder] = LoadCalibration(f'{folder}/calibration.csv')

    df = pd.read_csv(folder + "/pair_covisibility.csv", index_col=False)
    df["img1_path"] = folder + "images/" + df["im1"] + ".jpg"
    # Fixed copy-paste bug: the second path used to be built from "im1" as
    # well, so both images of every pair pointed at the same file.
    # NOTE(review): assumes pair_covisibility.csv has an "im2" column next to
    # "im1" — confirm against the csv header.
    df["img2_path"] = folder + "images/" + df["im2"] + ".jpg"
    df["folder"] = folder
    # `folder` ends with "/", so the scene name is the second-to-last piece.
    df["scale"] = scaling_dict[folder.split('/')[-2]]
    dataframes.append(df)

combined_df = pd.concat(dataframes, ignore_index=True)
combined_df.shape

['/content/drive/MyDrive/cv-22928-2025-a-project/train/trevi_fountain/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/sacre_coeur/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/notre_dame_front_facade/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/temple_nara_japan/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/taj_mahal/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/sagrada_familia/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/lincoln_memorial_statue/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/colosseum_exterior/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/pantheon_exterior/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/brandenburg_gate/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/british_museum/', '/content/drive/MyDrive/cv-22928-2025-a-project/train/buckingham_palace/']


(84578, 10)

In [15]:
# Keep only the pairs whose covisibility exceeds 0.1.
df = combined_df.loc[combined_df.covisibility>0.1, :]
# Work on an independent copy so the column assignment below does not touch combined_df.
df = df.copy()
# Duplicate the "fundamental_matrix" column under a second name.
df.loc[:, "F_romatch"] = df.loc[:, "fundamental_matrix"]
# Not displayed: only the last expression of a cell is echoed.
df.shape
# NOTE(review): this keeps just the single row at position 1, so everything
# downstream runs on one image pair — looks like a debugging leftover; confirm.
df = df.iloc[[1]]

In [16]:
# [Cell 11] - Helper function for matrix flattening
def flatten_matrix(M, num_digits=8):
    """Serialise a matrix as space-separated scientific-notation numbers.

    The entries are taken in the order produced by ``M.flatten()`` and each is
    formatted with ``num_digits`` digits after the decimal point.
    """
    number_format = f'.{num_digits}e'
    return ' '.join(format(value, number_format) for value in M.flatten())

def unflatten_matrix(flattened_str):
    """Parse nine whitespace-separated numbers into a 3x3 numpy array.

    Raises:
        ValueError: If the string does not contain exactly nine values (or a
            token cannot be parsed as a float).
    """
    values = [float(token) for token in flattened_str.split()]

    # Exactly nine numbers are needed to fill the 3x3 matrix row by row.
    if len(values) == 9:
        return np.array(values).reshape(3, 3)

    raise ValueError("The flattened string must contain exactly 9 values.")

In [17]:
# Build the training inputs from the filtered pair table `df`.

# One (img1_path, img2_path) tuple per row.
image_pairs = list(zip(df["img1_path"], df["img2_path"]))
# Target matrices parsed from the flattened "fundamental_matrix" strings.
fundamental_matrices = [unflatten_matrix(flat) for flat in df["fundamental_matrix"]]

# Create dataset and dataloader
dataset = ImagePairDataset(image_pairs, fundamental_matrices)
dataloader = DataLoader(dataset, batch_size=10, shuffle=True)

In [23]:


# Initialize model, loss function, and optimizer
model = FundamentalMatrixNet()
criterion = nn.L1Loss()  # L1 (mean absolute error) loss
optimizer = optim.AdamW(model.parameters(), lr=1e-4)

# Total number of parameter elements that require gradients.
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

trainable_params

268806634

In [None]:



# Training loop
model.train()  # make the training mode explicit
for epoch in range(2):
    epoch_loss = 0
    for img1, img2, F_gt in tqdm(dataloader):
        optimizer.zero_grad()

        # Forward pass: call the module itself rather than .forward() so that
        # any registered hooks run.
        F_pred = model(img1, img2)

        # Compute loss
        loss = criterion(F_pred, F_gt)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    # Mean per-batch loss for the epoch.
    print(f"Epoch {epoch+1}, Loss: {epoch_loss / len(dataloader)}")
# NOTE(review): this pickles the whole module object; loading it later needs
# the same class definition to be importable. Saving model.state_dict() is the
# more portable option if nothing depends on the current format.
torch.save(model, f"{project_folder}/model.pth")

  0%|          | 0/1 [00:00<?, ?it/s]

In [None]:
# Average the loss over the validation loader without tracking gradients.
# NOTE(review): `val_loader` is not defined in any visible cell — confirm it is
# created (e.g. with the imported `random_split`) before this cell runs.
with torch.no_grad():
        val_loss = 0.0
        for left_image, right_image, F in val_loader:
            F_pred = model(left_image, right_image)
            loss = criterion(F_pred, F)
            val_loss += loss.item()
        # Mean per-batch validation loss.
        print(f"Validation Loss: {val_loss / len(val_loader)}")