In [10]:
# ===== CELL 1: Setup and Imports =====
import os
import sys
import json
import yaml
from pathlib import Path
from typing import Optional, Dict, Any
from dotenv import load_dotenv
import PyPDF2
import fitz  # PyMuPDF
import google.generativeai as genai
from elevenlabs.client import ElevenLabs
from elevenlabs import play, save
from tqdm import tqdm
import time

# Pull API keys and other settings from a local .env file into os.environ
load_dotenv()

# Project layout, anchored at the directory the notebook was started from
PROJECT_ROOT = Path.cwd()
CONFIG_DIR = PROJECT_ROOT / "config"
DATA_DIR = PROJECT_ROOT / "data"
INPUT_DIR, OUTPUT_DIR = DATA_DIR / "input", DATA_DIR / "output"

# Ensure the working directories exist; DATA_DIR is listed before its
# children so a plain (non-recursive) mkdir is enough
for dir_path in (DATA_DIR, INPUT_DIR, OUTPUT_DIR, CONFIG_DIR):
    dir_path.mkdir(exist_ok=True)

print("✅ Environment setup complete!")

✅ Environment setup complete!


In [11]:
# ===== CELL 2: Configuration Management =====
def load_config():
    """Load the pipeline configuration from ``config/config.yaml``.

    Built-in defaults are always used as the base. When the YAML file
    exists, its values are merged over the defaults section by section, so
    a file that is empty or only overrides a few keys still yields a
    complete configuration. When the file does not exist, the defaults are
    written to it so the user has a template to edit.

    Returns:
        dict: Configuration with the ``pdf``, ``gemini``, ``podcast`` and
        ``elevenlabs`` sections populated.
    """
    config_path = CONFIG_DIR / "config.yaml"

    # Defaults used for every key the file does not provide
    default_config = {
        'pdf': {'max_pages': 50},
        'gemini': {
            'model': 'gemini-1.5-flash',
            'temperature': 0.7,
            'max_output_tokens': 8192
        },
        'podcast': {
            'target_duration_minutes': 10,
            'style': 'conversational',
            'tone': 'engaging'
        },
        'elevenlabs': {
            'model': 'eleven_monolingual_v1',
            'voice_settings': {
                'stability': 0.75,
                'similarity_boost': 0.85,
                'style': 0.0,
                'use_speaker_boost': True
            }
        }
    }

    def _merge(defaults, overrides):
        """Return a copy of ``defaults`` recursively updated with ``overrides``."""
        merged = dict(defaults)
        for key, value in overrides.items():
            if isinstance(merged.get(key), dict):
                if isinstance(value, dict):
                    merged[key] = _merge(merged[key], value)
                elif value is not None:
                    merged[key] = value
                # A section left blank in YAML (None) keeps its defaults
            else:
                merged[key] = value
        return merged

    if config_path.exists():
        with open(config_path, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f)
        # safe_load returns None for an empty file and may return a scalar
        # or list for malformed content; neither can be used as a config
        if not isinstance(loaded, dict):
            loaded = {}
        config = _merge(default_config, loaded)
    else:
        config = default_config
        # Save default config so the user has a file to customise
        config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(config_path, 'w', encoding='utf-8') as f:
            yaml.dump(config, f, default_flow_style=False)

    return config

# Module-level configuration dict read by the classes defined in later cells
config = load_config()
print("✅ Configuration loaded!")
# Echo one setting as a quick sanity check that the expected sections exist
print(f"Target podcast duration: {config['podcast']['target_duration_minutes']} minutes")

✅ Configuration loaded!
Target podcast duration: 10 minutes


In [12]:
# ===== CELL 3: API Setup and Validation =====
def setup_apis():
    """Configure the Gemini SDK and create the shared ElevenLabs client.

    Both API keys are read from environment variables. The ElevenLabs
    client is stored in the module-level name ``eleven_client`` so that
    later cells can reuse it.

    Returns:
        bool: Always ``True`` when both services were configured.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` or ``ELEVENLABS_API_KEY`` is
            missing or empty.
    """
    global eleven_client

    def _require_env(var_name):
        """Return the value of ``var_name`` or raise if it is unset/empty."""
        value = os.getenv(var_name)
        if not value:
            raise ValueError(f"{var_name} not found in environment variables")
        return value

    # Gemini: the key is validated before the SDK is touched
    gemini_key = _require_env('GEMINI_API_KEY')
    genai.configure(api_key=gemini_key)

    # ElevenLabs: validated only after Gemini has been configured
    elevenlabs_key = _require_env('ELEVENLABS_API_KEY')
    eleven_client = ElevenLabs(api_key=elevenlabs_key)

    return True

