# Document Summarizer with Ollama (gemma:2b)

Extract text from DOCX, PPTX, PDF, VTT, HTML, and TXT files. Optionally generate summaries using local Ollama (`gemma:2b`).

**Workflow**: Place files in `./input/` → Run extraction → Outputs saved to `./docs/` → Use with `llama_index_rag.ipynb` for RAG

In [None]:
# Install required packages
!pip3 install python-docx python-pptx PyPDF2 webvtt-py beautifulsoup4 python-dotenv requests

## Setup

In [None]:
# Import necessary libraries
import os
from pathlib import Path
from typing import Dict, List, Optional
import json
import requests

# Document processing libraries
import docx
from pptx import Presentation
import PyPDF2
import webvtt
from bs4 import BeautifulSoup

print("‚úÖ All imports successful")

## Ollama LLM Configuration

Ensure Ollama is running at `http://localhost:11434` with `gemma:2b` available.

In [None]:
# Lightweight Ollama caller using requests
import requests
import os
from typing import List, Dict, Any

# Base URL of the Ollama server; override with the OLLAMA_BASE_URL environment variable.
OLLAMA_BASE = os.getenv('OLLAMA_BASE_URL', 'http://localhost:11434')
# Model used by call_llm() when no model is passed; override with OLLAMA_MODEL.
OLLAMA_MODEL = os.getenv('OLLAMA_MODEL', 'gemma:2b')
# Default timeout call_llm() hands to requests.post; override with OLLAMA_TIMEOUT.
# The value must parse as an integer, otherwise int() raises ValueError here.
OLLAMA_TIMEOUT = int(os.getenv('OLLAMA_TIMEOUT', '120'))

def call_llm(messages: List[Dict[str, str]], model: str | None = None, temperature: float = 0.1, timeout: int | None = None) -> Dict[str, Any]:
    """Send a chat request to the local Ollama server and return the reply text.

    Args:
        messages: Chat history as ``{"role": ..., "content": ...}`` dicts.
        model: Model name; falls back to ``OLLAMA_MODEL`` when omitted.
        temperature: Sampling temperature, sent inside the request ``options``.
        timeout: Request timeout in seconds; falls back to ``OLLAMA_TIMEOUT``.

    Returns:
        ``{'content': str, 'raw': parsed_json}`` on success, or
        ``{'content': '', 'error': str}`` when the HTTP request or the JSON
        decoding fails. Request failures are reported through the ``'error'``
        key instead of being raised, so callers must check for it.
    """
    model = model or OLLAMA_MODEL
    timeout = timeout or OLLAMA_TIMEOUT

    url = f"{OLLAMA_BASE}/api/chat"
    payload = {
        "model": model,
        "messages": messages,
        # Without this Ollama streams newline-delimited JSON chunks, which
        # resp.json() cannot decode as a single document.
        "stream": False,
        # Sampling parameters are only honoured inside "options".
        "options": {"temperature": temperature},
    }
    try:
        resp = requests.post(url, json=payload, timeout=timeout)
        resp.raise_for_status()
        data = resp.json()

        content = ''
        if isinstance(data, dict):
            # Native Ollama /api/chat shape: {"message": {"content": "..."}}
            message = data.get('message')
            if isinstance(message, dict):
                content = message.get('content') or ''

            # Native Ollama /api/generate shape: {"response": "..."}
            if not content and isinstance(data.get('response'), str):
                content = data['response']

            # OpenAI-compatible shape: {"choices": [{"message": {...}}]}
            choices = data.get('choices')
            if not content and isinstance(choices, list) and choices:
                choice = choices[0]
                if isinstance(choice, dict):
                    msg = choice.get('message') or choice.get('delta') or choice.get('text')
                    if isinstance(msg, dict):
                        content = msg.get('content', '')
                    elif isinstance(msg, str):
                        content = msg

            # Legacy fallbacks kept from the previous implementation
            if not content and 'text' in data:
                content = data.get('text', '')
            if not content:
                content = str(data)
        else:
            content = str(data)

        return {'content': content, 'raw': data}
    except Exception as e:
        # Best-effort contract: report the failure instead of raising.
        return {'content': '', 'error': str(e)}

