In [4]:
from dotenv import load_dotenv
load_dotenv()

from app.settings import Settings, init_openai
init_openai()

In [7]:
import os
from llama_index.postprocessor.cohere_rerank import CohereRerank

cohere_rerank = CohereRerank(api_key=os.environ["COHERE_API_KEY"], top_n=5)

In [14]:
from youtube_transcript_api import YouTubeTranscriptApi
from typing import List, Dict, Any
from googleapiclient.discovery import build
from llama_index.core import Document
from llama_index.core.schema import NodeWithScore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.storage import StorageContext
from llama_index.core import VectorStoreIndex

from llama_index.core.extractors import (
    SummaryExtractor,
    QuestionsAnsweredExtractor
)
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.core.ingestion import IngestionPipeline

import os

from app.settings import Settings
class YouTubeTranscriptRetriever():
    def __init__(
        self,
        top_k: int = 10,
        chunk_size: int = 512,
        chunk_overlap: int = 64
    ):
        google_api_key = os.getenv("GOOGLE_API_KEY")
        if not google_api_key:
            raise ValueError("GOOGLE_API_KEY is not set")   
        self.youtube = build('youtube', 'v3', developerKey=google_api_key)
        self.embedding_model = Settings.embed_model
        self.top_k = top_k
        self.node_parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        cohere_api_key = os.getenv("COHERE_API_KEY")    
        if not cohere_api_key:
            raise ValueError("COHERE_API_KEY is not set")
        self.reranker = CohereRerank(api_key=cohere_api_key, top_n=5)

    def _search_videos(self, query: str, max_results: int = 15) -> List[str]:
        """Search YouTube videos and return video IDs."""
        request = self.youtube.search().list(
            q=query,
            part='id',
            maxResults=max_results,
            type='video'
        )
        response = request.execute()
        return [item['id']['videoId'] for item in response['items']]

    def _get_transcript(self, video_id: str) -> Dict[str, Any]:
        """Get transcript with timestamps for a video."""
        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            return {
                'video_id': video_id,
                'transcript': transcript
            }
        except:
            return None

    def _process_transcript(self, transcript_data: Dict[str, Any]) -> List[Document]:
        """Process transcript into documents with metadata."""
        if not transcript_data:
            return []
        
        full_text = " ".join([entry['text'] for entry in transcript_data['transcript']])
        nodes = self.node_parser.get_nodes_from_documents(
            [Document(text=full_text)]
        )
        
        # Add timestamp metadata to nodes
        for node in nodes:
            # Find closest timestamp for the start of this chunk
            text_start = node.text[:50]  # Use start of chunk to find position
            for entry in transcript_data['transcript']:
                if entry['text'] in text_start:
                    node.metadata = {
                        'video_id': transcript_data['video_id'],
                        'timestamp': entry['start'],
                        'url': f"https://youtube.com/watch?v={transcript_data['video_id']}&t={int(entry['start'])}s"
                    }
                    break
        
        return nodes

    def _retrieve(self, query: str, **kwargs) -> List[NodeWithScore]:
        """Retrieve relevant transcript chunks for a query."""
        # Search videos
        video_ids = self._search_videos(query)
        print("Video IDs: ", video_ids)
        print("\n\n")

        # Get transcripts and do initial reranking of full transcripts
        transcripts = []
        for video_id in video_ids:
            transcript_data = self._get_transcript(video_id)
            if transcript_data:
                full_text = " ".join([entry['text'] for entry in transcript_data['transcript']])
                transcripts.append({
                    'text': full_text,
                    'data': transcript_data
                })
        print("Transcripts: ", transcripts)
        print("\n\n")
        # Rerank full transcripts
        if not transcripts:
            return []
        
        transcript_nodes = [NodeWithScore(node=Document(text=t['text'], metadata={'idx': i}), score=1.0) for i, t in enumerate(transcripts)]
        reranked_transcripts = self.reranker.postprocess_nodes(
            transcript_nodes,
            query_str=query
        )[:5]  # Get top 5 most relevant videos
        print("Reranked transcripts: ", [transcript.node.metadata['idx'] for transcript in reranked_transcripts])
        print("Rerank transcript example: ", reranked_transcripts[0].node)
        print("\n\n")
        
        # Create documents from top transcripts with metadata
        documents = []
        for transcript in reranked_transcripts:
            transcript_text = transcript.node.text
            transcript_data = next(t['data'] for t in transcripts if t['text'] == transcript_text)
            
            # Create document with metadata
            doc = Document(
                text=transcript_text,
                metadata={
                    'video_id': transcript_data['data']['video_id'],
                    'transcript_data': transcript_data['data']['transcript']  # Keep transcript data for timestamp mapping
                }
            )
            documents.append(doc)
            
        print("Documents: ", documents)
        print("\n\n")
        print("Setting up ingestion pipeline")
        # Set up ingestion pipeline with extractors
        text_splitter = TokenTextSplitter(
            separator=" ", 
            chunk_size=512, 
            chunk_overlap=128
        )
        qa_extractor = QuestionsAnsweredExtractor(questions=3)
        summary_extractor = SummaryExtractor(summaries=1)

        pipeline = IngestionPipeline(
            transformations=[
                text_splitter, 
                qa_extractor,
                summary_extractor
            ]
        )

        # Process documents through pipeline
        nodes = pipeline.run(
            documents=documents,
            in_place=True,
            show_progress=True,
        )
        print("Pipeline complete")
        print("Adding timestamps to nodes")
        
        # Add timestamp metadata to nodes
        for node in nodes:
            video_id = node.metadata['video_id']
            transcript_data = node.metadata['transcript_data']
            
            # Find closest timestamp for the start of this chunk
            text_start = node.text[:50]  # Use start of chunk to find position
            for entry in transcript_data:
                if entry['text'] in text_start:
                    node.metadata.update({
                        'timestamp': entry['start'],
                        'url': f"https://youtube.com/watch?v={video_id}&t={int(entry['start'])}s"
                    })
                    break
        print("Nodes: ", nodes)
        print("\n\n")
        
        # Create vector store index from processed nodes
        storage_context = StorageContext.from_defaults()
        index = VectorStoreIndex(
            nodes, 
            storage_context=storage_context,
            embed_model=self.embedding_model
        )
        
        # Query the index
        retriever = index.as_retriever(similarity_top_k=self.top_k)
        return retriever.retrieve(query)

