In [3]:
import os
import json
import re
import cv2
import imagehash
import numpy as np
from PIL import Image
from collections import defaultdict
from tqdm import tqdm
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import multiprocessing
import argparse
import math 


# Configure the root logger: INFO and above, each record prefixed with a timestamp and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


# --- Tunable parameters of the intro detector ---
# Frames hashed per second of video; the sampling step is 1 / SAMPLE_RATE seconds.
SAMPLE_RATE = 2.0
# Passed as `hash_size` to imagehash.dhash for every sampled frame.
HASH_SIZE = 12
# Shortest run of matching samples, in seconds, that is accepted as an intro candidate.
MIN_INTRO_DURATION = 7
# Length, in seconds, of the stretch that is sampled after SKIP_START_TIME.
MAX_ANALYSIS_TIME = 15 * 60
# Timestamp, in seconds, of the first sampled frame (earlier frames are never hashed).
SKIP_START_TIME = 5
# Thread-pool size used when hashing the episodes of one group.
MAX_WORKERS = min(6, multiprocessing.cpu_count())
# Detected intros longer than this many seconds are truncated to this length.
MAX_INTRO_DURATION = 45
# Fallback prediction, in seconds, used when no common intro is detected for a group.
DEFAULT_INTRO_START = 5
DEFAULT_INTRO_END = 15
# Largest hash difference (`hash_a - hash_b`) still treated as "the same frame".
HASH_DISTANCE_THRESHOLD = 6
# Fraction of a group's episodes that must share a frame for it to count as intro material.
MIN_MATCH_PERCENTAGE = 0.75
# Half-width of the neighbourhood searched in the other episodes for a matching frame.
# NOTE(review): the name says seconds -- confirm that find_best_intro_candidate converts
# this value to a number of samples before using it as a list-index offset.
MATCH_SEARCH_WINDOW_SEC = 15


def time_str_to_seconds(time_str: str) -> int:
    """Convert an ``H:M:S``, ``M:S`` or ``S`` string into a number of seconds.

    Every colon-separated field must be an integer literal; otherwise
    ``int`` raises ``ValueError``. A string with more than three fields is
    not a recognised format and yields ``0``.
    """
    fields = [int(piece) for piece in time_str.split(':')]
    if len(fields) > 3:
        return 0

    # Fold the fields as base-60 digits: ((h * 60) + m) * 60 + s.
    total = 0
    for value in fields:
        total = total * 60 + value
    return total

def calculate_iou(pred_box: tuple[float, float], gt_box: tuple[float, float]) -> float:
    """Return the intersection-over-union of two ``(start, end)`` intervals.

    When the union has zero length the result is ``1.0`` if the
    intersection is also empty and ``0.0`` otherwise.
    """
    (p_lo, p_hi), (g_lo, g_hi) = pred_box, gt_box

    # Overlap is clamped at zero for disjoint intervals.
    overlap = max(0, min(p_hi, g_hi) - max(p_lo, g_lo))
    union = (p_hi - p_lo) + (g_hi - g_lo) - overlap

    if union != 0:
        return overlap / union
    return 0.0 if overlap != 0 else 1.0

def parse_series_info(name: str) -> tuple[str, int]:
    """Split an episode name into ``(series title, season number)``.

    The season is the number that precedes the word "сезон" (matched on the
    lower-cased name) and defaults to 1 when absent. The title is whatever
    precedes the first ". <N> сезон" or ". серия" marker, stripped of
    surrounding whitespace.
    """
    found = re.search(r'(\d+)\s*сезон', name.lower())
    season = 1 if found is None else int(found.group(1))

    pieces = re.split(r'\s*\.\s*\d+\s*сезон|\s*\.\s*серия', name, flags=re.IGNORECASE)
    title = (pieces[0] if pieces else name).strip()
    return title, season