def test_apis():
    """Smoke-test the Gemini and ElevenLabs connections.

    Sends one tiny prompt to the Gemini model named in the loaded
    configuration (falling back to ``gemini-1.5-flash``) and lists the
    ElevenLabs voices through the shared ``eleven_client``. Testing the
    configured model means a bad ``gemini.model`` value is caught here
    rather than in the middle of the pipeline.

    Returns:
        bool: ``True`` if both calls succeeded; ``False`` if anything
        failed (the error is printed, not raised).
    """
    try:
        # Test Gemini with the same model the script generator will use
        model_name = config.get('gemini', {}).get('model', 'gemini-1.5-flash')
        model = genai.GenerativeModel(model_name)
        response = model.generate_content("Say 'API test successful'")
        print(f"✅ Gemini API: {response.text.strip()}")

        # Test ElevenLabs (get available voices)
        available_voices = eleven_client.voices.get_all()
        print(f"✅ ElevenLabs API: Found {len(available_voices.voices)} voices")

        return True
    except Exception as e:
        # Deliberately broad: this is a diagnostic, so report and carry on
        print(f"❌ API Test Failed: {e}")
        return False

# Setup APIs: raises ValueError immediately if either API key is missing
setup_apis()
# True/False result of the connectivity check; the cells shown here only
# print the outcome and do not branch on this value
api_status = test_apis()

✅ Gemini API: API test successful
✅ ElevenLabs API: Found 19 voices


In [13]:
# ===== CELL 4: PDF Text Extraction =====
class PDFProcessor:
    """Extract plain text from a PDF, preferring PyMuPDF over PyPDF2.

    Args:
        max_pages: Maximum number of pages (from the start of the document)
            to read.
        min_chars: Minimum number of characters the PyMuPDF result must
            contain to be accepted; shorter results trigger the PyPDF2
            fallback.
    """

    def __init__(self, max_pages: int = 50, min_chars: int = 100):
        self.max_pages = max_pages
        self.min_chars = min_chars

    def extract_text_pymupdf(self, pdf_path: str) -> str:
        """Extract text using PyMuPDF (better for complex PDFs).

        Args:
            pdf_path: Path of the PDF file (``str`` or path-like).

        Returns:
            Text of the first ``max_pages`` pages, separated by blank
            lines, with surrounding whitespace stripped.
        """
        # str() so that pathlib.Path arguments are accepted as well
        doc = fitz.open(str(pdf_path))
        try:
            pages_to_process = min(len(doc), self.max_pages)
            page_texts = [
                doc.load_page(page_num).get_text()
                for page_num in tqdm(range(pages_to_process), desc="Extracting PDF text")
            ]
        finally:
            # Release the file handle even if a page fails to parse
            doc.close()

        return "\n\n".join(page_texts).strip()

    def extract_text_pypdf2(self, pdf_path: str) -> str:
        """Extract text using PyPDF2 (fallback method).

        Args:
            pdf_path: Path of the PDF file (``str`` or path-like).

        Returns:
            Text of the first ``max_pages`` pages, separated by blank
            lines, with surrounding whitespace stripped.
        """
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            pages_to_process = min(len(pdf_reader.pages), self.max_pages)
            page_texts = [
                # Guard against a page yielding None instead of a string
                pdf_reader.pages[page_num].extract_text() or ""
                for page_num in tqdm(range(pages_to_process), desc="Extracting PDF text")
            ]

        return "\n\n".join(page_texts).strip()

    def extract_text(self, pdf_path: str) -> str:
        """Extract text from a PDF using the best available method.

        PyMuPDF is tried first. If it raises, or returns fewer than
        ``min_chars`` characters, the PyPDF2 extractor is used instead and
        its result is returned as-is.

        Args:
            pdf_path: Path of the PDF file (``str`` or path-like).

        Returns:
            The extracted text.

        Raises:
            Exception: Whatever the PyPDF2 fallback raises if it also fails.
        """
        try:
            # Try PyMuPDF first (better quality)
            text = self.extract_text_pymupdf(pdf_path)
            if len(text.strip()) < self.min_chars:  # Treat as a failed extraction
                raise Exception("PyMuPDF extraction insufficient")
            return text
        except Exception as e:
            print(f"PyMuPDF failed ({e}), falling back to PyPDF2...")
            return self.extract_text_pypdf2(pdf_path)

