In [None]:
# ============================================
# IMPORTS AND DEPENDENCIES
# ============================================

import os
import json
import pandas as pd
from pathlib import Path
from dotenv import load_dotenv
from typing import Optional, Dict, Any

# File reading libraries
import pdfplumber
from PIL import Image
import pytesseract
import xml.etree.ElementTree as ET

# CrewAI imports
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import tool

print("‚úÖ All dependencies imported successfully")

In [None]:
# ============================================
# ENVIRONMENT SETUP
# ============================================

# Load environment variables before looking up the API key; the warning
# below points users at a .env file as the expected place to define it.
load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Report whether the key was found. A missing key only produces a warning
# here; the LLM object below is still constructed with api_key=None.
if OPENAI_API_KEY:
    print("‚úÖ OpenAI API key loaded")
else:
    print("‚ö†Ô∏è  Warning: OPENAI_API_KEY not found in environment")
    print("   Please add it to your .env file")

# Shared LLM handle used by the agent defined later in the notebook.
# Low temperature for consistent extraction.
llm = LLM(
    temperature=0.1,
    api_key=OPENAI_API_KEY,
    model="gpt-4o",
)

print("‚úÖ LLM initialized: gpt-4o")

In [None]:
# ============================================
# CANONICAL INVOICE SCHEMA DEFINITION
# ============================================
# Schema text embedded verbatim in the extraction prompt.
#
# The shape described here must agree with the code that consumes the model's
# answer: the response parser looks for a single JSON object, the validator
# requires top-level "invoice_header" and "line_items" keys, and the display
# cell reads the snake_case key "billed_impressions". The task instructions
# also refer to a "notes" field, so it is declared here as well.
CANONICAL_SCHEMA_DOC = """
You MUST output a single JSON OBJECT matching this structure:

{
  "invoice_header": {
    "invoice_number": string or null,
    "vendor_name": string or null,
    "invoice_date": string or null,          // YYYY-MM-DD if possible
    "billing_start_date": string or null,    // YYYY-MM-DD if possible
    "billing_end_date": string or null,      // YYYY-MM-DD if possible
    "currency": string or null,              // currency code if possible (e.g. USD, EUR)
    "gross_revenue": number or null,
    "discount_amount": number or null,
    "discount_percent": number or null,
    "tax": number or null
  },
  "line_items": [
    {
      "line_id": integer,
      "campaign_name": string or null,
      "campaign_id": string or null,
      "insertion_order_ID": string or null,  // may be same for different segments
      "start_date": string or null,          // YYYY-MM-DD if possible
      "end_date": string or null,            // YYYY-MM-DD if possible
      "planned_impressions": number or null,
      "billed_impressions": number or null,
      "views": number or null,               // complete views, video views, completed clicks, clicks conversions, etc. choose the closest
      "gross_revenue": number or null,
      "net_revenue": number or null,
      "discount_amount": number or null,
      "discount_percent": number or null,
      "profit": number or null,
      "rate_type": string or null,           // CPM, CPC, CPV, Flat, etc.
      "rate": number or null
    }
  ],
  "notes": string or null                    // clarifications, inferred values, OCR quality caveats
}

RULES:
- If a value is not present in the invoice, use null.
- If only one type of revenue is present, store it in gross_revenue and leave net_revenue null (or vice versa if clearly net).
- Discounts can be explicit (discount column) or implicit (difference between gross and net) — explain in notes if inferred.
- Profit = revenue - cost, if not directly provided.
- Be conservative: do NOT invent numbers if they are not in the invoice.
- Campaign ID can be the segment ID
- Insertion order id can not be same as Campaign ID. Insertion order id can be a short form
"""
print("‚úÖ Canonical schema defined")

In [None]:
# ============================================
# FILE READING FUNCTIONS
# ============================================

