In [1]:
import os, time, sys
from pathlib import Path

SESSION_NAME = "superpoint_test"

ROOT = Path().absolute().parent
RESULTS_PATH = Path(f"{ROOT}/data/3_results")

SESSION_ID = SESSION_NAME if SESSION_NAME else str(int(time.time()))
SESSION_PATH = Path(f"{RESULTS_PATH}/{SESSION_ID}")
CLONE_COLMAP_PROJECT_PATH = Path(f"{RESULTS_PATH}/3_koepenick_rathaus_SfM_werramat")
CLONE_COLMAP_IMAGE_PATH = f"{CLONE_COLMAP_PROJECT_PATH}/image_path"
CLONE_DB_PATH = f"{CLONE_COLMAP_PROJECT_PATH}/database.db"

if not os.path.exists(SESSION_PATH):
    !mkdir -p {SESSION_PATH}
    # copy images from clone path
    !cp -r "{CLONE_COLMAP_IMAGE_PATH}" "{SESSION_PATH}"
    # copy database from clone path
    !cp "{CLONE_DB_PATH}" "{SESSION_PATH}"
    print(f"Created new session with ID {SESSION_ID} under {RESULTS_PATH}")

if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))

SCRIPTS_PATH = f"{ROOT}/third_party/SuperGluePretrainedNetwork"
if SCRIPTS_PATH not in sys.path:
    sys.path.append(SCRIPTS_PATH)

COLMAP_PY_SCRIPTS_PATH = f"{ROOT}/third_party/colmap/scripts/python"
if COLMAP_PY_SCRIPTS_PATH not in sys.path:
    sys.path.append(COLMAP_PY_SCRIPTS_PATH)

In [None]:
##### Data Preparation ( raw input -> input folder)
%cd {ROOT}

raw_input_files = [x for x in RAW_INPUT_PATH.glob('**/*') if x.is_file()]
print(f"found {len(raw_input_files)} raw input files")
_f = "/mnt/c/Users/tworkool/Documents/dev/python/historical-photo-sfm-pipeline/data/1_raw_input/lego.mp4" #str(raw_input_files[0])
print(_f)
DOWNSAMPLING_RATE = 2

#!bash scripts/third_party/neuralangelo/run_ffmpeg.sh {{SESSION_NAME}} {{_f}} 2
! ffmpeg -i {_f} -vf "select=not(mod(n\,{DOWNSAMPLING_RATE}))" -vsync vfr -q:v 2 {INPUT_PATH}/%06d.jpg

# Reconstruction (please read)
You have two options to continue here:
1. Reconstruction with Colmap wrapper in Python (with pycolmap) *
2. Reconstruction with Colmap (**PREFERRED**) **

1* pycolmap only supports sparse reconstruction, unless you build colmap from source with CUDA support

2** Colmap needs to be build and installed on your environment. If it is, you should use it. Dense reconstruction requires CUDA support on the machine.

## Reconstruction with COLMAP here

In [2]:
# reconstruction with SuperGlue
from database import COLMAPDatabase, pair_id_to_image_ids, image_ids_to_pair_id
import sqlite3

# create db here
DB_PATH = f"{SESSION_PATH}/database.db"
colmap_db = COLMAPDatabase.connect(DB_PATH)
cursor = colmap_db.cursor()

#if os.path.exists(DB_PATH):
#    print("ERROR: database path already exists -- will not modify it.")
#    return

#colmap_db = COLMAPDatabase.connect(DB_PATH)
#colmap_db.create_tables()

cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
print(cursor.fetchall())

[('cameras',), ('sqlite_sequence',), ('images',), ('keypoints',), ('descriptors',), ('matches',), ('two_view_geometries',)]


In [3]:
# prepare data to be read and replaced
def name_part(full_name):
    split_name = full_name.split(".")
    if len(split_name) == 0:
        return full_name
    return split_name[0]

def get_image_id_from_name(cursor, img_name):
    name_only = name_part(img_name)
    cursor.execute(f"SELECT image_id FROM images WHERE name LIKE '{name_only}%'")
    image_data = cursor.fetchone()
    if not image_data: return None
    return image_data[0]

def get_image_name_from_id(cursor, img_id):
    cursor.execute(f"SELECT name FROM images WHERE image_id={img_id};")
    image_data = cursor.fetchone()
    if not image_data: return None
    img_name, = image_data
    return img_name
    
