In [25]:
!pip install --q --upgrade langchain pdfplumber langgraph openai chromadb huggingface-hub langchain-experimental  \
    sentence-transformers langchain-community fastapi uvicorn pyngrok \
    httpx tiktoken openai-whisper pydub elevenlabs python-docx pandas tqdm

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.7/67.7 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.0/60.0 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m107.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m82.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
# system packages for audio conversion
!apt-get update -y && apt-get install -y ffmpeg

Hit:1 https://cli.github.com/packages stable InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [83.6 kB]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [2,157 kB]
Get:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Get:10 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:11 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:13 http://archive.ubuntu.com/ubuntu jammy-backpor

In [26]:
import base64
import json
import logging
import os
import re
import sqlite3
import tempfile
import time
from io import BytesIO
from pathlib import Path
from typing import TypedDict, Optional, List, Dict, Any

import httpx
import pandas as pd
import whisper
from docx import Document
from langchain_community.document_loaders import PDFPlumberLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.graph import StateGraph, END
from openai import OpenAI
from tiktoken import encoding_for_model
from tqdm.auto import tqdm

In [28]:
# Quiet pdfminer (the PDF parser underneath pdfplumber): show errors only, not warnings.
logging.getLogger(name="pdfminer").setLevel(level=logging.ERROR)

In [17]:
def load_docx(path: str):
    """Read a .docx file and return its non-empty paragraphs and its tables.

    Args:
        path: path of the .docx file to open.

    Returns:
        ``(paragraphs, tables)``. ``paragraphs`` is a list of stripped paragraph
        strings with blank paragraphs dropped. ``tables`` is a list with one
        DataFrame per document table: one row per table row, stripped cell
        text, default integer column labels (no header row is promoted).
    """
    document = Document(path)

    paragraphs = []
    for paragraph in document.paragraphs:
        stripped = paragraph.text.strip()
        if stripped:
            paragraphs.append(stripped)

    tables = [
        pd.DataFrame([[cell.text.strip() for cell in row.cells] for row in table.rows])
        for table in document.tables
    ]
    return paragraphs, tables

# Convert tables to semantic text
def table_to_text(df: pd.DataFrame) -> str:
    """Flatten a table into one ``"header: value | header: value"`` line per row.

    Blank cells are skipped, and so are missing cells (None / NaN). Missing
    cells appear when ``pd.DataFrame`` is built from ragged rows, e.g. by
    ``load_docx`` on a table whose rows have different cell counts. They used
    to be rendered as the literal text "None"/"nan". Rows with no remaining
    values produce no line.

    Args:
        df: the table; its column labels are used as the headers.

    Returns:
        The row lines joined with newlines ("" for an empty table).
    """
    lines = []
    for _, row in df.iterrows():
        pairs = []
        # row.items() yields (column label, value), so no positional lookup is needed.
        for header, val in row.items():
            if pd.api.types.is_scalar(val) and pd.isna(val):
                continue
            text = str(val).strip()
            if text:
                pairs.append(f"{header}: {text}")
        if pairs:
            lines.append(" | ".join(pairs))
    return "\n".join(lines)

def combine_text(paragraphs, table_texts):
    """Concatenate paragraphs and flattened tables into one document string.

    Each paragraph is followed by a newline. Each table text is then appended
    as ``"\\nTable <n>:\\n<text>"`` with 1-based numbering. (The previous
    version built this string but never returned it, so callers got None.)

    Args:
        paragraphs: iterable of paragraph strings.
        table_texts: iterable of table strings, e.g. from ``table_to_text``.

    Returns:
        The combined text ("" when both inputs are empty).
    """
    body = "".join(f"{p}\n" for p in paragraphs)
    tables = "".join(f"\nTable {n}:\n{t}" for n, t in enumerate(table_texts, start=1))
    return body + tables

In [29]:
# ========== Load PDFs ==========
# Source guides uploaded to Colab: the Absher user guide plus three service
# guides (driving-licence issuance, passport issuance, national-ID renewal).
pdf_files = [
    "/content/Absher_UserGuide.pdf",
    "/content/الأمن+العام+-+دليل+خدمة+إصدار+رخصة+القيادة.pdf",
    "/content/خدمة+إصدار+جواز+السفر.pdf",
    "/content/خدمة+تجديد+الهوية+الوطنية.pdf",
]

# Flatten every loader's Documents into one list, preserving file order.
docs = [page for pdf_path in pdf_files for page in PDFPlumberLoader(pdf_path).load()]

print("Loaded pages:", len(docs))

Loaded pages: 100


In [30]:
# ========== Split text ==========
# Chunks of up to 1200 characters; consecutive chunks share up to 200
# characters so text near a boundary is present in both neighbours.
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1200,
    chunk_overlap=200,
)
chunks = splitter.split_documents(docs)
print("Total chunks:", len(chunks))

Total chunks: 112


In [None]:
logger = logging.getLogger("muttamm_agent")
logger.setLevel(logging.INFO)
logging.getLogger("uvicorn").setLevel(logging.WARNING)

# ========== CONFIG ==========
# Secrets are read from the environment only; nothing is hard-coded. Missing
# values fall back to "" so the notebook still loads, but features that need
# them will fail when called, so warn up front.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY", "")
ELEVENLABS_VOICE_ID = os.environ.get("ELEVENLABS_VOICE_ID", "")