def read_pdf_content(pdf_path: str, max_pages: int = 5) -> str:
    """Extract text from a PDF, falling back to OCR for pages without a text layer.

    Args:
        pdf_path: Path to the PDF file.
        max_pages: Maximum number of leading pages to read.

    Returns:
        The text of the pages that were read, each prefixed with a
        ``--- Page N ---`` marker and separated by blank lines. When the
        document has more pages than ``max_pages``, a bracketed note stating
        how many pages were read is appended so downstream consumers know the
        content is incomplete. Returns ``"No text extracted from PDF"`` when
        nothing was extracted, or ``"Error reading PDF: ..."`` on failure
        (errors are reported in the return value, never raised).
    """
    try:
        pages_text = []
        with pdfplumber.open(pdf_path) as pdf:
            total_pages = len(pdf.pages)
            for i, page in enumerate(pdf.pages):
                if i >= max_pages:
                    break
                text = page.extract_text() or ""

                # No text layer on this page: render it and run OCR instead.
                if not text.strip():
                    try:
                        page_image = page.to_image(resolution=300).original
                        text = pytesseract.image_to_string(page_image)
                    except pytesseract.TesseractNotFoundError:
                        text = """[ERROR: Tesseract OCR not installed]
                        
To install Tesseract:
‚Ä¢ macOS: brew install tesseract
‚Ä¢ Ubuntu/Debian: sudo apt-get install tesseract-ocr
‚Ä¢ Windows: Download from https://github.com/UB-Mannheim/tesseract/wiki"""
                    except Exception as ocr_error:
                        text = f"[OCR failed for page {i+1}: {str(ocr_error)}]"

                if text:
                    pages_text.append(f"--- Page {i+1} ---\n{text}")

        if not pages_text:
            return "No text extracted from PDF"

        content = "\n\n".join(pages_text)

        # Previously pages past max_pages were dropped silently, so a reader of
        # the returned text could not tell that line items might be missing.
        if total_pages > max_pages:
            content += (
                f"\n\n[NOTE: only the first {max_pages} of {total_pages} pages "
                "were read; remaining pages are not included]"
            )

        return content
    except Exception as e:
        return f"Error reading PDF: {str(e)}"


def read_image_content(image_path: str) -> str:
    """Run OCR over an image file and return the recognised text.

    Returns the stripped OCR text, ``"No text extracted from image"`` when
    OCR yields only whitespace, installation instructions when the Tesseract
    binary is missing, or ``"Error reading image: ..."`` for any other
    failure. Errors are reported in the return value rather than raised.
    """
    try:
        ocr_text = pytesseract.image_to_string(Image.open(image_path)).strip()
    except pytesseract.TesseractNotFoundError:
        return """ERROR: Tesseract OCR is not installed.
        
To install Tesseract:
‚Ä¢ macOS: brew install tesseract
‚Ä¢ Ubuntu/Debian: sudo apt-get install tesseract-ocr
‚Ä¢ Windows: Download from https://github.com/UB-Mannheim/tesseract/wiki

After installation, restart your kernel."""
    except Exception as exc:
        return f"Error reading image: {str(exc)}"

    # An empty OCR result is replaced by an explicit placeholder message.
    return ocr_text or "No text extracted from image"


def read_excel_content(excel_path: str, sheet_name=None, max_rows=50) -> Dict[str, Any]:
    """Read an Excel workbook and return a structured preview.

    Args:
        excel_path: Path to the workbook.
        sheet_name: Sheet to read; when ``None`` only the first sheet is read.
        max_rows: Maximum number of rows included in the preview.

    Returns:
        A dict with ``file_name``, ``total_sheets``, ``sheet_names`` and
        ``data``. ``data`` maps each sheet that was read to its ``total_rows``,
        normalised ``columns``, a ``preview`` list of row records and a
        ``preview_text`` rendering. On any failure the return value is
        ``{"error": "<message>"}`` instead; nothing is raised.
    """
    try:
        # Open the workbook once and close it deterministically. The handle
        # used to be left open, and every sheet re-opened the file from disk.
        with pd.ExcelFile(excel_path) as excel_file:
            all_sheet_names = excel_file.sheet_names

            # Determine which sheet to read (first sheet only by default).
            if sheet_name is not None:
                sheets = [sheet_name]
            else:
                sheets = [all_sheet_names[0]]

            result = {
                "file_name": Path(excel_path).name,
                "total_sheets": len(all_sheet_names),
                "sheet_names": all_sheet_names,
                "data": {}
            }

            for sheet in sheets:
                df = excel_file.parse(sheet_name=sheet)

                # Normalise headers: trimmed, lower-case, "_" for " " and "-".
                df.columns = [
                    str(c).strip().lower().replace(" ", "_").replace("-", "_")
                    for c in df.columns
                ]

                # Only the first max_rows rows go into the preview.
                preview_df = df.head(max_rows)

                result["data"][sheet] = {
                    "total_rows": len(df),
                    "columns": list(df.columns),
                    "preview": preview_df.to_dict(orient="records"),
                    "preview_text": preview_df.to_string(index=False, max_colwidth=30)
                }

        return result

    except Exception as e:
        return {"error": str(e)}


