## Getting Started with Azure AI Content Understanding

![Azure AI Content Understanding](./Assets/content-understanding.png)

%pip install python-dotenv 

### Setting up Environment Variables

In [None]:
import os
from dotenv import load_dotenv
import requests

# Load variables from a local .env file (if one exists) into the process environment.
load_dotenv()

# os.getenv returns None for an unset variable; fall back to "" so .strip() cannot
# fail with an unhelpful AttributeError. The trailing "/" is removed because later
# cells build URLs as f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/...".
CONTENT_UNDERSTANDING_ENDPOINT = (os.getenv("CONTENT_UNDERSTANDING_ENDPOINT") or "").strip().rstrip("/")
# Keep None when the key is missing/blank so the value stays falsy, as before.
CONTENT_UNDERSTANDING_API_KEY = (os.getenv("CONTENT_UNDERSTANDING_API_KEY") or "").strip() or None

if not CONTENT_UNDERSTANDING_ENDPOINT:
    raise ValueError(
        "CONTENT_UNDERSTANDING_ENDPOINT is not set. "
        "Define it in your environment or in a .env file next to this notebook."
    )

print("Endpoint:", CONTENT_UNDERSTANDING_ENDPOINT)
# Never echo the full secret into notebook output (outputs are saved and shared
# with the .ipynb file); show only the last four characters.
if CONTENT_UNDERSTANDING_API_KEY:
    print("API Key:", "*" * 8 + CONTENT_UNDERSTANDING_API_KEY[-4:])
else:
    print("API Key:", "<not set>")


### Using Prebuilt-Document Analyzer

In [None]:
import time

# Analyze endpoint of the prebuilt document analyzer.
prebuilt_document_analyzer_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzers/prebuilt-documentAnalyzer:analyze?api-version=2025-05-01-preview"

document_url = "https://github.com/kuljotSB/RAGwithAzureOpenAI/raw/refs/heads/main/ContentUnderstanding/Samples/invoice.pdf"

body = {
    "url": document_url
}

# Stays empty if the request fails or the analysis does not finish in time.
document_analysis_result = {}

request_timeout_s = 30  # per-HTTP-call timeout so a stalled connection cannot hang the cell
poll_interval_s = 1     # pause between status checks
max_wait_s = 600        # overall polling budget
# Polling stops only on one of these statuses (compared case-insensitively).
# Anything else is treated as still in progress, so an in-progress status other
# than "Running" (e.g. "NotStarted" -- see the Get Result API reference) no
# longer ends the loop early with an incomplete payload.
terminal_statuses = {"succeeded", "failed", "canceled"}

try:
    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
    }

    response = requests.post(prebuilt_document_analyzer_url, headers=headers, json=body, timeout=request_timeout_s)
    response.raise_for_status()
    result = response.json()
    analysis_id = result.get("id")
    print("Analysis ID:", analysis_id)

    if not analysis_id:
        # Without an id the result URL would end in ".../None".
        print("Error occurred: the analyze response did not contain an analysis ID.")
    else:
        # Using the analysis ID to get results; polling until the analysis is complete
        get_result_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzerResults/{analysis_id}?api-version=2025-05-01-preview"

        headers = {
            "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
        }
        deadline = time.monotonic() + max_wait_s
        while True:
            status_response = requests.get(get_result_url, headers=headers, timeout=request_timeout_s)
            status_response.raise_for_status()
            status_result = status_response.json()
            analysis_status = status_result.get("status")
            print("Current Analysis Status:", analysis_status)

            if str(analysis_status).lower() in terminal_statuses:
                # The last poll response is fetched from the result URL itself,
                # so no additional GET is needed.
                document_analysis_result = status_result
                print("Document Analysis Result:", document_analysis_result)
                break
            if time.monotonic() >= deadline:
                print(f"Error occurred: analysis did not finish within {max_wait_s} seconds.")
                break
            time.sleep(poll_interval_s)  # Wait before polling again

except requests.RequestException as e:
    print(f"Error occurred: {e}")


### Function to display Document Analyzer results

In [None]:
import json
import textwrap
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

def _hr(char="─", width=80):
    return char * width

def _h(text: str, width=80):
    pad = " " * 2
    line = f"{pad}{text.strip()} "
    return f"{_hr('=')} \n{line}\n{_hr('=')}"

def _subh(text: str):
    """Return a sub-heading: a blank line, ``text``, then a default rule from ``_hr()``."""
    return "\n{}\n{}".format(text, _hr())

def _kv(k: str, v: Any, k_width=22):
    k = (k or "").strip()
    if isinstance(v, (dict, list)):
        v_str = json.dumps(v, indent=2, ensure_ascii=False)
    else:
        v_str = "" if v is None else str(v)
    return f"{k:<{k_width}} : {v_str}"

def _wrap_block(text: str, width=100, indent="    "):
    if not text:
        return ""
    wrapped = textwrap.fill(text, width=width)
    return textwrap.indent(wrapped, indent)