def generate_matching_map(cursor):
    cursor.execute("SELECT pair_id FROM matches")
    lines = []
    for pair_id, in cursor.fetchall():
        img1, img2 = pair_id_to_image_ids(pair_id)
        img1 = int(img1)
        img2 = int(img2)
        img1_name = get_image_name_from_id(cursor, img1)
        img2_name = get_image_name_from_id(cursor, img2)
        line = f"{img1_name} {img2_name} {img1} {img2} {pair_id}"
        lines.append(line)
        #print(f"Matching {img1} / {img2}")
        #print(f"{img1_name} {img2_name} OTHER PARAMS HERE")
    return lines

def write_superglue_image_pairs_file(image_pairs, file_name):
    with open(file_name, "w+") as f:
        f.writelines(f"{line}\n" for line in image_pairs)
    return

IMAGE_PAIR_FILE_NAME = "image_pairs.txt"
IMAGE_PAIR_FILE_PATH = Path(f"{SESSION_PATH}/{IMAGE_PAIR_FILE_NAME}")
image_pairs = generate_matching_map(cursor)
write_superglue_image_pairs_file(image_pairs, IMAGE_PAIR_FILE_PATH)

# STEPS
# 1. SERIALIZE: generate matching_images text file for SuperGlue to use and know which images to match
# 2. PROCESS: match images with SuperGlue
# 3. DESERIALIZE: collect information on matches for each image pair and deserialize image pairs from the generated files generically
# 4. POST-PROCESS/ESTIMATE: estimate which matches should be used based on "match_confidence"
# 5. DB OVERWRITE
# 5.1 Loop through all DB entries in images, matches, two_view_geometries, keypoints, ?descriptors? and overwrite the entries 
# (DB DATA SHOULD JUST BE OVERWRITTEN AND DB IS MASTER)
# 6. Run COLMAP steps after matching!

In [4]:
OUT_DIR = f"{SESSION_PATH}/SuperGlue_dump"
INPUT_IMAGES = f"{SESSION_PATH}/image_path"
MATCH_SCRIPT_DIR = f"{SCRIPTS_PATH}/match_pairs.py"
!mkdir -p {OUT_DIR}

In [None]:
# execute SuperGlue
! python "{MATCH_SCRIPT_DIR}" --input_pairs "{IMAGE_PAIR_FILE_PATH}" --input_dir "{INPUT_IMAGES}" --output_dir "{OUT_DIR}" --resize "-1" --superglue outdoor --max_keypoints "-1" --viz

In [5]:
# Read numpy file
import numpy as np

def get_npz_info(path):
    npz = np.load(path)
    #print(npz.files)
    image_pairs = os.path.basename(path).split("_")
    img1_name = image_pairs[0] + "_" + image_pairs[1]
    img2_name = image_pairs[2] + "_" + image_pairs[3]
    return np.load(path), (img1_name, img2_name)
    
def read_superglue_dumps(dumps_directory):
    deserialized_dumps = []
    npz_files = []
    for file in os.listdir(dumps_directory):
        if file.endswith(".npz"):
            abs_file_name = os.path.join(dumps_directory, file)
            npz_files.append(abs_file_name)

    print(f"Found {len(npz_files)} .npz SuperGlue dump files")
    for npz_file in npz_files:
        data, (img1_name, img2_name) = get_npz_info(npz_file)
        print(img1_name, img2_name)
        deserialized_dumps.append({
            "img1_name": img1_name,
            "img2_name": img2_name,
            "img1_id": get_image_id_from_name(cursor, img1_name),
            "img2_id": get_image_id_from_name(cursor, img2_name),
            "npz_data": data
        })
    return deserialized_dumps

superglue_dumps = read_superglue_dumps(OUT_DIR)

def get_kp_info(kp_id, npz):
    if kp_id >= npz['keypoints0'].shape[0]: return
    kp = npz['keypoints0'][kp_id]
    print(f"KP coordinates for kp ID {kp_id} = (x {int(kp[0])}, y {int(kp[1])})")
    if npz['matches'][kp_id] == -1:
        print("No Match!")
    else:
        print(f"Match {npz['matches'][kp_id]}! Confidence = {npz['match_confidence'][kp_id]}")

npz, _ = get_npz_info(f"{OUT_DIR}/IMG_1140_IMG_1141_matches.npz")

