In [15]:
import PyPDF2
import requests
from bs4 import BeautifulSoup
import youtube_transcript_api
from youtube_transcript_api import YouTubeTranscriptApi
import chromadb
from chromadb.config import Settings
from transformers import AutoTokenizer, AutoModel
import torch
import re
import unicodedata
import os
from nltk.corpus import stopwords
import nltk

In [16]:
# Download NLTK stopwords (run once)
# try:
#     nltk.data.find('corpus/stopwords')
# except LookupError:
#     nltk.download('stopwords')


In [17]:
# Step 1: Initialize Hugging Face model and tokenizer for embeddings
def initialize_embedding_model(model_name="sentence-transformers/all-MiniLM-L6-v2"):
    """Load the Hugging Face tokenizer and model used to embed text.

    Args:
        model_name: Checkpoint identifier handed to ``from_pretrained`` for
            both the tokenizer and the model. Defaults to the MiniLM
            checkpoint this notebook was written against, so existing
            zero-argument calls behave exactly as before.

    Returns:
        A ``(tokenizer, model)`` tuple loaded from the same checkpoint.
    """
    # Load both pieces from one name so they can never drift apart.
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    return tokenizer, model

In [18]:
# Step 2: Generate embeddings for text
def get_embeddings(text, tokenizer, model):
    """Embed ``text`` by mean-pooling the model's last hidden state.

    The pooling is weighted by the tokenizer's attention mask so that padding
    positions (added by ``padding=True`` when several texts of different
    length are tokenized together) do not dilute the average. For a single
    text there is no padding and the result equals a plain mean over tokens.

    Args:
        text: Input forwarded unchanged to ``tokenizer``.
        tokenizer: Callable returning a mapping of PyTorch tensors that is
            expanded into ``model(**inputs)``.
        model: Callable whose output exposes ``last_hidden_state``.

    Returns:
        A NumPy array of the pooled embedding(s) with singleton dimensions
        squeezed out.
    """
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    with torch.no_grad():
        outputs = model(**inputs)

    token_embeddings = outputs.last_hidden_state
    attention_mask = inputs.get("attention_mask")
    if attention_mask is None:
        # No mask available: fall back to the unweighted mean over tokens.
        pooled = token_embeddings.mean(dim=1)
    else:
        # Broadcast the mask over the hidden dimension, then average only
        # over real (non-padding) tokens.
        mask = attention_mask.unsqueeze(-1).to(token_embeddings.dtype)
        token_counts = mask.sum(dim=1).clamp(min=1e-9)  # guard against division by zero
        pooled = (token_embeddings * mask).sum(dim=1) / token_counts

    embeddings = pooled.squeeze().numpy()
    return embeddings

In [19]:
# Step 3: Preprocessing - Clean and normalize text
def preprocess_text(text, preserve_code=True):
    """Clean and normalize text, optionally keeping code snippets verbatim.

    Prose is transliterated to ASCII, stripped of characters other than word
    characters, whitespace and ``.,;:!?-``, whitespace-collapsed and
    lower-cased.

    When ``preserve_code`` is true, code snippets are lifted out *before* any
    cleaning and put back untouched afterwards. Doing it in that order matters:
    cleaning removes the backticks and newlines the snippet patterns rely on,
    so detecting code after cleaning can never match anything.

    Args:
        text: Raw text to clean.
        preserve_code: If true, triple-backtick fenced blocks and (in
            multi-line text) lines indented by four spaces or a tab are kept
            byte-for-byte. If false, everything is cleaned and lower-cased.

    Returns:
        The cleaned text.
    """
    code_blocks = []

    def _stash_code(match):
        # Placeholder is lower-case and made only of word characters so it
        # survives the cleaning and lower-casing below. The surrounding
        # spaces keep it from fusing with adjacent words.
        code_blocks.append(match.group(0))
        return f" __code_block_{len(code_blocks) - 1}__ "

    def _restore_code(match):
        index = int(match.group(1))
        # Leave look-alike tokens that we did not create untouched.
        return code_blocks[index] if index < len(code_blocks) else match.group(0)

    if preserve_code:
        text = re.sub(r'```[\s\S]*?```', _stash_code, text)
        # Indentation only signals code relative to other lines; in
        # single-line text, leading spaces are just stray whitespace.
        if '\n' in text:
            text = re.sub(r'^(?: {4}|\t)[ \t]*\S.*$', _stash_code, text, flags=re.MULTILINE)

    # Normalize Unicode characters
    text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('ascii')

    # Remove special characters, keep alphanumeric and basic punctuation
    text = re.sub(r'[^\w\s.,;:!?-]', ' ', text)

    # Replace multiple spaces, newlines, or tabs with a single space
    text = re.sub(r'\s+', ' ', text).strip()

    text = text.lower()

    if code_blocks:
        # Single pass, so placeholder-like text inside a restored snippet is
        # never expanded a second time.
        text = re.sub(r'__code_block_(\d+)__', _restore_code, text)

    # Optionally remove stopwords (disabled by default for programming context)
    # stop_words = set(stopwords.words('english'))
    # words = text.split()
    # text = ' '.join(word for word in words if word.lower() not in stop_words)

    return text