def display_document_analyzer_content_understanding_result(
    analysis_result: Dict[str, Any],
    save_markdown_path: Optional[str] = None,
    max_markdown_chars: int = 1200,
    width: int = 100,
) -> None:
    """
    Pretty-print an Azure Content Understanding 'prebuilt-documentAnalyzer' result
    and optionally write the extracted Markdown to a file.

    Parameters
    ----------
    analysis_result : dict
        The JSON-decoded analysis response (the payload printed as
        "Document Analysis Result"). ``None`` or an empty dict is tolerated
        and produces an empty summary.
    save_markdown_path : str | None
        If provided (e.g., 'analysis.md'), the markdown of all contents is
        concatenated (separated by '---' rules) and written to this path.
        Missing parent directories are created.
    max_markdown_chars : int
        Truncate the console preview of each markdown block to this many
        characters (the file is not truncated).
    width : int
        Wrap width for console output.
    """
    # Normalise once, before any .get() call, so a None payload or null
    # "result"/"usage"/"tokens" values cannot raise AttributeError below.
    analysis_result = analysis_result or {}
    result = analysis_result.get("result") or {}
    usage = analysis_result.get("usage")
    tokens = usage.get("tokens") if isinstance(usage, dict) else None
    if not isinstance(tokens, dict):
        tokens = {}

    # Value keys probed (in order) on each field object.
    value_keys = (
        "valueString", "valueNumber", "valueInteger", "valueBoolean",
        "valueDate", "valueTime", "valueArray", "valueObject",
    )

    # top-level
    print(_h("Content Understanding • Analysis Summary", width))
    print(_kv("Analysis ID", analysis_result.get("id")))
    print(_kv("Status", analysis_result.get("status")))

    print(_subh("Analyzer Info"))
    print(_kv("Analyzer ID", result.get("analyzerId")))
    print(_kv("API Version", result.get("apiVersion")))
    created_at = result.get("createdAt")
    try:
        # fromisoformat on older Python versions rejects a trailing "Z", so it
        # is rewritten as an explicit UTC offset before parsing.
        created_at_local = (
            datetime.fromisoformat(created_at.replace("Z", "+00:00")).astimezone().isoformat()
            if created_at else None
        )
    except Exception:
        # Best effort only: show the raw value if it cannot be parsed/converted.
        created_at_local = created_at
    print(_kv("Created At (UTC)", created_at))
    print(_kv("Created At (local)", created_at_local))
    warnings = result.get("warnings") or []
    print(_kv("Warnings", f"{len(warnings)}"))

    print(_subh("Usage"))
    # Token counts are printed exactly as returned.
    for k in ("contextualization", "input", "output"):
        if k in tokens:
            print(_kv(f"Tokens.{k}", tokens.get(k)))

    # contents
    contents = result.get("contents") or []
    print(_subh(f"Contents ({len(contents)})"))

    combined_md_parts = []
    for idx, item in enumerate(contents, start=1):
        kind = item.get("kind")
        sp = item.get("startPageNumber")
        ep = item.get("endPageNumber")
        print(_hr())
        print(f"[Content #{idx}] kind={kind}  pages={sp}–{ep}")

        # Fields block (generic)
        fields = item.get("fields") or {}
        if fields:
            print("• Fields:")
            for fname, fval in fields.items():
                if isinstance(fval, dict):
                    ftype = fval.get("type")
                    # Pick the first value key that is present and not None.
                    # An `or` chain would discard legitimate falsy values
                    # (0, False, "") and print the whole field dict instead.
                    vstr = next(
                        (fval[key] for key in value_keys if fval.get(key) is not None),
                        fval,  # fall back to the full dict if no value key exists
                    )
                    print(_wrap_block(f"  - {fname} ({ftype}): {vstr}", width))
                else:
                    print(_wrap_block(f"  - {fname}: {fval}", width))

        # Markdown preview
        md = item.get("markdown") or ""
        combined_md_parts.append(md)
        preview = md.strip()
        preview_trunc = (preview[:max_markdown_chars] + " … [truncated]") if len(preview) > max_markdown_chars else preview
        print("• Markdown Preview:")
        print(_wrap_block(preview_trunc, width))

    # Save concatenated markdown if requested
    if save_markdown_path:
        all_md = "\n\n---\n\n".join(part for part in combined_md_parts if part)
        out_path = Path(save_markdown_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(all_md, encoding="utf-8")
        print(_subh("Files"))
        print(_kv("Markdown saved", str(out_path.resolve())))

# Render only when the analysis cell produced a payload. With the initial empty
# dict the function would print an empty summary and write an empty
# document_analysis.md.
if document_analysis_result:
    display_document_analyzer_content_understanding_result(
        document_analysis_result,
        save_markdown_path="document_analysis.md",
    )
else:
    print("No document analysis result to display; run the analysis cell successfully first.")


### Testing Image Analysis using Prebuilt-Image Analyzer

In [None]:
import time

# Analyze endpoint of the prebuilt image analyzer.
prebuilt_image_analyzer_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzers/prebuilt-imageAnalyzer:analyze?api-version=2025-05-01-preview"

image_url = "https://github.com/kuljotSB/RAGwithAzureOpenAI/raw/refs/heads/main/ContentUnderstanding/Samples/pieChart.png"

body = {
    "url": image_url
}

# Stays empty if the request fails or the analysis does not finish in time.
image_analysis_result = {}

request_timeout_s = 30  # per-HTTP-call timeout so a stalled connection cannot hang the cell
poll_interval_s = 1     # pause between status checks
max_wait_s = 600        # overall polling budget
# Polling stops only on one of these statuses (compared case-insensitively).
# Anything else is treated as still in progress, so an in-progress status other
# than "Running" (e.g. "NotStarted" -- see the Get Result API reference) no
# longer ends the loop early with an incomplete payload.
terminal_statuses = {"succeeded", "failed", "canceled"}

try:
    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
    }

    response = requests.post(prebuilt_image_analyzer_url, headers=headers, json=body, timeout=request_timeout_s)
    response.raise_for_status()
    result = response.json()
    analysis_id = result.get("id")
    print("Analysis ID:", analysis_id)

    if not analysis_id:
        # Without an id the result URL would end in ".../None".
        print("Error occurred: the analyze response did not contain an analysis ID.")
    else:
        # Using the analysis ID to get results; polling until the analysis is complete
        get_result_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzerResults/{analysis_id}?api-version=2025-05-01-preview"

        headers = {
            "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
        }
        deadline = time.monotonic() + max_wait_s
        while True:
            status_response = requests.get(get_result_url, headers=headers, timeout=request_timeout_s)
            status_response.raise_for_status()
            status_result = status_response.json()
            analysis_status = status_result.get("status")
            print("Current Analysis Status:", analysis_status)

            if str(analysis_status).lower() in terminal_statuses:
                # The last poll response is fetched from the result URL itself,
                # so no additional GET is needed.
                image_analysis_result = status_result
                print("Image Analysis Result:", image_analysis_result)
                break
            if time.monotonic() >= deadline:
                print(f"Error occurred: analysis did not finish within {max_wait_s} seconds.")
                break
            time.sleep(poll_interval_s)  # Wait before polling again

except requests.RequestException as e:
    print(f"Error occurred: {e}")


### Printing Image Analysis results

In [None]:
import json
import textwrap
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

# ---------- small console helpers ----------
def _hr(char="─", width=80): return char * width
def _h(text: str, width=80): return f"{_hr('=')} \n  {text.strip()} \n{_hr('=')}"
def _subh(text: str):
    """Return a sub-heading: a blank line, ``text``, then a default rule from ``_hr()``."""
    return "\n{}\n{}".format(text, _hr())
