## Custom Analyzer with Azure AI Content Understanding

![image](./Assets/image.png)

In [None]:
%pip install python-dotenv

### Setting up the Environment

In [None]:
import os
from dotenv import load_dotenv
import requests

# Load variables from a local .env file (if present) into the process environment.
load_dotenv()

# `or ""` keeps a missing variable from raising an opaque AttributeError on None;
# the explicit check below reports the real problem instead.
CONTENT_UNDERSTANDING_ENDPOINT = (os.getenv("CONTENT_UNDERSTANDING_ENDPOINT") or "").strip().rstrip('/')
CONTENT_UNDERSTANDING_API_KEY = os.getenv("CONTENT_UNDERSTANDING_API_KEY")
CUSTOM_ANALYZER_NAME = os.getenv("CUSTOM_ANALYZER_NAME")

if not CONTENT_UNDERSTANDING_ENDPOINT:
    raise RuntimeError(
        "CONTENT_UNDERSTANDING_ENDPOINT is not set; define it in the environment or in a .env file."
    )

print("Endpoint:", CONTENT_UNDERSTANDING_ENDPOINT)

# Never echo the full secret: notebook outputs are routinely saved and shared.
if not CONTENT_UNDERSTANDING_API_KEY:
    print("API Key:", "<not set>")
elif len(CONTENT_UNDERSTANDING_API_KEY) > 8:
    print("API Key:", "****" + CONTENT_UNDERSTANDING_API_KEY[-4:])
else:
    print("API Key:", "****")

print("Custom Analyzer Name:", CUSTOM_ANALYZER_NAME)


### Running Custom Analysis

In [None]:
import time

# NOTE: despite the variable name, this URL targets the custom analyzer named by
# CUSTOM_ANALYZER_NAME; the name is kept so other cells that reference it still work.
prebuilt_document_analyzer_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzers/{CUSTOM_ANALYZER_NAME}:analyze?api-version=2025-05-01-preview"

document_url = "https://github.com/kuljotSB/RAGwithAzureOpenAI/raw/refs/heads/main/ContentUnderstanding/Custom_Analyzer/invoices/invoice.pdf"

body = {
    "url": document_url
}

# Stays an empty dict when the request fails, so the display cell can still run.
document_analysis_result = {}

# Network/polling limits: without them a stalled connection or a stuck job
# would block the notebook cell forever.
request_timeout_seconds = 30
poll_interval_seconds = 1
max_wait_seconds = 300

# Statuses that mean "keep polling" (compared case-insensitively). Anything else
# is treated as terminal so an unexpected status cannot cause an endless loop.
pending_statuses = {"notstarted", "running"}

try:
    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
    }

    # Submit the document; the service answers with an operation ID to poll.
    response = requests.post(
        prebuilt_document_analyzer_url,
        headers=headers,
        json=body,
        timeout=request_timeout_seconds,
    )
    response.raise_for_status()
    result = response.json()
    analysis_id = result.get("id")
    print("Analysis ID:", analysis_id)

    if not analysis_id:
        # Polling ".../analyzerResults/None" would only produce a confusing HTTP error.
        print("Error occurred: the analyze response did not contain an analysis ID.")
    else:
        # Using the analysis ID to get results; polling until the analysis is complete
        get_result_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzerResults/{analysis_id}?api-version=2025-05-01-preview"

        headers = {
            "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
        }

        deadline = time.monotonic() + max_wait_seconds
        while True:
            status_response = requests.get(
                get_result_url,
                headers=headers,
                timeout=request_timeout_seconds,
            )
            status_response.raise_for_status()
            status_result = status_response.json()
            analysis_status = status_result.get("status")
            print("Current Analysis Status:", analysis_status)

            if str(analysis_status).lower() not in pending_statuses:
                break
            if time.monotonic() >= deadline:
                print(f"Error occurred: analysis did not finish within {max_wait_seconds} seconds.")
                break
            time.sleep(poll_interval_seconds)  # Wait before polling again

        if str(analysis_status).lower() == "failed":
            print("Error occurred: the service reported that the analysis failed.")

        # The last polling response already carries the result payload, so no
        # additional GET request is needed.
        document_analysis_result = status_result
        print("Document Analysis Result:", document_analysis_result)

except requests.RequestException as e:
    print(f"Error occurred: {e}")


### Displaying Results

In [None]:
import json
import textwrap
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

def _hr(char="─", width=80):
    return char * width

def _h(text: str, width=80):
    pad = " " * 2
    line = f"{pad}{text.strip()} "
    return f"{_hr('=')} \n{line}\n{_hr('=')}"

def _subh(text: str):
    """Format a sub-heading: a blank line, the text, then a default horizontal rule."""
    underline = _hr()
    return "\n{}\n{}".format(text, underline)

def _kv(k: str, v: Any, k_width=22):
    k = (k or "").strip()
    if isinstance(v, (dict, list)):
        v_str = json.dumps(v, indent=2, ensure_ascii=False)
    else:
        v_str = "" if v is None else str(v)
    return f"{k:<{k_width}} : {v_str}"

