# Surface Tracking - Images

In [None]:
import os
import typing as T

### API Overview

```python3
from surface_tracker import SurfaceTracker


# Instace implementing the `surface_tracker.CameraModel` interface.
#
# This version of the library does not provide a concrete implementation of this interface,
# so library clients must implement a concrete subclass of `surface_tracker.CameraModel`.
camera_model = ...


# List of the detected instances of `surface_tracker.Marker` class.
detected_markers_in_frame_n = ...


# Main entry point of the library API;
# used to define and locate surfaces based on
# a list of markers and a camera model
surface_tracker = SurfaceTracker()


my_surface = surface_tracker.define_surface(
    name="My Surface",
    markers=detected_markers_in_frame_n,
    camera_model=camera_model,
)


detected_markers_in_frame_k = ...


surface_location_within_frame_k = surface_tracker.locate_surface(
    surface=my_surface,
    markers=detected_markers_in_frame_k,
    camera_model=camera_model,
)
```

In [None]:
from surface_tracker import (
    CornerId,
    Marker, MarkerId,
    Surface, SurfaceId,
    SurfaceImageCrop,
    SurfaceHeatmap,
    SurfaceLocation,
    SurfaceTracker,
    SurfaceVisualAnchors,
)

## Implementing the interfaces

Bellow are basic implementations of the parts that are not provided by the library.

### Camera model

The current version of the `surface_tracker` library does **not** provide a concrete implementation of the `CameraModel` interface.

The client of the library **must** implement the interface to be able to use the library.

In [None]:
import numpy as np
import cv2


# Source: pupil/pupil_src/shared_modules/camera_model.py
class RadialDistorsionCamera():
    """ Camera model assuming a lense with radial distortion (this is the defaut model in opencv).
        Provides functionality to make use of a pinhole camera calibration that is also compensating for lense distortion
    """

    def __init__(self, name, resolution, K, D):
        self.name = name
        self.__resolution = resolution
        self.K = np.array(K)
        self.D = np.array(D)

    # CameraModel Interface

    @property
    def resolution(self) -> T.Tuple[int, int]:
        return self.__resolution
    
    def undistort_points_on_image_plane(self, points):
        points = self.__unprojectPoints(points, use_distortion=True)
        points = self.__projectPoints(points, use_distortion=False)
        return points

    def distort_points_on_image_plane(self, points):
        points = self.__unprojectPoints(points, use_distortion=False)
        points = self.__projectPoints(points, use_distortion=True)
        return points

    # Private

    def __projectPoints(self, object_points, rvec=None, tvec=None, use_distortion=True):
        """
        Projects a set of points onto the camera plane as defined by the camera model.
        :param object_points: Set of 3D world points
        :param rvec: Set of vectors describing the rotation of the camera when recording the corresponding object point
        :param tvec: Set of vectors describing the translation of the camera when recording the corresponding object point
        :return: Projected 2D points
        """
        input_dim = object_points.ndim

        object_points = object_points.reshape((1, -1, 3))

        if rvec is None:
            rvec = np.zeros(3).reshape(1, 1, 3)
        else:
            rvec = np.array(rvec).reshape(1, 1, 3)

        if tvec is None:
            tvec = np.zeros(3).reshape(1, 1, 3)
        else:
            tvec = np.array(tvec).reshape(1, 1, 3)

        if use_distortion:
            _D = self.D
        else:
            _D = np.asarray([[0.0, 0.0, 0.0, 0.0, 0.0]])

        image_points, jacobian = cv2.projectPoints(
            object_points, rvec, tvec, self.K, _D
        )

        if input_dim == 2:
            image_points.shape = (-1, 2)
        elif input_dim == 3:
            image_points.shape = (-1, 1, 2)
        return image_points

    def __unprojectPoints(self, pts_2d, use_distortion=True, normalize=False):
        """
        Undistorts points according to the camera model.
        :param pts_2d, shape: Nx2
        :return: Array of unprojected 3d points, shape: Nx3
        """
        pts_2d = np.array(pts_2d, dtype=np.float32)

        # Delete any posibly wrong 3rd dimension
        if pts_2d.ndim == 1 or pts_2d.ndim == 3:
            pts_2d = pts_2d.reshape((-1, 2))

        # Add third dimension the way cv2 wants it
        if pts_2d.ndim == 2:
            pts_2d = pts_2d.reshape((-1, 1, 2))

        if use_distortion:
            _D = self.D
        else:
            _D = np.asarray([[0.0, 0.0, 0.0, 0.0, 0.0]])

        pts_2d_undist = cv2.undistortPoints(pts_2d, self.K, _D)

        pts_3d = cv2.convertPointsToHomogeneous(pts_2d_undist)
        pts_3d.shape = -1, 3

        if normalize:
            pts_3d /= np.linalg.norm(pts_3d, axis=1)[:, np.newaxis]

        return pts_3d