def _kv(k: str, v: Any, k_width=22):
    if isinstance(v, (dict, list)):
        v_str = json.dumps(v, indent=2, ensure_ascii=False)
    else:
        v_str = "" if v is None else str(v)
    return f"{k:<{k_width}} : {v_str}"
def _wrap_block(text: str, width=100, indent="    "):
    if not text: return ""
    return textwrap.indent(textwrap.fill(text, width=width), indent)

# ---------- field formatting ----------
def _extract_value_from_fieldobj(fobj: Dict[str, Any]) -> Tuple[str, Any]:
    """
    From an Azure field object like {"type":"string","valueString":"..."}
    return (type, value) using the first present among common value* keys.
    """
    ftype = fobj.get("type") or "unknown"
    for key in ("valueString", "valueNumber", "valueBoolean", "valueArray", "valueObject"):
        if key in fobj:
            return ftype, fobj[key]
    return ftype, fobj

def _fields_to_markdown(fields: Dict[str, Any]) -> str:
    """
    Render a fields dict to Markdown bullets with types and values.
    Complex values are JSON in fenced blocks.
    """
    if not fields:
        return "_No fields._"
    lines = []
    for fname, fval in fields.items():
        if isinstance(fval, dict) and "type" in fval:
            ftype, v = _extract_value_from_fieldobj(fval)
        else:
            ftype, v = "unknown", fval
        if isinstance(v, (dict, list)):
            v_md = "```json\n" + json.dumps(v, indent=2, ensure_ascii=False) + "\n```"
        else:
            v_md = str(v) if v is not None else ""
        lines.append(f"- **{fname}** (_{ftype}_): {v_md}")
    return "\n".join(lines)

# ---------- main function ----------
def display_image_analysis_result(
    analysis_result: Dict[str, Any],
    save_markdown_path: Optional[str] = None,
    max_markdown_chars: int = 800,
    width: int = 100,
) -> None:
    """
    Pretty-print an Azure 'prebuilt-imageAnalyzer' result and optionally write a
    Markdown report that includes BOTH the analyzer markdown AND a Fields section.

    Parameters
    ----------
    analysis_result : dict
        The JSON-decoded analysis response. ``None`` or an empty dict is
        tolerated and produces an empty summary.
    save_markdown_path : str | None
        If provided, a Markdown report (summary, usage, per-content analyzer
        markdown and fields) is written to this path. Missing parent
        directories are created.
    max_markdown_chars : int
        Truncate the console preview of analyzer markdown to this many characters.
    width : int
        Wrap width for console output.
    """
    # Normalise once, before any .get() call, so a None payload or null
    # "result"/"usage"/"tokens" values cannot raise AttributeError below.
    analysis_result = analysis_result or {}
    result = analysis_result.get("result") or {}
    usage = analysis_result.get("usage")
    tokens = usage.get("tokens") if isinstance(usage, dict) else None
    if not isinstance(tokens, dict):
        tokens = {}
    token_kinds = ("contextualization", "input", "output")

    # Console summary
    print(_h("Content Understanding • Image Analysis Summary", width))
    print(_kv("Analysis ID", analysis_result.get("id")))
    print(_kv("Status", analysis_result.get("status")))

    print(_subh("Analyzer Info"))
    print(_kv("Analyzer ID", result.get("analyzerId")))
    print(_kv("API Version", result.get("apiVersion")))
    created_at = result.get("createdAt")
    try:
        # fromisoformat on older Python versions rejects a trailing "Z", so it
        # is rewritten as an explicit UTC offset before parsing.
        created_at_local = (
            datetime.fromisoformat(created_at.replace("Z", "+00:00")).astimezone().isoformat()
            if created_at else None
        )
    except Exception:
        # Best effort only: show the raw value if it cannot be parsed/converted.
        created_at_local = created_at
    print(_kv("Created At (UTC)", created_at))
    print(_kv("Created At (local)", created_at_local))
    warnings = result.get("warnings") or []
    print(_kv("Warnings", f"{len(warnings)}"))

    print(_subh("Usage"))
    for k in token_kinds:
        if k in tokens:
            print(_kv(f"Tokens.{k}", tokens.get(k)))

    contents = result.get("contents") or []
    print(_subh(f"Contents ({len(contents)})"))

    # Prepare Markdown report parts (if requested)
    md_parts = []
    if save_markdown_path:
        md_parts.append("# Image Analysis Report\n")
        md_parts.append("## Summary\n")
        md_parts.append(f"- **Analysis ID:** `{analysis_result.get('id')}`")
        md_parts.append(f"- **Status:** `{analysis_result.get('status')}`")
        md_parts.append(f"- **Analyzer:** `{result.get('analyzerId')}`")
        md_parts.append(f"- **API Version:** `{result.get('apiVersion')}`")
        md_parts.append(f"- **Created At (UTC):** `{created_at}`")
        md_parts.append(f"- **Warnings:** `{len(warnings)}`\n")

        if tokens:
            md_parts.append("## Usage\n")
            for k in token_kinds:
                if k in tokens:
                    md_parts.append(f"- **Tokens.{k}:** `{tokens.get(k)}`")
            md_parts.append("")

        md_parts.append("## Contents\n")

    # Iterate content blocks
    for idx, item in enumerate(contents, start=1):
        kind = item.get("kind")
        sp = item.get("startPageNumber")
        ep = item.get("endPageNumber")
        unit = item.get("unit")
        pages = item.get("pages")
        fields = item.get("fields") or {}
        md = item.get("markdown") or ""

        # Console view
        print(_hr())
        print(f"[Content #{idx}] kind={kind}  pages={sp}–{ep}")
        if unit:
            print(_kv("Unit", unit))
        if pages:
            print(_kv("Pages", pages))

        if fields:
            print("• Fields:")
            for fname, fval in fields.items():
                if isinstance(fval, dict):
                    ftype, v = _extract_value_from_fieldobj(fval)
                    # A present-but-null value is shown as the full field object.
                    if v is None:
                        v = fval
                    print(_wrap_block(f"  - {fname} ({ftype}): {v}", width))
                else:
                    print(_wrap_block(f"  - {fname}: {fval}", width))

        preview = md.strip()
        preview_trunc = (preview[:max_markdown_chars] + " … [truncated]") if len(preview) > max_markdown_chars else preview
        print("• Markdown Preview:")
        print(_wrap_block(preview_trunc, width))

        # Markdown report for this content
        if save_markdown_path:
            md_parts.append(f"### Content #{idx}")
            meta_bits = []
            if kind:
                meta_bits.append(f"**kind:** `{kind}`")
            if sp is not None and ep is not None:
                meta_bits.append(f"**pages:** `{sp}–{ep}`")
            if unit:
                meta_bits.append(f"**unit:** `{unit}`")
            if meta_bits:
                md_parts.append("> " + " • ".join(meta_bits))
            if pages:
                md_parts.append("<details><summary>Pages metadata</summary>\n\n```json\n" +
                                json.dumps(pages, indent=2, ensure_ascii=False) +
                                "\n```\n</details>\n")
            # Analyzer-provided markdown (exact)
            md_parts.append("#### Analyzer Markdown")
            md_parts.append(md if md.strip() else "_(empty)_")
            # Fields as Markdown bullets
            md_parts.append("#### Fields")
            md_parts.append(_fields_to_markdown(fields))
            md_parts.append("---")

    # Write Markdown file if requested
    if save_markdown_path:
        out_path = Path(save_markdown_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        report = "\n".join(md_parts).rstrip() + "\n"
        out_path.write_text(report, encoding="utf-8")
        print(_subh("Files"))
        print(_kv("Markdown saved", str(out_path.resolve())))

# Render only when the analysis cell produced a payload. With the initial empty
# dict the function would print an empty summary and write a report containing
# only None values to image_analysis.md.
if image_analysis_result:
    display_image_analysis_result(
        image_analysis_result,
        save_markdown_path="image_analysis.md",
    )
else:
    print("No image analysis result to display; run the analysis cell successfully first.")


### Testing Audio Analysis using Prebuilt-Audio Analyzer

In [None]:
import time

# Analyze endpoint of the prebuilt audio analyzer.
prebuilt_audio_analyzer_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzers/prebuilt-audioAnalyzer:analyze?api-version=2025-05-01-preview"

audio_url = "https://github.com/kuljotSB/RAGwithAzureOpenAI/raw/refs/heads/main/ContentUnderstanding/Samples/audio.wav"

body = {
    "url": audio_url
}

# Stays empty if the request fails or the analysis does not finish in time.
audio_analysis_result = {}

request_timeout_s = 30  # per-HTTP-call timeout so a stalled connection cannot hang the cell
poll_interval_s = 3     # pause between status checks
max_wait_s = 1800       # overall polling budget
# Polling stops only on one of these statuses (compared case-insensitively).
# Anything else is treated as still in progress, so an in-progress status other
# than "Running" (e.g. "NotStarted" -- see the Get Result API reference) no
# longer ends the loop early with an incomplete payload.
terminal_statuses = {"succeeded", "failed", "canceled"}

try:
    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
    }

    response = requests.post(prebuilt_audio_analyzer_url, headers=headers, json=body, timeout=request_timeout_s)
    response.raise_for_status()
    result = response.json()
    analysis_id = result.get("id")
    print("Analysis ID:", analysis_id)

    if not analysis_id:
        # Without an id the result URL would end in ".../None".
        print("Error occurred: the analyze response did not contain an analysis ID.")
    else:
        # Using the analysis ID to get results; polling until the analysis is complete
        get_result_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzerResults/{analysis_id}?api-version=2025-05-01-preview"

        headers = {
            "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
        }
        deadline = time.monotonic() + max_wait_s
        while True:
            status_response = requests.get(get_result_url, headers=headers, timeout=request_timeout_s)
            status_response.raise_for_status()
            status_result = status_response.json()
            analysis_status = status_result.get("status")
            print("Current Analysis Status:", analysis_status)

            if str(analysis_status).lower() in terminal_statuses:
                # The last poll response is fetched from the result URL itself,
                # so no additional GET is needed.
                audio_analysis_result = status_result
                print("Audio Analysis Result:", audio_analysis_result)
                break
            if time.monotonic() >= deadline:
                print(f"Error occurred: analysis did not finish within {max_wait_s} seconds.")
                break
            time.sleep(poll_interval_s)  # Wait before polling again

