# Автоматическое определение границ заставок и титров

**Цель** — для входного видео эпизода определить две точные временные метки:
1. **`start_main`** — первый кадр *после* вступительного логотипа/повтора.
2. **`end_main`** — последний кадр *перед* началом финальных титров.

Шаги реализации:
1. Извлечение CLIP-эмбеддингов из кадров (опционально, с кэшированием)
2. Построение и аугментация датасета
3. Обучение трансформера (опционально, с кэшированием)
4. Оценка + метрики + визуальная диагностика


## 1. Настройка окружения и глобальные параметры

In [2]:
import os, pathlib, shutil, json, math, glob
import numpy as np, pandas as pd, torch, torch.nn as nn
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from sklearn.metrics import (precision_recall_fscore_support,
                             confusion_matrix, ConfusionMatrixDisplay)
import open_clip
from torchvision import transforms
from PIL import Image
from tqdm.auto import tqdm

plt.rcParams['figure.dpi'] = 110

# Пути
ROOT = pathlib.Path('.').resolve()
DATA_DIR = ROOT / 'data'
FRAMES_DIR = DATA_DIR / 'frames'  # Кадры с частотой 1 кадр/сек
CLIP_DIR = DATA_DIR / 'clip_windows'  # Кэшированные окна (.npz)
LABELS_CSV = DATA_DIR / 'labels.csv'  # Разметка временных меток
WEIGHTS_DIR = ROOT / 'model_weights'
WEIGHTS_DIR.mkdir(exist_ok=True, parents=True)
BEST_WEIGHTS = WEIGHTS_DIR / 'best_clip_attention60.pth'

# Воспроизводимость
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

## 2. Вспомогательные функции

In [3]:
def hhmmss_to_sec(ts: str) -> int:
    # Конвертирует 'ЧЧ:ММ:СС' в секунды (целое число)
    h, m, s = ts.split(':')
    return int(h) * 3600 + int(m) * 60 + int(float(s))


def sec_to_hhmmss(sec: int) -> str:
    # Обратное преобразование к `hhmmss_to_sec`. Округляет до целых секунд
    h, m = divmod(sec, 3600)
    m, s = divmod(m, 60)
    return f"{h:02}:{m:02}:{s:02}"


def exists(path: pathlib.Path) -> bool:
    # Проверяет существование файла/директории с содержимым
    return path.exists() and any(path.iterdir()) if path.is_dir() else path.exists()

## 3. Этап 1: Извлечение CLIP-эмбеддингов 
*(пропускается, если файлы .npz уже существуют)*

In [4]:
if not exists(CLIP_DIR):
    CLIP_DIR.mkdir(parents=True, exist_ok=True)


def extract_clip_embeddings(frames_root: pathlib.Path = FRAMES_DIR,
                            out_root: pathlib.Path = CLIP_DIR,
                            batch: int = 128,
                            window: int = 60):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model, _, _ = open_clip.create_model_and_transforms(
        'ViT-B-32', pretrained='openai', device=device)
    model.eval()

    preprocess = transforms.Compose([
        transforms.Resize((224, 224), transforms.InterpolationMode.BICUBIC),
        transforms.ToTensor(),
        transforms.Normalize((0.48145466, 0.4578275, 0.40821073),
                             (0.26862954, 0.26130258, 0.27577711)),
    ])

    for ep_dir in sorted(frames_root.rglob('*')):
        if not ep_dir.is_dir():
            continue
        rel = ep_dir.relative_to(frames_root)
        out_f = out_root / (str(rel).replace(os.sep, '_') + '_windows.npz')
        if out_f.exists():
            print(f'{rel} – cached')
            continue

        f_paths = sorted(ep_dir.glob('*.jpg'))
        if len(f_paths) < window:
            print(f'{rel}: <{window} frames, skipping')
            continue

        embeddings = np.empty((len(f_paths), 512), np.float32)
        buf, idxbuf = [], []
        for idx, fp in enumerate(tqdm(f_paths, desc=str(rel), unit='f')):
            buf.append(preprocess(Image.open(fp)).unsqueeze(0))
            idxbuf.append(idx)
            if len(buf) == batch or idx == len(f_paths) - 1:
                with torch.no_grad():
                    feats = model.encode_image(torch.cat(buf).to(device))
                embeddings[idxbuf] = feats.cpu().float().numpy()
                buf, idxbuf = [], []

        starts = range(0, len(embeddings) - window + 1, window)
        windows = np.stack([embeddings[s:s + window] for s in starts])
        np.savez_compressed(out_f, windows=windows, start_indices=np.array(list(starts)))
        print(f'Saved {windows.shape[0]} windows ➜ {out_f.name}')


