# Qwen2.5-VL Demand Letter Spark Notebook

In [None]:
# Cell 1: imports, Spark session, and environment check
import os, json, uuid, threading
import fitz
import numpy as np
import torch
from pydantic import BaseModel, Field
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Attach to the running Spark session if there is one, otherwise start a new
# one under this notebook's application name.
spark = (
    SparkSession.builder
    .appName("DemandLetterQwen2_5_VL")
    .getOrCreate()
)
# Print the runtime facts that matter for this notebook: the Spark version and
# whether torch can see a CUDA device.
print(f"Spark version: {spark.version}")
print(f"CUDA: {torch.cuda.is_available()}")

In [None]:
# Cell 2: output schema (pydantic model) and the extraction prompt
class DemandLetterExtract(BaseModel):
    """Structured result of analysing one document as a possible demand letter.

    The field names match the keys of the JSON schema spelled out in
    ``MASTER_PROMPT``. Every field is optional and defaults to ``None``,
    except ``evidence_attached``, which defaults to a fresh empty list.
    """

    # Classification result; None when the model did not provide one.
    is_demand_letter: bool | None = None

    # Claimant identity and contact details.
    claim_number: str | None = None
    claimant_name: str | None = None
    claimant_address: str | None = None
    claimant_contact_phone: str | None = None
    claimant_contact_email: str | None = None
    claimant_contact_facsimile: str | None = None
    claimant_legal_office_information: str | None = None
    insurance_company_representative: str | None = None

    # Demand amount and dates, kept as free-form strings exactly as extracted.
    claim_amount: str | None = None
    demand_letter_date: str | None = None
    response_deadline_date: str | None = None

    # The prompt tells the model to set all non-classification fields to null
    # for documents that are not demand letters, so an explicit null must be
    # accepted here; a plain ``list`` annotation would reject it and turn every
    # non-demand-letter document into a validation error.
    evidence_attached: list | None = Field(default_factory=list)

    # Loss and policy details.
    date_of_loss: str | None = None
    insured_property_address: str | None = None
    insured_asset_description: str | None = None
    policy_number: str | None = None
    referenced_policy_language: str | None = None

    # Characterisation of the letter's content.
    threats_of_legal_action: str | None = None
    requested_resolution: str | None = None
    tone_of_letter: str | None = None
    claimant_stated_cause_of_loss: str | None = None

    # Draft acknowledgement letter produced by the model.
    letter_response_markdown: str | None = None

# Instruction prompt sent verbatim as the first text item of the user message.
# It is never passed through str.format(), so the JSON schema uses single
# braces: doubled braces would reach the model literally and show it an
# invalid JSON shape. Rule 2 keeps "evidence_attached" as an array so that it
# agrees with the schema and with rule 4.
MASTER_PROMPT = """You are an insurance claims adjuster analyzing a document containing both text and images (PDF page snapshots). 
Use ALL provided content — extracted text AND page images — to determine if the document is a demand letter and to extract structured data.  
Images always override text in cases of conflict.

Your job is to produce ONLY a JSON object matching the exact schema below, with no commentary, no Markdown, and no extra fields:

{
    "is_demand_letter": boolean | false,
    "claim_number": string | null,
    "claimant_name": string | null,
    "claimant_address": string | null,
    "claimant_contact_phone": string | null,
    "claimant_contact_email": string | null,
    "claimant_contact_facsimile": string | null,
    "claimant_legal_office_information": string | null,
    "insurance_company_representative": string | null,
    "claim_amount": string | null,
    "demand_letter_date": string | null,
    "response_deadline_date": string | null,
    "evidence_attached": array,
    "date_of_loss": string | null,
    "insured_property_address": string | null,
    "insured_asset_description": string | null,
    "policy_number": string | null,
    "referenced_policy_language": string | null,
    "threats_of_legal_action": string | null,
    "requested_resolution": string | null,
    "tone_of_letter": string | null,
    "claimant_stated_cause_of_loss": string | null,
    "letter_response_markdown": string | null
}

Rules:

1. Use BOTH:  
   - The extracted TEXT of the PDF pages  
   - The IMAGE representations of the pages  
   If there is any discrepancy, treat the IMAGE as the authoritative source.

2. First determine whether the document IS a demand letter.  
   If NOT a demand letter:  
   - "is_demand_letter" = false  
   - All other fields MUST be null (including "letter_response_markdown"), except "evidence_attached", which MUST be an empty array.  
   - Return the JSON and stop.

3. If it IS a demand letter:  
   - Extract all fields strictly from the provided content.  
   - Use null for any field not explicitly present.  
   - Do not infer, guess, or hallucinate any information.

4. For "evidence_attached":  
   - Provide an array of evidence types ONLY if explicitly attached or referenced.  
   - Otherwise leave as an empty array.

5. The field "letter_response_markdown" must contain a short, professional acknowledgement letter from the insurance adjuster.  
   Requirements:  
   - No salutation (“Dear…”).  
   - No names of adjusters.  
   - Must acknowledge receipt of the demand.  
   - Must state the claim is under review.  
   - Must summarize ONLY verifiable facts present in the letter.  
   - Must mention a 30-day review/response timeline.  
   - No additional claims, legal commentary, or invented facts.

6. Use consistent, neutral, factual language.  
   Never fabricate legal, financial, or policy details.

7. When reading images, pay attention to:  
   - Headers (law office, claimant, insurer)  
   - Stamps, dates, letterheads  
   - Signatures  
   - Embedded exhibits  
   - Handwritten annotations  
   - Policy numbers, claim numbers  
   - Amounts demanded  

8. Your output must be **strict JSON**, parsable with no trailing text or commentary.

Now analyze the provided text and images and return ONLY the JSON object.
"""