# Quick smoke test (uncomment to run)
# test = call_llm([{'role':'system','content':'You are a helpful assistant.'},{'role':'user','content':'Say hello in one sentence.'}])
# print(test)

In [None]:
# Optional: Test call_llm interactively (uncomment to run)
# test = call_llm([{'role':'user','content':'Say hello in one sentence.'}])
# print(test.get('content'))

## Text Extraction

In [None]:
def extract_text_from_docx(file_path: str) -> str:
    """Return the non-blank paragraph and table-cell text of a DOCX file.

    Paragraph text comes first, followed by table cells in row order, with
    one item per line. Any failure is returned as an
    ``"Error extracting DOCX: ..."`` string rather than raised.
    """
    try:
        document = docx.Document(file_path)

        # Body paragraphs, skipping blank ones
        pieces = [para.text for para in document.paragraphs if para.text.strip()]

        # Then every non-blank table cell, table by table and row by row
        pieces.extend(
            cell.text
            for table in document.tables
            for row in table.rows
            for cell in row.cells
            if cell.text.strip()
        )

        return "\n".join(pieces)
    except Exception as exc:
        return f"Error extracting DOCX: {str(exc)}"


def extract_text_from_pptx(file_path: str) -> str:
    """Return the text of every slide in a PPTX file.

    Each slide contributes a ``--- Slide N ---`` marker followed by the text
    of its shapes that expose non-blank ``text``. Any failure is returned as
    an ``"Error extracting PPTX: ..."`` string rather than raised.
    """
    try:
        deck = Presentation(file_path)
        lines = []

        for number, slide in enumerate(deck.slides, start=1):
            lines.append(f"\n--- Slide {number} ---")
            # Only shapes that carry a text attribute with visible content
            lines.extend(
                shape.text
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            )

        return "\n".join(lines)
    except Exception as exc:
        return f"Error extracting PPTX: {str(exc)}"


def extract_text_from_pdf(file_path: str) -> str:
    """Return the text of a PDF, page by page.

    Pages that yield only whitespace are skipped; every other page
    contributes a ``--- Page N ---`` marker (1-based) followed by its text.
    Any failure is returned as an ``"Error extracting PDF: ..."`` string
    rather than raised.
    """
    try:
        collected = []

        with open(file_path, 'rb') as handle:
            reader = PyPDF2.PdfReader(handle)

            for index, page in enumerate(reader.pages):
                page_text = page.extract_text()
                if not page_text.strip():
                    continue
                collected.append(f"\n--- Page {index + 1} ---")
                collected.append(page_text)

        return "\n".join(collected)
    except Exception as exc:
        return f"Error extracting PDF: {str(exc)}"


def extract_text_from_vtt(file_path: str) -> str:
    """Return the caption text of a WebVTT file as one space-joined string.

    Captions whose stripped text is empty are dropped. Any failure is
    returned as an ``"Error extracting VTT: ..."`` string rather than raised.
    """
    try:
        stripped = (caption.text.strip() for caption in webvtt.read(file_path))
        return " ".join(piece for piece in stripped if piece)
    except Exception as exc:
        return f"Error extracting VTT: {str(exc)}"


def extract_text_from_html(file_path: str) -> str:
    """Return the visible text of an HTML file, one cleaned chunk per line.

    The file is read as UTF-8 first. If it is not valid UTF-8, the raw bytes
    are handed to BeautifulSoup instead so it can work out the encoding
    itself, rather than failing the whole extraction.

    ``<script>`` and ``<style>`` elements are removed before the text is
    collected. Any failure is returned as an
    ``"Error extracting HTML: ..."`` string rather than raised.
    """
    try:
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                markup = file.read()
        except UnicodeDecodeError:
            # Legacy/non-UTF-8 page: let BeautifulSoup detect the encoding
            # from the bytes instead of giving up.
            with open(file_path, 'rb') as file:
                markup = file.read()

        soup = BeautifulSoup(markup, 'html.parser')

        # Remove script and style elements so their source is not extracted
        for element in soup(["script", "style"]):
            element.decompose()

        text = soup.get_text()

        # Clean up whitespace: trim each line, split on double spaces,
        # and drop anything that ends up empty.
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        return '\n'.join(chunk for chunk in chunks if chunk)
    except Exception as e:
        return f"Error extracting HTML: {str(e)}"


