In [7]:
# Install the Python packages this notebook uses (-q keeps pip's output short).
# NOTE(review): no versions are pinned, so a later re-run may pull different releases.
!pip install -q requests PyMuPDF pandas transformers torchvision pytorch-lightning pdf2image

# Install the poppler-utils system package (required for pdf2image, which the
# OCR fallback uses to turn PDF pages into images).
!apt-get install -y poppler-utils


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.1/24.1 MB[0m [31m78.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m77.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m77.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m43.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m16.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [67]:
import requests
import fitz  # PyMuPDF
import pandas as pd
import json
import os
from pdf2image import convert_from_path
from PIL import Image
import torch
from transformers import DonutProcessor, VisionEncoderDecoderModel

# ========== Gemini API Setup ==========
# Prefer a key supplied through the GEMINI_API_KEY environment variable so the
# secret never has to be pasted into (and saved with) the notebook. The original
# placeholder stays as the fallback, so editing the value in place still works.
API_KEY = os.environ.get("GEMINI_API_KEY", "ADD YOUR API KEY")
# generateContent endpoint for the gemini-2.0-flash model.
BASE_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"
# Headers sent with every request; the key travels in a header, not in the URL.
headers = {
    "Content-Type": "application/json",
    "X-goog-api-key": API_KEY
}

def call_gemini(prompt, timeout=60):
    """Send a text prompt to the Gemini endpoint and return the generated text.

    Args:
        prompt: Text placed in the single ``parts`` entry of the request body.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the notebook indefinitely.

    Returns:
        The text of the first part of the first candidate, or ``None`` when the
        request fails, the API answers with a non-200 status, or the response
        body does not have the expected shape.
    """
    payload = {"contents": [{"parts": [{"text": prompt}]}]}
    try:
        response = requests.post(BASE_URL, headers=headers, json=payload, timeout=timeout)
    except requests.RequestException as exc:
        # Connection errors and timeouts are reported the same way as API errors
        # so callers only ever have to handle a ``None`` result.
        print("❌ Gemini API error:", exc)
        return None

    if response.status_code != 200:
        print("❌ Gemini API error:", response.text)
        return None

    try:
        return response.json()['candidates'][0]['content']['parts'][0]['text']
    except (KeyError, IndexError, TypeError, ValueError):
        # ValueError covers a body that is not valid JSON; the others cover a
        # JSON body without the candidates/content/parts/text structure.
        print("⚠️ Could not parse Gemini response.")
        return None

def extract_text_from_pdf(file_path):
    """Return the text of a PDF, falling back to Donut OCR when none is embedded.

    The text of every page is read with PyMuPDF and concatenated. If the
    result is non-blank it is returned stripped; otherwise the file is handed
    to ``extract_text_with_donut`` and that function's result is returned.
    """
    print("📄 Checking for selectable text...")
    with fitz.open(file_path) as pdf:
        page_texts = [page.get_text() for page in pdf]

    extracted = "".join(page_texts).strip()
    if not extracted:
        print("⚠️ No selectable text found. Using Donut for OCR...")
        return extract_text_with_donut(file_path)

    print("✅ Selectable text found.")
    return extracted

def extract_text_with_donut(pdf_path):
    """Rasterize a PDF and decode each page image with the Donut base model.

    Pages are rendered at 200 DPI, converted to RGB, and passed through the
    ``naver-clova-ix/donut-base`` processor and model (on CUDA when available,
    otherwise CPU). The decoded strings, one per page, are joined with a blank
    line between them.

    NOTE(review): generation starts from a bare "<s>" token with no task
    prompt — confirm this produces usable text for the documents in scope.
    """
    checkpoint = "naver-clova-ix/donut-base"
    page_images = convert_from_path(pdf_path, dpi=200)
    processor = DonutProcessor.from_pretrained(checkpoint)
    model = VisionEncoderDecoderModel.from_pretrained(checkpoint)
    model = model.to("cuda" if torch.cuda.is_available() else "cpu")

    def _decode_page(page_image):
        # One page image in, one decoded string out.
        rgb_page = page_image.convert("RGB")
        pixel_values = processor(rgb_page, return_tensors="pt").pixel_values.to(model.device)
        start_ids = processor.tokenizer(
            "<s>", add_special_tokens=False, return_tensors="pt"
        ).input_ids.to(model.device)
        generated = model.generate(pixel_values, decoder_input_ids=start_ids, max_length=512)
        return processor.batch_decode(generated, skip_special_tokens=True)[0]

    return "\n\n".join([_decode_page(page_image) for page_image in page_images])

def detect_output_format(task_detail):
    """Pick the output format named in the instruction: "json" if mentioned, else "csv".

    The check is a case-insensitive substring search for "json".
    """
    if "json" in task_detail.lower():
        return "json"
    return "csv"

def parse_csv_response(csv_text, expected_num_cols=None):
    """Turn a model's CSV-formatted reply into a DataFrame, dropping bad rows.

    Each line is parsed with the ``csv`` module, so quoted fields may contain
    commas. Markdown code-fence lines (starting with three backticks) and blank
    lines are ignored. Rows whose column count differs from the expected count
    are skipped with a warning.

    Args:
        csv_text: Raw text returned by the model; may be ``None`` or empty.
        expected_num_cols: Required number of columns per row. When ``None``
            the column count of the first parsed row (the header) is used.

    Returns:
        A ``pandas.DataFrame`` built from the surviving rows, or ``None`` when
        there is nothing to parse or pandas cannot read the cleaned text.
    """
    import csv
    from io import StringIO

    if not csv_text or not csv_text.strip():
        print("⚠️ Final CSV parsing failed: empty response")
        return None

    cleaned = StringIO()
    writer = csv.writer(cleaned, lineterminator="\n")
    kept_rows = 0
    for line in csv_text.strip().splitlines():
        stripped = line.strip()
        # Models often wrap the table in ```csv ... ``` despite being told not
        # to; a fence line must not be mistaken for a one-column header.
        if not stripped or stripped.startswith("```"):
            continue
        try:
            cols = [c.strip() for c in next(csv.reader([stripped], skipinitialspace=True))]
        except (csv.Error, StopIteration):
            print(f"⚠️ Skipping malformed row: {line}")
            continue
        if expected_num_cols is None:
            expected_num_cols = len(cols)
        if len(cols) == expected_num_cols:
            # csv.writer re-quotes fields that contain commas or quotes.
            writer.writerow(cols)
            kept_rows += 1
        else:
            print(f"⚠️ Skipping malformed row: {line}")

    if kept_rows == 0:
        print("⚠️ Final CSV parsing failed: no usable rows")
        return None

    try:
        return pd.read_csv(StringIO(cleaned.getvalue()))
    except (pd.errors.ParserError, pd.errors.EmptyDataError, ValueError) as e:
        print("⚠️ Final CSV parsing failed:", e)
        return None

def save_output(df, output_format="csv", output_path="output"):
    """Write ``df`` to ``<output_path>.<output_format>``, appending if it exists.

    Args:
        df: DataFrame to persist.
        output_format: ``"csv"`` or ``"json"``; anything else is rejected with
            a message and nothing is written.
        output_path: Target path without the extension.

    Behaviour:
        * New file: the frame is written as CSV (no index) or as a JSON list
          of records.
        * Existing CSV: rows are appended without a header, re-ordered to the
          existing file's columns. Columns missing from ``df`` are left empty
          and columns unknown to the file are dropped with a warning.
        * Existing JSON: records are appended to the stored list. A stored
          non-list value is wrapped in a list first so nothing is lost.
    """
    if output_format not in ("csv", "json"):
        print("❌ Unsupported format")
        return

    target = f"{output_path}.{output_format}"

    if not os.path.exists(target):
        if output_format == "json":
            df.to_json(target, orient="records", indent=2)
            print(f"✅ JSON saved to {output_path}.json")
        else:
            df.to_csv(target, index=False)
            print(f"✅ CSV saved to {output_path}.csv")
        return

    print(f"📎 Appending to existing {output_format.upper()} file...")
    if output_format == "csv":
        # Only the header is needed; nrows=0 avoids loading the whole file.
        existing_cols = pd.read_csv(target, nrows=0).columns
        dropped = [col for col in df.columns if col not in existing_cols]
        if dropped:
            print(f"⚠️ Dropping columns not present in existing file: {dropped}")
        # Rows are appended without a header, so they must follow the existing
        # file's column order exactly or values land under the wrong heading.
        aligned = df.reindex(columns=existing_cols)
        aligned.to_csv(target, mode='a', header=False, index=False)
    else:
        with open(target, "r") as f:
            existing_data = json.load(f)
        if not isinstance(existing_data, list):
            existing_data = [existing_data]
        existing_data.extend(json.loads(df.to_json(orient="records")))
        with open(target, "w") as f:
            json.dump(existing_data, f, indent=2)

def main():
    """Interactive pipeline: PDF -> text -> type check -> Gemini CSV -> saved file.

    Steps:
        1. Prompt for a PDF path and a task instruction.
        2. Extract the PDF text (selectable text first, OCR fallback).
        3. Ask Gemini which document type the instruction refers to, then ask
           whether the first 3000 characters of the PDF match that type.
        4. If they match, ask Gemini for a plain CSV table, parse it, and save
           it in the format detected from the instruction.
        5. If the CSV cannot be parsed, write the raw reply to ``output_raw.txt``.

    The function stops early with a message whenever a Gemini call returns
    nothing or the PDF yields no text, instead of carrying ``None`` forward.

    Raises:
        FileNotFoundError: If the entered path does not exist.
    """
    file_path = input("📁 Enter the full path to your PDF file (e.g. /content/invoice.pdf): ").strip()
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"❌ File not found at: {file_path}")

    task_detail = input("📝 Enter your task instruction: ").strip()
    output_format = detect_output_format(task_detail)
    print(f"📤 Detected output format: {output_format.upper()}")

    print("\n🔍 Extracting text from PDF...")
    pdf_text = extract_text_from_pdf(file_path)
    if not pdf_text:
        print("❌ No text could be extracted from the PDF. No extraction performed.")
        return

    print("\n🧠 Inferring expected document type from instruction...")
    doc_type_prompt = (
        f"Based on this instruction:\n'{task_detail}'\n"
        f"What type of document is the user referring to? "
        f"Reply with one word like: invoice, receipt, resume, contract, report, purchase_order, letter, etc."
    )
    expected_type = call_gemini(doc_type_prompt)
    expected_type_clean = expected_type.strip().lower() if expected_type else None
    if not expected_type_clean:
        # Without a type the validation prompt would ask about the literal
        # string 'None', which makes the yes/no answer meaningless.
        print("❌ Could not infer the expected document type. No extraction performed.")
        return

    print(f"\n🔎 Validating whether the uploaded PDF is a '{expected_type_clean}' (or closely related)...")
    validation_prompt = (
        f"The user wants to work with a document of type '{expected_type_clean}'. "
        f"Check the following document text and answer: "
        f"Is it generally consistent with that type or a closely related business document "
        f"(like invoice vs purchase order)? Say 'yes' if they are meaningfully related and not wildly different "
        f"(like resume vs invoice). Reply only 'yes' or 'no'.\n\n{pdf_text[:3000]}"
    )
    is_valid = call_gemini(validation_prompt)
    if is_valid and is_valid.strip().lower().startswith("yes"):
        print(f"✅ Document matches expected type or is closely related. Continuing...")
    else:
        print(f"❌ This file does not appear to be a '{expected_type_clean}' or closely related. No extraction performed.")
        return

    print("\n🤖 Generating prompt to return plain CSV table...")
    csv_prompt = (
        f"Based on the following task: '{task_detail}', extract the relevant fields from the document "
        f"and return ONLY a plain CSV table as raw text with no explanation, markdown, or code formatting."
    )
    final_prompt = csv_prompt + f"\n\nHere is the document text:\n{pdf_text}"

    print("\n🤖 Sending to Gemini for structured CSV output...")
    csv_response = call_gemini(final_prompt)
    print("\n📦 Gemini Response:\n", csv_response)
    if csv_response is None:
        # Nothing to parse and nothing to save as raw output.
        print("❌ Gemini returned no content. Nothing saved.")
        return

    df = parse_csv_response(csv_response)
    if df is not None:
        save_output(df, output_format)
    else:
        print("❌ Could not parse CSV. Saving raw output.")
        # Explicit UTF-8 so non-ASCII model output is written the same way on
        # every platform.
        with open("output_raw.txt", "w", encoding="utf-8") as f:
            f.write(csv_response)
        print("✅ Saved raw Gemini output to output_raw.txt")

# Run the interactive pipeline; it prompts for a PDF path and a task instruction.
main()

📁 Enter the full path to your PDF file (e.g. /content/invoice.pdf): /content/Resume.pdf
📝 Enter your task instruction: Extract key fields like vendor name and units from this purchase order
📤 Detected output format: CSV

🔍 Extracting text from PDF...
📄 Checking for selectable text...
✅ Selectable text found.

🧠 Inferring expected document type from instruction...

🔎 Validating whether the uploaded PDF is a 'purchase_order' (or closely related)...
❌ This file does not appear to be a 'purchase_order' or closely related. No extraction performed.


In [None]:
Extract item name, date, vendor, units, and total cost from this purchase order. Save in spreadsheet format.