In [19]:
google_api_key = os.getenv("GOOGLE_API_KEY")
if not google_api_key:
    raise ValueError("GOOGLE_API_KEY is not set")   
youtube = build('youtube', 'v3', developerKey=google_api_key)
embedding_model = Settings.embed_model
top_k = 10
node_parser = SentenceSplitter(chunk_size=512, chunk_overlap=128)
cohere_api_key = os.getenv("COHERE_API_KEY")    
if not cohere_api_key:
    raise ValueError("COHERE_API_KEY is not set")
reranker = CohereRerank(api_key=cohere_api_key, top_n=5)

def search_videos(query: str, max_results: int = 15) -> List[str]:
    """Search YouTube videos and return video IDs."""
    request = youtube.search().list(
        q=query,
        part='id',
        maxResults=max_results,
        type='video'
    )
    response = request.execute()
    return [item['id']['videoId'] for item in response['items']]

def get_transcript(video_id: str) -> Dict[str, Any]:
    """Get transcript with timestamps for a video."""
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        return {
            'video_id': video_id,
            'transcript': transcript
        }
    except:
        return None

In [21]:
query = "How to scrape youtube videos"

video_ids = search_videos(query)
print("Video IDs: ", video_ids)
print("\n\n")

# Get transcripts and do initial reranking of full transcripts
transcripts = []
for video_id in video_ids:
    transcript_data = get_transcript(video_id)
    if transcript_data:
        full_text = " ".join([entry['text'] for entry in transcript_data['transcript']])
        transcripts.append({
            'text': full_text,
            'data': transcript_data
        })
print("Transcript example: ", transcripts[0])
print("\n\n")


Video IDs:  ['RjHzznQy6hI', 'zSgx8U16stk', 'Cpmk_V0Is_Q', 'PxbRBqWmqas', 'GBdn_N-4bI0', 'AJ030_mDmIU', '5xZZct6vE4E', 'SwSbnmqk3zY', 'y6Dpc9jFFwI', '2FAkhgtvDM8', '3Cyp9-TpyVk', 'SAAdW1FlJRM', 'SNz_E-gqu5I', 'tsXPIlt_HZc', '5KY-CEFJAZ4']