In [None]:
camera_model = RadialDistorsionCamera(
    name="Radial Distorsion Camera",
    resolution=(1280, 720),
    K=[
        [829.3510515270362, 0.0, 659.9293047259697],
        [0.0, 799.5709408845464, 373.0776462356668],
        [0.0, 0.0, 1.0],
    ],
    D=[
        [
            -0.43738542863224966,
            0.190570781428104,
            -0.00125233833830639,
            0.0018723428760170056,
            -0.039219091259637684,
        ]
    ],
)
camera_model

### Marker detector

This example is using surfaces defined by `apriltag` markers.

Bellow is a wrapper around the `apriltag` detector provided by the `pupil_apriltags` library. The most importat part that the wrapper does is construct `surface_tracker.Marker` instances from the `apriltag` detections.

In [None]:
import pupil_apriltags


def create_apriltag_marker_uid(tag_family: str, tag_id: int) -> MarkerId:
    # Construct the UID by concatinating the tag family and the tag id
    return MarkerId(f"{tag_family}:{tag_id}")


class ApriltagDetector:

    def __init__(
        self,
        *families: T.Set[str],
        camera_model,
    ):
        families = " ".join(families)
        self._camera_model = camera_model
        self._detector = pupil_apriltags.Detector(families=families)

    def detect_from_image(self, image) -> T.List[Marker]:
        gray = rgb_to_gray(image)
        return self.detect_from_gray(gray)

    def detect_from_gray(self, gray) -> T.List[Marker]:
        undist_gray = self._camera_model.undistort_points_on_image_plane(gray)

        # Detect apriltag markers from the gray image
        markers = self._detector.detect(undist_gray)

        # Ensure detected markers are unique
        # TODO: Between deplicate markers, pick the one with higher confidence
        uid_fn = self.__apiltag_marker_uid
        markers = dict((uid_fn(m), m) for m in markers).values()

        # Convert apriltag markers into surface tracker markers
        marker_fn = self.__apriltag_marker_to_surface_marker
        markers = [marker_fn(m) for m in markers]

        return markers

    @staticmethod
    def __apiltag_marker_uid(apriltag_marker: pupil_apriltags.Detection) -> MarkerId:
        family = apriltag_marker.tag_family.decode("utf-8")
        tag_id = int(apriltag_marker.tag_id)
        return create_apriltag_marker_uid(family, tag_id)

    @staticmethod
    def __apriltag_marker_to_surface_marker(apriltag_marker: pupil_apriltags.Detection) -> Marker:

        # Construct the surface tracker marker UID
        uid = ApriltagDetector.__apiltag_marker_uid(apriltag_marker)

        # Extract vertices in the correct format form apriltag marker
        vertices = [[point] for point in apriltag_marker.corners]

        # TODO: Verify this is correct...
        starting_with = CornerId.TOP_LEFT
        clockwise = True

        return Marker.from_vertices(
            uid=uid,
            undistorted_image_space_vertices=vertices,
            starting_with=starting_with,
            clockwise=clockwise
        )


Finally, here are some helper functions that load sample images, and visualize images enriched by the data supplied by the `surface_tracker` library.

In [None]:
import matplotlib.pyplot as plt
from PIL import Image


### Loading images


def load_image(path: str) -> np.ndarray:
    image = Image.open(path)
    image = np.asarray(image)
    return image