_missing_settings = [
    setting_name
    for setting_name, setting_value in (
        ("OPENAI_API_KEY", OPENAI_API_KEY),
        ("ELEVENLABS_API_KEY", ELEVENLABS_API_KEY),
        ("ELEVENLABS_VOICE_ID", ELEVENLABS_VOICE_ID),
    )
    if not setting_value
]
if _missing_settings:
    logger.warning("Missing configuration (set these environment variables): %s", ", ".join(_missing_settings))

# Only export a real key. Writing "" would turn an unset variable into an
# empty one, which downstream libraries treat as a (bad) key instead of "unset".
if OPENAI_API_KEY:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

In [None]:
# ========== Models & Clients ==========
# One OpenAI client shared by every GPT-4o call in this notebook.
openai_client = OpenAI(api_key=OPENAI_API_KEY)

# Multilingual embedding model used to index and query the PDF chunks.
embedding_model = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large")


def _load_whisper_model():
    """Load the local Whisper "large-v3" model, retrying with "large" if that load raises."""
    try:
        return whisper.load_model("large-v3")
    except Exception as load_error:
        logger.warning("Failed to load whisper large-v3 locally; try 'large' or use OpenAI STT API. Error: %s", load_error)
        return whisper.load_model("large")


# Whisper
whisper_model = _load_whisper_model()

  embedding_model = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large")
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
# Vector database
persist_dir = "./chroma_db"

# `chunks` already holds LangChain Documents from `splitter.split_documents`,
# so index their text directly. Re-wrapping them with `Document` cannot work
# here: in this notebook `Document` is python-docx's (imported for DOCX
# loading), which does not accept `page_content`/`metadata`.
chunk_texts = [chunk.page_content for chunk in chunks]
# Keep the loader's metadata (e.g. source file, page) and add a chunk_id so
# retrieved passages can be traced back to their origin.
chunk_metadatas = [{**chunk.metadata, "chunk_id": i} for i, chunk in enumerate(chunks)]
# Deterministic ids: re-running this cell against the persisted store writes
# the same ids again instead of appending duplicate copies of every chunk.
chunk_ids = [f"chunk-{i}" for i in range(len(chunks))]

vectordb = Chroma.from_texts(
    texts=chunk_texts,
    embedding=embedding_model,
    metadatas=chunk_metadatas,
    ids=chunk_ids,
    persist_directory=persist_dir,
)

In [None]:
# ========== Tools: SQL retrievals & Actions ==========
def _call_tool_use_llm(context: str, user_message: str, order_number: Optional[str], tool_options: Dict[str, str]) -> str:
    """
    Uses the LLM to decide which tool function to execute based on user context.

    Args:
        context: recent conversation rendered as "role: content" lines.
        user_message: the user's current message.
        order_number: optional reference shown to the model as "Request ID".
            NOTE(review): the sub-agent nodes pass the detected service here.
        tool_options: mapping of tool name -> short description offered to the model.

    Returns:
        One of the keys of ``tool_options``, or "NO_TOOL_NEEDED" when the model
        declines, replies with a name that is not offered, or the API call fails.
    """
    # Format the tool options for the LLM
    tool_descriptions = "\n".join(f" - {name}: {description}" for name, description in tool_options.items())

    prompt_for_call = (
        "You are a function-calling expert for Muttamm, the Absher Intelligent Agent.\n"
        "Your job is to analyze the user's message and select **one** function that best helps "
        "with a government service task.\n"
        "\n"
        "Guidelines:\n"
        "- Choose ONLY the single most relevant function.\n"
        "- Return ONLY the function name (e.g., check_passport_status)\n"
        "- DO NOT return arguments, explanations, or quotes.\n"
        "- If no available function fits the user request, respond with: NO_TOOL_NEEDED.\n"
        "\n"
        "Available Functions:\n"
        f"{tool_descriptions}\n"
        "\n"
        # This line used to reference an undefined name `request_id`, which
        # raised NameError (before the try below) on every call.
        f"Request ID (if any): {order_number or 'N/A'}\n"
        f"Conversation Context:\n{context}\n\n"
        f"User Message:\n{user_message}\n\n"
        "Return the function name to call:"
    )

    try:
        resp = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role":"user","content": prompt_for_call}],
            temperature=0.0,
            max_tokens=50
        )
        raw_reply = resp.choices[0].message.content or ""
    except Exception as e:
        logger.warning(f"Tool LLM decision failed: {e}")
        return "NO_TOOL_NEEDED"

    # Models sometimes wrap the name in quotes/backticks, add "()" or a period,
    # or append an explanation line: keep only the bare name on the first line.
    stripped_reply = raw_reply.strip()
    first_line = stripped_reply.splitlines()[0] if stripped_reply else ""
    candidate = first_line.strip().strip("`\"'.() ")
    if candidate == "NO_TOOL_NEEDED" or candidate in tool_options:
        return candidate
    logger.warning("Tool LLM returned an unknown tool name %r; treating as NO_TOOL_NEEDED", raw_reply)
    return "NO_TOOL_NEEDED"

