In [8]:
import copy
import json
import operator
from typing import Annotated, Any, Dict, List, Optional, Union, TypedDict

from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, BaseMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field

# Single module-level chat model (gpt-4o, temperature 0) shared by every node
# that calls the LLM: question generation and structured answer extraction.
llm = ChatOpenAI(model="gpt-4o", temperature=0)

In [9]:
def _collect_missing_fields(node: Dict, path: List[str]) -> List[Dict[str, Any]]:
    """
    Walks `node` depth-first and lists every required field that is still empty.

    A dict holding both "valeur" and "requis" is treated as a field and is not
    descended into; any other dict is treated as a sub-section and scanned
    recursively. Non-dict values are ignored.

    Args:
        node: The (sub-)section to scan.
        path: Keys leading from the contract root down to `node`.

    Returns:
        One descriptor per missing field, in document order, each with the
        keys "key", "path", "type" and "options".
    """
    missing: List[Dict[str, Any]] = []
    for key, value in node.items():
        if not isinstance(value, dict):
            continue

        field_path = path + [key]
        if "valeur" in value and "requis" in value:
            # Only None, "" and [] count as empty, so 0 and False are valid answers.
            if value["requis"] and value["valeur"] in (None, "", []):
                missing.append({
                    "key": key,
                    "path": field_path,
                    "type": value.get("type"),
                    "options": value.get("options"),
                })
        else:
            missing.extend(_collect_missing_fields(value, field_path))
    return missing


def get_category_status(data: Dict) -> Optional[Dict[str, Any]]:
    """
    Scans top-level keys and returns the first category that has missing required fields.

    Top-level entries that are not dicts are skipped.

    Returns:
        {
            "category_key": str,
            "missing_fields": List[Dict]  # each has key, path, type, options
        }
        or None if no category has a missing required field.
    """
    for category_key, content in data.items():
        if not isinstance(content, dict):
            continue

        missing_items = _collect_missing_fields(content, [category_key])
        if missing_items:
            return {"category_key": category_key, "missing_fields": missing_items}

    return None

def update_json_value(data: Dict, path: List[str], new_value: Any):
    """
    Writes `new_value` into the "valeur" slot of the field addressed by `path`.

    `path` is the list of keys from the root of `data` down to the field;
    `data` is modified in place and nothing is returned.
    """
    parent_keys, field_key = path[:-1], path[-1]

    # Descend through the enclosing sections to the dict that owns the field.
    section = data
    for step in parent_keys:
        section = section[step]

    field = section[field_key]
    field["valeur"] = new_value

In [10]:
class AgentState(TypedDict):
    """State dictionary read and updated by the graph nodes in this notebook."""
    # Conversation history. Annotated with operator.add; presumably the graph
    # library uses it as the reducer, so lists returned by nodes are
    # concatenated onto the existing history rather than replacing it.
    messages: Annotated[List[BaseMessage], operator.add]
    # The contract being filled; its fields are updated via update_json_value.
    contract_data: Dict
    current_category_info: Optional[Dict] # Info about the active category (result of get_category_status)
    validation_error: Optional[str]       # Message if previous input was incoherent
    # Set by analyze_progress: True once no category has missing required fields.
    completed: bool

# --- SCHEMA FOR EXTRACTION ---
# This allows the LLM to return multiple updates + validation flags
class FieldExtraction(BaseModel):
    # One value extracted from the user's reply, plus the model's plausibility verdict.
    field_key: str = Field(description="The exact JSON key of the field being filled")
    value: Union[str, int, float, bool] = Field(description="The extracted value")
    is_coherent: bool = Field(description="True if the value makes logical sense. False if it seems wrong (e.g. negative rent).")
    # Explicit default: in Pydantic v2 an Optional annotation without a default
    # is still a required field, so a coherent extraction that omits the reason
    # would fail validation.
    correction_reason: Optional[str] = Field(default=None, description="If not coherent, explain why (e.g. 'Rent cannot be negative')")

class MultiFieldExtraction(BaseModel):
    # Container for every field extracted from a single user reply.
    extractions: List[FieldExtraction]
    # Explicit default: in Pydantic v2 an Optional annotation without a default
    # is still a required field; the comment is genuinely optional here.
    general_comment: Optional[str] = None

In [11]:
# --- NODE 1: SELECT NEXT GOAL ---
def analyze_progress(state: AgentState):
    """
    Decides what the graph should work on next.

    Looks up the first category that still has missing required fields. If
    there is one, it becomes the active category; otherwise the contract is
    flagged as completed.
    """
    pending = get_category_status(state["contract_data"])

    # validation_error is intentionally not written here, so whatever the
    # previous node stored for it stays in the state.
    if pending:
        return {"completed": False, "current_category_info": pending}

    return {"completed": True, "current_category_info": None}

