In [1]:
!pip install sentence-transformers pinecone youtube-transcript-api spacy nltk


Collecting pinecone
  Downloading pinecone-6.0.2-py3-none-any.whl.metadata (9.0 kB)
Collecting youtube-transcript-api
  Downloading youtube_transcript_api-1.0.3-py3-none-any.whl.metadata (23 kB)
Collecting pinecone-plugin-interface<0.0.8,>=0.0.7 (from pinecone)
  Downloading pinecone_plugin_interface-0.0.7-py3-none-any.whl.metadata (1.2 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.11.0->sentence-transformers)
  Downloadin

In [3]:
import numpy as np
import torch
from youtube_transcript_api import YouTubeTranscriptApi
from sentence_transformers import SentenceTransformer
from pinecone import Pinecone
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import spacy

# Download necessary NLTK resources
try:
    nltk.download('stopwords', quiet=True)
    nltk.download('wordnet', quiet=True)
except Exception as e:
    print(f"Warning: NLTK resource download issue. Error: {e}")

# Load spaCy model for NER and POS tagging
try:
    nlp = spacy.load("en_core_web_sm")
except:
    print("Warning: spaCy model 'en_core_web_sm' not found. Using a simple pipeline.")
    nlp = spacy.blank("en")

# Check for GPU availability
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")

# Initialize Pinecone
pc = Pinecone(api_key="pcsk_7EKroD_MaZi2zjikyZTdpaDPCkit4qEAE6cjKuJ7C2ot9htS7EE6uurWQLrfznykMd7bW3")
index_name = "genai"
index = pc.Index(index_name)

# Load embedding model
model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2").to(device)

def extract_transcript(video_id):
    """Extract transcript from a YouTube video."""
    try:
        transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
        full_transcript = ' '.join([segment['text'] for segment in transcript_list])
        return full_transcript
    except Exception as e:
        print(f"Error fetching transcript: {e}")
        return None

def simple_tokenize(text):
    """Simple tokenizer that avoids NLTK's punkt."""
    text = re.sub(r'[^\w\s]', ' ', text)
    return [token for token in text.lower().split() if token]

def preprocess_text(text):
    """NLP preprocessing: stopword removal, lemmatization, NER, POS tagging."""
    try:
        stop_words = set(stopwords.words('english'))
    except:
        stop_words = {'a', 'an', 'the', 'and', 'or', 'but', 'is', 'are', 'was', 'were', 'to', 'of', 'in', 'for'}

    lemmatizer = WordNetLemmatizer()
    text = re.sub(r'[^\w\s]', ' ', text).lower()
    tokens = simple_tokenize(text)
    filtered_tokens = [word for word in tokens if word not in stop_words]
    lemmatized_tokens = [lemmatizer.lemmatize(word) for word in filtered_tokens]

    doc = nlp(' '.join(lemmatized_tokens))
    named_entities = [ent.text for ent in doc.ents]
    pos_counts = {}
    for token in doc:
        pos_counts[token.pos_] = pos_counts.get(token.pos_, 0) + 1

    processed_text = ' '.join(lemmatized_tokens)

    metadata = {
        "named_entities": named_entities[:10] if named_entities else [],
        "top_pos_tags": [f"{pos}:{count}" for pos, count in sorted(pos_counts.items(), key=lambda x: x[1], reverse=True)[:5]]
    }
    return processed_text, metadata

def get_embedding(text):
    """Generate embedding using Sentence Transformers."""
    return model.encode(text).tolist()

