In [None]:
import os
import concurrent.futures
import time
from pathlib import Path
import json
import requests
import fitz  # PyMuPDF
from PIL import Image
import io
import base64
import openai
 
# Configuration
# Prefer the OPENAI_API_KEY environment variable so a real key never has to be
# pasted into (and committed with) the notebook; the placeholder is only a fallback.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR API KEY")  # Replace with your OpenAI API key
MODEL = "gpt-4.1"  # or another vision model
 
def setup_output_directory(pdf_path):
    """Ensure the output folders for one PDF's OCR run exist.

    The top-level folder is named after the PDF's file name with its last
    extension removed, and is created relative to the current working
    directory (not next to the PDF). Existing folders are reused.

    Returns:
        ``(output_dir, pages_dir)`` where ``pages_dir`` is ``output_dir/pages``.
    """
    file_name = os.path.basename(pdf_path)
    stem = file_name.rsplit('.', 1)[0]
    output_dir = f"{stem}_ocr_output"
    pages_dir = os.path.join(output_dir, "pages")
    for folder in (output_dir, pages_dir):
        os.makedirs(folder, exist_ok=True)
    return output_dir, pages_dir
 
def extract_page_as_image(pdf_path, page_num, dpi=300):
    """Render one PDF page to a base64-encoded JPEG string.

    Args:
        pdf_path: Path of the PDF file.
        page_num: 0-based page index passed to ``load_page``.
        dpi: Render resolution. PDF user space is 72 units per inch, so the
            zoom matrix is ``dpi / 72`` in both directions; higher values give
            better OCR quality at the cost of larger images.

    Returns:
        The JPEG bytes as a base64 ``str``, suitable for a
        ``data:image/jpeg;base64,...`` URL.
    """
    # The context manager closes the document; previously every call leaked an
    # open handle (one per page, across several worker threads).
    with fitz.open(pdf_path) as doc:
        page = doc.load_page(page_num)
        zoom = dpi / 72
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))
        # pil_tobytes already produces encoded JPEG bytes; decoding them with PIL
        # and saving again would apply lossy JPEG compression a second time.
        img_bytes = pix.pil_tobytes(format="JPEG")
    return base64.b64encode(img_bytes).decode('utf-8')
 
def process_page_with_vision_api(page_image_base64, page_num):
    """OCR a single page image with the OpenAI Chat Completions vision API.

    Args:
        page_image_base64: Base64-encoded JPEG of the page.
        page_num: Page number used only in log/error messages.

    Returns:
        The OCR text as a ``str``. On any API error, or when the model returns
        no text content, an ``"[ERROR PROCESSING PAGE n]: ..."`` marker string
        is returned instead, so callers can always write the result to disk.
    """
    # Robust prompt designed for document OCR with table preservation
    prompt = """
    Perform Optical Character Recognition (OCR) on this document image with these specific requirements:
    1. Extract ALL text content exactly as it appears, preserving original structure.
    2. For tables:
       - Maintain original table layout with proper alignment
       - Keep column headers and row data properly aligned
       - Use spaces or tabs to preserve column alignment
       - Preserve all numbers, decimal points, and special characters in tables
    3. Include ALL text elements:
       - Headings and subheadings
       - Body text paragraphs
       - Footnotes, captions, and annotations
       - Numbers, dates, and currency values (maintain original format)
       - Lists and bullet points
    4. Preserve the reading order from top to bottom, left to right
    5. Indicate page headers/footers by adding [HEADER] or [FOOTER] tags
    6. Mark any disclaimers or notes with [NOTE] tags
    7. If text appears in multiple columns, process each column completely before moving to the next
    Focus on accurate text extraction with proper layout preservation, especially for tabular data.
    DO NOT summarize, interpret, or modify the content.
    DO NOT add any commentary or explanations.
    """

    client = openai.OpenAI(api_key=OPENAI_API_KEY)
    try:
        response = client.chat.completions.create(
            model=MODEL,
            messages=[
                {"role": "system", "content": "You are a precise OCR system that perfectly extracts text while maintaining document structure."},
                {
                    "role": "user", 
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{page_image_base64}",
                                "detail": "high"
                            }
                        }
                    ]
                }
            ],
            max_tokens=20000
        )
        choice = response.choices[0]
        if choice.finish_reason == "length":
            # The page text was cut off at max_tokens; keep what we have but say so.
            print(f"Warning: OCR output for page {page_num} was truncated at the token limit.")
        ocr_text = choice.message.content
        if ocr_text is None:
            # content is None when the model refuses or returns no text; returning
            # None would make the caller's file write fail with a TypeError.
            return f"[ERROR PROCESSING PAGE {page_num}]: empty response from model"
        return ocr_text
    except Exception as e:
        print(f"Error processing page {page_num}: {str(e)}")
        return f"[ERROR PROCESSING PAGE {page_num}]: {str(e)}"
 
