In [None]:
!pip install moviepy pydub librosa soundfile validators transformers torch 

In [None]:
import os
import tempfile
from io import BytesIO
from urllib.parse import urlparse

import librosa
import numpy as np
import requests
import soundfile as sf
import validators
from moviepy import VideoFileClip
from pydub import AudioSegment

def extract_audio(source, output_path=None, format="wav"):
    """
    Route an audio/video source to the matching extraction helper.

    URLs are handed to ``extract_audio_from_url``; anything else is treated
    as a local path and dispatched on its (case-insensitive) file extension.

    Args:
        source (str): URL or local path of an audio or video file
        output_path (str, optional): Where to save the extracted audio. When None, nothing is saved to disk.
        format (str, optional): Output audio format, forwarded to the helper. Default is "wav".

    Returns:
        The helper's result: (audio_array, sample_rate) when output_path is None,
        otherwise the path to the saved audio file

    Raises:
        ValueError: If a local path has none of the supported video/audio extensions
    """
    video_suffixes = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv')
    audio_suffixes = ('.mp3', '.wav', '.ogg', '.flac', '.aac')

    # Remote sources first; everything below deals with local paths only.
    if validators.url(source) or source.startswith(('http://', 'https://')):
        return extract_audio_from_url(source, output_path, format)

    lowered = source.lower()
    if lowered.endswith(video_suffixes):
        return extract_audio_from_video(source, output_path, format)
    if lowered.endswith(audio_suffixes):
        return extract_audio_from_audio_file(source, output_path, format)

    raise ValueError(f"Unsupported file format: {source}")