def group_episodes(metadata_path: str, data_root: str) -> dict:
    """Load the labels file and bucket episodes by ``(title, season)``.

    Args:
        metadata_path: JSON file mapping a video id to a record with a
            ``name`` and optional ``start`` / ``end`` time strings.
        data_root: Directory that holds one sub-directory per video id.

    Returns:
        A ``defaultdict(list)`` keyed by ``(title, season)``. Each entry is
        a dict with the video ``path`` and the ground-truth ``gt_start`` /
        ``gt_end`` in seconds, ordered so that start <= end.

    Any failure while handling a single record is logged and that record
    is skipped; it never aborts the whole grouping.
    """
    logging.info("Группировка серий и загрузка ground truth данных...")
    with open(metadata_path, 'r', encoding='utf-8') as handle:
        metadata = json.load(handle)

    groups = defaultdict(list)
    for video_id, info in metadata.items():
        try:
            title, season = parse_series_info(info['name'])
            video_dir = os.path.join(data_root, video_id)

            # Take the first .mp4 entry the directory listing yields.
            mp4_names = (entry for entry in os.listdir(video_dir) if entry.endswith('.mp4'))
            first_mp4 = next(mp4_names, None)
            video_file = os.path.join(video_dir, first_mp4) if first_mp4 is not None else None

            if not video_file:
                logging.warning(f"Видеофайл для ID {video_id} не найден в {video_dir}")
                continue

            gt_start = time_str_to_seconds(info.get('start', '0'))
            gt_end = time_str_to_seconds(info.get('end', '0'))
            if gt_start > gt_end:
                # Labels occasionally come reversed; normalise the order.
                gt_start, gt_end = gt_end, gt_start

            episode = {"path": video_file, "gt_start": gt_start, "gt_end": gt_end}
            groups[(title, season)].append(episode)
        except Exception as e:
            logging.error(f"Ошибка при обработке метаданных для ID {video_id} ('{info.get('name')}'): {e}")
    return groups

def calculate_hashes_for_video(video_path: str) -> list[tuple[float, imagehash.ImageHash]]:
    """Sample frames from the beginning of a video and hash each of them.

    Sampling starts at ``SKIP_START_TIME`` seconds and advances by
    ``1 / SAMPLE_RATE`` seconds until either the end of the video or
    ``SKIP_START_TIME + MAX_ANALYSIS_TIME`` is reached, or a frame cannot
    be read.

    Returns:
        ``(timestamp_seconds, dhash)`` pairs in chronological order. The
        list is empty when the file cannot be opened; if an error occurs
        part-way through, the error is logged and the hashes gathered so
        far are returned.
    """
    collected = []
    capture = None
    try:
        capture = cv2.VideoCapture(video_path)
        if capture.isOpened():
            fps = capture.get(cv2.CAP_PROP_FPS)
            if fps > 0:
                duration = capture.get(cv2.CAP_PROP_FRAME_COUNT) / fps
            else:
                # Without a usable frame rate the duration is unknown; sample nothing.
                duration = 0
            stop_at = min(duration, SKIP_START_TIME + MAX_ANALYSIS_TIME)

            position = SKIP_START_TIME
            while position < stop_at:
                # Seek by timestamp so that sample k sits at the same time in every episode.
                capture.set(cv2.CAP_PROP_POS_MSEC, int(position * 1000))
                ok, frame = capture.read()
                if not ok:
                    break
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_hash = imagehash.dhash(Image.fromarray(rgb), hash_size=HASH_SIZE)
                collected.append((position, frame_hash))
                position += 1.0 / SAMPLE_RATE
    except Exception as exc:
        logging.error(f"Ошибка при обработке файла {video_path}: {exc}")
    finally:
        if capture:
            capture.release()
    return collected

