In [2]:
%pip install sentence_transformers

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [3]:
import cv2
import easyocr
import os
import json
import numpy as np
import matplotlib.pyplot as plt
import math
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
from scipy.stats import norm


2025-04-07 08:37:09.724730: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-04-07 08:37:11.308007: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [4]:

# --- Weights for metrics ---
WEIGHT_TEXT = 0.1
WEIGHT_DIFF = 0.6
WEIGHT_SIMILARITY = 0.2
WEIGHT_RECENCY = 0.05

def text_quantity(text):
    return len(text.split())

def frame_diff(frame1, frame2):
    gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
    return np.sum(cv2.absdiff(gray1, gray2)) / gray1.size

def normalize_list(values):
    min_v, max_v = min(values), max(values)
    return [(v - min_v) / (max_v - min_v) if max_v > min_v else 0 for v in values]

def standardize_list(values):
    mean_val = np.mean(values)
    std_val = np.std(values)
    if std_val == 0:
        return [0.5 for _ in values]
    z_scores = [(v - mean_val) / std_val for v in values]
    return [norm.cdf(z) for z in z_scores]

def calculate_recency(candidates, selected_frames, interval_seconds):
    # Вычисляем recency для каждого кандидата по хронологии
    selected_timecodes = set(f['timecode'] for f in selected_frames)
    sorted_candidates = sorted(candidates, key=lambda f: f['timecode'])
    recency_vals = []
    counter = 0
    for frame in sorted_candidates:
        if frame['timecode'] in selected_timecodes:
            recency_vals.append(0.0)
            counter = 0
        else:
            counter += 1
            val = 0.1 + 0.2 * (counter - 1)
            if val > 1.0:
                val = 1.0
            recency_vals.append(val)
    return recency_vals