# Initialize the shared PDF processor; the page limit comes from the
# 'pdf' section of the loaded configuration
pdf_processor = PDFProcessor(max_pages=config['pdf']['max_pages'])
print("✅ PDF processor initialized!")

✅ PDF processor initialized!


In [14]:
# ===== CELL 5: Gemini-Powered Script Generation =====
class PodcastScriptGenerator:
    """Turn research-paper text into a podcast script using Gemini.

    Generation settings are read from the module-level ``config`` dict
    (``gemini`` and ``podcast`` sections) when the instance is created.

    Args:
        model_name: Name of the Gemini model to use.
    """

    # Assumed speaking rate used to turn a duration into a word budget
    WORDS_PER_MINUTE = 150
    # Scripts shorter than this many characters are treated as failures
    MIN_SCRIPT_CHARS = 500

    def __init__(self, model_name: str = "gemini-1.5-flash"):
        self.model = genai.GenerativeModel(model_name)
        self.config = config['gemini']
        self.podcast_config = config['podcast']

    def analyze_text_length(self, text: str) -> Dict[str, Any]:
        """Measure the input text.

        Args:
            text: The extracted paper text.

        Returns:
            Dict with ``word_count``, ``char_count``,
            ``estimated_reading_time`` (minutes, at 200 words per minute)
            and ``pages_estimate`` (at 250 words per page).
        """
        word_count = len(text.split())
        char_count = len(text)

        # Estimate reading time (average 200 words per minute)
        reading_time_minutes = word_count / 200

        return {
            'word_count': word_count,
            'char_count': char_count,
            'estimated_reading_time': reading_time_minutes,
            'pages_estimate': word_count / 250  # ~250 words per page
        }

    @staticmethod
    def _main_content_span(target_duration) -> str:
        """Describe the main-segment length left after a 30 s intro and 30 s outro."""
        upper = target_duration - 1
        if upper >= 2:
            return f"{upper - 1:g}-{upper:g} minutes"
        if upper > 0:
            return f"about {upper:g} minute{'' if upper == 1 else 's'}"
        # Episodes of a minute or less leave no meaningful fixed span
        return "most of the episode"

    def create_podcast_prompt(self, text: str, analysis: Dict[str, Any]) -> str:
        """Build the Gemini prompt for podcast script generation.

        Args:
            text: The paper text to embed in the prompt.
            analysis: Result of :meth:`analyze_text_length` for ``text``.

        Returns:
            The complete prompt string. The word budget and the length of
            the main segment are both derived from the configured target
            duration.
        """
        target_duration = self.podcast_config['target_duration_minutes']
        style = self.podcast_config['style']
        tone = self.podcast_config['tone']

        # Rounded so fractional durations do not render as e.g. "1125.0 words"
        word_target = round(target_duration * self.WORDS_PER_MINUTE)
        main_span = self._main_content_span(target_duration)

        prompt = f"""
You are an expert podcast scriptwriter. Convert the following research paper into an engaging {target_duration}-minute podcast script.

**REQUIREMENTS:**
- Create a {style} and {tone} podcast script
- Target duration: {target_duration} minutes (approximately {word_target} words)
- Structure: Introduction, Main Content (2-3 key points), Conclusion
- Use natural speech patterns, pauses, and transitions
- Make complex concepts accessible to general audience
- Include engaging hooks and smooth transitions

**INPUT TEXT ANALYSIS:**
- Word count: {analysis['word_count']}
- Estimated reading time: {analysis['estimated_reading_time']:.1f} minutes
- Content scope: {analysis['pages_estimate']:.1f} pages

**SCRIPT FORMAT:**
Structure your response as a natural podcast script with:
1. **INTRO** (30 seconds): Hook the audience with the paper's significance
2. **MAIN CONTENT** ({main_span}): Break down key findings into digestible segments
3. **CONCLUSION** (30 seconds): Summarize impact and future implications

**TONE GUIDELINES:**
- Conversational and accessible
- Enthusiastic about the research
- Use analogies and examples
- Smooth transitions between topics
- Natural speaking rhythm

**RESEARCH PAPER TEXT:**
{text}

Generate the podcast script now:
"""
        return prompt

    def generate_script(self, text: str) -> str:
        """Generate a podcast script from research paper text.

        Args:
            text: The extracted paper text.

        Returns:
            The generated script.

        Raises:
            ValueError: If ``text`` is empty or only whitespace.
            Exception: If the Gemini call fails or the script is shorter
                than ``MIN_SCRIPT_CHARS`` characters (the error is printed
                and re-raised).
        """
        # Without source text the model would have nothing to summarise
        if not text or not text.strip():
            raise ValueError("No text provided for script generation")

        print("🔍 Analyzing text...")
        analysis = self.analyze_text_length(text)

        print("📊 Text Analysis:")
        print(f"   - Words: {analysis['word_count']:,}")
        print(f"   - Characters: {analysis['char_count']:,}")
        print(f"   - Estimated reading time: {analysis['estimated_reading_time']:.1f} minutes")

        print("🤖 Generating podcast script with Gemini...")
        prompt = self.create_podcast_prompt(text, analysis)

        try:
            response = self.model.generate_content(
                prompt,
                generation_config=genai.types.GenerationConfig(
                    temperature=self.config['temperature'],
                    max_output_tokens=self.config['max_output_tokens']
                )
            )

            script = response.text

            # Basic validation
            if len(script) < self.MIN_SCRIPT_CHARS:
                raise Exception("Generated script too short")

            print(f"✅ Script generated! Length: {len(script.split())} words")
            return script

        except Exception as e:
            print(f"❌ Script generation failed: {e}")
            raise

