# 📸 Image-based PDF Processing with LLM Vision
## Extract Data from PDF Images using GPT-4 Vision → Store in Pinecone

### 🎯 Goal
Since PDFs contain images, use GPT-4 Vision to read and extract structured data, then store in vector database

In [None]:
!pip install langchain langchain-openai langchain-pinecone pinecone PyMuPDF

In [None]:
import os
import json
import base64
from pathlib import Path
import getpass
from typing import List, Dict

# PDF processing with PyMuPDF (no poppler needed)
import fitz  # PyMuPDF

# LangChain and OpenAI
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain.schema import Document

# Pinecone
from pinecone import Pinecone, ServerlessSpec

print("✅ All imports successful!")

In [None]:
# Configuration
DATA_DIR = "data"
INDEX_NAME = "invoice-vision-vectors"
EMBED_MODEL = "text-embedding-3-small"

# API Keys
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or getpass.getpass("OpenAI API Key: ")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY") or getpass.getpass("Pinecone API Key: ")

print("✅ Configuration set!")

In [None]:
# Initialize components
llm_vision = ChatOpenAI(
    api_key=OPENAI_API_KEY,
    model="gpt-4o-mini",  # Supports vision
    temperature=0
)

embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY, model=EMBED_MODEL)

# Initialize Pinecone
pc = Pinecone(api_key=PINECONE_API_KEY)

# Create index if needed
existing = [idx["name"] for idx in pc.list_indexes()]
if INDEX_NAME not in existing:
    print(f"Creating index: {INDEX_NAME}")
    pc.create_index(
        name=INDEX_NAME,
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1")
    )
    import time
    time.sleep(10)

index = pc.Index(INDEX_NAME)
vector_store = PineconeVectorStore(index=index, embedding=embeddings)

print("✅ Components initialized!")

In [None]:
def pdf_to_base64_images(pdf_path: str) -> List[str]:
    """Convert PDF pages to base64 encoded images using PyMuPDF"""
    try:
        # Use PyMuPDF (no poppler needed)
        doc = fitz.open(pdf_path)
        base64_images = []
        
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            # Convert to high-quality image
            pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))  # 2x zoom for better quality
            img_data = pix.tobytes("png")
            img_base64 = base64.b64encode(img_data).decode()
            base64_images.append(img_base64)
            
        doc.close()
        return base64_images
    except Exception as e:
        print(f"Error converting PDF to images: {e}")
        return []

def extract_invoice_data_from_image(image_base64: str, filename: str) -> Dict:
    """Use GPT-4 Vision to extract structured data from invoice image"""
    
    prompt = [
        {
            "type": "text",
            "text": """You are an expert at reading invoices from images. Extract the following information:

1. vendor_name: Company or person issuing the invoice
2. invoice_number: Invoice ID or reference number
3. date: Invoice date or any date mentioned
4. amount: Total amount or any monetary value
5. full_text: All visible text content from the image

Return JSON format:
{"vendor_name": "...", "invoice_number": "...", "date": "...", "amount": "...", "full_text": "..."}

Use "NOT_FOUND" if any field is not visible or unclear."""
        },
        {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/png;base64,{image_base64}"
            }
        }
    ]
    
    try:
        response = llm_vision.invoke(prompt)
        content = response.content.strip()
        
        # Clean JSON response
        if '```json' in content:
            content = content.split('```json')[1].split('```')[0].strip()
        elif '```' in content:
            content = content.split('```')[1].strip()
        
        data = json.loads(content)
        data['filename'] = filename
        data['extraction_method'] = 'gpt4_vision'
        
        return data
        
    except Exception as e:
        print(f"Error extracting data from {filename}: {e}")
        return {
            'vendor_name': 'ERROR',
            'invoice_number': 'ERROR',
            'date': 'ERROR',
            'amount': 'ERROR',
            'full_text': f'Extraction failed: {str(e)}',
            'filename': filename,
            'extraction_method': 'gpt4_vision_error'
        }

print("✅ Vision processing functions defined!")

In [None]:
# Process all PDF files
data_dir = Path(DATA_DIR)
pdf_files = list(data_dir.glob("invoice_*.pdf"))

print(f"📁 Found {len(pdf_files)} PDF files to process")

extracted_data = []
documents_for_vector_store = []

