# Insurance Claim Processing

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/mongodb-developer/GenAI-Showcase/blob/main/notebooks/agents/specialty_pharmacy_claims_processing_workflow.ipynb)

[![AI Learning Hub](https://img.shields.io/badge/AI%20Learning%20Hub-Click%20Here-blue)](https://www.mongodb.com/resources/use-cases/artificial-intelligence?utm_campaign=ai_learning_hub&utm_source=github&utm_medium=referral)

In [None]:
! pip install -qU pymongo voyageai openai PyMuPDF openai-agents nest_asyncio

In [2]:
import getpass
import os


# Function to securely get and set environment variables
def set_env_securely(var_name, prompt):
    value = getpass.getpass(prompt)
    os.environ[var_name] = value

In [3]:
# Set your OpenAI API Key
# TODO: Place a link on where openai api key can be obtained
set_env_securely("OPENAI_API_KEY", "Enter your OPENAI API KEY: ")

In [4]:
import os
from datetime import date
from typing import List, Optional

In [5]:
import re
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, ConfigDict, field_validator


class AddressModel(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    line1: str
    line2: Optional[str] = None
    city: str
    state: str
    zip_code: str

    @field_validator("state")
    @classmethod
    def validate_state(cls, v):
        if len(v) != 2:
            raise ValueError("State must be a 2-letter code")
        return v.upper()

    @field_validator("zip_code")
    @classmethod
    def validate_zip(cls, v):
        # Basic US ZIP code validation
        if not re.match(r"^\d{5}(-\d{4})?$", v):
            raise ValueError("Invalid ZIP code format")
        return v


class ContactInfo(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    phone: Optional[str] = None
    email: Optional[str] = None
    address: AddressModel


class PatientModel(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)

    member_id: str
    first_name: str
    last_name: str
    middle_initial: Optional[str] = None
    date_of_birth: date
    gender: str
    group_number: str
    plan_type: str
    contact_info: ContactInfo
    employer: Optional[str] = None
    effective_date: Optional[date] = None

    @field_validator("gender")
    @classmethod
    def validate_gender(cls, v):
        valid_genders = ["M", "F", "X", "U", "MALE", "FEMALE", "OTHER", "UNKNOWN"]
        if v.upper() not in valid_genders:
            raise ValueError(f"Gender must be one of {valid_genders}")
        return v.upper()

In [6]:
import fitz


def pdf_to_images(pdf_path: str, zoom: float = 2.0) -> List[bytes]:
    """
    Converts each page of the PDF into a PNG image in memory.
    Returns a list of image bytes.
    """
    doc = fitz.open(pdf_path)
    png_bytes = []
    mat = fitz.Matrix(zoom, zoom)

    for page in doc:
        pix = page.get_pixmap(matrix=mat, alpha=False)
        img_bytes = pix.tobytes("png")
        png_bytes.append(img_bytes)

    return png_bytes

In [7]:
import base64
import json

from openai import OpenAI

client = OpenAI()


def extract_fields_from_image(image_png: bytes) -> dict:
    """
    Encodes the image as Base64 and sends it to the vision model,
    steering it to return a JSON matching our Pydantic schema.
    """
    # 1) Encode image
    b64 = base64.b64encode(image_png).decode("utf-8")
    data_uri = f"data:image/png;base64,{b64}"

    # 2) Build the “messages” payload
    system_text = (
        "You are a document-parsing assistant specialized in healthcare forms. "
        "Given an image of a specialty pharmacy claim form, extract these fields:\n\n"
        "patient:\n"
        "  member_id: string\n"
        "  first_name: string\n"
        "  last_name: string\n"
        "  middle_initial: string (optional)\n"
        "  date_of_birth: string (YYYY-MM-DD format)\n"
        "  gender: string\n"
        "  group_number: string\n"
        "  plan_type: string\n\n"
        "provider:\n"
        "  name: string\n"
        "  npi: string\n"
        "  tax_id: string\n"
        "  facility_type: string\n"
        "  network_status: string (IN_NETWORK or OUT_OF_NETWORK)\n"
        "  treating_physician: string (optional)\n"
        "  physician_npi: string (optional)\n\n"
        "medication:\n"
        "  drug_name: string\n"
        "  generic_name: string (optional)\n"
        "  ndc: string\n"
        "  dosage: string\n"
        "  route: string\n"
        "  frequency: string\n"
        "  quantity: number\n"
        "  days_supply: number\n\n"
        "clinical_info:\n"
        "  primary_diagnosis: string\n"
        "  primary_diagnosis_code: string\n"
        "  diagnosis_date: string (YYYY-MM-DD format, optional)\n\n"
        "claim:\n"
        "  claim_id: string\n"
        "  date_received: string (YYYY-MM-DD format)\n"
        "  claim_status: string\n"
        "  priority: string\n\n"
        "Respond with a single JSON object only, no extra text. "
        "Only include fields you can definitely see in the image. "
        "Use null for optional fields if not found. "
        "For dates, convert to YYYY-MM-DD format."
    )

    # 3) Call the vision-enabled model
    response = client.responses.create(
        model="gpt-4.1",  # your vision-enabled model
        input=[
            {
                "role": "system",
                "content": [{"type": "input_text", "text": system_text}],
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "input_text",
                        "text": "Please extract the form data as JSON.",
                    },
                    {"type": "input_image", "image_url": data_uri},
                ],
            },
        ],
    )

    # 4) Parse the JSON from the model output
    json_str = response.output_text.strip()
    return json.loads(json_str)

In [8]:
def parse_claim_form(pdf_path: str) -> dict:
    """
    High-level: PDF → image → vision extraction → Pydantic validation.
    """
    images = pdf_to_images(pdf_path)
    data = extract_fields_from_image(images[0])

    return data

In [None]:
from pprint import pprint

pdf_file = "data/specialty_pharmacy_claim_form_acme.pdf"
claim = parse_claim_form(pdf_file)
pprint(claim)

In [10]:
set_env_securely("VOYAGE_API_KEY", "Enter your VOYAGE API KEY: ")

In [11]:
# Non-sensitive environment variables
VOYAGE_AI_EMBEDDING_MODEL = "voyage-multimodal-3"
VOYAGE_AI_EMBEDDING_MODEL_DIMENSION = 1024

In [12]:
from io import BytesIO

import voyageai
from PIL import Image

# Initialize Voyage AI client (reads VOYAGE_API_KEY from env)
vo = voyageai.Client()


def embed_pdf_pages(
    page_png_bytes: List[bytes], model: str = "voyage-multimodal-3"
) -> List[List[float]]:
    """
    Given a list of PDF-page images (PNG bytes), return a list of embeddings,
    one per page, using a multimodal embedding model.

    Args:
        page_png_bytes: List of raw PNG image bytes (e.g. from pdf_to_images).
        model: Voyage multimodal embedding model to call.

    Returns:
        A list of embedding vectors (one list of floats per page).
    """

    print(f"Embedding {len(page_png_bytes)} pages with model {model}")
    # 1. Convert raw bytes to PIL images
    pil_images = [Image.open(BytesIO(b)) for b in page_png_bytes]

    print(f"Converting {len(pil_images)} PIL images to embeddings")
    # 2. Build the inputs list: empty text + image
    inputs = [["", img] for img in pil_images]
    print("Calling Voyage AI multimodal embed endpoint")

    # 3. Call VoyageAI multimodal embed endpoint
    result = vo.multimodal_embed(inputs, model=VOYAGE_AI_EMBEDDING_MODEL)
    print(f"Voyage AI response: {result}")
    # 4. Return only the embeddings
    return result.embeddings

In [15]:
# 1) Convert a PDF to images
images = pdf_to_images("data/specialty_pharmacy_claim_form_acme.pdf")

# 2) Embed each page
page_embeddings = embed_pdf_pages(images)

# 3) Inspect shape / content
pprint(
    {
        "num_pages": len(page_embeddings),
        "dimensionality": len(page_embeddings[0]) if page_embeddings else 0,
    }
)

Embedding 3 pages with model voyage-multimodal-3
Converting 3 PIL images to embeddings
Calling Voyage AI multimodal embed endpoint
Voyage AI response: <voyageai.object.multimodal_embeddings.MultimodalEmbeddingsObject object at 0x1203309d0>
{'dimensionality': 1024, 'num_pages': 3}


In [16]:
pages_records = []

for i, embedding in enumerate(page_embeddings):
    single_page_record = {
        "claim_id": claim["claim"]["claim_id"],
        "page_number": i + 1,
        "embedding": embedding,
    }

    pages_records.append(single_page_record)

In [17]:
set_env_securely("MONGODB_URI", "Enter your MongoDB URI: ")

In [18]:
import pymongo


def get_mongo_client(mongo_uri):
    """Establish and validate connection to the MongoDB."""

    client = pymongo.MongoClient(
        mongo_uri, appname="devrel.showcase.agents.claim_processing.python"
    )

    # Validate the connection
    ping_result = client.admin.command("ping")
    if ping_result.get("ok") == 1.0:
        # Connection successful
        print("Connection to MongoDB successful")
        return client
    else:
        print("Connection to MongoDB failed")
    return None

In [19]:
DB_NAME = "insurance_claims_back_office"
db_client = get_mongo_client(os.environ.get("MONGODB_URI"))
db = db_client[DB_NAME]

Connection to MongoDB successful


In [20]:
# Collection Names
CLAIMS_COLLECTION = "claims"
PAGES_COLLECTION = "insurance_claim_pages"
PATIENT_COLLECTION = "patients"
PHARMACY_FORMULARY_COLLECTION = "pharmacy_formulary"
SOP_COLLECTION = "sop"

CLAIMS_COLLECTION_VECTOR_INDEX_NAME = "claims_vector_search_index"
CLAIMS_COLLECTION_SEARCH_INDEX_NAME = "claims_text_search_index"

PAGES_COLLECTION_VECTOR_INDEX_NAME = "pages_vector_search_index"
PAGES_COLLECTION_SEARCH_INDEX_NAME = "pages_text_search_index"

PATIENT_COLLECTION_VECTOR_INDEX_NAME = "patient_vector_search_index"
PATIENT_COLLECTION_SEARCH_INDEX_NAME = "patient_text_search_index"

PHARMACY_FORMULARY_COLLECTION_VECTOR_INDEX_NAME = (
    "pharmacy_formulary_vector_search_index"
)
PHARMACY_FORMULARY_COLLECTION_SEARCH_INDEX_NAME = "pharmacy_formulary_text_search_index"

SOP_COLLECTION_VECTOR_INDEX_NAME = "sop_vector_search_index"
SOP_COLLECTION_SEARCH_INDEX_NAME = "sop_text_search_index"

In [21]:
def create_collections():
    existing = db.list_collection_names()
    print(f"Existing collections: {existing}")

    # 1) Claims collection: one doc per specialty pharmacy claim
    if CLAIMS_COLLECTION not in existing:
        db.create_collection(CLAIMS_COLLECTION)
        print(f"Created `{CLAIMS_COLLECTION}`")

    # 2) Pages collection: one doc per page in an insurance claim
    if PAGES_COLLECTION not in existing:
        db.create_collection(PAGES_COLLECTION)
        print(f"Created `{PAGES_COLLECTION}`")

    # 3) Patients collection: one doc per patient
    if PATIENT_COLLECTION not in existing:
        db.create_collection(PATIENT_COLLECTION)
        print(f"Created `{PATIENT_COLLECTION}`")

    # 4) Pharmarcy formulary collection
    if PHARMACY_FORMULARY_COLLECTION not in existing:
        db.create_collection(PHARMACY_FORMULARY_COLLECTION)
        print(f"Created `{PHARMACY_FORMULARY_COLLECTION}`")

    # 5) SOP Collection
    if SOP_COLLECTION not in existing:
        db.create_collection(SOP_COLLECTION)
        print(f"Created `{SOP_COLLECTION}`")

In [22]:
create_collections()

Existing collections: ['insurance_claim_pages', 'claims', 'pharmacy_formulary', 'patients', 'sop']


In [23]:
import time

from pymongo.operations import SearchIndexModel


# Create vector search index if it doesn't exist
def create_vector_search_index(collection, vector_index_name):
    # Check if index already exists
    try:
        existing_indexes = collection.list_search_indexes()
        for index in existing_indexes:
            if index["name"] == vector_index_name:
                print(f"Vector search index '{vector_index_name}' already exists.")
                return
    except Exception as e:
        print(f"Could not list search indexes: {e}")
        return

    # Create vector search index
    search_index_model = SearchIndexModel(
        definition={
            "fields": [
                {
                    "type": "vector",
                    "path": "embedding",
                    "numDimensions": VOYAGE_AI_EMBEDDING_MODEL_DIMENSION,
                    "similarity": "cosine",
                }
            ]
        },
        name=vector_index_name,
        type="vectorSearch",
    )

    try:
        result = collection.create_search_index(model=search_index_model)
        print(f"New search index named '{result}' is building.")
    except Exception as e:
        print(f"Error creating vector search index: {e}")
        return

    # Wait for initial sync to complete
    print(
        f"Polling to check if the index '{result}' is ready. This may take up to a minute."
    )
    predicate = lambda index: index.get("queryable") is True

    while True:
        try:
            indices = list(collection.list_search_indexes(result))
            if indices and predicate(indices[0]):
                break
            time.sleep(5)
        except Exception as e:
            print(f"Error checking index readiness: {e}")
            time.sleep(5)

    print(f"{result} is ready for querying.")

In [26]:
# create_vector_search_index(db[CLAIMS_COLLECTION], CLAIMS_COLLECTION_VECTOR_INDEX_NAME)
create_vector_search_index(db[PAGES_COLLECTION], PAGES_COLLECTION_VECTOR_INDEX_NAME)
# create_vector_search_index(db[PATIENT_COLLECTION], PATIENT_COLLECTION_VECTOR_INDEX_NAME)
create_vector_search_index(
    db[PHARMACY_FORMULARY_COLLECTION], PHARMACY_FORMULARY_COLLECTION_VECTOR_INDEX_NAME
)
create_vector_search_index(db[SOP_COLLECTION], SOP_COLLECTION_VECTOR_INDEX_NAME)

New search index named 'pages_vector_search_index' is building.
Polling to check if the index 'pages_vector_search_index' is ready. This may take up to a minute.
pages_vector_search_index is ready for querying.
New search index named 'pharmacy_formulary_vector_search_index' is building.
Polling to check if the index 'pharmacy_formulary_vector_search_index' is ready. This may take up to a minute.
pharmacy_formulary_vector_search_index is ready for querying.
New search index named 'sop_vector_search_index' is building.
Polling to check if the index 'sop_vector_search_index' is ready. This may take up to a minute.
sop_vector_search_index is ready for querying.


In [24]:
def create_text_search_index(collection, index_definition, index_name):
    """
    Create a search index for a MongoDB Atlas collection.

    Args:
    collection: MongoDB collection object
    index_definition: Dictionary defining the index mappings
    index_name: String name for the index

    Returns:
    str: Result of the index creation operation
    """

    try:
        search_index_model = SearchIndexModel(
            definition=index_definition, name=index_name
        )

        result = collection.create_search_index(model=search_index_model)
        print(f"Search index '{index_name}' created successfully")
        return result
    except Exception as e:
        print(f"Error creating search index: {e!s}")
        return None

In [25]:
# Claims collection text search index
claims_collection_search_index_definition = {
    "mappings": {
        "dynamic": True,
        "fields": {
            "filename": {"type": "string"},
            "text": {"type": "string"},  # full document OCR or extracted text
            "metadata.processed_by": {"type": "string"},
        },
    }
}

# Pages collection text search index
pages_collection_search_index_definition = {
    "mappings": {
        "dynamic": True,
        "fields": {
            "text": {"type": "string"},  # per page OCR or extracted text
            "document_id": {"type": "string"},  # you can filter/search by ID
        },
    }
}

# Patients collection text search index
patient_collection_search_index_definition = {
    "mappings": {
        "dynamic": True,
        "fields": {
            "name": {"type": "string"},
            "notes": {"type": "string"},
            "email": {"type": "string"},
            "phone": {"type": "string"},
            "address": {"type": "string"},
        },
    }
}

# Pharmacy forumlary text search index
pharmacy_formulary_collection_search_index_definition = {
    "mappings": {
        "dynamic": True,
        "fields": {
            "drug_name": {"type": "string"},
            "generic_name": {"type": "string"},
            "ndc": {"type": "string"},
            "dosage": {"type": "string"},
            "route": {"type": "string"},
            "frequency": {"type": "string"},
            "quantity": {"type": "string"},
            "days_supply": {"type": "string"},
        },
    }
}

# SOP text search index
sop_collection_search_index_definition = {
    "mappings": {
        "dynamic": True,
        "fields": {
            "title": {"type": "string"},
            "content": {"type": "string"},
        },
    }
}

In [26]:
create_text_search_index(
    db[CLAIMS_COLLECTION],
    claims_collection_search_index_definition,
    CLAIMS_COLLECTION_SEARCH_INDEX_NAME,
)
create_text_search_index(
    db[PAGES_COLLECTION],
    pages_collection_search_index_definition,
    PAGES_COLLECTION_SEARCH_INDEX_NAME,
)
create_text_search_index(
    db[PATIENT_COLLECTION],
    patient_collection_search_index_definition,
    PATIENT_COLLECTION_SEARCH_INDEX_NAME,
)
create_text_search_index(
    db[PHARMACY_FORMULARY_COLLECTION],
    pharmacy_formulary_collection_search_index_definition,
    PHARMACY_FORMULARY_COLLECTION_SEARCH_INDEX_NAME,
)
create_text_search_index(
    db[SOP_COLLECTION],
    sop_collection_search_index_definition,
    SOP_COLLECTION_SEARCH_INDEX_NAME,
)

Search index 'claims_text_search_index' created successfully
Error creating search index: An index named "pages_text_search_index" is already defined for collection insurance_claim_pages. Index names must be unique for a source collection and all its views., full error: {'ok': 0.0, 'errmsg': 'An index named "pages_text_search_index" is already defined for collection insurance_claim_pages. Index names must be unique for a source collection and all its views.', 'code': 68, 'codeName': 'IndexAlreadyExists', '$clusterTime': {'clusterTime': Timestamp(1757415219, 116), 'signature': {'hash': b'\xe4\x96\xe1PA\x7f\xf9;\xa7\xd3\x8e"\xe6\x14\x8b\x1b\xd9Iv\xbd', 'keyId': 7520068280199938053}}, 'operationTime': Timestamp(1757415219, 116)}
Search index 'patient_text_search_index' created successfully
Search index 'pharmacy_formulary_text_search_index' created successfully
Search index 'sop_text_search_index' created successfully


'sop_text_search_index'

In [27]:
pprint(claim)

{'claim': {'claim_id': 'SP-CIG-24051378942',
           'claim_status': None,
           'date_received': '2025-05-12',
           'priority': None},
 'clinical_info': {'diagnosis_date': None,
                   'primary_diagnosis': "Crohn's disease of small intestine, "
                                        'without complications',
                   'primary_diagnosis_code': 'K50.00'},
 'medication': {'days_supply': 56,
                'dosage': '5mg/kg (Total: 400mg)',
                'drug_name': 'REMICADE (infliximab)',
                'frequency': 'Every 8 weeks after induction',
                'generic_name': 'infliximab',
                'ndc': '57894-0030-01',
                'quantity': 4,
                'route': 'Intravenous'},
 'patient': {'date_of_birth': '1972-08-14',
             'first_name': 'Robert',
             'gender': 'Male',
             'group_number': '74289-001',
             'last_name': 'Johnson',
             'member_id': '0072354981',
             'mi

Data Ingestion

Becuase this is a demo, we will be ensuring the collection are empty

In [28]:
db[CLAIMS_COLLECTION].delete_many({})
db[PAGES_COLLECTION].delete_many({})
db[PATIENT_COLLECTION].delete_many({})
db[PHARMACY_FORMULARY_COLLECTION].delete_many({})
db[SOP_COLLECTION].delete_many({})

DeleteResult({'n': 1, 'electionId': ObjectId('7fffffff0000000000000023'), 'opTime': {'ts': Timestamp(1757415291, 8), 't': 35}, 'ok': 1.0, '$clusterTime': {'clusterTime': Timestamp(1757415291, 8), 'signature': {'hash': b'\xbe\x18_5\xc7\x04\xab@\xa3\xa6\xf2\xa5\x98\x0c4\xf1=\x03jH', 'keyId': 7520068280199938053}}, 'operationTime': Timestamp(1757415291, 8)}, acknowledged=True)

Now we insert new data

In [29]:
db[CLAIMS_COLLECTION].insert_one(claim["claim"])
db[PAGES_COLLECTION].insert_many(pages_records)
db[PATIENT_COLLECTION].insert_one(claim["patient"])

InsertOneResult(ObjectId('68c0077ef92ca76fa65b049a'), acknowledged=True)

Let's populate the pharmacy fomulary with some items

In [None]:
import json

with open("data/pharmacy_formulary_datapoints_full.json") as f:
    pharmarcy_formulary_datapoints = json.load(f)

print(len(pharmarcy_formulary_datapoints))  # <--- should be 6

6


In [31]:
def get_embedding(text, task_prefix="document"):
    """
    Generate embeddings for a text string with a task-specific prefix using the voyage-3-large model.

    Parameters:
        text (str): The input text to be embedded.
        task_prefix (str): A prefix describing the task; this is prepended to the text.

    Returns:
        list: The embedding vector as a list of floats.
    """

    # For multimodal_embed, we need to format the input differently
    # It expects a list of dictionaries with "text" or "image" fields
    input_data = [text]

    try:
        # Call the Voyage API to generate the embedding
        result = vo.multimodal_embed([input_data], model=VOYAGE_AI_EMBEDDING_MODEL)
        # Return the first embedding from the result
        return result.embeddings[0]
    except Exception as e:
        print(f"Error generating embedding: {e}")
        # Return an empty embedding in case of error
        return []

In [32]:
# Work with the original list of dictionaries
for entry in pharmarcy_formulary_datapoints:
    # Create a list of the fields to include in the embedding text
    fields_to_embed = [
        entry["Drug_Name"],
        entry["Generic_Name"],
        entry["Drug_Class"],
        entry.get("Failure_Criteria", ""),
        entry.get("Exception_Criteria", ""),
    ]

    # For PA_Criteria, which is a list, join it into a string first
    if entry.get("PA_Criteria"):
        fields_to_embed.append(" ".join(entry["PA_Criteria"]))

    # For Documentation_Required, which is a list, join it into a string first
    if entry.get("Documentation_Required"):
        fields_to_embed.append(" ".join(entry["Documentation_Required"]))

    # Filter out any None values and join all fields with spaces
    embedding_text = " ".join(str(field) for field in fields_to_embed if field)

    # Generate the embedding
    entry["embedding"] = get_embedding(embedding_text)

    # For testing:
    print(f"Drug: {entry['Drug_Name']} - Embedding text: {embedding_text[:100]}...")

Drug: Remicade - Embedding text: Remicade infliximab Tumor Necrosis Factor (TNF) Blocker Inadequate response, intolerance, or contrai...
Drug: Humira - Embedding text: Humira adalimumab Tumor Necrosis Factor (TNF) Blocker Inadequate response, intolerance, or contraind...
Drug: Entyvio - Embedding text: Entyvio vedolizumab Integrin Receptor Antagonist Inadequate response, loss of response, or intoleran...
Drug: Keytruda - Embedding text: Keytruda pembrolizumab Programmed Death Receptor-1 (PD-1) Blocking Antibody Not applicable as first-...
Drug: Stelara - Embedding text: Stelara ustekinumab Interleukin-12 and -23 Inhibitor Inadequate response, intolerance, or contraindi...
Drug: Ocrevus - Embedding text: Ocrevus ocrelizumab CD20-Directed Cytolytic Antibody Inadequate response, intolerance, or contraindi...


In [33]:
import pandas as pd

pharmacy_formulary_df = pd.DataFrame(pharmarcy_formulary_datapoints)
pharmacy_formulary_df.head()


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.3.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/Users/richmondalake/miniconda3/lib/python3.11/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "/Users/richmondalake/miniconda3/lib/python3.11/site-packages/traitlets/config/application.py", line 1075, in launch_instance
    app.start()
  File "/Users/richmondalake/miniconda3/lib/python3.11/site-packages/ipykernel/kernelapp.py", line 739, in start
    self.io_loop.

AttributeError: _ARRAY_API not found


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.3.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/Users/richmondalake/miniconda3/lib/python3.11/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "/Users/richmondalake/miniconda3/lib/python3.11/site-packages/traitlets/config/application.py", line 1075, in launch_instance
    app.start()
  File "/Users/richmondalake/miniconda3/lib/python3.11/site-packages/ipykernel/kernelapp.py", line 739, in start
    self.io_loop.

AttributeError: _ARRAY_API not found

Unnamed: 0,Drug_ID,NDC_Code,Drug_Name,Generic_Name,GPI_Code,Drug_Class,Tier_Level,Form,Strength,Route,...,End_Date,Required_Prior_Drugs,Failure_Criteria,Exception_Criteria,PA_Criteria,Diagnosis_Restrictions,Documentation_Required,Approval_Duration,Renewal_Criteria,embedding
0,REM45678,57894-0030-01,Remicade,infliximab,52.50.30.30.10,Tumor Necrosis Factor (TNF) Blocker,5,Powder for Injection,100 mg,Intravenous,...,,"[{'Drug_Name': 'Humira', 'Generic_Name': 'adal...","Inadequate response, intolerance, or contraind...",Documented severe disease requiring immediate ...,"[Diagnosis confirmed by endoscopy or imaging, ...","[{'ICD10_Code': 'K50.00', 'Description': 'Croh...","[Chart notes documenting diagnosis, Previous t...","Initial: 6 months, Renewal: 12 months",Documentation of positive clinical response to...,"[0.033935546875, -0.01220703125, 0.02844238281..."
1,HUM28756,0074-0243-02,Humira,adalimumab,66.40.10.10.30,Tumor Necrosis Factor (TNF) Blocker,4,Solution for Injection,40 mg/0.8 mL,Subcutaneous,...,,"[{'Drug_Name': 'Methotrexate', 'Generic_Name':...","Inadequate response, intolerance, or contraind...",Documented severe disease requiring immediate ...,[Diagnosis confirmed by appropriate clinical e...,"[{'ICD10_Code': 'M05.9', 'Description': 'Rheum...","[Chart notes documenting diagnosis, Previous t...","Initial: 6 months, Renewal: 12 months",Documentation of positive clinical response to...,"[0.038330078125, 0.0146484375, 0.02490234375, ..."
2,ENT96325,58468-0132-08,Entyvio,vedolizumab,52.92.00.40.30,Integrin Receptor Antagonist,5,Lyophilized Powder for Injection,300 mg,Intravenous,...,,"[{'Drug_Name': 'Humira', 'Generic_Name': 'adal...","Inadequate response, loss of response, or into...",Documented contraindication to TNF blockers; h...,[Diagnosis of moderate to severe ulcerative co...,"[{'ICD10_Code': 'K51.90', 'Description': 'Ulce...","[Chart notes documenting diagnosis, Previous t...","Initial: 6 months, Renewal: 12 months",Documentation of clinical remission or improve...,"[0.0224609375, -0.0299072265625, 5.55515289306..."
3,KEY78214,00085-1291-01,Keytruda,pembrolizumab,21.10.00.60.30,Programmed Death Receptor-1 (PD-1) Blocking An...,5,Solution for Injection,100 mg/4 mL,Intravenous,...,,[],Not applicable as first-line therapy for many ...,Not applicable,[FDA-approved diagnosis or NCCN-supported indi...,"[{'ICD10_Code': 'C34.90', 'Description': 'Mali...","[Pathology report confirming diagnosis, Geneti...","Initial: 6 months, Renewal: 12 months",No evidence of disease progression and no unac...,"[0.044189453125, 0.0162353515625, -0.006072998..."
4,STE42390,69639-0102-01,Stelara,ustekinumab,66.40.10.50.60,Interleukin-12 and -23 Inhibitor,5,Solution for Injection,90 mg/mL,Subcutaneous/Intravenous (depending on indicat...,...,,"[{'Drug_Name': 'Humira', 'Generic_Name': 'adal...","Inadequate response, intolerance, or contraind...",Documented severe disease requiring immediate ...,[Diagnosis confirmed by appropriate clinical e...,"[{'ICD10_Code': 'L40.0', 'Description': 'Psori...","[Chart notes documenting diagnosis, Previous t...","Initial: 6 months, Renewal: 12 months",Documentation of positive clinical response to...,"[0.0028076171875, -0.03466796875, -0.016845703..."


In [34]:
db[PHARMACY_FORMULARY_COLLECTION].insert_many(pharmarcy_formulary_datapoints)

InsertManyResult([ObjectId('68c00a29f92ca76fa65b049b'), ObjectId('68c00a29f92ca76fa65b049c'), ObjectId('68c00a29f92ca76fa65b049d'), ObjectId('68c00a29f92ca76fa65b049e'), ObjectId('68c00a29f92ca76fa65b049f'), ObjectId('68c00a29f92ca76fa65b04a0')], acknowledged=True)

Loading SOP dataset into the MongoDB collection

In [37]:
with open("data/acme_example_sop.json", encoding="utf-8") as f:
    example_sop = json.load(f)

print(example_sop["title"])

Specialty Biologic Medication Prior Authorization Process


In [38]:
def create_sop_embedding_text(sop_document):
    """
    Extracts and concatenates relevant text fields from the SOP document
    to create a text string for embedding generation.
    """
    text_components = [
        # Basic information
        sop_document.get("title", ""),
        sop_document.get("purpose", ""),
        sop_document.get("scope", ""),
        # Definitions (optional but helpful for terminology)
        " ".join(
            [
                f"{d.get('term')}: {d.get('definition')}"
                for d in sop_document.get("definitions", [])
            ]
        ),
        # Procedure steps
        " ".join(
            [
                f"{step.get('title')}. {step.get('description')} "
                + " ".join(step.get("actions", []))
                for step in sop_document.get("procedure_steps", [])
            ]
        ),
        # Decision logic
        " ".join(
            [
                "Approval criteria: "
                + " ".join(step.get("decision_logic", {}).get("approval_criteria", []))
                + " "
                + "Denial criteria: "
                + " ".join(step.get("decision_logic", {}).get("denial_criteria", []))
                + " "
                + "Exception criteria: "
                + " ".join(step.get("decision_logic", {}).get("exception_criteria", []))
                for step in sop_document.get("procedure_steps", [])
                if "decision_logic" in step
            ]
        ),
        # Indications and criteria
        " ".join(
            [
                f"For {ind.get('indication')}: "
                + "Approval criteria: "
                + " ".join(ind.get("approval_criteria", []))
                + " "
                + "Renewal criteria: "
                + " ".join(ind.get("renewal_criteria", []))
                for ind in sop_document.get("indications_and_criteria", [])
            ]
        ),
        # State requirements
        " ".join(
            [
                f"In {state.get('state')}: " + " ".join(state.get("requirements", []))
                for state in sop_document.get("state_specific_requirements", [])
            ]
        ),
        # Documentation requirements
        "Documentation required: "
        + " ".join(sop_document.get("documentation_requirements", [])),
    ]

    # Filter out empty strings and join with spaces
    embedding_text = " ".join([component for component in text_components if component])

    return embedding_text

In [39]:
sop_embedding_text = create_sop_embedding_text(example_sop)
print(sop_embedding_text)

Specialty Biologic Medication Prior Authorization Process This SOP establishes the standardized process for reviewing and determining medical necessity for specialty biologic medications requiring prior authorization under acme pharmacy benefit plans. Applies to all specialty biologic medications administered by subcutaneous injection or intravenous infusion for autoimmune and inflammatory conditions. PA: Prior Authorization TNF: Tumor Necrosis Factor DMARD: Disease-Modifying Anti-Rheumatic Drug Claim Receipt and Validation. Verify claim contains all required elements for processing. Confirm patient demographics and eligibility Verify claim includes diagnosis code(s) Confirm NDC or HCPCS code is present Validate prescriber information Benefit Verification. Confirm patient's plan covers requested medication and determine authorization requirements. Check formulary status of medication Determine tier level and patient cost-share Identify any quantity limits or duration restrictions Verif

In [40]:
example_sop["embedding"] = get_embedding(sop_embedding_text)

In [41]:
db[SOP_COLLECTION].insert_one(example_sop)

InsertOneResult(ObjectId('68c00ad8f92ca76fa65b04a1'), acknowledged=True)

In [42]:
# We are tring to:  “Approve,” “Escalate for Review,” or “Deny,” a claim

# 1. Eligibility Agent
# Pull the coverge plan of a patient when that matches the name on the claim
# Verify the patient eligibility for the treatment to ensure there's coverage
# Query the formulary database to check if the mediation if the database for the patient specfic plan

In [42]:
def search_formulary_with_claim_embedding(
    claim_embeddding, claim_id, collection, vector_index="vector_index_filter"
):
    """
    Perform a vector search in the MongoDB collection based on the user query.

    Args:
    user_query (str): The user's query string.
    collection (MongoCollection): The MongoDB collection to search.

    Returns:
    list: A list of matching documents.
    """

    if claim_embeddding is None:
        return "Invalid query or embedding generation failed."

    # Define the vector search pipeline
    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_index,
            "queryVector": claim_embeddding,
            "path": "embedding",
            "numCandidates": 150,  # Number of candidate matches to consider
            "limit": 5,  # Return top 5 matches
        }
    }

    unset_stage = {
        "$unset": "embedding"  # Exclude the 'embedding' field from the results
    }

    add_score_stage = {
        "$addFields": {
            "score": {
                "$meta": "vectorSearchScore"  # Include the search score
            }
        }
    }

    pipeline = [vector_search_stage, unset_stage, add_score_stage]

    # Execute the search
    results = collection.aggregate(pipeline)
    return list(results)

In [119]:
def search_sop_with_claim_embedding(
    claim_embeddding, claim_id, collection, vector_index="vector_index_filter"
):
    """
    Perform a vector search in the MongoDB collection based on the user query.

    Args:
    user_query (str): The user's query string.
    collection (MongoCollection): The MongoDB collection to search.

    Returns:
    list: A list of matching documents.
    """

    if claim_embeddding is None:
        return "Invalid query or embedding generation failed."

    # Define the vector search pipeline
    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_index,
            "queryVector": claim_embeddding,
            "path": "embedding",
            "numCandidates": 150,  # Number of candidate matches to consider
            "limit": 5,  # Return top 5 matches
        }
    }

    unset_stage = {
        "$unset": "embedding"  # Exclude the 'embedding' field from the results
    }

    add_score_stage = {
        "$addFields": {
            "score": {
                "$meta": "vectorSearchScore"  # Include the search score
            }
        }
    }

    pipeline = [vector_search_stage, unset_stage, add_score_stage]

    # Execute the search
    results = collection.aggregate(pipeline)
    return list(results)

In [None]:
from agents.tool import function_tool


def _flatten_and_pick_best(results_nested):
    flat = []
    for batch in results_nested:
        if isinstance(batch, list):
            flat.extend(batch)
        elif isinstance(batch, dict):
            flat.append(batch)
    if not flat:
        return None
    # higher vectorSearchScore is better
    flat.sort(key=lambda d: d.get("score", 0), reverse=True)
    return flat[0]


def _norm(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "")).strip().lower()


def _entry_matches_plan(
    entry, patient_plan_name: str, patient_plan_code: Optional[str] = None
):
    # Prefer exact code match if both exist
    if patient_plan_code:
        entry_codes = entry.get("plan_codes") or []
        if patient_plan_code in entry_codes:
            return True

    # Fallback to tolerant name match
    entry_plan = entry.get("Plan_ID") or entry.get("plan_name") or ""
    return _norm(entry_plan) == _norm(patient_plan_name)


@function_tool
def determine_eligibility(
    patient_first_name: str, patient_last_name: str, patient_id: str, claim_id: str
):
    """
    Determines if a patient is eligible for coverage of a specialty medication claim.

    This function uses vector search to match claim document embeddings against formulary
    entries and verifies if the medication is covered under the patient's insurance plan.

    Args:
      patient_first_name (str): Patient's first name
      patient_last_name (str): Patient's last name
      patient_id (str): Patient's member ID in the insurance system
      claim_id (str): Claim ID for the specialty pharmacy claim

    Returns:
      str: "Approve" if the claim should be covered, "Deny" if not

    Raises:
      ValueError: If patient record or claim data cannot be found
      KeyError: If required fields are missing from patient or formulary data
    """
    try:
        formulary_results = []

        patient = db[PATIENT_COLLECTION].find_one(
            {
                "first_name": patient_first_name,
                "last_name": patient_last_name,
                "member_id": patient_id,
            }
        )
        if not patient:
            raise ValueError("Patient not found")

        print("🔍Patient record found:", patient)

        patient_coverage_plan = patient.get("plan_type", "")
        patient_plan_code = patient.get("plan_code")  # optional if you store it

        # Grab all page embeddings for the claim
        print("🔍Claim ID:", claim_id)
        page_cursor = db[PAGES_COLLECTION].find({"claim_id": claim_id})
        page_embeddings = list(page_cursor)
        print("🔍Claim embeddings found:", len(page_embeddings))

        # Vector search per page
        for pe in page_embeddings:
            claim_embedding_vec = pe["embedding"]
            temp_result = search_formulary_with_claim_embedding(
                claim_embedding_vec,
                claim_id,
                db[PHARMACY_FORMULARY_COLLECTION],
                PHARMACY_FORMULARY_COLLECTION_VECTOR_INDEX_NAME,
            )
            formulary_results.append(temp_result)

        print("🔍Formulary results batches:", len(formulary_results))

        best_entry = _flatten_and_pick_best(formulary_results)
        if not best_entry:
            print("No formulary hits found")
            return "Deny"

        print("🔍Best result found:", best_entry)

        if _entry_matches_plan(best_entry, patient_coverage_plan, patient_plan_code):
            print(
                f"Plan match found for {patient_coverage_plan} / {patient_plan_code or ''}"
            )
            return "Approve"

        # If the best entry did not match the plan, try any of the top-k from all batches
        all_flat = []
        for batch in formulary_results:
            if isinstance(batch, list):
                all_flat.extend(batch)
        for entry in all_flat:
            if _entry_matches_plan(entry, patient_coverage_plan, patient_plan_code):
                print("Plan match found among top-k results")
                return "Approve"

        print(f"No matching plan found for {patient_coverage_plan}")
        return "Deny"

    except Exception as e:
        print(f"Error determining eligibility: {e}")
        return "Error"

In [121]:
# SOP Lookup Agent that connects to acme's document repository (Confluence or SharePoint), retrieves the relevant SOP document based on claim characteristics (like claim type, medication category, or processing requirements), then converts the document text into embeddings to create a searchable vector store. This allows downstream agents to quickly query specific decision logic and requirements during claim processing by performing semantic searches against the SOP content, ensuring that all processing steps follow the current approved procedures and compliance requirements.

In [122]:
@function_tool
def get_sop_by_claim(claim_id):
    sop_results = []

    page_cursor = db[PAGES_COLLECTION].find({"claim_id": claim_id})
    page_embeddings = list(page_cursor)

    for pe in page_embeddings:
        claim_embedding_vec = pe["embedding"]
        temp_result = search_sop_with_claim_embedding(
            claim_embedding_vec,
            claim_id,
            db[SOP_COLLECTION],
            SOP_COLLECTION_VECTOR_INDEX_NAME,
        )
        sop_results.append(temp_result)

    best_sop = _flatten_and_pick_best(sop_results)
    if not best_sop:
        return []

    return best_sop.get("procedure_steps", [])

In [123]:
def generate_sop_search_query(sop_document):
    """
    Generates a simple search query for finding related SOPs based on an existing SOP document.

    Args:
        sop_document (dict): A dictionary containing the SOP document

    Returns:
        str: A formatted search query string
    """
    # Extract basic information
    title = sop_document.get("title", "")
    department = sop_document.get("department", "")

    # Get medication types (take first category only)
    med_category = ""
    if (
        sop_document.get("applicable_medications")
        and len(sop_document.get("applicable_medications")) > 0
    ):
        med_category = sop_document["applicable_medications"][0].get("category", "")

    # Get first indication
    indication = ""
    if (
        sop_document.get("indications_and_criteria")
        and len(sop_document.get("indications_and_criteria")) > 0
    ):
        indication = sop_document["indications_and_criteria"][0].get("indication", "")

    # Get state codes (limit to 2)
    states = []
    for state_req in sop_document.get("state_specific_requirements", [])[:2]:
        if "state" in state_req:
            states.append(state_req["state"])

    # Build query
    query_parts = [
        "site:naic.org",
        '"Standard Operating Procedure" OR "SOP"',
        f'"{title}"' if title else "",
        f'"{department}"' if department else "",
        f'"{med_category}"' if med_category else "",
        f'"{indication}"' if indication else "",
    ]

    # Add states if available
    if states:
        query_parts.append("(" + " OR ".join(states) + ")")

    # Remove empty parts and join
    query = " ".join([part for part in query_parts if part])

    return query

In [124]:
def search_sites(search_query):
    print(f"Searching for SOPs with query in the search_sites function: {search_query}")
    search_resp = client.responses.create(
        model="gpt-4.1", tools=[{"type": "web_search_preview"}], input=search_query
    )

    print("=== Raw Web Search Preview ===")
    print(search_resp.output_text)

In [125]:
def _flatten_and_pick_best(nested):
    flat = []
    for batch in nested or []:
        if isinstance(batch, list):
            flat.extend(batch)
        elif isinstance(batch, dict):
            flat.append(batch)
    if not flat:
        return None
    flat.sort(key=lambda d: d.get("score", 0), reverse=True)
    return flat[0]

In [126]:
def _get_best_sop_for_claim(claim_id: str):
    # Get all page embeddings for the claim
    page_embeddings = list(db[PAGES_COLLECTION].find({"claim_id": claim_id}))

    sop_results = []
    for pe in page_embeddings:
        claim_embedding_vec = pe["embedding"]
        temp = search_sop_with_claim_embedding(
            claim_embedding_vec,
            claim_id,  # keep signature consistent
            db[SOP_COLLECTION],
            SOP_COLLECTION_VECTOR_INDEX_NAME,
        )
        sop_results.append(temp)

    best_sop = _flatten_and_pick_best(sop_results)
    return best_sop  # may be None

In [127]:
@function_tool
def search_for_sops(claim_id: str) -> dict:
    """
    Searches for SOPs across multiple sites based on a claim ID.

    Args:
        claim_id (str): Claim ID to find relevant SOP document for
    Returns:
        dict: Search results from each site
    """
    try:
        # First, get the SOP document using the existing function
        best_sop = _get_best_sop_for_claim(claim_id)
        if not best_sop:
            print("No relevant SOP found for this claim")
            return {}

        # Now search external sites using the SOP document
        site_options = [
            "naic.org",
            "insurance.ca.gov",
            "cms.gov",
            "ama-assn.org",
            "ashp.org",
        ]

        # Generate search query based on the SOP document
        base_query = generate_sop_search_query(best_sop)
        print(f"Searching for SOPs with query: {base_query}")

        # Search each site
        all_results = {}
        for site in site_options:
            site_query = f"site:{site} {base_query}"
            print(f"Searching {site}...")

            # Use your search_sites function
            results = search_sites(site_query)
            all_results[site] = results

        return all_results
    except Exception as e:
        print(f"Error searching for SOPs: {e}")
        return {}

In [128]:
OPENAI_MODEL = "gpt-4o"

In [129]:
class ClaimProcessingOutput(BaseModel):
    member_id: str
    patient_first_name: str
    patient_last_name: str
    claim_id: str
    processing_success: bool
    error_message: str | None = None


class BenefitValidationOutput(BaseModel):
    claim_id: str
    eligibility_status: str  # "Approve" or "Deny"
    requires_prior_authorization: bool | None = (
        None  # Changed from bool = False to bool | None = None
    )
    requires_step_therapy: bool | None = (
        None  # Changed from bool = False to bool | None = None
    )
    patient_cost_share: str | None = None
    denial_reason: str | None = None
    exception_ticket_required: bool | None = (
        None  # Changed from bool = False to bool | None = None
    )

In [130]:
@function_tool
def process_specialty_claim(pdf_path: str) -> dict:
    """
    Process a specialty pharmacy claim form by:
    1. Converting PDF to images
    2. Extracting field data using vision AI
    3. Creating embeddings for each page
    4. Storing data in MongoDB collections
    5. Returning key patient and claim identifiers

    Args:
        pdf_path (str): Path to the specialty pharmacy claim PDF file

    Returns:
        dict: Dictionary containing member_id, patient_first_name, patient_last_name, and claim_id
    """
    # Step 1: Convert PDF to images
    images = pdf_to_images(pdf_path)
    print(f"Successfully converted {pdf_path} to {len(images)} images.")

    # Step 2: Extract field data from first page (contains patient & claim info)
    extracted_data = extract_fields_from_image(images[0])
    print("Successfully extracted fields from first page.")

    # Step 3: Create embeddings for all pages
    page_embeddings = embed_pdf_pages(images)
    print(f"Successfully created embeddings for {len(images)} pages.")

    # Step 4: Extract key information for return value
    claim_id = extracted_data.get("claim", {}).get("claim_id")
    member_id = extracted_data.get("patient", {}).get("member_id")
    patient_first_name = extracted_data.get("patient", {}).get("first_name")
    patient_last_name = extracted_data.get("patient", {}).get("last_name")

    print(f"Claim ID: {claim_id}")
    print(f"Member ID: {member_id}")
    print(f"Patient Name: {patient_first_name} {patient_last_name}")

    # Step 5: Prepare records for database storage
    # Create claim document
    claim_record = extracted_data.get("claim", {})
    claim_record.update(
        {
            "document_id": f"DOC-{claim_id}",
            "filename": os.path.basename(pdf_path),
            "uploaded_at": datetime.now(),
            "metadata": {
                "total_pages": len(images),
                "processed_by": "Benefit_Coverage_Validation_Agent",
            },
        }
    )

    print("Sucessfully created claim record and stored in database.")

    # Create page records with embeddings
    pages_records = []
    for i, embedding in enumerate(page_embeddings):
        single_page_record = {
            "document_id": f"DOC-{claim_id}",
            "claim_id": claim_id,
            "page_number": i + 1,
            "embedding": embedding,
        }
        pages_records.append(single_page_record)

    # Create patient record
    patient_record = extracted_data.get("patient", {})

    # Step 6: Store in database
    try:
        db[CLAIMS_COLLECTION].insert_one(claim_record)
        db[PAGES_COLLECTION].insert_many(pages_records)
        db[PATIENT_COLLECTION].insert_one(patient_record)

        print(
            f"Successfully processed claim {claim_id} for patient {patient_first_name} {patient_last_name}"
        )
    except Exception as e:
        print(f"Error storing data in database: {e}")

    # Step 7: Return key identifiers
    return {
        "member_id": member_id,
        "patient_first_name": patient_first_name,
        "patient_last_name": patient_last_name,
        "claim_id": claim_id,
    }

In [131]:
from agents import Agent, Runner, trace

agent_process_specialty_claim = Agent(
    name="Process Specialty Claim Agent",
    model=OPENAI_MODEL,
    tools=[process_specialty_claim],
    output_type=ClaimProcessingOutput,
    instructions="""
    You are the Process Specialty Claim Agent responsible for the initial intake and processing of specialty pharmacy claim documents. Your primary role is to extract, structure, and store critical information from specialty pharmacy claim forms to enable downstream processing and verification.

    Your specific responsibilities include:

    1. Processing uploaded PDF claim documents by converting them to images for analysis
    2. Extracting key patient information including member ID, name, and demographic data
    3. Identifying claim details such as medication requested, diagnosis codes, and provider information
    4. Creating vector embeddings of each page to enable semantic search capabilities
    5. Storing the structured data and embeddings in the appropriate database collections for further processing
    6. Returning the essential identifiers (member_id, patient name, claim_id) to initiate the benefits verification workflow

    When handling claim documents, you should:
    - Ensure all processing steps are completed in the proper sequence
    - Verify that extracted information appears consistent and complete
    - Flag any issues with document quality or missing information
    - Maintain proper data formatting for downstream processing
    - Confirm successful database storage of all claim components

    Your output should be clear and concise, providing just the essential identifiers needed to proceed with benefit verification. Do not include lengthy explanations unless there were processing issues that need attention.

    You are the first step in our intelligent workflow automation for specialty pharmacy claims processing, so accuracy and completeness are critical to ensure proper routing and processing by downstream agents.
    """,
)

In [132]:
benefit_coverage_validation_agent = Agent(
    name="Benefit & Coverage Validation Agent",
    model=OPENAI_MODEL,
    tools=[determine_eligibility],
    output_type=BenefitValidationOutput,
    instructions="""
    You are a Benefit & Coverage Validation Agent for acme specialty pharmacy claims processing.
    Your role is to verify insurance coverage for specialty medications by:

    1. Querying member databases to confirm plan eligibility and coverage details
    2. Checking pharmacy formulary to verify medication coverage status
    3. Validating that the prescribed dosage meets coverage guidelines
    4. Confirming that the administration site (e.g., infusion center) is in-network
    5. Identifying any prior authorization or step therapy requirements
    6. Calculating expected patient cost-sharing based on plan benefits

    You must be thorough and precise, ensuring all verification steps are completed before
    determining coverage status. When discrepancies are found, generate clear exception
    tickets with specific details for human review. Always prioritize accuracy and compliance
    with benefit policies while providing clear explanations of coverage determinations.

    For member database queries, you should extract patient information from the claim and
    verify their plan coverage status, including specific details about specialty pharmacy
    benefits, deductible status, and applicable cost-sharing.

    When checking the pharmacy formulary, you need to determine if the prescribed medication
    is covered under the patient's specific plan, what tier it falls under, and identify any
    utilization management requirements like prior authorization or step therapy. Always
    verify the NDC code matches the one in the formulary.

    Dosage validation requires comparing the prescribed amount against plan-approved guidelines,
    accounting for patient-specific factors like weight when applicable.

    Your final determination should clearly state whether coverage is approved or if exceptions
    need human review, with specific reasoning and supporting policy references.
  """,
)

In [133]:
search_for_claims_best_practice = Agent(
    name="Claims Best Practice Search Agent",
    model=OPENAI_MODEL,
    instructions="""
   You are a Claims Best Practice Search Agent specializing in finding standard operating procedures,
   regulatory guidelines, and best practices for insurance claims processing, particularly for
   specialty pharmacy and prior authorization procedures.

   Your task is to search relevant websites and repositories for authoritative information about
   claims processing standards and procedures. When given an SOP document or topic, you will:

   1. Construct effective search queries targeting authoritative sources like NAIC, state insurance
      departments, CMS, industry associations, and accreditation organizations

   2. Extract key information about best practices, procedural standards, and regulatory requirements

   3. Compare findings with existing procedures to identify gaps, improvements, or compliance issues

   4. Summarize findings in a structured format that highlights actionable insights

   Focus on finding the most current, authoritative sources rather than general information.
   When analyzing search results, prioritize official guidelines, model procedures, and
   regulatory requirements over opinion pieces or marketing materials.

   For specialty pharmacy claims, pay particular attention to prior authorization requirements,
   step therapy protocols, medical necessity criteria, and state-specific regulations.
   """,
    tools=[search_for_sops],
)

In [None]:
import asyncio


async def process_specialty_pharmacy_claim(pdf_path: Optional[str] = None):
    """
    Process a specialty pharmacy claim from a local PDF file (no Google Colab upload).
    Pass the path to the PDF when calling the function.
    """
    # If no path was passed, ask interactively
    if not pdf_path:
        pdf_path = input(
            "Enter the path to the specialty pharmacy claim PDF file: "
        ).strip()

    if not pdf_path:
        print("❌ No file path provided.")
        return

    # Start the workflow trace
    with trace("Specialty Pharmacy Claim Processing Workflow"):
        # STEP 1: Process the claim document to extract information
        print("\n🔄 Processing claim document...")
        claim_processing_result = await Runner.run(
            agent_process_specialty_claim, pdf_path
        )

        # Check if processing was successful
        assert isinstance(claim_processing_result.final_output, ClaimProcessingOutput)
        if not claim_processing_result.final_output.processing_success:
            print(
                f"❌ Claim processing failed: {claim_processing_result.final_output.error_message}"
            )
            return

        # Print the extracted claim information
        print("✅ Claim document processed successfully!")
        print(f"📋 Claim ID: {claim_processing_result.final_output.claim_id}")
        print(
            f"👤 Patient: {claim_processing_result.final_output.patient_first_name} "
            f"{claim_processing_result.final_output.patient_last_name}"
        )
        print(f"🆔 Member ID: {claim_processing_result.final_output.member_id}")

        # STEP 2: Validate benefit coverage and search for claims best practice IN PARALLEL
        print(
            "\n🔄 Validating benefit coverage and searching for best practices in parallel..."
        )

        # Prepare the input for the benefit validation agent
        benefit_input = {
            "patient_first_name": claim_processing_result.final_output.patient_first_name,
            "patient_last_name": claim_processing_result.final_output.patient_last_name,
            "patient_id": claim_processing_result.final_output.member_id,
            "claim_id": claim_processing_result.final_output.claim_id,
        }

        # Run both agents in parallel
        benefit_validation_result, best_practice_result = await asyncio.gather(
            Runner.run(benefit_coverage_validation_agent, str(benefit_input)),
            Runner.run(search_for_claims_best_practice, str(benefit_input["claim_id"])),
        )

        # Process the validation result
        assert isinstance(
            benefit_validation_result.final_output, BenefitValidationOutput
        )

        # Display the final determination
        print("\n📊 BENEFIT VERIFICATION RESULT:")
        print(f"Claim ID: {benefit_validation_result.final_output.claim_id}")
        print(f"Status: {benefit_validation_result.final_output.eligibility_status}")

        if benefit_validation_result.final_output.eligibility_status == "Approved":
            print("✅ Claim is APPROVED for coverage")

            if benefit_validation_result.final_output.requires_prior_authorization:
                print("⚠️ Prior Authorization is required")

            if benefit_validation_result.final_output.requires_step_therapy:
                print("⚠️ Step Therapy requirements must be met")

            if benefit_validation_result.final_output.patient_cost_share:
                print(
                    f"💲 Patient cost share: {benefit_validation_result.final_output.patient_cost_share}"
                )

        else:
            print("❌ Claim is DENIED for coverage")
            print(f"Reason: {benefit_validation_result.final_output.denial_reason}")

            if benefit_validation_result.final_output.exception_ticket_required:
                print("🎫 Exception ticket has been created for review")

        # Display best practice results
        print("\n📚 CLAIMS BEST PRACTICE RECOMMENDATIONS:")
        print(f"Recommendations: {best_practice_result.final_output}")

        print("\n✅ Specialty Pharmacy Claim Processing Workflow Complete!")

In [135]:
# Modified run_workflow function
def run_workflow():
    # This will now work in a notebook environment
    loop = asyncio.get_event_loop()
    loop.run_until_complete(process_specialty_pharmacy_claim())

In [138]:
# Execute the workflow
# When asked for file path, enter the path to the specialty pharmacy claim PDF file.
# data/specialty_pharmacy_claim_form_acme.pdf
run_workflow()


🔄 Processing claim document...
Successfully converted data/specialty_pharmacy_claim_form_acme.pdf to 3 images.
Successfully extracted fields from first page.
Embedding 3 pages with model voyage-multimodal-3
Converting 3 PIL images to embeddings
Calling Voyage AI multimodal embed endpoint
Voyage AI response: <voyageai.object.multimodal_embeddings.MultimodalEmbeddingsObject object at 0x132c5ad50>
Successfully created embeddings for 3 pages.
Claim ID: SP-CIG-24051378942
Member ID: 0072354981
Patient Name: Robert Johnson
Sucessfully created claim record and stored in database.
Successfully processed claim SP-CIG-24051378942 for patient Robert Johnson
✅ Claim document processed successfully!
📋 Claim ID: SP-CIG-24051378942
👤 Patient: Robert Johnson
🆔 Member ID: 0072354981

🔄 Validating benefit coverage and searching for best practices in parallel...
Searching for SOPs with query: site:naic.org "Standard Operating Procedure" OR "SOP" "Specialty Biologic Medication Prior Authorization Process