In [20]:
# Step 4: Extract text from PDF with structuring
def extract_text_from_pdf(pdf_path):
    """Read a PDF and split its text into ``(section, text)`` tuples.

    A line counts as a heading when it is entirely upper-case, or when its
    stripped form is shorter than 50 characters and ends with a colon. Text
    gathered under a heading is run through ``preprocess_text``. Pending text
    is also flushed at the end of every page, while the current heading
    carries over to the next page.

    Args:
        pdf_path: Path of the PDF file to open.

    Returns:
        List of ``(section, text)`` tuples, or ``[]`` if any error occurs
        (the error is printed).
    """
    def _looks_like_heading(raw_line):
        stripped = raw_line.strip()
        return raw_line.isupper() or (len(stripped) < 50 and stripped.endswith(':'))

    try:
        with open(pdf_path, 'rb') as pdf_file:
            sections = []
            heading = ""
            body = ""

            for page in PyPDF2.PdfReader(pdf_file).pages:
                page_text = page.extract_text() or ""
                for raw_line in page_text.split('\n'):
                    if not _looks_like_heading(raw_line):
                        body += " " + raw_line
                        continue
                    # New heading: close out whatever was collected so far.
                    if body:
                        sections.append((heading, preprocess_text(body)))
                        body = ""
                    heading = raw_line.strip()

                # Page boundary: flush so chunks never span pages.
                if body:
                    sections.append((heading, preprocess_text(body)))
                    body = ""

            return sections  # List of (section, text) tuples
    except Exception as e:
        print(f"Error extracting text from PDF {pdf_path}: {e}")
        return []

In [21]:
# Step 5: Extract text from website with structuring
def extract_text_from_website(url):
    """Fetch a web page and split its text into ``(section, text)`` tuples.

    ``script``, ``style``, ``nav``, ``footer`` and ``header`` elements are
    removed first. The remaining ``h1``/``h2``/``h3`` elements start new
    sections; ``p`` and ``li`` text is accumulated under the most recent
    heading and passed through ``preprocess_text``.

    Args:
        url: Address of the page to download (10 second timeout).

    Returns:
        List of ``(section, text)`` tuples, or ``[]`` if any error occurs
        (the error is printed).
    """
    heading_tags = ('h1', 'h2', 'h3')
    try:
        page = requests.get(url, timeout=10)
        page.raise_for_status()
        soup = BeautifulSoup(page.text, 'html.parser')

        # Drop non-content elements before walking the document.
        for noise in soup(["script", "style", "nav", "footer", "header"]):
            noise.decompose()

        sections = []
        heading = ""
        body = ""

        for node in soup.find_all(['h1', 'h2', 'h3', 'p', 'li']):
            if node.name not in heading_tags:
                body += " " + node.get_text(strip=True)
                continue
            # A heading closes the section collected so far.
            if body:
                sections.append((heading, preprocess_text(body)))
                body = ""
            heading = node.get_text(strip=True)

        if body:
            sections.append((heading, preprocess_text(body)))

        return sections  # List of (section, text) tuples
    except Exception as e:
        print(f"Error extracting text from website {url}: {e}")
        return []

In [22]:
# Step 6: Extract text from YouTube video transcript with structuring
def extract_youtube_transcript(video_url):
    """Download a YouTube transcript and group it into timed segments.

    The 11-character video id is taken from a ``v=`` query parameter or a
    ``youtu.be/`` path, and only for URLs containing ``youtube.com`` or
    ``youtu.be``. A new segment starts whenever an entry begins more than 60
    seconds after the start of the current segment. Each segment is labelled
    ``Segment_<start second>`` and its text goes through ``preprocess_text``.

    Args:
        video_url: URL of the video.

    Returns:
        List of ``(segment, text)`` tuples, or ``[]`` when the URL is not
        recognised or any error occurs (a message is printed in both cases).
    """
    try:
        id_match = None
        if "youtube.com" in video_url or "youtu.be" in video_url:
            id_match = re.search(r"(?:v=|youtu\.be/)([0-9A-Za-z_-]{11})", video_url)

        if id_match is None:
            print(f"Invalid YouTube URL: {video_url}")
            return []

        transcript = YouTubeTranscriptApi.get_transcript(id_match.group(1))
        segments = []
        segment_start = 0
        segment_text = ""

        # Group transcript by time intervals (e.g., every 60 seconds)
        for entry in transcript:
            entry_start = entry['start']
            if entry_start > segment_start + 60:
                if segment_text:
                    segments.append((f"Segment_{int(segment_start)}", preprocess_text(segment_text)))
                    segment_text = ""
                segment_start = entry_start
            segment_text += " " + entry['text']

        # Flush the trailing, possibly shorter, segment.
        if segment_text:
            segments.append((f"Segment_{int(segment_start)}", preprocess_text(segment_text)))

        return segments  # List of (segment, text) tuples
    except Exception as e:
        print(f"Error extracting transcript from YouTube video {video_url}: {e}")
        return []