def _wrap_block(text: str, width=100, indent="    "):
    if not text:
        return ""
    wrapped = textwrap.fill(text, width=width)
    return textwrap.indent(wrapped, indent)

# Keys under which a field dict may carry its typed value, checked in this order.
_FIELD_VALUE_KEYS = (
    "valueString",
    "valueNumber",
    "valueInteger",
    "valueBoolean",
    "valueDate",
    "valueTime",
    "valueArray",
    "valueObject",
)


def _field_display_value(field: Dict[str, Any]) -> Any:
    """Return the first non-None typed value of a field dict, or the dict itself if there is none."""
    for key in _FIELD_VALUE_KEYS:
        value = field.get(key)
        # `is not None` (not truthiness) so legitimate 0, False and "" values are kept.
        if value is not None:
            return value
    return field


def display_document_analyzer_content_understanding_result(
    analysis_result: Dict[str, Any],
    save_markdown_path: Optional[str] = None,
    max_markdown_chars: int = 1200,
    width: int = 100,
) -> None:
    """
    Pretty-print an Azure Content Understanding analysis result and optionally
    write the extracted Markdown to a file.

    Parameters
    ----------
    analysis_result : dict
        The JSON-decoded response printed earlier as "Document Analysis Result".
        ``None`` or an empty dict is accepted and produces a summary with empty values.
    save_markdown_path : str | None
        If provided (e.g., 'analysis.md'), the non-empty markdown of all contents is
        joined with '---' separators and written to this path (parent directories
        are created as needed).
    max_markdown_chars : int
        Truncate the console preview of markdown to this many characters
        (the saved file is not truncated).
    width : int
        Width passed to the heading and text-wrapping helpers for console output.

    Returns
    -------
    None
        Output goes to stdout and, optionally, to ``save_markdown_path``.
    """
    # Normalise once so every lookup below is safe even for None / null sections.
    analysis_result = analysis_result or {}
    result = analysis_result.get("result") or {}
    usage = analysis_result.get("usage") or {}
    tokens = usage.get("tokens", {}) if isinstance(usage, dict) else {}

    # top-level
    print(_h("Content Understanding • Analysis Summary", width))
    print(_kv("Analysis ID", analysis_result.get("id")))
    print(_kv("Status", analysis_result.get("status")))

    print(_subh("Analyzer Info"))
    print(_kv("Analyzer ID", result.get("analyzerId")))
    print(_kv("API Version", result.get("apiVersion")))
    created_at = result.get("createdAt")
    try:
        # fromisoformat() on older Python versions rejects a trailing "Z",
        # so it is rewritten as an explicit UTC offset first.
        created_at_local = (
            datetime.fromisoformat(created_at.replace("Z", "+00:00")).astimezone().isoformat()
            if created_at else None
        )
    except (AttributeError, TypeError, ValueError):
        # Unparseable or non-string timestamp: show the raw value instead of failing.
        created_at_local = created_at
    print(_kv("Created At (UTC)", created_at))
    print(_kv("Created At (local)", created_at_local))
    warnings = result.get("warnings") or []
    print(_kv("Warnings", f"{len(warnings)}"))

    print(_subh("Usage"))
    # Token counts are printed exactly as returned (no rounding or casting).
    for k in ("contextualization", "input", "output"):
        if k in tokens:
            print(_kv(f"Tokens.{k}", tokens.get(k)))

    # contents
    contents = result.get("contents") or []
    print(_subh(f"Contents ({len(contents)})"))

    combined_md_parts = []
    for idx, item in enumerate(contents, start=1):
        kind = item.get("kind")
        sp = item.get("startPageNumber")
        ep = item.get("endPageNumber")
        print(_hr())
        print(f"[Content #{idx}] kind={kind}  pages={sp}–{ep}")

        # Fields block (generic)
        fields = (item.get("fields") or {})
        if fields:
            print("• Fields:")
            for fname, fval in fields.items():
                if isinstance(fval, dict):
                    ftype = fval.get("type")
                    vstr = _field_display_value(fval)
                    print(_wrap_block(f"  - {fname} ({ftype}): {vstr}", width))
                else:
                    print(_wrap_block(f"  - {fname}: {fval}", width))

        # Markdown preview (re-wrapped for the console; the saved file keeps the original text)
        md = item.get("markdown") or ""
        combined_md_parts.append(md)
        preview = md.strip()
        preview_trunc = (preview[:max_markdown_chars] + " … [truncated]") if len(preview) > max_markdown_chars else preview
        print("• Markdown Preview:")
        print(_wrap_block(preview_trunc, width))

    # Save concatenated markdown if requested
    if save_markdown_path:
        all_md = "\n\n---\n\n".join(part for part in combined_md_parts if part)
        out_path = Path(save_markdown_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(all_md, encoding="utf-8")
        print(_subh("Files"))
        print(_kv("Markdown saved", str(out_path.resolve())))

# Render the analysis from the previous cell and save its markdown next to the notebook.
display_document_analyzer_content_understanding_result(
    document_analysis_result,
    save_markdown_path="document_analysis.md",
)