In [None]:
# ========== Knowledge Retriever ==========
def knowledge_retriever(query: str, k: int = 10) -> List[str]:
    """Return the text of the ``k`` entries in ``vectordb`` most similar to ``query``.

    Reads the notebook-level ``vectordb``. Returns [] when it is None, when the
    search returns nothing, or when the search raises (the error is logged as a
    warning and not propagated).
    """
    store = vectordb  # read-only access to the module-level store; no `global` needed
    if store is None:
        return []
    try:
        hits = store.similarity_search(query, k=k) or []
        return [hit.page_content for hit in hits]
    except Exception as exc:
        logger.warning("Chroma retrieval error: %s", exc)
        return []

# ========== Vision analysis helper  ==========
def _image_mime_type(image_bytes: bytes) -> str:
    """Best-effort MIME type from the file signature (PNG/GIF/WEBP); defaults to image/jpeg."""
    if image_bytes.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if image_bytes[:6] in (b"GIF87a", b"GIF89a"):
        return "image/gif"
    if image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
        return "image/webp"
    return "image/jpeg"


def gpt4o_image_analyze(image_bytes: bytes, caption: str = "") -> str:
    """Ask GPT-4o to review an Absher-related document image.

    The image is sent inline as a base64 data URI. Its MIME type is sniffed
    from the file signature instead of always claiming JPEG, so PNG uploads
    are labelled correctly. The prompt lines are newline-separated; they used
    to be concatenated with no separator at all ("...request).Your duties:1.
    ..."). A ``None`` caption is sent as empty text rather than "None".

    Args:
        image_bytes: raw image file bytes.
        caption: optional user caption appended to the prompt.

    Returns:
        The model's reply text, stripped ("" if the reply has no content).
    """
    b64 = base64.b64encode(image_bytes).decode("utf-8")
    data_uri = f"data:{_image_mime_type(image_bytes)};base64,{b64}"
    prompt_text = (
        "You are an assistant that analyzes user-submitted images related to Absher services (e.g., ID cards, passports, residency permits, official documents, supporting files, or photos relevant to a service request).\n"
        "Your duties:\n"
        "1. Provide a clear general description of the image or document.\n"
        "2. Identify whether the document appears complete and readable (no missing sections, no major obstructions, no severe blur).\n"
        "3. Extract any visible textual fields relevant to Absher workflows (e.g., ID number, passport number, name, expiry dates, residency number, reference numbers, etc.).\n"
        "4. Detect any potential issues unrelated to technical specifications (e.g., cropped information, illegible text, obstructed fields, mismatched document type).\n"
        "5. Assess whether the document seems appropriate for the intended service.\n"
        "6. Output:\n"
        "  - A concise English summary.\n"
        "  - A structured JSON object with the following keys:\n"
        "    - summary\n"
        "    - issues_found\n"
        "    - issues_details\n"
        "    - extracted_fields\n"
        "    - approval_status (ready / needs_fix / unusable)\n"
        "    - confidence (0–1)\n"
        f"Caption: {caption or ''}"
    )
    resp = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You analyze images and produce structured outputs."},
            {"role": "user", "content": [
                {"type":"text","text": prompt_text},
                {"type":"image_url","image_url": {"url": data_uri}}
            ]}
        ],
        temperature=0.0,
    )
    return (resp.choices[0].message.content or "").strip()

# ========== Speech (Whisper) ==========
def transcribe_audio_bytes(audio_bytes: bytes) -> str:
    """Transcribe an audio clip with the local Whisper model, preferring Arabic.

    The bytes go to a private temporary file because Whisper is given a path.
    The first attempt forces ``language="ar"``. If that raises, the clip is
    retried with Whisper's automatic language detection. The temporary file is
    always removed afterwards.

    Previously a fixed path (``/content/temp_audio.ogg``) was used and never
    deleted. Concurrent requests could overwrite each other's audio, and the
    path only exists on Colab.

    Returns:
        The stripped transcript ("" if Whisper returns no text).
    """
    fd, tmp_path = tempfile.mkstemp(suffix=".ogg")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(audio_bytes)
        try:
            res = whisper_model.transcribe(tmp_path, language="ar")
        except Exception as e:
            logger.warning("Whisper local transcribe failed: %s", e)
            # fallback try without forcing language
            res = whisper_model.transcribe(tmp_path)
        return res.get("text", "").strip()
    finally:
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # best-effort cleanup; never mask the real result/error

# ========== ElevenLabs TTS ==========
async def tts_elevenlabs_arabic(text: str) -> bytes:
    """Synthesize ``text`` with the configured ElevenLabs voice and return the audio bytes.

    POSTs to the text-to-speech endpoint for ``ELEVENLABS_VOICE_ID`` with a
    120 s timeout. A 4xx/5xx response raises ``httpx.HTTPStatusError`` via
    ``raise_for_status()``. Relies on ``httpx`` being importable at module level.
    """
    endpoint = f"https://api.elevenlabs.io/v1/text-to-speech/{ELEVENLABS_VOICE_ID}"
    request_headers = {
        "xi-api-key": ELEVENLABS_API_KEY,
        "Content-Type": "application/json",
    }
    body = {
        "text": text,
        "voice_settings": {"stability": 0.6, "similarity_boost": 0.6},
    }
    async with httpx.AsyncClient() as http:
        response = await http.post(endpoint, headers=request_headers, json=body, timeout=120)
        response.raise_for_status()
        return response.content