def extract_text_from_txt(file_path: str) -> str:
    """Return the full contents of a UTF-8 text file.

    Any failure is returned as an ``"Error extracting TXT: ..."`` string
    rather than raised.
    """
    try:
        with open(file_path, mode='r', encoding='utf-8') as source:
            contents = source.read()
    except Exception as exc:
        return f"Error extracting TXT: {str(exc)}"
    return contents


print("‚úÖ Text extraction functions defined")

## Universal Document Processor

Automatically detects file type and extracts text:

In [None]:
def extract_text_from_document(file_path: str) -> Dict[str, Any]:
    """
    Universal document text extractor

    Picks an extractor from the file's lower-cased extension and runs it.

    Args:
        file_path: Path to the document

    Returns:
        Dict that always has 'text', 'file_type' and 'success'. On success it
        also has 'char_count' and 'word_count'; on failure it has 'error'
        ('file_type' is 'unknown' when the file does not exist).
    """
    file_path = Path(file_path)

    if not file_path.exists():
        return {
            'text': '',
            'file_type': 'unknown',
            'success': False,
            'error': f"File not found: {file_path}"
        }

    # Map file extensions to extraction functions
    extractors = {
        '.docx': extract_text_from_docx,
        '.pptx': extract_text_from_pptx,
        '.pdf': extract_text_from_pdf,
        '.vtt': extract_text_from_vtt,
        '.html': extract_text_from_html,
        '.htm': extract_text_from_html,
        '.txt': extract_text_from_txt,
    }

    file_ext = file_path.suffix.lower()

    if file_ext not in extractors:
        return {
            'text': '',
            'file_type': file_ext,
            'success': False,
            'error': f"Unsupported file type: {file_ext}"
        }

    print(f"üìÑ Processing {file_path.name} ({file_ext})...")

    text = extractors[file_ext](str(file_path))

    # The extractors report failure in-band as "Error extracting <TYPE>: ...".
    # Match that full prefix: testing only for "Error" would reject any
    # document whose own text happens to start with that word.
    if text.startswith("Error extracting "):
        return {
            'text': '',
            'file_type': file_ext,
            'success': False,
            'error': text
        }

    return {
        'text': text,
        'file_type': file_ext,
        'success': True,
        'char_count': len(text),
        'word_count': len(text.split())
    }


print("‚úÖ Universal document processor defined")

## AI Summarization with Ollama (gemma:2b)

Generate concise summaries using the local Ollama model (`gemma:2b`). Ensure Ollama is running before calling these functions.

In [None]:
def summarize_text_with_ai(text: str, max_words: int = 500) -> str:
    """
    Summarize text using Ollama (gemma:2b)

    Args:
        text: The text to summarize
        max_words: Target length of the summary, in words (passed to the
            model as a request in the prompt, not enforced afterwards)

    Returns:
        The summary text. Returns "Text too short to summarize." when the
        stripped input is under 100 characters, and a string starting with
        "Error generating summary: " when the model call fails or returns
        nothing.
    """
    if not text or len(text.strip()) < 100:
        return "Text too short to summarize."

    # Truncate very long documents
    if len(text) > 50000:
        print(f"  ‚ö†Ô∏è Text is {len(text)} chars, truncating to 50000...")
        text = text[:50000] + "\n\n[Document truncated for summarization]"

    prompt = f"""Please provide a comprehensive summary of the following document. 
The summary should:
- Be around {max_words} words
- Capture the main topics and key points
- Be well-structured and easy to read
- Include important details and facts

Document:
{text}

Summary:"""

    messages = [{"role": "user", "content": prompt}]

    try:
        response = call_llm(messages)
    except Exception as e:
        return f"Error generating summary: {str(e)}"

    # call_llm reports failures through an 'error' key instead of raising,
    # so it has to be checked explicitly or failures look like empty summaries.
    error = response.get('error')
    if error:
        return f"Error generating summary: {error}"

    summary = (response.get('content') or '').strip()
    if not summary:
        return "Error generating summary: model returned an empty response"
    return summary


