In [None]:
import os
import json
from pathlib import Path
import cv2
import numpy as np
from typing import Tuple

from tqdm import tqdm
import bokeh.io
from bokeh.resources import INLINE
from bokeh.io import curdoc
from bokeh.palettes import tol
from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource
from bokeh.plotting import figure, show
from bokeh.transform import dodge

bokeh.io.reset_output()
bokeh.io.output_notebook(INLINE)

WIDTH = 1000
HEIGHT = 600
curdoc().theme = "dark_minimal"

cwd = os.getcwd()
root_path = Path(cwd) / ".." / "recordings"
print("Working in directory: ", root_path)

In [None]:
def load_json(filename):
    if os.path.isfile(str(filename)):
        data = json.load(open(filename, "r"))
        return data

In [None]:
def plot_times(title, data, color="yellow", size=2):
    p = figure(title=title, width=WIDTH, height=HEIGHT)
    p.grid.minor_grid_line_color = "#131313"
    x = list(range(len(data)))
    p.scatter(x, data, color=color, size=size)
    show(p)

def multi_plot_times(title, datas, colors, sizes):
    p = figure(title=title, width=WIDTH, height=HEIGHT)
    p.grid.minor_grid_line_color = "#131313"
    max = min([len(d) for d in datas])
    x = list(range(max))
    for i, d in enumerate(datas):
        p.scatter(x, d[:max], color=colors[i], size=sizes[i])
    show(p)

def plot_grouped_vbars(title, groups, data):
    # Expects data to contain multiple columns where each len(column)==len(groups)
    columns = list(data.keys())
    columns.sort()
    N = len(columns)
    colors = tol["Sunset"][max(3, N)][0:N]
    groups = [str(x) for x in groups]
    data["series"] = groups
    source = ColumnDataSource(data=data)
    offset = 0.0

    p = figure(x_range=groups, title=title, width=WIDTH, height=HEIGHT)
    p.grid.minor_grid_line_color = "#131313"

    for i, name in enumerate(columns):
        color = colors[i]
        column = data[name]
        p.vbar(x=dodge("series", offset, range=p.x_range), top=name,
            width=0.2, color=color, legend_label=name, source=source)
        offset += 0.25

    p.x_range.range_padding = 0.1
    p.xgrid.grid_line_color = None
    p.legend.location = "top_left"
    p.legend.orientation = "horizontal"

    show(p)

In [None]:
def get_instant_fps(ticks):
    fps = []
    p = None
    duplicates = 0
    for t in ticks:
        if p is None:
            p = t
        elif t == p:
            duplicates += 1
        else:
            fps.append(1 / float(t - p))
        p = t
    return fps, duplicates

def load_stats(filename):
    times = []
    with open(filename, "r") as f:
        # out=  0 st=  0 frame=     0 q= -0.0 f_size=  50969 s_size=        0KiB time= 0.010 br= 24465.1kbits/s avg_br=     0.0kbits/s type= ?
        for line in f.readlines():
            if "time=" in line:
                p = line.split("time=")
                timestamp = float(p[1].strip().split(" ")[0])
                times.append(timestamp)
    return times

def print_tick_stats(ticks, duplicates):
    print(f"Found {duplicates} duplicate ticks")
    less_than_20 = [x for x in ticks if x < 21 and x > 11]
    less_than_30 = [x for x in ticks if x < 31 and x > 21]
    less_than_15 = [x for x in ticks if x <= 16]
    print(f"Found {len(less_than_15)} less than 15fps")
    print(f"Found {len(ticks)} ticks, with {len(less_than_30)} < 30fps and {len(less_than_20)} < 20fps ")
    total = len(ticks) + len(less_than_30) + 2 * len(less_than_20) + 4 * len(less_than_15)
    print(f"So we should have {total} video frames")


def plot_ticks(profile):
    ticks, duplicates = get_instant_fps(profile["video_ticks"])
    print_tick_stats(ticks, duplicates)
    plot_times("60fps samples", ticks, "yellow")

    if "frame_times" in profile:
        frames, duplicates = get_instant_fps(profile["frame_times"])
        print_tick_stats(frames, duplicates)
        plot_times("60fps frames", frames, "red")
    #multi_plot_times("60fps 1080p", [ticks, frames], ["yellow", "red"], [1,1])