In [None]:
# ========== Prompts ==========
# Single-message emotion classifier. emotion_detection_node sends this text
# followed by one user message and expects back one of the five labels below.
# (The old wording asked the model to "wait for how the conversation turns
# out", which a one-message classifier cannot do.)
EMOTION_PROMPT = """You are a short Arabic emotion classifier specialized in Saudi dialect.
Given this user message (Arabic), answer with one word: angry, frustrated, neutral, satisfied, grateful.
Answer angry only if the user is very angry (an anger score of 4/5 or 5/5); for milder irritation answer frustrated.
You only see this one message, so judge it on its own.
Only return the word."""

# Intent classifier prompt, used by intent_classification_node.
# NOTE(review): the four category strings below must stay byte-identical to the
# node's accepted labels (its `possible` list) and to the keys of the router's
# conditional-edge mapping; any spelling drift breaks routing.
INTENT_PROMPT = """You are a classifier for Absher-related government services.
Given the Arabic (Saudi dialect) message, classify it into exactly one of:

[الأحوال المدنية, الجوازات, المرور, شؤون الوافدين]

Choose the category that best matches the user’s request, even if indirect.
Return only the category name.
"""

# Final-answer prompt. response_generator_node fills it with str.format(), so
# the placeholders {context}, {user_message}, {emotion} and {intent} are
# required, and any other literal brace added to this text must be doubled ({{ }}).
RESPONSE_PROMPT_TEMPLATE = """
You are Muttamm, a smart assistant that helps users navigate Saudi government services covering four main categories
(الأحوال المدنية، الجوازات، المرور، شؤون الوافدين).

Guidelines:
- Always respond in clear Saudi dialect.
- Provide direct, practical steps that match the user’s request.
- Use information from `context` naturally without sounding robotic.
- If an image was analyzed, acknowledge what was detected and connect it to the user’s need.
- Avoid unnecessary apologies; be calm and reassuring.
- If the user is angry or frustrated, begin with empathy and then guide them clearly.
- If you asked for missing info earlier, integrate the user's reply smoothly and continue the workflow.
- Escalate to a human officer only when required and after exhausting your available steps.
- Keep responses short (2–4 natural sentences), helpful, and fit the tone of government service support.

Context:
{context}

User message:
{user_message}

Emotion: {emotion}
Detected Category: {intent}

Produce the final answer in Saudi Arabic.
"""

In [None]:
# ---------------- State schema ----------------
# Shared LangGraph state. Every node returns a partial dict of these keys.
# Declared with the functional TypedDict syntax; the field order and types
# are the same as a class-based declaration.
MuttammState = TypedDict(
    "MuttammState",
    {
        "user_id": str,
        "user_first_message": Optional[str],
        "user_current_message": Optional[str],
        "chat_history": List[Dict[str, str]],   # [{"role": ..., "content": ...}, ...]
        "message_type": Optional[str],          # "text" / "voice" / "image"
        "extracted_text": Optional[str],        # text the classifiers and agents work on
        "emotion": Optional[str],
        "intent": Optional[str],
        "service_name": Optional[str],
        "collected_info": Dict[str, Any],
        "pending_task": Optional[str],          # branch key read by task_router
        "sql_result": Optional[Any],            # output of the selected tool
        "retrieved_context": Optional[List[str]],
        "escalate": bool,
        "unsolved_count": int,
        "final_response": Optional[str],
        "voice_bytes": Optional[bytes],
        "image_bytes": Optional[bytes],
        "image_caption": Optional[str],
        "reply_with_voice": bool,
    },
)

In [None]:
# ---------------- LangGraph nodes ----------------
def detect_input_type_node(state: MuttammState) -> MuttammState:
    """Entry node: decide the reply channel and seed ``extracted_text`` for text input.

    Always returns ``reply_with_voice`` (True only when ``message_type`` is
    "voice") so a value left from an earlier turn is overwritten. For text
    messages it also copies ``user_current_message`` into ``extracted_text``.
    """
    message_type = state.get("message_type")
    result = {"reply_with_voice": message_type == "voice"}
    if message_type != "text":
        return result
    result["extracted_text"] = state.get("user_current_message", "")
    return result

def speech_to_text_node(state: MuttammState) -> MuttammState:
    """Transcribe a voice message and hand it on as a normal text turn.

    Acts only when ``message_type`` is "voice" and ``voice_bytes`` is non-empty;
    otherwise returns no updates. When it acts, it sets ``reply_with_voice``,
    stores the transcript in ``extracted_text``, appends it to ``chat_history``
    and switches ``message_type`` to "text".

    The transcript is the user's own words, so it is recorded with role "user".
    It used to be logged as "agent", which made the conversation excerpt in the
    response prompt attribute the user's request to the assistant.
    """
    if state.get("message_type") != "voice" or not state.get("voice_bytes"):
        return {}
    text = transcribe_audio_bytes(state["voice_bytes"])
    history = list(state.get("chat_history") or [])
    return {
        "reply_with_voice": True,
        "extracted_text": text,
        "chat_history": history + [{"role": "user", "content": text}],
        "message_type": "text",
    }