def load_sample_image(name: str, frame_index: int) -> np.ndarray:
    i = str(frame_index).rjust(5, "0")
    path = os.path.join(".", "sample_data", name, "frames", f"{i}.jpg")
    return load_image(path)


### Converting images


def rgb_to_gray(image: np.ndarray) -> np.ndarray:
    image = Image.fromarray(image)
    image = image.convert("L")
    image = np.asarray(image)
    return image


### Overlaying information on images


def overlay_surface_visual_anchors(image: np.ndarray, anchors: SurfaceVisualAnchors) -> np.ndarray:
    image = image.copy()
    image = overlay_polyline(image, anchors.top_polyline, color=(0, 0, 255))
    image = overlay_polyline(image, anchors.perimeter_polyline, color=(255, 0, 0))
    # TODO: Overlay menu edit buttons
    return image


def overlay_markers(image: np.ndarray, markers: T.List[Marker], color=(0, 255, 0), alpha=0.7) -> np.ndarray:
    image = image.copy()
    overlay = np.zeros_like(image)
    vertices = np.asarray([m.vertices() for m in markers], dtype=np.int32)
    overlay = cv2.fillPoly(overlay, vertices, color)
    image = cv2.addWeighted(image, 1.0, overlay, alpha, 0.0)

    for m in markers:
        n = int(str(m.uid).split(":")[1])
        vertices = m.vertices()
        image = overlay_text(image, str(n), vertices[0][0])

    return image


def overlay_polyline(image: np.ndarray, points, color, is_closed=True, thickness=2) -> np.ndarray:
    image = image.copy()
    points = np.array([points], dtype=np.int32)
    return cv2.polylines(image, points, isClosed=is_closed, color=color, thickness=thickness)


def overlay_polyfill(image: np.ndarray, points, color) -> np.ndarray:
    image = image.copy()
    return image  # FIXME


def overlay_text(image: np.ndarray, text: str, point, color=(0, 0, 255)) -> np.ndarray:
    image = image.copy()

    FONT = cv2.FONT_HERSHEY_SIMPLEX
    FONT_SCALE = 1.0
    FONT_THICKNESS = 2

    (label_width, label_height), baseline = cv2.getTextSize(text, FONT, FONT_SCALE, FONT_THICKNESS)

    point = (
        int(point[0]),
        int(point[1]),
    )

    image = cv2.putText(image, text, point, FONT, FONT_SCALE, color, FONT_THICKNESS)
    return image


def overlay_heatmap_on_cropped_image(
    image: np.ndarray,
    image_crop: SurfaceImageCrop,
    heatmap=SurfaceHeatmap,
    heatmap_alpha: float = None
):
    heatmap_alpha = heatmap_alpha if heatmap_alpha is not None else 0.4

    surface_image = image_crop.apply_to_image(image)

    heatmap_image = heatmap.image(size=image_crop.size_in_image_space)

    return cv2.addWeighted(surface_image, 1.0, heatmap_image, heatmap_alpha, 0.0)


### Showing images


def show_markers(image: np.ndarray, markers: T.List[Marker]):
    image = overlay_markers(image, markers)
    show_image(image)


def show_heatmap_on_cropped_surface(
    image: np.ndarray,
    surface: Surface,
    surface_tracker: SurfaceTracker,
    location: SurfaceLocation,
    heatmap_points: T.List[T.Tuple[int, int]],
    heatmap_alpha: float = None,
    crop_width=None, crop_height=None,
    max_width: float=None, max_height: float=None
):

    image_crop, heatmap = surface_tracker.locate_surface_image_crop_with_heatmap(
        surface=surface,
        location=location,
        points=heatmap_points,
        width=crop_width,
        height=crop_height,
    )

    image = overlay_heatmap_on_cropped_image(
        image=image,
        image_crop=image_crop,
        heatmap=heatmap,
        heatmap_alpha=heatmap_alpha,
    )

    show_image(image, max_width=max_width, max_height=max_height)