except requests.RequestException as e:
    print(f"Error occurred: {e}")


### Printing Audio Analysis results

In [None]:
import json
import textwrap
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, List

# ---------- small console helpers ----------
def _hr(char="─", width=80): return char * width
def _h(text: str, width=80): return f"{_hr('=')} \n  {text.strip()} \n{_hr('=')}"
def _subh(text: str):
    """Return a sub-heading: a blank line, ``text``, then a default rule from ``_hr()``."""
    return "\n{}\n{}".format(text, _hr())
def _kv(k: str, v: Any, k_width=24):
    if isinstance(v, (dict, list)):
        v_str = json.dumps(v, indent=2, ensure_ascii=False)
    else:
        v_str = "" if v is None else str(v)
    return f"{k:<{k_width}} : {v_str}"
def _wrap_block(text: str, width=100, indent="    "):
    if not text: return ""
    return textwrap.indent(textwrap.fill(text, width=width), indent)

# ---------- field formatting ----------
def _extract_value_from_fieldobj(fobj: Dict[str, Any]) -> Tuple[str, Any]:
    """
    From an Azure 'field' object like {"type":"string","valueString":"..."}
    return (type, value) using the first present among value* keys.
    """
    ftype = fobj.get("type") or "unknown"
    for key in ("valueString", "valueNumber", "valueBoolean", "valueArray", "valueObject"):
        if key in fobj:
            return ftype, fobj[key]
    return ftype, fobj

def _fields_to_markdown(fields: Dict[str, Any]) -> str:
    """
    Render a fields dict to Markdown bullets with types and values.
    Complex values are JSON in fenced blocks.
    """
    if not fields:
        return "_No fields._"
    lines = []
    for fname, fval in fields.items():
        if isinstance(fval, dict) and "type" in fval:
            ftype, v = _extract_value_from_fieldobj(fval)
        else:
            ftype, v = "unknown", fval
        if isinstance(v, (dict, list)):
            v_md = "```json\n" + json.dumps(v, indent=2, ensure_ascii=False) + "\n```"
        else:
            v_md = str(v) if v is not None else ""
        lines.append(f"- **{fname}** (_{ftype}_): {v_md}")
    return "\n".join(lines)