def vision_analyzer_node(state: MuttammState) -> MuttammState:
    """Analyze an uploaded image and continue the turn as text.

    Acts only when ``message_type`` is "image" and ``image_bytes`` is non-empty.
    The analysis becomes ``extracted_text``, is appended to ``chat_history`` as
    an "[image_analysis]" agent entry, and ``message_type`` becomes "text".
    Any exception is logged as a warning; keys already set before it are
    still returned.
    """
    if not (state.get("message_type") == "image" and state.get("image_bytes")):
        return {}
    result = {}
    try:
        analysis = gpt4o_image_analyze(state["image_bytes"], caption=state.get("image_caption", ""))
        result["extracted_text"] = analysis
        result["chat_history"] = state["chat_history"] + [{"role": "agent", "content": f"[image_analysis]{analysis}"}]
        result["message_type"] = "text"
    except Exception as exc:
        logger.warning("vision analyze error: %s", exc)
    return result

def emotion_detection_node(state: MuttammState) -> MuttammState:
    """Classify the emotion of ``extracted_text`` with GPT-4o.

    Returns ``{"emotion": label}`` with label in angry / frustrated / neutral /
    satisfied / grateful, or ``{}`` when there is no text. The reply is
    lower-cased and stripped of surrounding whitespace/punctuation, so "Angry."
    is accepted (it used to be rejected). If the reply is still not a known
    label, or the call fails, a small Arabic keyword list chooses between
    "angry" and "neutral".
    """
    txt = state.get("extracted_text") or ""
    if not txt:
        return {}
    labels = ("angry", "frustrated", "neutral", "satisfied", "grateful")
    try:
        resp = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role":"system","content":"You are a concise classifier."},
                      {"role":"user","content": EMOTION_PROMPT + "\n\nMessage:\n" + txt}],
            temperature=0.0
        )
        out = (resp.choices[0].message.content or "").strip().lower().strip(" .!,;:\"'`")
    except Exception as e:
        logger.warning("Emotion LLM error: %s", e)
        out = ""
    if out not in labels:
        out = "angry" if any(k in txt for k in ["غضب","غاضب","مغتاظ","مستاء"]) else "neutral"
    return {"emotion": out}

def pre_escalation_check_node(state: MuttammState) -> MuttammState:
    """Flag the turn for escalation when the detected emotion is "angry".

    Only ever sets ``escalate`` to True; it never clears an existing flag.
    """
    return {"escalate": True} if state.get("emotion") == "angry" else {}

def _intent_from_keywords(text: str) -> str:
    """Keyword fallback: category of the longest matching keyword.

    Longest match wins, so specific phrases ("هوية مقيم", "تأشيرة عمل",
    "تعديل مهنة عامل") beat the generic words they contain ("هوية", "تأشيرة",
    "تعديل مهنة"). Ties go to the earlier category. With no match the default
    is "الأحوال المدنية".
    """
    keyword_table = (
        # الأحوال المدنية – Civil Affairs
        ("الأحوال المدنية", (
            "هوية", "بطاقة", "سجل", "أسرة", "بدل فاقد", "بدل تالف",
            "مولود", "وفاة", "تعديل مهنة", "تحديث", "الأحوال",
        )),
        # الجوازات – Passports
        ("الجوازات", (
            "جواز", "جوازات", "تجديد جواز", "خروج وعودة", "تمديد",
            "هوية مقيم", "الإقامة", "نقل خدمات", "تأشيرة",
        )),
        # المرور – Traffic
        ("المرور", (
            "رخصة", "قيادة", "استمارة", "تفويض", "حادث", "مخالفة",
            "المرور", "نقل ملكية", "تجديد استمارة",
        )),
        # شؤون الوافدين – Expatriate Affairs
        ("شؤون الوافدين", (
            "وافد", "كفيل", "نقل كفالة", "بلاغ هروب", "تأشيرة عمل",
            "تعديل مهنة عامل", "إلغاء هروب",
        )),
    )
    t = text.lower()
    best_category, best_len = "الأحوال المدنية", 0
    for category, keywords in keyword_table:
        for kw in keywords:
            if kw in t and len(kw) > best_len:
                best_category, best_len = category, len(kw)
    return best_category


def intent_classification_node(state: MuttammState) -> MuttammState:
    """Classify ``extracted_text`` into one of the four Absher service categories.

    Returns ``{"intent": category}``, or ``{}`` when there is no text. The LLM
    answer is used when it is a known category, or when it names exactly one
    category inside extra text (e.g. "التصنيف: المرور."). Otherwise the
    keyword fallback decides.

    Fix: the update used to be returned only from inside the fallback branch,
    so a valid LLM answer produced no ``intent`` at all.
    """
    txt = state.get("extracted_text") or ""
    if not txt:
        return {}
    try:
        resp = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role":"system","content":"You are a short intent classifier."},
                      {"role":"user","content": INTENT_PROMPT + "\n\nMessage:\n" + txt}],
            temperature=0.0
        )
        intent = (resp.choices[0].message.content or "").strip()
    except Exception as e:
        logger.warning("Intent LLM error: %s", e)
        intent = ""
    possible = ["الأحوال المدنية", "الجوازات", "المرور", "شؤون الوافدين"]

    if intent not in possible:
        mentioned = [p for p in possible if p in intent]
        intent = mentioned[0] if len(mentioned) == 1 else _intent_from_keywords(txt)

    return {"intent": intent}

def router_node(state: MuttammState) -> MuttammState:
    """Choose the next branch and store it in ``pending_task`` (read by task_router).

    "escalate" when the escalate flag is set, otherwise the classified intent,
    otherwise "Other". Using ``state.get("intent") or "Other"`` also covers an
    intent key that is present but None/empty (e.g. when the caller seeds every
    state key with None). ``state.get("intent", "Other")`` passed that None
    straight to the router.
    """
    if state.get("escalate"):
        return {"pending_task": "escalate"}
    return {"pending_task": state.get("intent") or "Other"}