def show_surface(image: np.ndarray, surface: Surface, surface_tracker: SurfaceTracker, marker_detector: ApriltagDetector, max_width: float=None, max_height: float=None):
    gray = rgb_to_gray(image)

    markers = marker_detector.detect_from_gray(gray)

    ignored_markers = []
    surface_markers = []
    for m in markers:
        if m.uid in surface.registered_marker_uids:
            surface_markers.append(m)
        else:
            ignored_markers.append(m)

    image = overlay_markers(image, ignored_markers, color=(255, 0, 0), alpha=0.3)
    image = overlay_markers(image, surface_markers, color=(0, 255, 0), alpha=0.7)

    location = surface_tracker.locate_surface(
        surface=surface,
        markers=markers,
    )
    
    if location is not None:
        # TODO: Overlay surface name
        show_location(image, surface, location, surface_tracker, max_width=max_width, max_height=max_height)
    else:
        show_image(image, max_width=max_width, max_height=max_height)


def show_location(image: np.ndarray, surface: Surface, location: SurfaceLocation, surface_tracker: SurfaceTracker, max_width: float=None, max_height: float=None):
    visual_anchors = surface_tracker.locate_surface_visual_anchors(
        surface=surface,
        location=location,
    )
    show_visual_anchors(image, visual_anchors, max_width=max_width, max_height=max_height)


def show_visual_anchors(image: np.ndarray, anchors: SurfaceVisualAnchors, max_width: float=None, max_height: float=None):
    image = overlay_surface_visual_anchors(image, anchors)
    show_image(image, max_width=max_width, max_height=max_height)


def show_image(image: np.ndarray, max_width: float=None, max_height: float=None):
    ratio = image.shape[0] / image.shape[1]

    dpi = plt.gcf().dpi

    if max_width is None and max_height is None:
        width_inch = 30 * ratio
        height_inch = 30
    elif max_width is None:
        width_inch = max_height / ratio / dpi
        height_inch = max_height / dpi
    elif max_height is None:
        width_inch = max_width / dpi
        height_inch = max_width * ratio / dpi
    else:
        raise ValueError("Must supply EITHER max_width OR max_height")

    fig, ax = plt.subplots(figsize=(width_inch, height_inch))
    if len(image.shape) == 2:
        ax.imshow(image, interpolation=None, cmap='gray')
    else:
        ax.imshow(image, interpolation=None)
    plt.show()


## Putting it all together

First, the `1`st frame from the `apriltag_world` video is loaded. This will be used to define the surface.

In [None]:
image_1 = load_sample_image("apriltag_world", frame_index=1)
show_image(image_1)

Next, the `apriltag` detector is run to find all the markers in the image. These markers will define the surface.

In [None]:
apriltag_tag_family = "tagCustom48h12"
apriltag_detector = ApriltagDetector(apriltag_tag_family, camera_model=camera_model)
apriltag_detector

In [None]:
detected_markers_1 = apriltag_detector.detect_from_image(image_1)
print(f"Detected {len(detected_markers_1)} markers.")
show_markers(image_1, detected_markers_1)

In [None]:
marker_matrix = [
    [1230, 1025, 820, 615, 410, 205, 0],
    [1231, 1026, 821, 616, 411, 206, 1],
    [1232, 1027, 822, 617, 412, 207, 2],
    [1233, 1028, 823, 618, 413, 208, 3],
    [1234, 1029, 824, 619, 414, 209, 4],
]


for i in range(len(marker_matrix)):
    for j in range(len(marker_matrix[i])):
        f = apriltag_tag_family
        n = marker_matrix[i][j]
        marker_matrix[i][j] = create_apriltag_marker_uid(f, n)

Next, the `SurfaceTracker` instance is used to define a surface called `"My Surface"`. This definition will be used to locate the surface within any frame where a subset of the markers used in the definition are present.

In [None]:
surface_tracker = SurfaceTracker()
surface_tracker

In [None]:
my_surface_marker_uids = [
    marker_matrix[1][1], marker_matrix[1][3],
    marker_matrix[3][1], marker_matrix[3][3],
]

my_surface = surface_tracker.define_surface(
    name="My Surface",
    markers=[m for m in detected_markers_1 if m.uid in my_surface_marker_uids],
)
print([m for m in detected_markers_1 if m.uid in my_surface_marker_uids])
assert my_surface is not None
my_surface

