In [1]:
!pip install Pypdf tools

Collecting Pypdf
  Downloading pypdf-5.4.0-py3-none-any.whl.metadata (7.3 kB)
Collecting tools
  Downloading tools-0.1.9.tar.gz (34 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pytils (from tools)
  Downloading pytils-0.4.3.tar.gz (101 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.4/101.4 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Downloading pypdf-5.4.0-py3-none-any.whl (302 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m302.3/302.3 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: tools, pytils
  Building wheel for tools (setup.py) ... [?25l[?25hdone
  Created wheel for tools: filename=tools-0.1.9-py3-none-any.whl size=46730 sha256=581aed2acc7748d29d1049b1c9da60e7982b89e516cadb4df40339e6970e176a
  

In [3]:
import os
import logging
import re
import string
import random
from collections import Counter
from typing import List, Dict, Tuple, Any, Optional
import traceback

# --- PDF Handling Library ---
try:
    from pypdf import PdfReader
    logging.info("Successfully imported pypdf.")
except ImportError:
    logging.error("Failed to import 'pypdf'. Please install it: pip install pypdf")
    # Define a dummy class if import fails to allow script structure check
    class PdfReader:
        def __init__(self, *args):
            raise ImportError("pypdf not found")
        pages = []
        metadata = None
        is_encrypted = False
    # exit() # Use exit() in a real script

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

# --- PDF Handling Functions (Independent) ---
def get_pdf_file(pdf_folder: str, mode: str = "latest") -> str:
    """
    Selects a PDF file path from a specified folder based on the mode.
    (Standalone version)
    """
    if not os.path.exists(pdf_folder):
        try:
            os.makedirs(pdf_folder, exist_ok=True)
            logging.warning(f"PDF folder '{pdf_folder}' did not exist and was created.")
            raise FileNotFoundError(f"PDF folder '{pdf_folder}' was created, but no PDFs found.")
        except OSError as e:
            logging.error(f"Failed to create PDF folder '{pdf_folder}': {e}")
            raise FileNotFoundError(f"PDF folder '{pdf_folder}' does not exist and could not be created.")

    try:
        pdf_files = [f for f in os.listdir(pdf_folder) if f.lower().endswith('.pdf') and os.path.isfile(os.path.join(pdf_folder, f))]
    except OSError as e:
        logging.error(f"Error listing files in folder '{pdf_folder}': {e}")
        raise FileNotFoundError(f"Could not access files in the PDF folder '{pdf_folder}'.")

    if not pdf_files:
        logging.warning(f"No PDF files found in folder: {pdf_folder}")
        raise FileNotFoundError(f"No PDF files found in {pdf_folder}")

    logging.info(f"Found {len(pdf_files)} PDF file(s) in '{pdf_folder}'.")
    selected_file_path = None

    if mode == "latest":
        try:
            latest_file = max(pdf_files, key=lambda f: os.path.getmtime(os.path.join(pdf_folder, f)))
            selected_file_path = os.path.join(pdf_folder, latest_file)
            logging.info(f"Selected latest PDF: {latest_file}")
        except Exception as e:
            logging.error(f"Error determining latest file: {e}")
            raise
    elif mode == "random":
        random_file = random.choice(pdf_files)
        selected_file_path = os.path.join(pdf_folder, random_file)
        logging.info(f"Selected random PDF: {random_file}")
    elif mode == "interactive":
        # (Interactive code omitted for brevity, can be added back if needed)
        logging.warning("Interactive mode selection not fully implemented in this snippet.")
        # Fallback to random if interactive part is omitted
        random_file = random.choice(pdf_files)
        selected_file_path = os.path.join(pdf_folder, random_file)
        logging.info(f"Selected random PDF (fallback): {random_file}")
    else:
        raise ValueError(f"Invalid mode: '{mode}'. Must be 'latest', 'random', or 'interactive'")

    if selected_file_path is None:
        raise RuntimeError("Failed to select a PDF file.")

    return selected_file_path

def extract_full_text_metadata_pypdf(pdf_path: str) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """
    Extracts the full text content and metadata from a PDF using pypdf.
    (Standalone version)
    """
    if not os.path.exists(pdf_path):
        logging.error(f"PDF file not found: {pdf_path}")
        return None, None

    if not os.path.isfile(pdf_path):
        logging.error(f"Path exists but is not a file: {pdf_path}")
        return None, None

    try:
        logging.info(f"Opening PDF: {os.path.basename(pdf_path)} using pypdf")
        reader = PdfReader(pdf_path)
        metadata = reader.metadata
        num_pages = len(reader.pages)

        extracted_metadata = {
            "title": getattr(metadata, 'title', os.path.basename(pdf_path)),
            "author": getattr(metadata, 'author', "Unknown"),
            "subject": getattr(metadata, 'subject', ""),
            "creator": getattr(metadata, 'creator', ""),
            "producer": getattr(metadata, 'producer', ""),
            "page_count": num_pages,
            "is_encrypted": reader.is_encrypted,
            "file_name": os.path.basename(pdf_path),
            "file_path": pdf_path
        }

        logging.info(f"Extracting text from {num_pages} page(s)...")
        full_text = ""
        for page_num, page in enumerate(reader.pages):
            try:
                text = page.extract_text()
                if text:
                    full_text += text + "\n"  # Add single newline between pages
                else:
                    logging.warning(f"No text extracted from page {page_num + 1}.")
            except Exception as page_error:
                logging.warning(f"Could not extract text from page {page_num + 1}: {page_error}")

        logging.info(f"Successfully extracted {len(full_text)} characters from PDF.")
        full_text = full_text.replace('\x00', '').strip()  # Remove null chars and trim ends

        # Simple preprocessing: fix hyphenation and normalize whitespace
        full_text = re.sub(r'(\w)-\n(\w)', r'\1\2', full_text)
        full_text = re.sub(r'\s+', ' ', full_text)  # Normalize all whitespace to single space

        return full_text, extracted_metadata

    except ImportError:
        logging.error("pypdf library is required but not installed.")
        return None, None
    except Exception as e:
        logging.error(f"Error extracting text from PDF '{os.path.basename(pdf_path)}' using pypdf: {e}")
        logging.error(traceback.format_exc())
        return None, None

# --- EDA Functions (Operating on Full Text) ---
STOP_WORDS = set([
    'a', 'about', 'above', 'after', 'again', 'against', 'all', 'am', 'an', 'and', 'any', 'are', "aren't", 'as', 'at',
    'be', 'because', 'been', 'before', 'being', 'below', 'between', 'both', 'but', 'by',
    'can', "can't", 'cannot', 'could', "couldn't", 'did', "didn't", 'do', 'does', "doesn't", 'doing', "don't", 'down', 'during',
    'each', 'few', 'for', 'from', 'further', 'had', "hadn't", 'has', "hasn't", 'have', "haven't", 'having', 'he', "he'd", "he'll", "he's", 'her', 'here', "here's", 'hers', 'herself', 'him', 'himself', 'his', 'how', "how's",
    'i', "i'd", "i'll", "i'm", "i've", 'if', 'in', 'into', 'is', "isn't", 'it', "it's", 'its', 'itself',
    "let's", 'me', 'more', 'most', "mustn't", 'my', 'myself',
    'no', 'nor', 'not', 'of', 'off', 'on', 'once', 'only', 'or', 'other', 'ought', 'our', 'ours', 'ourselves', 'out', 'over', 'own',
    'same', "shan't", 'she', "she'd", "she'll", "she's", 'should', "shouldn't", 'so', 'some', 'such',
    'than', 'that', "that's", 'the', 'their', 'theirs', 'them', 'themselves', 'then', 'there', "there's", 'these', 'they', "they'd", "they'll", "they're", "they've", 'this', 'those', 'through', 'to', 'too',
    'under', 'until', 'up', 'very', 'was', "wasn't", 'we', "we'd", "we'll", "we're", "we've", 'were', "weren't", 'what', "what's", 'when', "when's", 'where', "where's", 'which', 'while', 'who', "who's", 'whom', 'why', "why's", 'with', "won't", 'would', "wouldn't",
    'you', "you'd", "you'll", "you're", "you've", 'your', 'yours', 'yourself', 'yourselves',
    # Domain specific
    'et', 'al', 'nber', 'working', 'paper', 'series', 'figure', 'table', 'http', 'org', 'www', 'abstract', 'doi', 'appendix',
    'university', 'research', 'data', 'results', 'analysis', 'study', 'based', 'using', 'also', 'however', 'within', 'whether'
])

def calculate_basic_stats(full_text: str) -> Dict[str, Any]:
    """Calculates basic statistics from the full text."""
    stats = {}
    if not full_text:
        return {"error": "Input text is empty"}

    try:
        words = full_text.split()
        stats['word_count'] = len(words)

        # Simple sentence split - may be inaccurate with abbreviations etc.
        sentences = re.split(r'[.?!]\s+', full_text)  # Split on .?! followed by space
        stats['sentence_count'] = len([s for s in sentences if s.strip()])

        if stats['sentence_count'] > 0:
            stats['avg_sentence_length_words'] = round(stats['word_count'] / stats['sentence_count'], 2)
        else:
            stats['avg_sentence_length_words'] = stats['word_count']

        stats['character_count'] = len(full_text)
        logging.info("Calculated basic text statistics.")
        return stats

    except Exception as e:
        logging.error(f"Error calculating basic stats: {e}")
        return {"error": f"Failed to calculate stats: {e}"}

def get_word_frequency(full_text: str, num_words: int = 25) -> List[Tuple[str, int]]:
    """Calculates frequency of significant words in the full text."""
    if not full_text:
        return []

    try:
        text = full_text.lower()
        text = text.translate(str.maketrans('', '', string.punctuation))
        text = re.sub(r'\d+', '', text)  # Remove numbers
        words = text.split()
        filtered_words = [word for word in words if word not in STOP_WORDS and len(word) > 2]
        word_counts = Counter(filtered_words)
        most_common = word_counts.most_common(num_words)
        logging.info(f"Calculated word frequencies, found top {len(most_common)} words.")
        return most_common

    except Exception as e:
        logging.error(f"Error calculating word frequency: {e}")
        return []

def find_potential_proper_nouns(full_text: str, min_freq: int = 3) -> List[Tuple[str, int]]:
    """Finds potential proper nouns using capitalization heuristic."""
    if not full_text:
        return []

    potential_nouns = Counter()
    try:
        # Use regex to find capitalized words that are not at the start of a line
        # This is still a very rough heuristic.
        # Matches words starting with an uppercase letter, followed by lowercase,
        # NOT preceded by sentence-ending punctuation and space, or start of text.
        pattern = r"(?<![\.\?!]\s)(?<!^)\b([A-Z][a-z]+(?:[-'][A-Z][a-z]+)*)\b"
        matches = re.findall(pattern, full_text)
        potential_nouns.update(matches)
        frequent_nouns = [(noun, freq) for noun, freq in potential_nouns.items() if freq >= min_freq and noun.lower() not in STOP_WORDS]
        frequent_nouns.sort(key=lambda x: x[1], reverse=True)
        logging.info(f"Found {len(frequent_nouns)} potential proper nouns (heuristic) with min frequency {min_freq}.")
        return frequent_nouns

    except Exception as e:
        logging.error(f"Error finding potential proper nouns: {e}")
        return []

def find_common_headers(full_text: str) -> Dict[str, int]:
    """Counts occurrences of common academic paper headers."""
    if not full_text:
        return {}

    headers = {
        "Abstract": 0, "Introduction": 0, "Method": 0, "Methodology": 0,
        "Data": 0, "Results": 0, "Discussion": 0, "Conclusion": 0,
        "References": 0, "Appendix": 0
    }
    found_headers = {}

    try:
        # Case-insensitive search for headers at the start of a line (potentially with numbers/whitespace)
        for header in headers.keys():
            # Regex: start of line, optional whitespace/numbering, header text, optional colon, whitespace/newline end
            pattern = re.compile(r"^\s*(?:\d+\.?\s*)?" + re.escape(header) + r"\s*:?\s*$", re.IGNORECASE | re.MULTILINE)
            matches = pattern.findall(full_text)
            count = len(matches)
            if count > 0:
                found_headers[header] = count

        logging.info(f"Checked for common headers. Found: {found_headers}")
        return found_headers

    except Exception as e:
        logging.error(f"Error finding common headers: {e}")
        return {}

# --- Text Chunking Function for the Podcast Generator ---
def chunk_text(text: str, chunk_size: int = 1500, chunk_overlap: int = 200) -> List[str]:
    """
    Split the text into overlapping chunks of roughly equal size.
    """
    if not text:
        logging.warning("Empty text provided for chunking.")
        return []

    chunks = []
    start = 0
    text_length = len(text)

    while start < text_length:
        # Determine the end position for this chunk
        end = start + chunk_size

        # If we're at the end of the text, use the rest
        if end >= text_length:
            chunks.append(text[start:])
            break

        # Try to find a good break point (end of sentence)
        # Look back from 'end' for a sentence break
        sentence_break = max(text.rfind('. ', end - 100, end),
                             text.rfind('? ', end - 100, end),
                             text.rfind('! ', end - 100, end))

        if sentence_break != -1:
            # Found a sentence break, use it
            chunks.append(text[start:sentence_break + 1])
            start = sentence_break + 1 - chunk_overlap  # Move start with overlap
        else:
            # No good sentence break, try to find a space at least
            space_break = text.rfind(' ', end - 100, end)
            if space_break != -1:
                chunks.append(text[start:space_break])
                start = space_break + 1 - chunk_overlap
            else:
                # No good break point, just chunk at the exact position
                chunks.append(text[start:end])
                start = end - chunk_overlap

        # Make sure we don't go backward
        start = max(0, start)

    logging.info(f"Split text into {len(chunks)} chunks of ~{chunk_size} chars with {chunk_overlap} char overlap.")
    return chunks

# --- Main Process PDF Function (to be imported by the podcast generator) ---
def process_pdf(pdf_folder: str, selection_mode: str = "latest",
                chunk_size: int = 1500, chunk_overlap: int = 200) -> Tuple[List[str], Dict[str, Any]]:
    """
    Process a PDF file from the specified folder and return text chunks and metadata.

    Args:
        pdf_folder: Directory containing PDF files
        selection_mode: How to select the PDF ('latest', 'random')
        chunk_size: Maximum number of characters per chunk
        chunk_overlap: Number of characters to overlap between chunks

    Returns:
        A tuple of (list of text chunks, metadata dictionary)
    """
    try:
        # Select the PDF file
        pdf_path = get_pdf_file(pdf_folder, mode=selection_mode)

        # Extract text and metadata
        full_text, metadata = extract_full_text_metadata_pypdf(pdf_path)

        if not full_text:
            logging.error("Failed to extract text from PDF.")
            return [], {"error": "Text extraction failed", "file_name": os.path.basename(pdf_path)}

        # Create chunks
        chunks = chunk_text(full_text, chunk_size=chunk_size, chunk_overlap=chunk_overlap)

        return chunks, metadata

    except Exception as e:
        logging.error(f"Error processing PDF: {e}")
        return [], {"error": str(e)}

# --- Main EDA Execution Block ---
if __name__ == "__main__":
    logging.info("Standalone EDA Script execution started.")

    # <<< --- CONFIGURATION --- >>>
    PDF_FOLDER = "my_pdfs"  # IMPORTANT: Change this
    SELECTION_MODE = "random"  # "latest", "random" (interactive needs more code)
    TOP_N_WORDS = 30
    MIN_PROPER_NOUN_FREQ = 4
    # <<< --- END CONFIGURATION --- >>>

    try:
        # Step 1: Select PDF
        pdf_path = get_pdf_file(PDF_FOLDER, mode=SELECTION_MODE)

        # Step 2: Extract Full Text & Metadata
        full_text, metadata = extract_full_text_metadata_pypdf(pdf_path)

        if full_text and metadata:
            print("\n" + "=" * 60)
            print(f"Running Standalone EDA for: {metadata.get('file_name', 'N/A')}")
            print("=" * 60)

            # Step 3: Basic Stats Analysis
            print("\n--- Basic Text Statistics ---")
            stats = calculate_basic_stats(full_text)
            if 'error' in stats:
                print(f"  Error: {stats['error']}")
            else:
                for key, value in stats.items():
                    print(f"  {key.replace('_', ' ').capitalize():<30}: {value}")
            print("-" * 60)

            # Step 4: Word Frequency (Potential Topics)
            print(f"\n--- Top {TOP_N_WORDS} Frequent Words (Potential Topics) ---")
            top_words = get_word_frequency(full_text, num_words=TOP_N_WORDS)
            if top_words:
                col_width = 20
                for i in range(0, len(top_words), 2):
                    word1, freq1 = top_words[i]
                    entry1 = f"{word1}: {freq1}"
                    entry2 = ""
                    if i + 1 < len(top_words):
                        word2, freq2 = top_words[i+1]
                        entry2 = f"{word2}: {freq2}"
                    print(f"  {entry1:<{col_width}} {entry2:<{col_width}}")
            else:
                print("  Could not calculate word frequencies.")
            print("-" * 60)

            # Step 5: Potential Proper Nouns (Characters/Entities - HEURISTIC)
            print(f"\n--- Potential Proper Nouns (Frequency >= {MIN_PROPER_NOUN_FREQ}) ---")
            print("  (Warning: Basic heuristic, may include errors. Use NER for accuracy)")
            potential_names = find_potential_proper_nouns(full_text, min_freq=MIN_PROPER_NOUN_FREQ)
            if potential_names:
                col_width = 25
                for i in range(0, len(potential_names), 2):
                    name1, freq1 = potential_names[i]
                    entry1 = f"{name1}: {freq1}"
                    entry2 = ""
                    if i + 1 < len(potential_names):
                        name2, freq2 = potential_names[i+1]
                        entry2 = f"{name2}: {freq2}"
                    print(f"  {entry1:<{col_width}} {entry2:<{col_width}}")
            else:
                print(f"  No potential proper nouns found with frequency >= {MIN_PROPER_NOUN_FREQ}.")
            print("-" * 60)

            # Step 6: Common Header Check
            print("\n--- Common Header Check ---")
            found_headers = find_common_headers(full_text)
            if found_headers:
                for header, count in found_headers.items():
                    print(f"  Found '{header}': {count} time(s)")
            else:
                print("  No common headers (Abstract, Introduction, etc.) found matching patterns.")
            print("-" * 60)
        else:
            print("\n" + "=" * 60)
            print("EDA Failed: Could not extract text from the selected PDF.")
            print("=" * 60)

    except FileNotFoundError as e:
        logging.error(f"File/Folder Error: {e}")
        print(f"\nERROR: {e}")
    except ValueError as e:
        logging.error(f"Configuration or Input Error: {e}")
        print(f"\nERROR: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}", exc_info=True)
        print(f"\nUNEXPECTED ERROR: {e}")

    logging.info("Standalone EDA Script execution finished.")


Running Standalone EDA for: w27392 (1).pdf

--- Basic Text Statistics ---
  Word count                    : 9930
  Sentence count                : 399
  Avg sentence length words     : 24.89
  Character count               : 62068
------------------------------------------------------------

--- Top 30 Frequent Words (Potential Topics) ---
  covid: 135           students: 117       
  eﬀects: 63           outcomes: 55        
  ∗∗∗: 55              health: 51          
  treatment: 48        economic: 37        
  online: 35           due: 34             
  graduation: 34       job: 34             
  student: 31          likely: 31          
  income: 28           proxies: 27         
  survey: 26           eﬀect: 25           
  honors: 25           average: 25         
  pandemic: 24         expected: 24        
  lost: 24             state: 22           
  academic: 22         sample: 22          
  expectations: 20     shocks: 20          
  major: 20            gpa: 20           

In [4]:
#!pip install sentence_transformers transformers

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.11.0->sentence_transformers)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.11.0->sentence_transformers)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.11.0->sentence_transformers)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.11.0->sentence_transformers)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.11.0->sentence_transformers)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch>=1.11.0->sentence_transformers)
 