def find_best_intro_candidate(all_hashes_data: dict) -> tuple[float | None, float | None]:
    if len(all_hashes_data) < 2: return None, None
    
    sorted_paths = sorted(all_hashes_data.keys(), key=lambda p: len(all_hashes_data[p]))
    ref_path, other_paths = sorted_paths[0], sorted_paths[1:]
    ref_hashes = all_hashes_data[ref_path]
    if not ref_hashes: return None, None

    other_hashes_list = [all_hashes_data[p] for p in other_paths]

    min_series_with_intro = math.ceil(len(all_hashes_data) * MIN_MATCH_PERCENTAGE)
    min_matches_needed = max(1, min_series_with_intro - 1) 

    match_scores = []
    for i, (ts, ref_hash) in enumerate(ref_hashes):
        matches = sum(1 for other_h in other_hashes_list if any(
            (ref_hash - h[1]) <= HASH_DISTANCE_THRESHOLD
            for h in other_h[max(0, i - MATCH_SEARCH_WINDOW_SEC): i + MATCH_SEARCH_WINDOW_SEC + 1]
        ))
        match_scores.append(matches)

    candidates = []
    in_sequence, start_idx = False, 0
    for i, score in enumerate(match_scores):
        is_strong_match = score >= min_matches_needed
        if is_strong_match and not in_sequence:
            in_sequence, start_idx = True, i
        elif not is_strong_match and in_sequence:
            in_sequence = False
            duration_frames = i - start_idx
            if (duration_frames / SAMPLE_RATE) >= MIN_INTRO_DURATION:
                candidates.append({
                    'start_idx': start_idx,
                    'end_idx': i,
                    'duration': duration_frames
                })

    if in_sequence:
        duration_frames = len(match_scores) - start_idx
        if (duration_frames / SAMPLE_RATE) >= MIN_INTRO_DURATION:
            candidates.append({
                'start_idx': start_idx,
                'end_idx': len(match_scores),
                'duration': duration_frames
            })

    if not candidates:
        return None, None

    best_candidate = max(candidates, key=lambda c: c['duration'])
    
    start_time = ref_hashes[best_candidate['start_idx']][0]
    end_time = ref_hashes[best_candidate['end_idx'] - 1][0] + (1.0 / SAMPLE_RATE)
    
    if (end_time - start_time) > MAX_INTRO_DURATION:
        logging.info(f"Найдена слишком длинная заставка ({end_time - start_time:.0f} сек). Обрезается до {MAX_INTRO_DURATION} сек.")
        end_time = start_time + MAX_INTRO_DURATION

    return start_time, end_time


def _mmss(seconds: float) -> str:
    """Format a number of seconds as zero-padded ``MM:SS``."""
    return f"{int(seconds // 60):02d}:{int(seconds % 60):02d}"


def main(metadata_path, data_root):
    """Predict one intro interval per series/season and report its IoU.

    For every group of episodes the videos are hashed in a thread pool and
    a common intro is searched for. Groups with a single episode, or with
    no detected intro, fall back to the default interval. The prediction is
    scored against each episode's ground truth and a summary is printed.

    Args:
        metadata_path: Path of the JSON labels file.
        data_root: Directory containing one sub-directory per video id.
    """
    series_groups = group_episodes(metadata_path, data_root)
    logging.info(f"Найдено {len(series_groups)} групп (сериал/сезон) для анализа.")

    results = {}
    all_iou_scores = []

    for (title, season), episodes_data in tqdm(series_groups.items()):
        group_key = f"{title} (Сезон {season})"
        logging.info(f"\n--- Анализ группы: {group_key} ({len(episodes_data)} серий) ---")

        start_time = end_time = None

        if len(episodes_data) > 1:
            filepaths = [episode['path'] for episode in episodes_data]
            hashed = {}
            with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                future_to_path = {}
                for path in filepaths:
                    future_to_path[executor.submit(calculate_hashes_for_video, path)] = path
                progress = tqdm(as_completed(future_to_path), total=len(filepaths), desc=f"Хэширование '{title}'")
                for future in progress:
                    hashed[future_to_path[future]] = future.result()

            # Episodes that produced no hashes cannot take part in the comparison.
            all_hashes_data = {path: hashes for path, hashes in hashed.items() if hashes}
            if len(all_hashes_data) >= 2:
                logging.info("Поиск наилучшего кандидата на заставку...")
                start_time, end_time = find_best_intro_candidate(all_hashes_data)

        if start_time is not None:
            duration = end_time - start_time
            log_message = f"Найдена заставка! [{_mmss(start_time)} - {_mmss(end_time)}] Длительность: {int(duration)} сек."
        elif len(episodes_data) == 1:
            log_message = "Найдена одна серия, применяется правило по умолчанию."
        else:
            log_message = "Общая заставка не найдена алгоритмом, применяется правило по умолчанию."
        logging.info(log_message)

        if start_time is None:
            start_time, end_time = float(DEFAULT_INTRO_START), float(DEFAULT_INTRO_END)

        # The same predicted interval is scored against every episode of the group.
        pred_box = (start_time, end_time)
        group_ious = [
            calculate_iou(pred_box, (episode['gt_start'], episode['gt_end']))
            for episode in episodes_data
        ]

        results[group_key] = {
            "start": _mmss(start_time),
            "end": _mmss(end_time),
            "duration_sec": int(end_time - start_time),
            "avg_iou": np.mean(group_ious) if group_ious else 0.0,
            "log": log_message
        }
        all_iou_scores.extend(group_ious)

    separator = "=" * 60
    print("\n\n" + separator)
    print("ИТОГОВЫЕ РЕЗУЛЬТАТЫ")
    print(separator)
    for group in sorted(results):
        intro_data = results[group]
        print(f"🎬 Сериал: {group}")
        print(f"   🕒 Предсказание:   {intro_data['start']} - {intro_data['end']} (~{intro_data['duration_sec']} сек.)")
        print(f"   📊 Качество (IoU): {intro_data['avg_iou']:.2%}")
    print(separator)
    if all_iou_scores:
        overall_avg_iou = np.mean(all_iou_scores)
        print(f"\n🏆 ОБЩЕЕ СРЕДНЕЕ КАЧЕСТВО (IoU) ПО ВСЕМ СЕРИЯМ: {overall_avg_iou:.2%}")
    print(separator)