print(npz['keypoints0'].shape) # kps image 1
print(npz['keypoints1'].shape) # kps image 2
kp_id_image1 = 5000 # id of the keypoint
print(npz['keypoints0'][kp_id_image1])
print(npz['matches'].shape)
print(npz['matches'][kp_id_image1])
print(npz['match_confidence'].shape)
print(npz['match_confidence'][kp_id_image1])

#for i in range(1, 10):
#    get_kp_info(i, npz)

Found 11 .npz SuperGlue dump files
IMG_1140 IMG_1141
IMG_1140 IMG_1142
IMG_1140 IMG_1143
IMG_1140 IMG_1145
IMG_1140 IMG_1147
IMG_1140 IMG_1148
IMG_1140 IMG_1149
IMG_1140 IMG_1150
IMG_1140 IMG_1151
IMG_1140 IMG_1160
IMG_1140 IMG_1190
(5896, 2)
(4851, 2)
[ 704. 1492.]
(5896,)
-1
(5896,)
0.0


In [9]:
def get_matches(dump, min_confidence=0.3):
    # For each keypoint in keypoints0, the matches array indicates the index of the matching keypoint in keypoints1, or -1 if the keypoint is unmatched.
    kp_matching_pairs = []
    match_confidence = []
    kp_coordinates = []
    data = dump['npz_data']
    for i, match in enumerate(data['matches']):
        if match == -1:
            continue
        if data['match_confidence'][i] < min_confidence:
            continue
        # 0 = keypoints0 index, 1 = keypoints1 match pair if not -1
        kp_matching_pairs.append(np.array([i, match]))
        match_confidence.append(data['match_confidence'][i])
        kp_coordinates.append(np.array([data['keypoints0'][i], data['keypoints1'][match]]))
    return np.asarray(kp_matching_pairs), np.asarray(match_confidence), np.asarray(kp_coordinates)


def get_keypoints_and_matches(img_id, dumps):
    shortened_dump_entry = None
    for d in dumps:
        if d['img1_id'] == img_id:
            shortened_dump_entry = {
                "keypoints": d['npz_data']['keypoints0'],
                "matches": d['npz_data']['matches'],
                "match_confidence": d['npz_data']['match_confidence']
            }
            break
        elif d['img2_id'] == img_id:
            shortened_dump_entry = {
                "keypoints": d['npz_data']['keypoints1'],
                "matches": d['npz_data']['matches'],
                "match_confidence": d['npz_data']['match_confidence']
            }
            break

    return shortened_dump_entry

def synchronize_db_with_dumps(cursor, dumps, validate=True):
    num_db_matched_image_pairs = cursor.execute("SELECT COUNT(*) FROM matches").fetchone()[0]
    validation_errors = []
    
    # validate that number of image pairs matched in DB is the same as in dumps!
    if num_db_matched_image_pairs != len(dumps):
        validation_errors.append("cannot synchronize DB image matching pairs with dumps")

    # validate number of keypoints matches for all reoccuring images!
    keypoint_map = {}
    for d in dumps:
        npz_data = d['npz_data']
        img1_id = d['img1_id']
        img2_id = d['img2_id']
        img1_kp_num = npz_data['keypoints0'].shape[0]
        img2_kp_num = npz_data['keypoints1'].shape[0]
        if img1_id not in keypoint_map:
            keypoint_map[img1_id] = {
                "num_kps": img1_kp_num,
                "data": {
                    "keypoints": npz_data['keypoints0'],
                    "matches": npz_data['matches'],
                    "match_confidence": npz_data['match_confidence']
                }
            }
        elif keypoint_map[img1_id]['num_kps'] != img1_kp_num:
            validation_errors.append(f"ERROR: keypoint mismatch for image {d['img1_name']}: {keypoint_map[img1_id]['num_kps']} != {img1_kp_num}")

        if img2_id not in keypoint_map:
            keypoint_map[img2_id] = {
                "num_kps": img2_kp_num,
                "data": {
                    "keypoints": npz_data['keypoints1'],
                    "matches": npz_data['matches'],
                    "match_confidence": npz_data['match_confidence']
                }
            }
        elif keypoint_map[img2_id]['num_kps'] != img2_kp_num:
            validation_errors.append(f"ERROR: keypoint mismatch for image {d['img2_name']}: {keypoint_map[img2_id]['num_kps']} != {img2_kp_num}")
    
    if validate and len(validation_errors) > 0:
        print("ERROR! Validation errors:")
        for i, v_error in enumerate(validation_errors):
            print(f"{i+1}.\t {v_error}")
        print("Aborting synchronization due to errors")
        return
    
    
    cursor.execute("DELETE FROM keypoints")
    cursor.execute("DELETE FROM matches")
    cursor.execute("DELETE FROM two_view_geometries")
    colmap_db.commit()
    
    # write keypoints
    for image_id, img_info in keypoint_map.items():
        data = img_info['data']
        colmap_db.add_keypoints(image_id, data['keypoints'])
        #cursor.execute(
        #    "INSERT INTO keypoints VALUES (?, ?, ?, ?)",
        #    (image_id,) + data['keypoints'].shape + (array_to_blob(data['keypoints']),),
        #)

    for d in dumps:
        m_idx, m_c, kp_coords = get_matches(d)
        print(m_idx.shape)
        colmap_db.add_matches(d['img1_id'], d['img2_id'], m_idx)
        tv_m_idx, tv_m_c, tv_kp_coords = get_matches(d, 0.45)
        colmap_db.add_two_view_geometry(d['img1_id'], d['img2_id'], tv_m_idx)

    colmap_db.commit()
    return