In [None]:
# Cell 3: lazily loaded, process-wide model and processor
# Module-level cache so the model and processor are loaded at most once per
# Python process; _model_lock serialises the one-time load.
_model_lock=threading.Lock()
_model_instance=None
_processor_instance=None

def get_model_and_processor():
    """Return the cached ``(model, processor)`` pair, loading it on first use.

    The first successful call loads ``Qwen/Qwen2.5-VL-7B-Instruct`` and its
    processor via ``from_pretrained``; later calls return the same objects.

    Returns:
        A ``(model, processor)`` tuple; neither element is ever ``None``.

    Raises:
        Whatever ``from_pretrained`` raises. Nothing is cached in that case,
        so the next call retries the load from scratch.
    """
    global _model_instance,_processor_instance
    # Lock-free fast path: only taken once BOTH objects have been published,
    # so a caller can never observe a model paired with a missing processor.
    if _model_instance is not None and _processor_instance is not None:
        return _model_instance,_processor_instance
    with _model_lock:
        if _model_instance is None or _processor_instance is None:
            model_id="Qwen/Qwen2.5-VL-7B-Instruct"
            # Load into locals first: if either load raises, the globals stay
            # untouched instead of holding a half-initialised pair.
            model=Qwen2_5_VLForConditionalGeneration.from_pretrained(
                model_id, torch_dtype="auto", device_map="auto"
            )
            processor=AutoProcessor.from_pretrained(model_id)
            # Publish the processor before the model so that anything which
            # sees a non-None model also sees a non-None processor.
            _processor_instance=processor
            _model_instance=model
    return _model_instance,_processor_instance

In [None]:
# Cell 4: PDF page rendering and single-document inference
def extract_pdf_pages(pdf_path:str):
    """Extract the text and a rendered PNG image for every page of a PDF.

    Args:
        pdf_path: Path handed to ``fitz.open``.

    Returns:
        A list with one dict per page, in page order. Each dict has the keys
        ``"text"`` (the result of ``page.get_text()``) and ``"image_path"``
        (a uniquely named PNG under ``/tmp`` rendered at 200 dpi). The caller
        owns the PNG files and is responsible for deleting them.

    Raises:
        Any exception raised while opening, reading or rendering the PDF.
        PNG files written before the failure are removed first.
    """
    out=[]
    written=[]
    try:
        # The context manager closes the document even if a page fails.
        with fitz.open(pdf_path) as pdf:
            for page in pdf:
                text=page.get_text()
                pix=page.get_pixmap(dpi=200)
                fn=f"/tmp/page_{uuid.uuid4()}.png"
                # Record the path before saving so that a partially written
                # file is also cleaned up on failure.
                written.append(fn)
                pix.save(fn)
                out.append({"text":text,"image_path":fn})
    except Exception:
        # Do not leave orphaned images behind when extraction aborts midway.
        for fn in written:
            try:
                os.remove(fn)
            except OSError:
                pass
        raise
    return out