In [23]:
# Step 7: Cache preprocessed text to disk
def cache_text(source, text_chunks, cache_dir="../cache"):
    """Write preprocessed chunks for ``source`` to a text file in ``cache_dir``.

    The file name is ``source`` with every character other than word
    characters, ``-``, ``_`` and ``.`` replaced by ``_``, plus ``.txt``.
    Each chunk is stored as a ``--- <section> ---`` header line, the text,
    and a blank line.

    Args:
        source: Path or URL the chunks came from; used for the file name.
        text_chunks: Iterable of ``(section, text)`` pairs.
        cache_dir: Directory to write into; created if missing.

    Returns:
        None. Success or failure of the write is reported via ``print``.
    """
    os.makedirs(cache_dir, exist_ok=True)
    safe_name = re.sub(r'[^\w\-_\.]', '_', source)
    cache_path = os.path.join(cache_dir, safe_name + ".txt")

    try:
        with open(cache_path, 'w', encoding='utf-8') as out:
            out.writelines(
                f"--- {section} ---\n{text}\n\n" for section, text in text_chunks
            )
        print(f"Cached text for {source} to {cache_path}")
    except Exception as e:
        print(f"Error caching text for {source}: {e}")

In [24]:
# Step 8: Check cache for preprocessed text
def load_from_cache(source, cache_dir="../cache"):
    """Load previously cached chunks for ``source``, if a cache file exists.

    The file name is derived from ``source`` by replacing every character
    other than word characters, ``-``, ``_`` and ``.`` with ``_`` and
    appending ``.txt``. Lines of the form ``--- <section> ---`` start a new
    chunk; all other lines are joined with spaces into that chunk's text.
    Chunks whose text is empty are not returned.

    Args:
        source: Path or URL whose cache entry should be read.
        cache_dir: Directory holding the cache files.

    Returns:
        List of ``(section, text)`` tuples, or ``None`` when there is no
        cache file or reading it fails (the error is printed).
    """
    file_name = re.sub(r'[^\w\-_\.]', '_', source) + ".txt"
    cache_path = os.path.join(cache_dir, file_name)

    if not os.path.exists(cache_path):
        return None

    try:
        with open(cache_path, 'r', encoding='utf-8') as handle:
            cached_lines = handle.read().splitlines()

        parsed = []
        heading = ""
        body = ""
        for line in cached_lines:
            is_header = line.startswith("--- ") and line.endswith(" ---")
            if not is_header:
                body += " " + line
                continue
            # Header line: finish the previous chunk before starting a new one.
            if body:
                parsed.append((heading, body.strip()))
                body = ""
            heading = line[4:-4].strip()

        if body:
            parsed.append((heading, body.strip()))
        return parsed
    except Exception as e:
        print(f"Error loading cache for {source}: {e}")
    return None

In [25]:
# Step 9: Initialize embedding model and generate sample embeddings for review
def generate_sample_embeddings(text_chunks, sources, num_samples=5, chunk_size=500):
    """
    Initialize embedding model and generate sample embeddings for review.
    Does not store embeddings, only displays them for evaluation.

    Sources with no chunks are skipped rather than ending the run, so one
    empty source cannot hide every source that follows it.

    Args:
        text_chunks: List with one entry per source; each entry is a list of
            (section, text) tuples
        sources: List of source names corresponding to text_chunks
        num_samples: Number of sample embeddings to generate and display
        chunk_size: Maximum number of whitespace-separated words per
            sub-chunk sent to the embedding model

    Raises:
        ValueError: If chunk_size is not a positive integer.
    """
    # Fail before the (slow) model load if the chunking cannot work.
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    # Initialize the embedding model
    print("Initializing embedding model...")
    tokenizer, model = initialize_embedding_model()
    print("Embedding model initialized successfully!\n")

    sample_count = 0

    for source, chunks in zip(sources, text_chunks):
        if sample_count >= num_samples:
            break
        if not chunks:
            # Nothing to embed for this source; keep going with the rest.
            continue

        print(f"Processing source: {source}")
        print("-" * 50)

        for i, (section, text) in enumerate(chunks):
            if sample_count >= num_samples:
                break

            # Split text into smaller chunks (~chunk_size words) for embedding
            words = text.split()
            sub_chunks = [" ".join(words[j:j + chunk_size]) for j in range(0, len(words), chunk_size)]

            for j, sub_chunk in enumerate(sub_chunks):
                if sample_count >= num_samples:
                    break

                # Generate embedding for the sub-chunk
                print(f"\nSample {sample_count + 1}:")
                print(f"Source: {source}")
                print(f"Section: {section}")
                print(f"Chunk ID: {i}_{j}")
                print(f"Text preview (first 100 chars): {sub_chunk[:100]}...")

                # Get the embedding
                embedding = get_embeddings(sub_chunk, tokenizer, model)

                # Display embedding information
                print(f"Embedding shape: {embedding.shape}")
                print(f"Embedding type: {type(embedding)}")
                print(f"First 10 dimensions: {embedding[:10].tolist()}")
                # print(f"Embedding norm: {float(embedding.norm()):.4f}")
                print(f"Min value: {float(embedding.min()):.4f}")
                print(f"Max value: {float(embedding.max()):.4f}")
                print(f"Mean value: {float(embedding.mean()):.4f}")
                print("=" * 60)

                sample_count += 1

    print(f"\nGenerated and displayed {sample_count} sample embeddings for review.")