# Initialize the shared script generator with the Gemini model named in
# the 'gemini' section of the loaded configuration
script_generator = PodcastScriptGenerator(config['gemini']['model'])
print("✅ Podcast script generator initialized!")

✅ Podcast script generator initialized!


In [18]:
# ===== CELL 6: ElevenLabs Text-to-Speech (FIXED WITH TIMEOUT & CHUNKING) =====
class AudioGenerator:
    """Convert a podcast script to audio with ElevenLabs text-to-speech.

    Long scripts are split into chunks, each chunk is synthesised with
    retries, and the resulting audio bytes are concatenated into one file.
    Settings come from the ``elevenlabs`` section of the module-level
    ``config``; the shared ``eleven_client`` is used for all requests.
    """

    # Largest number of characters sent in a single text-to-speech request
    MAX_CHUNK_CHARS = 2500
    # Attempts per chunk before giving up
    MAX_RETRIES = 3
    # Narration voices, most preferred first
    PREFERRED_VOICE_NAMES = ('Rachel', 'Adam', 'Antoni', 'Arnold', 'Josh', 'Sam', 'Daniel', 'Lily')

    def __init__(self):
        self.config = config['elevenlabs']
        self.voice_id = os.getenv('ELEVENLABS_VOICE_ID', None)
        self.client = eleven_client
        # Set longer timeout for audio generation
        # NOTE(review): the SDK takes its request timeout as a constructor
        # argument; assigning the attribute afterwards may have no effect —
        # confirm against the ElevenLabs client documentation.
        self.client.timeout = 300  # 5 minutes

    def get_available_voices(self):
        """Return ``(voice_id, name)`` pairs for the account's voices.

        Returns an empty list (after printing the error) if the voices
        cannot be listed.
        """
        try:
            voices_response = self.client.voices.get_all()
            return [(voice.voice_id, voice.name) for voice in voices_response.voices]
        except Exception as e:
            print(f"Error getting voices: {e}")
            return []

    def select_best_voice(self):
        """Choose the voice to narrate with.

        Order of preference:
          1. the voice named by ``ELEVENLABS_VOICE_ID`` (used directly when
             the voice list could not be fetched);
          2. the first entry of ``PREFERRED_VOICE_NAMES`` that is available;
          3. the first available voice.

        Returns:
            The chosen voice id, or ``None`` if no voice can be determined.
        """
        available_voices = self.get_available_voices()

        if not available_voices:
            # Listing can fail while synthesis still works, so an
            # explicitly configured voice is still worth trying
            if self.voice_id:
                print(f"🎤 Could not list voices; using configured voice ID: {self.voice_id}")
                return self.voice_id
            return None

        # If specific voice ID is set, use it
        if self.voice_id:
            for voice_id, name in available_voices:
                if voice_id == self.voice_id:
                    print(f"🎤 Using specified voice: {name}")
                    return voice_id
            print(f"⚠️ Configured voice ID {self.voice_id} not found, selecting another voice")

        # Walk the preference list in priority order; for duplicate names
        # the first listed voice wins
        id_by_name = {}
        for voice_id, name in available_voices:
            id_by_name.setdefault(name, voice_id)

        for preferred in self.PREFERRED_VOICE_NAMES:
            if preferred in id_by_name:
                print(f"🎤 Selected voice: {preferred}")
                return id_by_name[preferred]

        # Use first available voice as fallback
        voice_id, name = available_voices[0]
        print(f"🎤 Using voice: {name}")
        return voice_id

    @staticmethod
    def _split_oversized(sentence: str, limit: int) -> list:
        """Split one sentence on whitespace into pieces of at most ``limit`` characters."""
        if len(sentence) <= limit:
            return [sentence]

        pieces = []
        current = ""
        for word in sentence.split():
            # A single word longer than the limit has to be cut mid-word
            while len(word) > limit:
                if current:
                    pieces.append(current)
                    current = ""
                pieces.append(word[:limit])
                word = word[limit:]

            if current and len(current) + 1 + len(word) > limit:
                pieces.append(current)
                current = word
            elif current:
                current += ' ' + word
            else:
                current = word

        if current:
            pieces.append(current)
        return pieces

    def chunk_text(self, text: str, max_chars: int = 2500) -> list:
        """Split text into chunks of at most ``max_chars`` characters.

        Chunks break at sentence boundaries (``". "``) where possible. A
        sentence that is itself longer than ``max_chars`` is split on
        whitespace, and a single over-long word is cut as a last resort.

        Args:
            text: The text to split.
            max_chars: Maximum length of each returned chunk.

        Returns:
            List of non-empty, stripped chunks; empty for blank input.

        Raises:
            ValueError: If ``max_chars`` is less than 1.
        """
        if max_chars < 1:
            raise ValueError("max_chars must be a positive integer")

        parts = text.split('. ')
        # split() drops the ". " separators: put the period back on every
        # sentence but the last so pieces can be re-joined with one space
        sentences = [part + '.' for part in parts[:-1]] + parts[-1:]

        chunks = []
        current_chunk = ""
        for sentence in sentences:
            for piece in self._split_oversized(sentence, max_chars):
                if not piece:
                    continue
                if current_chunk and len(current_chunk) + 1 + len(piece) > max_chars:
                    chunks.append(current_chunk)
                    current_chunk = piece
                elif current_chunk:
                    current_chunk += ' ' + piece
                else:
                    current_chunk = piece

        # Add the last chunk
        if current_chunk:
            chunks.append(current_chunk)

        # Never hand blank text to the speech API
        return [chunk.strip() for chunk in chunks if chunk.strip()]

    def _voice_settings(self) -> dict:
        """Return the voice settings dict sent with each synthesis request."""
        settings = self.config['voice_settings']
        return {
            "stability": settings['stability'],
            "similarity_boost": settings['similarity_boost'],
            "style": settings['style'],
            "use_speaker_boost": settings['use_speaker_boost']
        }

    def generate_audio_chunk(self, text: str, voice_id: str, chunk_num: int) -> bytes:
        """Generate audio for a single text chunk, retrying on failure.

        Args:
            text: The chunk to synthesise.
            voice_id: ElevenLabs voice id to use.
            chunk_num: 1-based chunk number, used only in progress messages.

        Returns:
            The audio bytes for the chunk.

        Raises:
            Exception: The error from the final attempt once
                ``MAX_RETRIES`` attempts have failed.
        """
        print(f"   🔄 Processing chunk {chunk_num}: {len(text)} characters")

        for attempt in range(self.MAX_RETRIES):
            try:
                audio = self.client.text_to_speech.convert(
                    voice_id=voice_id,
                    text=text,
                    model_id=self.config['model'],
                    voice_settings=self._voice_settings()
                )
                # The response is consumed as an iterable of byte chunks
                return b"".join(audio)

            except Exception as e:
                if attempt == self.MAX_RETRIES - 1:
                    print(f"   ❌ Failed to generate chunk {chunk_num}: {e}")
                    raise
                print(f"   ⚠️ Chunk {chunk_num} attempt {attempt + 1} failed, retrying...")
                time.sleep(2 ** attempt)  # Exponential backoff

    def generate_audio(self, text: str, output_path: str) -> bool:
        """Generate audio from text and write it to ``output_path``.

        The text is chunked, every chunk (including a lone one) goes
        through the retrying chunk generator, and the file is written only
        after all chunks have succeeded.

        Args:
            text: The script to narrate.
            output_path: Destination file path for the audio.

        Returns:
            ``True`` if a non-empty audio file was written, ``False``
            otherwise (the error and traceback are printed, not raised).
        """
        try:
            # Select voice
            voice_id = self.select_best_voice()
            if not voice_id:
                raise Exception("No voices available")

            print("🎵 Generating audio with ElevenLabs...")
            print(f"   - Text length: {len(text)} characters")
            print(f"   - Estimated audio duration: {len(text.split()) / 150:.1f} minutes")

            chunks = self.chunk_text(text, max_chars=self.MAX_CHUNK_CHARS)
            if not chunks:
                raise ValueError("No text to convert to audio")

            if len(chunks) > 1:
                print("   📝 Text is long, splitting into chunks...")
                print(f"   📦 Split into {len(chunks)} chunks")
            else:
                print("   📝 Processing as single chunk...")

            audio_parts = []
            for i, chunk in enumerate(chunks, 1):
                audio_parts.append(self.generate_audio_chunk(chunk, voice_id, i))

                # Small delay between chunks to avoid rate limiting
                if i < len(chunks):
                    time.sleep(1)

            # Write once, after every chunk succeeded, so a failed run
            # does not overwrite an existing file with partial audio
            with open(output_path, "wb") as f:
                f.write(b"".join(audio_parts))

            # Verify file was created and has content
            if not (os.path.exists(output_path) and os.path.getsize(output_path) > 0):
                raise Exception("Audio file was not created or is empty")

            print(f"✅ Audio saved to: {output_path}")
            file_size = os.path.getsize(output_path) / (1024 * 1024)  # MB
            print(f"   📁 File size: {file_size:.2f} MB")
            return True

        except Exception as e:
            print(f"❌ Audio generation failed: {e}")
            # Print more detailed error info
            import traceback
            print(f"Full error: {traceback.format_exc()}")
            return False