# ---------- audio utilities ----------
def _fmt_ms(ms: int) -> str:
    """Format milliseconds as HH:MM:SS.mmm"""
    if ms is None:
        return ""
    td = timedelta(milliseconds=int(ms))
    # Manually format to always show milliseconds
    total_seconds = int(td.total_seconds())
    hours = total_seconds // 3600
    minutes = (total_seconds % 3600) // 60
    seconds = total_seconds % 60
    millis = int(ms) % 1000
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"

def _phrases_to_markdown(phrases: List[Dict[str, Any]]) -> str:
    """
    Render transcriptPhrases into a compact Markdown list with timestamps, speaker, confidence, and text.
    """
    if not phrases:
        return "_No transcript phrases._"
    out = []
    for p in phrases:
        spk = p.get("speaker") or "Speaker"
        st = _fmt_ms(p.get("startTimeMs"))
        et = _fmt_ms(p.get("endTimeMs"))
        conf = p.get("confidence")
        txt = p.get("text") or ""
        conf_str = f"{conf:.3f}" if isinstance(conf, (int, float)) else str(conf) if conf is not None else ""
        out.append(f"- `{st} → {et}` **{spk}** (_conf: {conf_str}_): {txt}")
    return "\n".join(out)

# ---------- main function ----------
def display_audio_analysis_result(
    analysis_result: Dict[str, Any],
    save_markdown_path: Optional[str] = None,
    max_markdown_chars: int = 1600,
    width: int = 100,
) -> None:
    """
    Pretty-print an Azure 'prebuilt-audioAnalyzer' result and optionally write a
    Markdown report that includes BOTH the analyzer markdown AND a Fields section,
    plus a compact transcript phrases section.

    Parameters
    ----------
    analysis_result : dict
        The JSON-decoded response ("Audio Analysis Result"). ``None`` or an
        empty dict is tolerated and produces an empty summary.
    save_markdown_path : str | None
        If provided, writes a Markdown report with analyzer markdown, fields, and phrases.
        Missing parent directories are created.
    max_markdown_chars : int
        Truncate console preview of analyzer markdown to this many characters.
    width : int
        Wrap width for console output.
    """
    # Normalise once, before any .get() call, so a None payload or null /
    # non-dict "result", "usage" or "tokens" values cannot raise below.
    analysis_result = analysis_result or {}
    result = analysis_result.get("result") or {}
    usage = analysis_result.get("usage")
    if not isinstance(usage, dict):
        usage = {}
    tokens = usage.get("tokens")
    if not isinstance(tokens, dict):
        tokens = {}
    audio_hours = usage.get("audioHours")
    token_kinds = ("contextualization", "input", "output")

    # Console summary
    print(_h("Content Understanding • Audio Analysis Summary", width))
    print(_kv("Analysis ID", analysis_result.get("id")))
    print(_kv("Status", analysis_result.get("status")))

    print(_subh("Analyzer Info"))
    print(_kv("Analyzer ID", result.get("analyzerId")))
    print(_kv("API Version", result.get("apiVersion")))
    print(_kv("String Encoding", result.get("stringEncoding")))
    created_at = result.get("createdAt")
    try:
        # fromisoformat on older Python versions rejects a trailing "Z", so it
        # is rewritten as an explicit UTC offset before parsing.
        created_at_local = (
            datetime.fromisoformat(created_at.replace("Z", "+00:00")).astimezone().isoformat()
            if created_at else None
        )
    except Exception:
        # Best effort only: show the raw value if it cannot be parsed/converted.
        created_at_local = created_at
    print(_kv("Created At (UTC)", created_at))
    print(_kv("Created At (local)", created_at_local))
    warnings = result.get("warnings") or []
    print(_kv("Warnings", f"{len(warnings)}"))

    print(_subh("Usage"))
    if audio_hours is not None:
        print(_kv("Audio Hours", audio_hours))
    for k in token_kinds:
        if k in tokens:
            print(_kv(f"Tokens.{k}", tokens.get(k)))

    contents = result.get("contents") or []
    print(_subh(f"Contents ({len(contents)})"))

    # Prepare Markdown report parts (if requested)
    md_parts = []
    if save_markdown_path:
        md_parts.append("# Audio Analysis Report\n")
        md_parts.append("## Summary\n")
        md_parts.append(f"- **Analysis ID:** `{analysis_result.get('id')}`")
        md_parts.append(f"- **Status:** `{analysis_result.get('status')}`")
        md_parts.append(f"- **Analyzer:** `{result.get('analyzerId')}`")
        md_parts.append(f"- **API Version:** `{result.get('apiVersion')}`")
        md_parts.append(f"- **String Encoding:** `{result.get('stringEncoding')}`")
        md_parts.append(f"- **Created At (UTC):** `{created_at}`")
        md_parts.append(f"- **Warnings:** `{len(warnings)}`\n")

        # Audio hours belong to the Usage section, as in the console output.
        # Previously the bullet was appended after the blank line that closes
        # the Summary list, leaving it detached from both sections.
        if audio_hours is not None or tokens:
            md_parts.append("## Usage\n")
            if audio_hours is not None:
                md_parts.append(f"- **Audio Hours:** `{audio_hours}`")
            for k in token_kinds:
                if k in tokens:
                    md_parts.append(f"- **Tokens.{k}:** `{tokens.get(k)}`")
            md_parts.append("")

        md_parts.append("## Contents\n")

    # Iterate content blocks
    for idx, item in enumerate(contents, start=1):
        kind = item.get("kind")
        start_ms = item.get("startTimeMs")
        end_ms = item.get("endTimeMs")
        fields = item.get("fields") or {}
        md = item.get("markdown") or ""
        phrases = item.get("transcriptPhrases") or []

        # Console view
        print(_hr())
        print(f"[Content #{idx}] kind={kind}  time={_fmt_ms(start_ms)}–{_fmt_ms(end_ms)}")
        if fields:
            print("• Fields:")
            for fname, fval in fields.items():
                if isinstance(fval, dict):
                    ftype, v = _extract_value_from_fieldobj(fval)
                    # A present-but-null value is shown as the full field object.
                    if v is None:
                        v = fval
                    print(_wrap_block(f"  - {fname} ({ftype}): {v}", width))
                else:
                    print(_wrap_block(f"  - {fname}: {fval}", width))

        preview = md.strip()
        preview_trunc = (preview[:max_markdown_chars] + " … [truncated]") if len(preview) > max_markdown_chars else preview
        print("• Analyzer Markdown Preview:")
        print(_wrap_block(preview_trunc, width))

        if phrases:
            print("• Transcript Phrases (compact):")
            # Wrap each bullet on its own: _wrap_block re-flows its whole input
            # with textwrap.fill, which would merge the list into one paragraph.
            for phrase_line in _phrases_to_markdown(phrases).splitlines():
                print(_wrap_block(phrase_line, width))

        # Markdown report for this content
        if save_markdown_path:
            md_parts.append(f"### Content #{idx}")
            meta_bits = []
            if kind:
                meta_bits.append(f"**kind:** `{kind}`")
            if start_ms is not None and end_ms is not None:
                meta_bits.append(f"**time:** `{_fmt_ms(start_ms)}–{_fmt_ms(end_ms)}`")
            if meta_bits:
                md_parts.append("> " + " • ".join(meta_bits))

            # Analyzer-provided markdown (exact)
            md_parts.append("#### Analyzer Markdown")
            md_parts.append(md if md.strip() else "_(empty)_")

            # Fields
            md_parts.append("#### Fields")
            md_parts.append(_fields_to_markdown(fields))

            # Transcript phrases (compact bullets)
            md_parts.append("#### Transcript Phrases")
            md_parts.append(_phrases_to_markdown(phrases))

            md_parts.append("---")

    # Write Markdown file if requested
    if save_markdown_path:
        out_path = Path(save_markdown_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        report = "\n".join(md_parts).rstrip() + "\n"
        out_path.write_text(report, encoding="utf-8")
        print(_subh("Files"))
        print(_kv("Markdown saved", str(out_path.resolve())))

# Render only when the analysis cell produced a payload. With the initial empty
# dict the function would print an empty summary and write a report containing
# only None values to audio_analysis.md.
if audio_analysis_result:
    display_audio_analysis_result(
        audio_analysis_result,
        save_markdown_path="audio_analysis.md",
    )
else:
    print("No audio analysis result to display; run the analysis cell successfully first.")


### Testing Video Analysis using Prebuilt-Video Analyzer

In [None]:
import time

# Analyze endpoint of the prebuilt video analyzer.
prebuilt_video_analyzer_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzers/prebuilt-videoAnalyzer:analyze?api-version=2025-05-01-preview"

video_url = "https://github.com/kuljotSB/RAGwithAzureOpenAI/raw/refs/heads/main/ContentUnderstanding/Samples/FlightSimulator.mp4"

body = {
    "url": video_url
}

# Stays empty if the request fails or the analysis does not finish in time.
video_analysis_result = {}

request_timeout_s = 30  # per-HTTP-call timeout so a stalled connection cannot hang the cell
poll_interval_s = 3     # pause between status checks
max_wait_s = 1800       # overall polling budget
# Polling stops only on one of these statuses (compared case-insensitively).
# Anything else is treated as still in progress, so an in-progress status other
# than "Running" (e.g. "NotStarted" -- see the Get Result API reference) no
# longer ends the loop early with an incomplete payload.
terminal_statuses = {"succeeded", "failed", "canceled"}

try:
    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
    }

    response = requests.post(prebuilt_video_analyzer_url, headers=headers, json=body, timeout=request_timeout_s)
    response.raise_for_status()
    result = response.json()
    analysis_id = result.get("id")
    print("Analysis ID:", analysis_id)

    if not analysis_id:
        # Without an id the result URL would end in ".../None".
        print("Error occurred: the analyze response did not contain an analysis ID.")
    else:
        # Using the analysis ID to get results; polling until the analysis is complete
        get_result_url = f"{CONTENT_UNDERSTANDING_ENDPOINT}/contentunderstanding/analyzerResults/{analysis_id}?api-version=2025-05-01-preview"

        headers = {
            "Ocp-Apim-Subscription-Key": CONTENT_UNDERSTANDING_API_KEY
        }
        deadline = time.monotonic() + max_wait_s
        while True:
            status_response = requests.get(get_result_url, headers=headers, timeout=request_timeout_s)
            status_response.raise_for_status()
            status_result = status_response.json()
            analysis_status = status_result.get("status")
            print("Current Analysis Status:", analysis_status)

            if str(analysis_status).lower() in terminal_statuses:
                # The last poll response is fetched from the result URL itself,
                # so no additional GET is needed.
                video_analysis_result = status_result
                print("Video Analysis Result:", video_analysis_result)
                break
            if time.monotonic() >= deadline:
                print(f"Error occurred: analysis did not finish within {max_wait_s} seconds.")
                break
            time.sleep(poll_interval_s)  # Wait before polling again

