In [None]:
!pip3 install vidformer supervision

# [Vidformer](https://github.com/ixlab/vidformer): Video Data Transformation

Vidformer provides a `cv2`-compatibility layer, so existing OpenCV code can be converted by changing the import to `import vidformer.cv2 as cv2`:

In [None]:
import math
import vidformer as vf
import vidformer.cv2 as cv2
import supervision as sv
import vidformer.supervision as vf_sv

# Connect to the hosted vidformer service using the api.vidformer.org guest account.
# The guest account has few permissions (can't access other videos) and low limits.
# To get around this:
#     1) Ask for a regular account
#     2) Self-host
server = vf.Server("https://api.vidformer.org", api_key="VF_GUEST", vod_only=True)
# Hand the server to the cv2 compatibility layer (presumably used by every
# later cv2.* call in this notebook; confirm against the vidformer docs).
cv2.set_server(server)

In [None]:
# Open the sample video and read the properties the later cells rely on.
cap = cv2.VideoCapture("https://f.dominik.win/vf-sample-media/tos_720p.mp4")
assert cap.isOpened()

# Cast size and frame count to int, matching how the search-engine cell reads
# the same properties. Later cells use these values for floor division, array
# slicing and range(), all of which need integers (stock OpenCV returns every
# property as a float).
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
out = cv2.VideoWriter(None, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))

# Play the video in the notebook cell (outside jupyter add method="link")
# This will say "Waiting" until you fill in the content by running the next cell
cv2.vidplay(out)

In [None]:
# Move a caption around a circle of `radius` pixels centred on
# (center_x, center_y); `speed` is the angle advanced per frame, so the
# caption completes one revolution every 100 frames.
radius = 100
center_x = center_y = 300
speed = 2 * math.pi / 100

i = 0
ret, frame = cap.read()
while ret:
    angle = i * speed
    anchor = (
        int(center_x + radius * math.cos(angle)),
        int(center_y + radius * math.sin(angle)),
    )
    cv2.putText(
        frame,
        "Hello, world!",
        anchor,
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 255, 0),
        2,
        cv2.LINE_AA,
    )
    out.write(frame)
    i += 1
    # Fetch the next frame; the loop ends once the source is exhausted.
    ret, frame = cap.read()
out.release()

## Vidformer for CV Annotation with supervision

In [None]:
# # Run Yolov8m on the video
# import cv2 as ocv_cv2
# import supervision as sv
# from ultralytics import YOLO

# model = YOLO("yolov8m.pt")

# ocv_cap = ocv_cv2.VideoCapture(
#     "https://f.dominik.win/vf-sample-media/tos_720p.mp4"
# )
# assert ocv_cap.isOpened()

# detections = []
# while True:
#   ret, frame = ocv_cap.read()
#   if not ret:
#     break
#   detections.append(sv.Detections.from_ultralytics(model(frame)[0]))


# Or just load pre-computed detections instead to save some time
import pickle

# Import the submodule explicitly: a bare `import urllib` does not guarantee
# that `urllib.request` is loaded (it only works if another package happened
# to import it first).
import urllib.request

# NOTE(review): pickle.load can execute arbitrary code from the payload.
# Only unpickle data from a host you trust.
with urllib.request.urlopen(
    "https://f.dominik.win/vf-sample-media/tos_720p-yolov8l-detections.pkl"
) as response:
    detections = pickle.load(response)

