# Verify the segments are correct

Create a fast binary classifier that determines if a frame has interpreter or not. Can't use the face_recognizer as it's not batched and is very slow.

In [1]:
import sys
import os

notebook_dir = os.getcwd()
project_root = os.path.abspath(os.path.join(notebook_dir, '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

%load_ext autoreload
%autoreload 2

# Interpreter box
X, Y = 571, 208
W, H = 180, 193
ROOT_DIR = 'scraped'

## Extract frames with and without interpreter

In [None]:
import json
import cv2
import numpy as np
import os

from utils.video import get_vid_metadata, get_frame


def pick_indices(count, start, end, n_inside=40, n_outside=40):
    # 1) Inside [start, end]
    inside = np.linspace(start, end, n_inside, dtype=int)

    # 2) Outside: [0, start) ∪ (end, count-1]
    left_len = max(0, start)  # indices 0 .. start-1  (length = start)
    right_len = max(0, count - 1 - end)  # indices end+1 .. count-1

    total_outside_len = left_len + right_len
    if total_outside_len == 0 or n_outside == 0:
        outside = np.array([], dtype=int)
        return inside, outside

    # Allocate samples proportionally to segment length
    if left_len == 0:
        n_left = 0
        n_right = n_outside
    elif right_len == 0:
        n_left = n_outside
        n_right = 0
    else:
        n_left = int(round(n_outside * left_len / total_outside_len))
        n_left = min(n_left, n_outside)  # safety
        n_right = n_outside - n_left

    # Sample each side with linspace (uniform within each segment)
    left = (
        np.linspace(0, start - 1, n_left, dtype=int)
        if n_left > 0 and left_len > 0
        else np.array([], dtype=int)
    )
    right = (
        np.linspace(end + 1, count - 1, n_right, dtype=int)
        if n_right > 0 and right_len > 0
        else np.array([], dtype=int)
    )

    # Combine & ensure sorted, unique (dedup if linspace collapsed values)
    outside = np.unique(np.concatenate([left, right])).astype(int)

    assert len(inside) == n_inside and len(outside) == n_outside

    return inside, outside


with_interpreter = []
without_interpreter = []


i = 0
for root, _, files in os.walk(ROOT_DIR):
    for file in files:
        if not file.endswith('.json'):
            continue
        json_path = os.path.join(root, file)
        vid_path = json_path.replace('.json', '.mp4')

        with open(json_path) as f:
            js = json.load(f)

        start = js['start']
        end = js['end']

        cap, _, frame_count = get_vid_metadata(vid_path)
        cap.release()

        with_interpreter_frame_indexes, without_interpreter_frame_indexes = (
            pick_indices(frame_count, start, end)
        )

        cap = cv2.VideoCapture(vid_path)
        if not cap.isOpened():
            raise IOError(f'Could not open video: {vid_path}')

        for w in with_interpreter_frame_indexes:
            with_interpreter.append(get_frame(w, cap, X, Y, H, W))

        for wo in without_interpreter_frame_indexes:
            without_interpreter.append(get_frame(wo, cap, X, Y, H, W))

        cap.release()
        i += 1


print(i)
print(len(with_interpreter))
print(len(without_interpreter))


with_interpreter = np.array(with_interpreter)
np.save('with_interpreter.npy', with_interpreter)
without_interpreter = np.array(without_interpreter)
np.save('without_interpreter.npy', without_interpreter)

561
22440
22440


## Train fast binary classifier

In [6]:
def train_net(
    with_interpreter,
    without_interpreter,
    epochs: int = 6,
    batch_size: int = 512,
    lr: float = 1e-3,
    save_path: str = 'binary_classifier.pt',
):  # ────────────────────────────────────────────────────────────────────
    # 1. Imports
    # ────────────────────────────────────────────────────────────────────
    import itertools
    import os
    import time

    import torch
    from PIL import Image
    from torch import nn
    from torch.utils.data import ConcatDataset, DataLoader, Dataset
    from torchvision import models
    from torchvision import transforms as T

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # ────────────────────────────────────────────────────────────────────
    # 2.  Dataset
    # ────────────────────────────────────────────────────────────────────
    weights_enum = models.MobileNet_V3_Small_Weights.DEFAULT
    try:
        mean = weights_enum.meta['mean']
        std = weights_enum.meta['std']
    except (KeyError, AttributeError):
        # Standard ImageNet normalisation
        mean = (0.485, 0.456, 0.406)
        std = (0.229, 0.224, 0.225)

    class FrameDataset(Dataset):
        def __init__(self, frames, label):
            self.tfm = T.Compose([T.ToTensor(), T.Normalize(mean, std)])
            self.label = label
            self.data = frames

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            img = self.data[idx]
            pil = Image.fromarray(img)
            return self.tfm(pil), self.label

    ds_int = FrameDataset(with_interpreter, 1)
    ds_no = FrameDataset(without_interpreter, 0)

    full_ds = ConcatDataset([ds_int, ds_no])
    loader = DataLoader(
        full_ds,
        batch_size=batch_size,
        shuffle=True,
        num_workers=min(4, os.cpu_count() or 1),
        pin_memory=torch.cuda.is_available(),
    )

    # ────────────────────────────────────────────────────────────────────
    # 3.  Model – MobileNet V3 Small (backbone frozen)
    # ────────────────────────────────────────────────────────────────────
    model = models.mobilenet_v3_small(weights=weights_enum)
    in_feats = model.classifier[3].in_features
    model.classifier[3] = nn.Linear(in_feats, 2)  # type: ignore

    for p in itertools.chain(model.features.parameters(), model.avgpool.parameters()):
        p.requires_grad = False
    model = model.to(device)

    # ────────────────────────────────────────────────────────────────────
    # 4.  Loss, optimiser, early stop
    # ────────────────────────────────────────────────────────────────────
    criterion = nn.CrossEntropyLoss()

    optimizer = torch.optim.AdamW(model.classifier.parameters(), lr=lr)

    best_acc, patience = 0.0, 3
    start_time = time.time()

    for epoch in range(epochs):
        model.train()
        for x, y in loader:
            x, y = x.to(device, non_blocking=True), y.to(device, non_blocking=True)
            logits = model(x)
            loss = criterion(logits, y)
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            optimizer.step()

        # quick accuracy on same loader (enough for early stopping)
        model.eval()
        correct, total = 0, 0
        with torch.no_grad():
            for x, y in loader:
                x, y = x.to(device, non_blocking=True), y.to(device, non_blocking=True)
                pred = model(x).argmax(1)
                correct += (pred == y).sum().item()
                total += y.numel()
        acc = correct / total
        print(f'Epoch {epoch + 1:02d}: accuracy={acc:.4f}')

        if acc > best_acc:
            best_acc, patience = acc, 3
        else:
            patience -= 1
        if patience == 0 or best_acc >= 0.995:
            break

    print(
        f'Training finished in {time.time() - start_time:.1f}s '
        f'with accuracy {best_acc:.4f}'
    )

    # ────────────────────────────────────────────────────────────────────
    # 5.  Save weights
    # ────────────────────────────────────────────────────────────────────
    torch.save(model.state_dict(), save_path)
    print(f'Model saved to {os.path.abspath(save_path)}')


In [7]:
train_net(with_interpreter, without_interpreter)

Epoch 01: accuracy=0.9824
Epoch 02: accuracy=0.9941
Epoch 03: accuracy=0.9985
Training finished in 82.9s with accuracy 0.9985
Model saved to /home/radumicea/Projects/University/LinguSign/protv/binary_classifier.pt


## Test the cropped segments to only have interpreter frames

In [2]:
import torch
from PIL import Image
from torchvision import models
from torchvision import transforms as T
from torch import nn
import numpy as np
import os
import cv2


def load_interpreter_model(model_path: str):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    weights_enum = models.MobileNet_V3_Small_Weights.DEFAULT
    try:
        mean = weights_enum.meta['mean']
        std = weights_enum.meta['std']
    except (KeyError, AttributeError):
        # Standard ImageNet normalisation
        mean = (0.485, 0.456, 0.406)
        std = (0.229, 0.224, 0.225)

    model = models.mobilenet_v3_small(weights=weights_enum)
    in_feats = model.classifier[3].in_features
    model.classifier[3] = nn.Linear(in_feats, 2)  # type: ignore
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval().to(device)

    transform = T.Compose([T.ToTensor(), T.Normalize(mean, std)])
    return model, transform, device


fps = 25
model, tfm, dev = load_interpreter_model('binary_classifier.pt')


@torch.inference_mode()
def has_interpreter_batch(
    frames: list[np.ndarray] | np.ndarray,
    model: torch.nn.Module,
    transform: T.Compose,
    device: torch.device | str = 'cpu',
    threshold: float = 0.5,
    return_probs: bool = False,
):
    if isinstance(frames, np.ndarray) and frames.ndim == 4:
        frames_iter = frames
    else:
        frames_iter = list(frames)

    tensors = []
    for frame in frames_iter:
        pil = Image.fromarray(frame)
        tensors.append(transform(pil))
    batch = torch.stack(tensors).to(device)

    logits = model(batch)
    probs = torch.softmax(logits, dim=1)[:, 1]  # class-1 = interpreter
    preds = (probs >= threshold).cpu()

    if return_probs:
        return preds, probs.cpu()
    return preds


def video_has_interpreter_always(video_path, batch_size=1024):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Can't open {video_path}")

    wrongs = []

    done = False
    while not done:
        frames_batch = []

        for _ in range(batch_size):
            ok, frame = cap.read()
            if not ok:
                done = True
                break

            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames_batch.append(frame)

        if not frames_batch:
            break

        results = has_interpreter_batch(frames_batch, model, tfm, dev)
        wrongs.append(torch.nonzero(~results, as_tuple=True)[0])  # type: ignore

    x = torch.cat(wrongs)
    print(x.shape)

    if x.numel() < fps:
        cap.release()
        return

    # Differences between adjacent elements
    diff = x[1:] - x[:-1]

    # True where the step is exactly +1 => consecutive integers
    is_step1 = diff == 1

    if is_step1.numel() == 0:
        cap.release()
        return

    # Find runs of True in is_step1
    # Pad with False at both ends so transitions show up at boundaries
    pad = torch.tensor([False], device=is_step1.device)
    p = torch.cat([pad, is_step1, pad])  # length = len(diff) + 2

    # Locations where value changes (False->True or True->False)
    changes = torch.nonzero(p[1:] != p[:-1]).flatten()
    # True-runs start at even indices, end at odd indices in `changes`
    starts = changes[0::2]  # indices into is_step1 where a run starts
    ends = changes[1::2]  # first index AFTER the run in is_step1

    # Lengths in terms of number of diffs (True values)
    run_len_diffs = ends - starts

    # Each run of k diffs corresponds to k+1 elements in x
    run_len_elems = run_len_diffs + 1

    # 4. Check for any run with at least `min_len` elements
    ok = run_len_elems >= fps
    if not ok.any():
        cap.release()
        return

    # Get the first qualifying block
    first_idx = torch.nonzero(ok, as_tuple=False)[0, 0]
    start_in_diffs = starts[first_idx].item()
    length_elems = run_len_elems[first_idx].item()

    # In the original x, the block is [start_idx, end_idx_exclusive)
    start_idx = start_in_diffs
    end_idx_exclusive = start_in_diffs + length_elems

    print(f'{video_path} at [{start_idx}, {end_idx_exclusive}]')

    cap.release()


for root, _, files in os.walk(ROOT_DIR):
    for file in files:
        if not file.endswith('.seg.mp4'):
            continue
        path = os.path.join(root, file)
        video_has_interpreter_always(path)
        print('DONE')

torch.Size([52])
DONE
torch.Size([41])
DONE
torch.Size([62])
DONE
torch.Size([9])
DONE
torch.Size([66])
DONE
torch.Size([29])
DONE
torch.Size([23])
DONE
torch.Size([107])
DONE
torch.Size([29])
DONE
torch.Size([48])
DONE
torch.Size([52])
DONE
torch.Size([0])
DONE
torch.Size([6])
DONE
torch.Size([77])
DONE
torch.Size([5])
DONE
torch.Size([8])
DONE
torch.Size([71])
DONE
torch.Size([125])
DONE
torch.Size([234])
scraped/2025/05/04/62540479-2.seg.mp4 at [74, 225]
DONE
torch.Size([56])
DONE
torch.Size([2])
DONE
torch.Size([5])
DONE
torch.Size([195])
scraped/2025/05/17/62544360-2.seg.mp4 at [0, 175]
DONE
torch.Size([44])
DONE
torch.Size([129])
DONE
torch.Size([61])
DONE
torch.Size([2])
DONE
torch.Size([48])
DONE
torch.Size([9])
DONE
torch.Size([0])
DONE
torch.Size([7])
DONE
torch.Size([162])
scraped/2025/10/15/62583074-2.seg.mp4 at [0, 150]
DONE
torch.Size([108])
DONE
torch.Size([8])
DONE
torch.Size([31])
DONE
torch.Size([19])
DONE
torch.Size([57])
DONE
torch.Size([65])
DONE
torch.Size([62])
D