for pdf_file in sorted(pdf_files):
    print(f"\n📄 Processing: {pdf_file.name}")
    
    # Convert PDF to images using PyMuPDF
    base64_images = pdf_to_base64_images(str(pdf_file))
    
    if not base64_images:
        print(f"❌ Failed to convert {pdf_file.name} to images")
        continue
    
    print(f"📸 Converted to {len(base64_images)} images")
    
    # Process each page image
    for page_num, image_base64 in enumerate(base64_images, 1):
        print(f"🔍 Extracting data from page {page_num}...")
        
        # Extract structured data using GPT-4 Vision
        invoice_data = extract_invoice_data_from_image(image_base64, pdf_file.name)
        invoice_data['page_number'] = page_num
        
        extracted_data.append(invoice_data)
        
        # Create document for vector store
        full_text = invoice_data.get('full_text', '')
        if full_text and full_text != 'NOT_FOUND':
            doc = Document(
                page_content=full_text,
                metadata={
                    'filename': pdf_file.name,
                    'page_number': page_num,
                    'vendor_name': invoice_data.get('vendor_name', 'NOT_FOUND'),
                    'invoice_number': invoice_data.get('invoice_number', 'NOT_FOUND'),
                    'date': invoice_data.get('date', 'NOT_FOUND'),
                    'amount': invoice_data.get('amount', 'NOT_FOUND'),
                    'doc_type': 'invoice',
                    'extraction_method': 'gpt4_vision'
                }
            )
            documents_for_vector_store.append(doc)
        
        print(f"✅ Extracted: {invoice_data.get('vendor_name', 'N/A')[:20]}, {invoice_data.get('invoice_number', 'N/A')}")

print(f"\n✅ Processed {len(pdf_files)} PDFs, extracted {len(extracted_data)} pages")
print(f"📄 Created {len(documents_for_vector_store)} documents for vector storage")

In [None]:
# Store extracted data in Pinecone
if documents_for_vector_store:
    print(f"🗄️ Storing {len(documents_for_vector_store)} documents in Pinecone...")
    
    try:
        vector_ids = vector_store.add_documents(documents_for_vector_store)
        print(f"✅ Successfully stored {len(vector_ids)} vectors!")
        
        # Verify storage
        stats = index.describe_index_stats()
        print(f"📊 Index now contains {stats.total_vector_count} total vectors")
        
    except Exception as e:
        print(f"❌ Error storing in Pinecone: {e}")
else:
    print("❌ No documents to store")

In [None]:
# Display extraction results
import pandas as pd

if extracted_data:
    df = pd.DataFrame(extracted_data)
    
    # Create display table
    display_df = pd.DataFrame({
        'Filename': df['filename'],
        'Page': df['page_number'],
        'Vendor': df['vendor_name'].str[:25],
        'Invoice #': df['invoice_number'],
        'Date': df['date'],
        'Amount': df['amount'],
        'Method': df['extraction_method']
    })
    
    print("📊 EXTRACTION RESULTS:")
    print("=" * 80)
    print(display_df.to_string(index=False))
    
    # Summary
    total_pages = len(df)
    successful_extractions = len(df[df['vendor_name'] != 'ERROR'])
    
    print(f"\n📈 SUMMARY:")
    print(f"Total pages processed: {total_pages}")
    print(f"Successful extractions: {successful_extractions}/{total_pages}")
    print(f"Success rate: {successful_extractions/total_pages*100:.1f}%")
else:
    print("❌ No extraction results to display")

In [None]:
# Test vector search with extracted data
print("🔍 Testing vector search on extracted invoice data:")
print("=" * 50)

test_queries = [
    "invoice amount payment",
    "vendor company name",
    "date invoice number"
]

for query in test_queries:
    print(f"\n🔍 Query: '{query}'")
    results = vector_store.similarity_search(query, k=3)
    
    for i, doc in enumerate(results, 1):
        filename = doc.metadata.get('filename', 'Unknown')
        vendor = doc.metadata.get('vendor_name', 'Unknown')
        invoice_num = doc.metadata.get('invoice_number', 'Unknown')
        method = doc.metadata.get('extraction_method', 'Unknown')
        
        print(f"  {i}. {filename} - {vendor} ({invoice_num}) [{method}]")
        print(f"     Content: {doc.page_content[:100]}...")

print("\n✅ Vision-based PDF processing and vector storage complete!")