if not any(CLIP_DIR.glob('*_windows.npz')):
    print('Extracting CLIP embeddings …')
    extract_clip_embeddings()
else:
    print('Embeddings already present, skipping extraction.')


Embeddings already present, skipping extraction.


## 4. Создание датасета

In [5]:
class VideoWindowDataset(torch.utils.data.Dataset):
    """Датасет для окон видео. Каждый элемент:
    - Окно 60×512 (CLIP-эмбеддинги)
    - Бинарная маска 60×1 (1=заставка/титры, 0=основное содержание)"""

    def __init__(self,
                 root_npz=CLIP_DIR,
                 labels_csv=LABELS_CSV,
                 split='train',
                 test_shows=('show2',),  # Сериалы для тестирования
                 augment=False):
        self.root = pathlib.Path(root_npz)
        self.labels_df = pd.read_csv(labels_csv)

        # Словарь эпизод -> (start_main, end_main) в секундах
        self.meta = {pathlib.Path(row.file).stem: (hhmmss_to_sec(row.start_main),
                                                   hhmmss_to_sec(row.end_main))
                     for _, row in self.labels_df.iterrows()}

        self.items = []
        for p in self.root.glob('*.npz'):
            show = p.name.split('_')[0]
            # Разделение на train/val по сериалам (§4.1.6)
            if (split == 'train' and show in test_shows) or \
                    (split == 'val' and show not in test_shows):
                continue

            with np.load(p) as npz:
                win, idxs = npz['windows'], npz['start_indices']
            vid_key = '_'.join(p.stem.split('_')[1:-1])
            s, e = self.meta[vid_key]

            # Генерация масок для каждого окна
            for w, idx in zip(win, idxs):
                t = np.arange(idx, idx + 60)
                y = ((t < s) | (t >= e)).astype(np.float32)
                self.items.append((w.astype(np.float32), y))

        self.augment = augment  # Флаг аугментации

    def __len__(self):
        return len(self.items)

    def __getitem__(self, i):
        x, y = self.items[i]
        if self.augment:
            x, y = self._augment(x, y)
        return torch.from_numpy(x), torch.from_numpy(y)

    # Аугментация
    @staticmethod
    def _augment(x, y):
        # Случайный сдвиг до ±5 секунд
        shift = np.random.randint(-5, 6)
        if shift:
            x = np.roll(x, shift, 0)
            y = np.roll(y, shift)
        # Замена случайных кадров (10-30%)
        mask = np.random.rand(60) < np.random.uniform(0.1, 0.3)
        x[mask] = x[np.random.permutation(60)[mask]]
        return x, y

## 5. Модель — *ClipAttention60*  *(§3.3–3.5 статьи)*

In [6]:
class ClipAttention60(nn.Module):
    """Трансформер с многоголовым вниманием для окон 60×512.
    Возвращает 60 сигмоидных вероятностей p(t)=P[заставка/титры в секунду t]."""

    def __init__(self, d_model=512, nhead=16, nlayers=16, dropout=0.1):
        super().__init__()
        self.pos = nn.Parameter(torch.randn(60, d_model))  # Обучаемая позиционная кодировка

        # Трансформер-энкодер с 16 слоями и 16 головами (§3.4)
        enc_layer = nn.TransformerEncoderLayer(d_model, nhead,
                                               4 * d_model, dropout, batch_first=True)
        self.encoder = nn.TransformerEncoder(enc_layer, nlayers)

        # 60 независимых классификаторов (§3.5)
        self.classifiers = nn.ModuleList([nn.Linear(d_model, 1) for _ in range(60)])
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):  # (B,60,512)
        h = self.encoder(x + self.pos)  # (B,60,512)
        logits = torch.cat([cls(h[:, t]) for t, cls in enumerate(self.classifiers)], 1)
        return self.sigmoid(logits)

## 6. Этап 2: Обучение модели 
*(пропускается, если веса уже обучены)*