# Sub-agent nodes
def ahwal_subagent_node(state: MuttammState) -> MuttammState:
    """Civil-affairs (الأحوال المدنية) sub-agent.

    Detects the specific service from ``extracted_text``. If
    ``required_info_missing`` reports missing info, it appends a request for it
    to ``chat_history`` and sets ``pending_task`` to "await_missing_info".
    Otherwise the LLM picks one tool, the tool runs, and its output (or an
    ``{"error": ...}`` dict if it raised) is stored in ``sql_result``, with a
    "checking" message appended to ``chat_history``. Unknown or
    "NO_TOOL_NEEDED" choices fall back to the requirements lookup.

    NOTE(review): detect_specific_service, required_info_missing and the tool
    functions are not defined in the visible cells.
    """
    user_text = state.get("extracted_text", "")
    service = detect_specific_service(user_text)

    missing = required_info_missing("ahwal", service, state["collected_info"])
    if missing:
        ask = f"علشان أقدر أساعدك بدقة، احتاج منك: {missing}"
        return {
            "chat_history": state["chat_history"] + [{"role": "agent", "content": ask}],
            "pending_task": "await_missing_info",
        }

    available_tools = {
        "get_id_expiry": "Retrieves ID expiration date.",
        "get_id_status": "Retrieves the national ID validity/status.",
        "get_ahwal_requirements": "Retrieves official requirements for this service.",
    }
    recent = "\n".join(f"{turn['role']}: {turn['content']}" for turn in state["chat_history"][-4:])
    chosen = _call_tool_use_llm(recent, user_text, service, available_tools)

    # Lambdas defer every lookup/call until inside the try, so tool errors are captured.
    runners = {
        "get_id_expiry": lambda: get_id_expiry(state["user_id"]),
        "get_id_status": lambda: get_id_status(state["user_id"]),
    }
    try:
        data = runners.get(chosen, lambda: get_ahwal_requirements(service))()
    except Exception as exc:
        data = {"error": f"Error executing tool: {str(exc)}"}

    return {
        "sql_result": data,
        "chat_history": state["chat_history"] + [{"role": "agent", "content": "أتحقق لك الآن من البيانات"}],
    }


def jawazat_subagent_node(state: MuttammState) -> MuttammState:
    """Passports (الجوازات) sub-agent.

    Same flow as the other sub-agents. It asks for missing info (setting
    ``pending_task`` to "await_missing_info"), or lets the LLM pick a passport
    tool and stores its output (or an ``{"error": ...}`` dict) in
    ``sql_result``. Unknown or "NO_TOOL_NEEDED" choices fall back to
    ``get_jawazat_requirements``.

    NOTE(review): detect_specific_service, required_info_missing and the tool
    functions are not defined in the visible cells.
    """
    user_text = state.get("extracted_text", "")
    service = detect_specific_service(user_text)

    missing = required_info_missing("jawazat", service, state["collected_info"])
    if missing:
        ask = f"ممتاز، بس قبل أكمل احتاج منك: {missing}"
        return {
            "chat_history": state["chat_history"] + [{"role": "agent", "content": ask}],
            "pending_task": "await_missing_info",
        }

    available_tools = {
        "get_passport_expiry": "Returns passport expiration date.",
        "get_passport_status": "Returns passport validity & travel eligibility.",
        "get_jawazat_requirements": "Steps & requirements for the selected service.",
    }
    recent = "\n".join(f"{turn['role']}: {turn['content']}" for turn in state["chat_history"][-4:])
    chosen = _call_tool_use_llm(recent, user_text, service, available_tools)

    # Lambdas defer every lookup/call until inside the try, so tool errors are captured.
    runners = {
        "get_passport_expiry": lambda: get_passport_expiry(state["user_id"]),
        "get_passport_status": lambda: get_passport_status(state["user_id"]),
    }
    try:
        data = runners.get(chosen, lambda: get_jawazat_requirements(service))()
    except Exception as exc:
        data = {"error": f"Error executing tool: {str(exc)}"}

    return {
        "sql_result": data,
        "chat_history": state["chat_history"] + [{"role": "agent", "content": "أشيّك على بيانات الجواز الحين…"}],
    }


def murur_subagent_node(state: MuttammState) -> MuttammState:
    """Traffic (المرور) sub-agent.

    It asks for missing info (setting ``pending_task`` to "await_missing_info"),
    or lets the LLM pick a traffic tool and stores its output (or an
    ``{"error": ...}`` dict) in ``sql_result``. Unknown or "NO_TOOL_NEEDED"
    choices fall back to ``get_violations``.

    NOTE(review): detect_specific_service, required_info_missing and the tool
    functions are not defined in the visible cells.
    """
    user_text = state.get("extracted_text", "")
    service = detect_specific_service(user_text)

    missing = required_info_missing("murur", service, state["collected_info"])
    if missing:
        ask = f"بس نحتاج منك هذي المعلومة أول: {missing}"
        return {
            "chat_history": state["chat_history"] + [{"role": "agent", "content": ask}],
            "pending_task": "await_missing_info",
        }

    available_tools = {
        "get_license_status": "Returns driver's license validity and expiry.",
        "get_vehicle_info": "Returns vehicle ownership and expiration info.",
        "get_violations": "Lists traffic violations with details.",
    }
    recent = "\n".join(f"{turn['role']}: {turn['content']}" for turn in state["chat_history"][-4:])
    chosen = _call_tool_use_llm(recent, user_text, service, available_tools)

    # Lambdas defer every lookup/call until inside the try, so tool errors are captured.
    runners = {
        "get_license_status": lambda: get_license_status(state["user_id"]),
        "get_vehicle_info": lambda: get_vehicle_info(state["user_id"]),
    }
    try:
        data = runners.get(chosen, lambda: get_violations(state["user_id"]))()
    except Exception as exc:
        data = {"error": f"Error executing tool: {str(exc)}"}

    return {
        "sql_result": data,
        "chat_history": state["chat_history"] + [{"role": "agent", "content": "ثواني أشيّك لك على بيانات المرور…"}],
    }