def summarize_with_chunks(text: str, chunk_size: int = 10000, max_words: int = 500) -> str:
    """
    Summarize very long documents by chunking

    Text no longer than ``chunk_size`` is summarized directly. Longer text is
    cut into ``chunk_size``-character pieces, each piece is summarized, and
    the partial summaries are summarized once more into the final result.

    Args:
        text: The text to summarize
        chunk_size: Size of each chunk in characters (must be positive)
        max_words: Maximum words in final summary

    Returns:
        Summary text, or a string starting with "Error generating summary"
        as soon as any chunk fails.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    too_short = "Text too short to summarize."
    error_prefix = "Error generating summary"

    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    if len(text) <= chunk_size:
        return summarize_text_with_ai(text, max_words)

    print(f"  üìö Document is {len(text)} chars, using chunked summarization...")

    # Split into chunks
    chunks = [text[start:start + chunk_size] for start in range(0, len(text), chunk_size)]

    # A short trailing fragment would be rejected as "too short" by
    # summarize_text_with_ai; merge it into the previous chunk instead.
    if len(chunks) > 1 and len(chunks[-1].strip()) < 100:
        tail = chunks.pop()
        chunks[-1] += tail

    if len(chunks) == 1:
        return summarize_text_with_ai(chunks[0], max_words)

    print(f"  üìÑ Processing {len(chunks)} chunks...")

    # Share the word budget between chunks, but keep a usable floor so that
    # many chunks do not drive the per-chunk budget towards zero.
    per_chunk_words = max(min(50, max_words), max_words // len(chunks))

    # Summarize each chunk
    chunk_summaries = []
    for i, chunk in enumerate(chunks, 1):
        print(f"    Chunk {i}/{len(chunks)}...", end=" ")
        summary = summarize_text_with_ai(chunk, per_chunk_words)
        if summary.startswith(error_prefix):
            # Abort instead of feeding an error message into the final summary
            print()
            return summary
        if summary != too_short:
            # Placeholder replies (e.g. whitespace-only chunks) carry no content
            chunk_summaries.append(summary)
        print("‚úì")

    if not chunk_summaries:
        return too_short

    # Combine chunk summaries
    combined = "\n\n".join(chunk_summaries)

    # Final summary of summaries
    print(f"  üîÑ Generating final summary...")
    final_summary = summarize_text_with_ai(combined, max_words)

    # If the combined partial summaries are already too short to condense,
    # they are the best available summary.
    if final_summary == too_short:
        return combined

    return final_summary


print("‚úÖ AI summarization functions defined")

## Complete Processing Pipeline

Process documents and save as text files:

In [None]:
def process_document(
    input_path: str = "./input",
    output_dir: str = "./docs",
    summarize: bool = False,
    max_summary_words: int = 500
) -> Dict[str, Any]:
    """
    Complete document processing pipeline

    Extracts the document's text, writes it to ``<stem>_extracted.txt`` in
    ``output_dir`` and, when requested, writes an AI summary to
    ``<stem>_summary.txt`` next to it.

    Args:
        input_path: Path to input document
        output_dir: Directory to save output (default: ./docs for RAG);
            created, including missing parents, if it does not exist
        summarize: Whether to generate AI summary
        max_summary_words: Maximum words in summary

    Returns:
        The dict from extract_text_from_document. On successful extraction it
        also gets 'text_file'; with ``summarize=True`` it gets 'summary' and
        'summary_file' on success, or 'summary' set to None when no usable
        summary was produced.
    """
    input_path = Path(input_path)
    output_dir = Path(output_dir)
    # parents=True so nested output locations work on first use
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"\n{'='*80}")
    print(f"Processing: {input_path.name}")
    print(f"{'='*80}")

    # Extract text
    result = extract_text_from_document(input_path)

    if not result['success']:
        print(f"‚ùå {result['error']}")
        return result

    text = result['text']
    print(f"‚úÖ Extracted {result['char_count']:,} characters ({result['word_count']:,} words)")

    # Save extracted text
    text_output = output_dir / f"{input_path.stem}_extracted.txt"
    with open(text_output, 'w', encoding='utf-8') as f:
        f.write(text)
    print(f"‚úÖ Saved extracted text: {text_output.name}")

    # Generate summary if requested
    if summarize:
        print(f"\nü§ñ Generating AI summary...")
        summary = summarize_with_chunks(text, max_words=max_summary_words)

        # Decide whether the returned string is a real summary. Only the
        # summarizer's own error prefix counts as a failure (a genuine summary
        # may begin with the word "Error"); empty replies and the "too short"
        # placeholder are not worth saving into the RAG folder either.
        failure = None
        if not summary or not summary.strip():
            failure = "Error generating summary: empty response"
        elif summary.startswith("Error generating summary"):
            failure = summary
        elif summary == "Text too short to summarize.":
            failure = summary

        if failure is None:
            summary_output = output_dir / f"{input_path.stem}_summary.txt"
            with open(summary_output, 'w', encoding='utf-8') as f:
                f.write(f"# Summary of {input_path.name}\n\n")
                f.write(summary)
            print(f"‚úÖ Saved summary: {summary_output.name}")
            print(f"   Summary length: {len(summary.split())} words")

            result['summary'] = summary
            result['summary_file'] = str(summary_output)
        else:
            print(f"‚ùå {failure}")
            result['summary'] = None

    result['text_file'] = str(text_output)

    print(f"\n{'='*80}\n")

    return result


print("‚úÖ Document processing pipeline defined")

## Batch Processing

Process entire folders of documents:

In [None]:
def process_folder(
    input_folder: str,
    output_dir: str = "./docs",
    summarize: bool = False,
    file_extensions: Optional[List[str]] = None,
    max_summary_words: int = 500
) -> Dict[str, Any]:
    """
    Process all documents in a folder

    Only files directly inside ``input_folder`` are considered (no
    recursion). Extensions are matched case-insensitively, so ``REPORT.PDF``
    is picked up just like ``report.pdf``, and files are processed in sorted
    order.

    Args:
        input_folder: Path to folder with documents
        output_dir: Directory to save outputs (default: ./docs for RAG)
        summarize: Whether to generate AI summaries
        file_extensions: List of extensions to process (default: all
            supported types)
        max_summary_words: Maximum words in each summary, forwarded to
            process_document

    Returns:
        ``{'success': False, 'error': ...}`` when the folder is missing or
        holds no matching files; otherwise a dict with 'success', 'total',
        'successful', 'failed' and per-file 'results'.
    """
    if file_extensions is None:
        file_extensions = ['.docx', '.pptx', '.pdf', '.vtt', '.html', '.htm', '.txt']

    input_folder = Path(input_folder)

    if not input_folder.is_dir():
        print(f"‚ùå Folder not found: {input_folder}")
        return {'success': False, 'error': 'Folder not found'}

    # Find all matching files. Compare lower-cased names so the match does
    # not depend on the file system's case sensitivity; sorting makes the
    # processing order reproducible and each file appears only once.
    wanted = tuple(ext.lower() for ext in file_extensions)
    files = sorted(
        entry
        for entry in input_folder.iterdir()
        if entry.is_file() and entry.name.lower().endswith(wanted)
    )

    if not files:
        print(f"‚ùå No matching files found in {input_folder}")
        return {'success': False, 'error': 'No files found'}

    print(f"\n{'='*80}")
    print(f"BATCH PROCESSING: {len(files)} files")
    print(f"{'='*80}\n")

    results = []
    successful = 0
    failed = 0

    for i, file_path in enumerate(files, 1):
        print(f"\n[{i}/{len(files)}] Processing {file_path.name}...")

        # One bad file must not abort the batch, hence the broad except.
        try:
            result = process_document(
                str(file_path),
                output_dir=output_dir,
                summarize=summarize,
                max_summary_words=max_summary_words
            )

            if result['success']:
                successful += 1
            else:
                failed += 1

            results.append({
                'file': file_path.name,
                'result': result
            })

        except Exception as e:
            print(f"‚ùå Error processing {file_path.name}: {str(e)}")
            failed += 1
            results.append({
                'file': file_path.name,
                'result': {'success': False, 'error': str(e)}
            })

    # Summary
    print(f"\n{'='*80}")
    print(f"BATCH PROCESSING COMPLETE")
    print(f"{'='*80}")
    print(f"‚úÖ Successful: {successful}")
    print(f"‚ùå Failed: {failed}")
    print(f"üìÅ Output directory: {output_dir}")
    print(f"{'='*80}\n")

    return {
        'success': True,
        'total': len(files),
        'successful': successful,
        'failed': failed,
        'results': results
    }


print("‚úÖ Batch processing function defined")

## 🚀 Quick Start: Process Documents for RAG

**Automated workflow to prepare documents for RAG:**

In [None]:
# Quick Start: extract every supported document found in the input folder
input_folder = "./input"
output_folder = "./docs"  # extracted text lands here, ready for RAG indexing

# Make sure both working folders exist before scanning
Path(input_folder).mkdir(exist_ok=True)
Path(output_folder).mkdir(exist_ok=True)

print(f"üìÇ Input folder: {input_folder}")
print(f"üìÇ Output folder: {output_folder}")
print(f"\nüí° Place your documents (docx, pptx, pdf, vtt, html, txt) in '{input_folder}'")
print(f"   Extracted text will be saved to '{output_folder}' for RAG indexing\n")

# Collect candidate files, one glob per supported extension
supported_extensions = ['.docx', '.pptx', '.pdf', '.vtt', '.txt', '.html', '.htm']
all_files = []
for ext in supported_extensions:
    all_files += list(Path(input_folder).glob(f"*{ext}"))

if not all_files:
    # Nothing to do yet: tell the user what to drop into the input folder
    print(f"‚ö†Ô∏è No documents found in '{input_folder}'")
    print(f"   Supported formats: {', '.join(supported_extensions)}")
    print(f"\nüí° Add documents to '{input_folder}' and run this cell again!")
else:
    print(f"‚úÖ Found {len(all_files)} document(s) to process\n")

    # Run the batch over the whole input folder
    batch_result = process_folder(
        input_folder=input_folder,
        output_dir=output_folder,
        summarize=True,  # AI summaries enabled; set to False to extract text only
        file_extensions=supported_extensions
    )

    # Report the outcome of the batch
    if batch_result['success']:
        print(f"\n{'=' * 80}")
        print("‚úÖ PROCESSING COMPLETE!")
        print("=" * 80)
        print(f"üìä Total files: {batch_result['total']}")
        print(f"‚úÖ Successful: {batch_result['successful']}")
        print(f"‚ùå Failed: {batch_result['failed']}")
        print(f"\nüìÅ Extracted text saved to: {output_folder}/")
        print(f"üí° Next: Run llama_index_rag.ipynb to index these documents")
        print("=" * 80)

## Example: Process Single Document

Process and optionally summarize a single document:

In [None]:
# Example: extract one document and ask for an AI summary of it
example_file = "./input/sample.docx"  # Change to your file

if not Path(example_file).exists():
    # Guard first: nothing to process until the path points at a real file
    print(f"‚ö†Ô∏è Example file not found: {example_file}")
    print("Update the path to an actual document file.")
else:
    result = process_document(
        input_path=example_file,
        output_dir="./docs",
        summarize=True,  # Generate AI summary
        max_summary_words=500
    )

    if result['success']:
        print(f"\n‚úÖ Processing complete!")
        print(f"   Extracted text: {result['text_file']}")
        # 'summary_file' is only present when a summary was actually written
        if 'summary_file' in result:
            print(f"   Summary: {result['summary_file']}")