def read_csv_content(csv_path: str, max_rows=50) -> Dict[str, Any]:
    """Load a CSV file and summarise it as a structured preview.

    Returns a dict with ``file_name``, ``total_rows``, normalised ``columns``,
    a ``preview`` list holding the first ``max_rows`` row records and a
    ``preview_text`` rendering of those rows. Any failure is reported as
    ``{"error": "<message>"}`` rather than raised.
    """
    try:
        frame = pd.read_csv(csv_path)

        # Normalise headers: trimmed, lower-case, "_" in place of " " and "-".
        normalised = []
        for column in frame.columns:
            label = str(column).strip().lower()
            normalised.append(label.replace(" ", "_").replace("-", "_"))
        frame.columns = normalised

        head = frame.head(max_rows)

        summary = {}
        summary["file_name"] = Path(csv_path).name
        summary["total_rows"] = len(frame)
        summary["columns"] = list(frame.columns)
        summary["preview"] = head.to_dict(orient="records")
        summary["preview_text"] = head.to_string(index=False, max_colwidth=30)
        return summary

    except Exception as exc:
        return {"error": str(exc)}


def read_text_content(text_path: str) -> str:
    """Return the full contents of a UTF-8 text file.

    On any failure the function returns ``"Error reading text file: ..."``
    instead of raising.
    """
    try:
        with open(text_path, mode='r', encoding='utf-8') as source:
            contents = source.read()
    except Exception as exc:
        return f"Error reading text file: {str(exc)}"
    return contents


print("‚úÖ File reading functions created (with OCR support for images and image-based PDFs)")

In [None]:
# ============================================
# INVOICE CONTEXT BUILDER
# ============================================

def build_invoice_context(file_path: str, max_rows: int = 50) -> str:
    """
    Render an invoice file of any supported type as prompt-ready text.

    The file extension (case-insensitive) selects the reader: PDF, Excel
    (.xlsx/.xls), CSV, image (.png/.jpg/.jpeg/.bmp/.tiff) or plain text.

    Args:
        file_path: Path to invoice file
        max_rows: Maximum rows for tabular data

    Returns:
        A newline-joined report starting with a ``FILE:`` header and a rule,
        followed by the type label and extracted content. A missing file
        yields a single ``ERROR: File not found`` line; an unknown extension
        yields the header plus an ``ERROR: Unsupported file type`` line.
    """
    source = Path(file_path)

    # Guard clause: nothing to build when the file does not exist.
    if not source.exists():
        return f"ERROR: File not found: {file_path}"

    extension = source.suffix.lower()
    path_str = str(file_path)

    lines = [f"FILE: {source.name}", "=" * 70]

    if extension == '.pdf':
        # PDF files
        lines.append("TYPE: PDF Invoice")
        lines.append("\nCONTENT:")
        lines.append(read_pdf_content(path_str))

    elif extension in ('.xlsx', '.xls'):
        # Excel files
        lines.append("TYPE: Excel Spreadsheet")
        workbook = read_excel_content(path_str, max_rows=max_rows)

        if "error" in workbook:
            lines.append(f"\nERROR: {workbook['error']}")
        else:
            lines.append(f"\nSheets: {', '.join(workbook['sheet_names'])}")
            for sheet_name, sheet in workbook['data'].items():
                lines.extend([
                    f"\n--- Sheet: {sheet_name} ---",
                    f"Total Rows: {sheet['total_rows']}",
                    f"Columns: {', '.join(sheet['columns'])}",
                    f"\nData Preview (first {max_rows} rows):",
                    sheet['preview_text'],
                ])

    elif extension == '.csv':
        # CSV files
        lines.append("TYPE: CSV File")
        table = read_csv_content(path_str, max_rows=max_rows)

        if "error" in table:
            lines.append(f"\nERROR: {table['error']}")
        else:
            lines.extend([
                f"\nTotal Rows: {table['total_rows']}",
                f"Columns: {', '.join(table['columns'])}",
                f"\nData Preview (first {max_rows} rows):",
                table['preview_text'],
            ])

    elif extension in ('.png', '.jpg', '.jpeg', '.bmp', '.tiff'):
        # Image files
        lines.append("TYPE: Image File (OCR Extraction)")
        lines.append("\nCONTENT:")
        lines.append(read_image_content(path_str))

    elif extension == '.txt':
        # Text files
        lines.append("TYPE: Text Invoice")
        lines.append("\nCONTENT:")
        lines.append(read_text_content(path_str))

    else:
        lines.append(f"ERROR: Unsupported file type: {extension}")

    return "\n".join(lines)