except requests.RequestException as e:
    print(f"Error occurred: {e}")


In [None]:
import json
import textwrap
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, List

# ---------- small console helpers ----------
def _hr(char="─", width=80): return char * width
def _h(text: str, width=80): return f"{_hr('=')} \n  {text.strip()} \n{_hr('=')}"
def _subh(text: str):
    """Return a sub-heading: a blank line, ``text``, then a default rule from ``_hr()``."""
    return "\n{}\n{}".format(text, _hr())
def _kv(k: str, v: Any, k_width=24):
    if isinstance(v, (dict, list)):
        v_str = json.dumps(v, indent=2, ensure_ascii=False)
    else:
        v_str = "" if v is None else str(v)
    return f"{k:<{k_width}} : {v_str}"
def _wrap_block(text: str, width=100, indent="    "):
    if not text: return ""
    return textwrap.indent(textwrap.fill(text, width=width), indent)

# ---------- field formatting ----------
def _extract_value_from_fieldobj(fobj: Dict[str, Any]) -> Tuple[str, Any]:
    """
    From an Azure field object like {"type":"string","valueString":"..."}
    return (type, value) using the first present among value* keys.
    """
    ftype = fobj.get("type") or "unknown"
    for key in ("valueString", "valueNumber", "valueInteger", "valueBoolean", "valueArray", "valueObject"):
        if key in fobj:
            return ftype, fobj[key]
    return ftype, fobj