# Initialize the shared audio generator (reads the 'elevenlabs' config
# section and the ElevenLabs client created during API setup)
audio_generator = AudioGenerator()
print("✅ Audio generator initialized!")

# Display available voices; the list is empty if the lookup failed
voices_list = audio_generator.get_available_voices()
print(f"📢 Available voices: {len(voices_list)}")
for voice_id, name in voices_list[:5]:  # Show first 5
    print(f"   - {name}")

✅ Audio generator initialized!
📢 Available voices: 19
   - Aria
   - Sarah
   - Laura
   - Charlie
   - George


In [19]:
# ===== CELL 7: Main Pipeline Function =====
def run_pdf_to_podcast_pipeline(pdf_path: str, output_name: str = "podcast"):
    """Convert a PDF into a podcast: extract text, write a script, narrate it.

    Three files named after ``output_name`` are written to ``OUTPUT_DIR``:
    the extracted text, the generated script, and the MP3 audio.

    Args:
        pdf_path: Location of the source PDF.
        output_name: Base name (no extension) for the output files.

    Returns:
        dict: On success, ``text_file``, ``script_file`` and ``audio_file``
        paths plus ``success=True``. On failure, ``success=False`` and an
        ``error`` message.

    Raises:
        FileNotFoundError: If ``pdf_path`` does not exist. This is checked
            before the pipeline starts and is not converted into a
            failure dict.
    """
    print("🚀 Starting PDF to Podcast Pipeline")
    print("=" * 50)

    # Fail fast on a missing input file
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"PDF file not found: {pdf_path}")

    # All three outputs share the same base name
    text_output, script_output, audio_output = (
        OUTPUT_DIR / f"{output_name}{suffix}"
        for suffix in ("_extracted.txt", "_script.txt", ".mp3")
    )

    def _write_text(destination, content):
        """Write ``content`` to ``destination`` as UTF-8 text."""
        with open(destination, 'w', encoding='utf-8') as handle:
            handle.write(content)

    try:
        # Step 1: PDF -> plain text
        print("\n📄 Step 1: Extracting text from PDF...")
        extracted_text = pdf_processor.extract_text(pdf_path)
        _write_text(text_output, extracted_text)
        print(f"✅ Text extracted: {len(extracted_text.split())} words")

        # Step 2: plain text -> podcast script
        print("\n🤖 Step 2: Generating podcast script...")
        podcast_script = script_generator.generate_script(extracted_text)
        _write_text(script_output, podcast_script)
        print(f"✅ Script generated: {len(podcast_script.split())} words")

        # Step 3: script -> audio file
        print("\n🎵 Step 3: Converting script to audio...")
        if not audio_generator.generate_audio(podcast_script, str(audio_output)):
            raise Exception("Audio generation failed")

        print("\n🎉 Pipeline completed successfully!")
        print("📁 Output files:")
        print(f"   - Extracted text: {text_output}")
        print(f"   - Podcast script: {script_output}")
        print(f"   - Audio file: {audio_output}")

        return {
            'text_file': text_output,
            'script_file': script_output,
            'audio_file': audio_output,
            'success': True
        }

    except Exception as e:
        # Any step failure is reported to the caller rather than raised
        print(f"\n❌ Pipeline failed: {e}")
        return {'success': False, 'error': str(e)}