def _extract_json_object(raw):
    """Return the outermost ``{...}`` span of *raw*, or *raw* itself if none."""
    start=raw.find("{")
    end=raw.rfind("}")
    if start==-1 or end<=start:
        return raw
    return raw[start:end+1]

def analyze_pdf_local(pdf_path):
    """Run the vision-language model on one PDF and parse its JSON answer.

    Builds a single user message made of ``MASTER_PROMPT`` followed by each
    page's image and extracted text, generates up to 2048 new tokens, and
    validates the decoded reply as a ``DemandLetterExtract``.

    Args:
        pdf_path: Path of the PDF to analyse.

    Returns:
        The parsed ``DemandLetterExtract`` instance.

    Raises:
        Any exception from PDF extraction, model inference, or
        ``DemandLetterExtract.parse_raw`` when the reply is not valid JSON for
        the schema.
    """
    model,processor=get_model_and_processor()
    pages=extract_pdf_pages(pdf_path)
    try:
        content=[{"type":"text","text": MASTER_PROMPT}]
        for p in pages:
            content.append({"type":"image","image":p["image_path"]})
            content.append({"type":"text","text":p["text"]})
        msgs=[{"role":"user","content":content}]
        txt=processor.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)
        imgs,vids=process_vision_info(msgs)
        inputs=processor(text=[txt], images=imgs, videos=vids, padding=True, return_tensors="pt")
        # Follow the model's own placement (it is loaded with
        # device_map="auto") instead of assuming a device.
        inputs=inputs.to(model.device)
        with torch.no_grad():
            gen=model.generate(**inputs, max_new_tokens=2048)
    finally:
        # The rendered page images are only needed for this one inference;
        # remove them so repeated calls do not fill the temp directory.
        for p in pages:
            try:
                os.remove(p["image_path"])
            except OSError:
                pass
    # generate() returns prompt + completion; keep only the completion tokens.
    trimmed=[o[len(i):] for i,o in zip(inputs.input_ids,gen)]
    out=processor.batch_decode(trimmed, skip_special_tokens=True)[0]
    # Tolerate replies wrapped in a Markdown code fence or surrounded by stray
    # text by parsing only the outermost JSON object.
    return DemandLetterExtract.parse_raw(_extract_json_object(out))

In [None]:
# Cell 5: Spark UDF wrapper that returns JSON strings
def analyze_pdf_udf_impl(p):
    """Analyse one PDF path and always answer with a JSON string.

    Args:
        p: Path of the PDF, or ``None``.

    Returns:
        The analysis serialised with ``.json()`` on success. For a ``None``
        path, ``{"error": "null path"}``. If anything raises during analysis,
        a JSON object carrying the exception text under ``"error"`` and the
        offending path under ``"path"``.
    """
    # A missing path is reported as data rather than raised.
    if p is None:
        return json.dumps({"error":"null path"})
    try:
        return analyze_pdf_local(p).json()
    except Exception as exc:
        # Failures for a single document are returned as a JSON error payload
        # instead of propagating out of the function.
        failure={"error":str(exc),"path":p}
        return json.dumps(failure)

# Wrap the Python implementation as a Spark UDF whose result column is a string.
analyze_pdf_udf = udf(analyze_pdf_udf_impl, returnType=StringType())

In [None]:
# Cell 6: demo run over sample PDF paths
# Build a one-column DataFrame holding the sample PDF paths.
df = spark.createDataFrame(
    [("/data/pdfs/sample1.pdf",), ("/data/pdfs/sample2.pdf",)],
    schema=["pdf_path"],
)

# Add the UDF's JSON output as an "analysis" column and print every row
# without truncating the long JSON strings.
scored = df.withColumn("analysis", analyze_pdf_udf(df["pdf_path"]))
scored.show(truncate=False)