In [26]:
# Main function to process all sources
def _collect_source_chunks(source_list, extractor, cache_dir, texts, sources):
    """Append cached, or freshly extracted and then cached, chunks per source."""
    for source in source_list:
        chunks = load_from_cache(source, cache_dir)
        if not chunks:
            chunks = extractor(source)
            if not chunks:
                # Extraction produced nothing; leave this source out entirely.
                continue
            cache_text(source, chunks, cache_dir)
        texts.append(chunks)
        sources.append(source)


def process_knowledge_base(pdf_paths, website_urls, youtube_urls, cache_dir="../cache"):
    """Extract text from every source, caching results, then preview embeddings.

    Each source is first looked up in the cache. On a miss the matching
    extractor runs and a non-empty result is written to the cache. Sources
    that yield no chunks are left out. Sources are handled in the order
    PDFs, websites, YouTube videos, and the collected chunks are finally
    passed to ``generate_sample_embeddings``.

    Args:
        pdf_paths: Iterable of PDF file paths.
        website_urls: Iterable of web page URLs.
        youtube_urls: Iterable of YouTube video URLs.
        cache_dir: Directory for cached text. The default uses a forward
            slash so it matches the defaults of ``cache_text`` and
            ``load_from_cache`` and contains no invalid ``\\c`` escape.

    Returns:
        None.
    """
    texts = []
    sources = []

    # Process PDFs
    _collect_source_chunks(pdf_paths, extract_text_from_pdf, cache_dir, texts, sources)

    # Process websites
    _collect_source_chunks(website_urls, extract_text_from_website, cache_dir, texts, sources)

    # Process YouTube videos
    _collect_source_chunks(youtube_urls, extract_youtube_transcript, cache_dir, texts, sources)

    # generate_sample_embeddings for review
    generate_sample_embeddings(texts, sources)

In [27]:
# Example usage
if __name__ == "__main__":
    # One sample input per supported source type.
    pdf_paths = ["../data/raw_data/Starting Out with Python, Global Edition, 4th Edition.pdf"]
    website_urls = ["https://www.geeksforgeeks.org/how-to-learn-python-from-scratch/"]
    youtube_urls = ["https://www.youtube.com/watch?v=8124kv-632k"]

    # Extract (or load from cache) and print sample embeddings.
    process_knowledge_base(
        pdf_paths=pdf_paths,
        website_urls=website_urls,
        youtube_urls=youtube_urls,
    )

Initializing embedding model...
Embedding model initialized successfully!

Processing source: ../data/raw_data/Starting Out with Python, Global Edition, 4th Edition.pdf
--------------------------------------------------

Sample 1:
Source: ../data/raw_data/Starting Out with Python, Global Edition, 4th Edition.pdf
Section: FOURTH EDITION
Chunk ID: 0_0
Text preview (first 100 chars): tony gaddisstarting out with python...
Embedding shape: (384,)
Embedding type: <class 'numpy.ndarray'>
First 10 dimensions: [-0.19292797148227692, 0.13389192521572113, -0.12019527703523636, -0.11675722151994705, -0.1036008670926094, -0.4054447114467621, 0.4728517532348633, -0.01164678018540144, -0.5176032781600952, -0.4057198762893677]
Min value: -0.5997
Max value: 0.6762
Mean value: -0.0023

Sample 2:
Source: ../data/raw_data/Starting Out with Python, Global Edition, 4th Edition.pdf
Section: FOURTH EDITION
Chunk ID: 1_0
Text preview (first 100 chars): digital resources for students your new textbook provides