print("‚úÖ Invoice context builder created (supports PDF, Excel, CSV, Images, Text)")

In [None]:
# ============================================
# CREWAI AGENT DEFINITION
# ============================================

# The single agent used for extraction. It is given no tools: file contents
# are read by the notebook's own helpers and placed in the task description.
invoice_extraction_agent = Agent(
    llm=llm,
    role="Media Invoice Data Extraction Specialist",
    goal="Extract structured financial and delivery data from media invoices into canonical JSON format",
    backstory="""You are an expert in media billing and invoice processing. 
    You understand advertising metrics (impressions, views, clicks), financial terms 
    (revenue, costs, discounts, profit), and how to extract data accurately from 
    various invoice formats including OCR-extracted text from images and scanned PDFs.
    
    You are skilled at handling noisy or imperfectly formatted text from OCR, identifying
    patterns, and extracting meaningful data even when formatting is inconsistent.
    You always follow the canonical schema strictly and never invent data - you use null 
    for missing values. When dealing with OCR text, you intelligently parse tables and 
    structured data even when spacing or alignment is imperfect.""",
    tools=[],  # No tools needed - direct file reading
    allow_delegation=False,
    verbose=True,
)

print("‚úÖ Invoice extraction agent created (optimized for OCR text handling)")
print(f"   Role: {invoice_extraction_agent.role}")

In [None]:
# ============================================
# TASK CREATION FUNCTION
# ============================================

def create_extraction_task(file_path: str, max_rows: int = 50) -> Task:
    """
    Build the CrewAI task that asks the agent to extract one invoice.

    The prompt combines the canonical schema text, the rendered file
    contents and a fixed list of extraction instructions.

    Args:
        file_path: Path to invoice file
        max_rows: Maximum rows for tabular data

    Returns:
        Task bound to ``invoice_extraction_agent`` and configured for
        invoice extraction
    """
    # Rendered file contents that go into the INVOICE DATA prompt section.
    invoice_context = build_invoice_context(file_path, max_rows=max_rows)

    prompt = f"""
Extract structured invoice data from the provided file and map it to the canonical schema.

**CANONICAL SCHEMA:**
{CANONICAL_SCHEMA_DOC}

**INVOICE DATA:**
{invoice_context}

**INSTRUCTIONS:**
1. Identify invoice header information (vendor, dates, totals, currency)
2. Extract all line items with sequential line_id starting from 1
3. Map financial metrics (revenue, costs, discounts, profit)
4. Map delivery metrics (impressions, views, clicks)
5. Calculate implicit discounts if gross and net revenue differ
6. Use null for missing values - DO NOT INVENT DATA
7. For OCR-extracted text: Look for patterns and table structures even if spacing/formatting is imperfect
8. Handle OCR artifacts gracefully (e.g., misread characters, spacing issues)
9. Add clarifications to 'notes' field if needed or if OCR quality affected extraction
10. Return ONLY valid JSON - no markdown, no explanations

**OUTPUT REQUIREMENT:**
Return a single valid JSON object following the canonical schema exactly.
""".strip()

    return Task(
        agent=invoice_extraction_agent,
        description=prompt,
        expected_output="Valid JSON object with invoice_header, line_items array, and notes field",
    )


print("‚úÖ Task creation function defined (with OCR-specific instructions)")

In [None]:
# ============================================
# MAIN EXTRACTION FUNCTION
# ============================================