def process_page(pdf_path, page_num, pages_dir):
    """Render, OCR and save a single PDF page.

    Args:
        pdf_path: Path of the PDF file.
        page_num: 0-based page index; messages, file names and the return
            value use the 1-based number.
        pages_dir: Folder that receives ``page_NNN.txt``.

    Returns:
        ``(1-based page number, text)`` where ``text`` is the OCR output or an
        ``"[ERROR ON PAGE n]: ..."`` marker. Any ``Exception`` is caught here
        and reported through that marker instead of propagating.
    """
    page_label = page_num + 1
    print(f"Processing page {page_label}...")
    try:
        encoded_image = extract_page_as_image(pdf_path, page_num)
        text = process_page_with_vision_api(encoded_image, page_label)
        out_path = os.path.join(pages_dir, f"page_{page_label:03d}.txt")
        with open(out_path, "w", encoding="utf-8") as handle:
            handle.write(text)
    except Exception as exc:
        print(f"Error in page {page_label} processing: {str(exc)}")
        return page_label, f"[ERROR ON PAGE {page_label}]: {str(exc)}"
    return page_label, text
 
def process_pdf(pdf_path, max_workers=5):
    """OCR every page of a PDF in parallel and write per-page and combined output.

    Args:
        pdf_path: Path of the PDF file.
        max_workers: Number of pages OCR'd concurrently (threads; the work is
            dominated by waiting on the API).

    Returns:
        Path of the combined text file, in which pages appear in page order
        separated by ``----- PAGE n -----`` banners.
    """
    output_dir, pages_dir = setup_output_directory(pdf_path)
    # Only the page count is needed here, so close the handle straight away;
    # each worker opens the document itself.
    with fitz.open(pdf_path) as doc:
        num_pages = len(doc)
    print(f"PDF has {num_pages} pages. Starting OCR extraction...")

    results = {}  # 1-based page number -> OCR text (or error marker)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_page = {
            executor.submit(process_page, pdf_path, page_num, pages_dir): page_num
            for page_num in range(num_pages)
        }
        for future in concurrent.futures.as_completed(future_to_page):
            page_label = future_to_page[future] + 1
            try:
                page_label, ocr_text = future.result()
            except Exception as e:
                print(f"❌ Page {page_label} generated an exception: {str(e)}")
                # Keep a placeholder so the combined file shows the gap instead of
                # silently omitting the page.
                ocr_text = f"[ERROR ON PAGE {page_label}]: {str(e)}"
            else:
                print(f"✅ Completed page {page_label}")
            results[page_label] = ocr_text

    # Pages finish out of order; sort them before joining.
    combined_text = "".join(
        f"\n\n----- PAGE {page_label} -----\n\n{results[page_label]}"
        for page_label in sorted(results)
    )
    combined_file = os.path.join(output_dir, "combined_ocr.txt")
    with open(combined_file, "w", encoding="utf-8") as f:
        f.write(combined_text)
    print("\nOCR extraction complete!")
    print(f"- Individual page files saved in: {pages_dir}")
    print(f"- Combined OCR text saved to: {combined_file}")
    return combined_file
 
