# Convert from GPS and Euler angles in EXIF to camera intrinsics and extrinsics

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os
from glob import glob
import numpy as np
import PIL.Image
import PIL.ExifTags
import xml.etree.ElementTree as ET
import pymap3d
from tqdm.auto import tqdm

In [None]:
image_dir = os.path.expanduser("~/data/image_set/dbl/AerialPhotography")
output_path = os.path.expanduser("~/data/image_set/dbl/parsed_from_exif")

In [None]:
image_name_list = [i[len(image_dir):].lstrip("/\\") for i in
                   glob(os.path.join(image_dir, "**/*.[jJ][pP][gG]"), recursive=True)]
image_name_list.sort()
image_name_list

# 1. Extract EXIF data

In [None]:
def extract_exif_values(im):
    raw_exif_data = im._getexif()
    friendly_exif_data = {
        PIL.ExifTags.TAGS[k]: v
        for k, v in im._getexif().items()
        if k in PIL.ExifTags.TAGS
    }

    return friendly_exif_data, raw_exif_data

In [None]:
# https://exiftool.org/TagNames/DJI.html
def extract_xmp_values(im):
    xmp = {}

    for segment, content in im.applist:
        # fine xmp content
        marker, body = content.split(b'\x00', 1)
        if segment == 'APP1' and marker == b'http://ns.adobe.com/xap/1.0/':
            # convert to string
            str_body = body.decode()
            tree = ET.fromstring(str_body)
            for child in tree.iter():
                for attr in child.attrib:
                    key = attr.rsplit("}", maxsplit=1)
                    if len(key) == 2:
                        key = key[1]
                    else:
                        key = attr
                    xmp[key] = child.attrib[attr]
    return xmp

In [None]:
def extract(image_path: str):
    with PIL.Image.open(image_path) as im:
        exif, _ = extract_exif_values(im)
        xmp = extract_xmp_values(im)
        size = im.size
    return exif, xmp, size

In [None]:
exif_list = []
xmp_list = []
image_width_list = []
image_height_list = []

for i in tqdm(image_name_list):
    exif, xmp, size = extract(os.path.join(image_dir, i))
    exif_list.append(exif)
    xmp_list.append(xmp)
    image_width_list.append(size[0])
    image_height_list.append(size[1])

# 2. Parse EXIF

In [None]:
def gps_tuple_to_angle(i) -> float:
    return (((float(i[2]) / 60.) + float(i[1])) / 60.) + float(i[0])


def parse_GPSInfo(info: dict):
    decoded = {}
    for key in info.keys():
        decode = PIL.ExifTags.GPSTAGS.get(key, key)
        decoded[decode] = info[key]

    simplified = {
        "LatitudeRef": decoded["GPSLatitudeRef"],
        "LongitudeRef": decoded["GPSLongitudeRef"],
        "AltitudeRef": int.from_bytes(decoded["GPSAltitudeRef"], "little"),
        "Latitude": gps_tuple_to_angle(decoded["GPSLatitude"]),
        "Longitude": gps_tuple_to_angle(decoded["GPSLongitude"]),
        "Altitude": float(decoded["GPSAltitude"]),
    }
    return simplified


def get_gps_data(exif, xmp):
    if "Lat" in xmp:
        return float(xmp["Lat"]), float(xmp["Lon"]), float(xmp["AbsAlt"]), float(xmp["RltAlt"])
    gps_data = parse_GPSInfo(exif["GPSInfo"])
    assert gps_data["LatitudeRef"] == "N", gps_data["LatitudeRef"]
    assert gps_data["LongitudeRef"] == "E", gps_data["LongitudeRef"]
    return gps_data["Latitude"], gps_data["Longitude"], xmp["AbsoluteAltitude"], xmp["RelativeAltitude"]


def get_euler_angle_data(xmp):
    if "Yaw" in xmp:
        return float(xmp["Yaw"]), float(xmp["Pitch"]), float(xmp["Roll"])

    return float(xmp["GimbalYawDegree"]), float(xmp["GimbalPitchDegree"]), float(xmp["GimbalRollDegree"])

In [None]:
camera_model_to_camera_id = {}
camera_id_to_camera_model = {}
valid_image_list = []
camera_width_list = []
camera_height_list = []
camera_focal_length_list = []
camera_has_prior_focal_length = []
gps_list = []
euler_angle_list = []
image_camera_id_list = []

camera_model_to_sensor_size = {
    "PSDK102S_V2137H": 23.5,
    "PSDK102S_V2137Q": 23.5,
    "PSDK102S_V2137X": 23.5,
    "PSDK102S_V2137Y": 23.5,
    "PSDK102S_V2137Z": 23.5,
    "ZENMUSE Z30": 5.14,  # not known accurately
}  # REMEMBER TO UPDATE IT

