#### Подготовка окружения (импорт библиотек)

In [None]:
import json
from pathlib import Path

import numpy as np
import torch
import torch.nn as nn
import torchvision.io as io
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision.models.video import r3d_18, R3D_18_Weights
from tqdm.auto import tqdm

#### Конфигурация

In [None]:
# Configuration
# `__file__` is not defined when this code is executed as notebook cells, so
# fall back to the current working directory instead of raising NameError.
try:
    BASE_DIR = Path(__file__).parent                # Root directory of the project
except NameError:
    BASE_DIR = Path.cwd()
TRAIN_VIDEO_DIR = BASE_DIR / "data_train_short/"    # directory with the train .mp4 files
TEST_VIDEO_DIR = BASE_DIR / "data_test_short/"      # directory with the test .mp4 files
TRAIN_JSON = BASE_DIR / "train_labels.json"         # json mapping train video_id -> {"start", "end"}
TEST_JSON = BASE_DIR / "test_labels.json"           # json mapping test  video_id -> {"start", "end"}
FRAME_RATE = 2                                      # fps for sampling
WINDOW_SEC = 15                                     # window length in seconds
STRIDE_SEC = 1                                      # stride in seconds

In [None]:
# Pick the compute device in priority order: Apple MPS, then CUDA, then CPU.
# The CUDA check is only evaluated when MPS is unavailable.
DEVICE = torch.device(
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available()
    else "cpu"
)

print("Model use device:", DEVICE)

#### Инициализация модели и трансформаций

In [None]:
# Frame preprocessing pipeline: resize to 112x112, then normalise each RGB
# channel. The mean/std values are the ones torchvision documents for its
# Kinetics-400 video weights; inputs are expected as float tensors shaped
# [..., C, H, W].
video_transform = T.Compose(
    [
        T.Resize(size=(112, 112)),
        T.Normalize(
            mean=[0.43216, 0.394666, 0.37645],
            std=[0.22803, 0.22145, 0.216989],
        ),
    ]
)


In [None]:
# Load the pretrained 3D ResNet-18 and swap its classification head for a
# 2-class linear layer.
# Autograd must be ON here: the training loop further down calls
# loss.backward(), which fails if gradients were globally disabled (the
# original code called torch.set_grad_enabled(False) at this point).
# Inference code should use a local `torch.no_grad()` context instead.
torch.set_grad_enabled(True)
model = r3d_18(weights=R3D_18_Weights.DEFAULT)
model.fc = nn.Linear(model.fc.in_features, 2)  # binary output (two logits)
model.to(DEVICE)
model.train()

#### Загрузка и слияние меток

In [None]:
# Load and merge label files
def load_labels_from_dir(json_path: Path) -> dict:
    """Read the UTF-8 encoded JSON file at ``json_path`` and return the parsed content."""
    raw_text = json_path.read_text(encoding='utf-8')
    return json.loads(raw_text)

# Read the train and test annotation files.
train_labels = load_labels_from_dir(TRAIN_JSON)
test_labels = load_labels_from_dir(TEST_JSON)

# Combined view of both splits; when a video id appears in both files the
# test entry overrides the train entry.
labels = dict(train_labels)
labels.update(test_labels)

In [None]:
# Convert a "H:MM:SS" or "MM:SS" time string to seconds
def time_to_seconds(t_str):
    """Return the number of whole seconds encoded by ``t_str``.

    Accepts exactly two (minutes:seconds) or three (hours:minutes:seconds)
    colon-separated integer fields.

    Raises:
        ValueError: if a field is not an integer, or if the number of
            fields is neither 2 nor 3.
    """
    fields = [int(piece) for piece in t_str.split(':')]
    if len(fields) not in (2, 3):
        raise ValueError(f"Invalid time format: {t_str}")
    # Fold the fields left to right in base 60: ((h * 60) + m) * 60 + s.
    seconds = 0
    for value in fields:
        seconds = seconds * 60 + value
    return seconds


#### Определение датасета