def plot_video_metadata(filename):
    if not os.path.isfile(filename):
        print("File not found: ", filename)
    else:
        profile = load_json(filename)
        plot_ticks(profile)


In [None]:
# frame= 1349 fps=117 q=-0.0 Lsize=N/A time=00:00:22.48 bitrate=N/A speed=1.94x
print(f"ffmpeg said 1293 frames = {21.55 * 60}")
filename = str(root_path / "Calibration_0001_video_meta.json")
plot_video_metadata(filename)

In [None]:
name_header_map = {
    "360": "360",
    "480": "480",
    "720": "720",
    "1k": "1080",
    "2k": "1440",
    "4k": "2160",
    "8k": "4320"
}


def get_file_size(file_path):
    path = Path(file_path)
    return path.stat().st_size

class VideoInfo:
    def __init__(self, path):
        self.file = path
        self.file_size = get_file_size(path.replace("_video_meta.json", "_video.mp4"))
        self.ticks, _ = get_instant_fps(path)
        filename = os.path.basename(path)
        parts = filename.split('_')
        self.frame_rate = name_header_map[parts[1]]
        self.fps = parts[2]
        self.header = parts[0]
        self.dropped = len([x for x in self.ticks if x <= 30])
        self.length = len(self.ticks)

def load_files(folder):
    info = []
    for file in os.listdir(folder):
        if file.endswith("_video_meta.json") and "Crash" not in file and ("30fps" in file or "60fps" in file):
            filename = os.path.join(folder, file)
            info.append(VideoInfo(filename))
    return info

def tabulate_prop(info, getter):
    keys = list(set([int(i.frame_rate) for i in info]))
    fps_keys = list(set([i.fps for i in info]))
    print(",".join([" "] + fps_keys))
    keys.sort()
    print(f"keys={keys}")
    for k in keys:
        cols = [x for x in info if x.frame_rate == str(k)]
        header = cols[0].header
        data = []
        for fps in fps_keys:
            item = [x for x in cols if x.fps == fps][0]
            data.append(getter(item))
        tail = ",".join([str(x) for x in data])
        print(f"{header}_{k},{tail}")

    fps_keys.sort()
    data = {}
    for fps in fps_keys:
        sorted_row = []
        for k in keys:
            item = [x for x in info if x.fps == str(fps) and x.frame_rate == str(k)][0]
            sorted_row.append(item)
        data[fps] = [getter(x) for x in sorted_row]
    return keys, data

folder = "d:\\temp\\benchmark"
if os.path.isdir(folder):
    info = load_files(folder)

    print("=========> dropped")
    groups, data = tabulate_prop(info, lambda x: x.dropped)
    plot_grouped_vbars("dropped", groups, data)

    print("=========> frame count")
    groups, data = tabulate_prop(info, lambda x: x.length)
    plot_grouped_vbars("frame count", groups, data)

    print("=========> video file size ")
    groups, data = tabulate_prop(info, lambda x: x.file_size / 1000000)
    plot_grouped_vbars("video file size", groups, data)


In [None]:
filename = str(root_path / "frames" / "stats.txt")
if os.path.isfile(filename):
    frames = load_stats()
    fps = get_instant_fps(frames)
    plot_times("60fps frames", fps, "red")

In [None]:
def load_frames(frame_dir):
    frames = []
    for file in tqdm(os.listdir(frame_dir)):
        frames += [cv2.imread(os.path.join(frame_dir, file))]
    return frames