def _fields_to_markdown(fields: Dict[str, Any]) -> str:
    """
    Render 'fields' dict to Markdown bullets with types and values.
    Complex values are JSON in fenced blocks. Handles arrays/objects.
    """
    if not fields:
        return "_No fields._"
    lines = []
    for fname, fval in fields.items():
        if isinstance(fval, dict) and "type" in fval:
            ftype, v = _extract_value_from_fieldobj(fval)
        else:
            ftype, v = "unknown", fval
        if isinstance(v, (dict, list)):
            v_md = "```json\n" + json.dumps(v, indent=2, ensure_ascii=False) + "\n```"
        else:
            v_md = "" if v is None else str(v)
        lines.append(f"- **{fname}** (_{ftype}_): {v_md}")
    return "\n".join(lines)

# ---------- video/audio utilities ----------
def _fmt_ms(ms: Optional[int]) -> str:
    """Format milliseconds as HH:MM:SS.mmm"""
    if ms is None:
        return ""
    td = timedelta(milliseconds=int(ms))
    total_seconds = int(td.total_seconds())
    hours = total_seconds // 3600
    minutes = (total_seconds % 3600) // 60
    seconds = total_seconds % 60
    millis = int(ms) % 1000
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"

def _phrases_to_markdown(phrases: List[Dict[str, Any]]) -> str:
    """
    Build a Markdown bullet list from transcript phrase dicts.

    Each bullet shows the start/end timestamps, the speaker (defaulting to
    "Speaker"), the confidence (three decimals when numeric) and the text.
    Returns "_No transcript phrases._" when `phrases` is empty or None.
    """
    if not phrases:
        return "_No transcript phrases._"

    def _bullet(phrase: Dict[str, Any]) -> str:
        speaker = phrase.get("speaker") or "Speaker"
        start = _fmt_ms(phrase.get("startTimeMs"))
        end = _fmt_ms(phrase.get("endTimeMs"))
        confidence = phrase.get("confidence")
        text = phrase.get("text") or ""
        if isinstance(confidence, (int, float)):
            shown = f"{confidence:.3f}"
        elif confidence is None:
            shown = ""
        else:
            # Non-numeric confidence is shown verbatim.
            shown = str(confidence)
        return f"- `{start} → {end}` **{speaker}** (_conf: {shown}_): {text}"

    return "\n".join([_bullet(phrase) for phrase in phrases])

def _segments_to_markdown(segments: List[Dict[str, Any]]) -> str:
    """
    Build a Markdown bullet list from plain segment dicts such as
    {'startTimeMs': .., 'endTimeMs': .., 'description': .., 'segmentId': ..}.

    Each bullet carries the segment id (when truthy), the time window and the
    description. Returns "_No segments._" when `segments` is empty or None.
    """
    if not segments:
        return "_No segments._"
    bullets = []
    for segment in segments:
        start = _fmt_ms(segment.get("startTimeMs"))
        end = _fmt_ms(segment.get("endTimeMs"))
        description = segment.get("description") or ""
        segment_id = segment.get("segmentId")
        # A falsy id is treated as "no id" and the generic label is used.
        label = "- **Segment**"
        if segment_id:
            label = f"- **Segment {segment_id}**"
        bullets.append(f"{label} `{start} → {end}`: {description}")
    return "\n".join(bullets)

def _times_to_markdown(times_ms: List[int], title: str) -> str:
    """
    Render millisecond timestamps as one Markdown bullet per entry.

    `title` is only used, lower-cased, in the placeholder returned when
    `times_ms` is empty or None (e.g. "_No key frames._").
    """
    if times_ms:
        bullets = [f"- `{_fmt_ms(stamp)}`" for stamp in times_ms]
        return "\n".join(bullets)
    return f"_No {title.lower()}._"