# --- NODE 2: GENERATE GROUPED QUESTION ---
def generate_grouped_question(state: AgentState):
    """
    Asks the LLM to phrase one message requesting the active category's missing fields.

    Reads `current_category_info` (category key and missing-field descriptors)
    and `validation_error` from the state. When a validation error is pending,
    the prompt asks for a correction instead of a fresh grouped question. In
    both cases the model is told to answer in French.

    Returns:
        A state update appending the LLM's message to `messages`.
    """
    cat_info = state["current_category_info"]
    category_name = cat_info["category_key"]
    error_msg = state.get("validation_error")

    # One bullet per missing field. Allowed options are shown when the field
    # declares any, so the model can offer them to the user.
    field_lines = []
    for m in cat_info["missing_fields"]:
        line = f"- {m['key']} (Type: {m['type']}"
        if m.get("options"):
            line += f", Options: {m['options']}"
        field_lines.append(line + ")")
    needed_str = "\n".join(field_lines)

    system_prompt = f"""
    You are a real estate legal assistant. 
    Category currently being filled: '{category_name}'.
    
    MISSING FIELDS:
    {needed_str}
    """

    if error_msg:
        # If there was a mistake previously, prioritize asking for correction.
        # The reply language is stated here too, matching the normal branch.
        system_prompt += f"\n\n⚠️ CONTEXT - PREVIOUS ERROR: {error_msg}\nAsk the user, in a single polite message in French, to clarify or correct the value."
    else:
        system_prompt += "\n\nAsk the user for these details in a single, natural, polite message in French. Group the questions logically."

    msg = llm.invoke([SystemMessage(content=system_prompt)])
    return {"messages": [msg]}

# --- NODE 3: PROCESS & VALIDATE INPUT ---
def process_grouped_answer(state: AgentState):
    """
    Extracts field values from the latest message and writes the coherent ones into the contract.

    The LLM is asked, via structured output, to extract values for the active
    category's missing keys and to flag implausible ones. Coherent values whose
    key matches a missing field are stored with update_json_value (in place);
    extracted keys that match no missing field are ignored. Every incoherent
    value contributes one line to `validation_error`.

    Returns:
        An empty update when there is no active category; otherwise a state
        update with `contract_data` and `validation_error` (None when every
        extracted value was coherent).
    """
    last_msg = state["messages"][-1]
    cat_info = state["current_category_info"]

    if not cat_info:
        return {}

    # Provide context of what we are looking for to the Extractor
    missing_fields = cat_info["missing_fields"]
    missing_keys = [m["key"] for m in missing_fields]

    extraction_prompt = f"""
    The user replied: "{last_msg.content}"
    
    We are looking for values for these keys: {missing_keys} within the category '{cat_info['category_key']}'.
    
    1. Extract values for any keys mentioned.
    2. VALIDATE: Check if the value makes sense (e.g., surface area > 0, valid email format).
    3. If a value is incoherent, set is_coherent=False and explain why in correction_reason.
    """

    # Use the MultiFieldExtraction schema with function calling
    structured_llm = llm.with_structured_output(MultiFieldExtraction, method="function_calling")
    result = structured_llm.invoke(extraction_prompt)

    current_data = state["contract_data"]
    # Collected rather than overwritten, so that several bad values in one
    # reply are all reported back to the user.
    validation_errors = []

    if result and result.extractions:
        for item in result.extractions:
            if item.is_coherent:
                # The first missing field with this key supplies the path to update.
                match = next((m for m in missing_fields if m["key"] == item.field_key), None)
                if match:
                    update_json_value(current_data, match["path"], item.value)
            else:
                validation_errors.append(
                    f"User provided value '{item.value}' for '{item.field_key}' but it was flagged: {item.correction_reason}"
                )

    # If the LLM extracted nothing, the state is returned unchanged and the
    # graph simply asks again for the same category.
    return {
        "contract_data": current_data,
        "validation_error": "\n".join(validation_errors) or None
    }

def save_contract(state: AgentState):
    """
    Writes the completed contract to disk and returns a confirmation message.

    The contract is dumped as indented UTF-8 JSON (non-ASCII characters kept
    as-is) to "contrat_finalise.json" in the current working directory.

    Returns:
        A state update appending the French confirmation message to `messages`.
    """
    with open("contrat_finalise.json", "w", encoding="utf-8") as f:
        json.dump(state["contract_data"], f, ensure_ascii=False, indent=2)
    # The message previously contained mis-encoded characters; it is restored here.
    return {"messages": [AIMessage(content="✅ Formidable ! Toutes les sections sont complètes. Le contrat est sauvegardé.")]}

In [12]:
# Build the graph over AgentState and register each node function under the
# string name that the routing functions and edge declarations refer to.
workflow = StateGraph(AgentState)