In [None]:
# Custom Dataset for intro detection
class VideoIntroDataset(Dataset):
    """Map-style dataset that yields one ``(video, label)`` pair per video id.

    Args:
        labels: mapping ``video_id -> annotation dict``.
        video_dir: directory that contains one sub-directory per video id.
        transform: optional callable applied to the decoded frames. It
            receives a float tensor shaped ``[T, C, H, W]`` with values in
            ``[0, 1]`` and must return a tensor with the same axis order.
    """

    def __init__(self, labels: dict, video_dir: Path, transform=None):
        self.labels = labels
        self.video_dir = video_dir
        self.video_ids = list(labels)
        self.transform = transform

    def __len__(self) -> int:
        return len(self.video_ids)

    def __getitem__(self, idx):
        """Decode the ``idx``-th video and return ``(video, label)``.

        With a transform, ``video`` is a float tensor shaped ``[C, T, H, W]``
        (the layout 3D-conv video models consume). Without a transform the
        raw decoder output (uint8, ``[T, H, W, C]``) is returned unchanged.
        """
        vid = self.video_ids[idx]
        # Files are looked up as <video_dir>/<video_id>/<video_id>.mp4.
        path = self.video_dir / vid / f"{vid}.mp4"
        video, _, _ = io.read_video(str(path), pts_unit='sec')
        if self.transform is not None:
            # read_video yields uint8 [T, H, W, C]; torchvision transforms
            # expect float [..., C, H, W]. Applying them to the raw tensor
            # resizes the wrong axes and Normalize rejects the uint8 dtype.
            frames = video.permute(0, 3, 1, 2) / 255.0
            frames = self.transform(frames)
            video = frames.permute(1, 0, 2, 3)  # -> [C, T, H, W]
        # NOTE(review): the whole video is returned, so clips have different
        # lengths and the default DataLoader collate cannot stack them for
        # batch_size > 1 — sample fixed-length clips here; TODO confirm.
        # NOTE(review): this expects a 'label' entry in every annotation; the
        # label files are described as holding only "start"/"end" — confirm
        # the schema or derive the label from the interval.
        label = torch.tensor(self.labels[vid]['label'], dtype=torch.long)
        return video, label

#### Обучение модели

In [None]:
# Training setup
# Build the training set from the TRAIN annotations only. The merged `labels`
# dict also contains test ids, whose files are not in TRAIN_VIDEO_DIR and
# which must not leak into training because they are used for evaluation.
dataset = VideoIntroDataset(train_labels, TRAIN_VIDEO_DIR, transform=video_transform)
# NOTE(review): each item is a full decoded video; a batch of 15 may not fit
# in memory and items of different length cannot be stacked — confirm.
dataloader = DataLoader(dataset, batch_size=15, shuffle=True)

criterion = nn.CrossEntropyLoss()  # classification loss over the two logits
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