# ---------- main function ----------
def display_video_analysis_result(
    analysis_result: Dict[str, Any],
    save_markdown_path: Optional[str] = None,
    max_markdown_chars: int = 2000,
    width: int = 100,
) -> None:
    """
    Pretty-prints Azure 'prebuilt-videoAnalyzer' result and optionally writes a
    Markdown report that includes analyzer markdown, fields, segments, keyframes,
    camera shots, and transcript phrases.

    A None/empty payload, or one whose "result", "usage" or "tokens" entries
    are null, is rendered with empty sections instead of raising.

    Parameters
    ----------
    analysis_result : dict
        The JSON-decoded response ("Video Analysis Result").
    save_markdown_path : str | None
        If provided, writes a Markdown report with analyzer markdown, fields, and extras.
        Missing parent directories are created; an existing file is overwritten.
    max_markdown_chars : int
        Truncate console preview of analyzer markdown to this many characters.
    width : int
        Wrap width for console output.

    Returns
    -------
    None
        All output goes to stdout and, when requested, to `save_markdown_path`.
    """
    # Normalise the payload up front: a None response, or one carrying
    # "result": null / "usage": null, then renders as empty sections instead of
    # raising AttributeError part-way through the printout.
    analysis_result = analysis_result or {}
    result = analysis_result.get("result") or {}
    usage = analysis_result.get("usage")
    if not isinstance(usage, dict):
        usage = {}
    tokens = usage.get("tokens")
    if not isinstance(tokens, dict):
        tokens = {}
    video_hours = usage.get("videoHours")
    token_kinds = ("contextualization", "input", "output")

    # Console summary
    print(_h("Content Understanding • Video Analysis Summary", width))
    print(_kv("Analysis ID", analysis_result.get("id")))
    print(_kv("Status", analysis_result.get("status")))

    print(_subh("Analyzer Info"))
    print(_kv("Analyzer ID", result.get("analyzerId")))
    print(_kv("API Version", result.get("apiVersion")))
    print(_kv("String Encoding", result.get("stringEncoding")))
    created_at = result.get("createdAt")
    try:
        # Spell the "Z" designator as an explicit offset before parsing, then
        # convert to the machine's local timezone.
        created_at_local = (
            datetime.fromisoformat(created_at.replace("Z", "+00:00")).astimezone().isoformat()
            if created_at else None
        )
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        # Best effort only: fall back to the raw value when it is not a
        # parseable/convertible timestamp.
        created_at_local = created_at
    print(_kv("Created At (UTC)", created_at))
    print(_kv("Created At (local)", created_at_local))
    warnings = result.get("warnings") or []
    print(_kv("Warnings", f"{len(warnings)}"))

    print(_subh("Video Metadata"))
    print(_kv("Start Time", _fmt_ms(result.get("startTimeMs"))))
    print(_kv("End Time", _fmt_ms(result.get("endTimeMs"))))
    print(_kv("Width", result.get("width")))
    print(_kv("Height", result.get("height")))

    print(_subh("Usage"))
    if video_hours is not None:
        print(_kv("Video Hours", video_hours))
    for k in token_kinds:
        if k in tokens:
            print(_kv(f"Tokens.{k}", tokens.get(k)))

    contents = result.get("contents") or []
    print(_subh(f"Contents ({len(contents)})"))

    # Prepare Markdown report parts (if requested)
    md_parts = []
    if save_markdown_path:
        md_parts.append("# Video Analysis Report\n")
        md_parts.append("## Summary\n")
        md_parts.append(f"- **Analysis ID:** `{analysis_result.get('id')}`")
        md_parts.append(f"- **Status:** `{analysis_result.get('status')}`")
        md_parts.append(f"- **Analyzer:** `{result.get('analyzerId')}`")
        md_parts.append(f"- **API Version:** `{result.get('apiVersion')}`")
        md_parts.append(f"- **String Encoding:** `{result.get('stringEncoding')}`")
        md_parts.append(f"- **Created At (UTC):** `{created_at}`")
        md_parts.append(f"- **Warnings:** `{len(warnings)}`")
        md_parts.append(f"- **Video Window:** `{_fmt_ms(result.get('startTimeMs'))} → {_fmt_ms(result.get('endTimeMs'))}`")
        # No trailing newline here: the optional "Video Hours" bullet must stay
        # in the same list, and the headings below bring their own blank line.
        md_parts.append(f"- **Resolution:** `{result.get('width')} x {result.get('height')}`")
        if video_hours is not None:
            md_parts.append(f"- **Video Hours:** `{video_hours}`")
        if tokens:
            md_parts.append("\n## Usage")
            for k in token_kinds:
                if k in tokens:
                    md_parts.append(f"- **Tokens.{k}:** `{tokens.get(k)}`")
        md_parts.append("\n## Contents\n")

    # Iterate content blocks
    for idx, item in enumerate(contents, start=1):
        kind = item.get("kind")
        fields = item.get("fields") or {}
        md = item.get("markdown") or ""

        # Console view
        print(_hr())
        print(f"[Content #{idx}] kind={kind}")

        if fields:
            print("• Fields:")
            for fname, fval in fields.items():
                if isinstance(fval, dict):
                    ftype, v = _extract_value_from_fieldobj(fval)
                    if v is None:
                        # A null value is less informative than the raw field object.
                        v = fval
                    print(_wrap_block(f"  - {fname} ({ftype}): {v}", width))
                else:
                    print(_wrap_block(f"  - {fname}: {fval}", width))

        preview = md.strip()
        if len(preview) > max_markdown_chars:
            preview_trunc = preview[:max_markdown_chars] + " … [truncated]"
        else:
            preview_trunc = preview
        print("• Analyzer Markdown Preview:")
        print(_wrap_block(preview_trunc, width))

        # Markdown report for this content
        if save_markdown_path:
            md_parts.append(f"### Content #{idx}")
            if kind:
                md_parts.append(f"> **kind:** `{kind}`")

            # Analyzer-provided markdown (exact, not truncated)
            md_parts.append("#### Analyzer Markdown")
            md_parts.append(md if md.strip() else "_(empty)_")

            # Fields
            md_parts.append("#### Fields")
            md_parts.append(_fields_to_markdown(fields))

            md_parts.append("---")

    # Extras outside 'contents'
    # NOTE(review): every other key read here is camelCase; the camelCase
    # spelling of the key-frames key is accepted as a fallback in case the
    # payload uses it - confirm against the actual service response.
    keyframe_times = result.get("KeyFrameTimesMs") or result.get("keyFrameTimesMs") or []
    camera_shots = result.get("cameraShotTimesMs") or []
    plain_segments = result.get("segments") or []
    transcript_phrases = result.get("transcriptPhrases") or []

    # Console extras
    print(_subh("Key Frames"))
    if keyframe_times:
        print(_wrap_block(_times_to_markdown(keyframe_times, "Key Frames"), width))
    else:
        print("_No key frames._")
    print(_subh("Camera Shots"))
    if camera_shots:
        print(_wrap_block(_times_to_markdown(camera_shots, "Camera Shots"), width))
    else:
        print("_No camera shots._")
    print(_subh("Segments"))
    print(_wrap_block(_segments_to_markdown(plain_segments), width))
    print(_subh("Transcript Phrases (compact)"))
    print(_wrap_block(_phrases_to_markdown(transcript_phrases), width))

    # Markdown extras
    if save_markdown_path:
        md_parts.append("## Key Frames")
        md_parts.append(_times_to_markdown(keyframe_times, "Key Frames"))
        md_parts.append("\n## Camera Shots")
        md_parts.append(_times_to_markdown(camera_shots, "Camera Shots"))
        md_parts.append("\n## Segments")
        md_parts.append(_segments_to_markdown(plain_segments))
        md_parts.append("\n## Transcript Phrases")
        md_parts.append(_phrases_to_markdown(transcript_phrases))

        out_path = Path(save_markdown_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        # Exactly one trailing newline at the end of the report.
        report = "\n".join(md_parts).rstrip() + "\n"
        out_path.write_text(report, encoding="utf-8")
        print(_subh("Files"))
        print(_kv("Markdown saved", str(out_path.resolve())))

# Print the console summary for the video analysis and save the Markdown
# report as video_analysis.md (relative to the current working directory).
display_video_analysis_result(
    analysis_result=video_analysis_result,
    save_markdown_path="video_analysis.md",
)