for image_idx in tqdm(range(len(image_name_list))):
    exif = exif_list[image_idx]
    xmp = xmp_list[image_idx]

    try:
        gps_data = get_gps_data(exif, xmp)
        skip = True
        for i in gps_data:
            if i != 0:
                skip = False
                break
        if skip:
            print("Skip {}".format(image_name_list[image_idx]))
            continue
    except AssertionError:
        print("Skip {} for invalid GPSInfo".format(image_name_list[image_idx]))
        continue

    camera_model = exif["Model"]

    camera_key = "{}-{}-{}-{}".format(
        camera_model,
        exif["FocalLength"],
        image_width_list[image_idx],
        image_height_list[image_idx],
    )
    camera_id = camera_model_to_camera_id.get(camera_key, None)
    if camera_id is None:
        # if `FocalLengthIn35mmFilm` presents in EXIF, use it to calculate the focal length,
        # or calculate from sensor size
        focal_length_in_35mm = float(exif.get("FocalLengthIn35mmFilm", -1))
        if focal_length_in_35mm > 0:
            focal_length = focal_length_in_35mm * image_width_list[image_idx] / 36  # 36x24mm
            camera_has_prior_focal_length.append(1)
        else:
            sensor_size = camera_model_to_sensor_size[camera_model]
            if sensor_size > 0:
                # known sensor size
                focal_length = float(exif["FocalLength"]) * image_width_list[image_idx] / sensor_size
                camera_has_prior_focal_length.append(1)
            else:
                # unknown
                focal_length = 1.25 * max(image_width_list[image_idx], image_height_list[image_idx])
                camera_has_prior_focal_length.append(0)

        camera_id = len(camera_width_list)
        camera_width_list.append(image_width_list[image_idx])
        camera_height_list.append(image_height_list[image_idx])
        camera_focal_length_list.append(focal_length)
        camera_model_to_camera_id[camera_key] = camera_id
        camera_id_to_camera_model[camera_id] = camera_model

    image_camera_id_list.append(camera_id)

    gps_list.append(gps_data)
    euler_angle_list.append(get_euler_angle_data(xmp))  # Z-Y-X

    valid_image_list.append(image_name_list[image_idx])

len(valid_image_list)

# 3. Convert to poses

In [None]:
gps_array = np.array(gps_list, dtype=np.float64)
euler_angle_array = np.radians(np.array(euler_angle_list, dtype=np.float64))  # [N, Z-Y-X]

In [None]:
mid_center = (np.min(gps_array, axis=0) + np.max(gps_array, axis=0)) * 0.5
mid_center

In [None]:
xyz_in_ned = np.stack(pymap3d.geodetic2ned(
    gps_array[:, 0], gps_array[:, 1], gps_array[:, 2],
    mid_center[0], mid_center[1], mid_center[2],
), axis=1)
xyz_in_ned

In [None]:
import viser.transforms as vt

In [None]:
z_R = vt.SO3.from_z_radians(euler_angle_array[:, 0]).as_matrix()
y_R = vt.SO3.from_y_radians(euler_angle_array[:, 1]).as_matrix()
x_R = vt.SO3.from_x_radians(euler_angle_array[:, 2]).as_matrix()
R = z_R @ y_R @ x_R  # in Z-Y-X order

# An extra rotation for the view direction of the camera (look at Z+(Down) to X+ (North) of the world).
# Different cameras has different extra rotations.
y_extra_R = vt.SO3.from_y_radians(np.pi / 2).as_matrix()
counter_clock_wise_extra_R = y_extra_R @ vt.SO3.from_z_radians(np.pi / 2).as_matrix()
clockwise_extra_R = y_extra_R @ vt.SO3.from_z_radians(-np.pi / 2).as_matrix()

extra_R_list = []
for image_camera_id in image_camera_id_list:
    image_camera_model = camera_id_to_camera_model[image_camera_id]
    if image_camera_model.startswith("PSDK102S_V2137") and not image_camera_model.endswith("X"):
        extra_R_list.append(clockwise_extra_R)
    else:
        extra_R_list.append(counter_clock_wise_extra_R)
        
R = R @ np.stack(extra_R_list)

R.shape

In [None]:
c2w = np.concatenate(
    [
        np.concatenate([R, xyz_in_ned[..., None]], axis=-1),
        np.repeat(np.asarray([0., 0., 0., 1.], dtype=R.dtype)[None, None, :], R.shape[0], axis=0),
    ],
    axis=1,
)
# make sure the concatenating is correct
assert (c2w[0, :3, :3] == R[0]).all()
assert (c2w[0, :3, 3] == xyz_in_ned[0]).all()
assert (c2w[:, -1, -1] == 1.).all()