In [4]:


if __name__ == "__main__":
    # Run the evaluation on the local dataset: the labels file and the
    # per-video directories live under the same root.
    main(
        metadata_path='D:/vk/data/data/labels.json',
        data_root='D:/vk/data/data/',
    )

2025-06-13 13:39:53,262 - INFO - Группировка серий и загрузка ground truth данных...
2025-06-13 13:39:53,265 - INFO - Найдено 34 групп (сериал/сезон) для анализа.
  0%|          | 0/34 [00:00<?, ?it/s]2025-06-13 13:39:53,266 - INFO - 
--- Анализ группы: Баскетс (Сезон 4) (5 серий) ---
Хэширование 'Баскетс': 100%|██████████| 5/5 [01:24<00:00, 17.00s/it]
2025-06-13 13:41:18,248 - INFO - Поиск наилучшего кандидата на заставку...
2025-06-13 13:41:18,560 - INFO - Найдена заставка! [00:05 - 00:31] Длительность: 26 сек.
  3%|▎         | 1/34 [01:25<46:54, 85.30s/it]2025-06-13 13:41:18,561 - INFO - 
--- Анализ группы: Бывaeт и xyжe (Сезон 1) (3 серий) ---
Хэширование 'Бывaeт и xyжe': 100%|██████████| 3/3 [00:41<00:00, 13.88s/it]
2025-06-13 13:42:00,194 - INFO - Поиск наилучшего кандидата на заставку...
2025-06-13 13:42:00,355 - INFO - Общая заставка не найдена алгоритмом, применяется правило по умолчанию.
  6%|▌         | 2/34 [02:07<31:50, 59.71s/it]2025-06-13 13:42:00,356 - INFO - 
--- Анали



ИТОГОВЫЕ РЕЗУЛЬТАТЫ
🎬 Сериал: 24 часа (Сезон 1)
   🕒 Предсказание:   00:05 - 00:15 (~10 сек.)
   📊 Качество (IoU): 100.00%
🎬 Сериал: 24 часа (Сезон 2)
   🕒 Предсказание:   00:12 - 00:19 (~7 сек.)
   📊 Качество (IoU): 25.89%
🎬 Сериал: 24 часа (Сезон 3)
   🕒 Предсказание:   00:05 - 00:17 (~12 сек.)
   📊 Качество (IoU): 85.42%
🎬 Сериал: 24 часа (Сезон 4)
   🕒 Предсказание:   02:12 - 02:19 (~7 сек.)
   📊 Качество (IoU): 0.00%
🎬 Сериал: 24 часа (Сезон 5)
   🕒 Предсказание:   00:05 - 00:18 (~13 сек.)
   📊 Качество (IoU): 76.54%
🎬 Сериал: 24 часа (Сезон 6)
   🕒 Предсказание:   00:05 - 00:15 (~10 сек.)
   📊 Качество (IoU): 66.67%
🎬 Сериал: 24 часа (Сезон 7)
   🕒 Предсказание:   00:05 - 00:19 (~14 сек.)
   📊 Качество (IoU): 75.86%
🎬 Сериал: 24 часа (Сезон 8)
   🕒 Предсказание:   00:05 - 00:18 (~13 сек.)
   📊 Качество (IoU): 75.31%
🎬 Сериал: 3вeздный пyть: Пикap (Сезон 3)
   🕒 Предсказание:   00:05 - 00:15 (~10 сек.)
   📊 Качество (IoU): 0.00%
🎬 Сериал: Агентство Локвуд и компания (Сезон 1)
  


