<a href="https://colab.research.google.com/github/webremake/video-transcription/blob/main/video_summary_python_llm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<a href="https://colab.research.google.com/github/meta-llama/llama-recipes/blob/main/recipes/use_cases/VideoSummary.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## This demo app shows:
* How to use LangChain's YoutubeLoader to retrieve the captions of a YouTube video
* How to ask Llama 3 to summarize the video content (within Llama's input size limit) in a naive way using LangChain's `stuff` method
* How to work around Llama 3's 8k context length limit with LangChain's more sophisticated `refine` and `map_reduce` methods - see [here](https://python.langchain.com/docs/use_cases/summarization) for more info

We start by installing the necessary packages:
- [youtube-transcript-api](https://pypi.org/project/youtube-transcript-api/) API to get transcript/subtitles of a YouTube video
- [langchain](https://python.langchain.com/docs/get_started/introduction) provides necessary RAG tools for this demo
- [tiktoken](https://github.com/openai/tiktoken) BytePair Encoding tokenizer
- [pytube](https://pytube.io/en/latest/) Utility for downloading YouTube videos

In [2]:
!pip install langchain youtube-transcript-api tiktoken pytube replicate

Collecting youtube-transcript-api
  Downloading youtube_transcript_api-0.6.2-py3-none-any.whl.metadata (15 kB)
Collecting tiktoken
  Downloading tiktoken-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl.metadata (5.0 kB)
Collecting replicate
  Downloading replicate-1.0.3-py3-none-any.whl.metadata (25 kB)
Downloading youtube_transcript_api-0.6.2-py3-none-any.whl (24 kB)
Downloading tiktoken-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
Downloading pytube-15.0.0-py3-none-any.whl (57 kB)
Downloading replicate-1.0.3-py3-none-any.whl (46 kB)

Let's first load the transcript of a long (2:47:16) YouTube video (Lex Fridman with Yann LeCun: Meta AI, Open Source, Limits of LLMs, AGI & the Future of AI) using the YoutubeLoader.

In [None]:
!pip install -U langchain-community

In [None]:
# YoutubeLoader lives in the langchain-community package installed above
from langchain_community.document_loaders import YoutubeLoader

loader = YoutubeLoader.from_youtube_url(
    "https://www.youtube.com/watch?v=5t1vTLU7s40", add_video_info=True
)

In [None]:
!pip install --upgrade pytube

In [None]:
# load the youtube video caption into Documents
docs = loader.load()

In [6]:
from youtube_transcript_api import YouTubeTranscriptApi
from langchain.schema import Document

# Specify the video ID
video_id = "5t1vTLU7s40"

try:
    # Fetch the captions through YouTubeTranscriptApi
    transcript = YouTubeTranscriptApi.get_transcript(video_id)

    # Join the caption snippets into one text block so it reads like a single document
    combined_text = "\n".join([item['text'] for item in transcript])

    # Wrap the full text in a Document object so the format matches what the chains expect
    docs = [
        Document(
            page_content=combined_text,
            metadata={"source": f"https://youtu.be/{video_id}"}
        )
    ]

    # Check the character count and print the start of the content
    print(len(docs[0].page_content), docs[0].page_content[:300], len(docs))
    print(transcript)
except Exception as e:
    print(f"Error while loading captions: {e}")

142689 - I see the danger of this
concentration of power
through proprietary AI systems
as a much bigger danger
than everything else.
What works against this
is people who think that
for reasons of security,
we should keep AI systems
under lock and key
because it's too dangerous
to put it in the hands of e 1
[{'text': '- I see the danger of this\nconcentration of power', 'start': 0.12, 'duration': 2.52}, {'text': 'through proprietary AI systems', 'start': 2.64, 'duration': 3.39}, {'text': 'as a much bigger danger\nthan everything else.', 'start': 6.03, 'duration': 2.91}, {'text': 'What works against this', 'start': 8.94, 'duration': 2.49}, {'text': 'is people who think that\nfor reasons of security,', 'start': 11.43, 'duration': 3.75}, {'text': 'we should keep AI systems\nunder lock and key', 'start': 15.18, 'duration': 3.42}, {'text': "because it's too dangerous", 'start': 18.6, 'duration': 0.96}, {'text': 'to put it in the hands of everybody.', 'start': 19.56, 'duration': 2.52}, {'te

In [None]:
# check how many characters in the doc and some content
len(docs[0].page_content), docs[0].page_content[:300], len(docs)

In [None]:
# View the full caption text
full_text = docs[0].page_content
print(full_text)  # print the full caption text to the console

You should see 142689 returned for the doc character length, which is about 30k words or 40k tokens, beyond the 8k context length limit of Llama 3. You'll see how to summarize a text longer than the limit.
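
A quick way to sanity-check that claim without a tokenizer is a characters-per-token estimate. The sketch below is only a heuristic: the ratio of 3.6 characters per token is an assumption derived from the figures above (142689 characters for about 40k tokens), 8000 stands in for the "8k" limit, and `estimate_tokens` and `fits_in_context` are hypothetical helper names.

```python
import math

def estimate_tokens(num_chars: int, chars_per_token: float = 3.6) -> int:
    """Rough token estimate from a character count (a heuristic, not a tokenizer)."""
    return math.ceil(num_chars / chars_per_token)

def fits_in_context(num_chars: int, context_tokens: int = 8000) -> bool:
    """True if the estimated token count is within the assumed context budget."""
    return estimate_tokens(num_chars) <= context_tokens

transcript_chars = 142689  # character count reported for the transcript
print(estimate_tokens(transcript_chars))  # estimated tokens for the full transcript
print(fits_in_context(transcript_chars))  # the full transcript against the 8k budget
print(fits_in_context(4000))              # a 4000-character slice against the 8k budget
```

For an exact count you would run the model's own tokenizer over the text; the estimate is only meant to show the order of magnitude.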

**Note:** The original recipe runs these examples on [Replicate](https://replicate.com/meta/meta-llama-3-8b-instruct): you first sign in to Replicate with your GitHub account, then create a free API token [here](https://replicate.com/account/api-tokens) that you can use for a while. The code cells below call Llama through [Together](https://api.together.xyz/playground/language/meta-llama/Llama-3-8b-hf) instead, so they need a Together API key. You can also use other Llama 3 cloud providers such as [Groq](https://console.groq.com/) or [Anyscale](https://app.endpoints.anyscale.com/playground) - see Section 2 of the Getting to Know Llama [notebook](https://github.com/meta-llama/llama-recipes/blob/main/recipes/quickstart/Getting_to_know_Llama.ipynb) for more info.

If you'd like to run Llama 3 locally for privacy, zero cost, or freedom from rate limits (some Llama 3 hosting providers limit free plans to a number of queries or tokens per second or minute), see [Running Llama Locally](https://github.com/meta-llama/llama-recipes/blob/main/recipes/quickstart/Running_Llama2_Anywhere/Running_Llama_on_Mac_Windows_Linux.ipynb).

In [None]:
# enter your Together AI API token (the cells below call Llama through Together AI), or you can use local Llama. See README for more info
from getpass import getpass
import os

TOGETHERAI_API_TOKEN = getpass()
os.environ["TOGETHERAI_API_TOKEN"] = TOGETHERAI_API_TOKEN


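Next you'll create the chat model. The original recipe calls the Llama 3 70b chat model from Replicate because it's more powerful than the Llama 3 8b chat model when summarizing long text (on Replicate, the 8b model is named "meta/meta-llama-3-8b-instruct"). The cell below uses `meta-llama/Llama-3.2-3B-Instruct-Turbo` through Together instead; you can try a different model by replacing the `model` name.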

In [None]:
# langchain-together provides the langchain_together module imported below
!pip install -U together langchain langchain-together

from langchain_together import ChatTogether

llm = ChatTogether(
    model="meta-llama/Llama-3.2-3B-Instruct-Turbo",
    together_api_key=TOGETHERAI_API_TOKEN,
    temperature=0.0,
    top_p=1,
    max_tokens=14000
)

Once everything is set up, you can prompt Llama 3 to summarize the first 4000 characters of the transcript.

**Note:** The context length of 8k tokens in Llama 3 is roughly 6000-7000 words or 32k characters, so you should be able to use a number larger than 4000.

In [None]:
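# take only the first 4000 characters of the transcript
text = docs[0].page_content[:4000]
prompt = f"Make a summary of the text below: {text}."
# invoke() accepts a plain string prompt; the chat model returns a message object
response = llm.invoke(prompt)

# Extract the reply text from the response message
summary = response.content
print(summary)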

You can try a larger text to see how the summary differs.

In [None]:
text = docs[0].page_content[:10000]
prompt = f"Make a summary of the text below: {text}."
# invoke() accepts a plain string prompt; the chat model returns a message object
response = llm.invoke(prompt)

# Extract the reply text from the response message
summary = response.content
print(summary)

If you try the whole content, which has over 142k characters (about 40k tokens) and so exceeds the 8k limit, you'll get an empty result (Replicate used to return the error "RuntimeError: Your input is too long.").

In [None]:
# this will generate an empty result because the input exceeds Llama 3's context length limit
text = docs[0].page_content
response = llm.invoke(f"Give me a summary of the text below: {text}.")
# the chat model returns a message object; print its text content
print(response.content)

To fix this, you can use LangChain's `load_summarize_chain` method (details [here](https://python.langchain.com/docs/use_cases/summarization)).

First you'll split the original content into sub-documents, then use LangChain's `load_summarize_chain` with the `refine` or `map_reduce` type.

Because this may involve many calls to Llama 3, it helps to create a free LangChain API key [here](https://smith.langchain.com/settings), run the following cell to set the necessary environment variables, and check the logs on [LangSmith](https://docs.smith.langchain.com) during and after the run.
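
To make the splitting step concrete, here is a minimal sketch of greedy chunking. It is a simplified stand-in, not LangChain's algorithm: the notebook's `RecursiveCharacterTextSplitter.from_tiktoken_encoder` measures chunks in tokens, while this hypothetical `split_text` helper measures them in characters and only breaks on whitespace.

```python
def split_text(text: str, chunk_size: int = 1000) -> list[str]:
    """Greedily pack whitespace-separated words into chunks of at most chunk_size characters.

    A single word longer than chunk_size still becomes its own (oversized) chunk.
    """
    chunks, current, current_len = [], [], 0
    for word in text.split():
        # +1 accounts for the joining space when the chunk already has words
        extra = len(word) + (1 if current else 0)
        if current and current_len + extra > chunk_size:
            # The word does not fit: close the current chunk and start a new one
            chunks.append(" ".join(current))
            current, current_len = [], 0
            extra = len(word)
        current.append(word)
        current_len += extra
    if current:
        chunks.append(" ".join(current))
    return chunks

sample = "word " * 50  # toy text: 50 short words
chunks = split_text(sample, chunk_size=40)
print(len(chunks), max(len(c) for c in chunks))
```

Every word ends up in exactly one chunk and in the original order, which is the property the summarization chains rely on.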

In [None]:
import os
os.environ["LANGCHAIN_API_KEY"] = "your_langchain_api_key"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "Video Summary with Llama 3"

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# we need to split the long input text
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=1000, chunk_overlap=0
)
split_docs = text_splitter.split_documents(docs)

In [None]:
# check the lengths of the split docs
len(split_docs), len(docs), len(split_docs[0].page_content), len(docs[0].page_content)

The `refine` type implements the following steps under the hood:

1. Call Llama 3 on the first sub-document to generate a concise summary;
2. Loop over each subsequent sub-document, pass the previous summary with the current sub-document to generate a refined new summary;
3. Return the final summary generated on the final sub-document as the final answer - the summary of the whole content.

An example prompt template for each call in step 2, which gets used under the hood by LangChain, is:
```
Your job is to produce a final summary.
We have provided an existing summary up to a certain point:
<previous_summary>
Refine the existing summary (only if needed) with some more content below:
<new_content>
```
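
The three steps can be sketched as a plain loop. This is an illustration under stated assumptions, not LangChain's implementation: `refine_summarize` and `fake_llm` are hypothetical names, the first-call prompt wording is invented for the example, and the stub stands in for a real Llama 3 call so the control flow can be run offline.

```python
from typing import Callable, List

def refine_summarize(chunks: List[str], llm: Callable[[str], str]) -> str:
    """Summarize the first chunk, then refine that summary with each later chunk."""
    if not chunks:
        return ""
    # Step 1: initial summary from the first sub-document
    summary = llm(f"Write a concise summary of the following:\n{chunks[0]}")
    # Step 2: fold every subsequent sub-document into the running summary
    for chunk in chunks[1:]:
        prompt = (
            "Your job is to produce a final summary.\n"
            "We have provided an existing summary up to a certain point:\n"
            f"{summary}\n"
            "Refine the existing summary (only if needed) with some more content below:\n"
            f"{chunk}"
        )
        summary = llm(prompt)
    # Step 3: the summary produced on the last sub-document is the final answer
    return summary

calls = []

def fake_llm(prompt: str) -> str:
    """Stub model: records the prompt and returns a numbered placeholder summary."""
    calls.append(prompt)
    return f"summary v{len(calls)}"

result = refine_summarize(["part one", "part two", "part three"], fake_llm)
print(result, len(calls))
```

The loop makes one model call per sub-document, and each call depends on the previous result, so the calls cannot run in parallel.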

**Note:** The following call will make 33 calls to Llama 3 and generate the final summary in about 10 minutes. The complete log of the calls with inputs and outputs is [here](https://smith.langchain.com/public/7f23d823-926f-4874-bbd7-b509328a94bf/r).

In [None]:
from langchain.chains.summarize import load_summarize_chain

chain = load_summarize_chain(llm, chain_type="refine")
chain.run(split_docs)

You can also set `chain_type` to [`map_reduce`](https://python.langchain.com/docs/modules/chains/document/map_reduce) to generate the summary of the entire content using the standard map and reduce method. Behind the scenes, it first maps each split document to a sub-summary via a call to the LLM, then combines all those sub-summaries into a single final summary with one more LLM call.
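
The map and reduce steps can be sketched the same way. Again this is a simplified illustration rather than LangChain's code: `map_reduce_summarize` and `fake_llm` are hypothetical names, the prompt wording is invented, and the sketch assumes the combined sub-summaries fit into one prompt.

```python
from typing import Callable, List

def map_reduce_summarize(chunks: List[str], llm: Callable[[str], str]) -> str:
    """Summarize each chunk independently (map), then merge the results (reduce)."""
    # Map: one independent call per sub-document
    partial_summaries = [
        llm(f"Write a concise summary of the following:\n{chunk}") for chunk in chunks
    ]
    # Reduce: one more call that combines all sub-summaries
    combined = "\n".join(partial_summaries)
    return llm(f"Combine these summaries into a single final summary:\n{combined}")

calls = []

def fake_llm(prompt: str) -> str:
    """Stub model: records the prompt and returns a numbered placeholder."""
    calls.append(prompt)
    return f"s{len(calls)}"

final = map_reduce_summarize(["a", "b", "c"], fake_llm)
print(final, len(calls))
```

Because the map calls do not depend on each other, they can be issued concurrently, which is one plausible reason this chain type finishes faster than `refine`.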

**Note:** The following call takes about 3 minutes and all the calls to Llama 3 with inputs and outputs can be traced [here](https://smith.langchain.com/public/e54fad15-91ad-44a0-8d8f-f27a0d880b04/r).

In [None]:
chain = load_summarize_chain(llm, chain_type="map_reduce")
chain.run(split_docs)

One final `chain_type` you can set is `stuff`, but it won't work with large documents because it stuffs all the split documents into a single prompt, which exceeds the Llama 3 context length limit.

In [None]:
# this will return nothing
chain = load_summarize_chain(llm, chain_type="stuff")
chain.run(split_docs)

In [7]:
from youtube_transcript_api import YouTubeTranscriptApi

# Specify the video ID
video_id = "5t1vTLU7s40"

try:
    # Fetch the captions through YouTubeTranscriptApi
    transcript = YouTubeTranscriptApi.get_transcript(video_id)

    print(transcript)
except Exception as e:
    print(f"Error while loading captions: {e}")

[{'text': '- I see the danger of this\nconcentration of power', 'start': 0.12, 'duration': 2.52}, {'text': 'through proprietary AI systems', 'start': 2.64, 'duration': 3.39}, {'text': 'as a much bigger danger\nthan everything else.', 'start': 6.03, 'duration': 2.91}, {'text': 'What works against this', 'start': 8.94, 'duration': 2.49}, {'text': 'is people who think that\nfor reasons of security,', 'start': 11.43, 'duration': 3.75}, {'text': 'we should keep AI systems\nunder lock and key', 'start': 15.18, 'duration': 3.42}, {'text': "because it's too dangerous", 'start': 18.6, 'duration': 0.96}, {'text': 'to put it in the hands of everybody.', 'start': 19.56, 'duration': 2.52}, {'text': 'That would lead to a very bad future', 'start': 22.08, 'duration': 3.3}, {'text': 'in which all of our information diet', 'start': 25.38, 'duration': 2.31}, {'text': 'is controlled by a small\nnumber of companies', 'start': 27.69, 'duration': 3.21}, {'text': 'through proprietary systems.', 'start': 30.9

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
import re
from datetime import timedelta

def format_time(seconds):
    """Convert seconds to an H:MM:SS string (e.g. 0:00:09)"""
    # Round to whole seconds
    seconds = round(seconds)
    # str(timedelta) always includes the hours field, so no extra padding branch is needed
    return str(timedelta(seconds=seconds))

def clean_text(text):
    """Strip special characters and extra whitespace from the text"""
    # Replace line breaks with spaces
    text = text.replace('\n', ' ')
    # Collapse runs of whitespace into a single space
    text = re.sub(r'\s+', ' ', text)
    # Drop other special characters but keep basic punctuation
    # (this also strips apostrophes, so "it's" becomes "its")
    text = re.sub(r'[^\w\s.,!?()-]', '', text)
    return text.strip()

def format_transcript(transcript_data):
    current_text = []
    formatted_blocks = []
    block_start = None

    for item in transcript_data:
        # Clean special characters out of the text
        cleaned_item_text = clean_text(item['text'])

        # A leading dash marks the start of a new speaker turn
        if cleaned_item_text.strip().startswith('-'):
            # Save the previous block, if there is one
            if current_text:
                formatted_blocks.append({
                    'start': block_start,
                    'text': ' '.join(current_text)
                })
                current_text = []

            block_start = item['start']
        elif block_start is None:
            # Text before the first dash still needs a start time
            block_start = item['start']

        current_text.append(cleaned_item_text)

    # Add the last block
    if current_text:
        formatted_blocks.append({
            'start': block_start,
            'text': ' '.join(current_text)
        })

    # Format the result
    formatted_lines = []
    for block in formatted_blocks:
        # Render the block's start time as H:MM:SS
        time_str = format_time(block['start'])
        formatted_lines.append(time_str)
        formatted_lines.append(block['text'])
        formatted_lines.append("")  # blank line between blocks

    return '\n'.join(formatted_lines)

def process_youtube_transcript(video_id):
    try:
        # Fetch the captions
        transcript = YouTubeTranscriptApi.get_transcript(video_id)

        # Format the transcript
        formatted_text = format_transcript(transcript)

        # Save to a file
        output_filename = f"transcript_{video_id}.txt"
        with open(output_filename, 'w', encoding='utf-8') as f:
            f.write(formatted_text)

        print(f"Transcript saved to file: {output_filename}")

        # Print to the screen
        print("\nFormatted transcript:")
        print(formatted_text)

    except Exception as e:
        print(f"Error while processing captions: {e}")

# Example usage
video_id = "5t1vTLU7s40"
process_youtube_transcript(video_id)

Added a `split_long_block()` function, which:

* Splits the text into sentences
* Groups the sentences in threes (or another specified number)
* Determines the start time of each new sub-block
* Returns a list of sub-blocks with their timestamps

A `max_sentences` parameter was also added to control the maximum number of sentences in one block, and the main logic was modified to split long blocks.

In [None]:
from youtube_transcript_api import YouTubeTranscriptApi
import re
from datetime import timedelta

def format_time(seconds):
    """Convert seconds to an H:MM:SS string (e.g. 0:00:09)"""
    seconds = round(seconds)
    # str(timedelta) always includes the hours field, so no extra padding branch is needed
    return str(timedelta(seconds=seconds))

def clean_text(text):
    """Strip special characters and extra whitespace from the text"""
    text = text.replace('\n', ' ')
    text = re.sub(r'\s+', ' ', text)
    # Keeps basic punctuation; also strips apostrophes ("it's" becomes "its")
    text = re.sub(r'[^\w\s.,!?()-]', '', text)
    return text.strip()

def split_long_block(block_items, max_sentences=3):
    """Split a long text block into sub-blocks, keeping exact timestamps"""
    # Collect sentences together with their timestamps
    sentences = []
    current_sentence = []
    current_items = []

    for item in block_items:
        text = clean_text(item['text'])
        current_sentence.append(text)
        current_items.append(item)
        # A snippet ending in sentence punctuation closes the current sentence
        if re.search(r'[.!?]$', text):
            sentences.append({
                'text': ' '.join(current_sentence),
                'items': current_items,
                'start': current_items[0]['start']  # start time of the first item
            })
            current_sentence = []
            current_items = []

    # Add the trailing sentence, if there is one
    if current_sentence:
        sentences.append({
            'text': ' '.join(current_sentence),
            'items': current_items,
            'start': current_items[0]['start']
        })

    # With max_sentences or fewer sentences, return everything as one block
    if len(sentences) <= max_sentences:
        return [{
            'start': block_items[0]['start'],
            'text': ' '.join(s['text'] for s in sentences)
        }]

    # Split into sub-blocks
    sub_blocks = []
    current_sentences = []

    for index, sentence in enumerate(sentences):
        current_sentences.append(sentence)

        # Flush when the group is full or when this is the last sentence
        # (compare by index rather than by dict equality)
        if len(current_sentences) == max_sentences or index == len(sentences) - 1:
            sub_blocks.append({
                'start': current_sentences[0]['start'],  # exact start time of the first sentence in the group
                'text': ' '.join(s['text'] for s in current_sentences)
            })
            current_sentences = []

    return sub_blocks

def format_transcript(transcript_data, max_sentences=3):
    # Group the items into blocks (each block starts with a dash)
    current_block_items = []
    all_blocks = []

    for item in transcript_data:
        cleaned_text = clean_text(item['text'])

        if cleaned_text.strip().startswith('-'):
            if current_block_items:
                # Split the previous block
                sub_blocks = split_long_block(current_block_items, max_sentences)
                all_blocks.extend(sub_blocks)
            current_block_items = [item]
        else:
            if current_block_items:  # append only once a block has started
                current_block_items.append(item)

    # Process the last block
    if current_block_items:
        sub_blocks = split_long_block(current_block_items, max_sentences)
        all_blocks.extend(sub_blocks)

    # Format the result
    formatted_lines = []
    for block in all_blocks:
        time_str = format_time(block['start'])
        formatted_lines.append(time_str)
        formatted_lines.append(block['text'])
        formatted_lines.append("")  # blank line between blocks

    return '\n'.join(formatted_lines)

def process_youtube_transcript(video_id, max_sentences=3):
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        formatted_text = format_transcript(transcript, max_sentences)

        output_filename = f"transcript_{video_id}.txt"
        with open(output_filename, 'w', encoding='utf-8') as f:
            f.write(formatted_text)

        print(f"Transcript saved to file: {output_filename}")
        print("\nFormatted transcript:")
        print(formatted_text)

    except Exception as e:
        print(f"Error while processing captions: {e}")

# Example usage
video_id = "5t1vTLU7s40"
process_youtube_transcript(video_id, max_sentences=3)

# Main changes:

* The code now extracts individual sentences with their exact timestamps
* All block-grouping mechanisms have been removed
* The result is saved in two formats:
  * A text file for viewing
  * A JSON file for later processing by an LLM

The JSON file will have this structure:

    [
      {
        "time": 0.12,
        "text": "This is the first sentence."
      },
      {
        "time": 5.45,
        "text": "This is the second sentence."
      }
      // ...
    ]

This format is convenient for later processing by an LLM, where you can:

* Group sentences by topic
* Build a hierarchical structure with headings
* Form logical paragraphs
* Keep timestamps for navigating the video
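
As one possible way to prepare such a JSON file for an LLM, the sketch below packs consecutive sentences into larger chunks while keeping the timestamp of each chunk's first sentence. It is only an illustration: `group_sentences` and the `max_chars` budget are hypothetical, and the third sample entry is made up to extend the two-entry structure shown above.

```python
import json

def group_sentences(sentences, max_chars=200):
    """Pack consecutive sentences into chunks, keeping each chunk's first timestamp."""
    chunks = []
    current = None
    for s in sentences:
        # Append to the open chunk if the sentence (plus a joining space) still fits
        if current is not None and len(current["text"]) + 1 + len(s["text"]) <= max_chars:
            current["text"] += " " + s["text"]
        else:
            if current is not None:
                chunks.append(current)
            # Start a new chunk at this sentence's timestamp
            current = {"time": s["time"], "text": s["text"]}
    if current is not None:
        chunks.append(current)
    return chunks

# Sample data in the same structure as the JSON file (third entry is invented)
sample_json = """
[
  {"time": 0.12, "text": "This is the first sentence."},
  {"time": 5.45, "text": "This is the second sentence."},
  {"time": 9.8, "text": "This is the third sentence."}
]
"""
sentences = json.loads(sample_json)
chunks = group_sentences(sentences, max_chars=60)
for c in chunks:
    print(f"[{c['time']:.2f}s] {c['text']}")
```

Each chunk can then be sent to the model as one unit, and the retained `time` value lets a generated heading or paragraph link back to the right point in the video.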

In [11]:
from youtube_transcript_api import YouTubeTranscriptApi
import re
from datetime import timedelta

def format_time(seconds):
    """Convert seconds to an H:MM:SS string (e.g. 0:00:09)"""
    seconds = round(seconds)
    # str(timedelta) always includes the hours field, so no extra padding branch is needed
    return str(timedelta(seconds=seconds))

def clean_text(text):
    """Strip special characters and extra whitespace from the text"""
    text = text.replace('\n', ' ')
    text = re.sub(r'\s+', ' ', text)
    # Keeps basic punctuation; also strips apostrophes ("it's" becomes "its")
    text = re.sub(r'[^\w\s.,!?()-]', '', text)
    return text.strip()

def extract_sentences(transcript_data):
    """Extract sentences together with their timestamps"""
    sentences = []
    current_sentence = []
    sentence_start_time = None

    for item in transcript_data:
        clean_item_text = clean_text(item['text'])

        # At the start of a sentence, remember its start time
        # (compare with None so that a start time of 0.0 is not skipped)
        if sentence_start_time is None:
            sentence_start_time = item['start']

        # Add the text to the current sentence
        current_sentence.append(clean_item_text)

        # A snippet ending in sentence punctuation closes the sentence
        if re.search(r'[.!?]$', clean_item_text):
            # Assemble the full sentence
            full_sentence = ' '.join(current_sentence)
            # Remove a leading dash, if present
            full_sentence = re.sub(r'^-\s*', '', full_sentence)

            sentences.append({
                'time': sentence_start_time,
                'text': full_sentence
            })

            # Reset the accumulators
            current_sentence = []
            sentence_start_time = None

    # Add the trailing sentence, if there is one
    if current_sentence:
        full_sentence = ' '.join(current_sentence)
        full_sentence = re.sub(r'^-\s*', '', full_sentence)
        sentences.append({
            'time': sentence_start_time,
            'text': full_sentence
        })

    return sentences

def format_output(sentences):
    """Format the sentences for output"""
    formatted_lines = []
    for sentence in sentences:
        time_str = format_time(sentence['time'])
        formatted_lines.append(time_str)
        formatted_lines.append(sentence['text'])
        formatted_lines.append("")  # blank line between sentences

    return '\n'.join(formatted_lines)

def process_youtube_transcript(video_id):
    try:
        # Fetch the captions
        transcript = YouTubeTranscriptApi.get_transcript(video_id)

        # Extract the sentences
        sentences = extract_sentences(transcript)

        # Save to a text file
        output_filename = f"transcript_{video_id}.txt"
        formatted_text = format_output(sentences)

        with open(output_filename, 'w', encoding='utf-8') as f:
            f.write(formatted_text)

        # Also save as JSON for later processing by an LLM
        import json
        json_filename = f"transcript_{video_id}.json"
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(sentences, f, ensure_ascii=False, indent=2)

        print(f"Transcript saved to files:\n{output_filename}\n{json_filename}")
        print("\nSample of the formatted text:")
        print(formatted_text[:500] + "...\n")  # show only the beginning as a sample

    except Exception as e:
        print(f"Error while processing captions: {e}")

# Example usage
video_id = "5t1vTLU7s40"
process_youtube_transcript(video_id)

Transcript saved to files:
transcript_5t1vTLU7s40.txt
transcript_5t1vTLU7s40.json

Sample of the formatted text:
0:00:00
I see the danger of this concentration of power through proprietary AI systems as a much bigger danger than everything else.

0:00:09
What works against this is people who think that for reasons of security, we should keep AI systems under lock and key because its too dangerous to put it in the hands of everybody.

0:00:22
That would lead to a very bad future in which all of our information diet is controlled by a small number of companies through proprietary systems.

0:00:32
I believe ...
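
The timestamps in this output read `0:00:09` rather than `00:00:09` because `str(timedelta(...))` does not zero-pad the hours field. If a fixed-width hh:mm:ss form is preferred, one option is to format the fields by hand. The helper name `format_hhmmss` below is hypothetical, and the sketch assumes non-negative input.

```python
from datetime import timedelta

def format_hhmmss(seconds: float) -> str:
    """Format a non-negative number of seconds as zero-padded hh:mm:ss."""
    total = round(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"

print(str(timedelta(seconds=9)))  # timedelta rendering, hours not padded
print(format_hhmmss(8.94))        # same instant, zero-padded
print(format_hhmmss(10036))       # 2 h 47 min 16 s, the length of the video
```

Unlike the `timedelta` rendering, this version keeps counting hours past 24 instead of switching to a "1 day, ..." prefix.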