Bellow is an example of how to locate the surface. As a test that the definition is correct, the next cell visualizes the surface within the same frame that was used to define the surface.

In [None]:
my_surface_location_within_frame_1 = surface_tracker.locate_surface(
    surface=my_surface,
    markers=detected_markers_1,
)
assert my_surface_location_within_frame_1 is not None
show_location(image_1, my_surface, my_surface_location_within_frame_1, surface_tracker)

Finally, to test that the definition works within any frame, the code bellow loads a different frame, detects the markers preset in the image, locates the surface within the frame, and visualizes the result.

In [None]:
image_4 = load_sample_image("apriltag_world", frame_index=4)
show_image(image_4)

In [None]:
detected_markers_4 = apriltag_detector.detect_from_image(image_4)
print(f"Detected {len(detected_markers_4)} markers.")
show_markers(image_4, detected_markers_4)

In [None]:
my_surface_location_within_frame_4 = surface_tracker.locate_surface(
    surface=my_surface,
    markers=detected_markers_4,
)
assert my_surface_location_within_frame_4 is not None
show_location(image_4, my_surface, my_surface_location_within_frame_4, surface_tracker)

## Add/Remove corner

The following section will show how to add/remove a corner from the surface definition.

To visualize the update, the surface will be drawn togather with the detected markers. The markers that are part of the surface definition will be filled with green, while those that are not part of the definition will be filled with red. Also, the set of the registered markers will be printed after each update to verify that the update did take place.

In [None]:
show_surface(image=image_4, surface=my_surface, surface_tracker=surface_tracker, marker_detector=apriltag_detector)

In [None]:
my_surface.registered_marker_uids

In [None]:
marker_uids_to_remove = [
    marker_matrix[1][3],
    marker_matrix[3][3]
]

surface_tracker.remove_markers_from_surface(
    surface=my_surface,
    location=my_surface_location_within_frame_4,
    marker_uids=marker_uids_to_remove,
)

my_surface.registered_marker_uids

In [None]:
show_surface(image=image_4, surface=my_surface, surface_tracker=surface_tracker, marker_detector=apriltag_detector)

In [None]:
my_surface.registered_marker_uids

In [None]:
marker_uids_to_add = [
    marker_matrix[0][5],
    marker_matrix[4][1],
    marker_matrix[4][5],
]

markers_to_add = [m for m in detected_markers_4 if m.uid in marker_uids_to_add]
assert len(markers_to_add) == len(marker_uids_to_add)

surface_tracker.add_markers_to_surface(
    surface=my_surface,
    location=my_surface_location_within_frame_4,
    markers=markers_to_add,
)

my_surface.registered_marker_uids

In [None]:
show_surface(image=image_4, surface=my_surface, surface_tracker=surface_tracker, marker_detector=apriltag_detector)

## Current corner position and moving a corner

A corner's position can be updated with a new position in image space. To move a corner with a certain offset in image space, the user can get the current position, apply the offset, and update the corner with the new position.

In [None]:
def move_surface_corners_with_offsets(
    surface: Surface,
    location: SurfaceLocation,
    offsets_by_corner: T.Mapping[CornerId, T.Tuple[int, int]]
):

    ordered_corners = list(offsets_by_corner.keys())

    old_positions_in_image_space = surface_tracker.surface_corner_positions_in_image_space(
        surface=surface,
        location=location,
        corners=ordered_corners,
    )

    new_positions_in_image_space = {}

    for corner, offset in offsets_by_corner.items():
        new_positions_in_image_space[corner] = (
            old_positions_in_image_space[corner][0] + offset[0],
            old_positions_in_image_space[corner][1] + offset[1],
        )

    assert len(offsets_by_corner) == len(new_positions_in_image_space)

    surface_tracker.move_surface_corner_positions_in_image_space(
        surface=surface,
        location=location,
        new_positions=new_positions_in_image_space,
    )


# Move top left corner by y+50
# Move top right corner by x-20
# Move top right corner by y+50
# Move bottom right corner by x-20
move_surface_corners_with_offsets(
    surface=my_surface,
    location=my_surface_location_within_frame_1,
    offsets_by_corner={
        CornerId.TOP_LEFT: (0, +50),
        CornerId.TOP_RIGHT: (-20, +50),
        CornerId.BOTTOM_RIGHT: (-20, 0),
    }
)