def extract_audio_from_url(url, output_path=None, format="wav"):
    """
    Download an audio or video file from a URL and extract its audio.

    The response's Content-Type decides how the payload is handled. When it
    names neither video nor audio, the extension of the URL *path* (query
    string and fragment ignored) is used to detect video; anything else is
    attempted as audio. Decoding is delegated to ``extract_audio_from_video``
    and ``extract_audio_from_audio_file``.

    Args:
        url (str): URL of the audio or video file
        output_path (str, optional): Path to save the extracted audio. If None, audio is not saved to disk.
        format (str, optional): Format of the output audio file. Default is "wav".

    Returns:
        tuple: (audio_array, sample_rate) if output_path is None, otherwise the path to the saved audio file

    Raises:
        Exception: If the download does not return HTTP 200, or if a payload of
            undeclared type cannot be decoded as audio.
    """
    video_extensions = ('mp4', 'avi', 'mov', 'mkv', 'flv', 'wmv')
    # Look only at the path component so "clip.mp4?token=abc" still yields "mp4".
    url_ext = os.path.splitext(urlparse(url).path)[1].lstrip('.').lower()

    # The context manager releases the connection even when processing fails;
    # the timeout keeps a stalled server from hanging the notebook forever.
    with requests.get(url, stream=True, timeout=30) as response:
        if response.status_code != 200:
            raise Exception(f"Failed to download file: {response.status_code}")

        content_type = response.headers.get('content-type', '').lower()
        declared_audio = 'audio' in content_type
        is_video = 'video' in content_type or (
            not declared_audio and url_ext in video_extensions
        )

        if is_video:
            # Stream to a uniquely named temp file instead of a fixed name in the
            # working directory, and always delete it afterwards.
            suffix = f".{url_ext}" if url_ext in video_extensions else ".mp4"
            fd, temp_video_path = tempfile.mkstemp(prefix="temp_video_", suffix=suffix)
            try:
                with os.fdopen(fd, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            f.write(chunk)
                return extract_audio_from_video(temp_video_path, output_path, format)
            finally:
                try:
                    os.remove(temp_video_path)
                except OSError:
                    # Best-effort cleanup: never mask the real error with a
                    # failure to delete the temp file.
                    pass

        audio_data = BytesIO(response.content)

    if declared_audio:
        return extract_audio_from_audio_file(audio_data, output_path, format)

    # Unknown content type and no video extension: try to decode it as audio.
    try:
        return extract_audio_from_audio_file(audio_data, output_path, format)
    except Exception as e:
        raise Exception(f"Could not process content: {e}") from e

def extract_audio_from_video(video_path, output_path=None, format="wav"):
    """
    Extract audio from a local video file.

    Args:
        video_path (str): Path to the video file
        output_path (str, optional): Path to save the extracted audio. If None, audio is not saved to disk.
        format (str, optional): Accepted for signature parity with the other
            helpers; it is not used here (the file is written by
            ``write_audiofile(output_path)`` as-is).

    Returns:
        tuple: (audio_array, sample_rate) if output_path is None, otherwise the path to the saved audio file

    Raises:
        ValueError: If the video has no audio track.
    """
    video = VideoFileClip(video_path)
    try:
        audio = video.audio
        if audio is None:
            # Silent videos expose no audio clip; fail with a clear message
            # instead of an AttributeError on None.
            raise ValueError(f"No audio track found in video: {video_path}")

        if output_path:
            audio.write_audiofile(output_path)
            return output_path

        # No output requested: round-trip through a private temp directory
        # (removed automatically, even on error) to get a numpy array.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_audio_path = os.path.join(temp_dir, "temp_audio.wav")
            audio.write_audiofile(temp_audio_path)
            audio_array, sample_rate = librosa.load(temp_audio_path, sr=None)
        return audio_array, sample_rate
    finally:
        # Always close the clip, on success and on failure.
        video.close()

def extract_audio_from_audio_file(audio_path, output_path=None, format="wav"):
    """
    Process an audio file (either from a file path or a BytesIO object).

    librosa is tried first; if loading (or writing with soundfile) fails for
    any reason, pydub is used as a fallback.

    Args:
        audio_path: Path to the audio file or BytesIO object
        output_path (str, optional): Path to save the processed audio. If None, audio is not saved to disk.
        format (str, optional): Format of the output audio file. Only used by
            the pydub fallback's export. Default is "wav".

    Returns:
        tuple: (audio_array, sample_rate) if output_path is None, otherwise the path to the saved audio file.
        Note that the two code paths build the array differently: the librosa
        path returns what ``librosa.load`` produces, while the fallback returns
        pydub's raw samples reshaped to (frames, channels) for multi-channel audio.
    """
    try:
        # Try loading with librosa
        audio_array, sample_rate = librosa.load(audio_path, sr=None)

        if output_path:
            sf.write(output_path, audio_array, sample_rate)
            return output_path
        return audio_array, sample_rate

    except Exception:
        # Fallback to pydub. A failed read above may have left a file-like
        # object positioned mid-stream, so rewind it before decoding again.
        if hasattr(audio_path, "seek"):
            audio_path.seek(0)
        audio = AudioSegment.from_file(audio_path)

        if output_path:
            audio.export(output_path, format=format)
            return output_path

        # Convert to numpy array
        samples = np.array(audio.get_array_of_samples())
        if audio.channels > 1:
            samples = samples.reshape((-1, audio.channels))
        return samples, audio.frame_rate

# Example usage
# 1. Extract audio from a URL
# audio_data, sr = extract_audio("https://example.com/video.mp4")
# extract_audio("https://example.com/video.mp4", "output_audio.wav")

# 2. Extract audio from a local video file
# audio_data, sr = extract_audio("input_video.mp4")
import os
# Run the extraction only when the sample video is present, so the cell
# reports a missing input instead of failing.
if not os.path.exists("./file1.mp4"):
    print("File does not exist: ./file1.mp4")
else:
    extract_audio("./file1.mp4", "output_audio.wav")

# 3. Extract audio from a local audio file
# audio_data, sr = extract_audio("input_audio.mp3")
# extract_audio("input_audio.mp3", "output_audio.wav")



In [None]:
!pip install openai-whisper

In [4]:
# import whisper

# file_path = "output_audio.wav"

# model = whisper.load_model("base")
# result = model.transcribe(file_path)

In [None]:
import torch
from transformers import pipeline
import os

# Make the Homebrew ffmpeg binary discoverable for audio decoding.
ffmpeg_path = "/opt/homebrew/bin/ffmpeg"
ffmpeg_dir = os.path.dirname(ffmpeg_path)
current_path = os.environ.get("PATH", "")
# Append only when the directory exists and is not listed yet, so re-running
# this cell does not keep growing PATH; os.pathsep keeps it portable.
if os.path.isdir(ffmpeg_dir) and ffmpeg_dir not in current_path.split(os.pathsep):
    os.environ["PATH"] = (
        f"{current_path}{os.pathsep}{ffmpeg_dir}" if current_path else ffmpeg_dir
    )

file_path = "output_audio.wav"

# Initialize the speech-to-text pipeline using Whisper.
# Device preference: CUDA GPU, then Apple MPS, then CPU.
if torch.cuda.is_available():
    device = "cuda:0"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using device: {device}")

s2t_pipe = pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-base",
    max_new_tokens=128,
    chunk_length_s=30,
    batch_size=16,
    return_timestamps=True,
    device=device,
)