Transcript example:  {'text': "Hey there, it's Michelle from Apify. Today,\xa0\nI'll show you how to how to scrape data from\xa0\xa0 Youtube using Apify’s YouTube Scraper API. This\xa0\ntool can scrape YouTube comments, video titles,\xa0\xa0 descriptions, subscribers, URLs, and a lot\xa0\nmore. First, find the scraper on Apify Store\xa0\xa0 and hit 'Try for free'. For this, you’ll need\xa0\nan Apify account; don’t worry, it’s quick, free,\xa0\xa0 and no credit card is required. You'll now be\xa0\ntaken to the Actors input page. You can choose\xa0\xa0 to configure your scrape through our user-friendly\xa0\ninterface or by using JSON. We're going to opt for\xa0\xa0 the regular input view. You can choose to get your\xa0\ndata in 2 different ways: via a

In [30]:
# Create nodes with all metadata
transcript_nodes = [
    NodeWithScore(
        node=Document(
            text=t['text'], 
            metadata={
                'idx': i,
                'video_id': t['data']['video_id'],
                'transcript_data': t['data']['transcript']
            },
            excluded_llm_metadata_keys=["transcript_data"]
        ), 
        score=1.0
    ) for i, t in enumerate(transcripts)
]

reranked_transcripts = reranker.postprocess_nodes(
    transcript_nodes,
    query_str=query
)[:5]  # Get top 5 most relevant videos
print("Reranked transcripts: ", [transcript.node.metadata['idx'] for transcript in reranked_transcripts])

print("Rerank transcript example: ", reranked_transcripts[0].node)
print("\n\n")

Reranked transcripts:  [9, 4, 10, 1, 0]
Rerank transcript example:  Doc ID: 7b0e7945-17f6-4d43-be80-c21273355d5b
Text: Hey there, it's Michelle from Apify. Today,  I'll show you how
to how to scrape data from   Youtube using Apify’s YouTube Scraper
API. This  tool can scrape YouTube comments, video titles,  
descriptions, subscribers, URLs, and a lot  more. First, find the
scraper on Apify Store   and hit 'Try for free'. For this, you’ll
need  an Apify account; ...





In [74]:
def _create_time_chunks(transcript_data: List[Dict], interval_seconds: int = 60, overlap_seconds: int = 30) -> List[Dict]:
    """Group transcript entries into overlapping time intervals."""
    chunks = []
    current_start_time = transcript_data[0]['start']
    last_entry_time = transcript_data[-1]['start'] + transcript_data[-1]['duration']
    
    # Create chunks with overlapping windows
    while current_start_time < last_entry_time:
        chunk_entries = []
        chunk_end_time = current_start_time + interval_seconds
        
        # Collect entries for this chunk
        for entry in transcript_data:
            if current_start_time <= entry['start'] < chunk_end_time:
                chunk_entries.append(entry)
        
        if chunk_entries:
            chunks.append({
                'entries': chunk_entries,
                'start_time': current_start_time,
                'end_time': chunk_end_time
            })
        
        # Move window forward by (interval - overlap)
        current_start_time += (interval_seconds - overlap_seconds)
    
    return chunks

In [128]:
from llama_index.core.schema import MetadataMode

# Create documents from top transcripts with metadata
documents = []
for node in reranked_transcripts:
    video_id = node.node.metadata['video_id']
    transcript_data = node.node.metadata['transcript_data']
    
    # Create overlapping time-based chunks
    time_chunks = _create_time_chunks(
        transcript_data, 
        interval_seconds=60,  # 60 second chunks
        overlap_seconds=20    # 30 second overlap
    )
    
    # Create a document for each time chunk
    for chunk in time_chunks:
        text = " ".join([entry['text'] for entry in chunk['entries']])
        doc = Document(
            text=text,
            metadata={
                'video_id': video_id,
                'start_time': chunk['start_time'],
                'end_time': chunk['end_time'],
                'url': f"https://youtube.com/watch?v={video_id}&t={int(chunk['start_time'])}s",
                "text": text
            },
            excluded_embed_metadata_keys=["video_id", "start_time", "end_time", "url"],
            excluded_llm_metadata_keys=["text"],
            metadata_template="{key}=>{value}",
            text_template="Metadata: {metadata_str}\n-----\nContent: {content}"
        )
        documents.append(doc)

print("Document example: ", documents[0].get_content(metadata_mode=MetadataMode.LLM))
print("\n\n")

Document example:  Metadata: video_id=>3Cyp9-TpyVk
start_time=>3.246
end_time=>63.246
url=>https://youtube.com/watch?v=3Cyp9-TpyVk&t=3s
-----
Content: In this video, I will demonstrate how to scrape YouTube comments and extract emails using Prospety. Prospety enables you to scrape comments and retrieve emails from channels and videos with just a few clicks. Scraping comments and generating leads on YouTube involves several steps. You need to extract comments, filter commentator channels, and retrieve emails. The manual approach is not viable because of the large number of comments and the inability to extract emails at scale due to YouTube's limitations. But there is a way to automate this process by using Prospety. You can scrape comments and retrieve emails from channels and videos with just a few clicks. Extracting emails from comments is an alternative to subscriber scraping because comments are public, and commentators often subscribe to channels. It's straightforward to get start

In [129]:
len(documents)

44

In [130]:
import nest_asyncio
nest_asyncio.apply()

In [131]:
print("Setting up ingestion pipeline")
# Set up ingestion pipeline with extractors
pipeline = IngestionPipeline(
)

# Process documents through pipeline
nodes = pipeline.run(
    documents=documents,
    in_place=True,
    show_progress=True,
)
print("Pipeline complete")

Setting up ingestion pipeline


Parsing nodes: 100%|██████████| 44/44 [00:00<00:00, 3865.64it/s]
Generating embeddings: 100%|██████████| 44/44 [00:01<00:00, 30.66it/s]

Pipeline complete





In [132]:
nodes[0].metadata

{'video_id': '3Cyp9-TpyVk',
 'start_time': 3.246,
 'end_time': 63.246,
 'url': 'https://youtube.com/watch?v=3Cyp9-TpyVk&t=3s',
 'text': "In this video, I will demonstrate how to scrape YouTube comments and extract emails using Prospety. Prospety enables you to scrape comments and retrieve emails from channels and videos with just a few clicks. Scraping comments and generating leads on YouTube involves several steps. You need to extract comments, filter commentator channels, and retrieve emails. The manual approach is not viable because of the large number of comments and the inability to extract emails at scale due to YouTube's limitations. But there is a way to automate this process by using Prospety. You can scrape comments and retrieve emails from channels and videos with just a few clicks. Extracting emails from comments is an alternative to subscriber scraping because comments are public, and commentators often subscribe to channels. It's straightforward to get started with Prospe

In [133]:
# Create vector store index from processed nodes
storage_context = StorageContext.from_defaults()
index = VectorStoreIndex(
    nodes, 
    storage_context=storage_context,
    embed_model=embedding_model
)

# Query the index
retriever = index.as_retriever(similarity_top_k=5)
res = retriever.retrieve(query)
print(res)

[NodeWithScore(node=TextNode(id_='ccca9a98-c647-49d0-9376-59b3e71f1463', embedding=None, metadata={'video_id': 'SAAdW1FlJRM', 'start_time': 82.1, 'end_time': 142.1, 'url': 'https://youtube.com/watch?v=SAAdW1FlJRM&t=82s', 'text': 'Step 2 adding YouTube video URLs to the Google sheet in the First Column of your Google sheet or column A add a list of YouTube video URLs you want to scrape details to leave everything else as it should be step 3 creating a script to scrape the video details go to the tools menu in Google Sheets and click on script editor in the script editor paste the following scraping code you can also find the code Below on the description of this video save it and leave it for now step 4 getting a YouTube API key to get an API key go to the Google Cloud console create a new project and name it whatever you want'}, excluded_embed_metadata_keys=['video_id', 'start_time', 'end_time', 'url'], excluded_llm_metadata_keys=['text'], relationships={<NodeRelationship.SOURCE: '1'>:

In [134]:
len(res)

5

In [139]:
from llama_index.llms.openai import OpenAI
from pydantic import BaseModel, Field
from typing import List
from IPython.display import display, Markdown, Image


class TextBlock(BaseModel):
    """Text block."""

    text: str = Field(..., description="The text for this block.")


class VideoBlock(BaseModel):
    """Video block."""

    video_url: str = Field(..., description="URL to the video.")


class ReportOutput(BaseModel):
    """Data model for a report.

    Can contain a mix of text and image blocks. MUST contain at least one image block.

    """

    blocks: List[TextBlock | VideoBlock] = Field(
        ..., description="A list of text and video blocks."
    )

    def render(self) -> None:
        """Render as HTML on the page."""
        for b in self.blocks:
            if isinstance(b, TextBlock):
                display(Markdown(b.text))
            else:
                display(Markdown(f"[{b.video_url}]({b.video_url})"))


system_prompt = """\
You are a youtube report generation assistant tasked with producing a well-formatted context given parsed context, to help users save time from watching all clickbait videos, and you just get to the good parts.

You will be given context from one or more youtube videos that take the form of parsed text.

You are responsible for producing a report with interleaving text and videos - in the format of interleaving text and "video" blocks.
Since you cannot directly produce a video, the video block takes in a video url instead.

How do you know which video to generate? Each context chunk will contain metadata including a video render of the source chunk, given as a video url. 
Include ONLY the videos from the chunks that have heavy visual elements (you can get a hint of this if the parsed text contains a lot of tables).
You MUST include at least one video block in the output.

You MUST output your response as a tool call in order to adhere to the required output format. Do NOT give back normal text.

"""


llm = OpenAI(model="gpt-4o", system_prompt=system_prompt)
sllm = llm.as_structured_llm(output_cls=ReportOutput)

In [140]:
query_engine = index.as_query_engine(
    similarity_top_k=10,
    llm=sllm,
)

In [141]:
res = query_engine.query(query)

In [143]:
res.response.render()


To scrape YouTube videos, you can use various tools and methods. Here are some approaches you can consider:

[https://youtube.com/watch?v=SAAdW1FlJRM&t=82s](https://youtube.com/watch?v=SAAdW1FlJRM&t=82s)

1. **Using Google Sheets and YouTube API**: 
   - Create a new Google Sheet and add the list of YouTube video URLs you want to scrape in the first column.
   - Use the Google Sheets script editor to paste a scraping script. You can find such scripts in video descriptions or online resources.
   - Obtain a YouTube API key from the Google Cloud console to enable data scraping.
   - Run the script to scrape video details like title, view count, comment count, and like count.

[https://youtube.com/watch?v=RjHzznQy6hI&t=0s](https://youtube.com/watch?v=RjHzznQy6hI&t=0s)

2. **Using Apify’s YouTube Scraper API**:
   - Sign up for a free Apify account and access the YouTube Scraper API.
   - Configure your scrape using the user-friendly interface or JSON.
   - You can scrape data by search phrases or direct URLs, extracting information like comments, video titles, descriptions, and more.
   - Set your preferences for the number of results, video format, and sorting options before running the scraper.

[https://youtube.com/watch?v=GBdn_N-4bI0&t=0s](https://youtube.com/watch?v=GBdn_N-4bI0&t=0s)

3. **Using Prospety for Comment and Email Extraction**:
   - Prospety allows you to scrape YouTube comments and extract emails from channels and videos.
   - This tool automates the process, making it easier to handle large volumes of comments and retrieve emails efficiently.
   - Start by signing up on Prospety's website and initiating a new search to begin scraping.

In [135]:
from llama_index.embeddings.openai import OpenAIEmbedding

youtube = YouTubeTranscriptRetriever()

nodes = youtube._retrieve("How to scrape youtube videos")
print(nodes)

Video IDs:  ['RjHzznQy6hI', 'zSgx8U16stk', 'Cpmk_V0Is_Q', 'PxbRBqWmqas', 'GBdn_N-4bI0', 'AJ030_mDmIU', '5xZZct6vE4E', 'SwSbnmqk3zY', 'y6Dpc9jFFwI', '2FAkhgtvDM8', '3Cyp9-TpyVk', 'SAAdW1FlJRM', 'SNz_E-gqu5I', 'tsXPIlt_HZc', '5KY-CEFJAZ4']



Transcripts:  [{'text': "Hey there, it's Michelle from Apify. Today,\xa0\nI'll show you how to how to scrape data from\xa0\xa0 Youtube using Apify’s YouTube Scraper API. This\xa0\ntool can scrape YouTube comments, video titles,\xa0\xa0 descriptions, subscribers, URLs, and a lot\xa0\nmore. First, find the scraper on Apify Store\xa0\xa0 and hit 'Try for free'. For this, you’ll need\xa0\nan Apify account; don’t worry, it’s quick, free,\xa0\xa0 and no credit card is required. You'll now be\xa0\ntaken to the Actors input page. You can choose\xa0\xa0 to configure your scrape through our user-friendly\xa0\ninterface or by using JSON. We're going to opt for\xa0\xa0 the regular input view. You can choose to get your\xa0\ndata in 2 different ways: via a searc

KeyError: 'data'