synchronize_db_with_dumps(cursor, superglue_dumps, validate=False)

(1499, 2)
(3004, 2)
(3267, 2)
(1385, 2)
(3626, 2)
(3523, 2)
(422, 2)
(164, 2)
(39, 2)
(12, 2)
(1456, 2)


In [None]:
# Sparse Reconstruction (Step 1)
! bash run_colmap_sparse.sh "{SESSION_PATH}" "{INPUT_PATH}"
#MASK_PATH = f"{INPUT_PATH}/masks"
#! bash run_colmap_sparse_with_masks.sh "{SESSION_PATH}" "{INPUT_PATH}" "{MASK_PATH}"

In [None]:
# Dense Reconstruction IF NEEDED (Step 2)
! bash run_colmap_dense.sh "{SESSION_PATH}"

## Generate Transforms File
Generate this file if you want to have camera, points etc. in a self contained file which you can then use to display everything in a 3D engine like Blender (see my scripts `colmap_pc_importer_ui.py` and `colmap_pc_importer.py` which u can load into Blender)

In [None]:
import sys

if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))

print(sys.path)

generate_transforms_script = f"{ROOT}/scripts/third_party/neuralangelo/convert_data_to_json_advanced.py"
! python {generate_transforms_script} --data_dir "{SESSION_PATH}" --scene_type "outdoor" --image_dir "{INPUT_PATH}"

#generate_transforms_script = f"{ROOT}/scripts/third_party/neuralangelo/convert_data_to_json.py"
#! python {generate_transforms_script} --data_dir "{SESSION_PATH}" --scene_type "outdoor"

In [None]:
from PIL import Image
from PIL.ExifTags import TAGS, GPSTAGS

def get_exif_data(image_path):
    try:
        image = Image.open(image_path)
        exif_data = image._getexif()

        if exif_data is not None:
            # Decode the EXIF data
            decoded_exif = {TAGS[key]: exif_data[key] for key in exif_data.keys() if key in TAGS and isinstance(exif_data[key], (int, str, bytes))}
            return decoded_exif
        else:
            print("No EXIF data found.")
            return None

    except Exception as e:
        print(f"Error reading EXIF data: {e}")
        return None

# Example usage
image_path = '/mnt/d/dev/python/historical-photo-sfm-pipeline/data/2_input/IMG_0555.jpeg'
exif_data = get_exif_data(image_path)

if exif_data:
    print("EXIF Metadata:")
    for key, value in exif_data.items():
        print(f"{key}: {value}")

In [None]:
'''
camera = pycolmap.Camera(
    model=camera_model_name_or_id,
    width=width,
    height=height,
    params=params,
)

import pycolmap
reconstruction = pycolmap.Reconstruction("path/to/reconstruction/dir")
print(reconstruction.summary())

for image_id, image in reconstruction.images.items():
    print(image_id, image)

for point3D_id, point3D in reconstruction.points3D.items():
    print(point3D_id, point3D)

for camera_id, camera in reconstruction.cameras.items():
    print(camera_id, camera)

reconstruction.write("path/to/reconstruction/dir/")
'''