In [None]:
# flip the whole world upside down (NED to ENU: switch the X and Y, flip the Z)
ned2enu = np.asarray([
    [0., 1., 0., 0., ],
    [1., 0., 0., 0., ],
    [0., 0., -1., 0.],
    [0., 0., 0., 1.],
], dtype=c2w.dtype)
c2w_in_enu = ned2enu @ c2w
assert (c2w_in_enu[0, 0] == c2w[0, 1]).all()
assert (c2w_in_enu[0, 1] == c2w[0, 0]).all()
assert (c2w_in_enu[0, 2] == -c2w[0, 2]).all()

In [None]:
xyz_in_enu = c2w_in_enu[:, :3, 3]

In [None]:
# store as npy
os.makedirs(output_path, exist_ok=True)
np.save(
    os.path.join(output_path, "parsed.npy"),
    {
        "image_name_list": valid_image_list,
        "gps": gps_array,
        "gps_origin": mid_center,
        "euler_angle": euler_angle_array,
        "camera": [
            np.asarray(camera_width_list, dtype=np.int32),
            np.asarray(camera_height_list, dtype=np.int32),
            np.asarray(camera_focal_length_list, dtype=np.float64),
            np.asarray(camera_has_prior_focal_length, dtype=np.uint8),
        ],
        "c2w": c2w_in_enu,
        "image_camera_id_list": image_camera_id_list,
    },
)

# 4. Store as a colmap model

In [None]:
from internal.utils import colmap

In [None]:
colmap_output_path = output_path
colmap_sparse_model_output_path = os.path.join(colmap_output_path, "sparse_from_exif")
os.makedirs(colmap_sparse_model_output_path, exist_ok=True)

In [None]:
colmap_db_path = os.path.join(colmap_output_path, "colmap.db")
assert os.path.exists(colmap_db_path) is False
import subprocess

subprocess.call([
    "colmap",
    "database_creator",
    "--database_path={}".format(colmap_db_path),
])
colmap_db_path

In [None]:
import sqlite3

colmap_db = sqlite3.connect(colmap_db_path)

In [None]:
try:
    colmap_cameras = {}
    for camera_idx in range(len(camera_width_list)):
        colmap_camera_idx = camera_idx + 1
        colmap_db.execute(
            "INSERT INTO `cameras` "
            "(camera_id, model, width, height, params, prior_focal_length) "
            "VALUES(?, ?, ?, ?, ?, ?)",
            [
                colmap_camera_idx,
                4,  # OPENCV
                camera_width_list[camera_idx],
                camera_height_list[camera_idx],
                np.asarray([
                    camera_focal_length_list[camera_idx],
                    camera_focal_length_list[camera_idx],
                    camera_width_list[camera_idx] / 2,
                    camera_height_list[camera_idx] / 2,
                    0.,
                    0.,
                    0.,
                    0.,
                ], dtype=np.float64).tostring(),
                camera_has_prior_focal_length[camera_idx],
            ],
        )

        colmap_cameras[colmap_camera_idx] = colmap.Camera(
            id=colmap_camera_idx,
            model="PINHOLE",
            width=camera_width_list[camera_idx],
            height=camera_height_list[camera_idx],
            params=np.asarray([
                camera_focal_length_list[camera_idx],
                camera_focal_length_list[camera_idx],
                camera_width_list[camera_idx] / 2,
                camera_height_list[camera_idx] / 2,
            ], dtype=np.float64),  # [fx, fy, cx, cy]
        )

    colmap_images = {}
    w2c = np.linalg.inv(c2w_in_enu)
    for image_idx in range(len(valid_image_list)):
        colmap_image_idx = image_idx + 1
        qvec = colmap.rotmat2qvec(w2c[image_idx, :3, :3])
        tvec = w2c[image_idx, :3, 3]

        colmap_db.execute(
            "INSERT INTO `images` "
            "VALUES (?, ?, ?)",
            [
                colmap_image_idx,
                valid_image_list[image_idx],
                image_camera_id_list[image_idx] + 1,
            ],
        )

        colmap_images[colmap_image_idx] = colmap.Image(
            id=colmap_image_idx,
            qvec=qvec,
            tvec=tvec,
            camera_id=image_camera_id_list[image_idx] + 1,
            name=valid_image_list[image_idx],
            xys=np.asarray([]),
            point3D_ids=np.asarray([]),
        )
except Exception as e:
    colmap_db.rollback()
    raise e

colmap_db.commit()
colmap_db.close()

In [None]:
colmap.write_cameras_binary(colmap_cameras, os.path.join(colmap_sparse_model_output_path, "cameras.bin"))
colmap.write_images_binary(colmap_images, os.path.join(colmap_sparse_model_output_path, "images.bin"))
colmap.write_points3D_binary({}, os.path.join(colmap_sparse_model_output_path, "points3D.bin"))

In [None]:
os.makedirs("{}_text".format(colmap_sparse_model_output_path), exist_ok=True)
subprocess.run([
    "colmap",
    "model_converter",
    "--input_path={}".format(colmap_sparse_model_output_path),
    "--output_path={}_text".format(colmap_sparse_model_output_path),
    "--output_type=TXT",
])