def extract_invoice_data(file_path: str, max_rows: int = 50) -> Dict[str, Any]:
    """
    Extract structured invoice data from any supported file format.

    Args:
        file_path: Path to invoice file (PDF, Excel, CSV, image, or text)
        max_rows: Maximum rows to process from tabular files

    Returns:
        Dictionary with extracted invoice data in canonical format. On
        failure the dictionary instead carries an ``"error"`` key, plus
        ``"details"`` and/or ``"raw_response"`` (first 500 characters of the
        model output) where available. Failures covered: missing input file,
        unparseable model output, and model output that is valid JSON but not
        a single object.
    """
    print(f"\n{'='*70}")
    print(f"üìÑ EXTRACTING INVOICE DATA")
    print(f"{'='*70}")
    print(f"File: {Path(file_path).name}")
    print(f"{'='*70}\n")

    # Fail fast on a missing file. Otherwise the "file not found" text would
    # be sent to the model as if it were invoice content, spending an API
    # call and inviting a fabricated extraction.
    if not Path(file_path).exists():
        return {"error": f"File not found: {file_path}"}

    # Create task
    task = create_extraction_task(file_path, max_rows=max_rows)

    # Create crew with single agent
    crew = Crew(
        agents=[invoice_extraction_agent],
        tasks=[task],
        process=Process.sequential,
        verbose=True
    )

    # Execute extraction
    result = crew.kickoff()
    result_str = str(result).strip()

    # Parse JSON from result
    try:
        parsed = json.loads(result_str)
    except json.JSONDecodeError:
        # The model may wrap the JSON in prose or markdown fences: retry on
        # the span from the first "{" to the last "}".
        start = result_str.find("{")
        end = result_str.rfind("}")

        if start == -1 or end == -1 or start >= end:
            return {
                "error": "No JSON object found in response",
                "raw_response": result_str[:500]
            }

        try:
            parsed = json.loads(result_str[start : end + 1])
        except json.JSONDecodeError as e:
            return {
                "error": "Failed to parse JSON response",
                "details": str(e),
                "raw_response": result_str[:500]
            }

    # Callers treat the result as a dict (membership test for "error", .get()).
    # Unwrap a one-element array and reject every other non-object shape
    # instead of returning something that crashes downstream.
    if isinstance(parsed, list) and len(parsed) == 1:
        parsed = parsed[0]
    if not isinstance(parsed, dict):
        return {
            "error": "Response JSON is not a single object",
            "raw_response": result_str[:500]
        }

    print(f"\n{'='*70}")
    print(f"‚úÖ EXTRACTION COMPLETE")
    print(f"{'='*70}\n")

    return parsed


print("‚úÖ Main extraction function created")
print("   Usage: result = extract_invoice_data('invoice.xlsx')")

In [None]:
# ============================================
# VALIDATION FUNCTIONS
# ============================================