def main():
    """Prompt for a PDF path, OCR every page, and report the elapsed time."""
    # Paths pasted from a file manager (e.g. Windows "Copy as path") often carry
    # surrounding quotes or whitespace, which would otherwise fail validation.
    pdf_path = input("Enter the path to your PDF file: ").strip().strip("\"'")
    # isfile() rather than exists() so a directory whose name ends in ".pdf" is rejected.
    if not os.path.isfile(pdf_path) or not pdf_path.lower().endswith('.pdf'):
        print("Invalid PDF file path. Please provide a valid PDF file.")
        return
    # perf_counter is monotonic, so the measurement is unaffected by clock changes.
    start_time = time.perf_counter()
    combined_file = process_pdf(pdf_path)
    elapsed_time = time.perf_counter() - start_time
    print(f"\nProcessing completed in {elapsed_time:.2f} seconds")
    print(f"Combined OCR text saved to: {combined_file}")
 
# Entry point: start the interactive OCR run. In a notebook cell __name__ is
# also "__main__", so executing this cell runs main() immediately.
if __name__ == "__main__":
    main()

PDF has 25 pages. Starting OCR extraction...
Processing page 1...
Processing page 2...
Processing page 3...
Processing page 4...
Processing page 5...
Processing page 6...✅ Completed page 5

Processing page 7...
✅ Completed page 1
Processing page 8...✅ Completed page 3

Processing page 9...✅ Completed page 2

Processing page 10...✅ Completed page 4

Processing page 11...
✅ Completed page 8
Processing page 12...✅ Completed page 10

Processing page 13...
✅ Completed page 7
Processing page 14...
✅ Completed page 11
Processing page 15...✅ Completed page 12

Processing page 16...✅ Completed page 6

Processing page 17...
✅ Completed page 14
Processing page 18...✅ Completed page 15

Processing page 19...✅ Completed page 16

Processing page 20...
✅ Completed page 17
Processing page 21...
✅ Completed page 9
Processing page 22...✅ Completed page 13

Processing page 23...✅ Completed page 20

Processing page 24...✅ Completed page 18

Processing page 25...✅ Completed page 19

✅ Completed page 22
✅ C

In [None]:
import os
import json
import re
import requests
import psycopg2
from typing import Dict, Any, List
from datetime import datetime, date
from dotenv import load_dotenv

# Copy variables from a .env file (when present) into the process environment
# before any of the settings below are read.
load_dotenv()

# PostgreSQL connection settings, passed as psycopg2.connect(**DB_CONFIG).
# A variable that is not set yields None.
DB_CONFIG = dict(
    host=os.getenv('DB_HOST'),
    user=os.getenv('DB_USER'),
    password=os.getenv('DB_PASSWORD'),
    database=os.getenv('DB_NAME'),
    port=os.getenv('DB_PORT'),
)

# Chat Completions settings. The API key is mandatory, so fail fast at import
# time rather than on the first request.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY environment variable is required")

MODEL = "gpt-4.1"
API_URL = "https://api.openai.com/v1/chat/completions"

def date_serializer(obj):
    """``default=`` hook for ``json.dump`` that renders dates as ISO-8601 text.

    ``datetime`` instances are also accepted (``datetime`` subclasses ``date``).
    Anything else raises ``TypeError``, as ``json`` expects from a default hook.
    """
    if not isinstance(obj, date):
        raise TypeError(f"Type {obj.__class__.__name__} not serializable")
    return obj.isoformat()