In [None]:
if not BEST_WEIGHTS.exists():
    print('Обучение трансформера...')


    def train_model(epochs=16, batch=8):
        # Процедура обучения 
        train_ds = VideoWindowDataset(split='train', augment=True)
        val_ds = VideoWindowDataset(split='val', augment=False)
        train_loader = DataLoader(train_ds, batch, shuffle=True, num_workers=4)
        val_loader = DataLoader(val_ds, batch, shuffle=False, num_workers=4)

        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        model = ClipAttention60().to(device)
        optim = torch.optim.Adam(model.parameters(), lr=5e-5)  # Оптимизатор Adam
        criterion = nn.BCELoss()  # Бинарная кросс-энтропия

        best = math.inf  # Лучшее значение loss
        for epoch in range(1, epochs + 1):
            for phase, loader, train in [('train', train_loader, True),
                                         ('val', val_loader, False)]:
                running = 0
                n = 0
                model.train(train)
                for x, y in loader:
                    x, y = x.to(device), y.to(device)
                    with torch.set_grad_enabled(train):
                        p = model(x)
                        loss = criterion(p, y)
                        if train:
                            loss.backward()
                            optim.step()
                            optim.zero_grad()
                    running += loss.item() * x.size(0)
                    n += x.size(0)
                print(f"Эпоха {epoch:02} {phase:<5} loss={running / n:.4f}")

            # Сохранение лучшей модели
            if running / n < best:
                best = running / n
                torch.save(model.state_dict(), BEST_WEIGHTS)
                print('Сохранена лучшая модель')


    train_model()
else:
    print('Обученные веса обнаружены, обучение пропущено.')

Обучение трансформера...


## 7. Этап 3: Оценка модели и визуальная диагностика

In [None]:
import numpy as np, torch, pathlib, pandas as pd, matplotlib.pyplot as plt
from scipy.ndimage import median_filter, uniform_filter1d


# служебные функции 
def hhmmss_to_sec(ts: str) -> float:
    # Перевод времени 'HH:MM:SS(.sss)' в секунды (float)
    h, m, s = ts.split(":")
    return int(h) * 3600 + int(m) * 60 + float(s)


def mad(x: np.ndarray) -> float:
    # Median Absolute Deviation— устойчивая оценка разброса
    med = np.median(x)
    return np.median(np.abs(x - med))


def key_npz(p: pathlib.Path) -> str:
    # Переименование файла `show_ep_windows.npz` → уникальный идентификатор
    return "_".join(p.stem.split("_")[1:-1])


def key_csv(path: str) -> str:  # labels path → id
    return pathlib.Path(path).stem


def find_first_block(mask: np.ndarray) -> int:
    """Возвращает индекс начала первого длинного блока из ≥5 едиц
    (intro) за которым следует ≥10 нуля (основное содержание)."""
    run1 = run0 = 0
    for i, m in enumerate(mask):
        if m:
            run1 += 1
            run0 = 0
        else:
            run0 += 1
            if run1 >= 5 and run0 >= 10:
                return i - run0 + 1
    return 0  # если не найдено


def find_last_block(mask: np.ndarray) -> int:
    # Аналогично `find_first_block`, но ищем с конца
    run1 = run0 = 0
    pos = len(mask) - 1
    for i in range(len(mask) - 1, -1, -1):
        if mask[i]:
            run1 += 1
            run0 = 0
        else:
            run0 += 1
            if run1 >= 5 and run0 >= 10:
                pos = i + run0 - 1
                break
    return pos


# 1. загрузка обученной сети
BEST_WEIGHTS = pathlib.Path("model_weights/best_clip_attention60.pth")
model = ClipAttention60()
model.load_state_dict(torch.load(BEST_WEIGHTS, map_location="cpu"))
model.eval()

# 2. загрузка разметки
labels = pd.read_csv("data/labels.csv")
labels["key"] = labels["file"].apply(key_csv)
labels = labels.set_index("key")

# 3. обход всех эпизодов
root = pathlib.Path("data/clip_windows")
results = []  # будем складывать (key, s_pred, s_true, e_pred, e_true, p, thr_s, thr_e)