# Confirms only that the pipeline function was defined; nothing has run yet
print("✅ Pipeline function ready!")

✅ Pipeline function ready!


In [20]:
# ===== CELL 8: Execute Pipeline (MODIFY THIS PART) =====
# Usage notes for anyone adapting this cell to their own PDF
print("\n".join([
    "📋 To run the pipeline:",
    "1. Download a PDF file to the 'data/input/' folder",
    "2. Update the pdf_path variable below",
    "3. Run the pipeline",
    "",
    "Example:",
    "pdf_path = INPUT_DIR / 'your_paper.pdf'",
    "result = run_pdf_to_podcast_pipeline(pdf_path, 'my_podcast')",
]))

# Example run: process the Transformer paper.
# Change the filename and output name below to use your own PDF.
pdf_path = INPUT_DIR / "transformer_paper.pdf"
result = run_pdf_to_podcast_pipeline(pdf_path, "transformer_podcast")

📋 To run the pipeline:
1. Download a PDF file to the 'data/input/' folder
2. Update the pdf_path variable below
3. Run the pipeline

Example:
pdf_path = INPUT_DIR / 'your_paper.pdf'
result = run_pdf_to_podcast_pipeline(pdf_path, 'my_podcast')
🚀 Starting PDF to Podcast Pipeline