def read_ocr_file(filename: str) -> str:
    """Read an OCR output text file.

    UTF-8 is tried first; if the bytes are not valid UTF-8 the file is re-read
    as Latin-1, which maps every byte to a character and so cannot fail to decode.

    Args:
        filename: Path of the text file.

    Returns:
        The full file contents.

    Exits with status 1 (``SystemExit``) if the file is missing or cannot be read.
    """
    # ``exit`` is a convenience for interactive shells (absent under ``python -S``
    # and replaced by IPython's own object in notebooks); sys.exit always raises
    # SystemExit. Imported locally because this cell does not import sys.
    import sys

    try:
        with open(filename, "r", encoding="utf-8") as f:
            return f.read()
    except UnicodeDecodeError:
        pass  # not UTF-8; fall through to the Latin-1 read below
    except FileNotFoundError:
        print(f"OCR file {filename} not found.")
        sys.exit(1)
    except OSError as e:
        # e.g. a directory or a permission problem; previously this escaped uncaught.
        print(f"Error reading OCR file {filename}: {e}")
        sys.exit(1)

    try:
        with open(filename, "r", encoding="latin-1") as f:
            return f.read()
    except OSError as e:
        print(f"Error reading file with latin-1 encoding: {e}")
        sys.exit(1)

def get_db_connection():
    """Open a PostgreSQL connection using ``DB_CONFIG``.

    Returns:
        An open psycopg2 connection; the caller is responsible for closing it.

    Exits with status 1 (``SystemExit``) if the connection cannot be established.
    """
    # sys.exit rather than the interactive-shell ``exit`` helper, which is absent
    # under ``python -S`` and replaced by IPython's own object in notebooks.
    import sys

    try:
        return psycopg2.connect(**DB_CONFIG)
    except psycopg2.Error as e:
        print(f"Database connection error: {e}")
        sys.exit(1)

def generate_json_from_data(data_content: str, schema_definition: str, timeout: float = 300) -> Dict[str, Any]:
    """Convert OCR text into schema-shaped JSON using GPT-4.1 (Chat Completions).

    Args:
        data_content: Raw OCR text to convert.
        schema_definition: SQL DDL of the target tables, pasted into the prompt verbatim.
        timeout: Seconds to wait for the HTTP response. Without a timeout
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        The parsed JSON object, or ``{}`` when the request fails, the response
        has an unexpected shape, or the model output is not valid JSON.
    """
    
    # Prepare the prompt for GPT-4.1
    prompt = f"""
    I need to convert structured data into a JSON format that follows a PostgreSQL database schema.
    
    Here's the database schema:
    {schema_definition}
    
    Here's the data that needs to be converted to JSON:
    {data_content}
    
    Generate a JSON structure that follows the database schema tables. Extract all relevant information from the data.
    
    IMPORTANT GUIDELINES:
    1. Follow the schema structure precisely (batches, steps, facts, materials, signoffs).
    2. Use the exact field names as specified in the schema.
    3. Ensure all foreign key relationships are maintained correctly.
    4. Format dates as YYYY-MM-DD strings.
    5. For jsonb fields (like 'kv'), use nested JSON objects.
    6. For fact categories, use appropriate values like 'measurement', 'answer', 'calculation', etc.
    7. Format the output as a valid JSON object.
    
    The JSON structure should have these top-level keys:
    - batches: Array of batch objects
    - steps: Array of step objects 
    - facts: Array of fact objects
    - materials: Array of material objects
    - signoffs: Array of signoff objects
    
    Return ONLY the JSON structure without any explanation.
    """
    
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENAI_API_KEY}"
    }
    
    payload = {
        "model": MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.2  # Lower temperature for more consistent output
    }
    
    # Bound before the try so the JSONDecodeError handler can always print it:
    # older requests versions raise a plain JSON decode error from
    # response.json(), i.e. before json_text would otherwise be assigned.
    json_text = ""
    try:
        response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
        result = response.json()
        
        choice = result['choices'][0]
        if choice.get('finish_reason') == 'length':
            print("Warning: model output hit the token limit; the JSON is probably truncated.")
        json_text = choice['message']['content'].strip()
        
        # Strip a Markdown code fence if the model wrapped its answer in one:
        # ```json ... ```, ```JSON ... ``` or a bare ``` ... ```.
        if json_text.startswith("```"):
            json_text = json_text[3:]
            if json_text[:4].lower() == "json":
                json_text = json_text[4:]
        if json_text.endswith("```"):
            json_text = json_text[:-3]
        
        json_text = json_text.strip()
        return json.loads(json_text)
    
    except requests.exceptions.RequestException as e:
        print(f"API request error: {e}")
        return {}
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        print(f"Response content: {json_text}")
        return {}
    except (KeyError, IndexError, TypeError, AttributeError) as e:
        # Response JSON did not have the expected choices[0].message.content shape.
        print(f"Unexpected API response format: {e!r}")
        return {}