def extract_key_frames(video_path, all_frames_dir, top_frames_dir, config_path, interval_seconds=10, top_n=5, max_iter=100):
    # Инициализация easyocr и SentenceTransformer
    reader = easyocr.Reader(['en', 'ru'], gpu=True)
    embedder = SentenceTransformer('distiluse-base-multilingual-cased-v1')

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    total_duration = total_frames / fps

    # Получаем размеры кадра из первого считанного кадра
    ret, sample_frame = cap.read()
    if not ret:
        print("Ошибка чтения видео")
        return
    height, width = sample_frame.shape[:2]
    # Сброс позиции для повторного чтения видео
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

    # Используем полный кадр (координаты сохраняются для конфига)
    x1, y1 = 0, 0
    x2, y2 = width, height

    prev_frames = []
    all_embeddings = []
    frame_candidates = []

    frame_num = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        current_time = frame_num / fps
        if current_time >= total_duration - interval_seconds:
            break

        # Здесь не производится обрезка: используется весь кадр
        if frame_num % int(fps * interval_seconds) == 0 and frame_num != 0:
            # Прогон через easyocr
            text_results = reader.readtext(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
            text = " ".join([t[1] for t in text_results])
            if len(text.strip()) >= 10:
                current_embedding = embedder.encode(text)
                diff_scores = [frame_diff(frame, pf) for pf in prev_frames] if prev_frames else [0]
                avg_diff_score = np.mean(diff_scores)
                text_qty = text_quantity(text)

                if all_embeddings:
                    sim_scores = [cosine_similarity([current_embedding], [emb])[0][0] for emb in all_embeddings]
                    max_sim_score = max(sim_scores)
                else:
                    max_sim_score = 0
                semantic_uniqueness = 1 - max_sim_score

                frame_candidates.append({
                    "frame": frame.copy(),
                    "text": text,
                    "text_quantity": text_qty,
                    "diff_score": avg_diff_score,
                    "sim_score": semantic_uniqueness,
                    "timecode": current_time
                })

                prev_frames.append(frame.copy())
                all_embeddings.append(current_embedding)
                if len(prev_frames) > 2:
                    prev_frames.pop(0)
        frame_num += 1

    cap.release()

    # Нормализуем метрики
    text_norm = normalize_list([f['text_quantity'] for f in frame_candidates])
    diff_norm = normalize_list([f['diff_score'] for f in frame_candidates])
    sim_norm = normalize_list([f['sim_score'] for f in frame_candidates])

    for idx, frame in enumerate(frame_candidates):
        frame['text_norm'] = text_norm[idx]
        frame['diff_norm'] = diff_norm[idx]
        frame['sim_norm'] = sim_norm[idx]
        frame['recency_norm'] = 0

    # Начальный выбор (без учета recency) по сумме первых трёх метрик
    selected_frames = sorted(
        frame_candidates,
        key=lambda x: (WEIGHT_TEXT * x['text_norm'] +
                       WEIGHT_DIFF * x['diff_norm'] +
                       WEIGHT_SIMILARITY * x['sim_norm']),
        reverse=True
    )[:top_n]

    # Итеративное улучшение выбора
    for iteration in range(max_iter):
        prev_selected = selected_frames.copy()
        recency_norm = calculate_recency(frame_candidates, selected_frames, interval_seconds)
        for idx, frame in enumerate(frame_candidates):
            frame['recency_norm'] = recency_norm[idx]
            frame['composite_score'] = (
                WEIGHT_TEXT * frame['text_norm'] +
                WEIGHT_DIFF * frame['diff_norm'] +
                WEIGHT_SIMILARITY * frame['sim_norm'] +
                WEIGHT_RECENCY * frame['recency_norm']
            )
        selected_frames = sorted(frame_candidates, key=lambda x: x['composite_score'], reverse=True)[:top_n]
        if set(f['timecode'] for f in selected_frames) == set(f['timecode'] for f in prev_selected):
            print(f"Итерация стабилизировалась на {iteration+1}-й итерации.")
            break

    # Визуализация составных метрик (необязательно)
    plt.figure(figsize=(15, 7))
    times = [f['timecode'] for f in frame_candidates]
    bar_width = interval_seconds * 0.8
    bottom = np.zeros(len(times))
    plt.bar(times, [WEIGHT_TEXT * v for v in text_norm], width=bar_width, label='Text Quantity')
    bottom += np.array([WEIGHT_TEXT * v for v in text_norm])
    plt.bar(times, [WEIGHT_DIFF * v for v in diff_norm], width=bar_width, bottom=bottom, label='Frame Diff')
    bottom += np.array([WEIGHT_DIFF * v for v in diff_norm])
    plt.bar(times, [WEIGHT_SIMILARITY * v for v in sim_norm], width=bar_width, bottom=bottom, label='Semantic Uniqueness')
    bottom += np.array([WEIGHT_SIMILARITY * v for v in sim_norm])
    plt.bar(times, [WEIGHT_RECENCY * f['recency_norm'] for f in frame_candidates],
            width=bar_width, bottom=bottom, label='Recency')
    plt.title("Составные метрики по времени")
    plt.xlabel("Время (с)")
    plt.ylabel("Оценка")
    plt.legend()
    plt.grid(True)
    os.makedirs(all_frames_dir, exist_ok=True)
    plt.savefig(os.path.join(all_frames_dir, "composite_score_components.png"))
    plt.close()

    # Сохранение всех обработанных кадров и их метрик в отдельной папке
    all_frames_data = []
    for idx, frame_info in enumerate(frame_candidates):
        filename = f"{int(round(frame_info['timecode']))}s_top_{idx+1}.jpg"
        output_path = os.path.join(all_frames_dir, filename)
        cv2.imwrite(output_path, frame_info['frame'])
        frame_info_copy = frame_info.copy()
        # Удаляем изображение перед сохранением в JSON
        frame_info_copy.pop('frame', None)
        frame_info_copy['filename'] = filename
        all_frames_data.append(frame_info_copy)

    with open(os.path.join(all_frames_dir, "all_frames_data.json"), "w", encoding='utf-8') as json_file:
        json.dump(all_frames_data, json_file, ensure_ascii=False, indent=4)

    # Сохранение топ-n кадров в отдельной директории
    os.makedirs(top_frames_dir, exist_ok=True)
    top_frames_data = []
    for idx, frame_info in enumerate(selected_frames):
        filename = f"{int(round(frame_info['timecode']))}s_top_{idx+1}.jpg"
        output_path = os.path.join(top_frames_dir, filename)
        cv2.imwrite(output_path, frame_info['frame'])
        frame_info_copy = frame_info.copy()
        frame_info_copy.pop('frame', None)
        frame_info_copy['filename'] = filename
        top_frames_data.append(frame_info_copy)

    with open(os.path.join(top_frames_dir, "top_frames_data.json"), "w", encoding='utf-8') as json_file:
        json.dump(top_frames_data, json_file, ensure_ascii=False, indent=4)

    # Сохранение конфигурационного файла с используемыми параметрами
    config = {
        "weights": {
            "WEIGHT_TEXT": WEIGHT_TEXT,
            "WEIGHT_DIFF": WEIGHT_DIFF,
            "WEIGHT_SIMILARITY": WEIGHT_SIMILARITY,
            "WEIGHT_RECENCY": WEIGHT_RECENCY
        },
        "coordinates": {
            "x1": x1,
            "y1": y1,
            "x2": x2,
            "y2": y2
        }
    }
    with open(config_path, "w", encoding='utf-8') as json_file:
        json.dump(config, json_file, ensure_ascii=False, indent=4)

    print(f"\nСохранено {len(frame_candidates)} обработанных кадров с метриками в {all_frames_dir}")
    print(f"Сохранено топ-{top_n} кадров в {top_frames_dir}")
    print(f"Конфигурация сохранена в {config_path}")

In [5]:

# --- Запуск ---
video_directory = '/home/jupyter/datasphere/s3/transcribation-project/test/input'
all_frames_output_directory = '/home/jupyter/datasphere/s3/transcribation-project/test/work/all_images'
top_frames_output_directory = '/home/jupyter/datasphere/s3/transcribation-project/test/output/images'
config_directory = '/home/jupyter/datasphere/s3/transcribation-project/test'

os.makedirs(all_frames_output_directory, exist_ok=True)
os.makedirs(top_frames_output_directory, exist_ok=True)
os.makedirs(config_directory, exist_ok=True)

for video_file in os.listdir(video_directory):
    if video_file.lower().endswith(('.mp4', '.avi', '.mkv')):
        video_path = os.path.join(video_directory, video_file)
        video_name = os.path.splitext(video_file)[0].replace(' ', '_')  # Замена пробелов на '_'
        
        # Для каждого видео создаем свои директории для сохранения результатов
        video_all_frames_dir = os.path.join(all_frames_output_directory, video_name)
        video_top_frames_dir = os.path.join(top_frames_output_directory, video_name)
        os.makedirs(video_all_frames_dir, exist_ok=True)
        os.makedirs(video_top_frames_dir, exist_ok=True)
        
        # Путь для конфигурационного файла
        config_path = os.path.join(config_directory, f"{video_name}_config.json")
        
        extract_key_frames(video_path, video_all_frames_dir, video_top_frames_dir, config_path, interval_seconds=10, top_n=10)



Сохранено 40 обработанных кадров с метриками в /home/jupyter/datasphere/s3/transcribation-project/test/work/all_images/WEEK_3_-_LEC_14
Сохранено топ-10 кадров в /home/jupyter/datasphere/s3/transcribation-project/test/output/images/WEEK_3_-_LEC_14
Конфигурация сохранена в /home/jupyter/datasphere/s3/transcribation-project/test/WEEK_3_-_LEC_14_config.json


# Транскрипция видео с Whisper и вставка маркеров изображений

Данный ноутбук выполняет следующие задачи:
1. Проходит по всем видео в папке `video_directory` и транскрибирует их с помощью модели Whisper (base), получая JSON с таймкодами.
2. Если для видео существует папка с извлечёнными кадрами в `extracted_frames_base` (имя папки совпадает с именем видео без расширения, где пробелы заменены на `_`), то для каждого изображения (например, `149s_top_6.jpg`) извлекается временная метка и ищется соответствующий сегмент транскрипции.
3. В соответствующий сегмент вставляется маркер изображения в формате `⟪149s_top_6.jpg⟫` (маркер вставляется после последней точки предложения, если она есть).
4. Итоговый JSON транскрипции сохраняется в папку `whisper_json`.
5. Из JSON извлекается поле `text`, и сохраняется как текстовый файл (.txt) в папку `whisper_text`.

In [5]:
import os
import re
import json
import whisper


In [6]:
def process_videos_transcription(video_directory, whisper_json_directory, whisper_text_directory):
    """
    Функция транскрипции:
      1. Для каждого видео в video_directory, если итоговый TXT уже существует – пропускаем.
      2. Если TXT отсутствует, проверяется наличие JSON:
         - Если JSON существует, он загружается.
         - Если JSON отсутствует, запускается транскрипция с Whisper (base) и результат сохраняется.
      3. Итоговый текст формируется как объединение сегментов (без вставки меток) и сохраняется в TXT.
    """
    print("[INFO] Загружаем модель Whisper (base)...")
    model = whisper.load_model("base")
    
    for video_file in os.listdir(video_directory):
        if not video_file.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
            continue
        
        video_path = os.path.join(video_directory, video_file)
        video_name_no_ext = os.path.splitext(video_file)[0]
        video_name_clean = video_name_no_ext.replace(" ", "_")
        print(f"[INFO] Обработка видео: {video_path}")
        
        json_output_path = os.path.join(whisper_json_directory, video_name_clean + ".json")
        txt_output_path = os.path.join(whisper_text_directory, video_name_clean + ".txt")
        
        # Если итоговый TXT уже существует, пропускаем обработку этого видео
        if os.path.exists(txt_output_path):
            print(f"[INFO] TXT файл для видео '{video_name_no_ext}' уже существует ({txt_output_path}). Пропускаем обработку.")
            continue
        
        # Если JSON существует, загружаем его, иначе запускаем транскрипцию
        if os.path.exists(json_output_path):
            print(f"[INFO] JSON транскрипция для видео '{video_name_no_ext}' уже существует. Загружаем JSON.")
            with open(json_output_path, "r", encoding="utf-8") as f:
                transcription = json.load(f)
        else:
            print(f"[INFO] JSON транскрипция для видео '{video_name_no_ext}' не найдена. Запускаем транскрипцию.")
            transcription = model.transcribe(video_path, task="transcribe")
            with open(json_output_path, "w", encoding="utf-8") as f:
                json.dump(transcription, f, ensure_ascii=False, indent=4)
            print(f"[INFO] JSON транскрипция сохранена: {json_output_path}")
        
        # Формируем итоговый текст транскрипции как объединение сегментов (без маркеров)
        if "segments" in transcription:
            transcription["text"] = " ".join(seg["text"] for seg in transcription["segments"])
            print(f"[DEBUG] Итоговый текст сформирован. Длина текста: {len(transcription['text'])} символов")
        else:
            print(f"[WARN] Нет сегментов в транскрипции для видео '{video_name_no_ext}'")
        
        # Сохраняем итоговый текст в TXT
        with open(txt_output_path, "w", encoding="utf-8") as f:
            f.write(transcription.get("text", ""))
        print(f"[INFO] TXT файл сохранён: {txt_output_path}")  

In [7]:
# def insert_frame_images(transcription, frames_dir):
#     """
#     Для каждого файла кадра в папке frames_dir с именем вида "149s_top_6.jpg":
#       - извлекает временную метку (например, 149),
#       - определяет соответствующий сегмент транскрипции,
#       - вставляет маркер изображения в формате "⟪149s_top_6.jpg⟫" после последней точки предложения.
#     """
#     print(f"[DEBUG] Обработка изображений из папки: {frames_dir}")
#     # Получаем список файлов изображений (поддерживаются форматы jpg и png)
#     frame_files = [f for f in os.listdir(frames_dir) if f.lower().endswith(('.jpg', '.png'))]
#     print(f"[DEBUG] Найдено {len(frame_files)} файлов изображений: {frame_files}")
    
#     # Создаем словарь: время кадра (float) -> имя файла
#     frame_mapping = {}
#     for filename in frame_files:
#         match = re.match(r'(\d+)s', filename)
#         if match:
#             timestamp = float(match.group(1))
#             frame_mapping[timestamp] = filename
#             print(f"[DEBUG] Файл {filename}: timestamp {timestamp}")
#         else:
#             print(f"[DEBUG] Файл {filename} не соответствует ожидаемому формату")
    
#     # Обрабатываем каждый кадр: ищем соответствующий сегмент транскрипции
#     for frame_time, image_file in frame_mapping.items():
#         segment_found = None
#         for seg in transcription.get("segments", []):
#             if seg["start"] <= frame_time <= seg["end"]:
#                 segment_found = seg
#                 print(f"[DEBUG] Timestamp {frame_time} найден в сегменте {seg.get('id', 'N/A')} (диапазон {seg['start']} - {seg['end']})")
#                 break
#         if not segment_found and transcription.get("segments", []):
#             segment_found = min(transcription["segments"], key=lambda seg: abs(seg["start"] - frame_time))
#             print(f"[DEBUG] Timestamp {frame_time} не попадает ни в один сегмент, выбран ближайший сегмент {segment_found.get('id', 'N/A')} (диапазон {segment_found['start']} - {segment_found['end']})")
#         if segment_found:
#             marker = f" ⟪{image_file}⟫"
#             seg_text = segment_found.get("text", "")
#             last_dot = seg_text.rfind('.')
#             if last_dot != -1:
#                 new_text = seg_text[:last_dot+1] + marker + seg_text[last_dot+1:]
#             else:
#                 new_text = seg_text + marker
#             print(f"[DEBUG] Вставка маркера {marker} в сегмент {segment_found.get('id', 'N/A')}")
#             segment_found["text"] = new_text
#         else:
#             print(f"[DEBUG] Не найден сегмент для файла {image_file} с timestamp {frame_time}")
#     return transcription


# def process_frame_markers(video_directory, whisper_json_directory, whisper_text_directory, extracted_frames_base):
#     """
#     Функция вставки меток:
#       1. Для каждого видео в video_directory проверяется наличие JSON транскрипции и соответствующего TXT.
#       2. Поиск папки с извлечёнными кадрами (варианты имени – оригинальное, с заменой пробелов и наоборот).
#       3. Если папка найдена, загружается JSON, выполняется вставка маркеров через insert_frame_images,
#          итоговый текст (объединение сегментов) формируется заново и сохраняется в TXT.
#     """
#     for video_file in os.listdir(video_directory):
#         if not video_file.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
#             continue
        
#         video_name_no_ext = os.path.splitext(video_file)[0]
#         video_name_clean = video_name_no_ext.replace(" ", "_")
#         json_output_path = os.path.join(whisper_json_directory, video_name_clean + ".json")
#         txt_output_path = os.path.join(whisper_text_directory, video_name_clean + ".txt")
        
#         # Если JSON или TXT не существует, пропускаем видео (так как транскрипция должна быть выполнена первой)
#         if not os.path.exists(json_output_path):
#             print(f"[INFO] JSON транскрипция для видео '{video_name_no_ext}' не найдена. Пропускаем вставку маркеров.")
#             continue
#         if not os.path.exists(txt_output_path):
#             print(f"[INFO] TXT файл для видео '{video_name_no_ext}' не найден. Пропускаем вставку маркеров.")
#             continue
        
#         # Загружаем JSON транскрипции
#         with open(json_output_path, "r", encoding="utf-8") as f:
#             transcription = json.load(f)
        
#         # Поиск папки с извлечёнными кадрами: варианты имени – оригинальное, с заменой пробелов и наоборот
#         folder_options = {
#             video_name_no_ext,
#             video_name_no_ext.replace(" ", "_"),
#             video_name_no_ext.replace("_", " ")
#         }
#         print(f"[DEBUG] Варианты имени папки для кадров: {folder_options}")
#         frames_dir = None
#         for folder in folder_options:
#             potential_dir = os.path.join(extracted_frames_base, folder)
#             print(f"[DEBUG] Проверка папки: {potential_dir}")
#             if os.path.exists(potential_dir):
#                 frames_dir = potential_dir
#                 print(f"[INFO] Папка с кадрами найдена: {frames_dir}")
#                 break
        
#         if frames_dir:
#             # Выполняем вставку маркеров в транскрипцию
#             transcription = insert_frame_images(transcription, frames_dir)
#             # Формируем новый итоговый текст (объединение сегментов с маркерами)
#             if "segments" in transcription:
#                 transcription["text"] = " ".join(seg["text"] for seg in transcription["segments"])
#                 print(f"[DEBUG] Итоговый текст с метками сформирован. Длина текста: {len(transcription['text'])} символов")
#             else:
#                 print(f"[WARN] Нет сегментов в транскрипции для видео '{video_name_no_ext}' после вставки меток")
            
#             # Сохраняем обновлённый TXT файл (при этом JSON можно оставить без изменений или обновить по необходимости)
#             with open(txt_output_path, "w", encoding="utf-8") as f:
#                 f.write(transcription.get("text", ""))
#             print(f"[INFO] TXT файл обновлён с метками: {txt_output_path}")
#         else:
#             print(f"[INFO] Папка с кадрами не найдена для видео '{video_name_no_ext}'")


In [8]:
video_directory = '/home/jupyter/datasphere/s3/transcribation-project/test/input'
whisper_json_directory = '/home/jupyter/datasphere/s3/transcribation-project/test/work/whisper_json'
whisper_text_directory = '/home/jupyter/datasphere/s3/transcribation-project/test/work/whisper_text'
extracted_frames_base = '/home/jupyter/datasphere/s3/transcribation-project/test/output/images'

os.makedirs(whisper_json_directory, exist_ok=True)
os.makedirs(whisper_text_directory, exist_ok=True)

# Этап 1: Транскрипция видео и сохранение итогового TXT без меток
process_videos_transcription(video_directory, whisper_json_directory, whisper_text_directory)


[INFO] Загружаем модель Whisper (base)...


100%|███████████████████████████████████████| 139M/139M [00:05<00:00, 28.0MiB/s]


[INFO] Обработка видео: /home/jupyter/datasphere/s3/transcribation-project/test/input/WEEK 3 - LEC 14.mp4
[INFO] JSON транскрипция для видео 'WEEK 3 - LEC 14' не найдена. Запускаем транскрипцию.
[INFO] JSON транскрипция сохранена: /home/jupyter/datasphere/s3/transcribation-project/test/work/whisper_json/WEEK_3_-_LEC_14.json
[DEBUG] Итоговый текст сформирован. Длина текста: 5448 символов
[INFO] TXT файл сохранён: /home/jupyter/datasphere/s3/transcribation-project/test/work/whisper_text/WEEK_3_-_LEC_14.txt


In [None]:
# # Этап 2: Вставка меток изображений в транскрипцию и обновление TXT-файла
# process_frame_markers(video_directory, whisper_json_directory, whisper_text_directory, extracted_frames_base)