def read_video(filepath: str, width: int = -1, height: int = -1, color_format: str = "rgb") -> Tuple[np.ndarray, int]:
    """
    Read video in mp4 file.
    :param path: The path to the file.
    :param width: Desired output width of the video, unchanged if `-1` is specified.
    :param height: Desired output height of the video, unchanged if `-1` is specified.
    :param color_format: The color format to use ("bgr" or "rgb").
    :return: The tuple containing the loaded mp4 data as numpy array and the video fps.
    """
    # Note: if we move this to the top of the file then pytest crashes on Linux.
    # So keep it here for now, thanks [chris]
    from decord import VideoReader

    video_reader = VideoReader(filepath, width=width, height=height)
    frames = video_reader[:].asnumpy()
    if color_format == "bgr":
        frames = frames[:, :, :, ::-1]
    return frames, int(video_reader.get_avg_fps())

In [None]:
def calibrate_input_to_video(frames, events, video_ticks):
    menuOpen = False
    menuColor = 33+33+33
    opened = []
    closed = []
    a_pressed = []
    b_released = []
    first_open_time = 0
    for i, f in enumerate(frames):
        if i < len(video_ticks):
            t = video_ticks[i]
            pixel = f[74,183]
            if np.sum(pixel) < menuColor:
                if not menuOpen:
                    menuOpen = True
                    # t = i * fps / 1000.0
                    if first_open_time == 0:
                        first_open_time = t
                    # t -= first_open_time
                    opened.append(t)
                    # print(f"Menu opened at {t}")
            else:
                if menuOpen:
                    menuOpen = False
                    # t = i * fps / 1000.0
                    closed.append(t)
                    # print(f"Menu closed at {t}")

    state = {}
    first_a_press = 0
    for e in events:
        t = e['ticks']
        if "a" not in state:
            state["a"] = e["a"]
        if "b" not in state:
            state["b"] = e["b"]
        if state["a"] != e["a"]:
            state["a"] = e["a"]
            if e["a"]:
                if first_a_press == 0:
                    first_a_press = t
                # t -= first_a_press
                a_pressed.append(t)
                # print(f"Button a pressed at {t}")
        if state["b"] != e["b"]:
            state["b"] = e["b"]
            if not e["b"]: # triggers on release
                # t -= first_a_press
                b_released.append(t)
                # print(f"Button b released at {t}")
    return opened, closed, a_pressed, b_released

In [None]:
inputs = str(root_path / "Calibration4_inputs.json")
video = str(root_path / "Calibration4_video.mp4")
meta_data = load_json(str(root_path / "Calibration3_video_ticks.json"))

events = load_json(inputs)
frames, fps = read_video(video)
print(f"Loaded {len(frames)} frames at {fps} fps")
opened, closed, a_pressed, b_released = calibrate_input_to_video(frames, events, meta_data["video_ticks"])
multi_plot_times("Menu open/A_pressed", [opened,  a_pressed], ["yellow", "red"], [3,3])
multi_plot_times("Menu close/B released", [closed, b_released], ["yellow", "red"], [3,3])


In [None]:
# trim off last item
opened = opened[:-1]
closed = closed[:-1]
a_pressed = a_pressed[:-1]
b_released = b_released[:-1]

open_latency = np.array(opened) - np.array(a_pressed)
close_latency = np.array(closed) - np.array(b_released)

plot_grouped_vbars("latencies", list(range(len(open_latency))), {"1.open": open_latency, "2.closed": close_latency})


In [None]:
# now try video created by SmartReplyApp
inputs = str(root_path / "Calibration_0002_inputs.json")
video = str(root_path / "Calibration_0002_video.mp4")
meta_data = load_json(str(root_path / "Calibration_0002_video_meta.json"))

events = load_json(inputs)
frames, fps = read_video(video)
print(f"Loaded {len(frames)} frames at {fps} fps")
opened, closed, a_pressed, b_released = calibrate_input_to_video(frames, events, meta_data["video_ticks"])
multi_plot_times("Menu open/A_pressed", [opened,  a_pressed], ["yellow", "red"], [3,3])
multi_plot_times("Menu close/B released", [closed, b_released], ["yellow", "red"], [3,3])

In [None]:
open_latency = np.array(opened) - np.array(a_pressed)
close_latency = np.array(closed) - np.array(b_released)

plot_grouped_vbars("latencies", list(range(len(open_latency))), {"1.open": open_latency, "2.closed": close_latency})