def wafeedin_subagent_node(state: MuttammState) -> MuttammState:
    """Expatriate-affairs (شؤون الوافدين) sub-agent.

    It asks for missing info (setting ``pending_task`` to "await_missing_info"),
    or lets the LLM pick an iqama/dependents/exit-reentry tool and stores its
    output (or an ``{"error": ...}`` dict) in ``sql_result``. Unknown or
    "NO_TOOL_NEEDED" choices fall back to ``get_exit_reentry_status``.

    Fix: the ask-for-info message had its parts scrambled (an RTL editing
    artifact), with the missing-info text placed first. It now follows the
    same "…احتاج منك: <missing>" pattern as the other sub-agents.

    NOTE(review): detect_specific_service, required_info_missing and the tool
    functions are not defined in the visible cells.
    """
    updates = {}
    text = state.get("extracted_text", "")
    service = detect_specific_service(text)

    missing = required_info_missing("wafeedin", service, state["collected_info"])
    if missing:
        updates["chat_history"] = state["chat_history"] + [{
            "role": "agent",
            "content": f"تمام، عشان أكمل لك احتاج منك: {missing}"
        }]
        updates["pending_task"] = "await_missing_info"
        return updates

    wafeedin_tools = {
        "get_iqama_status": "Returns iqama expiry and validity.",
        "get_dependents": "Returns dependents linked to the user.",
        "get_exit_reentry_status": "Checks exit/re-entry visa validity."
    }

    context = "\n".join([f"{m['role']}: {m['content']}"
                         for m in state["chat_history"][-4:]])

    tool_name = _call_tool_use_llm(context, text, service, wafeedin_tools)

    try:
        if tool_name == "get_iqama_status":
            data = get_iqama_status(state["user_id"])
        elif tool_name == "get_dependents":
            data = get_dependents(state["user_id"])
        else:
            data = get_exit_reentry_status(state["user_id"])
    except Exception as e:
        data = {"error": f"Error executing tool: {str(e)}"}

    updates["sql_result"] = data
    updates["chat_history"] = state["chat_history"] + [{
        "role": "agent",
        "content": "أشيّك لك على بيانات الإقامة…"
    }]
    return updates


def other_subagent_node(state: MuttammState) -> MuttammState:
    """Fallback sub-agent: answer from the vector knowledge base.

    Looks up to 10 passages for ``extracted_text`` via ``knowledge_retriever``.
    If any are found they go into ``retrieved_context``. Otherwise a
    clarification question is appended to ``chat_history`` and
    ``pending_task`` becomes "await_clarification".
    """
    passages = knowledge_retriever(state.get("extracted_text", ""), k=10)
    if passages:
        return {"retrieved_context": passages}
    clarify = {"role": "agent", "content": "تقدر توضّح لي سؤالك أكثر؟"}
    return {
        "chat_history": state["chat_history"] + [clarify],
        "pending_task": "await_clarification",
    }

# Escalate to a human once this many turns have ended without a usable result.
UNSOLVED_ESCALATION_THRESHOLD = 30


def post_handler_node(state: MuttammState) -> MuttammState:
    """Count unresolved turns and request escalation at the threshold.

    A turn counts as resolved when ``sql_result`` is a non-empty dict/list that
    is not a tool-error payload (``{"error": ...}``, as the sub-agents emit on
    failure), or when ``retrieved_context`` is non-empty (the knowledge-base
    path). Otherwise ``unsolved_count`` is incremented. ``escalate`` is set
    when the *updated* count reaches UNSOLVED_ESCALATION_THRESHOLD; the old
    code compared the pre-increment value, escalating one turn late. Does
    nothing if escalation is already flagged.
    """
    updates = {}
    if state.get("escalate"):
        return updates
    sql = state.get("sql_result")
    tool_error = isinstance(sql, dict) and "error" in sql
    resolved = (
        (isinstance(sql, (dict, list)) and len(sql) > 0 and not tool_error)
        or bool(state.get("retrieved_context"))
    )
    count = state.get("unsolved_count") or 0
    if not resolved:
        count += 1
        updates["unsolved_count"] = count
    if count >= UNSOLVED_ESCALATION_THRESHOLD:
        updates["escalate"] = True
    return updates