def transcribe_audio(audio_path):
    """
    Transcribe WAV audio file to text using Whisper model

    Args:
        audio_path (str): Path to the WAV audio file. The ".wav" extension is
            matched case-insensitively.

    Returns:
        dict: Transcription result as returned by the speech-to-text pipeline
        (configured with return_timestamps=True)

    Raises:
        ValueError: If the path does not end in ".wav"
    """
    # Compare in lower case so files such as "LECTURE.WAV" are accepted too.
    if not audio_path.lower().endswith('.wav'):
        raise ValueError("Only WAV files are supported")

    return s2t_pipe(audio_path)

# Example usage
# Transcribe only when the extracted audio is present; otherwise tell the
# user what is missing.
if not os.path.exists("output_audio.wav"):
    print("Audio file not found. Make sure you have a WAV file to transcribe.")
else:
    print("Transcribing audio...")
    transcription = transcribe_audio("output_audio.wav")
    print("Transcription result:")
    print(transcription)



In [None]:
!pip install langgraph

In [12]:
import os
from typing import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END

# State structure definition
class LectureSummaryState(TypedDict):
    """State for the lecture summarization process."""
    # Lecture text that is interpolated into every node prompt.
    lecture_text: str
    # Timestamp information interpolated into the analysis and summary prompts.
    timing: str
    # LLM output stored by the analysis node.
    analysis_result: str
    # LLM output stored by the summarization node.
    summary_result: str
    # LLM output stored by the Markdown generation node.
    markdown_result: str
    # Set to the same content as markdown_result by the Markdown generation node.
    final_result: str