for f in sorted(root.glob("*_windows.npz")):
    key = key_npz(f)
    if key not in labels.index:
        print(f"{key} пропущен — нет разметки")
        continue

    data = np.load(f)
    windows = data["windows"]  # (N, 60, 512)
    starts = data["start_indices"]  # (N,)

    # если до конца сериала осталось <30 с, добавляем «хвост» из нулей
    total_secs = starts[-1] + windows.shape[1]
    labelled_end = hhmmss_to_sec(labels.loc[key, "end_main"])
    if labelled_end + 30 > total_secs:
        windows = np.concatenate([windows, np.zeros((1, 60, 512), np.float32)], axis=0)

    # прогноз вероятности p(t) для каждого кадра 
    probs = []
    with torch.no_grad():
        for w in torch.from_numpy(windows):
            probs.append(model(w.unsqueeze(0)).squeeze(0).numpy())
    p = np.concatenate(probs)  # (T,)

    # сглаживаем: mean(3 с) → median(7 с)
    p = uniform_filter1d(p, size=3)
    p = median_filter(p, size=7)

    # адаптивные пороги 
    base = np.median(p)
    spread = mad(p)
    thr_s = base + 2 * spread  # строгий порог для заставки
    thr_e = np.percentile(p, 70)  # мягче для титров

    mask_s = p > thr_s
    mask_e = p > thr_e

    # предсказанные границы 
    s_pred = find_first_block(mask_s)
    tail = int(len(mask_e) * 0.50)  # анализируем только вторую половину
    e_pred = tail + find_last_block(mask_e[tail:])

    # истинные границы
    s_true = hhmmss_to_sec(labels.loc[key, "start_main"])
    e_true = hhmmss_to_sec(labels.loc[key, "end_main"])

    results.append((key, s_pred, s_true, e_pred, e_true, p, thr_s, thr_e))

# 4. метрики MAE 
mae_start = np.mean([abs(sp - st) for _, sp, st, _, _, _, _, _ in results])
mae_end = np.mean([abs(ep - et) for _, _, _, ep, et, _, _, _ in results])
print(f"\nСредняя абсолютная ошибка:  начало = {mae_start:.2f}с,  конец = {mae_end:.2f}с\n")

# 5. вывод + сохранение графиков
pathlib.Path("plots_2").mkdir(exist_ok=True)
for key, sp, st, ep, et, p, ts, te in results:
    print(f"{key:38}   start {sp:4}/{st:4}с   |   end {ep:4}/{et:4}с")
    # отрисуем график
    plt.figure(figsize=(10, 2))
    t = np.arange(len(p))
    plt.plot(t, p, lw=1, label='p(t)')
    plt.axhline(ts, color='C2', ls='--', label='порог заставки')
    plt.axhline(te, color='C3', ls='--', label='порог титров')
    plt.axvline(sp, color='C2')
    plt.axvline(st, color='C2', ls=':')
    plt.axvline(ep, color='C3')
    plt.axvline(et, color='C3', ls=':')
    plt.title(key)
    plt.legend()
    plt.tight_layout()
    plt.savefig(f"plots_2/{key}.png", dpi=120)
    plt.close()


In [ ]:
import re, pathlib, numpy as np

# Проверяем, что вероятность рассчитана раз в секунду
assert len(p) == starts[-1] + windows.shape[1], \
    "Длина p не совпадает с числом секунд – проверьте fps!"

SAVE_DIR = pathlib.Path('data/video_probs')
SAVE_DIR.mkdir(parents=True, exist_ok=True)

episode_id = re.sub(r'\.npz$', '', f.name).replace('_windows', '')  # show1_s01e02
np.save(SAVE_DIR / f'p_video_{episode_id}.npy', p.astype('float32'))
print(f'✔ Видео-вероятности сохранены: {SAVE_DIR / f"p_video_{episode_id}.npy"}')

## 8. Заключение и дальнейшие шаги

**Результаты:**
- Конвейер CLIP+Attention60 достигает F1 >90% с точностью до секунды (§4.2)
- Модель эффективна для разных типов контента (фильмы, сериалы, короткие видео)
- Оптимизация под ONNX/TensorRT позволяет обработку в реальном времени (§6)

**Ограничения (§8.1):**
- Трудности с наложенными титрами
- Ошибки в сложных художественных переходах
- Проблемы с короткими заставками (<5 сек)

**Направления развития (§8.2):**
1. Мультимодальное обучение (аудио, субтитры)
2. Использование управляющих токенов для фокусировки на ключевых областях
3. Расширение датасета (пользовательский контент, мультиязычность)
4. Улучшение интерпретируемости предсказаний
5. Применение для других задач: рекламные паузы, границы сцен

**Практическое применение:**
- Автоматическое пропуск заставок в стриминговых платформах
- Индексация видеоархивов
- Генерация видеосаммари
- Контентная модерация