<a href="https://colab.research.google.com/github/takayama-rado/trado_samples/blob/main/colab_files/exp_track_affine_numpy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1. Load library

In [None]:
!pip3 install mediapipe==0.10.0

Collecting mediapipe==0.10.0
  Downloading mediapipe-0.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (33.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.9/33.9 MB[0m [31m47.2 MB/s[0m eta [36m0:00:00[0m
Collecting sounddevice>=0.4.4 (from mediapipe==0.10.0)
  Downloading sounddevice-0.4.6-py3-none-any.whl (31 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.0 sounddevice-0.4.6


In [None]:
# Standard modules.
import gc
import sys
import time
from pathlib import Path
from functools import partial

# CV/ML.
import cv2
import numpy as np

# For drawing.
from mediapipe.python.solutions.face_mesh_connections import (
    FACEMESH_CONTOURS)
from mediapipe.python.solutions.hands_connections import (
    HAND_CONNECTIONS)
from mediapipe.python.solutions.pose_connections import (
    POSE_CONNECTIONS)

from IPython.display import HTML
from base64 import b64encode
import io

In [None]:
# Report the interpreter and library versions used for this run.
print(f"Python:{sys.version}")
print(f"OpenCV:{cv2.__version__}")
print(f"Numpy:{np.__version__}")

Python:3.10.12 (main, Jun 11 2023, 05:26:28) [GCC 11.4.0]
OpenCV:4.8.0
Numpy:1.23.5


# 2. Load data

In [None]:
!wget https://github.com/takayama-rado/trado_samples/raw/main/test_data/finger_far0_non_static.npy

--2023-10-29 12:16:15--  https://github.com/takayama-rado/trado_samples/raw/main/test_data/finger_far0_non_static.npy
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/takayama-rado/trado_samples/main/test_data/finger_far0_non_static.npy [following]
--2023-10-29 12:16:16--  https://raw.githubusercontent.com/takayama-rado/trado_samples/main/test_data/finger_far0_non_static.npy
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2300608 (2.2M) [application/octet-stream]
Saving to: ‘finger_far0_non_static.npy’


2023-10-29 12:16:16 (39.2 MB/s) - ‘finger_far0_non_static.npy’ saved [2300608/2300608]



In [None]:
!ls

finger_far0_non_static.npy  sample_data


In [None]:
def draw_landmarks(draw, landmarks, connections,
                   pt_color=(0, 0, 0), line_color=(0, 0, 0),
                   pt_size=3, line_size=3,
                   do_remove_error=True):
    """Draw connected landmarks onto an image.

    # Args:
      - draw: The image handed to `cv2.line` and `cv2.circle`.
      - landmarks: The landmark array. `landmarks[joint, :2]` is read as (x, y).
      - connections: The iterable of joint-index pairs to be connected.
      - pt_color: The color of the end points.
      - line_color: The color of the connecting lines.
      - pt_size: The radius of the end points.
      - line_size: The thickness of the connecting lines.
      - do_remove_error: If True, a connection is skipped when either end
        point contains NaN or a coordinate that is exactly zero.

    # Returns:
      - The `draw` object that was passed in.
    """
    def _is_broken(point):
        # A point is rejected when any coordinate is NaN or exactly zero.
        return np.isnan(point).any() or (point == 0).any()

    for pair in connections:
        start = landmarks[pair[0], :2]
        end = landmarks[pair[1], :2]
        if do_remove_error and (_is_broken(start) or _is_broken(end)):
            continue
        # OpenCV drawing functions take integer pixel coordinates.
        start_px = (int(start[0]), int(start[1]))
        end_px = (int(end[0]), int(end[1]))
        cv2.line(draw, start_px, end_px, line_color, line_size)
        for pixel in (start_px, end_px):
            cv2.circle(draw, pixel, pt_size, pt_color, -1)
    return draw

In [None]:
def drawing(videofile, trackfile, outvideo):
    """Draw landmarks over input video.

    # Args:
      - videofile: The path of input video. This is used to draw background image.
        When it is not an existing file, a white 1200x1200 background is
        used instead.
      - trackfile: The path of tracking data. The array is loaded with
        `np.load` and indexed as `[P, T, J, C]`, where channel 0 is x and
        channel 1 is y.
      - outvideo: The path of output video file.

    # Raises:
      - ValueError: When the output video writer can not be opened.
    """
    trackdata = np.load(trackfile)
    if videofile.is_file():
        capture = cv2.VideoCapture(str(videofile))
        capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
        width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    else:
        capture = None
        width = 1200
        height = 1200

    writer = None
    try:
        # Coordinates whose maxima are below 10 are treated as normalized
        # and scaled to pixels. The ellipsis keeps the index on the channel
        # axis of the 4-D array. NaN entries are ignored when measuring
        # the extent.
        if (np.nanmax(trackdata[..., 0]) < 10
                and np.nanmax(trackdata[..., 1]) < 10):
            trackdata[..., 0] *= width
            trackdata[..., 1] *= height

        # Measure the extent AFTER the optional scaling so that the window
        # and the offsets are expressed in pixels.
        xmin_proj = int(np.nanmin(trackdata[..., 0]))
        ymin_proj = int(np.nanmin(trackdata[..., 1]))
        xmax_proj = int(np.nanmax(trackdata[..., 0]))
        ymax_proj = int(np.nanmax(trackdata[..., 1]))

        # Add offset so that negative coordinates land inside the canvas.
        offset_x = 0
        offset_y = 0
        if xmin_proj < 0:
            offset_x = -xmin_proj
            trackdata[..., 0] += offset_x
        if ymin_proj < 0:
            offset_y = -ymin_proj
            trackdata[..., 1] += offset_y

        # The canvas must hold both the shifted background and all landmarks.
        ywin = int(max(ymax_proj + offset_y, height + offset_y))
        xwin = int(max(xmax_proj + offset_x, width + offset_x))

        print("Window size:", xwin, ywin)
        print("Offsets:", offset_x, offset_y)

        writer = cv2.VideoWriter(str(outvideo), cv2.VideoWriter_fourcc(*"mp4v"),
            30.0, (xwin, ywin))
        if not writer.isOpened():
            print("VideoWriter is failed to open.")
            # It is the output file that failed, not the input video.
            raise ValueError(f"Can not open {outvideo}")

        # `[P, T, J, C] -> [T, P, J, C]`
        trackdata = np.transpose(trackdata, [1, 0, 2, 3])
        for frame in trackdata:
            if capture is not None:
                status, image = capture.read()
                if status is False:
                    break
            else:
                image = np.full([height, width, 3], 255, dtype=np.uint8)

            # Draw the background on a gray canvas, shifted by the offsets.
            draw = np.full([ywin, xwin, 3], 64, dtype=np.uint8)
            draw[offset_y: offset_y+height, offset_x: offset_x+width, :] = image

            for instance in frame:
                # Joint layout: 33 body, 21 left hand, 21 right hand, rest face.
                body = instance[:33, :]
                lhand = instance[33:33+21, :]
                rhand = instance[33+21:33+21+21, :]
                face = instance[33+21+21:, :]
                draw_landmarks(draw, body, POSE_CONNECTIONS, [0, 255, 0], [0, 255, 0],
                               do_remove_error=True)
                draw_landmarks(draw, lhand, HAND_CONNECTIONS, [255, 0, 0], [255, 0, 0],
                               do_remove_error=True)
                draw_landmarks(draw, rhand, HAND_CONNECTIONS, [0, 0, 255], [0, 0, 255],
                               do_remove_error=True)
                draw_landmarks(draw, face, FACEMESH_CONTOURS, [255, 255, 0], [255, 255, 0],
                               do_remove_error=True)
            writer.write(draw)
    finally:
        # Release both handles even when an error interrupts the rendering.
        if writer is not None:
            writer.release()
        if capture is not None:
            capture.release()

In [None]:
def show_video(video_path, video_width=500, video_height=500):
    """Show video in a browser.

    # Args:
      - video_path: The path to a video file (`str` or `Path`).
      - video_width: The width of video object in a browser.
      - video_height: The height of video object in a browser.

    # Returns:
      - The HTML object containing video data.
    """
    video_path = Path(video_path)
    # Pick the MIME type from the file extension so that WebM files are not
    # labelled as MP4. Unknown extensions keep the previous "video/mp4".
    mime_types = {
        ".mp4": "video/mp4",
        ".webm": "video/webm",
        ".ogv": "video/ogg",
        ".ogg": "video/ogg",
    }
    mime = mime_types.get(video_path.suffix.lower(), "video/mp4")

    # Read-only access is sufficient; the context manager closes the file.
    with open(video_path, "rb") as stream:
        encoded = b64encode(stream.read()).decode("ascii")

    data = f"<video width={video_width} height={video_height} controls>" \
        + f'<source src="data:{mime};base64,{encoded}" ' \
        + f'type="{mime}" /></video>'
    return HTML(data=data)

In [None]:
trackfile = Path("./finger_far0_non_static.npy")
outvideo = Path("./finger_far0_track_orig.mp4")

# `Path("")` is not an existing file, so `drawing` uses its blank
# 1200x1200 background instead of reading a source video.
drawing(Path(""), trackfile, outvideo)

Window size: 1200 1200
Offsets: 0 0


In [None]:
# The MP4 file generated by OpenCV is not displayed correctly,
# so it is converted into a WebM file.
!ffmpeg -i finger_far0_track_orig.mp4 -vcodec vp9 -y finger_far0_track_orig.webm

ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers
  built with gcc 11 (Ubuntu 11.2.0-19ubuntu1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.22.04.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librabbitmq --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enable-libvpx --enab

In [None]:
show_video(outvideo.stem + ".webm")

# 3. Evaluation settings

In [None]:
def get_perf_str(val):
    """Format a duration with the largest fitting SI prefix.

    # Args:
      - val: The duration in seconds.

    # Returns:
      - The string such as "3.77ms". The first unit among s, ms, µs, ns and
        ps whose scaled value is at least 1 is used. Values below one
        picosecond (including zero and negatives) are printed in seconds.
    """
    units = (("", 1), ("m", 1e3), ("µ", 1e6), ("n", 1e9), ("p", 1e12))
    for token, exp in units:
        scaled = val * exp
        # `>=` keeps exact unit boundaries in the larger unit
        # (1 second is "1s", not "1000ms").
        if scaled >= 1.0:
            return f"{scaled:3g}{token}s"
    return f"{val:3g}s"

In [None]:
def print_perf_time(intervals, top_k=None):
    """Print max, min, mean and standard deviation of measured intervals.

    # Args:
      - intervals: The array of measured durations.
      - top_k: If not None, only the `top_k` smallest intervals are
        summarized.
    """
    if top_k is not None:
        # Ascending sort puts the shortest runs first.
        intervals = np.sort(intervals)[:top_k]

    shortest, longest = intervals.min(), intervals.max()
    average, spread = intervals.mean(), intervals.std()

    text_min = get_perf_str(shortest)
    text_max = get_perf_str(longest)
    text_mean = get_perf_str(average)
    text_std = get_perf_str(spread)

    header = f"Top {top_k} summary" if top_k else "Overall summary"
    print(f"{header}: Max {text_max}, Min {text_min}, Mean +/- Std {text_mean} +/- {text_std}")

In [None]:
class PerfMeasure():
    """Callable that times a zero-argument function over repeated trials.

    # Args:
      - trials: The number of times the target function is called.
      - top_k: If truthy, an additional summary restricted to `top_k`
        intervals is printed after the overall summary.
    """
    def __init__(self,
                 trials=100,
                 top_k=10):
        self.trials = trials
        self.top_k = top_k

    def __call__(self, func):
        """Measure `func` and print the timing summaries.

        # Args:
          - func: The callable invoked without arguments in every trial.

        Garbage collection is switched off while timing. Its previous state
        is restored afterwards, even when `func` raises.
        """
        gc_was_enabled = gc.isenabled()
        gc.collect()
        gc.disable()
        try:
            intervals = []
            for _ in range(self.trials):
                start = time.perf_counter()
                func()
                end = time.perf_counter()
                intervals.append(end - start)
        finally:
            # Do not leave the collector disabled for the rest of the session.
            if gc_was_enabled:
                gc.enable()
            gc.collect()

        intervals = np.array(intervals)
        print_perf_time(intervals)
        if self.top_k:
            print_perf_time(intervals, self.top_k)

In [None]:
# Number of timed calls per measurement, and the `top_k` value used for
# the second summary line.
TRIALS = 100
TOPK = 10
pmeasure = PerfMeasure(TRIALS, TOPK)

# 4. Implement affine transformation

In [None]:
def get_affine_matrix_2d(center,
                         trans,
                         scale,
                         rot,
                         skew,
                         to_radians=True,
                         order=("center", "scale", "rot", "skew", "trans"),
                         dtype=np.float32):
    """Build a 3x3 homogeneous matrix of a 2D affine transformation.

    # Args:
      - center: The (x, y) position moved to the origin by the "center" step.
      - trans: The (x, y) translation. The "trans" step moves by
        `center + trans`, i.e. it also moves the center back.
      - scale: The (x, y) scaling factors.
      - rot: The rotation angle.
      - skew: The (x, y) skew angles.
      - to_radians: If True, `rot` and `skew` are given in degrees and are
        converted to radians. If False, they are used as radians.
      - order: The sequence of step names applied one after another. Each
        name must be one of "center", "scale", "rot", "skew" and "trans".
      - dtype: The dtype of the returned matrix.

    # Returns:
      - The `[3, 3]` affine matrix.

    # Raises:
      - ValueError: When `order` contains an unknown step name.
    """
    center = np.asarray(center)
    trans = np.asarray(trans)
    scale = np.asarray(scale)
    if to_radians:
        rot = np.radians(rot)
        skew = np.radians(skew)
    cos_r = np.cos(rot)
    sin_r = np.sin(rot)
    tan_s = np.tan(skew)
    move = center + trans

    steps = {
        "center": np.array([[1, 0, -center[0]],
                            [0, 1, -center[1]],
                            [0, 0, 1]]),
        "scale": np.array([[scale[0], 0, 0],
                           [0, scale[1], 0],
                           [0, 0, 1]]),
        "rot": np.array([[cos_r, -sin_r, 0],
                         [sin_r, cos_r, 0],
                         [0, 0, 1]]),
        "skew": np.array([[1, tan_s[0], 0],
                          [tan_s[1], 1, 0],
                          [0, 0, 1]]),
        "trans": np.array([[1, 0, move[0]],
                           [0, 1, move[1]],
                           [0, 0, 1]]),
    }
    # A misspelled step would otherwise be dropped without any notice.
    unknown = [name for name in order if name not in steps]
    if unknown:
        raise ValueError(
            f"Unknown step(s) in order: {unknown}. "
            f"Available steps: {list(steps)}")

    mat = np.identity(3, dtype=dtype)
    for name in order:
        # Left-multiply so that earlier steps act on the points first.
        mat = np.matmul(steps[name], mat)
    return mat.astype(dtype)

In [None]:
def apply_affine(inputs, mat):
    """Apply a 2D affine matrix to the (x, y) channels of tracking data.

    # Args:
      - inputs: The array whose last axis holds the channels, with x and y
        in channels 0 and 1. Any number of leading axes is accepted.
        The array is modified in place.
      - mat: The `[3, 3]` homogeneous affine matrix.

    # Returns:
      - The same `inputs` array with its (x, y) channels transformed.
        Other channels are left untouched.
    """
    xy = inputs[..., :2]
    # Append the homogeneous coordinate `1` to every point.
    ones = np.ones(xy.shape[:-1] + (1,))
    homogeneous = np.concatenate([xy, ones], axis=-1)
    transformed = np.einsum("...j,ij", homogeneous, mat)
    # Drop the homogeneous coordinate and write back in place.
    inputs[..., :2] = transformed[..., :-1]
    return inputs

In [None]:
# Load data.
trackdata = np.load(trackfile)
print(trackdata.shape)

# Remove person axis (axis 0); the cells below work on a single track.
trackdata = trackdata[0]

(1, 130, 553, 4)


In [None]:
# Get affine matrix.
# `rot` and `skew` are given in degrees here; `to_radians=True` makes
# `get_affine_matrix_2d` convert them.
center = (638.0, 389.0)
trans = (100.0, 0.0)
scale = (2.0, 0.5)
rot = 15.0
skew = (15.0, 15.0)
print("Parameters")
print("Center:", center)
print("Trans:", trans)
print("Scale:", scale)
print("Rot:", rot)
print("Skew:", skew)
mat = get_affine_matrix_2d(center, trans, scale, rot, skew, to_radians=True, dtype=trackdata.dtype)
print("Affine matrix:", mat)

Parameters
Center: (638.0, 389.0)
Trans: (100.0, 0.0)
Scale: (2.0, 0.5)
Rot: 15.0
Skew: (15.0, 15.0)
Affine matrix: [[ 2.07055236e+00  9.69385328e-18 -5.83012406e+02]
 [ 1.03527618e+00  4.48287736e-01 -4.45890132e+02]
 [ 0.00000000e+00  0.00000000e+00  1.00000000e+00]]


In [None]:
# Transform a copy: `apply_affine` writes the result into its input array.
newtrack = apply_affine(trackdata.copy(), mat)

# Add person axis.
newtrack = np.expand_dims(newtrack, axis=0)
print("Shape of newtrack:", newtrack.shape)

# Save.
np.save("newtrack.npy", newtrack)

Shape of newtrack: (1, 130, 553, 4)


In [None]:
# NOTE: this rebinds `trackfile` and `outvideo` from the earlier cells.
trackfile = Path("./newtrack.npy")
outvideo = Path("./newtrack.mp4")

# `Path("")` is not an existing file, so no source video is read.
drawing(Path(""), trackfile, outvideo)

Window size: 1783 1645
Offsets: 583 445


In [None]:
# The MP4 file generated by OpenCV is not displayed correctly,
# so it is converted into a WebM file.
!ffmpeg -i newtrack.mp4 -vcodec vp9 -y newtrack.webm

ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers
  built with gcc 11 (Ubuntu 11.2.0-19ubuntu1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.22.04.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librabbitmq --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enable-libvpx --enab

In [None]:
show_video(outvideo.stem + ".webm")

In [None]:
def perf_wrap_func(trackdata, center, trans, scale, rot, skew, to_radians, dtype):
    """Build the affine matrix and apply it once (benchmark target).

    # Args:
      - trackdata: The tracking data passed to `apply_affine`.
      - center, trans, scale, rot, skew, to_radians, dtype: Forwarded to
        `get_affine_matrix_2d`.

    # Returns:
      - None. The transformed data is not returned.
    """
    apply_affine(
        trackdata,
        get_affine_matrix_2d(center, trans, scale, rot, skew,
                             to_radians=to_radians, dtype=dtype))

In [None]:
# NOTE: `apply_affine` updates `testtrack` in place, so every trial
# transforms the output of the previous trial. Only the timing of this
# cell is meaningful; do not reuse `testtrack` afterwards.
testtrack = trackdata.copy()
target_fn = partial(perf_wrap_func,
                    trackdata=testtrack,
                    center=center, trans=trans, scale=scale, rot=rot, skew=skew,
                    to_radians=True, dtype=testtrack.dtype)
pmeasure(target_fn)

Overall summary: Max 6.54465ms, Min 2.99026ms, Mean +/- Std 3.77009ms +/- 967.613µs
Top 10 summary: Max 3.18122ms, Min 2.99026ms, Mean +/- Std 3.12526ms +/- 55.0925µs


# 5. Application to randomized transformation

In [None]:
class RandomAffineTransform2D():
    """Apply a randomly sampled 2D affine transformation to tracking data.

    # Args:
      - center_joints: The joint indices whose mean (x, y) position is used
        as the transformation center.
      - apply_ratio: The transformation is skipped when a uniform sample
        from [0, 1) is greater than or equal to this value.
      - trans_range: The `[low, high]` range of the translation. x and y
        are sampled independently.
      - scale_range: The `[low, high]` range of the scaling factors. x and y
        are sampled independently.
      - rot_range: The `[low, high]` range of the rotation angle in degrees.
      - skew_range: The `[low, high]` range of the skew angles in degrees.
        x and y are sampled independently.
      - random_seed: The seed of the random generator. None draws a fresh
        seed.
      - order: The step order passed to `get_affine_matrix_2d`.
      - dtype: The dtype of the affine matrix.
    """
    def __init__(self,
                 center_joints,
                 apply_ratio,
                 trans_range,
                 scale_range,
                 rot_range,
                 skew_range,
                 random_seed=None,
                 order=("center", "scale", "rot", "skew", "trans"),
                 dtype=np.float32):
        self.center_joints = center_joints
        self.apply_ratio = apply_ratio
        self.trans_range = trans_range
        self.scale_range = scale_range
        # Angles are stored in radians; the matrix is later built with
        # `to_radians=False`.
        self.rot_range = np.radians(rot_range)
        self.skew_range = np.radians(skew_range)
        # Keep a private list so that instances never share the default.
        self.order = list(order)
        self.dtype = dtype
        # `default_rng(None)` is the unseeded generator, so one call covers
        # both the seeded and the unseeded case.
        self.rng = np.random.default_rng(random_seed)

    def __call__(self, inputs):
        """Transform `inputs` with randomly sampled parameters.

        # Args:
          - inputs: The tracking data indexed as `[frame, joint, channel]`.
            The array is modified in place by `apply_affine`.

        # Returns:
          - The transformed `inputs`. The array is returned unchanged when
            the transformation is skipped by `apply_ratio`, or when no
            frame provides a usable center position.
        """
        if self.rng.uniform() >= self.apply_ratio:
            return inputs

        # Calculate center position.
        temp = inputs[:, self.center_joints, :]
        temp = temp.reshape([inputs.shape[0], -1, inputs.shape[-1]])
        # Frames whose center joints sum to zero are treated as missing.
        # Frames with NaN/inf are excluded too, because they would turn
        # the center, and then every coordinate, into NaN.
        total = np.sum(temp, axis=(1, 2))
        mask = np.isfinite(total) & (total != 0)
        if not mask.any():
            return inputs
        # Use x and y only.
        center = temp[mask].mean(axis=0).mean(axis=0)[:2]

        trans = self.rng.uniform(self.trans_range[0], self.trans_range[1], 2)
        scale = self.rng.uniform(self.scale_range[0], self.scale_range[1], 2)
        rot = self.rng.uniform(self.rot_range[0], self.rot_range[1])
        skew = self.rng.uniform(self.skew_range[0], self.skew_range[1], 2)

        # Calculate matrix.
        mat = get_affine_matrix_2d(center, trans, scale, rot, skew,
            to_radians=False, order=self.order, dtype=self.dtype)

        # Apply transform.
        inputs = apply_affine(inputs, mat)
        return inputs

In [None]:
# `rot_range` and `skew_range` are in degrees; the constructor converts
# them with `np.radians`.
# NOTE(review): no `random_seed` is passed, so the sampled transformations
# (and the outputs below) differ between runs.
aug_fn = RandomAffineTransform2D(
    center_joints=[11, 12],
    apply_ratio=1.0,
    trans_range=[-100.0, 100.0],
    scale_range=[0.5, 2.0],
    rot_range=[-30.0, 30.0],
    skew_range=[-30.0, 30.0])

In [None]:
# Each call receives its own copy because the transformation is applied
# to the input array in place.
augtracks = []
for _ in range(3):
    augtracks.append(aug_fn(trackdata.copy()))

In [None]:
# Save tracks and videos.
for i in range(3):
    track = augtracks[i]
    # Add person axis back before saving.
    track = np.expand_dims(track, axis=0)
    outtrack = Path(f"./newtrack_{i}.npy")
    outvideo = Path(f"./newtrack_{i}.mp4")
    np.save(outtrack, track)
    drawing(Path(""), outtrack, outvideo)

Window size: 1200 1729
Offsets: 0 361
Window size: 1200 1661
Offsets: 0 0
Window size: 1614 2132
Offsets: 414 563


In [None]:
# The MP4 files generated by OpenCV are not displayed correctly,
# so they are converted into WebM files.
!ffmpeg -i newtrack_0.mp4 -vcodec vp9 -y newtrack_0.webm
!ffmpeg -i newtrack_1.mp4 -vcodec vp9 -y newtrack_1.webm
!ffmpeg -i newtrack_2.mp4 -vcodec vp9 -y newtrack_2.webm

ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers
  built with gcc 11 (Ubuntu 11.2.0-19ubuntu1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.22.04.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librabbitmq --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enable-libvpx --enab

In [None]:
show_video("./newtrack_0.webm")

In [None]:
show_video("./newtrack_1.webm")

In [None]:
show_video("./newtrack_2.webm")

In [None]:
# NOTE: `aug_fn` transforms `testtrack` in place, so every trial works on
# the output of the previous trial. Only the timing is meaningful here.
testtrack = trackdata.copy()
target_fn = partial(aug_fn, inputs=testtrack)
pmeasure(target_fn)

Overall summary: Max 91.7504ms, Min 22.5217ms, Mean +/- Std 42.0113ms +/- 17.8009ms
Top 10 summary: Max 24.1266ms, Min 22.5217ms, Mean +/- Std 23.4392ms +/- 571µs