def response_generator_node(state: MuttammState) -> MuttammState:
    """Produce this turn's reply and append it to ``chat_history``.

    When ``escalate`` is set, returns a fixed hand-off message. Otherwise it
    builds a context from ``sql_result`` (as JSON), ``retrieved_context`` and
    the last six history entries, fills RESPONSE_PROMPT_TEMPLATE, and asks
    GPT-4o. If the call fails, a fallback message is returned and ``escalate``
    is set.

    ``json.dumps(..., default=str)`` stops tool results holding non-JSON values
    (dates, Decimals, ...) from crashing the node; the dump runs outside the
    try block. Missing/None emotion or intent is rendered as "" rather than
    "None".
    """
    history = state.get("chat_history") or []
    if state.get("escalate"):
        handoff = "حسنًا، سأحوّل محادثتك الآن إلى موظف خدمة العملاء لمتابعة المشكلة. يرجى الانتظار لحظة."
        return {"final_response": handoff, "chat_history": history + [{"role": "agent", "content": handoff}]}

    context_parts = []
    if state.get("sql_result"):
        context_parts.append("SQL results: " + json.dumps(state["sql_result"], ensure_ascii=False, default=str))
    if state.get("retrieved_context"):
        context_parts.append("Knowledge: " + " || ".join(state["retrieved_context"]))
    recent = "\n".join(f"{m['role']}: {m['content']}" for m in history[-6:])
    context = "\n".join(context_parts + [recent])
    prompt = RESPONSE_PROMPT_TEMPLATE.format(
        context=context,
        user_message=state.get("extracted_text") or "",
        emotion=state.get("emotion") or "",
        intent=state.get("intent") or "",
    )

    updates = {}
    try:
        resp = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role":"user","content": prompt}],
            temperature=0.3,
            max_tokens=500
        )
        out = (resp.choices[0].message.content or "").strip()
    except Exception as e:
        logger.warning("Response generation failed: %s", e)
        out = "عذرًا، حدث خطأ أثناء تجهيز الرد. سأحوّل المحادثة لموظف خدمة العملاء."
        updates["escalate"] = True
    updates["final_response"] = out
    updates["chat_history"] = history + [{"role": "agent", "content": out}]
    return updates

def task_router(state):
    """Conditional-edge selector: the branch key stored in ``pending_task`` (None if absent)."""
    return state.get("pending_task")

In [None]:
# ---------------- Build LangGraph with correct conditional routing ----------------
workflow = StateGraph(MuttammState)

# (node name, callable) pairs in registration order:
#   perception & understanding -> main router -> sub-agents -> post handler & response
_GRAPH_NODES = (
    ("detect_input_type", detect_input_type_node),
    ("speech_to_text", speech_to_text_node),
    ("vision_analyzer", vision_analyzer_node),
    ("emotion_detection", emotion_detection_node),
    ("pre_escalation_check", pre_escalation_check_node),
    ("intent_classification", intent_classification_node),
    ("router", router_node),
    ("ahwal_agent", ahwal_subagent_node),
    ("jawazat_agent", jawazat_subagent_node),
    ("murur_agent", murur_subagent_node),
    ("wafeedin_agent", wafeedin_subagent_node),
    ("other_agent", other_subagent_node),
    ("post_handler", post_handler_node),
    ("response_generator", response_generator_node),
)
for _node_name, _node_fn in _GRAPH_NODES:
    workflow.add_node(_node_name, _node_fn)


# ---------------- Detect input type router ----------------
def detect_input_router(state: MuttammState):
    """Branch key after detect_input_type: "voice" or "image" as given, else "text"."""
    message_type = state.get("message_type")
    return message_type if message_type in ("voice", "image") else "text"


workflow.add_conditional_edges(
    "detect_input_type",
    detect_input_router,
    {
        "voice": "speech_to_text",
        "image": "vision_analyzer",
        "text": "emotion_detection",
    }
)

workflow.add_edge("speech_to_text", "emotion_detection")
workflow.add_edge("vision_analyzer", "emotion_detection")
workflow.add_edge("emotion_detection", "pre_escalation_check")
workflow.add_edge("pre_escalation_check", "intent_classification")
workflow.add_edge("intent_classification", "router")


# ---------------- Main router for Muttamm ----------------
# The keys must be exactly the values router_node can put in pending_task: the
# four category labels emitted by intent_classification_node / INTENT_PROMPT,
# plus "Other" and "escalate". They previously used shortened spellings
# ("أحوال مدنية", "جوازات", "مرور", "شؤون وافدين") that the classifier never
# produces, so no classified turn could reach its sub-agent.
workflow.add_conditional_edges(
    "router",
    task_router,
    {
        "الأحوال المدنية": "ahwal_agent",
        "الجوازات": "jawazat_agent",
        "المرور": "murur_agent",
        "شؤون الوافدين": "wafeedin_agent",
        "Other": "other_agent",
        "escalate": "response_generator",
    }
)


# ---------------- Sub-agent → Post handler ----------------
workflow.add_edge("ahwal_agent", "post_handler")
workflow.add_edge("jawazat_agent", "post_handler")
workflow.add_edge("murur_agent", "post_handler")
workflow.add_edge("wafeedin_agent", "post_handler")
workflow.add_edge("other_agent", "post_handler")


# ---------------- Post handler → Response generator ----------------
workflow.add_edge("post_handler", "response_generator")
workflow.add_edge("response_generator", END)


# ---------------- Entry point ----------------
workflow.set_entry_point("detect_input_type")
agent_app = workflow.compile()
