In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from transformers import CLIPProcessor, CLIPModel

from PIL import Image
import numpy as np
from typing import List, Dict, Any, Union, Optional
import os
import cv2  # OpenCV для чтения видео
import decord  # гораздо быстрее OpenCV для больших видео (рекомендую установить: pip install decord)


# =====================================================
# 1. Основной мультимодальный классификатор (изображения + видео)
# =====================================================
class MultimodalVideoTemplateClassifier(nn.Module):
    def __init__(
        self,
        clip_model_name: str = "openai/clip-vit-large-patch14-336",  # 336px версия чуть точнее для видео
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        normalize: bool = True,
        num_frames: int = 8,                 # сколько кадров семплировать из видео
        sampling_strategy: str = "uniform",  # "uniform" или "sparse" (как в оригинальном CLIP)
    ):
        super().__init__()
        
        self.clip_model = CLIPModel.from_pretrained(clip_model_name)
        self.processor = CLIPProcessor.from_pretrained(clip_model_name)
        
        self.device = device
        self.normalize = normalize
        self.num_frames = num_frames
        self.sampling_strategy = sampling_strategy
        
        self.clip_model.to(device)
        self.clip_model.eval()
        
        # Кэш текстовых эмбеддингов
        self.text_emb_cache: Dict[str, torch.Tensor] = {}

    # =====================================================
    # 2. Чтение и семплинг кадров из видео
    # =====================================================
    def _read_video_decord(self, video_path: str) -> torch.Tensor:
        """
        Самый быстрый и надёжный способ — decord (работает на GPU если нужно)
        Возвращает tensor [T, H, W, C] в uint8
        """
        decord.bridge.set_bridge('torch')  # возвращаем сразу torch tensor
        vr = decord.VideoReader(video_path, ctx=decord.cpu(0))
        total_frames = len(vr)
        
        if total_frames <= self.num_frames:
            indices = np.arange(0, total_frames)
        else:
            if self.sampling_strategy == "uniform":
                indices = np.linspace(0, total_frames - 1, self.num_frames, dtype=int)
            elif self.sampling_strategy == "sparse":  # как в оригинальных видео-CLIP экспериментах
                step = total_frames // self.num_frames
                indices = np.arange(0, total_frames, step)[:self.num_frames]
            else:
                raise ValueError("sampling_strategy должен быть 'uniform' или 'sparse'")
        
        frames = vr.get_batch(indices)  # [T, H, W, C] uint8
        frames = frames.permute(0, 3, 1, 2)  # [T, C, H, W] для CLIP
        return frames

    def _read_video_opencv(self, video_path: str) -> List[Image.Image]:
        """
        Резервный вариант через OpenCV, если decord не установлен
        """
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        
        indices = np.linspace(0, total_frames - 1, self.num_frames, dtype=int)
        frames = []
        
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if not ret:
                continue
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(Image.fromarray(frame))
        
        cap.release()
        return frames if len(frames) > 0 else [Image.new("RGB", (336, 336), (0, 0, 0))]

    # =====================================================
    # 3. Получение эмбеддинга изображения ИЛИ видео
    # =====================================================
    @torch.no_grad()
    def encode_media(
        self,
        media_input: Union[str, Image.Image, List[Union[str, Image.Image]]]
    ) -> torch.Tensor:
        """
        Универсальная функция: на входе может быть
        - путь к картинке / видео
        - PIL.Image
        - список из вышеперечисленного
        Возвращает нормализованный эмбеддинг [1, dim] (усреднённый по кадрам если видео)
        """
        # 1. Приводим всё к списку PIL.Image
        pil_frames: List[Image.Image] = []
        
        if isinstance(media_input, (str, Image.Image)):
            media_input = [media_input]
        
        for item in media_input:
            if isinstance(item, Image.Image):
                pil_frames.append(item.convert("RGB"))
            elif isinstance(item, str):
                if os.path.isfile(item):
                    # Пробуем определить — видео или картинка по расширению
                    video_exts = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".gif"}
                    _, ext = os.path.splitext(item.lower())
                    if ext in video_exts:
                        # Это видео → семплируем кадры
                        try:
                            video_tensor = self._read_video_decord(item)  # [T, C, H, W]
                            for i in range(video_tensor.shape[0]):
                                frame = video_tensor[i].permute(1, 2, 0).cpu().numpy()  # [H,W,C]
                                frame = np.clip(frame, 0, 255).astype(np.uint8)
                                pil_frames.append(Image.fromarray(frame))
                        except Exception as e:
                            print(f"decord не сработал, пробуем OpenCV: {e}")
                            pil_frames.extend(self._read_video_opencv(item))
                    else:
                        # Обычная картинка
                        pil_frames.append(Image.open(item).convert("RGB"))
                else:
                    raise FileNotFoundError(f"Файл не найден: {item}")
        
        # 2. Если кадров слишком много — можно дополнительно субсемплировать
        if len(pil_frames) > self.num_frames * 2:
            step = len(pil_frames) // self.num_frames
            pil_frames = pil_frames[::step][:self.num_frames]
        
        # 3. Прогоняем через CLIP
        inputs = self.processor(
            images=pil_frames,
            return_tensors="pt",
            padding=True,
        ).to(self.device)
        
        image_features = self.clip_model.get_image_features(**inputs)  # [N_frames, dim]
        
        if self.normalize:
            image_features = F.normalize(image_features, p=2, dim=-1)
        
        # Усредняем по всем кадрам
        media_embedding = image_features.mean(dim=0, keepdim=True)  # [1, dim]
        return media_embedding

    # =====================================================
    # 4. Кодирование текстовых шаблонов (то же самое)
    # =====================================================
    def encode_text_templates(
        self,
        class_names: List[str],
        templates: List[str] = None,
    ) -> torch.Tensor:
        # (тот же код, что и в предыдущей версии)
        if templates is None:
            templates = [
                "a photo of a {}.",
                "a video of a {}.",
                "this is a {}.",
                "an image showing {}.",
                "{}.",
            ]
        
        text_inputs_list = []
        for class_name in class_names:
            for tmpl in templates:
                text_inputs_list.append(tmpl.format(class_name))
        
        inputs = self.processor(
            text=text_inputs_list,
            return_tensors="pt",
            padding=True,
            truncation=True,
        ).to(self.device)
        
        with torch.no_grad():
            text_features = self.clip_model.get_text_features(**inputs)
            
        if self.normalize:
            text_features = F.normalize(text_features, p=2, dim=-1)
        
        num_templates = len(templates)
        text_features = text_features.view(len(class_names), num_templates, -1).mean(dim=1)
        return text_features

    # =====================================================
    # 5. Основной метод предсказания
    # =====================================================
    @torch.no_grad()
    def predict(
        self,
        media_input: Union[str, Image.Image, List[Union[str, Image.Image]]],
        class_names: List[str],
        templates: Optional[List[str]] = None,
        temperature: float = 0.07,
        return_probs: bool = True,
    ) -> Dict[str, Any]:
        
        # 1. Эмбеддинг медиа (картинка или видео)
        media_emb = self.encode_media(media_input)  # [1, dim]
        
        # 2. Текстовые эмбеддинги (кэшируем)
        cache_key = "|".join(class_names) + "||" + ("|".join(templates or ["default"]))
        if cache_key not in self.text_emb_cache:
            self.text_emb_cache[cache_key] = self.encode_text_templates(class_names, templates)
        
        text_embs = self.text_emb_cache[cache_key]  # [num_classes, dim]
        
        # 3. Косинусное сходство
        logits = media_emb @ text_embs.T  # [1, num_classes]
        
        if return_probs:
            probs = torch.softmax(logits / temperature, dim=-1)
        else:
            probs = None
        
        pred_idx = logits.argmax(dim=-1).item()
        pred_class = class_names[pred_idx]
        
        return {
            "logits": logits.squeeze(0).cpu(),
            "probs": probs.squeeze(0).cpu() if probs is not None else None,
            "predicted_idx": pred_idx,
            "predicted_class": pred_class,
            "class_names": class_names,
        }


# =====================================================
# 6. Пример использования с видео
# =====================================================
if __name__ == "__main__":
    classifier = MultimodalVideoTemplateClassifier(
        clip_model_name="openai/clip-vit-large-patch14-336",
        num_frames=12,          # можно 8–16, больше — точнее, но медленнее
        sampling_strategy="uniform",
    )
    
    class_names = [
        "кошка играет",
        "собака бегает",
        "человек танцует",
        "автомобиль едет",
        "готовка еды",
        "спорт",
        "природа и пейзаж",
        "мультфильм",
    ]
    
    # Можно передать и картинку, и видео, и список
    result = classifier.predict(
        media_input="video_dancing_cat.mp4",  # ← твоё видео
        class_names=class_names,
        # templates=["это видео про {}", "видео с {}"],
        temperature=0.1,
    )
    
    print("Предсказанный класс:", result["predicted_class"])
    print("\nВсе вероятности:")
    for name, prob in zip(class_names, result["probs"]):
        print(f"  {name}: {prob:.4f}")