workflow.add_node("analyze_progress", analyze_progress)
workflow.add_node("generate_grouped_question", generate_grouped_question)
workflow.add_node("process_grouped_answer", process_grouped_answer)
workflow.add_node("save_contract", save_contract)

# --- EDGES ---

def route_start(state: AgentState):
    """
    Chooses the entry node for an invocation.

    A run whose most recent message comes from the user starts by processing
    that answer; any other run (including an empty history) starts with the
    progress analysis.
    """
    history = state["messages"]
    user_spoke_last = bool(history) and isinstance(history[-1], HumanMessage)
    return "process_grouped_answer" if user_spoke_last else "analyze_progress"

def route_after_analysis(state: AgentState):
    """Routes to saving once the contract is completed, otherwise to asking the next question."""
    return "save_contract" if state["completed"] else "generate_grouped_question"

# Entry point: route_start picks the first node; the mapping translates its
# return value into a registered node name.
workflow.set_conditional_entry_point(
    route_start,
    {
        "process_grouped_answer": "process_grouped_answer", 
        "analyze_progress": "analyze_progress"
    }
)

# After an answer is processed, progress is always re-analysed.
workflow.add_edge("process_grouped_answer", "analyze_progress")
# After analysis, route_after_analysis chooses between saving and asking.
workflow.add_conditional_edges(
    "analyze_progress",
    route_after_analysis,
    {
        "save_contract": "save_contract", 
        "generate_grouped_question": "generate_grouped_question"
    }
)
# Both asking a question and saving end the current invocation; the caller
# (run_smart_session) collects the user's reply and invokes the graph again.
workflow.add_edge("generate_grouped_question", END)
workflow.add_edge("save_contract", END)

# Compiled graph object invoked by run_smart_session.
app = workflow.compile()

In [13]:
# Mock contract template used by the demo session.
# Top-level keys are the categories that get_category_status scans in order.
# Each field is a dict with "valeur" (current value, None while unfilled),
# "requis" (whether it must be filled) and "type" (shown to the LLM as a hint).
initial_contract_template = {
    "proprietaire": {
        "nom": {"valeur": None, "requis": True, "type": "texte"},
        "prenom": {"valeur": None, "requis": True, "type": "texte"},
        "ville": {"valeur": None, "requis": True, "type": "texte"}
    },
    "logement": {
        "adresse_bien": {"valeur": None, "requis": True, "type": "texte"},
        "surface_m2": {"valeur": None, "requis": True, "type": "nombre"},
        "loyer_mensuel": {"valeur": None, "requis": True, "type": "nombre"}
    }
}

def run_smart_session():
    """
    Runs an interactive console session that fills the contract template.

    Each loop iteration invokes the graph once, prints the assistant's latest
    message, and stops if the contract is completed. Otherwise it reads the
    user's reply from stdin, appends it to the history and loops. Typing
    "quit" or "q" (any case, surrounding whitespace ignored) ends the session
    early.
    """
    print("--- 🏢 Assistant Intelligent (Mode Groupé & Validation) ---")

    current_state = {
        "messages": [],
        # Deep copy: contract fields are updated in place (see
        # update_json_value), so sharing the module-level template would leave
        # it pre-filled for any later session.
        "contract_data": copy.deepcopy(initial_contract_template),
        "current_category_info": None,
        "validation_error": None,
        "completed": False
    }

    while True:
        # The returned state becomes the input of the next invocation.
        current_state = app.invoke(current_state)

        last_message = current_state["messages"][-1]
        print(f"\n🤖 Assistant: {last_message.content}")

        if current_state.get("completed", False):
            break

        user_input = input("\n👤 Vous: ")
        if user_input.strip().lower() in ("quit", "q"):
            break

        current_state["messages"].append(HumanMessage(content=user_input))

# Start the interactive session. This blocks on input() until the contract is
# completed or the user types "quit" / "q".
run_smart_session()

--- 🏢 Assistant Intelligent (Mode Groupé & Validation) ---

🤖 Assistant: Pour compléter le dossier du propriétaire, pourriez-vous s'il vous plaît me fournir les informations suivantes : 

- Votre nom
- Votre prénom
- La ville où vous résidez

Merci beaucoup pour votre coopération !

🤖 Assistant: Pour compléter le dossier concernant le logement, pourriez-vous s'il vous plaît me fournir quelques informations supplémentaires ? J'aurais besoin de connaître l'adresse complète du bien. De plus, pourriez-vous m'indiquer la surface en mètres carrés ainsi que le montant du loyer mensuel ? Merci beaucoup pour votre coopération.

🤖 Assistant: Pour compléter le dossier concernant le logement, pourriez-vous s'il vous plaît me fournir l'adresse complète du bien immobilier ? Cela inclut le numéro, la rue, le code postal et la ville. Merci d'avance pour votre coopération.

🤖 Assistant: ✅ Formidable ! Toutes les sections sont complètes. Le contrat est sauvegardé.