# Helpers for splitting text into chunks
def _split_oversized_paragraph(paragraph, max_size):
    """Break a paragraph longer than ``max_size`` into pieces of at most ``max_size`` characters.

    Cuts are made, in order of preference, after a sentence end found in the
    second half of the window, at the last whitespace, or as a hard cut.
    Paragraphs that already fit are returned unchanged as a single piece.
    """
    if len(paragraph) <= max_size:
        return [paragraph]

    pieces = []
    remaining = paragraph
    while len(remaining) > max_size:
        window = remaining[:max_size]
        # Only look for a sentence end in the second half so pieces stay reasonably large.
        cut = max(window.rfind(mark, max_size // 2) for mark in ('. ', '! ', '? '))
        if cut != -1:
            cut += 1  # keep the punctuation mark with the left-hand piece
        else:
            cut = max(window.rfind(ws) for ws in (' ', '\n', '\t'))
        if cut <= 0:
            cut = max_size  # no usable boundary: hard cut
        piece = remaining[:cut].rstrip()
        if piece:
            pieces.append(piece)
        remaining = remaining[cut:].lstrip()
    if remaining:
        pieces.append(remaining)
    return pieces


def chunk_text(text, max_chunk_size=16000):
    """Split text into manageable chunks of at most ``max_chunk_size`` characters.

    Paragraphs (separated by a blank line) are packed greedily into chunks.
    A paragraph that is itself longer than the limit - e.g. a transcript with
    no blank lines at all - is first broken up on sentence or whitespace
    boundaries, so the limit holds for every returned chunk.

    Args:
        text (str): Text to split
        max_chunk_size (int): Maximum length of a chunk in characters

    Returns:
        list[str]: Chunks in original order; paragraphs inside a chunk are
        joined with a blank line

    Raises:
        ValueError: If max_chunk_size is not positive
    """
    if max_chunk_size <= 0:
        raise ValueError("max_chunk_size must be a positive integer")

    separator = '\n\n'
    chunks = []
    current_chunk = []
    current_size = 0  # exact length of separator.join(current_chunk)

    for paragraph in text.split(separator):
        for piece in _split_oversized_paragraph(paragraph, max_chunk_size):
            # Joining to a non-empty chunk also costs one separator.
            added = len(piece) + (len(separator) if current_chunk else 0)
            if current_chunk and current_size + added > max_chunk_size:
                chunks.append(separator.join(current_chunk))
                current_chunk = [piece]
                current_size = len(piece)
            else:
                current_chunk.append(piece)
                current_size += added

    if current_chunk:
        chunks.append(separator.join(current_chunk))

    return chunks

class LectureSummarizer:
    """Summarizes lectures with a three-step LangGraph pipeline.

    The graph runs analysis -> summarization -> Markdown generation, each step
    being one LLM call. Prompts and progress messages are in Russian.
    """

    def __init__(self):
        # Initialize the LLM; model name, endpoint and key come from environment variables.
        self.llm = ChatOpenAI(
            model=os.getenv("OPENAI_MODEL_NAME"),
            temperature=0,
            base_url=os.getenv("OPENAI_BASE_URL"),
            api_key=os.getenv("OPENAI_API_KEY"),
        )

        # Build the processing graph
        self.graph = self._create_graph()

    @staticmethod
    def _initial_state(text, timing):
        """Build the starting graph state for one lecture part.

        A falsy ``timing`` (e.g. None) is stored as an empty string.
        """
        return {
            "lecture_text": text,
            "timing": timing if timing else "",
            "analysis_result": "",
            "summary_result": "",
            "markdown_result": "",
            "final_result": "",
        }

    def _analyze_lecture(self, state: LectureSummaryState) -> LectureSummaryState:
        """Analyze the lecture text and store the LLM answer in ``analysis_result``."""
        prompt = f"""
        ВАЖНО: Проанализируйте ТОЛЬКО предоставленный текст лекции.
        
        Текст лекции:
        ```
        {state['lecture_text']}
        ```
        
        Информация о временных метках: {state['timing']}
        
        Ваша задача:
        1. Определить основные темы и разделы ЭТОЙ лекции
        2. Извлечь ключевые определения и понятия из текста
        3. Определить, какие части подходят для таблиц
        4. Отметить все временные метки
        
        НЕ ДОБАВЛЯЙТЕ информацию, которой нет в тексте.
        """

        result = self.llm.invoke(prompt)
        state["analysis_result"] = result.content
        return state

    def _summarize_content(self, state: LectureSummaryState) -> LectureSummaryState:
        """Create per-section summaries and store them in ``summary_result``."""
        prompt = f"""
        Создайте краткие изложения разделов лекции на основе анализа.
        
        Текст лекции:
        ```
        {state['lecture_text']}
        ```
        
        Результаты анализа:
        ```
        {state['analysis_result']}
        ```
        
        Временные метки: {state['timing']}
        
        Для каждого раздела создайте краткое изложение, сохраняя:
        1. Ключевое содержание
        2. Точные определения
        3. Оригинальную структуру
        4. Временные метки
        
        НЕ СОЗДАВАЙТЕ нового содержания.
        """

        result = self.llm.invoke(prompt)
        state["summary_result"] = result.content
        return state

    def _generate_markdown(self, state: LectureSummaryState) -> LectureSummaryState:
        """Create the Markdown document; fills ``markdown_result`` and ``final_result``."""
        prompt = f"""
        Создайте Markdown-документ на основе суммированного содержания.
        
        Текст лекции:
        ```
        {state['lecture_text']}
        ```
        
        Краткие изложения:
        ```
        {state['summary_result']}
        ```
        
        Документ должен включать:
        1. Настоящее название лекции
        2. Оглавление с разделами и временными метками
        3. Отформатированные разделы с содержанием
        4. Таблицы, где необходимо
        
        Пример оглавления:
        ```markdown
        # Основные философские течения в современном обществе
        
        ## Оглавление
        1. [Традиционализм (0:21)](#традиционализм)
        2. [Умеренный прогрессивизм: национальная школа (1:37)](#умеренный-прогрессивизм-национальная-школа)
        3. [Умеренный прогрессивизм: глобальная школа (3:33)](#умеренный-прогрессивизм-глобальная-школа)
        4. [Революционные движения (5:04)](#революционные-движения)
        ```
        
        Верните ТОЛЬКО Markdown без комментариев.
        """

        result = self.llm.invoke(prompt)
        state["markdown_result"] = result.content
        state["final_result"] = result.content  # For a single part the two are identical
        return state

    def _create_graph(self):
        """Build and compile the linear processing graph."""
        builder = StateGraph(LectureSummaryState)

        # Add the nodes
        builder.add_node("analyzer", self._analyze_lecture)
        builder.add_node("summarizer", self._summarize_content)
        builder.add_node("markdown_generator", self._generate_markdown)

        # Define the sequence
        builder.add_edge(START, "analyzer")
        builder.add_edge("analyzer", "summarizer")
        builder.add_edge("summarizer", "markdown_generator")
        builder.add_edge("markdown_generator", END)

        # Compile the graph
        return builder.compile()

    def _merge_markdown_documents(self, markdown_parts):
        """Merge several Markdown documents into one with a single LLM call.

        Args:
            markdown_parts (str): The documents, already joined into one string

        Returns:
            str: Content of the LLM response
        """
        prompt = f"""
        Объедините эти Markdown-документы в один целостный документ:
        
        ```
        {markdown_parts}
        ```
        
        Обеспечьте:
        1. Единое оглавление со всеми разделами
        2. Последовательное содержание без повторов
        3. Согласованное форматирование
        4. Сохранение всех временных меток
        
        Верните ТОЛЬКО объединенный Markdown без комментариев.
        """

        return self.llm.invoke(prompt).content

    def process_lecture(self, lecture_data, max_chunk_size=16000):
        """Process a lecture, taking timings into account.

        Args:
            lecture_data: Lecture text (str, split with ``chunk_text``), a
                {text: timing} dict, or a list of (text, timing) tuples
            max_chunk_size (int): Chunk size limit; only used for str input

        Returns:
            str: Markdown summary. A single part is returned as generated;
            several parts are merged with an additional LLM call.

        Raises:
            ValueError: If lecture_data has an unsupported shape, or if no
                part could be processed.
        """
        # Normalize every accepted input shape to a list of (text, timing) pairs
        if isinstance(lecture_data, str):
            chunks = chunk_text(lecture_data, max_chunk_size)
            structured_chunks = [(chunk, None) for chunk in chunks]
        elif isinstance(lecture_data, dict):
            structured_chunks = list(lecture_data.items())
        elif isinstance(lecture_data, list) and all(
            # Require real pairs so the unpacking below cannot fail cryptically.
            isinstance(item, tuple) and len(item) == 2 for item in lecture_data
        ):
            structured_chunks = lecture_data
        else:
            raise ValueError("Неверный формат данных лекции")

        # Single part: run the graph once; errors propagate to the caller
        if len(structured_chunks) == 1:
            text, timing = structured_chunks[0]
            result = self.graph.invoke(self._initial_state(text, timing))
            return result["final_result"]

        # Several parts: process each one, tolerating individual failures
        print(f"Лекция разбита на {len(structured_chunks)} частей")
        markdown_parts = []
        failed_parts = []

        for i, (text, timing) in enumerate(structured_chunks):
            print(f"Обработка части {i+1}/{len(structured_chunks)}...")

            try:
                result = self.graph.invoke(self._initial_state(text, timing))
                markdown_parts.append(result["markdown_result"])
                print(f"Часть {i+1} обработана успешно")
            except Exception as e:
                print(f"Ошибка при обработке части {i+1}: {e}")
                failed_parts.append(i + 1)

        # Make it obvious that the final document is missing content
        if failed_parts and markdown_parts:
            print(f"Внимание: пропущены части {failed_parts}; итоговый конспект неполный")

        # Combine the results
        if len(markdown_parts) > 1:
            print("Объединение всех частей...")
            markdown_parts_text = "\n\n===DOCUMENT SEPARATOR===\n\n".join(markdown_parts)
            final_markdown = self._merge_markdown_documents(markdown_parts_text)
            return final_markdown
        elif len(markdown_parts) == 1:
            return markdown_parts[0]
        else:
            raise ValueError("Не удалось обработать ни одну часть лекции")

def summarize_lecture_with_timings(lecture_data, max_chunk_size=16000):
    """
    Produce a structured Markdown document with the lecture summary.

    Convenience wrapper: creates a fresh ``LectureSummarizer`` and runs
    ``process_lecture`` on the given data.

    Args:
        lecture_data: lecture text (str), a {text: timing} dict, or a list of (text, timing) tuples
        max_chunk_size: maximum chunk size for processing (default 16000)

    Returns:
        Markdown with the lecture summary and timings
    """
    return LectureSummarizer().process_lecture(lecture_data, max_chunk_size=max_chunk_size)

In [None]:
# Alternative endpoint/model pair (localhost, presumably a local Ollama server), kept disabled:
# %env OPENAI_BASE_URL=http://localhost:11434
# %env OPENAI_MODEL_NAME=ollama/qwen2.5:latest
# Endpoint and model that LectureSummarizer reads via os.getenv.
# OPENAI_API_KEY is read the same way and is not set in this cell.
%env OPENAI_BASE_URL=https://openrouter.ai/api/v1
%env OPENAI_MODEL_NAME=qwen/qwen-2.5-72b-instruct

In [None]:
# Display the raw transcript text produced by the transcription cell.
transcription["text"]

In [None]:

# os.environ["OPENAI_BASE_URL"] = "http://localhost:11434"
# os.environ["OPENAI_MODEL_NAME"] = "ollama/qwen2.5:latest"
# os.environ["OPENAI_BASE_URL"] = "https://openrouter.ai/api/v1"
# os.environ["OPENAI_MODEL_NAME"] = "openrouter/qwen/qwq-32b:free"


def _format_timestamp(seconds):
    """Render a number of seconds as M:SS (or H:MM:SS); "?" when it is missing."""
    if seconds is None:
        return "?"
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    return f"{minutes}:{secs:02d}"


# The pipeline was created with return_timestamps=True, so besides "text" the
# result is expected to carry timestamped segments under "chunks". Feed those
# to the summarizer: the prompts ask the model to keep timestamps, which the
# plain "text" field does not contain. One segment per paragraph also gives
# chunk_text real paragraph boundaries to split on.
segments = transcription.get("chunks") or []
if segments:
    lecture_text = "\n\n".join(
        f"[{_format_timestamp((segment.get('timestamp') or (None,))[0])}] {segment['text'].strip()}"
        for segment in segments
    )
else:
    # No segment information available: fall back to the plain transcript.
    lecture_text = transcription["text"]

text = summarize_lecture_with_timings(lecture_text)

In [None]:
# Save the summarized lecture text to a file
with open("lecture_summary.md", mode="w", encoding="utf-8") as summary_file:
    summary_file.write(text)

# Confirm where the summary was written.
print("Lecture summary saved to lecture_summary.md")


In [None]:
# Sanity check: displaying the module shows which langchain_openai installation this kernel imports.
import langchain_openai
langchain_openai