📄 Step 1: Extracting text from PDF...


Extracting PDF text: 100%|██████████| 15/15 [00:00<00:00, 107.20it/s]

✅ Text extracted: 6095 words

🤖 Step 2: Generating podcast script...
🔍 Analyzing text...
📊 Text Analysis:
   - Words: 6,095
   - Characters: 39,525
   - Estimated reading time: 30.5 minutes
🤖 Generating podcast script with Gemini...





✅ Script generated! Length: 651 words
✅ Script generated: 651 words

🎵 Step 3: Converting script to audio...
🎤 Selected voice: Daniel
🎵 Generating audio with ElevenLabs...
   - Text length: 4591 characters
   - Estimated audio duration: 4.3 minutes
   📝 Text is long, splitting into chunks...
   📦 Split into 2 chunks
   🔄 Processing chunk 1: 2422 characters
   🔄 Processing chunk 2: 2166 characters
✅ Audio saved to: c:\Users\Mohammed Kayser\OneDrive\Desktop\My_Portfolio\Projects\PDF_2_PODCAST_CLAUDE\notebooks\data\output\transformer_podcast.mp3
   📁 File size: 4.69 MB

🎉 Pipeline completed successfully!
📁 Output files:
   - Extracted text: c:\Users\Mohammed Kayser\OneDrive\Desktop\My_Portfolio\Projects\PDF_2_PODCAST_CLAUDE\notebooks\data\output\transformer_podcast_extracted.txt
   - Podcast script: c:\Users\Mohammed Kayser\OneDrive\Desktop\My_Portfolio\Projects\PDF_2_PODCAST_CLAUDE\notebooks\data\output\transformer_podcast_script.txt
   - Audio file: c:\Users\Mohammed Kayser\OneDrive\Des