In [None]:
show_surface(image=image_4, surface=my_surface, surface_tracker=surface_tracker, marker_detector=apriltag_detector)

## Image crop and points heatmap

In [None]:
show_image(image_4)

In [None]:
my_surface_location_within_frame_4 = surface_tracker.locate_surface(
    surface=my_surface,
    markers=detected_markers_4,
)
assert my_surface_location_within_frame_4 is not None
show_location(image_4, my_surface, my_surface_location_within_frame_4, surface_tracker)

In [None]:
crop_within_frame_4 = surface_tracker.locate_surface_image_crop(
    surface=my_surface,
    location=my_surface_location_within_frame_4,
#     width=100,
    height=100,
)
image_crop_4 = crop_within_frame_4.apply_to_image(image=image_4)
show_image(image_crop_4, max_height=600)

### Loading gaze points

In [None]:
import os
import collections
import types

import msgpack

def load_pldata_file(directory, topic):

    MSGPACK_EXT_CODE = 13

    def unpacking_object_hook(obj):
        if type(obj) is dict:
            return types.MappingProxyType(obj)

    def unpacking_ext_hook(code, data):
        if code == MSGPACK_EXT_CODE:
            return dict(msgpack_bytes=data)
        return msgpack.ExtType(code, data)

    ts_file = os.path.join(directory, topic + "_timestamps.npy")
    msgpack_file = os.path.join(directory, topic + ".pldata")
    try:
        data = collections.deque()
        topics = collections.deque()
        data_ts = np.load(ts_file)
        with open(msgpack_file, "rb") as fh:
            for topic, payload in msgpack.Unpacker(fh, raw=False, use_list=False):
                d = msgpack.unpackb(
                    payload,
                    raw=False,
                    use_list=False,
                    object_hook=unpacking_object_hook,
                    ext_hook=unpacking_ext_hook,
                )
                data.append(d)
                topics.append(topic)
    except FileNotFoundError:
        data = []
        data_ts = []
        topics = []

    return data, data_ts, topics


def load_gaze_points_in_image_space(name, camera_model) -> T.List[T.Tuple[int, int]]:

    path = os.path.join(".", "sample_data", name)
    gaze_data, gaze_ts, _ = load_pldata_file(path, "gaze")

    gaze_ts= np.asarray(gaze_ts)
    gaze_data = np.asarray(gaze_data, dtype=object)

    # Find correct order once and reorder both lists in-place
    sorted_idc = np.argsort(gaze_ts)
    gaze_ts = gaze_ts[sorted_idc].tolist()
    gaze_data = gaze_data[sorted_idc].tolist()

    def denormalize(pos, size, flip_y=False):
        width, height = size
        x = pos[0]
        y = pos[1]
        x *= width
        if flip_y:
            y = 1 - y
        y *= height
        return x, y

    def fn(g: dict) -> T.Tuple[int, int]:
        return denormalize(g["norm_pos"], camera_model.resolution, flip_y=True)

    gaze_data = map(fn, gaze_data)
    gaze_data = list(gaze_data)
    return gaze_data


In [None]:
gaze_data = load_gaze_points_in_image_space("apriltag_world", camera_model)
np.asarray(gaze_data).shape

In [None]:
image_crop_f4, heatmap_f4 = surface_tracker.locate_surface_image_crop_with_heatmap(
    surface=my_surface,
    location=my_surface_location_within_frame_4,
    points=gaze_data,
)
(image_crop_f4, heatmap_f4)

In [None]:
show_image(image_crop_f4.apply_to_image(image_4), max_height=600)

In [None]:
show_image(heatmap_f4.image(size=image_crop_f4.size_in_image_space), max_height=600)

In [None]:
show_heatmap_on_cropped_surface(
    image=image_4,
    surface=my_surface,
    surface_tracker=surface_tracker,
    location=my_surface_location_within_frame_4,
    heatmap_points=gaze_data,
#     heatmap_alpha=None,
#     crop_width=None,
#     crop_height=None,
    max_height=600,
)