def validate_extracted_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Validate extracted data against canonical schema.

    Structural problems are errors and make the result invalid; missing
    optional information is reported as warnings only. Malformed input
    (non-dict payload, null or non-object ``invoice_header``, non-object line
    items) is reported as an error rather than raising.

    Args:
        data: Parsed extraction result, expected to be a dict.

    Returns:
        Validation report: ``{"valid": bool, "errors": [...], "warnings": [...]}``
    """
    validation = {
        "valid": True,
        "errors": [],
        "warnings": []
    }

    def fail(message: str) -> None:
        # Record a structural error and mark the whole report invalid.
        validation["valid"] = False
        validation["errors"].append(message)

    # Model output can be any JSON value; everything below assumes a mapping.
    if not isinstance(data, dict):
        fail("Extracted data must be a JSON object")
        return validation

    # Header: must exist and be an object before its fields are inspected.
    header = None
    if "invoice_header" not in data:
        fail("Missing 'invoice_header' field")
    elif not isinstance(data["invoice_header"], dict):
        fail("'invoice_header' must be an object")
    else:
        header = data["invoice_header"]

    # Line items: must exist and be an array.
    line_items = None
    if "line_items" not in data:
        fail("Missing 'line_items' field")
    elif not isinstance(data["line_items"], list):
        fail("'line_items' must be an array")
    else:
        line_items = data["line_items"]

    # Check header has minimal info
    if header is not None:
        if not header.get("invoice_number") and not header.get("vendor_name"):
            validation["warnings"].append("Missing both invoice_number and vendor_name")
        if not header.get("currency"):
            validation["warnings"].append("Currency not specified")

    # Check line items are objects and have IDs
    if line_items is not None:
        for idx, item in enumerate(line_items):
            if not isinstance(item, dict):
                fail(f"Line item at index {idx} must be an object")
            elif "line_id" not in item:
                validation["warnings"].append(f"Line item at index {idx} missing 'line_id'")

    return validation


print("‚úÖ Validation function created")

In [None]:
# ============================================
# EXPORT FUNCTIONS
# ============================================

def save_to_json(data: Dict[str, Any], output_path: str = None) -> str:
    """
    Write extracted invoice data to a JSON file (UTF-8, 2-space indent,
    non-ASCII characters kept as-is).

    Args:
        data: Extracted invoice data
        output_path: Destination file. When omitted, a timestamped file
            name under ``data/`` is generated.

    Returns:
        Path to saved file (the ``output_path`` that was used)
    """
    if output_path is None:
        stamp = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
        output_path = f"data/invoice_extract_{stamp}.json"

    target = Path(output_path)

    # Create any missing parent directories before writing.
    target.parent.mkdir(parents=True, exist_ok=True)

    with target.open('w', encoding='utf-8') as handle:
        json.dump(data, handle, indent=2, ensure_ascii=False)

    return output_path


def convert_to_dataframe(data: Dict[str, Any]) -> pd.DataFrame:
    """
    Flatten the line items into a DataFrame, one row per line item.

    Selected header fields (invoice_number, vendor_name, invoice_date,
    currency) are repeated on every row when present in the header.

    Args:
        data: Extracted invoice data

    Returns:
        DataFrame with line items and header info; an empty DataFrame when
        there are no line items
    """
    has_items = "line_items" in data and bool(data["line_items"])
    if not has_items:
        return pd.DataFrame()

    frame = pd.DataFrame(data["line_items"])

    # Without a header there is nothing to copy onto the rows.
    header = data["invoice_header"] if "invoice_header" in data else {}

    header_fields = ("invoice_number", "vendor_name", "invoice_date", "currency")
    for field in header_fields:
        if field not in header:
            continue
        frame[field] = header[field]

    return frame


print("‚úÖ Export functions created")
print("   ‚Ä¢ save_to_json() - Save to JSON file")
print("   ‚Ä¢ convert_to_dataframe() - Convert to DataFrame")

## 🚀 USAGE EXAMPLE

### Basic Extraction:

```python
# Extract from any file type
result = extract_invoice_data('path/to/invoice.xlsx')

# Validate
validation = validate_extracted_data(result)
print(f"Valid: {validation['valid']}")

# Save to JSON
output_file = save_to_json(result)
print(f"Saved to: {output_file}")

# Convert to DataFrame for analysis
df = convert_to_dataframe(result)
print(df.head())
```

### Advanced Usage:

```python
# Limit rows for large files
result = extract_invoice_data('large_invoice.xlsx', max_rows=30)

# Custom output path
save_to_json(result, 'output/my_invoice.json')

# Access specific fields
header = result['invoice_header']
line_items = result['line_items']
print(f"Vendor: {header['vendor_name']}")
print(f"Total Items: {len(line_items)}")
```


In [None]:
# ============================================
# PRODUCTION READY - PROCESS YOUR INVOICE
# ============================================

# USAGE: Update the file_path to your invoice file

file_path = 'data/sample_media_invoice.xlsx'  # Change this to your file

# Extract invoice data
invoice_data = extract_invoice_data(file_path, max_rows=50)

# Display results
if "error" in invoice_data:
    print(f"\n‚ùå ERROR: {invoice_data['error']}")
    if "details" in invoice_data:
        print(f"Details: {invoice_data['details']}")
else:
    # Show header
    print("\nüìã INVOICE HEADER:")
    print(json.dumps(invoice_data.get("invoice_header", {}), indent=2))

    # Show line items summary
    line_items = invoice_data.get("line_items", [])
    print(f"\nüì¶ LINE ITEMS: {len(line_items)} total")

    if line_items:
        print("\nSample line items:")
        for item in line_items[:3]:
            # The schema allows null for these fields, and .get(key, 0) still
            # returns None when the key is present with a null value; `or 0`
            # keeps the numeric format specs below from raising TypeError.
            gross_revenue = item.get('gross_revenue') or 0
            # Accept the spaced key spelling as well as the snake_case one,
            # since extractions may carry either form.
            billed_impressions = (
                item.get('billed_impressions')
                or item.get('billed impressions')
                or 0
            )
            print(f"  ‚Ä¢ Line {item.get('line_id')}: {item.get('campaign_name', 'N/A')}")
            print(f"    Revenue: ${gross_revenue:,.2f}, " +
                  f"Impressions: {billed_impressions:,}")

    # Validate
    validation = validate_extracted_data(invoice_data)
    print(f"\n‚úÖ VALIDATION: {'PASS' if validation['valid'] else 'FAIL'}")
    if validation['errors']:
        print(f"Errors: {validation['errors']}")
    if validation['warnings']:
        print(f"Warnings: {validation['warnings']}")

    # Save results
    output_file = save_to_json(invoice_data)
    print(f"\nüíæ Saved to: {output_file}")

    # Convert to DataFrame
    df = convert_to_dataframe(invoice_data)
    if not df.empty:
        print(f"\nüìä DataFrame: {len(df)} rows")
        print("\nColumns:", df.columns.tolist())