In [None]:
# Training loop
epochs = 5
for epoch in range(1, epochs + 1):
    # Evaluation helpers switch the model to eval mode; restore train mode
    # at the start of every epoch.
    model.train()
    total, correct, loss_sum = 0, 0, 0.0
    for videos, targets in tqdm(dataloader, desc=f"Epoch {epoch}"):
        videos, targets = videos.to(DEVICE), targets.to(DEVICE)
        # Build the autograd graph even if grad mode was disabled globally;
        # otherwise loss.backward() raises because nothing requires grad.
        with torch.enable_grad():
            logits = model(videos)
            loss = criterion(logits, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        n_batch = targets.size(0)
        preds = logits.argmax(dim=1)
        total += n_batch
        correct += (preds == targets).sum().item()
        # criterion returns the batch mean, so weight it by the batch size to
        # obtain a proper per-sample average over the epoch.
        loss_sum += loss.item() * n_batch

    if total == 0:
        # Empty loader: nothing to average (and `loss` would be undefined).
        print(f"Epoch {epoch} — no training samples")
        continue
    print(f"Epoch {epoch} — Loss: {loss_sum / total:.4f}, Acc: {correct/total:.3f}")

#### Детекция и оценка на тестовых данных

In [None]:
# Sliding-window intro detection
def _longest_true_run(mask) -> "tuple[int, int]":
    """Return half-open ``(start, end)`` indices of the longest run of truthy values.

    The first run wins on ties; ``(0, 0)`` is returned when there is no run.
    """
    best_start, best_end = 0, 0
    run_start = None
    for i, flag in enumerate(mask):
        if flag:
            if run_start is None:
                run_start = i
        elif run_start is not None:
            if i - run_start > best_end - best_start:
                best_start, best_end = run_start, i
            run_start = None
    # A run that reaches the end of the mask is closed here.
    if run_start is not None and len(mask) - run_start > best_end - best_start:
        best_start, best_end = run_start, len(mask)
    return best_start, best_end


def detect_intro(model, video_path: Path, clip_len=WINDOW_SEC, step=STRIDE_SEC, threshold=0.5,
                 sample_fps=FRAME_RATE) -> "tuple[float | None, float | None]":
    """Locate the intro of a video with a sliding-window classifier.

    Args:
        model: video classifier taking ``[B, C, T, H, W]`` float input and
            returning either two logits or a single logit per clip.
        video_path: path of the video file to analyse.
        clip_len: window length in SECONDS.
        step: window stride in SECONDS.
        threshold: minimum score for a window to count as "intro".
        sample_fps: frames per second kept after temporal subsampling; a
            falsy value keeps every decoded frame.

    Returns:
        ``(start_sec, end_sec)`` of the longest run of consecutive windows
        scoring above ``threshold``, or ``(None, None)`` when no window
        qualifies or the video has no frames / no frame-rate metadata.
    """
    model.eval()
    video, _, info = io.read_video(str(video_path), pts_unit="sec")  # uint8 [T, H, W, C] + metadata
    fps = info.get('video_fps')
    n_decoded = video.shape[0]
    if not fps or n_decoded == 0:
        return None, None
    duration_sec = n_decoded / fps

    # Subsample in time, then express window and stride (given in seconds)
    # as frame counts. The original used the second-valued arguments directly
    # as frame counts.
    frame_stride = max(1, round(fps / sample_fps)) if sample_fps else 1
    video = video[::frame_stride]
    eff_fps = fps / frame_stride
    frames_per_clip = max(1, round(clip_len * eff_fps))
    frames_per_step = max(1, round(step * eff_fps))
    n_frames = video.shape[0]

    scores: list[float] = []
    times: list[float] = []
    with torch.no_grad():
        # max(1, ...) still scores one (shorter) window when the video is
        # shorter than a full clip.
        for t0 in range(0, max(1, n_frames - frames_per_clip + 1), frames_per_step):
            clip = video[t0:t0 + frames_per_clip].permute(0, 3, 1, 2) / 255.0  # float [T, C, H, W]
            clip = video_transform(clip)
            # 3D-conv video models expect channels before time: [1, C, T, H, W].
            clip = clip.permute(1, 0, 2, 3).unsqueeze(0).to(DEVICE)
            logits = model(clip)
            if logits.shape[-1] == 1:
                score = torch.sigmoid(logits).item()
            else:
                # Assumes class index 1 is "intro" — TODO confirm against the
                # label encoding used for training.
                score = torch.softmax(logits, dim=-1)[0, 1].item()
            scores.append(score)
            times.append(t0 / eff_fps)

    mask = np.array(scores) > threshold
    if not mask.any():
        return None, None
    first, last_excl = _longest_true_run(mask)
    start_sec = float(times[first])
    # The last positive window still covers clip_len seconds; never report an
    # end past the end of the video.
    end_sec = float(min(times[last_excl - 1] + clip_len, duration_sec))
    return start_sec, end_sec

In [None]:
# IoU for two intervals
def iou_interval(a: float, b: float, c: float, d: float) -> float:
    """Return the intersection-over-union of the intervals ``[a, b]`` and ``[c, d]``.

    Bounds may be ints or floats (e.g. seconds). Disjoint or merely touching
    intervals give ``0.0``; so does a non-positive union (both intervals
    empty), which avoids a division by zero.
    """
    inter = max(0.0, min(b, d) - max(a, c))
    union = (b - a) + (d - c) - inter
    return inter / union if union > 0 else 0.0

In [None]:
# Inference and evaluation
model.eval()
ious = []
# Evaluate on the TEST annotations only: the merged `labels` dict also holds
# train ids, whose files are not located in TEST_VIDEO_DIR.
for vid, info in tqdm(test_labels.items(), desc="Eval"):
    path = TEST_VIDEO_DIR / vid / f"{vid}.mp4"
    pred_s, pred_e = detect_intro(model, path)  # seconds, or (None, None)
    if pred_s is None:
        # Every annotated video has an intro, so a missed detection scores 0
        # instead of being dropped (dropping it would inflate the mean).
        ious.append(0.0)
        continue
    gt_s = time_to_seconds(info['start'])
    gt_e = time_to_seconds(info['end'])
    ious.append(iou_interval(gt_s, gt_e, pred_s, pred_e))

if ious:
    print(f"Mean IoU: {sum(ious)/len(ious):.3f}")
else:
    print("Mean IoU: n/a (no videos evaluated)")