def _insert_batches(cursor, batches: List[Dict[str, Any]]) -> None:
    """Insert batch rows whose batch_id is not already in the table."""
    for batch in batches:
        batch_id = batch.get("batch_id")
        cursor.execute("SELECT batch_id FROM batches WHERE batch_id = %s", (batch_id,))
        if cursor.fetchone() is not None:
            print(f"Batch {batch_id} already exists, skipping.")
            continue
        cursor.execute(
            "INSERT INTO batches (batch_id, doc_no, title, effective_dt) VALUES (%s, %s, %s, %s)",
            (batch_id, batch.get("doc_no"), batch.get("title"), batch.get("effective_dt"))
        )
        print(f"Inserted batch: {batch_id}")


def _insert_steps(cursor, steps: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Insert new (batch_id, step_no) steps; map the JSON's step_ids to database step_ids.

    The step_id values in the generated JSON are only join keys between
    tables; the database assigns its own ids (``RETURNING step_id``). A step
    that already exists is reused rather than duplicated.
    """
    step_id_map: Dict[str, Any] = {}
    for step in steps:
        original_step_id = step.get("step_id")
        batch_id = step.get("batch_id")
        step_no = step.get("step_no")
        cursor.execute("SELECT step_id FROM steps WHERE batch_id = %s AND step_no = %s", (batch_id, step_no))
        existing = cursor.fetchone()
        if existing is None:
            cursor.execute(
                "INSERT INTO steps (batch_id, step_no, step_title) VALUES (%s, %s, %s) RETURNING step_id",
                (batch_id, step_no, step.get("step_title"))
            )
            db_step_id = cursor.fetchone()[0]
            print(f"Inserted step: {step_no} (ID: {db_step_id})")
        else:
            db_step_id = existing[0]
            print(f"Step {step_no} already exists (ID: {db_step_id}), skipping.")
        # Mapping a None key would attach every child row that lacks a step_id
        # to whichever id-less step came last, so only real ids are mapped.
        # Keys are stringified because the model may emit 1 in one table and "1"
        # in another.
        if original_step_id is not None:
            step_id_map[str(original_step_id)] = db_step_id
    return step_id_map


def _insert_step_children(cursor, rows, step_id_map, label, sql, extra_values) -> None:
    """Insert rows that reference a step, translating JSON step_ids to database ids.

    ``sql`` takes the database step_id as its first parameter followed by the
    values returned by ``extra_values(row)``. Rows whose step_id was not
    mapped by ``_insert_steps`` are skipped with a message.
    """
    for row in rows:
        model_step_id = row.get("step_id")
        step_id = None if model_step_id is None else step_id_map.get(str(model_step_id))
        if step_id is None:
            print(f"Skipping {label}: unknown step_id {model_step_id!r}")
            continue
        cursor.execute(sql, (step_id, *extra_values(row)))
        print(f"Inserted {label} for step_id: {step_id}")


def insert_into_database(data: Dict[str, Any]) -> None:
    """Insert the generated JSON into PostgreSQL in a single transaction.

    Batches and steps are inserted only if they do not already exist;
    materials, facts and signoffs are always inserted for steps that could be
    resolved. On a psycopg2 error the transaction is rolled back and the
    error is printed (not raised).

    Args:
        data: Mapping with optional ``batches``, ``steps``, ``materials``,
            ``facts`` and ``signoffs`` lists, as produced by the model.
    """
    if not data:
        # generate_json_from_data returns {} on failure; don't touch the database
        # or report success when there is nothing to insert.
        print("No data to insert; skipping database write.")
        return

    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            # `or []` also covers keys the model set to null.
            _insert_batches(cursor, data.get("batches") or [])
            step_id_map = _insert_steps(cursor, data.get("steps") or [])
            _insert_step_children(
                cursor, data.get("materials") or [], step_id_map, "material",
                "INSERT INTO materials (step_id, kv) VALUES (%s, %s)",
                lambda material: (json.dumps(material.get("kv")),),
            )
            _insert_step_children(
                cursor, data.get("facts") or [], step_id_map, "fact",
                "INSERT INTO facts (step_id, category, kv) VALUES (%s, %s, %s)",
                lambda fact: (fact.get("category"), json.dumps(fact.get("kv"))),
            )
            _insert_step_children(
                cursor, data.get("signoffs") or [], step_id_map, "signoff",
                "INSERT INTO signoffs (step_id, role, initials, signed_ts) VALUES (%s, %s, %s, %s)",
                lambda signoff: (signoff.get("role"), signoff.get("initials"), signoff.get("signed_ts")),
            )
        # One commit for everything, so a failure part-way leaves no partial import.
        conn.commit()
        print("All data successfully inserted into the database!")
    except psycopg2.Error as err:
        print(f"Database error: {err}")
        conn.rollback()
    finally:
        # Closing without a commit discards any uncommitted work, including after
        # non-database exceptions that propagate from here.
        conn.close()

def main(ocr_file=None, output_json="generated_data.json"):
    """Convert an OCR text file to schema-shaped JSON, save it, and load it into PostgreSQL.

    Args:
        ocr_file: Path of the OCR text file. ``None`` uses the default file for
            the EBR-06-922947 batch record, relative to the working directory.
        output_json: Where a copy of the generated JSON is written.
    """
    # Define database schema
    schema_definition = """
    CREATE TABLE batches (
        batch_id      text PRIMARY KEY,
        doc_no        text,
        title         text,
        effective_dt  date
    );
    CREATE TABLE steps (
        step_id       bigserial PRIMARY KEY,
        batch_id      text REFERENCES batches,
        step_no       text,           -- '5.2', '3.4', '6.1', …
        step_title    text
    );
    CREATE TABLE facts (           -- "wide-fact" table
        fact_id       bigserial PRIMARY KEY,
        step_id       bigint REFERENCES steps,
        category      text,         -- measurement | answer | setting
        kv            jsonb         -- free-form key/value payload
    );
    CREATE TABLE materials (
        mat_id        bigserial PRIMARY KEY,
        step_id       bigint REFERENCES steps,
        kv            jsonb         -- {name, part_no, lot, qty, unit, expiry …}
    );
    CREATE TABLE signoffs (
        sign_id       bigserial PRIMARY KEY,
        step_id       bigint REFERENCES steps,
        role          text,         -- performer | verifier | supervisor
        initials      text,
        signed_ts     timestamp
    );
    """
    
    if ocr_file is None:
        # os.path.join instead of a hard-coded backslash so the default path also
        # resolves on Linux/macOS. NOTE(review): the OCR step writes
        # "combined_ocr.txt" inside this folder; confirm this file name is intended.
        base = "EBR-06-922947-PUR-HID2007_922945_235_A Prep and  Sartobind Q_ocr_output"
        ocr_file = os.path.join(base, f"{base}.txt")
    data_content = read_ocr_file(ocr_file)
    
    print("Generating JSON from data using GPT-4.1...")
    json_data = generate_json_from_data(data_content, schema_definition)
    
    # Save JSON to file for reference (also useful when generation returned {}).
    with open(output_json, "w", encoding="utf-8") as f:
        json.dump(json_data, f, indent=2, default=date_serializer)
    print(f"JSON data saved to '{output_json}'")
    
    if not json_data:
        print("No JSON was generated; skipping database insert.")
        return
    print("Inserting data into database...")
    insert_into_database(json_data)

# Entry point: run the OCR-text -> JSON -> PostgreSQL pipeline. In a notebook
# cell __name__ is also "__main__", so executing this cell runs main().
if __name__ == "__main__":
    main()