import re
def chunk_text(text, max_chunk_size=150, min_chunk_size=50, max_chunks=None):
    """
    Split text into chunks based on semantic boundaries while respecting size constraints.

    Args:
        text (str): Input text to be chunked
        max_chunk_size (int): Maximum number of words per chunk
        min_chunk_size (int): Minimum number of words per chunk
        max_chunks (int, optional): Maximum number of chunks to return

    Returns:
        list: List of text chunks
    """
    # Normalize whitespace and split into sentences
    # Use regex to split on sentence-ending punctuation followed by space or newline
    sentences = re.split(r'(?<=[.!?])\s+', text.strip())

    chunks = []
    current_chunk = []
    current_chunk_size = 0

    for sentence in sentences:
        # Count words in the sentence
        sentence_words = sentence.split()
        sentence_word_count = len(sentence_words)

        # If adding this sentence would exceed max chunk size, finalize current chunk
        if current_chunk_size + sentence_word_count > max_chunk_size:
            # Join and add current chunk if it's not empty
            if current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = []
                current_chunk_size = 0

        # Add sentence to current chunk
        current_chunk.append(sentence)
        current_chunk_size += sentence_word_count

        # If chunk is getting too large, force a split
        if current_chunk_size >= max_chunk_size:
            chunks.append(' '.join(current_chunk))
            current_chunk = []
            current_chunk_size = 0

        # Stop if we've reached max chunks
        if max_chunks and len(chunks) >= max_chunks:
            break

    # Add any remaining chunk
    if current_chunk:
        chunks.append(' '.join(current_chunk))

    # Handle edge cases
    if not chunks:
        chunks = [text]

    return chunks

def generate_embeddings(video_id):
    """Generate embeddings from YouTube video transcript with NLP preprocessing."""
    transcript = extract_transcript(video_id)
    if not transcript:
        return None, None, None

    text_chunks = chunk_text(transcript)
    print(f"Created {len(text_chunks)} text chunks from transcript")

    embeddings, original_chunks, nlp_metadata_list = [], [], []
    for chunk in text_chunks:
        if len(chunk.strip()) > 10:
            processed_chunk, nlp_metadata = preprocess_text(chunk)
            embedding = get_embedding(processed_chunk)
            embeddings.append(embedding)
            original_chunks.append(chunk)
            nlp_metadata_list.append(nlp_metadata)

    print(f"Generated {len(embeddings)} embeddings with NLP preprocessing")
    return np.array(embeddings, dtype=np.float32), original_chunks, nlp_metadata_list

def store_embeddings_in_pinecone(embeddings, video_id, original_chunks=None, nlp_metadata_list=None):
    """Store embeddings in Pinecone index with valid metadata."""
    if embeddings is not None and len(embeddings) > 0:
        ids = [f"{video_id}_{i}" for i in range(len(embeddings))]
        vectors = []

        for i, emb in enumerate(embeddings):
            metadata = {"video_id": video_id, "chunk_id": i}
            if original_chunks and i < len(original_chunks):
                metadata["text_sample"] = original_chunks[i][:500] + "..." if len(original_chunks[i]) > 500 else original_chunks[i]

            if nlp_metadata_list and i < len(nlp_metadata_list):
                nlp_meta = nlp_metadata_list[i]
                metadata["named_entities"] = nlp_meta["named_entities"]
                metadata["top_pos_tags"] = nlp_meta["top_pos_tags"] if nlp_meta["top_pos_tags"] else []

            vectors.append({"id": ids[i], "values": emb.tolist(), "metadata": metadata})

        batch_size = 100
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            index.upsert(vectors=batch)
            print(f"Upserted batch {i//batch_size + 1}/{(len(vectors)-1)//batch_size + 1} to Pinecone")

        print(f"Successfully upserted {len(vectors)} embeddings into Pinecone.")

def main():
    """Main function to process YouTube videos."""
    yt_video_id = "NUy_wOxOM8E"  # Replace with actual YouTube video ID
    print(f"Processing YouTube video: {yt_video_id}")
    embeddings, original_chunks, nlp_metadata_list = generate_embeddings(yt_video_id)

    if embeddings is not None and len(embeddings) > 0:
        print(f"Generated {len(embeddings)} embedding chunks")
        store_embeddings_in_pinecone(embeddings, yt_video_id, original_chunks, nlp_metadata_list)
    else:
        print("Failed to generate embeddings")

if __name__ == "__main__":
    main()


Using device: cpu
Processing YouTube video: NUy_wOxOM8E
Created 1 text chunks from transcript
Generated 1 embeddings with NLP preprocessing
Generated 1 embedding chunks
Upserted batch 1/1 to Pinecone
Successfully upserted 1 embeddings into Pinecone.