In [4]:
!pip install faiss-cpu

Collecting faiss-cpu
  Downloading faiss_cpu-1.10.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.4 kB)
Downloading faiss_cpu-1.10.0-cp311-cp311-manylinux_2_28_x86_64.whl (30.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.7/30.7 MB[0m [31m66.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faiss-cpu
Successfully installed faiss-cpu-1.10.0


In [5]:
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
import torch
import numpy as np
from typing import List, Dict, Optional, Tuple, Any
from datetime import datetime
import logging
import os
from collections import Counter
import re
#from pdf_processor import process_pdf  # Assumes pdf_processor.py is in the same directory

# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Data Structures ---

class TextChunk:
    def __init__(self, chunk_id: int, text: str, metadata: Optional[Dict] = None):
        self.chunk_id = chunk_id
        self.text = text
        self.metadata = metadata or {}

    def __repr__(self):
        return f"Chunk(id={self.chunk_id}, text='{self.text[:50]}...', metadata={self.metadata})"

class PodcastSegment:
    def __init__(self, segment_type: str, content: str, speakers: Optional[List[str]] = None, title: Optional[str] = None):
        self.segment_type = segment_type
        self.content = content
        self.speakers = speakers or []
        self.title = title

    def format_transcript(self) -> str:
        """Format the segment for the transcript based on its type"""
        if self.segment_type == 'intro':
            return f"\n## 🎙️ {self.title or 'Introduction'}\n\n{self.content}\n"
        elif self.segment_type == 'outro':
            return f"\n## 🎬 {self.title or 'Closing'}\n\n{self.content}\n"
        elif self.segment_type == 'transition':
            return f"\n### {self.title or 'Transition'}\n\n{self.content}\n"
        elif self.segment_type == 'discussion':
            header = f"\n## 💬 {self.title or 'Discussion'}\n\n" if self.title else "\n## 💬 Discussion Segment\n\n"
            return f"{header}{self.content}\n"
        else:
            return f"\n## [{self.segment_type.upper()}]\n\n{self.content}\n"

# --- Vector Database with Sentence Transformers ---

class VectorDB:
    def __init__(self, chunks: List[TextChunk]):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.chunk_texts = [chunk.text for chunk in chunks]
        self.chunk_embeddings = self.model.encode(self.chunk_texts, convert_to_tensor=True)
        self.chunks = chunks

    def retrieve_relevant_chunks(self, query: str, top_k: int = 2) -> List[TextChunk]:
        query_embedding = self.model.encode(query, convert_to_tensor=True)
        similarities = util.cos_sim(query_embedding, self.chunk_embeddings).cpu().numpy()[0]
        top_k_indices = np.argsort(similarities)[::-1][:top_k]
        return [self.chunks[i] for i in top_k_indices]

# --- Hugging Face LLM Provider ---

class HuggingFaceLLMProvider:
    def __init__(self, model_name: str = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"):
        self.pipeline = pipeline("text-generation", model=model_name, device=0 if torch.cuda.is_available() else -1)
        if self.pipeline.tokenizer.pad_token is None:
            self.pipeline.tokenizer.pad_token = self.pipeline.tokenizer.eos_token

    def generate_response(self, prompt: str, max_tokens: int = 100000) -> str:
        """Generate text using the HuggingFace pipeline and clean the response"""
        output = self.pipeline(prompt, max_new_tokens=max_tokens, do_sample=True, temperature=0.7,
                              pad_token_id=self.pipeline.tokenizer.pad_token_id)
        generated_text = output[0]['generated_text'][len(prompt):].strip()
        return self._clean_response(generated_text)

    def _clean_response(self, text: str) -> str:
        """Clean up generated text to remove prompt leakage and formatting artifacts"""
        # Remove any thinking/instruction text (text within < > or [ ])
        text = re.sub(r'<[^>]+>', '', text)
        text = re.sub(r'\[(?!Host)[^\]]+\]', '', text)

        # Remove "Use a formal tone" and similar instructions
        text = re.sub(r'Use a\s+\w+\s+tone\.?\s*', '', text)
        text = re.sub(r'The user wants this for.*?\.', '', text)

        # Remove any remaining meta-instructions
        text = re.sub(r'(?i)I need to generate.*?\.', '', text)
        text = re.sub(r'(?i)I should.*?\.', '', text)
        text = re.sub(r'(?i)I must.*?\.', '', text)
        text = re.sub(r'(?i)The tone needs to be.*?\.', '', text)

        return text.strip()

# --- Podcast Generator Logic ---

class PodcastGenerator:
    def __init__(self, chunks: List[TextChunk], podcast_title: str = "Insights Unpacked",
                 host_names: Optional[List[str]] = None, guest_name: Optional[str] = None):
        if not chunks:
            raise ValueError("Cannot initialize PodcastGenerator with empty chunks.")

        self.chunks = chunks
        self.podcast_title = podcast_title
        self.hosts = host_names or ["Jamie", "Taylor"]
        self.guest = guest_name

        logging.info(f"Initializing PodcastGenerator for '{self.podcast_title}' with {len(chunks)} chunks.")

        # Initialize components
        self.vector_db = VectorDB(chunks)
        self.llm = HuggingFaceLLMProvider()

        # Analyze content and determine episode details
        self.topics = self._determine_topics(top_n=3)
        self.episode_subtitle = self._generate_episode_subtitle()
        self.episode_number = 1

        logging.info(f"Determined topics: {self.topics}")
        logging.info(f"Episode subtitle: {self.episode_subtitle}")

    def _determine_topics(self, top_n: int = 3) -> List[str]:
        """Extract most relevant topics from document chunks"""
        logging.info("Determining topics from chunks...")
        full_text = " ".join([chunk.text for chunk in self.chunks])

        # Improved text preprocessing for topic extraction
        text = full_text.lower()
        text = re.sub(r'[^\w\s]', ' ', text)
        words = text.split()

        # Extended stop words list for better topic filtering
        stop_words = set([
            'a', 'an', 'the', 'is', 'in', 'it', 'of', 'for', 'on', 'and', 'to', 'was', 'were', 'be',
            'this', 'that', 'those', 'these', 'they', 'them', 'their', 'with', 'from', 'have', 'has',
            'had', 'am', 'are', 'been', 'being', 'by', 'at', 'as', 'if', 'or', 'not', 'no', 'but',
            'can', 'could', 'would', 'should', 'will', 'shall', 'may', 'might', 'must', 'about'
        ])

        # Extract noun phrases (approximated with n-grams)
        bigrams = [' '.join(words[i:i+2]) for i in range(len(words)-1)]
        trigrams = [' '.join(words[i:i+3]) for i in range(len(words)-2)]

        # Filter out phrases containing only stop words
        filtered_bigrams = [bg for bg in bigrams if not all(word in stop_words for word in bg.split())]
        filtered_trigrams = [tg for tg in trigrams if not all(word in stop_words for word in tg.split())]

        # Single words (only keep words longer than 3 characters not in stop words)
        filtered_words = [word for word in words if word not in stop_words and len(word) > 3]

        # Count occurrences
        word_counts = Counter(filtered_words)
        bigram_counts = Counter(filtered_bigrams)
        trigram_counts = Counter(filtered_trigrams)

        # Combine and weight n-grams higher
        all_counts = word_counts
        for bg, count in bigram_counts.items():
            all_counts[bg] = count * 2  # Weight bigrams higher

        for tg, count in trigram_counts.items():
            all_counts[tg] = count * 3  # Weight trigrams even higher

        # Select top topics, prefer n-grams over single words
        potential_topics = [topic for topic, _ in all_counts.most_common(top_n * 2)]

        # Prioritize phrases
        phrases = [t for t in potential_topics if ' ' in t]
        singles = [t for t in potential_topics if ' ' not in t]

        # Combine, prioritizing phrases
        selected_topics = phrases[:top_n]
        if len(selected_topics) < top_n:
            selected_topics.extend(singles[:top_n - len(selected_topics)])

        # Use default if no good topics are found
        return selected_topics[:top_n] if selected_topics else ["General Overview"]

    def _generate_episode_subtitle(self) -> str:
        """Create an engaging episode subtitle based on the main topics"""
        main_topic = self.topics[0] if self.topics else "Industry Insights"

        if len(self.topics) >= 2:
            prompt = f"Create a catchy, professional podcast episode subtitle that covers '{self.topics[0]}' and '{self.topics[1]}'. Make it sound engaging and professional, under 10 words."
        else:
            prompt = f"Create a catchy, professional podcast episode subtitle about '{main_topic}'. Make it sound engaging and professional, under 10 words."

        subtitle = self.llm.generate_response(prompt, max_tokens=30)

        # Clean up and format
        subtitle = subtitle.strip('"\'').strip()
        if len(subtitle) > 60:  # If still too long, truncate
            subtitle = subtitle[:57] + "..."

        return subtitle

    def _generate_intro(self) -> PodcastSegment:
        """Generate an engaging podcast introduction segment"""
        logging.info("Generating podcast intro segment.")

        topics_str = ", ".join(self.topics)
        host_str = ", ".join(self.hosts)
        guest_str = f" with special guest {self.guest}" if self.guest else ""

        prompt = f"""Generate a professional podcast introduction for '{self.podcast_title}', Episode {self.episode_number}: "{self.episode_subtitle}".

        The podcast covers these topics: {topics_str}.

        The hosts are {host_str}{guest_str}.

        Format the output as:

        🎙️ Podcast Title: {self.podcast_title}
        🎧 Episode: {self.episode_number} — "{self.episode_subtitle}"

        {self.hosts[0]} (Host):
        [Engaging welcome message, mentioning the podcast focus, co-host, and today's topics]

        {self.hosts[1]} (Host):
        [Brief follow-up and, if applicable, guest introduction]

        {self.guest or ''}:
        [If there's a guest, include their opening remarks thanking the hosts and briefly mentioning their expertise]

        Make it sound natural, engaging and professional. Keep it under 150 words total.
        """

        intro_text = self.llm.generate_response(prompt, max_tokens=200)
        return PodcastSegment(segment_type='intro', content=intro_text, speakers=[self.hosts[0]], title=f"Podcast Title: {self.podcast_title}")

    def _generate_outro(self) -> PodcastSegment:
        """Generate a podcast outro segment that wraps up the discussion"""
        logging.info("Generating podcast outro segment.")

        topics_str = ", ".join(self.topics)

        prompt = f"""Generate a professional podcast outro for '{self.podcast_title}', Episode {self.episode_number}: "{self.episode_subtitle}".

        The podcast covered these topics: {topics_str}.

        Format the output as:

        ## Final Thoughts

        {self.hosts[1]} (Host):
        [Brief summary of what was covered and acknowledgment to guest or co-host]

        {self.guest or self.hosts[0]}:
        [Brief closing remarks]

        {self.hosts[0]} (Host):
        [Thank listeners, encourage subscription/reviews, and briefly tease the next episode]

        Make it sound natural and professional. Keep it under 100 words total.
        """

        outro_text = self.llm.generate_response(prompt, max_tokens=150)
        return PodcastSegment(segment_type='outro', content=outro_text, speakers=self.hosts, title="Final Thoughts")

    def _generate_transition(self, from_topic: Optional[str], to_topic: str) -> PodcastSegment:
        """Generate a smooth transition between podcast segments"""
        logging.info(f"Generating transition segment to topic: '{to_topic}'.")

        if from_topic:
            prompt = f"""Write a brief, natural-sounding transition from '{from_topic}' to '{to_topic}' for a professional podcast.
            The transition should be spoken by {self.hosts[0]} and should sound conversational.
            Format as:
            {self.hosts[0]}:
            [Transition text connecting the previous topic to the new one]

            Keep it under 30 words and make it flow naturally.
            """
        else:
            prompt = f"""Write a brief, natural-sounding transition to introduce the topic '{to_topic}' for a professional podcast.
            The transition should be spoken by {self.hosts[0]} and should sound conversational.
            Format as:
            {self.hosts[0]}:
            [Transition text introducing the new topic]

            Keep it under 30 words and make it flow naturally.
            """

        transition_text = self.llm.generate_response(prompt, max_tokens=50)
        return PodcastSegment(segment_type='transition', content=transition_text, title=f"Transition to {to_topic}")

    def _format_dialogue(self, text: str) -> str:
        """Ensure dialogue is correctly formatted with proper speaker attributions"""
        # Standardize speaker attributions
        for host in self.hosts:
            # Replace variations like "Host A:" or "Host A -" with standard format
            text = re.sub(fr'{host}\s*[-:]\s*', f"{host}: ", text)

        if self.guest:
            # Standardize guest attribution
            text = re.sub(fr'{self.guest}\s*[-:]\s*', f"{self.guest}: ", text)

        # Ensure proper line breaks between speakers
        text = re.sub(r'(\w+:\s*[^\n]+)(\w+:)', r'\1\n\n\2', text)

        return text.strip()

    def _generate_discussion_segment(self, topic: str, segment_number: int) -> PodcastSegment:
        """Generate a natural-sounding discussion segment focused on a specific topic"""
        logging.info(f"Generating discussion segment {segment_number} for topic: '{topic}'.")

        # Retrieve relevant content for this topic
        relevant_chunks = self.vector_db.retrieve_relevant_chunks(query=topic, top_k=2)
        context_text = "\n".join([chunk.text for chunk in relevant_chunks]) if relevant_chunks else "No specific context available."

        # Create segment title
        segment_title = f"Segment {segment_number}: {topic.title()}"

        # Define speakers for this segment
        if self.guest and segment_number % 2 == 1:  # Alternate guest involvement
            speakers = [self.hosts[0], self.hosts[1], self.guest]
            speaker_str = f"{self.hosts[0]}, {self.hosts[1]}, and {self.guest}"
        else:
            speakers = self.hosts
            speaker_str = " and ".join(self.hosts)

        # Create detailed prompt
        prompt = f"""Generate an engaging, informative podcast dialogue about '{topic}' between {speaker_str}.

        Use this context from the document as reference:
        {context_text[:800]}...

        Format the output as a natural dialogue:

        {self.hosts[0]}: [Ask an insightful question about {topic}]

        {self.hosts[1] if len(speakers) > 1 else speakers[0]}: [Provide a detailed, informative response that incorporates specific details from the context]

        [Continue the dialogue with 2-3 more exchanges, exploring different aspects of {topic}]

        Each speaker should have 2-3 turns in the conversation. Make responses substantive but not too lengthy (2-3 sentences each).
        Include technical details from the context where appropriate, but explain them clearly.
        Avoid using meta-instructions or explaining what you're doing in the output.

        Keep the entire dialogue between 200-300 words.
        """

        # Generate the dialogue
        dialogue_text = self.llm.generate_response(prompt, max_tokens=400)

        # Format and clean the dialogue
        dialogue_text = self._format_dialogue(dialogue_text)

        return PodcastSegment(segment_type='discussion', content=dialogue_text,
                              speakers=speakers, title=segment_title)

    def generate_transcript(self) -> str:
        """Generate a complete podcast transcript with all segments"""
        logging.info("Starting full podcast transcript generation...")

        transcript_segments: List[PodcastSegment] = []

        # Add introduction
        transcript_segments.append(self._generate_intro())

        # Generate segments for each topic
        last_topic = None
        for i, topic in enumerate(self.topics):
            # Add transition to this topic
            transcript_segments.append(self._generate_transition(from_topic=last_topic, to_topic=topic))

            # Add discussion segment
            transcript_segments.append(self._generate_discussion_segment(topic=topic, segment_number=i+1))

            last_topic = topic

        # Add outro
        transcript_segments.append(self._generate_outro())

        # Combine all segments into full transcript
        full_transcript = f"# {self.podcast_title} - Episode {self.episode_number}: {self.episode_subtitle}\n"
        full_transcript += f"# Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
        full_transcript += "=" * 50 + "\n"

        for segment in transcript_segments:
            full_transcript += segment.format_transcript()

        logging.info("Podcast transcript generation finished.")
        return full_transcript

# --- Main Execution Block ---

if __name__ == "__main__":
    print("--- RAG Podcast Generator with DeepSeek Model ---")
    print("Using deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B for generation.")
    print("-" * 50)

    try:
        # Process PDF to extract chunks
        chunks, metadata = process_pdf(
            pdf_folder="my_pdfs",
            selection_mode="latest",
            chunk_size=1500,
            chunk_overlap=200
        )

        if chunks:
            # Convert chunks to TextChunk objects
            text_chunks = [TextChunk(i, chunk, {"source": metadata["file_name"]}) for i, chunk in enumerate(chunks)]

            # Create podcast title from document title if available
            doc_title = metadata.get("title", "")
            podcast_title = f"{doc_title} Insights" if doc_title and doc_title != "Unknown" else "Digital Insights"

            # Initialize generator with document-based title
            generator = PodcastGenerator(
                chunks=text_chunks,
                podcast_title=podcast_title,
                host_names=["Jamie", "Taylor"],
                guest_name="Dr. Priya Sharma"  # Optional: Add a guest expert
            )

            # Generate the transcript
            transcript = generator.generate_transcript()

            # Display and save output
            print("\n--- Generated Podcast Transcript ---")
            print(transcript[:1000] + "...\n(transcript continues)")
            print("-" * 50)

            # Save to file with timestamp
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_filename = f"podcast_transcript_{timestamp}.md"
            with open(output_filename, "w", encoding="utf-8") as f:
                f.write(transcript)
            print(f"Complete transcript saved to {output_filename}")

        else:
            print("No chunks returned from pdf_processor.")

    except Exception as e:
        logging.error(f"An error occurred: {e}", exc_info=True)
        print(f"\nAn error occurred: {e}")

--- RAG Podcast Generator with DeepSeek Model ---
Using deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B for generation.
--------------------------------------------------


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.5k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/679 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.55G [00:00<?, ?B/s]

Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.


generation_config.json:   0%|          | 0.00/181 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/3.07k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

Device set to use cuda:0



--- Generated Podcast Transcript ---
# Digital Insights - Episode 1: The subtitle should be in English. Also, the subtitle sho...
# Generated on: 2025-04-12 22:25:17

## 🎙️ Podcast Title: Digital Insights

Okay, I need to help this user create a professional podcast introduction for "Digital Insights," Episode 1. Let me start by understanding the requirements. The podcast title is Digital Insights, and the episode is Episode 1, specifically "The subtitle should be in English. Also, the subtitle sho...". Hmm, the subtitle part seems a bit confusing. Maybe it's a placeholder or a typo, so I'll focus on the main points.

The hosts are Jamie and Taylor, with Dr. Priya Sharma as a guest. The podcast covers COVID-19 and other related topics. The user wants the output in a specific format, using emojis and the hosts' names. They also mentioned to keep it natural, engaging, and professional, under 150 words.

First, I'll structure the introduction. It should start with a catchy t...
(transcri