# Fresh writer + player for the annotated video produced by the next cell.
out = cv2.VideoWriter(None, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
cv2.vidplay(out)

In [None]:
# Rewind the source video so annotation starts from the first frame.
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

box_anot = vf_sv.BoxAnnotator()
label_anot = vf_sv.LabelAnnotator()

i = 0
ret, frame = cap.read()
while ret:
    # detections[i] holds the detections for frame i; keep the confident ones.
    frame_dets = detections[i]
    det = frame_dets[frame_dets.confidence > 0.5]

    labels = []
    for class_name, confidence in zip(det["class_name"], det.confidence):
        labels.append(f"{class_name} {confidence:.2f}")

    frame = box_anot.annotate(frame.copy(), det)
    frame = label_anot.annotate(frame.copy(), det, labels)

    out.write(frame)
    i += 1
    ret, frame = cap.read()
out.release()

## Going beyond simple stream annotation

Vidformer doesn't just hard-code the simple case of video stream annotation; it's a generalized video transformation system.
It uses a novel decoding system to efficiently access source frames:

- **Use frames in any order:** Sped up, reversed, repeated access, even randomly shuffled if you have the compute to support it. Vidformer finds efficient access plans so you don't have to.
- **Use frames from multiple sources:** Create transformed videos from one video or millions of source videos.
- **Combine multiple frames together:** Compose frames side-by-side, in a grid, or anything else you can think of.

In [None]:
# Fresh writer and in-notebook player for the composite video; as with the
# earlier players, content appears once the next cell writes frames to `out`.
out = cv2.VideoWriter(None, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
cv2.vidplay(out)

In [None]:
def frame_n(n):
    """Return frame number `n` of the notebook-level `cap`.

    Seeks `cap` to position `n`, reads one frame, and asserts that the read
    succeeded.
    """
    cap.set(cv2.CAP_PROP_POS_FRAMES, n)
    ok, img = cap.read()
    assert ok
    return img


# (rows, cols) of each half-size panel.
half_size = (height // 2, width // 2)


def _paste_panel(canvas, panel, caption, x_offset):
    """Shrink `panel` to half size, paste it into `canvas` at column
    `x_offset` (vertically centred), and write `caption` just above it."""
    panel = cv2.resize(panel, (half_size[1], half_size[0]))
    y_offset = (height - half_size[0]) // 2
    canvas[y_offset : y_offset + half_size[0], x_offset : x_offset + half_size[1]] = (
        panel
    )
    cv2.putText(
        canvas,
        caption,
        (x_offset + 10, y_offset - 10),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (255, 255, 255),
        2,
        cv2.LINE_AA,
    )


for i in range(frame_count):
    frame = cv2.zeros((height, width, 3))

    # Left half: every second frame, wrapping around at the end of the video.
    _paste_panel(frame, frame_n(i * 2 % frame_count), "Sped up 2x:", 0)
    # Right half: the video played back to front.
    _paste_panel(frame, frame_n(frame_count - i - 1), "Reversed:", width // 2)

    out.write(frame)

out.release()

## Storing Data in Videos: Object Masks

Object masks can be large, often 10x larger than the underlying video when decompressed. Vidformer is good at accessing frames out of order, so you can store data in video files with lossless codecs (like FFV1). For example, each object mask can be stored as a separate frame and stitched together during viewing:

In [None]:
# import cv2
# from ultralytics import YOLO
# import pickle

# cap = cv2.VideoCapture(
#     "https://f.dominik.win/vf-sample-media/tos_720p.mp4"
# )
# model = YOLO("yolov8x-seg.pt")

# msw = vf_sv.MaskStreamWriter("tos_720p-yolov8x-seg-masks.mkv", (1280, 720))
# detections = []
# while True:
#     ret, frame = cap.read()
#     if not ret:
#         break
#     results = model(frame, verbose=False)[0]
#     det = sv.Detections.from_ultralytics(results)
#     msw.write_detections(det)
#     det.mask = None
#     detections.append(det)
# cap.release()
# msw.release()

# with open("tos_720p-yolov8x-seg-detections.pkl", "wb") as f:
#     pickle.dump(detections, f)

# Import the submodule explicitly: a bare `import urllib` does not guarantee
# that `urllib.request` is loaded.
import urllib.request
import pickle

# Load detections from yolov8x-seg
# NOTE(review): pickle.load can execute arbitrary code from the payload.
# Only unpickle data from a host you trust.
with urllib.request.urlopen(
    "https://f.dominik.win/vf-sample-media/tos_720p-yolov8x-seg-detections.pkl"
) as response:
    detections = pickle.load(response)

# Open the detection masks compressed into a video file
mask_cap = cv2.VideoCapture(
    "https://f.dominik.win/vf-sample-media/tos_720p-yolov8x-seg-masks.mkv"
)
# Fail here, like the other captures in this notebook, rather than midway
# through the annotation loop.
assert mask_cap.isOpened()

out = cv2.VideoWriter(None, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
cv2.vidplay(out)

In [None]:
# Rewind the source video so annotation starts from the first frame.
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

mask_anot = vf_sv.MaskAnnotator()
label_anot = vf_sv.LabelAnnotator(text_position=sv.Position.CENTER)

# `i` indexes the per-frame detections; `mask_i` is the running position
# passed to populate_mask and advances by the number of detections per frame.
i = 0
mask_i = 0
while True:
    ret, frame = cap.read()
    if not ret:
        break

    det = detections[i]
    # Presumably fills det.mask from mask_cap starting at frame `mask_i`
    # (one mask per detection); confirm against vf_sv.populate_mask.
    vf_sv.populate_mask(det, mask_cap, mask_i)
    mask_i += len(det)

    labels = [
        f"{class_name} {confidence:.2f}"
        for class_name, confidence in zip(det["class_name"], det.confidence)
    ]
    frame = label_anot.annotate(frame.copy(), det, labels)
    frame = mask_anot.annotate(frame.copy(), det)
    # Drop the mask again once the frame is annotated (presumably so the
    # detections list does not keep every mask alive; confirm).
    det.mask = None

    out.write(frame)
    i += 1
out.release()

## Building a Search Engine

Here we search many hours of Apollo 11 videos for the word "Houston". The demo also has some niceties, like rendering subtitles. The subtitle files were created with Whisper turbo.

In [None]:
import re
import math
import requests

# URL stem shared by each part's video (".mp4") and subtitle (".srt") file;
# {xx} is replaced by the part number.
BASE = "https://f.dominik.win/vf-sample-media/apollo-11-mission/Apollo 11 {xx}"
# Two-digit part numbers "01" .. "23".
PARTS = [f"{i:02d}" for i in range(1, 24)]

QUERY = "Houston"  # Search term

# Seconds of padding added before / after each matching subtitle cue.
PAD_BEFORE = 0.25
PAD_AFTER = 0.35


def srt_to_sec(ts):
    """Convert a subtitle timestamp to seconds.

    Accepts "HH:MM:SS,fff" or "HH:MM:SS.fff" with optional surrounding
    whitespace; anything after the fractional digits is ignored.

    Args:
        ts: timestamp string.

    Returns:
        The timestamp in seconds as a float.

    Raises:
        ValueError: if `ts` does not start with a recognizable timestamp.
    """
    m = re.match(r"(\d+):(\d+):(\d+)[,\.](\d+)", ts.strip())
    if m is None:
        raise ValueError(f"Unrecognized subtitle timestamp: {ts!r}")
    hh, mm, ss, frac = m.groups()
    # Scale the fraction by its own digit count so ".5" is 0.5 s rather than
    # 5 ms; for the usual three-digit milliseconds this is still a /1000.
    return int(hh) * 3600 + int(mm) * 60 + int(ss) + int(frac) / 10 ** len(frac)


def parse_srt(text):
    """Parse SubRip text into a list of (start_sec, end_sec, caption) tuples.

    Blocks are separated by blank lines. A leading all-digit counter line is
    optional. Blocks without a "start --> end" timing line, or without any
    caption text, are skipped.
    """
    normalized = text.strip().replace("\r\n", "\n").replace("\r", "\n")
    timing_re = re.compile(r"(.*?)\s*-->\s*(.*?)(?:\s+.*)?$")

    cues = []
    for block in re.split(r"\n\s*\n", normalized):
        rows = [row.strip() for row in block.split("\n") if row.strip()]
        # Drop the optional numeric cue counter.
        if rows and rows[0].isdigit():
            rows = rows[1:]
        if not rows:
            continue

        timing = timing_re.match(rows[0])
        if timing is None:
            continue

        start = srt_to_sec(timing.group(1))
        end = srt_to_sec(timing.group(2))
        caption = "\n".join(rows[1:]).strip()
        if caption:
            cues.append((start, end, caption))
    return cues


def find_intervals(cues, q, pad_before, pad_after):
    """Return merged (start, end) spans of the cues whose text contains `q`.

    Matching is a case-insensitive substring test. Each hit is widened by
    `pad_before` / `pad_after` (the start is clamped at 0), and spans that
    touch or overlap are merged. The result is sorted by start time.
    """
    needle = q.lower()
    padded = sorted(
        (max(0, start - pad_before), end + pad_after)
        for start, end, caption in cues
        if needle in caption.lower()
    )

    spans = []
    for start, end in padded:
        if not spans or start > spans[-1][1]:
            # Gap before this hit: open a new span.
            spans.append([start, end])
        else:
            spans[-1][1] = max(spans[-1][1], end)
    return [tuple(span) for span in spans]


def active_text(cues, t):
    """Return the text of every cue active at time `t`, joined by newlines.

    A cue is active when start <= t <= end. Repeated texts from overlapping
    cues appear once, in first-seen order.
    """
    live = (caption for start, end, caption in cues if start <= t <= end)
    # dict.fromkeys de-duplicates while keeping insertion order.
    return "\n".join(dict.fromkeys(live)).strip()


def draw_subs(frame, text):
    """Draw `text` as centred, outlined subtitles near the bottom of `frame`.

    Each line of `text` is wrapped to a width derived from the frame width,
    then rendered with a black outline under a white fill. Font scale, stroke
    thickness and margins scale with the frame height relative to a 720-row
    baseline.

    Args:
        frame: image exposing `.shape` (rows, cols, ...) that `cv2.putText`
            can draw on.
        text: subtitle text; may contain newlines.

    Returns:
        The same `frame` object, drawn on in place. It is returned untouched
        when `text` is empty or contains only whitespace.
    """
    if not text:
        return frame

    h, w = frame.shape[:2]

    # wrap width roughly proportional to resolution
    max_chars = int(w / 22)  # ~58 for 1280px wide
    max_chars = max(24, min(80, max_chars))

    # wrap each original line
    lines = []
    for ln in text.splitlines():
        ln = ln.strip()
        if not ln:
            continue
        while len(ln) > max_chars:
            # Break at the last space that fits; hard-split if there is none.
            cut = ln.rfind(" ", 0, max_chars)
            if cut == -1:
                cut = max_chars
            lines.append(ln[:cut].strip())
            ln = ln[cut:].strip()
        lines.append(ln)

    # Whitespace-only text passes the `not text` guard above but yields no
    # lines; without this check the max() below raises on an empty sequence.
    if not lines:
        return frame

    font = cv2.FONT_HERSHEY_SIMPLEX

    # scale relative to 720p baseline
    scale = 0.9 * (h / 720.0)
    scale = max(0.5, min(1.4, scale))

    thick = max(1, int(round(2 * (h / 720.0))))
    outline = thick + max(2, int(round(4 * (h / 720.0))))

    sizes = [cv2.getTextSize(ln, font, scale, thick)[0] for ln in lines]
    line_h = max(sz[1] for sz in sizes) + int(10 * (h / 720.0))

    # Anchor the block so its last line sits `bottom_margin` above the bottom.
    bottom_margin = int(60 * (h / 720.0))
    y0 = h - bottom_margin - line_h * len(lines)

    for i, ln in enumerate(lines):
        tw, th = sizes[i]
        x = (w - tw) // 2  # centre each line horizontally
        y = y0 + i * line_h + th

        # outline then fill
        cv2.putText(frame, ln, (x, y), font, scale, (0, 0, 0), outline, cv2.LINE_AA)
        cv2.putText(frame, ln, (x, y), font, scale, (255, 255, 255), thick, cv2.LINE_AA)

    return frame


def draw_overlay(frame, label, t):
    """Stamp "<label>  |  HH:MM:SS.mmm" in the top-left corner of `frame`.

    Draws directly on `frame` and returns nothing. `t` is a time in seconds.
    Text size and position scale with the frame height relative to a 720-row
    baseline.
    """
    h = frame.shape[0]
    ratio = h / 720.0

    scale = max(0.4, min(1.0, 0.6 * ratio))
    thick = max(1, int(round(2 * ratio)))
    outline = thick + 2

    # Split the time into clock fields.
    hh = int(t // 3600)
    mm = int((t % 3600) // 60)
    ss = int(t % 60)
    ms = int((t - int(t)) * 1000)
    text = f"{label}  |  {hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}"

    origin = (20, int(40 * ratio))

    # Two passes at the same position: wide black outline, then white fill.
    for color, weight in (((0, 0, 0), outline), ((255, 255, 255), thick)):
        cv2.putText(
            frame,
            text,
            origin,
            cv2.FONT_HERSHEY_SIMPLEX,
            scale,
            color,
            weight,
            cv2.LINE_AA,
        )


# Probe the first part to pick the output frame rate and frame size.
first_url = BASE.format(xx=PARTS[0]) + ".mp4"
cap0 = cv2.VideoCapture(first_url)
assert cap0.isOpened()

# NOTE: this rebinds the notebook-level `fps` that was set when the first
# sample video was opened; earlier cells re-run after this one will see it.
fps = cap0.get(cv2.CAP_PROP_FPS)

w = int(cap0.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap0.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap0.release()

# One writer collects the clips from every part; the search loop resizes any
# frame whose size differs from (w, h) before writing it.
writer = cv2.VideoWriter(None, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
cv2.vidplay(writer)

In [None]:
# For every part: fetch its subtitles, find the padded time spans that mention
# QUERY, and append those spans (with subtitles and a timestamp overlay) to
# the output video.
for xx in PARTS:
    base = BASE.format(xx=xx)
    vid_url = base + ".mp4"
    sub_url = (base + ".srt").replace(" ", "%20")

    # Best-effort, like the isOpened() check below: skip a part whose
    # subtitles cannot be fetched instead of parsing an error page. The
    # timeout keeps a stalled download from hanging the cell forever.
    resp = requests.get(sub_url, timeout=30)
    if not resp.ok:
        continue

    cues = parse_srt(resp.text)
    intervals = find_intervals(cues, QUERY, PAD_BEFORE, PAD_AFTER)
    if not intervals:
        # No hits in this part: don't open the video at all.
        continue

    # Use a dedicated name so the notebook-level `cap` from the earlier cells
    # is not clobbered by this loop.
    part_cap = cv2.VideoCapture(vid_url)
    if not part_cap.isOpened():
        continue

    # Timestamps are relative to this part's own frame rate; fall back to the
    # output frame rate if the part does not report one.
    part_fps = part_cap.get(cv2.CAP_PROP_FPS) or fps

    for a, b in intervals:
        part_cap.set(cv2.CAP_PROP_POS_MSEC, int(a * 1000))

        while True:
            ok, frame = part_cap.read()
            if not ok:
                break

            # After read(), POS_FRAMES is the index of the *next* frame, so
            # step back one to get the time of the frame just read.
            t = (part_cap.get(cv2.CAP_PROP_POS_FRAMES) - 1) / part_fps
            if t > b:
                break
            if frame.shape[1] != w or frame.shape[0] != h:
                frame = cv2.resize(frame, (w, h))
            frame = draw_subs(frame, active_text(cues, t))
            draw_overlay(frame, f"Apollo 11 {xx}.mp4", t)
            writer.write(frame)
    part_cap.release()

writer.release()
print("Done!")