In [1]:
import time
import os
from functools import lru_cache, partial
import json
import json5
import random
import re
import asyncio
from datetime import datetime
from typing import List, Optional, Dict, Any, Tuple
from pydantic import BaseModel, Field
from typing import TypedDict
from IPython.display import Image
from operator import itemgetter
from sqlglot import exp, parse_one
import sqlite3

from langchain_core.messages.tool import ToolCall
from langchain_core.messages import (
    AnyMessage,
    SystemMessage,
    HumanMessage,
    AIMessage,
)

from tqdm.asyncio import tqdm_asyncio

from langchain_openai import ChatOpenAI
from langchain_core.language_models import BaseChatModel
from langchain_community.embeddings import InfinityEmbeddings
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain.tools import tool
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, Runnable
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.graph.state import CompiledStateGraph

from dotenv import load_dotenv

load_dotenv()

None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


True

# Setup components

In [2]:
LLM_BASE_URL=os.getenv("LLM_BASE_URL")
LLM_MODEL=os.getenv("LLM_MODEL")
LLM_API_KEY=os.getenv("LLM_API_KEY")

EMBED_BASE_URL=os.getenv("EMBED_BASE_URL")
EMBED_MODEL=os.getenv("EMBED_MODEL")


@lru_cache()
def get_llm_model():
    return ChatOpenAI(
        model=LLM_MODEL,
        base_url=LLM_BASE_URL,
        api_key=LLM_API_KEY,
        temperature=0.7,
        top_p=0.8,
        presence_penalty=1,
        extra_body = {
            'chat_template_kwargs': {'enable_thinking': False},
            "top_k": 20,
            "mip_p": 0,
        },
    )

@lru_cache()
def get_thinking_llm_model():
    return ChatOpenAI(
        model=LLM_MODEL,
        base_url=LLM_BASE_URL,
        api_key=LLM_API_KEY,
        temperature=0.6,
        top_p=0.95,
        presence_penalty=1,
        extra_body = {
            'chat_template_kwargs': {'enable_thinking': True},
            "top_k": 20,
            "mip_p": 0,
        },
    )

@lru_cache()
def get_embedding_model():
    return InfinityEmbeddings(
        model=EMBED_MODEL,
        infinity_api_url=EMBED_BASE_URL,
    )


@lru_cache()
def get_vector_store():
    client = QdrantClient(
        url="http://localhost",
        grpc_port=6334,
        prefer_grpc=True,
    )
    embedding_model = get_embedding_model()
    client.create_collection(
        collection_name="demo",
        vectors_config=VectorParams(
            size=len(embedding_model.embed_query("Hello")), 
            distance=Distance.COSINE
        ),
    )
    return QdrantVectorStore(
        client=client,
        collection_name="demo",
        embedding=embedding_model,
    )


# @lru_cache()
# def get_sqlite_db():
#     return SQLDatabase.from_uri("sqlite:////Users/vinhnguyen/Projects/ext-chatbot/resources/database/batdongsan.db")


# Process data

## Excel

In [108]:
# import glob
# from src.tools.table import create_sqlite, create_faiss

In [109]:
# tables = []
# for filepath in glob.glob("/Users/vinhnguyen/Projects/ext-chatbot/resources/processed_data/batdongsan_1/*.json"):
#     table_name = ".".join(filepath.split("/")[-1].split(".")[:-1])
#     with open(filepath, "r") as f:
#         table = json.load(f)
#         table["pydantic_schema"]["title"] = table_name
#         if len(table["transformed_data"]) > 100:
#             tables.append(table)

# print([table["pydantic_schema"]["title"] for table in tables])
# print(2)

In [110]:
# for table in tables:
#     create_sqlite(
#         schema=table["pydantic_schema"],
#         column_groups=table["column_groups"],
#         data=table["transformed_data"],
#         db_path="/Users/vinhnguyen/Projects/ext-chatbot/resources/database/batdongsan.db",
#     )


# for table in tables:
#     create_faiss(
#         schema=table["pydantic_schema"],
#         db_path="/Users/vinhnguyen/Projects/ext-chatbot/resources/database/batdongsan.db",
#         faiss_dir="/Users/vinhnguyen/Projects/ext-chatbot/resources/faiss/",
#     )


In [111]:
# Request more files from PO to test

# Utils

In [112]:
def extract_fn(text: str) -> tuple[str, str]:
    """Extract function name and arguments from tool call text."""
    fn_name, fn_args = '', ''
    fn_name_s = '"name": "'
    fn_name_e = '", "'
    fn_args_s = '"arguments": '
    
    i = text.find(fn_name_s)
    k = text.find(fn_args_s)
    
    if i > 0:
        _text = text[i + len(fn_name_s):]
        j = _text.find(fn_name_e)
        if j > -1:
            fn_name = _text[:j]
    
    if k > 0:
        fn_args = text[k + len(fn_args_s):]
    
    fn_args = fn_args.strip()
    if len(fn_args) > 2:
        fn_args = fn_args[:-1]
    else:
        fn_args = ''
    
    return fn_name, fn_args


def postprocess_ai_message(
    ai_message: AIMessage,
) -> AIMessage:
    """
    Convert AIMessage with <tool_call> tags to proper LangChain message with tool calls and leave it in a list to integrate with MessagesState.
    Assumes all content is text (no multimodal).
    """
    tool_id = 1
    
    content: str = ai_message.content if isinstance(ai_message.content, str) else str(ai_message.content)
    
    # Handle <think> tags - skip tool call parsing inside thinking
    if '<think>' in content:
        if '</think>' not in content:
            # Incomplete thinking, add as regular message
            return ai_message
        
        # Split thinking from rest of content
        parts = content.split('</think>')
        content = parts[-1]
        
    
    # Find tool calls in content
    if '<tool_call>' not in content:
        # No tool calls, add as regular message
        return AIMessage(content=content.strip())
    
    # Split content by tool calls
    tool_call_list = content.split('<tool_call>')
    pre_text = tool_call_list[0].strip()
    tool_calls: List[ToolCall] = []
    
    # Process each tool call
    for txt in tool_call_list[1:]:
        if not txt.strip():
            continue
        
        # Handle incomplete tool calls (no closing tag)
        if '</tool_call>' not in txt:
            fn_name, fn_args = extract_fn(txt)
            if fn_name:
                tool_calls.append(
                    ToolCall(
                        name=fn_name,
                        args=json.loads(fn_args) if fn_args else {},
                        id=str(tool_id),
                    )
                )
                tool_id += 1
                # new_messages.append(AIMessage(content='', tool_calls=tool_calls))
            continue
        
        # Handle complete tool calls
        one_tool_call_txt = txt.split('</tool_call>')[0].strip()
        
        try:
            # Try to parse as JSON
            fn = json5.loads(one_tool_call_txt)
            if 'name' in fn and 'arguments' in fn:
                tool_calls.append(
                    ToolCall(
                        name=fn['name'],
                        args=fn['arguments'],
                        id=str(tool_id),
                    )
                )
                tool_id += 1
                # new_messages.append(AIMessage(content='', tool_calls=tool_calls))
        except Exception:
            # Fallback to manual extraction
            fn_name, fn_args = extract_fn(one_tool_call_txt)
            if fn_name:
                tool_calls.append(
                    ToolCall(
                        name=fn_name,
                        args=json.loads(fn_args) if fn_args else {},
                        id=str(tool_id),
                    )
                )
                tool_id += 1
                # new_messages.append(AIMessage(content='', tool_calls=tool_calls))
        
    if tool_calls:
        return AIMessage(content=pre_text, tool_calls=tool_calls)
    elif pre_text:
        return AIMessage(content=pre_text)
    else:
        return AIMessage(content=content)

In [113]:
def get_today_date_en() -> str:
    """Get today's date formatted for system message."""
    today = datetime.today()
    day_names = [
        "Monday",
        "Tuesday",
        "Wednesday",
        "Thursday",
        "Friday",
        "Saturday",
        "Sunday",
    ]
    day_of_week = day_names[today.weekday()]
    month_name_full = today.strftime("%B")
    if today.day % 10 == 1 and today.day != 11:
        day_suffix = "st"
    elif today.day % 10 == 2 and today.day != 12:
        day_suffix = "nd"
    elif today.day % 10 == 3 and today.day != 13:
        day_suffix = "rd"
    else:
        day_suffix = "th"
    return f"{day_of_week}, {month_name_full} {today.day}{day_suffix}, {today.year}"


def get_today_date_vi() -> str:
    today = datetime.today()
    day_names = [
        "Thứ hai",
        "Thứ ba",
        "Thứ tư",
        "Thứ năm",
        "Thứ sáu",
        "Thứ bảy",
        "Chủ nhật",
    ]
    day_of_week = day_names[today.weekday()]
    return f"{day_of_week}, ngày {today.day}, tháng {today.month}, năm {today.year}"


def preprocess_messages(
    state: BaseModel,
    system_prompt: str,
) -> List[AnyMessage]:
    """
    Convert LangChain messages with tool calls to plaintext format with <tool_call> tags.
    Converts ToolMessages to <tool_response> tags.
    Assumes all content is text (no multimodal).
    """
    if "messages" not in state:
        raise ValueError("messages not found in state")
    messages: List[AnyMessage] = state["messages"]
    new_messages = []

    if messages[0].type == "system":
        new_messages.append(messages[0])
    else:
        date_info = "Hôm nay là {date}.\n".format(date=get_today_date_vi())
        new_messages.append(SystemMessage(
            content=date_info + system_prompt
        ))
        messages = [SystemMessage(content=date_info + system_prompt)] + messages

    for msg in messages[1:]:
        # Pass through human messages as-is
        if msg.type == "human":
            new_messages.append(msg)
            continue
        # Handle AI messages with tool calls
        elif msg.type == "ai":
            content = msg.content if isinstance(msg.content, str) else str(msg.content)
            
            # Convert tool calls to plaintext format
            if msg.tool_calls:
                for tool_call in msg.tool_calls:
                    fc = {
                        'name': tool_call['name'],
                        'arguments': tool_call['args']
                    }
                    fc_str = json.dumps(fc, ensure_ascii=False)
                    tool_call_text = f'<tool_call>\n{fc_str}\n</tool_call>'
                    
                    # Append to content
                    if content:
                        content += '\n' + tool_call_text
                    else:
                        content = tool_call_text
            
            # Merge consecutive AI messages
            if new_messages and new_messages[-1].type == "ai":
                prev_content = new_messages[-1].content
                if prev_content and not prev_content.endswith('\n'):
                    prev_content += '\n'
                new_messages[-1] = AIMessage(content=prev_content + content)
            else:
                new_messages.append(AIMessage(content=content))
            continue
        # Handle tool messages - convert to <tool_response> wrapped in HumanMessage
        elif msg.type == "tool":
            content = msg.content if isinstance(msg.content, str) else str(msg.content)
            response_text = f'<tool_response>\n{content}\n</tool_response>'
            if new_messages and new_messages[-1].type == "human":
                prev_content = new_messages[-1].content
                prev_content += '\n' + response_text
                new_messages[-1] = HumanMessage(content=prev_content)
            else:
                new_messages.append(HumanMessage(content=response_text))
            continue
    
    return new_messages


# SQLite Database

In [3]:
import json
import re
import os
import asyncio
from typing import Any, Dict, Iterable, List, Literal, Sequence, Tuple, Union, Optional

import faiss
import numpy as np
from sqlalchemy import (
    MetaData,
    Table,
    Column,
    create_engine,
    inspect,
    text,
)
from sqlalchemy.engine import Engine, Result
from sqlalchemy.exc import ProgrammingError, SQLAlchemyError
from sqlalchemy.types import NullType

from langchain_core.embeddings import Embeddings

In [4]:
def truncate_word(content: Any, *, length: int, suffix: str = "...") -> str:
    """Truncate a string to a certain number of words, based on the max string length."""
    if not isinstance(content, str) or length <= 0:
        return content
    if len(content) <= length:
        return content
    return content[: length - len(suffix)].rsplit(" ", 1)[0] + suffix


def _safe_filename(name: str) -> str:
    """Make a reasonably safe filename from table/column names."""
    if not isinstance(name, str):
        name = str(name)
    # Keep unicode but remove path separators and problematic chars
    name = name.replace(os.sep, "_").replace("\x00", "_")
    name = re.sub(r"[<>:\"/\\|?*\n\r\t]+", "_", name).strip()
    return name or "unnamed"


class SQLiteDatabase:
    """SQLAlchemy wrapper around a SQLite database with column comments support."""

    def _render_type(self, col_type: Any, *, default: str = "TEXT") -> str:
        """Render SQLAlchemy type using this engine's dialect when possible."""
        if col_type is None or isinstance(col_type, NullType):
            return default
        try:
            compiled = col_type.compile(dialect=self._engine.dialect)
            if isinstance(compiled, str) and compiled.strip():
                return compiled.strip()
        except Exception:
            pass
        try:
            rendered = str(col_type)
            return rendered.strip() if rendered.strip() else default
        except Exception:
            return default


    def __init__(
        self,
        engine: Engine,
        ignore_tables: Optional[List[str]] = None,
        include_tables: Optional[List[str]] = None,
        indexes_in_table_info: bool = False,
        max_string_length: int = 200,
        lazy_table_reflection: bool = False,
        faiss_dir: Optional[str] = None,
        embeddings: Optional[Embeddings] = None,
        concurrency_limit: int = 10,
    ):
        """
        Create SQLite database wrapper.
        
        Args:
            engine: SQLAlchemy engine connected to SQLite database
            ignore_tables: List of table names to ignore
            include_tables: List of table names to include (mutually exclusive with ignore_tables)
            indexes_in_table_info: Whether to include index information in table info
            max_string_length: Maximum string length for truncating values
            lazy_table_reflection: Whether to lazily reflect tables
            faiss_dir: Root directory that stores FAISS artifacts (see create_faiss)
            embeddings: Optional pre-initialized InfinityEmbeddings instance to reuse
            embed_model: Model name for InfinityEmbeddings (used when embeddings is None)
            infinity_api_url: Infinity API endpoint (used when embeddings is None)
        """
        self._engine = engine
        if self._engine.dialect.name != "sqlite":
            raise ValueError("SQLiteDatabase only supports SQLite databases")
        
        if include_tables and ignore_tables:
            raise ValueError("Cannot specify both include_tables and ignore_tables")

        self._inspector = inspect(self._engine)
        self._all_tables = set(self._inspector.get_table_names())

        self._include_tables = set(include_tables) if include_tables else set()
        if self._include_tables:
            missing_tables = self._include_tables - self._all_tables
            if missing_tables:
                raise ValueError(f"include_tables {missing_tables} not found in database")
        
        self._ignore_tables = set(ignore_tables) if ignore_tables else set()
        if self._ignore_tables:
            missing_tables = self._ignore_tables - self._all_tables
            if missing_tables:
                raise ValueError(f"ignore_tables {missing_tables} not found in database")
        
        usable_tables = self.get_usable_table_names()
        self._usable_tables = set(usable_tables) if usable_tables else self._all_tables

        self._indexes_in_table_info = indexes_in_table_info
        self._max_string_length = max_string_length
        self._faiss_dir = faiss_dir
        self._faiss_indexes: Dict[str, Dict[str, Dict[str, Any]]] = {}
        self._faiss_embeddings = embeddings
        self._semaphore = asyncio.Semaphore(concurrency_limit)

        self._metadata = MetaData()
        if not lazy_table_reflection:
            self._metadata.reflect(
                bind=self._engine,
                only=list(self._usable_tables),
            )

        if self._faiss_dir:
            self._load_faiss_indexes()


    @classmethod
    def from_uri(
        cls,
        database_uri: str,
        engine_args: Optional[dict] = None,
        **kwargs: Any,
    ) -> "SQLiteDatabase":
        """Construct a SQLiteDatabase from URI."""
        _engine_args = engine_args or {}
        return cls(create_engine(database_uri, **_engine_args), **kwargs)


    @property
    def dialect(self) -> str:
        """Return string representation of dialect to use."""
        return "SQLite"


    def get_usable_table_names(self) -> Iterable[str]:
        """Get names of tables available."""
        if self._include_tables:
            base = set(self._include_tables)
        else:
            base = self._all_tables - self._ignore_tables

        # filter out metadata tables (companion EAV tables)
        base = {tbl for tbl in base if not tbl.endswith("__metadata") and tbl != "tables_metadata"}
        return sorted(base)


    def get_table_overview(self) -> List[Dict[str, Optional[str]]]:
        """
        Return high-level metadata for all tables/documents based on the EAV metadata table.

        Returns:
            List of dicts like:
            {
                "name": "<table/document name>",
                "data_source": "<sql|vector|...> or None",
                "summary": "<summary text> or None",
            }
        """
        # If the metadata table does not exist, just return an empty list
        with self._engine.connect() as conn:
            inspector = self._inspector
            all_tables = {t.lower() for t in inspector.get_table_names()}
            if "tables_metadata" not in all_tables:
                return []

            # Read all rows for attributes we care about
            result = conn.execute(
                text(
                    """
                    SELECT entity, attribute, value
                    FROM "tables_metadata"
                    WHERE attribute IN ('summary', 'data_source')
                    """
                )
            )

            by_entity: Dict[str, Dict[str, Optional[str]]] = {}
            for entity, attribute, value in result:
                if entity not in by_entity:
                    by_entity[entity] = {
                        "name": entity,
                        "data_source": None,
                        "summary": None,
                    }
                if attribute == "summary":
                    by_entity[entity]["summary"] = value
                elif attribute == "data_source":
                    by_entity[entity]["data_source"] = value

        return list(by_entity.values())


    def get_column_datatype(
        self,
        table_name: str,
        column_name: str,
        default: str = "TEXT",
    ) -> str:
        """
        Return SQL datatype for a column in a table.

        Notes:
        - Uses SQLAlchemy inspector, so it does not require table reflection.
        - Returns `default` when the table/column is not found or the type is unknown.
        """
        all_table_names = set(self.get_usable_table_names())
        if table_name not in all_table_names:
            raise ValueError(
                f"Table '{table_name}' not found in database. Available tables: {sorted(all_table_names)}"
            )

        try:
            cols = self._inspector.get_columns(table_name)
        except SQLAlchemyError:
            return default

        for col in cols:
            if col.get("name") != column_name:
                continue
            col_type = col.get("type")
            return self._render_type(col_type, default=default)

        return default


    def get_column_names(self, table_name: str) -> List[str] | None:
        """
        Return the names of columns in a table.
        """
        all_table_names = list(self.get_usable_table_names())
        if table_name not in all_table_names:
            raise ValueError(f"Table '{table_name}' not found in database. Available tables: {all_table_names}")
        try:
            column_names = []
            for col in self._inspector.get_columns(table_name):
                col_name = col.get("name")
                if isinstance(col_name, str):
                    column_names.append(col_name)
            return column_names
        except SQLAlchemyError:
            return None


    def get_table_info(
        self,
        table_name: str,
        get_col_comments: bool = False,
        allowed_col_names: Optional[List[str]] = None,
        sample_count: Optional[int] = None,
        column_sample_values: Optional[Dict[str, List[str]]] = None,
    ) -> str:
        """
        Get information about a specified table.

        Args:
            table_name: Name of the table to get info for
            get_col_comments: Whether to include column comments in the output
            allowed_col_names: If provided, only include these columns in the output.
                              If None, include all columns.
            sample_count: Number of distinct example values to include for each column.
                          If None, no example values are included (unless provided via
                          column_sample_values).
            column_sample_values: Optional mapping from column name to a list of
                          precomputed example values. For columns present in this
                          mapping, these values are used directly. For remaining
                          columns, example values are fetched via `_get_sample_values`
                          when sample_count is provided.

        Returns:
            String containing table schema (CREATE TABLE statement) and optionally
            column comments and sample rows.
        """
        all_table_names = list(self.get_usable_table_names())
        if table_name not in all_table_names:
            raise ValueError(f"Table '{table_name}' not found in database. Available tables: {all_table_names}")

        # Ensure table is reflected
        metadata_table_names = [tbl.name for tbl in self._metadata.sorted_tables]
        if table_name not in metadata_table_names:
            self._metadata.reflect(
                bind=self._engine,
                only=[table_name],
            )

        # Find the table object
        table = None
        for tbl in self._metadata.sorted_tables:
            if tbl.name == table_name:
                table = tbl
                break

        if table is None:
            raise ValueError(f"Table '{table_name}' could not be reflected")

        # Remove NullType columns
        try:
            for _, v in table.columns.items():
                if type(v.type) is NullType:
                    table._columns.remove(v)
        except AttributeError:
            for _, v in dict(table.columns).items():
                if type(v.type) is NullType:
                    table._columns.remove(v)

        # Filter columns if allowed_col_names is specified
        display_columns = list(table.columns) if not allowed_col_names else [col for col in table.columns if col.name in allowed_col_names]
        if not display_columns:
            raise ValueError(f"No matching columns found. Requested: {allowed_col_names}")

        # Get sample values for columns:
        # - Prefer values passed in via column_sample_values for those columns.
        # - For remaining columns, fetch values via _get_sample_values when sample_count is set.
        provided_sample_values: Dict[str, List[str]] = column_sample_values or {}
        fetched_sample_values: Dict[str, List[str]] = {}
        if sample_count:
            columns_to_fetch = [
                col for col in display_columns if col.name not in provided_sample_values
            ]
            if columns_to_fetch:
                fetched_sample_values = self._get_sample_values(
                    table, columns_to_fetch, sample_count
                )
            provided_sample_values = {
                col_name: values[:sample_count]
                for col_name, values in provided_sample_values.items()
            }

        # Merge, giving precedence to explicitly provided sample values
        column_sample_values = {**fetched_sample_values, **provided_sample_values}

        # Build custom CREATE TABLE statement with filtered columns
        col_defs = []
        column_descriptions = (
            self._get_column_descriptions_from_metadata(table_name)
            if get_col_comments
            else {}
        )
        for col in display_columns:
            col_type = self._render_type(col.type, default="TEXT")
            col_def = f'\t"{col.name}" {col_type}'
            
            # Build comment with description and example values
            comment_parts = []
            col_cmt = column_descriptions.get(col.name, "")
            if col_cmt:
                comment_parts.append(col_cmt)
            
            # Add sample values if available
            if col.name in column_sample_values and column_sample_values[col.name]:
                raw_sample_values = column_sample_values[col.name]
                display_values: List[str] = []
                for sample in raw_sample_values:
                    # Normalize to string and ensure string values are quoted,
                    # unless they already appear quoted, to match _get_sample_values.
                    if isinstance(sample, str):
                        val_str = sample
                        if not (val_str.startswith('"') and val_str.endswith('"')):
                            val_str = f'"{val_str}"'
                    else:
                        val_str = str(sample)
                    display_values.append(val_str)

                examples_str = ", ".join(display_values)
                comment_parts.append(f"Một vài (không phải tất cả) giá trị trong cột \"{col.name}\": {examples_str},...")
            
            if comment_parts:
                comment_text = " ".join(comment_parts)
                col_def = f"{col_def}\t/* {comment_text} */"
            
            col_defs.append(col_def)

        col_defs.sort()        
        create_table = f'CREATE TABLE "{table_name}" (\n' + ", \n".join(col_defs) + "\n)"

        table_info = f"{create_table.rstrip()}"
            
        # Add indexes if needed
        if self._indexes_in_table_info:
            table_info += "\n\n/*"
            table_info += f"\n{self._get_table_indexes(table)}\n"
            table_info += "*/"

        return table_info


    def _get_column_descriptions_from_metadata(
        self, table_name: str
    ) -> Dict[str, str]:
        """
        Fetch column descriptions from the metadata EAV table created alongside the data table.

        Expects a companion table named "{table_name}__metadata" with rows:
            entity = column name
            attribute = "description"
            value = description text
        """
        metadata_table = f"{table_name}__metadata"
        if metadata_table not in self._all_tables:
            return {}

        try:
            query = text(
                f'SELECT entity, value FROM "{metadata_table}" WHERE attribute = :attr'
            )
            with self._engine.connect() as connection:
                result: Result = connection.execute(query, {"attr": "description"})
                return {row[0]: row[1] for row in result if row[1] is not None}
        except (ProgrammingError, SQLAlchemyError):
            return {}


    def get_column_groups(self, table_name: str) -> List[List[str]]:
        """
        Return column groups for a table based on its metadata companion table.

        Reads rows where attribute == "group" from "{table_name}__metadata" and
        builds a list of column-name lists, ordered by group id.
        """
        metadata_table = f"{table_name}__metadata"
        if metadata_table not in self._all_tables:
            return []

        groups: Dict[int, List[str]] = {}
        try:
            query = text(
                f'SELECT entity, value FROM "{metadata_table}" WHERE attribute = :attr'
            )
            with self._engine.connect() as connection:
                result: Result = connection.execute(query, {"attr": "group"})
                for entity, value in result:
                    if value is None:
                        continue
                    try:
                        group_id = int(value)
                    except (TypeError, ValueError):
                        continue
                    groups.setdefault(group_id, []).append(entity)
        except (ProgrammingError, SQLAlchemyError):
            return []

        if not groups:
            return []

        return [groups[idx] for idx in sorted(groups.keys())]


    def _get_table_indexes(self, table: Table) -> str:
        """Get formatted index information for a table."""
        indexes = self._inspector.get_indexes(table.name)
        indexes_formatted = "\n".join(
            f"Name: {idx['name']}, Unique: {idx['unique']}, Columns: {idx['column_names']}"
            for idx in indexes
        )
        return f"Table Indexes:\n{indexes_formatted}"


    def _get_sample_values(
        self,
        table: Table,
        columns: List[Column],
        sample_count: int,
    ) -> Dict[str, List[str]]:
        """
        Get up to sample_count distinct example values per column.

        Strings are quoted to reflect their type. Values longer than 100 chars are skipped.
        """
        if sample_count <= 0:
            return {}

        column_sample_values: Dict[str, List[str]] = {col.name: [] for col in columns}
        for col in columns:
            query = text(
                f'SELECT DISTINCT "{col.name}" '
                f'FROM "{table.name}" '
                f'WHERE "{col.name}" IS NOT NULL '
                f"LIMIT {sample_count}"
            )

            try:
                with self._engine.connect() as connection:
                    result = connection.execute(query)
                    remaining_length = 1000
                    for val, in result:
                        val_str = str(val)
                        # Represent type: quote strings, leave others as-is
                        display_val = f'"{val_str}"' if isinstance(val, str) else val_str
                        column_sample_values[col.name].append(display_val)
                        remaining_length -= len(display_val)
                        if remaining_length <= 0:
                            break

            except ProgrammingError:
                continue

        return column_sample_values


    def _execute(
        self,
        command: str,
        fetch: Literal["all", "one", "cursor"] = "all",
        *,
        parameters: Optional[Dict[str, Any]] = None,
        execution_options: Optional[Dict[str, Any]] = None,
    ) -> Union[Sequence[Dict[str, Any]], Result]:
        """Execute SQL command through underlying engine."""
        parameters = parameters or {}
        execution_options = execution_options or {}
        
        with self._engine.begin() as connection:
            cursor = connection.execute(
                text(command),
                parameters,
                execution_options=execution_options,
            )

            if cursor.returns_rows:
                if fetch == "all":
                    result = [x._asdict() for x in cursor.fetchall()]
                elif fetch == "one":
                    first_result = cursor.fetchone()
                    result = [] if first_result is None else [first_result._asdict()]
                elif fetch == "cursor":
                    return cursor
                else:
                    raise ValueError("Fetch parameter must be either 'one', 'all', or 'cursor'")
                return result
        return []


    def _run_sync(
        self,
        command: str,
        fetch: Literal["all", "one", "cursor"],
        include_columns: bool,
        parameters: Optional[Dict[str, Any]],
        execution_options: Optional[Dict[str, Any]],
    ) -> Union[Sequence[Dict[str, Any]], Sequence[Tuple[Any, ...]], Result[Any]]:
        """
        Helper method containing the synchronous logic for `run`.
        This handles the CPU-bound result formatting after the IO-bound execution.
        """
        result = self._execute(
            command, fetch, parameters=parameters, execution_options=execution_options
        )

        if fetch == "cursor":
            return result

        if include_columns:
            return [
                {
                    column: truncate_word(value, length=self._max_string_length)
                    for column, value in r.items()
                }
                for r in result
            ]
        else:
            return [
                tuple(
                    truncate_word(value, length=self._max_string_length)
                    for value in r.values()
                )
                for r in result
            ]


    async def run(
        self,
        command: str,
        fetch: Literal["all", "one", "cursor"] = "all",
        include_columns: bool = False,
        *,
        parameters: Optional[Dict[str, Any]] = None,
        execution_options: Optional[Dict[str, Any]] = None,
    ) -> Union[Sequence[Dict[str, Any]], Sequence[Tuple[Any, ...]], Result[Any]]:
        """
        Execute a SQL command asynchronously.
        Offloads the blocking SQLAlchemy call to a separate thread.
        """
        await self._semaphore.acquire()
        try:
            return await asyncio.to_thread(
                self._run_sync,
                command,
                fetch,
                include_columns,
                parameters,
                execution_options,
            )
        except Exception as e:
            raise e
        finally:
            self._semaphore.release()


    async def run_no_throw(
        self,
        command: str,
        fetch: Literal["all", "one"] = "all",
        include_columns: bool = False,
        *,
        parameters: Optional[Dict[str, Any]] = None,
        execution_options: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Execute a SQL command and return results or error message."""
        try:
            res = await self.run(
                command,
                fetch,
                parameters=parameters,
                execution_options=execution_options,
                include_columns=include_columns,
            )
            return {
                "result": res,
                "error": None,
            }
        except SQLAlchemyError as e:
            return {
                "result": [],
                "error": f"Error: {e}",
            }


    def get_table_info_no_throw(
        self,
        table_name: str,
        get_col_comments: bool = False,
        allowed_col_names: Optional[List[str]] = None,
        sample_count: Optional[int] = None,
        column_sample_values: Optional[Dict[str, List[str]]] = None,
    ) -> str:
        """Get table info without throwing exceptions."""
        try:
            return self.get_table_info(
                table_name,
                get_col_comments=get_col_comments,
                allowed_col_names=allowed_col_names,
                sample_count=sample_count,
                column_sample_values=column_sample_values,
            )
        except ValueError as e:
            return f"Error: {e}"


    def get_context(self) -> Dict[str, Any]:
        """Return db context that you may want in agent prompt."""
        table_names = list(self.get_usable_table_names())
        # Get info for all tables
        table_infos = []
        for tbl in table_names:
            table_infos.append(self.get_table_info_no_throw(tbl))
        table_info = "\n\n".join(table_infos)
        return {"table_info": table_info, "table_names": ", ".join(table_names)}


    def _load_faiss_indexes(self) -> None:
        """Eagerly load FAISS indexes and their value mappings if the directory is present."""
        if not self._faiss_dir:
            return

        for table_name in self._usable_tables:
            table_dir = os.path.join(
                self._faiss_dir,
                _safe_filename(table_name),
            )
            if not os.path.isdir(table_dir):
                continue

            # Build a lookup of column names to quickly check candidate files.
            try:
                columns = {c["name"] for c in self._inspector.get_columns(table_name)}
            except SQLAlchemyError:
                continue

            for col_name in columns:
                index_path = os.path.join(
                    table_dir, f"{_safe_filename(col_name)}.faiss"
                )
                values_path = os.path.join(
                    table_dir, f"{_safe_filename(col_name)}.json"
                )
                if not (os.path.exists(index_path) and os.path.exists(values_path)):
                    continue

                try:
                    index = faiss.read_index(index_path)
                    with open(values_path, "r", encoding="utf-8") as f:
                        values = json.load(f) or []
                    if not isinstance(values, list):
                        continue

                    metric = getattr(index, "metric_type", None)
                    normalize = metric == faiss.METRIC_INNER_PRODUCT

                    # Guard against mismatched artifacts.
                    if hasattr(index, "ntotal") and index.ntotal != len(values):
                        # Skip inconsistent artifacts to avoid incorrect lookups.
                        continue

                    self._faiss_indexes.setdefault(table_name, {})[col_name] = {
                        "index": index,
                        "values": values,
                        "normalize": normalize,
                    }
                except Exception:
                    # Ignore malformed artifacts; consumers can still use SQL methods.
                    continue


    async def batch_search_similar_predicate_values(
        self,
        predicate_values: List[Tuple[str, str, str]],
        k: int = 5,
    ) -> Dict[str, Dict[str, List[str]]]:
        """
        Asynchronously search for similar predicate values for a batch of predicate values.
        
        Args:
            predicate_values: List of (table_name, column_name, value) tuples.
            k: Number of nearest neighbors to retrieve.

        Returns:
            Mapping of table_name -> column_name -> list of similar values (no scores).
            If an index is missing for a predicate value, that table/column entry is an
            empty list.
        """
        if not self._faiss_indexes:
            raise ValueError("FAISS indexes are not loaded for this database")
        if self._faiss_embeddings is None:
            raise ValueError("Embeddings client is not configured")
        if not predicate_values:
            return {}

        # 1. Validation and preparation
        valid_queries = []
        valid_indices = []
        texts_to_embed = []
        
        # Initialize results mapping table -> column -> list of values
        results: Dict[str, Dict[str, List[str]]] = {}
        for table, col, _ in predicate_values:
            results.setdefault(table, {}).setdefault(col, [])

        for i, (table, col, val) in enumerate(predicate_values):
            table_indexes = self._faiss_indexes.get(table)
            if table_indexes and col in table_indexes:
                valid_queries.append((table, col))
                valid_indices.append(i)
                texts_to_embed.append(str(val))
            # Note: Invalid predicate values (no index found) remain [] in the results mapping

        if not valid_queries:
            return results

        # 2. Batch Embedding (I/O Bound)
        # Use the async batch embedding method from LangChain
        embeddings = await self._get_batch_embeddings(texts_to_embed)

        # 3. Parallel FAISS Search (CPU Bound, offloaded to threads)
        tasks = []

        for i, embedding in zip(valid_indices, embeddings):
            table_name, column_name = predicate_values[i][0], predicate_values[i][1]
            index_data = self._faiss_indexes[table_name][column_name]
            
            task = self._execute_search_with_semaphore(
                semaphore=self._semaphore,
                index_data=index_data,
                vector=embedding,
                k=k
            )
            tasks.append(task)

        # Wait for all search tasks to complete
        search_results = await asyncio.gather(*tasks)

        # 4. Map results back to table/column buckets (drop scores)
        for original_idx, res in zip(valid_indices, search_results):
            table_name, column_name, _ = predicate_values[original_idx]
            value_list = [
                str(r["value"])
                for r in res
                if isinstance(r, dict) and "value" in r and r["value"] is not None
            ]
            results[table_name][column_name].extend(value_list)

        return results


    async def search_similar_values_from_message(
        self,
        user_message: str,
        linked_schema: Optional[Dict[str, Dict[str, str]]] = None,
        k: int = 5,
    ) -> Dict[str, Dict[str, List[str]]]:
        """
        Search for similar values across text columns using a user message.
        
        Embeds the user message once and searches for top k similar values in each
        text column that has a FAISS index.
        
        Args:
            user_message: The user's message to search for similar values.
            linked_schema: Optional dictionary mapping table_name -> column_name -> column_datatype.
                           If None, the schema is automatically constructed from all usable tables.
                           Only columns with text datatypes (containing "TEXT") will be searched.
            k: Number of nearest neighbors to retrieve per column.

        Returns:
            Mapping of table_name -> column_name -> list of similar values (no scores).
            Only includes columns that have text datatypes and FAISS indexes.
        """
        if not self._faiss_indexes:
            raise ValueError("FAISS indexes are not loaded for this database")
        if self._faiss_embeddings is None:
            raise ValueError("Embeddings client is not configured")
        if not user_message:
            return {}

        # If no schema is provided, build it from all usable tables in the database
        if linked_schema is None:
            linked_schema = {}
            for table_name in self.get_usable_table_names():
                try:
                    column_names = self.get_column_names(table_name) or []
                except ValueError:
                    # Skip tables that cannot be introspected
                    continue

                table_schema: Dict[str, str] = {}
                for column_name in column_names:
                    datatype = self.get_column_datatype(
                        table_name, column_name, default="TEXT"
                    )
                    table_schema[column_name] = datatype

                if table_schema:
                    linked_schema[table_name] = table_schema

        # If schema (either provided or auto-built) is empty, nothing to search
        if not linked_schema:
            return {}

        # 1. Filter columns to only text datatypes and check for FAISS indexes
        valid_queries: List[Tuple[str, str]] = []  # (table_name, column_name)
        
        # Initialize results mapping table -> column -> list of values
        results: Dict[str, Dict[str, List[str]]] = {}
        
        for table_name, columns in linked_schema.items():
            for column_name, datatype in columns.items():
                # Check if datatype is text (case-insensitive check for "TEXT")
                if "TEXT" not in str(datatype).upper():
                    continue
                
                # Check if FAISS index exists for this column
                table_indexes = self._faiss_indexes.get(table_name)
                if table_indexes and column_name in table_indexes:
                    valid_queries.append((table_name, column_name))
                    results.setdefault(table_name, {}).setdefault(column_name, [])

        if not valid_queries:
            return results

        # 2. Embed the user message once (I/O Bound)
        embedding = await self._get_batch_embeddings([user_message])
        # Extract single embedding from batch result
        message_embedding = embedding[0]

        # 3. Parallel FAISS Search for all valid columns (CPU Bound, offloaded to threads)
        tasks = []
        for table_name, column_name in valid_queries:
            index_data = self._faiss_indexes[table_name][column_name]
            
            task = self._execute_search_with_semaphore(
                semaphore=self._semaphore,
                index_data=index_data,
                vector=message_embedding,
                k=k
            )
            tasks.append(task)

        # Wait for all search tasks to complete
        search_results = await asyncio.gather(*tasks)

        # 4. Map results back to table/column buckets (drop scores)
        for (table_name, column_name), res in zip(valid_queries, search_results):
            value_list = [
                str(r["value"])
                for r in res
                if isinstance(r, dict) and "value" in r and r["value"] is not None
            ]
            results[table_name][column_name].extend(value_list)

        return results


    async def _get_batch_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Helper to fetch embeddings asynchronously."""
        if self._faiss_embeddings is None:
            raise ValueError("Infinity embeddings client is not configured")
        if not hasattr(self._faiss_embeddings, "aembed_documents"):
            raise ValueError("Infinity embeddings client does not support aembed_documents")
        return await self._faiss_embeddings.aembed_documents(texts)


    async def _execute_search_with_semaphore(
        self,
        semaphore: asyncio.Semaphore,
        index_data: Dict[str, Any],
        vector: List[float],
        k: int
    ) -> List[Dict[str, Any]]:
        """Acquires semaphore and offloads CPU work to a thread."""
        await semaphore.acquire()
        try:
            return await asyncio.to_thread(
                self._run_faiss_search_job,
                index=index_data["index"],
                values=index_data["values"],
                normalize=index_data["normalize"],
                vector=vector,
                k=k
            )
        except Exception as e:
            raise e
        finally:
            semaphore.release()


    @staticmethod
    def _run_faiss_search_job(
        index: Any,
        values: List[str],
        normalize: bool,
        vector: List[float],
        k: int
    ) -> List[Dict[str, Any]]:
        """
        Pure CPU-bound static method to perform the FAISS search.
        Running this in a separate thread avoids blocking the asyncio event loop.
        """
        x = np.asarray(vector, dtype="float32")
        
        # Ensure correct shape (1, embedding_dim)
        if x.ndim == 1:
            x = x.reshape(1, -1)

        if normalize:
            faiss.normalize_L2(x)

        distances, indices = index.search(x, min(k, len(values)))
        
        results: List[Dict[str, Any]] = []
        found_indices = indices[0]
        found_distances = distances[0]

        for idx, score in zip(found_indices, found_distances):
            if idx < 0 or idx >= len(values):
                continue
            results.append({
                "value": values[int(idx)],
                "score": float(score)
            })
            
        return results

In [5]:
@lru_cache()
def get_sqlite_db(business_name: str):
    return SQLiteDatabase.from_uri(
        f"sqlite:////Users/vinhnguyen/Projects/ext-chatbot/resources/database/{business_name}.db",
        faiss_dir=f"/Users/vinhnguyen/Projects/ext-chatbot/resources/faiss/{business_name}/",
        concurrency_limit=10,
        embeddings=get_embedding_model()
    )

In [6]:
db = get_sqlite_db("batdongsan")

In [7]:
# db.run("""
# select * from "BĐS Bán 500" limit 5;
# """, include_columns=True)

In [8]:
db.dialect

'SQLite'

In [9]:
db.get_usable_table_names()

['BĐS Bán 500', 'BĐS Cho thuê 500']

In [10]:
# db.get_column_groups("BĐS Bán 500")

In [11]:
print(db.get_table_info_no_throw(
    "BĐS Bán 500",
    get_col_comments=True,
    allowed_col_names=["Bãi đỗ xe", "Chiều dài (m)"],
    sample_count=5
))

CREATE TABLE "BĐS Bán 500" (
	"Bãi đỗ xe" TEXT	/* Thông tin về khả năng đỗ xe (bao gồm số lượng và loại phương tiện). Một vài (không phải tất cả) giá trị trong cột "Bãi đỗ xe": "Có", "Không", "Nhiều xe máy", "1 ô tô", "2 ô tô",... */, 
	"Chiều dài (m)" REAL	/* Độ dài chiều dài của bất động sản tính theo mét. Một vài (không phải tất cả) giá trị trong cột "Chiều dài (m)": "3.9", "8.6", "16.4", "7.7", "32.6",... */
)


# Chains

In [12]:
llm = get_llm_model()
llm_reasoning = get_thinking_llm_model()

## SQL Chain Components

### Schema Linking Chain

In [124]:
# table and column selection (run async for multi table / run one time for all tables)
# tools: retrieve_values_in_columns, query_database, return_result

In [125]:
SCHEMA_LINKING_TEMPLATE = """
You are an expert in SQL schema linking. 
Given a {dialect} table schema (DDL) and a user query, determine if the table is relevant to the query.

Your task:
1. Analyze the table schema and the user query to decide if they are related.
2. Answer "Y" (Yes) or "N" (No).
3. If the answer is "Y", list ALL columns that are semantically related to the query topics. 
   - You do NOT need to identify the exact columns for the final SQL query. 
   - You SHOULD include any columns that provide context, identifiers, or potential join keys related to the entities in the query.

Output must be a valid JSON object inside a ```json code block using this format:
```json
{{
    "reasoning": "Reasoning of the decision",
    "is_related": "Y or N",
    "columns": ["column name 1", "column name 2"]
}}
```

Table Schema (DDL):
{table_info}

User Query:
{query}
""".strip()

schema_linking_chain = (
    ChatPromptTemplate([("human", SCHEMA_LINKING_TEMPLATE)])
    | llm
    | JsonOutputParser()
)

async def _link_schema_one(
    query: str,
    table_name: str,
    allowed_col_names: Optional[List[str]] = None
) -> Dict[str, Any]:
    try:
        table_info = db.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=allowed_col_names,
            sample_count=3
        )
        result = await schema_linking_chain.ainvoke(
            {"table_info": table_info, "query": query, "dialect": db.dialect}
        )
        if "is_related" not in result or result["is_related"] not in ["Y", "N"]:
            raise ValueError("Invalid response from schema linking chain")
        if result["is_related"] == "Y" and not result.get("columns"):
            result["columns"] = ["ROWID"]

        if result["is_related"] == "N":
            return {
                "input_item": {
                    "table_name": table_name,
                    "query": query,
                    "allowed_col_names": allowed_col_names
                },
                "filtered_schema": None,
                "error": None
            }
        else:
            return {
                "input_item": {"table_name": table_name, "query": query, "allowed_col_names": allowed_col_names},
                "filtered_schema": (table_name, result["columns"]),
                "error": None
            }
    except Exception as e:
        return {
            "input_item": {"table_name": table_name, "query": query},
            "filtered_schema": None,
            "error": str(e)
        }


async def link_schema(
    _input: dict,
) -> Dict[str, Dict[str, str]]:
    query = _input.get("query")
    if not query:
        raise ValueError("query is required")
    max_retries = _input.get("max_retries", 1)
    # queue = []
    # for table in  db.get_usable_table_names():
    #     for col_group in db.get_column_groups(table):
    #         queue.append({
    #             "table_name": table,
    #             "allowed_col_names": col_group,
    #             "query": query
    #         })
    queue = [{"table_name": table_name, "query": query} for table_name in db.get_usable_table_names()]
    successful_results = []
    for _ in range(max_retries):
        tasks = [_link_schema_one(**input_item) for input_item in queue]
        results = await tqdm_asyncio.gather(*tasks)
        successful_results.extend([
            res for res in results if res["error"] is None
        ])
        failed_items = [
            res["input_item"] for res in results if res["error"] is not None
        ]
        queue = failed_items
        if not queue:
            break
    
    linked_schema = [
        result["filtered_schema"] for result in successful_results if result["filtered_schema"]
    ]

    # Return per-table mapping: column_name -> datatype
    final_schema: Dict[str, Dict[str, str]] = {}
    for table_name, col_names in linked_schema:
        table_schema = final_schema.setdefault(table_name, {})
        for col_name in col_names:
            col_type = db.get_column_datatype(
                table_name,
                col_name,
                default="NULL",
            )
            if col_type != "NULL":
                table_schema[col_name] = col_type

    return final_schema

In [126]:
query = "Tìm danh sách nhà cho thuê ở trên đường Láng"
linked_schema = await link_schema({"query": query})
linked_schema



CancelledError: 

### SQL Generation

In [None]:
SQL_AGENT_PROMPT_TEMPLATE = """
### DATE INFORMATION:
Today is {date}

### INSTRUCTIONS:
You write SQL queries for a {dialect} database. Users are querying their company database, and your task is to assist by generating valid SQL queries strictly adhering to the database schema provided.

**Table Schema**:
{table_infos}

Translate the user's request into one valid {dialect} query. SQL should be written as a markdown code block:
For example:
```sql
SELECT * FROM table WHERE condition;
```

### GUIDELINES:

1.  **Schema Adherence**:
    *   Use only tables, columns, and relationships explicitly listed in the provided schema.
    *   Do not make assumptions about missing or inferred columns/tables.

2.  **{dialect}-Specific Syntax**:
    *   Use only {dialect} syntax. Be aware that {dialect} has limited built-in date/time functions compared to other sql dialects.

3.  **Conditions**:
    *   Always include default conditions for filtering invalid data, e.g., `deleted_at IS NULL` and `status != 'cancelled'` if relevant.
    *   Ensure these conditions match the query's intent unless explicitly omitted in the user request.

4.  **Output Consistency**:
    *   The output fields must match the query's intent exactly. Do not add extra columns or omit requested fields.

5.  **Reserved Keywords and Case Sensitivity**:
    *   Escape reserved keywords or case-sensitive identifiers using double quotes (" "), e.g., "order".

If the user's question is ambiguous or unclear, you must make your best reasonable guess based on the schema.
Translate the user's intent into a **single valid {dialect} query** based on the schema provided.
Ensure the query is optimized, precise, and error-free.

**You must ONLY output ONE SINGLE valid SQL query as markdown codeblock.**
""".strip()


_sql_markdown_re = re.compile(r"```sql\s*([\s\S]*?)\s*```", re.DOTALL)
def parse_sql_output(msg_content: str) -> str:
    try:
        match = _sql_markdown_re.search(msg_content)
        if match:
            return match.group(1).strip()
        else:
            raise ValueError("No SQL query found in the content")
    except Exception:
        return msg_content


def preprocess_for_sql_query_generation(
    _input: dict,
) -> List[AnyMessage]:
    linked_schema: Dict[str, Dict[str, str]] = _input.get("linked_schema")
    if not linked_schema:
        raise ValueError("linked_schema not found in the input")
    table_infos = "\n\n".join([
        db.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=list(col_types.keys()),
            sample_count=5,
            column_sample_values=_input.get("tbl_col_sample_values", {}).get(table_name, None),
        )
        for table_name, col_types in linked_schema.items()
    ])
    system_prompt = SystemMessage(SQL_AGENT_PROMPT_TEMPLATE.format(
        table_infos=table_infos,
        date=get_today_date_en(),
        dialect=db.dialect
    ))
    human_message = HumanMessage(content=_input["query"])
    return [system_prompt, human_message]


def get_sql_query_from_content(content: str) -> str:
    sql_block_pattern = re.compile(r"```sql\s*([\s\S]*?)\s*```", re.MULTILINE)
    match = sql_block_pattern.search(content)
    if match:
        return match.group(1).strip()
    else:
        raise ValueError("No SQL query found in the content")

sql_query_generation_chain = (
    preprocess_for_sql_query_generation
    | get_llm_model()
    | StrOutputParser()
    | parse_sql_output
)

In [None]:
print(sql_query_generation_chain.invoke({
    "query": "Tìm danh sách nhà cho thuê ở trên đường Láng hoặc Cầu Giấy",
    "linked_schema": linked_schema
}))

SELECT * FROM "BĐS Cho thuê 500" 
WHERE "Địa chỉ_đường" = "Láng" OR "Địa chỉ_đường" = "Cầu Giấy";


### SQL Chain without Retry

In [None]:
from operator import itemgetter

from langchain_core.runnables import RunnableLambda


sql_chain_without_retry = RunnablePassthrough.assign(
    linked_schema=link_schema
) | RunnablePassthrough.assign(
    sql_query=sql_query_generation_chain
) | RunnablePassthrough.assign(
    db_output=(
        RunnableLambda(itemgetter("sql_query")) 
        | functools.partial(
            db.run_no_throw, include_columns=True
        )
    )
)

In [None]:
import json

_output = await sql_chain_without_retry.ainvoke({
    "query": "Tìm danh sách nhà cho thuê ở trên đường Láng hoặc Cầu Giấy",
})
print(json.dumps(_output, ensure_ascii=False, indent=2))

100%|██████████| 2/2 [00:04<00:00,  2.32s/it]


{
  "query": "Tìm danh sách nhà cho thuê ở trên đường Láng hoặc Cầu Giấy",
  "linked_schema": {
    "BĐS Bán 500": {
      "Địa chỉ_Tên đường": "TEXT",
      "Địa chỉ": "TEXT"
    },
    "BĐS Cho thuê 500": {
      "Địa chỉ": "TEXT",
      "Địa chỉ_đường": "TEXT",
      "Phường/Xã": "TEXT",
      "Quận/Huyện": "TEXT",
      "Tỉnh/TP": "TEXT"
    }
  },
  "sql_query": "SELECT \"Địa chỉ\", \"Địa chỉ_đường\", \"Phường/Xã\", \"Quận/Huyện\", \"Tỉnh/TP\"\nFROM \"BĐS Cho thuê 500\"\nWHERE \"Địa chỉ_đường\" = 'Láng' OR \"Địa chỉ_đường\" = 'Cầu Giấy';",
  "db_output": {
    "result": [],
    "error": null
  }
}


### Parse SQL

In [None]:
from sqlglot import exp, parse_one


def get_predicate_values(
    _input: dict
) -> List[Dict[str, Any]]:
    sql_query: str = _input.get("sql_query")
    if not sql_query:
        raise ValueError("")
    schema: Dict[str, Dict[str, str]] = _input.get("linked_schema")
    if not schema:
        raise ValueError("")
    parsed = parse_one(sql_query, read=db.dialect.lower())
    
    # --- Step A: Resolve Aliases (c -> customers) ---
    alias_map = {}
    
    # 1. Check FROM
    for node in parsed.find_all(exp.From):
        for table in node.find_all(exp.Table):
            real_name = table.name
            alias = table.alias if table.alias else real_name
            alias_map[alias] = real_name

    # 2. Check JOINs
    for node in parsed.find_all(exp.Join):
        table = node.this
        real_name = table.name
        alias = table.alias if table.alias else real_name
        alias_map[alias] = real_name

    print(f"DEBUG: Found Aliases: {alias_map}")

    extracted_data = []

    # --- Step B: Recursive Visitor ---
    def visit_node(node):
        if not node:
            return

        # 1. Handle Binary Logic (AND, OR)
        # sqlglot stores left side in 'this' and right side in 'expression'
        if isinstance(node, (exp.And, exp.Or)):
            visit_node(node.this)
            visit_node(node.expression)
            return

        # 2. Handle Wrappers (Parenthesis, NOT, WHERE)
        # These only have one child stored in 'this'
        if isinstance(node, (exp.Paren, exp.Not, exp.Where)):
            visit_node(node.this)
            return

        # 3. Handle Comparisons (Column = 'Value', !=, LIKE)
        if isinstance(node, (exp.EQ, exp.NEQ, exp.Like, exp.ILike)):
            # We look for: Column op Literal
            if isinstance(node.left, exp.Column) and isinstance(node.right, exp.Literal):
                if node.right.is_string:
                    process_extraction(node.left, node.right.this, node.key)
            return

        # 4. Handle IN (Column IN ('A', 'B'))
        if isinstance(node, exp.In):
            if isinstance(node.this, exp.Column):
                # The list of values is in args['expressions']
                for item in node.args.get('expressions', []):
                    if isinstance(item, exp.Literal) and item.is_string:
                        process_extraction(node.this, item.this, "IN")
            return

    # Helper to validate and store
    def process_extraction(col_node, value_str, operator):
        col_name = col_node.name
        table_alias = col_node.table
        
        real_table_name = None

        # Resolve Alias
        if table_alias:
            real_table_name = alias_map.get(table_alias)
        else:
            # Try to guess table from schema if no alias provided
            matches = [t for t, cols in schema.items() if col_name in cols]
            if len(matches) == 1:
                real_table_name = matches[0]

        # Validation
        if real_table_name and real_table_name in schema:
            cols = schema[real_table_name]
            if col_name in cols:
                if cols[col_name] == "TEXT":
                    extracted_data.append({
                        "table_name": real_table_name,
                        "column_name": col_name,
                        "value": value_str,
                        "operator": operator
                    })
                else:
                    print(f"DEBUG: Skipped {col_name} (Not TEXT)")
            else:
                print(f"DEBUG: Skipped {col_name} (Not in {real_table_name})")
        else:
            print(f"DEBUG: Skipped {col_name} (Unknown table/alias)")

    # --- Step C: Start Traversal ---
    where_clause = parsed.find(exp.Where)
    if where_clause:
        # Crucial Fix: Pass where_clause.this (the content) OR rely on the updated visitor handling exp.Where
        visit_node(where_clause)
    
    return extracted_data

In [None]:
# ---------------------------------------------------------
# 1. SETUP
# ---------------------------------------------------------

SQL_QUERY = """
SELECT c.id, c.first_name || ' ' || c.last_name AS full_name 
FROM customers c
INNER JOIN orders o ON c.id = o.customer_id
INNER JOIN products p ON o.product_id = p.id
WHERE 
    c.country IN ('USA', 'Canada', 'UK', 'Germany')
    AND o.order_date BETWEEN '2023-01-01' AND '2023-12-31'
    AND o.status != 'Cancelled'
    AND p.category = 'Electronics'
    AND full_name = 'David Jones' 
"""

DB_SCHEMA = {
    "customers": {"id": "INTEGER", "first_name": "TEXT", "last_name": "TEXT", "country": "TEXT"},
    "orders": {"id": "INTEGER", "customer_id": "INTEGER", "order_date": "TEXT", "status": "TEXT"},
    "products": {"id": "INTEGER", "product_name": "TEXT", "category": "TEXT"}
}

# ---------------------------------------------------------
# 2. ROBUST PARSER LOGIC
# ---------------------------------------------------------



# ---------------------------------------------------------
# 3. EXECUTION
# ---------------------------------------------------------

results = get_predicate_values({"sql_query": SQL_QUERY, "linked_schema": DB_SCHEMA})

print("\n--- FINAL EXTRACTED PAIRS FOR VECTOR SEARCH ---")
for res in results:
    print(res)

DEBUG: Found Aliases: {'c': 'customers', 'o': 'orders', 'p': 'products'}
DEBUG: Skipped full_name (Unknown table/alias)

--- FINAL EXTRACTED PAIRS FOR VECTOR SEARCH ---
{'table_name': 'customers', 'column_name': 'country', 'value': 'USA', 'operator': 'IN'}
{'table_name': 'customers', 'column_name': 'country', 'value': 'Canada', 'operator': 'IN'}
{'table_name': 'customers', 'column_name': 'country', 'value': 'UK', 'operator': 'IN'}
{'table_name': 'customers', 'column_name': 'country', 'value': 'Germany', 'operator': 'IN'}
{'table_name': 'orders', 'column_name': 'status', 'value': 'Cancelled', 'operator': 'neq'}
{'table_name': 'products', 'column_name': 'category', 'value': 'Electronics', 'operator': 'eq'}


### Retry

In [None]:
from operator import itemgetter

from langchain_core.runnables import RunnableLambda


sql_chain_without_retry = RunnablePassthrough.assign(
    linked_schema=link_schema
) | RunnablePassthrough.assign(
    sql_query=sql_query_generation_chain
) | RunnablePassthrough.assign(
    db_output=(
        RunnableLambda(itemgetter("sql_query")) 
        | functools.partial(
            db.run_no_throw, include_columns=True
        )
    )
)

In [None]:
sql_query_regeneration_chain = RunnablePassthrough.assign(
    predicate_values=get_predicate_values
) | RunnablePassthrough.assign(
    tbl_col_sample_values=(
        RunnableLambda(itemgetter("predicate_values"))
        | RunnableLambda(lambda predicate_values: [
            (v["table_name"], v["column_name"], v["value"])
            for v in predicate_values
        ])
        | functools.partial(
            db.batch_search_similar_values, k=5
        )
    )
) | RunnablePassthrough.assign(
    sql_query_retry=sql_query_generation_chain
) | RunnablePassthrough.assign(
    db_output=(
        RunnableLambda(itemgetter("sql_query_retry")) 
        | functools.partial(db.run_no_throw, include_columns=True))
)

In [None]:
tmp = await sql_chain_with_retry.ainvoke(result)
tmp

DEBUG: Found Aliases: {'BĐS Cho thuê 500': 'BĐS Cho thuê 500'}


{'query': 'Tìm danh sách nhà cho thuê ở trên đường Láng hoặc Cầu Giấy',
 'linked_schema': {'BĐS Bán 500': {'Địa chỉ_Tên đường': 'TEXT',
   'Địa chỉ': 'TEXT'},
  'BĐS Cho thuê 500': {'Địa chỉ': 'TEXT',
   'Địa chỉ_đường': 'TEXT',
   'Phường/Xã': 'TEXT',
   'Quận/Huyện': 'TEXT',
   'Tỉnh/TP': 'TEXT'}},
 'sql_query': 'SELECT "Địa chỉ", "Địa chỉ_đường", "Phường/Xã", "Quận/Huyện", "Tỉnh/TP"\nFROM "BĐS Cho thuê 500"\nWHERE "Địa chỉ_đường" = \'Láng\' OR "Địa chỉ_đường" = \'Cầu Giấy\';',
 'db_output': {'result': [{'Địa chỉ': 'Park Hill, Láng Hạ, Hoàn Kiếm',
    'Địa chỉ_đường': 'Láng Hạ',
    'Phường/Xã': 'Phường 4',
    'Quận/Huyện': 'Hoàn Kiếm',
    'Tỉnh/TP': 'Hà Nội'},
   {'Địa chỉ': '787 Láng Hạ, Cầu Giấy',
    'Địa chỉ_đường': 'Láng Hạ',
    'Phường/Xã': 'Phường 4',
    'Quận/Huyện': 'Cầu Giấy',
    'Tỉnh/TP': 'Hà Nội'},
   {'Địa chỉ': 'Ecopark, Láng Hạ, Đống Đa',
    'Địa chỉ_đường': 'Láng Hạ',
    'Phường/Xã': 'Phường 1',
    'Quận/Huyện': 'Đống Đa',
    'Tỉnh/TP': 'Hà Nội'},
   {'Địa 

## Full Langgraph Workflow (Worked)

In [None]:
class SQLAssistantState(TypedDict):
    user_query: str
    linked_schema: Dict[str, Dict[str, str]]
    sql_queries: List[str]
    predicate_values: List[Dict[str, Any]]
    tbl_col_sample_values: Dict[str, Dict[str, List[Any]]]
    db_output: Dict[str, Any]
    final_answer: str

In [None]:
SCHEMA_LINKING_TEMPLATE = """
You are an expert in SQL schema linking. 
Given a {dialect} table schema (DDL) and a user query, determine if the table is relevant to the query.

Your task:
1. Analyze the table schema and the user query to decide if they are related.
2. Answer "Y" (Yes) or "N" (No).
3. If the answer is "Y", list ALL columns that are semantically related to the query topics. 
   - You do NOT need to identify the exact columns for the final SQL query. 
   - You SHOULD include any columns that provide context, identifiers, or potential join keys related to the entities in the query.

Output must be a valid JSON object inside a ```json code block using this format:
```json
{{
    "reasoning": "Reasoning of the decision",
    "is_related": "Y or N",
    "columns": ["column name 1", "column name 2"]
}}
```

Table Schema (DDL):
{table_info}

User Query:
{user_query}
""".strip()


# Cache for schema linking chains keyed by model instance ID
_schema_linking_chain_cache: Dict[int, Runnable] = {}
def get_schema_linking_chain(chat_model: BaseChatModel) -> Runnable:
    # Use model instance ID as cache key (since ChatOpenAI objects aren't hashable)
    chat_model_id = id(chat_model)
    
    if chat_model_id not in _schema_linking_chain_cache:
        _schema_linking_chain_cache[chat_model_id] = (
            ChatPromptTemplate([("human", SCHEMA_LINKING_TEMPLATE)])
            | chat_model
            | JsonOutputParser()
        )
    
    return _schema_linking_chain_cache[chat_model_id]


async def _link_schema_one(
    user_query: str,
    table_name: str,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
    allowed_col_names: Optional[List[str]] = None,
) -> Dict[str, Any]:
    try:
        column_names = database.get_column_names(table_name)
        if isinstance(column_names, list) and len(column_names) <= 5:
            return {
                "input_item": {
                    "table_name": table_name,
                    "user_query": user_query,
                    "allowed_col_names": allowed_col_names
                },
                "filtered_schema": (table_name, column_names),
                "error": None
            }
    
        table_info = database.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=allowed_col_names,
            sample_count=3
        )
        result = await get_schema_linking_chain(chat_model).ainvoke(
            {"table_info": table_info, "user_query": user_query, "dialect": database.dialect}
        )
        if "is_related" not in result or result["is_related"] not in ["Y", "N"]:
            raise ValueError("Invalid response from schema linking chain")
        if result["is_related"] == "Y" and not result.get("columns"):
            result["columns"] = ["ROWID"]

        if result["is_related"] == "N":
            return {
                "input_item": {
                    "table_name": table_name,
                    "user_query": user_query,
                    "allowed_col_names": allowed_col_names
                },
                "filtered_schema": None,
                "error": None
            }
        else:
            return {
                "input_item": {"table_name": table_name, "user_query": user_query, "allowed_col_names": allowed_col_names},
                "filtered_schema": (table_name, result["columns"]),
                "error": None
            }
    except Exception as e:
        return {
            "input_item": {"table_name": table_name, "user_query": user_query},
            "filtered_schema": None,
            "error": str(e)
        }


async def link_schema(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> Dict[str, Dict[str, str]]:
    user_query = state.get("user_query")
    if not user_query:
        raise ValueError("user_query is required")
    max_retries = state.get("max_retries", 1)
    # queue = []
    # for table in  database.get_usable_table_names():
    #     for col_group in database.get_column_groups(table):
    #         queue.append({
    #             "table_name": table,
    #             "allowed_col_names": col_group,
    #             "user_query": user_query
    #         })
    queue = [
        {"table_name": table_name, "user_query": user_query} 
        for table_name in database.get_usable_table_names()
    ]
    successful_results = []
    for _ in range(max_retries):
        tasks = [_link_schema_one(chat_model=chat_model, database=database, **input_item) for input_item in queue]
        results = await asyncio.gather(*tasks)
        successful_results.extend([
            res for res in results if res["error"] is None
        ])
        failed_items = [
            res["input_item"] for res in results if res["error"] is not None
        ]
        queue = failed_items
        if not queue:
            break
    
    linked_schema = [
        result["filtered_schema"] 
        for result in successful_results 
        if result["filtered_schema"]
    ]
    # Return per-table mapping: column_name -> datatype
    final_schema: Dict[str, Dict[str, str]] = {}
    for table_name, col_names in linked_schema:
        table_schema = final_schema.setdefault(table_name, {})
        for col_name in col_names:
            col_type = database.get_column_datatype(
                table_name,
                col_name,
                default="NULL",
            )
            if col_type != "NULL":
                table_schema[col_name] = col_type

    state["linked_schema"] = final_schema
    return state

In [None]:
SQL_GEN_TEMPLATE = """
### DATE INFORMATION:
Today is {date}

### INSTRUCTIONS:
You write SQL queries for a {dialect} database. Users are querying their company database, and your task is to assist by generating valid SQL queries strictly adhering to the database schema provided.

**Table Schema**:
{table_infos}

Translate the user's request into one valid {dialect} query. SQL should be written as a markdown code block:
For example:
```sql
SELECT column1, column2 FROM table WHERE condition;
```

### GUIDELINES:

1.  **Schema Adherence**:
    *   Use only tables, columns, and relationships explicitly listed in the provided schema.
    *   Do not make assumptions about missing or inferred columns/tables.

2.  **{dialect}-Specific Syntax**:
    *   Use only {dialect} syntax. Be aware that {dialect} has limited built-in date/time functions compared to other sql dialects.

3.  **Conditions**:
    *   Always include default conditions for filtering invalid data, e.g., `deleted_at IS NULL` and `status != 'cancelled'` if relevant.
    *   Ensure these conditions match the query's intent unless explicitly omitted in the user request.

4.  **Output Consistency**:
    *   The output fields must match the query's intent exactly. Do not add extra columns or omit requested fields.

5.  **Reserved Keywords and Case Sensitivity**:
    *   Escape reserved keywords or case-sensitive identifiers using double quotes (" "), e.g., "order".

If the user's question is ambiguous or unclear, you must make your best reasonable guess based on the schema.
Translate the user's intent into a **single valid {dialect} query** based on the schema provided.
Ensure the query is optimized, precise, and error-free.

**You must ONLY output ONE SINGLE valid SQL query as markdown codeblock.**
""".strip()


_sql_markdown_re = re.compile(r"```sql\s*([\s\S]*?)\s*```", re.DOTALL)
def parse_sql_output(msg_content: str) -> str:
    try:
        match = _sql_markdown_re.search(msg_content)
        if match:
            return match.group(1).strip()
        else:
            raise ValueError("No SQL query found in the content")
    except Exception:
        return msg_content


def preprocess_for_sql_query_generation(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> List[AnyMessage]:
    linked_schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not linked_schema:
        raise ValueError("linked_schema not found in the input")
    user_query = state.get("user_query")
    if not user_query:
        raise ValueError("user_query not found in the input")
    table_infos = "\n\n".join([
        database.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=list(col_types.keys()),
            sample_count=5,
            column_sample_values=state.get("tbl_col_sample_values", {}).get(table_name, None),
        )
        for table_name, col_types in linked_schema.items()
    ])
    system_prompt = SystemMessage(SQL_GEN_TEMPLATE.format(
        table_infos=table_infos,
        date=get_today_date_en(),
        dialect=database.dialect
    ))
    human_message = HumanMessage(content=user_query)
    return [system_prompt, human_message]


_sql_query_generation_chain_cache: Dict[int, Runnable] = {}
def get_sql_query_generation_chain(
    chat_model: BaseChatModel, database: SQLiteDatabase
) -> Runnable:
    chat_model_id, database_id = id(chat_model), id(database)

    if (chat_model_id, database_id) not in _sql_query_generation_chain_cache:
        _sql_query_generation_chain_cache[(chat_model_id, database_id)] = (
            RunnableLambda(partial(preprocess_for_sql_query_generation, database=database))
            | chat_model
            | StrOutputParser()
            | parse_sql_output
        )
    
    return _sql_query_generation_chain_cache[(chat_model_id, database_id)]


async def generate_sql_query(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    if not state.get("sql_queries"):
        state["sql_queries"] = []
    sql_gen_chain = get_sql_query_generation_chain(chat_model, database)
    sql_query = await sql_gen_chain.ainvoke(state)
    state["sql_queries"].append(sql_query)
    return state

In [None]:
def get_predicate_values(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    sql_queries: List[str] = state.get("sql_queries")
    if not sql_queries:
        raise ValueError("SQL queries are required")
    sql_query = sql_queries[-1]
    if not sql_query:
        raise ValueError("SQL query is required")
    schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not schema:
        raise ValueError("Schema is required")
    parsed = parse_one(sql_query, read=database.dialect.lower())
    
    # ---------------------------------------------------------
    # 1. Map Aliases AND Track Active Tables
    # ---------------------------------------------------------
    alias_map = {}
    
    # Helper to register tables found in FROM/JOIN
    def register_table(table_node):
        real_name = table_node.name
        alias = table_node.alias if table_node.alias else real_name
        alias_map[alias] = real_name

    for from_node in parsed.find_all(exp.From):
        for table in from_node.find_all(exp.Table):
            register_table(table)

    for join_node in parsed.find_all(exp.Join):
        register_table(join_node.this)

    print(f"DEBUG: Found Aliases: {alias_map}")

    extracted_data = []

    # ---------------------------------------------------------
    # 2. Logic to Resolve Table for a Column
    # ---------------------------------------------------------
    def resolve_table(col_node):
        col_name = col_node.name
        table_alias = col_node.table
        
        # Case A: Alias is explicit (e.g., c.country)
        if table_alias:
            return alias_map.get(table_alias)
        
        # Case B: No alias (e.g., country). 
        # FIX: Check only tables present in the current query (alias_map.values())
        active_tables = set(alias_map.values())
        
        candidates = []
        for table in active_tables:
            # Check if table exists in schema AND column exists in that table
            if table in schema and col_name in schema[table]:
                candidates.append(table)
        
        if len(candidates) == 1:
            return candidates[0]
        elif len(candidates) > 1:
            print(f"DEBUG: Ambiguous column '{col_name}' found in multiple active tables: {candidates}")
            return None
        else:
            return None

    # ---------------------------------------------------------
    # 3. Recursive Visitor
    # ---------------------------------------------------------
    def visit_node(node):
        if not node: 
            return

        if isinstance(node, (exp.And, exp.Or)):
            visit_node(node.this)
            visit_node(node.expression)
            return

        if isinstance(node, (exp.Paren, exp.Not, exp.Where)):
            visit_node(node.this)
            return

        # Handle Binary Comparisons (=, !=, LIKE)
        if isinstance(node, (exp.EQ, exp.NEQ, exp.Like, exp.ILike)):
            if isinstance(node.left, exp.Column) and isinstance(node.right, exp.Literal):
                if node.right.is_string:
                    process_extraction(node.left, node.right.this, node.key)
            return

        # Handle IN clause
        if isinstance(node, exp.In) and isinstance(node.this, exp.Column):
            for item in node.args.get('expressions', []):
                if isinstance(item, exp.Literal) and item.is_string:
                    process_extraction(node.this, item.this, "IN")
            return

    def process_extraction(col_node, value_str, operator):
        col_name = col_node.name
        real_table_name = resolve_table(col_node)

        if real_table_name:
            # Verify data type is TEXT
            col_type = schema[real_table_name].get(col_name)
            if col_type == "TEXT":
                extracted_data.append({
                    "table_name": real_table_name,
                    "column_name": col_name,
                    "value": value_str,
                    "operator": operator
                })
            else:
                print(f"DEBUG: Skipped {col_name} (Type is {col_type}, not TEXT)")
        else:
            print(f"DEBUG: Skipped {col_name} (Could not resolve table)")

    # ---------------------------------------------------------
    # 4. Execution
    # ---------------------------------------------------------
    where_clause = parsed.find(exp.Where)
    if where_clause:
        visit_node(where_clause)
    state["predicate_values"] = extracted_data
    return state


async def get_similar_predicate_values(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    predicate_values = state.get("predicate_values")
    if not predicate_values:
        state["tbl_col_sample_values"] = {}
        return state
    state["tbl_col_sample_values"] = await database.batch_search_similar_values(
        [
            (v["table_name"], v["column_name"], v["value"])
            for v in predicate_values
        ], 
        k=5
    )
    return state


def restrict_select_columns(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    """
    Replaces SELECT * with SELECT t.col1, t.col2 based on filtered_schema.
    """
    sql_queries: List[str] = state.get("sql_queries")
    if not sql_queries:
        raise ValueError("SQL queries are required")
    sql_query = sql_queries[-1]
    if not sql_query:
        raise ValueError("SQL query is required")
    schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not schema:
        raise ValueError("Schema is required")
    parsed = parse_one(sql_query, read=database.dialect.lower())
    
    # ---------------------------------------------------------
    # 1. Build Alias Map (Map Alias -> Real Table Name)
    # ---------------------------------------------------------
    # We need to know the order of tables to expand * correctly
    active_tables_ordered = [] 
    alias_map = {}

    def register_table(table_node):
        real_name = table_node.name
        alias = table_node.alias if table_node.alias else real_name
        
        # Only register if we haven't seen this alias yet
        if alias not in alias_map:
            alias_map[alias] = real_name
            active_tables_ordered.append(alias)

    # Scan FROM
    for from_node in parsed.find_all(exp.From):
        for table in from_node.find_all(exp.Table):
            register_table(table)

    # Scan JOINs
    for join_node in parsed.find_all(exp.Join):
        register_table(join_node.this)

    print(f"DEBUG: Active Tables: {alias_map}")

    # ---------------------------------------------------------
    # 2. Helper to Generate Column Expressions
    # ---------------------------------------------------------
    def get_columns_for_table(table_alias):
        real_name = alias_map.get(table_alias)
        if not real_name or real_name not in schema:
            return [] # Table not in our allowed schema, return nothing (or handle error)
        
        # Create sqlglot Column objects: alias.column_name
        cols = schema[real_name].keys()
        return [
            exp.Column(
                this=exp.Identifier(this=col, quoted=True),
                table=exp.Identifier(this=table_alias, quoted=True)
            ) for col in cols
        ]

    # ---------------------------------------------------------
    # 3. Rewrite SELECT Expressions
    # ---------------------------------------------------------
    # We only want to transform the main SELECT statement(s)
    for select_node in parsed.find_all(exp.Select):
        new_expressions = []
        
        for expression in select_node.expressions:
            # Case A: Naked * (SELECT *)
            if isinstance(expression, exp.Star) and not isinstance(expression, exp.Count):
                # Expand columns for ALL active tables in the query
                for alias in active_tables_ordered:
                    expanded_cols = get_columns_for_table(alias)
                    new_expressions.extend(expanded_cols)
            
            # Case B: Qualified * (SELECT t.*)
            elif isinstance(expression, exp.Column) and isinstance(expression.this, exp.Star):
                # Extract the table alias (e.g., 't' from 't.*')
                table_alias = expression.table
                expanded_cols = get_columns_for_table(table_alias)
                new_expressions.extend(expanded_cols)
                
            # Case C: Regular column or other expression (Keep it)
            else:
                new_expressions.append(expression)

        # Replace the old expressions with the new expanded list
        if new_expressions:
            select_node.set("expressions", new_expressions)

    restricted_sql_query = parsed.sql(dialect=database.dialect.lower())
    def normalize_sql_query(sql_query: str) -> str:
        return re.sub(r"\s+", " ", sql_query).strip().lower()
    if normalize_sql_query(restricted_sql_query) != normalize_sql_query(sql_query):
        state["sql_queries"].append(restricted_sql_query)
    return state

In [None]:
ANSWER_GEN_TEMPLATE = """
### THÔNG TIN NGÀY THÁNG:
Hôm nay là {date}

### NHIỆM VỤ:
Bạn là một trợ lý phân tích dữ liệu chuyên nghiệp. Nhiệm vụ của bạn là đưa ra câu trả lời bằng **Tiếng Việt** rõ ràng, chính xác và súc tích cho câu hỏi của người dùng, dựa hoàn toàn vào kết quả cơ sở dữ liệu (database results) được cung cấp.

**Dữ liệu đầu vào**:
1. **Lược đồ bảng (Table Schema)**:
{table_infos}

2. **Câu hỏi người dùng (User Question)**:
{user_query}

3. **Truy vấn SQL (SQL Query)**:
```sql
{sql_query}
```

4. **Kết quả từ Database (Database Results)**:
{db_result}

### CÁC NGUYÊN TẮC HƯỚNG DẪN:

1.  **Chính xác và Tuân thủ dữ liệu**:
    *   Câu trả lời phải dựa **TUYỆT ĐỐI** vào phần "Kết quả từ Database".
    *   Không được tự suy diễn hoặc đưa vào các kiến thức bên ngoài không có trong dữ liệu.
    *   Nếu kết quả trả về là rỗng (empty), hãy thông báo lịch sự rằng không tìm thấy dữ liệu phù hợp với yêu cầu.

2.  **Định dạng câu trả lời**:
    *   **Trả lời trực tiếp**: Đi thẳng vào vấn đề.
    *   **Danh sách/Bảng**: Nếu kết quả có nhiều dòng, hãy trình bày dưới dạng danh sách gạch đầu dòng hoặc bảng Markdown cho dễ đọc.
    *   **Số liệu tổng hợp**: Nếu kết quả là một con số duy nhất (tổng, đếm, trung bình), hãy viết thành một câu hoàn chỉnh (Ví dụ: "Tổng doanh thu là 50.000.000 VNĐ").

3.  **Trình bày dữ liệu (Formatting)**:
    *   **Con số**: Sử dụng dấu phân cách hàng nghìn (ví dụ: 1.000 hoặc 1,000 tùy theo ngữ cảnh, nhưng phải nhất quán).
    *   **Tiền tệ**: Thêm đơn vị tiền tệ phù hợp nếu có (ví dụ: VNĐ, $, USD).
    *   **Ngày tháng**: Chuyển đổi sang định dạng ngày tháng Tiếng Việt tự nhiên (ví dụ: "Ngày 01 tháng 01 năm 2024").

4.  **Ngữ cảnh và Thuật ngữ**:
    *   Sử dụng "Truy vấn SQL" để hiểu ngữ cảnh lọc dữ liệu (ví dụ: nếu SQL có `WHERE status = 'active'`, hãy nói rõ đây là các đơn hàng có trạng thái là "đang hoạt động").
    *   Sử dụng ngôn ngữ kinh doanh/đời thường. **Không** nhắc đến tên bảng kỹ thuật (như `tbl_users`, `col_price`) hoặc cú pháp code trong câu trả lời cuối cùng.

5.  **Văn phong**:
    *   Chuyên nghiệp, khách quan và hữu ích.
    *   Tránh các câu máy móc như mà hãy trả lời tự nhiên như một con người.

**Đầu ra**:
Chỉ xuất ra câu trả lời cuối cùng bằng Tiếng Việt (sử dụng Markdown).
""".strip()


def preprocess_for_answer_generation(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> List[AnyMessage]:
    user_query = state.get("user_query")
    if not user_query:
        raise ValueError("user_query not found in the input")
    linked_schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not linked_schema:
        raise ValueError("linked_schema not found in the input")
    db_output: Dict[str, Any] = state.get("db_output", {})
    sql_queries: List[str] = state.get("sql_queries", [])
    if not sql_queries:
        raise ValueError("sql_queries not found in the input")
    sql_query = sql_queries[-1]
    if db_output.get("error", "Error") is not None:
        raise ValueError("No valid database result found")
    db_result = db_output.get("result", [])
    
    table_infos = "\n\n".join([
        database.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=list(col_types.keys()),
            sample_count=5,
            column_sample_values=state.get("tbl_col_sample_values", {}).get(table_name, None),
        )
        for table_name, col_types in linked_schema.items()
    ])
    
    human_message = HumanMessage(content=ANSWER_GEN_TEMPLATE.format(
        date=get_today_date_vi(),
        table_infos=table_infos,
        user_query=user_query,
        sql_query=sql_query,
        db_result=db_result
    ))
    return [human_message]


_answer_generation_chain_cache: Dict[int, Runnable] = {}
def get_answer_generation_chain(chat_model: BaseChatModel, database: SQLiteDatabase) -> Runnable:
    chat_model_id, database_id = id(chat_model), id(database)

    if (chat_model_id, database_id) not in _answer_generation_chain_cache:
        _answer_generation_chain_cache[(chat_model_id, database_id)] = (
            RunnableLambda(partial(preprocess_for_answer_generation, database=database))
            | chat_model
            | StrOutputParser()
        )
    
    return _answer_generation_chain_cache[(chat_model_id, database_id)]


async def generate_answer(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    answer_chain = get_answer_generation_chain(chat_model, database)
    answer = await answer_chain.ainvoke(state)
    state["final_answer"] = answer
    return state

In [None]:
# add condition for rewrite sql query: dont rewrite if we can't find predicate values or similar predicate values contains original

def retry_condition(
    state: SQLAssistantState
) -> Literal["gen_sql_query_2", "restrict_select_columns"]:
    predicate_values = state.get("predicate_values")
    if not predicate_values:
        return "restrict_select_columns"
    similar_predicate_values = state.get("tbl_col_sample_values")
    if not similar_predicate_values:
        return "restrict_select_columns"
    
    # Check if all original predicate values are found in the similar values
    all_found = True
    for pred_value in predicate_values:
        table_name = pred_value["table_name"]
        column_name = pred_value["column_name"]
        original_value = pred_value["value"]
        
        # Get the list of similar values for this table/column pair
        similar_values = similar_predicate_values.get(table_name, {}).get(column_name, [])
        
        # If the original value is NOT found in similar values, we need to rewrite
        if original_value not in similar_values:
            all_found = False
            break
    
    # If all original values were found in similar values, we don't need to rewrite
    if all_found:
        return "restrict_select_columns"
    
    # If any original value was not found in similar values, we should rewrite
    return "gen_sql_query_2"
    


async def sql_execution(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    sql_queries = state.get("sql_queries", [])
    if not sql_queries:
        raise ValueError("SQL queries are required")
    sql_query = sql_queries[-1]
    state["db_output"] = await database.run_no_throw(sql_query, include_columns=True)
    return state


def build_sql_assistant_pipeline(
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> CompiledStateGraph:
    builder = StateGraph(SQLAssistantState)
    # Add nodes
    builder.add_node(
        "link_schema",
        partial(
            link_schema,
            chat_model=chat_model,
            database=database
        )
    )
    builder.add_node(
        "gen_sql_query_1",
        partial(
            generate_sql_query, 
            chat_model=chat_model,
            database=database
        )
    )
    builder.add_node(
        "get_predicate_values", 
        partial(
            get_predicate_values, 
            database=database
        )
    )
    builder.add_node(
        "get_similar_predicate_values", 
        partial(
            get_similar_predicate_values, 
            database=database
        )
    )
    builder.add_node(
        "gen_sql_query_2",
        partial(
            generate_sql_query,
            chat_model=chat_model,
            database=database
        )
    )
    builder.add_node(
        "restrict_select_columns",
        partial(
            restrict_select_columns,
            database=database
        )
    )
    builder.add_node(
        "sql_execution", 
        partial(
            sql_execution,
            database=database
        )
    )
    builder.add_node(
        "answer_generation", 
        partial(
            generate_answer,
            chat_model=chat_model,
            database=database
        )
    )

    # Add edges
    builder.add_edge(START, "link_schema")
    builder.add_edge("link_schema", "gen_sql_query_1")
    builder.add_edge("gen_sql_query_1", "get_predicate_values")
    builder.add_edge("get_predicate_values", "get_similar_predicate_values")
    builder.add_conditional_edges(
        "get_similar_predicate_values",
        retry_condition,
    )
    builder.add_edge("gen_sql_query_2", "restrict_select_columns")
    builder.add_edge("restrict_select_columns", "sql_execution")
    builder.add_edge("sql_execution", "answer_generation")
    builder.add_edge("answer_generation", END)

    return builder.compile()

In [None]:
sql_assistant = build_sql_assistant_pipeline(llm, db)

# display(Image(sql_assistant.get_graph().draw_mermaid_png()))

In [None]:
user_query = "tôi muốn thuê một căn 2 phòng ngủ, giá dưới 6tr 1 tháng"
state = await sql_assistant.ainvoke({"user_query": user_query})
print(state["final_answer"])

DEBUG: Found Aliases: {'BĐS Cho thuê 500': 'BĐS Cho thuê 500'}
DEBUG: Active Tables: {'BĐS Cho thuê 500': 'BĐS Cho thuê 500'}
Đã tìm thấy 1 căn hộ phù hợp với yêu cầu của bạn:

- Diện tích: 52 m²  
- Giá thuê: 5.6 triệu đồng/tháng  
- Giá/m²/tháng: 107.692 VNĐ  
- Số phòng ngủ: 2 phòng


In [None]:
state

{'user_query': 'tôi muốn thuê một căn 2 phòng ngủ, giá dưới 6tr 1 tháng',
 'linked_schema': {'BĐS Bán 500': {'Số phòng ngủ': 'INTEGER',
   'Giá (tỷ VNĐ)': 'REAL',
   'Giá/m²_so_sanh': 'REAL',
   'Diện tích (m²)': 'INTEGER',
   'Loại BĐS': 'TEXT'},
  'BĐS Cho thuê 500': {'Số phòng ngủ': 'INTEGER',
   'Giá thuê (triệu/tháng)': 'REAL',
   'Giá/m²/tháng_số': 'REAL',
   'Diện tích (m²)': 'INTEGER'}},
 'sql_queries': ['SELECT "Diện tích (m²)", "Giá thuê (triệu/tháng)", "Giá/m²/tháng_số", "Số phòng ngủ" \nFROM "BĐS Cho thuê 500" \nWHERE "Số phòng ngủ" = 2 AND "Giá thuê (triệu/tháng)" < 6;',
  'SELECT "Diện tích (m²)", "Giá thuê (triệu/tháng)", "Giá/m²/tháng_số", "Số phòng ngủ" FROM "BĐS Cho thuê 500" WHERE "Số phòng ngủ" = 2 AND "Giá thuê (triệu/tháng)" < 6'],
 'predicate_values': [],
 'tbl_col_sample_values': {},
 'db_output': {'result': [{'Diện tích (m²)': 52,
    'Giá thuê (triệu/tháng)': 5.6,
    'Giá/m²/tháng_số': 107692.0,
    'Số phòng ngủ': 2}],
  'error': None},
 'final_answer': 'Đã 

## Full Langgraph workflow - Multi-turn (Worked)

In [None]:
class SQLAssistantState(TypedDict):
    conversation: List[AnyMessage]
    linked_schema: Dict[str, Dict[str, str]]
    sql_queries: List[str]
    predicate_values: List[Dict[str, Any]]
    tbl_col_sample_values: Dict[str, Dict[str, List[Any]]]
    db_output: Dict[str, Any]
    final_answer: str

In [None]:
SCHEMA_LINKING_TEMPLATE = """
You are an expert in SQL schema linking. 
Given a {dialect} table schema (DDL) and a conversation history, determine if the table is relevant to the latest customer query.

Your task:
1. Analyze the table schema and the conversation history. Focus on the latest customer message, using previous messages for context (e.g., to resolve references). Evaluate the Table Name and Table Comment to see if the general topic matches the query. Answer "Y" (Yes) or "N" (No) regarding the table's relevance to the latest query.
2. If the answer is "Y", list ALL columns that are semantically related. 
   - You do NOT need to identify the exact columns for the final SQL query. 
   - You MUST include all columns that provide context, identifiers, or potential join keys related to the entities in the query.

Output must be a valid JSON object inside a ```json code block using this format:
```json
{{
    "explanation": "Explanation of the decision",
    "is_related": "Y or N",
    "columns": ["column name 1", "column name 2"]
}}
```

Table Schema (DDL):
{table_info}

Conversation History:
{formatted_conversation}
""".strip()


# Cache for schema linking chains keyed by model instance ID
_schema_linking_chain_cache: Dict[int, Runnable] = {}
def get_schema_linking_chain(chat_model: BaseChatModel) -> Runnable:
    # Use model instance ID as cache key (since ChatOpenAI objects aren't hashable)
    chat_model_id = id(chat_model)
    
    if chat_model_id not in _schema_linking_chain_cache:
        _schema_linking_chain_cache[chat_model_id] = (
            ChatPromptTemplate([("human", SCHEMA_LINKING_TEMPLATE)])
            | chat_model
            | JsonOutputParser()
        )
    
    return _schema_linking_chain_cache[chat_model_id]


def format_conversation(conversation: List[AnyMessage]) -> str:
    formatted_conversation = ""
    end_index = len(conversation) - 1 
    for ind in range(len(conversation) - 1, -1, -1):
        if conversation[ind].type == "human":
            end_index = ind
            break
    for message in conversation[:end_index]:
        if message.type == "human":
            formatted_conversation += f"Customer: {message.content}\n"
        elif message.type == "ai":
            formatted_conversation += f"Support Team: {message.content}\n"
    
    formatted_conversation += f"\nLatest Customer Message: {conversation[end_index].content}"
    return formatted_conversation


async def _link_schema_one(
    conversation: List[AnyMessage],
    table_name: str,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
    allowed_col_names: Optional[List[str]] = None,
) -> Dict[str, Any]:
    try:
        column_names = database.get_column_names(table_name)
        if isinstance(column_names, list) and len(column_names) <= 5:
            return {
                "input_item": {
                    "table_name": table_name,
                    "conversation": conversation,
                    "allowed_col_names": allowed_col_names
                },
                "filtered_schema": (table_name, column_names),
                "error": None
            }
        table_info = database.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=allowed_col_names,
            sample_count=3
        )
        result = await get_schema_linking_chain(chat_model).ainvoke({
            "table_info": table_info, 
            "formatted_conversation": format_conversation(conversation), 
            "dialect": database.dialect
        })
        print(result["explanation"])
        if "is_related" not in result or result["is_related"] not in ["Y", "N"]:
            raise ValueError("Invalid response from schema linking chain")
        if result["is_related"] == "Y" and not result.get("columns"):
            result["columns"] = ["ROWID"]

        if result["is_related"] == "N":
            return {
                "input_item": {
                    "table_name": table_name,
                    "conversation": conversation,
                    "allowed_col_names": allowed_col_names
                },
                "filtered_schema": None,
                "error": None
            }
        else:
            return {
                "input_item": {"table_name": table_name, "conversation": conversation, "allowed_col_names": allowed_col_names},
                "filtered_schema": (table_name, result["columns"]),
                "error": None
            }
    except Exception as e:
        return {
            "input_item": {"table_name": table_name, "conversation": conversation},
            "filtered_schema": None,
            "error": str(e)
        }


async def link_schema(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> Dict[str, Dict[str, str]]:
    conversation = state.get("conversation")
    if not conversation:
        raise ValueError("conversation is required")
    max_retries = 1
    # queue = []
    # for table in  database.get_usable_table_names():
    #     for col_group in database.get_column_groups(table):
    #         queue.append({
    #             "table_name": table,
    #             "allowed_col_names": col_group,
    #             "conversation": conversation
    #         })
    queue = [
        {"table_name": table_name, "conversation": conversation} 
        for table_name in database.get_usable_table_names()
    ]
    successful_results = []
    for _ in range(max_retries):
        tasks = [_link_schema_one(chat_model=chat_model, database=database, **input_item) for input_item in queue]
        results = await tqdm_asyncio.gather(*tasks)
        successful_results.extend([
            res for res in results if res["error"] is None
        ])
        failed_items = [
            res["input_item"] for res in results if res["error"] is not None
        ]
        queue = failed_items
        if not queue:
            break
    
    linked_schema = [
        result["filtered_schema"] 
        for result in successful_results 
        if result["filtered_schema"]
    ]
    # Return per-table mapping: column_name -> datatype
    final_schema: Dict[str, Dict[str, str]] = {}
    for table_name, col_names in linked_schema:
        table_schema = final_schema.setdefault(table_name, {})
        for col_name in col_names:
            col_type = database.get_column_datatype(
                table_name,
                col_name,
                default="NULL",
            )
            if col_type != "NULL":
                table_schema[col_name] = col_type

    state["linked_schema"] = final_schema
    return state

In [None]:
# print(db.get_table_info_no_throw(
#     db.get_usable_table_names()[1],
#     get_col_comments=True,
#     sample_count=3
# ))

In [None]:
# tmp = await link_schema({"conversation": [
#     HumanMessage("Hello"),
#     AIMessage("Em là trợ lý ảo Guso có thể hỗ trợ anh/chị về các sản phẩm và dịch vụ của BDS Guru. Chào anh/chị ạ! Nếu anh/chị có bất kỳ câu hỏi hay nhu cầu nào, em rất sẵn sàng giúp đỡ ạ!"),
#     HumanMessage("Có bao nhiêu nhà đang được cho thuê nhỉ"),
#     AIMessage("Tổng số nhà đang được cho thuê là 111 nhà ạ."),
#     HumanMessage("Tôi muốn thuê một căn 2 phòng ngủ, giá dưới 6tr 1 tháng"),
#     AIMessage("Có 1 căn hộ 2 phòng ngủ với giá thuê dưới 6 triệu đồng/tháng ạ."),
#     HumanMessage("Thế có văn phòng nào giá dưới 6tr ở HN không"),
# ]}, get_llm_model(), db)
# tmp["linked_schema"]

In [None]:
# tmp = await link_schema({"conversation": [
#     HumanMessage("Hello"),
#     AIMessage("Em là trợ lý ảo Guso có thể hỗ trợ anh/chị về các sản phẩm và dịch vụ của BDS Guru. Chào anh/chị ạ! Nếu anh/chị có bất kỳ câu hỏi hay nhu cầu nào, em rất sẵn sàng giúp đỡ ạ!"),
#     HumanMessage("Có bao nhiêu nhà đang được cho thuê nhỉ"),
#     AIMessage("Tổng số nhà đang được cho thuê là 111 nhà ạ."),
#     HumanMessage(content="Tôi muốn thuê một căn 2 phòng ngủ, giá dưới 6tr 1 tháng"),
# ]}, llm, db)
# tmp

In [None]:
# print(format_conversation([
#     HumanMessage("Hello"),
#     AIMessage("Em là trợ lý ảo Guso có thể hỗ trợ anh/chị về các sản phẩm và dịch vụ của BDS Guru. Chào anh/chị ạ! Nếu anh/chị có bất kỳ câu hỏi hay nhu cầu nào, em rất sẵn sàng giúp đỡ ạ!"),
#     HumanMessage("Có bao nhiêu nhà đang được cho thuê nhỉ"),
#     AIMessage("Tổng số nhà đang được cho thuê là 111 nhà ạ."),
#     HumanMessage("Tôi muốn thuê một căn 2 phòng ngủ, giá dưới 6tr 1 tháng"),
# ]))

In [None]:
SQL_GEN_TEMPLATE = """
### DATE INFORMATION:
Today is {date}

### INSTRUCTIONS:
You write SQL queries for a {dialect} database. The Support Team is querying the database to answer Customer questions, and your task is to assist by generating valid SQL queries strictly adhering to the database schema provided.

**Table Schema**:
{table_infos}


Translate the latest customer message into one valid {dialect} query, using the conversation history for context (e.g., resolving pronouns or follow-up filters). SQL should be written as a markdown code block:
For example:
```sql
SELECT column1, column2 FROM table WHERE condition;
```

### GUIDELINES:

1.  **Schema Adherence**:
    *   Use only tables, columns, and relationships explicitly listed in the provided schema.
    *   Do not make assumptions about missing or inferred columns/tables.

2.  **{dialect}-Specific Syntax**:
    *   Use only {dialect} syntax. Be aware that {dialect} has limited built-in date/time functions compared to other sql dialects.

3.  **Conditions**:
    *   Always include default conditions for filtering invalid data, e.g., `deleted_at IS NULL` and `status != 'cancelled'` if relevant.
    *   Ensure these conditions match the query's intent unless explicitly omitted in the customer's request.

4.  **Output Consistency**:
    *   The output fields must match the query's intent exactly. Do not add extra columns or omit requested fields.

5.  **Reserved Keywords and Case Sensitivity**:
    *   Escape reserved keywords or case-sensitive identifiers using double quotes (" "), e.g., "order".

If the customer's question is ambiguous or unclear, you must make your best reasonable guess based on the schema.
Translate the customer's intent into a **single valid {dialect} query** based on the schema provided.
Ensure the query is optimized, precise, and error-free.

**You must ONLY output ONE SINGLE valid SQL query as markdown codeblock.**

### CONVERSATION HISTORY:
{formatted_conversation}
""".strip()


_sql_markdown_re = re.compile(r"```sql\s*([\s\S]*?)\s*```", re.DOTALL)
def parse_sql_output(msg_content: str) -> str:
    try:
        match = _sql_markdown_re.findall(msg_content)
        if match:
            return match[-1].strip()
        else:
            raise ValueError("No SQL query found in the content")
    except Exception:
        return msg_content


def preprocess_for_sql_query_generation(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> List[AnyMessage]:
    linked_schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not linked_schema:
        raise ValueError("linked_schema not found in the input")
    conversation = state.get("conversation")
    if not conversation:
        raise ValueError("conversation not found in the input")
    formatted_conversation = format_conversation(conversation)
    table_infos = "\n\n".join([
        database.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=list(col_types.keys()),
            sample_count=5,
            column_sample_values=state.get("tbl_col_sample_values", {}).get(table_name, None),
        )
        for table_name, col_types in linked_schema.items()
    ])
    return [HumanMessage(SQL_GEN_TEMPLATE.format(
        date=get_today_date_en(),
        dialect=database.dialect,
        table_infos=table_infos,
        formatted_conversation=formatted_conversation,
    ))]


_sql_query_generation_chain_cache: Dict[tuple[int, int], Runnable] = {}
def get_sql_query_generation_chain(
    chat_model: BaseChatModel, database: SQLiteDatabase
) -> Runnable:
    chat_model_id, database_id = id(chat_model), id(database)
    if (chat_model_id, database_id) not in _sql_query_generation_chain_cache:
        _sql_query_generation_chain_cache[(chat_model_id, database_id)] = (
            RunnableLambda(partial(preprocess_for_sql_query_generation, database=database))
            | chat_model
            | StrOutputParser()
            | parse_sql_output
        )
    
    return _sql_query_generation_chain_cache[(chat_model_id, database_id)]


async def generate_sql_query(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    if not state.get("sql_queries"):
        state["sql_queries"] = []
    sql_gen_chain = get_sql_query_generation_chain(chat_model, database)
    sql_query = await sql_gen_chain.ainvoke(state)
    state["sql_queries"].append(sql_query)
    return state

In [None]:
# SQL_GEN_TEMPLATE = """
# ### DATE INFORMATION:
# Today is {date}

# ### INSTRUCTIONS:
# You write SQL queries for a {dialect} database. Users are querying their company database, and your task is to assist by generating valid SQL queries strictly adhering to the database schema provided and calling the tool to execute them.

# **Table Schema**:
# {table_infos}

# ### GUIDELINES:
# 1.  **Schema Adherence**:
#     *   Use only tables, columns, and relationships explicitly listed in the provided schema.
#     *   Do not make assumptions about missing or inferred columns/tables.

# 2.  **{dialect}-Specific Syntax**:
#     *   Use only {dialect} syntax. Be aware that {dialect} has limited built-in date/time functions compared to other sql dialects.

# 3.  **Conditions**:
#     *   Always include default conditions for filtering invalid data, e.g., `deleted_at IS NULL` and `status != 'cancelled'` if relevant.
#     *   Ensure these conditions match the query's intent unless explicitly omitted in the user request.

# 4.  **Output Consistency**:
#     *   The output fields must match the query's intent exactly. Do not add extra columns or omit requested fields.

# 5.  **Reserved Keywords and Case Sensitivity**:
#     *   Escape reserved keywords or case-sensitive identifiers using double quotes (" "), e.g., "order".

# If the user's intent is ambiguous or unclear, you must make your best reasonable guess based on the schema. Translate the user's intent into a **single valid {dialect} query** based on the schema provided.
# Ensure the query is optimized, precise, and error-free.
# """.strip()


# QUERY_DATABASE_TOOL = json.dumps({
#     'type': 'function',
#     'function': {
#         'name': 'query_database',
#         'description': 'Thực hiện câu truy vấn {{dialect}} và trả về kết quả',
#         'parameters': {
#             'properties': {
#                 'sql_query': {
#                     'description': 'Câu truy vấn {{dialect}}',
#                     'type': 'string'
#                 }
#             },
#             'required': ['sql_query'],
#             'type': 'object'
#         }
#     }
# })


# def preprocess_for_sql_query_generation(
#     state: SQLAssistantState,
#     database: SQLiteDatabase,
# ) -> List[AnyMessage]:
#     linked_schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
#     if not linked_schema:
#         raise ValueError("linked_schema not found in the input")
#     conversation = state.get("conversation")
#     if not conversation:
#         raise ValueError("conversation not found in the input")
#     table_infos = "\n\n".join([
#         database.get_table_info_no_throw(
#             table_name,
#             get_col_comments=True,
#             allowed_col_names=list(col_types.keys()),
#             sample_count=5,
#             column_sample_values=state.get("tbl_col_sample_values", {}).get(table_name, None),
#         )
#         for table_name, col_types in linked_schema.items()
#     ])
#     system_prompt = SystemMessage(SQL_GEN_TEMPLATE.format(
#         table_infos=table_infos,
#         date=get_today_date_en(),
#         dialect=database.dialect
#     ))
#     return [system_prompt] + conversation


# _sql_query_generation_chain_cache: Dict[tuple[int, int], Runnable] = {}
# def get_sql_query_generation_chain(
#     chat_model: BaseChatModel, database: SQLiteDatabase
# ) -> Runnable:
#     chat_model_id, database_id = id(chat_model), id(database)
#     tool = QUERY_DATABASE_TOOL.replace("{{dialect}}", database.dialect)
#     if (chat_model_id, database_id) not in _sql_query_generation_chain_cache:
#         _sql_query_generation_chain_cache[(chat_model_id, database_id)] = (
#             RunnableLambda(partial(preprocess_for_sql_query_generation, database=database))
#             | chat_model.bind(tools=[json.loads(tool)])
#             | postprocess_ai_message
#             | RunnableLambda(
#                 lambda ai_message: ai_message.tool_calls[-1].get("args", {}).get("sql_query", "")
#             )
#         )
    
#     return _sql_query_generation_chain_cache[(chat_model_id, database_id)]


# async def generate_sql_query(
#     state: SQLAssistantState,
#     chat_model: BaseChatModel,
#     database: SQLiteDatabase,
# ) -> SQLAssistantState:
#     if not state.get("sql_queries"):
#         state["sql_queries"] = []
#     sql_gen_chain = get_sql_query_generation_chain(chat_model, database)
#     sql_query = await sql_gen_chain.ainvoke(state)
#     state["sql_queries"].append(sql_query)
#     return state

In [None]:
def get_predicate_values(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    sql_queries: List[str] = state.get("sql_queries")
    if not sql_queries:
        raise ValueError("SQL queries are required")
    sql_query = sql_queries[-1]
    if not sql_query:
        raise ValueError("SQL query is required")
    schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not schema:
        raise ValueError("Schema is required")
    parsed = parse_one(sql_query, read=database.dialect.lower())
    
    # ---------------------------------------------------------
    # 1. Map Aliases AND Track Active Tables
    # ---------------------------------------------------------
    alias_map = {}
    
    # Helper to register tables found in FROM/JOIN
    def register_table(table_node):
        real_name = table_node.name
        alias = table_node.alias if table_node.alias else real_name
        alias_map[alias] = real_name

    for from_node in parsed.find_all(exp.From):
        for table in from_node.find_all(exp.Table):
            register_table(table)

    for join_node in parsed.find_all(exp.Join):
        register_table(join_node.this)

    print(f"DEBUG: Found Aliases: {alias_map}")

    extracted_data = []

    # ---------------------------------------------------------
    # 2. Logic to Resolve Table for a Column
    # ---------------------------------------------------------
    def resolve_table(col_node):
        col_name = col_node.name
        table_alias = col_node.table
        
        # Case A: Alias is explicit (e.g., c.country)
        if table_alias:
            return alias_map.get(table_alias)
        
        # Case B: No alias (e.g., country). 
        # FIX: Check only tables present in the current query (alias_map.values())
        active_tables = set(alias_map.values())
        
        candidates = []
        for table in active_tables:
            # Check if table exists in schema AND column exists in that table
            if table in schema and col_name in schema[table]:
                candidates.append(table)
        
        if len(candidates) == 1:
            return candidates[0]
        elif len(candidates) > 1:
            print(f"DEBUG: Ambiguous column '{col_name}' found in multiple active tables: {candidates}")
            return None
        else:
            return None

    # ---------------------------------------------------------
    # 3. Recursive Visitor
    # ---------------------------------------------------------
    def visit_node(node):
        if not node: 
            return

        if isinstance(node, (exp.And, exp.Or)):
            visit_node(node.this)
            visit_node(node.expression)
            return

        if isinstance(node, (exp.Paren, exp.Not, exp.Where)):
            visit_node(node.this)
            return

        # Handle Binary Comparisons (=, !=, LIKE)
        if isinstance(node, (exp.EQ, exp.NEQ, exp.Like, exp.ILike)):
            if isinstance(node.left, exp.Column) and isinstance(node.right, exp.Literal):
                if node.right.is_string:
                    process_extraction(node.left, node.right.this, node.key)
            return

        # Handle IN clause
        if isinstance(node, exp.In) and isinstance(node.this, exp.Column):
            for item in node.args.get('expressions', []):
                if isinstance(item, exp.Literal) and item.is_string:
                    process_extraction(node.this, item.this, "IN")
            return

    def process_extraction(col_node, value_str, operator):
        col_name = col_node.name
        real_table_name = resolve_table(col_node)

        if real_table_name:
            # Verify data type is TEXT
            col_type = schema[real_table_name].get(col_name)
            if col_type == "TEXT":
                extracted_data.append({
                    "table_name": real_table_name,
                    "column_name": col_name,
                    "value": value_str,
                    "operator": operator
                })
            else:
                print(f"DEBUG: Skipped {col_name} (Type is {col_type}, not TEXT)")
        else:
            print(f"DEBUG: Skipped {col_name} (Could not resolve table)")

    # ---------------------------------------------------------
    # 4. Execution
    # ---------------------------------------------------------
    where_clause = parsed.find(exp.Where)
    if where_clause:
        visit_node(where_clause)
    state["predicate_values"] = extracted_data
    return state


async def get_similar_predicate_values(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    predicate_values = state.get("predicate_values")
    if not predicate_values:
        state["tbl_col_sample_values"] = {}
        return state
    state["tbl_col_sample_values"] = await database.batch_search_similar_values(
        [
            (v["table_name"], v["column_name"], v["value"])
            for v in predicate_values
        ], 
        k=5
    )
    return state


def restrict_select_columns(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    """
    Replaces SELECT * with SELECT t.col1, t.col2 based on filtered_schema.
    """
    sql_queries: List[str] = state.get("sql_queries")
    if not sql_queries:
        raise ValueError("SQL queries are required")
    sql_query = sql_queries[-1]
    if not sql_query:
        raise ValueError("SQL query is required")
    schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not schema:
        raise ValueError("Schema is required")
    parsed = parse_one(sql_query, read=database.dialect.lower())
    
    # ---------------------------------------------------------
    # 1. Build Alias Map (Map Alias -> Real Table Name)
    # ---------------------------------------------------------
    # We need to know the order of tables to expand * correctly
    active_tables_ordered = [] 
    alias_map = {}

    def register_table(table_node):
        real_name = table_node.name
        alias = table_node.alias if table_node.alias else real_name
        
        # Only register if we haven't seen this alias yet
        if alias not in alias_map:
            alias_map[alias] = real_name
            active_tables_ordered.append(alias)

    # Scan FROM
    for from_node in parsed.find_all(exp.From):
        for table in from_node.find_all(exp.Table):
            register_table(table)

    # Scan JOINs
    for join_node in parsed.find_all(exp.Join):
        register_table(join_node.this)

    print(f"DEBUG: Active Tables: {alias_map}")

    # ---------------------------------------------------------
    # 2. Helper to Generate Column Expressions
    # ---------------------------------------------------------
    def get_columns_for_table(table_alias):
        real_name = alias_map.get(table_alias)
        if not real_name or real_name not in schema:
            return [] # Table not in our allowed schema, return nothing (or handle error)
        
        # Create sqlglot Column objects: alias.column_name
        cols = schema[real_name].keys()
        return [
            exp.Column(
                this=exp.Identifier(this=col, quoted=True),
                table=exp.Identifier(this=table_alias, quoted=True)
            ) for col in cols
        ]

    # ---------------------------------------------------------
    # 3. Rewrite SELECT Expressions
    # ---------------------------------------------------------
    # We only want to transform the main SELECT statement(s)
    for select_node in parsed.find_all(exp.Select):
        new_expressions = []
        
        for expression in select_node.expressions:
            # Case A: Naked * (SELECT *)
            if isinstance(expression, exp.Star) and not isinstance(expression, exp.Count):
                # Expand columns for ALL active tables in the query
                for alias in active_tables_ordered:
                    expanded_cols = get_columns_for_table(alias)
                    new_expressions.extend(expanded_cols)
            
            # Case B: Qualified * (SELECT t.*)
            elif isinstance(expression, exp.Column) and isinstance(expression.this, exp.Star):
                # Extract the table alias (e.g., 't' from 't.*')
                table_alias = expression.table
                expanded_cols = get_columns_for_table(table_alias)
                new_expressions.extend(expanded_cols)
                
            # Case C: Regular column or other expression (Keep it)
            else:
                new_expressions.append(expression)

        # Replace the old expressions with the new expanded list
        if new_expressions:
            select_node.set("expressions", new_expressions)

    restricted_sql_query = parsed.sql(dialect=database.dialect.lower())
    def normalize_sql_query(sql_query: str) -> str:
        return re.sub(r"\s+", " ", sql_query).strip().strip(";").lower()
    if normalize_sql_query(restricted_sql_query) != normalize_sql_query(sql_query):
        state["sql_queries"].append(restricted_sql_query)
    return state

In [None]:
ANSWER_GEN_TEMPLATE = """
### THÔNG TIN NGÀY THÁNG:
Hôm nay là {date}

### NHIỆM VỤ:
Bạn là một trợ lý phân tích dữ liệu chuyên nghiệp. Nhiệm vụ của bạn là đưa ra câu trả lời bằng **Tiếng Việt** rõ ràng, chính xác và súc tích cho câu hỏi của người dùng, dựa hoàn toàn vào kết quả cơ sở dữ liệu (database results) được cung cấp.

**Lược đồ bảng (Table Schema)**:
{table_infos}

### CÁC NGUYÊN TẮC HƯỚNG DẪN:

1.  **Chính xác và Tuân thủ dữ liệu**:
    *   Câu trả lời phải dựa **TUYỆT ĐỐI** vào phần "Kết quả từ Database".
    *   Không được tự suy diễn hoặc đưa vào các kiến thức bên ngoài không có trong dữ liệu.
    *   Nếu kết quả trả về là rỗng (empty), hãy thông báo lịch sự rằng không tìm thấy dữ liệu phù hợp với yêu cầu.

2.  **Định dạng câu trả lời**:
    *   **Trả lời trực tiếp**: Đi thẳng vào vấn đề.
    *   **Danh sách/Bảng**: Nếu kết quả có nhiều dòng, hãy trình bày dưới dạng danh sách gạch đầu dòng hoặc bảng Markdown cho dễ đọc.
    *   **Số liệu tổng hợp**: Nếu kết quả là một con số duy nhất (tổng, đếm, trung bình), hãy viết thành một câu hoàn chỉnh (Ví dụ: "Tổng doanh thu là 50.000.000 VNĐ").

3.  **Trình bày dữ liệu (Formatting)**:
    *   **Con số**: Sử dụng dấu phân cách hàng nghìn (ví dụ: 1.000 hoặc 1,000 tùy theo ngữ cảnh, nhưng phải nhất quán).
    *   **Tiền tệ**: Thêm đơn vị tiền tệ phù hợp nếu có (ví dụ: VNĐ, $, USD).
    *   **Ngày tháng**: Chuyển đổi sang định dạng ngày tháng Tiếng Việt tự nhiên (ví dụ: "Ngày 01 tháng 01 năm 2024").

4.  **Ngữ cảnh và Thuật ngữ**:
    *   Sử dụng "Truy vấn SQL" để hiểu ngữ cảnh lọc dữ liệu (ví dụ: nếu SQL có `WHERE status = 'active'`, hãy nói rõ đây là các đơn hàng có trạng thái là "đang hoạt động").
    *   Sử dụng ngôn ngữ kinh doanh/đời thường. **Không** nhắc đến tên bảng kỹ thuật (như `tbl_users`, `col_price`) hoặc cú pháp code trong câu trả lời cuối cùng.

5.  **Văn phong**:
    *   Chuyên nghiệp, khách quan và hữu ích.
    *   Tránh các câu máy móc như mà hãy trả lời tự nhiên như một con người.

**Đầu ra**:
Chỉ xuất ra câu trả lời cuối cùng bằng Tiếng Việt (sử dụng Markdown).
""".strip()


class QueryDatabaseInput(BaseModel):
    sql_query: str = Field(description="Câu truy vấn SQLite")


@tool("query_database", args_schema=QueryDatabaseInput)
def query_database(sql_query: str) -> str:
    """Thực hiện câu truy vấn SQLite và trả về kết quả"""
    return ""


def preprocess_for_answer_generation(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> List[AnyMessage]:
    conversation = state.get("conversation")
    if not conversation:
        raise ValueError("conversation not found in the input")
    linked_schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not linked_schema:
        raise ValueError("linked_schema not found in the input")
    db_output: Dict[str, Any] = state.get("db_output", {})
    sql_queries: List[str] = state.get("sql_queries", [])
    if not sql_queries:
        raise ValueError("sql_queries not found in the input")
    sql_query = sql_queries[-1]
    if db_output.get("error", "Error") is not None:
        raise ValueError("No valid database result found")
    db_result = db_output.get("result", [])
    
    table_infos = "\n\n".join([
        database.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=list(col_types.keys()),
            sample_count=5,
            column_sample_values=state.get("tbl_col_sample_values", {}).get(table_name, None),
        )
        for table_name, col_types in linked_schema.items()
    ])
    tool_call = {
        "name": "query_database",
        "arguments": {"sql_query": sql_query}
    }
    
    system_message = SystemMessage(content=ANSWER_GEN_TEMPLATE.format(
        date=get_today_date_vi(),
        table_infos=table_infos,
    ))
    sql_conversation = [system_message] + conversation
    sql_conversation.append(AIMessage('<tool_call>\n' + json.dumps(tool_call, ensure_ascii=False) + '\n</tool_call>'))
    sql_conversation.append(HumanMessage(content=str(db_result)))
    return sql_conversation


_answer_generation_chain_cache: Dict[int, Runnable] = {}
def get_answer_generation_chain(chat_model: BaseChatModel, database: SQLiteDatabase) -> Runnable:
    chat_model_id, database_id = id(chat_model), id(database)
    openai_tool_schema = {
        'type': 'function',
        'function': {
            'name': 'query_database',
            'description': f'Thực hiện câu truy vấn {database.dialect} và trả về kết quả',
            'parameters': {
                'properties': {
                    'sql_query': {
                        'description': f'Câu truy vấn {database.dialect}',
                        'type': 'string'
                    }
                },
                'required': ['sql_query'],
                'type': 'object'
            }
        }
    }
    if (chat_model_id, database_id) not in _answer_generation_chain_cache:
        _answer_generation_chain_cache[(chat_model_id, database_id)] = (
            RunnableLambda(partial(preprocess_for_answer_generation, database=database))
            | chat_model.bind(tools=[openai_tool_schema])
            | StrOutputParser()
        )
    
    return _answer_generation_chain_cache[(chat_model_id, database_id)]


async def generate_answer(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    answer_chain = get_answer_generation_chain(chat_model, database)
    answer = await answer_chain.ainvoke(state)
    state["final_answer"] = answer
    return state

In [None]:
# add condition for rewrite sql query: dont rewrite if we can't find predicate values or similar predicate values contains original

def retry_condition(
    state: SQLAssistantState
) -> Literal["gen_sql_query_2", "restrict_select_columns"]:
    predicate_values = state.get("predicate_values")
    if not predicate_values:
        return "restrict_select_columns"
    similar_predicate_values = state.get("tbl_col_sample_values")
    if not similar_predicate_values:
        return "restrict_select_columns"
    
    # Check if all original predicate values are found in the similar values
    all_found = True
    for pred_value in predicate_values:
        table_name = pred_value["table_name"]
        column_name = pred_value["column_name"]
        original_value = pred_value["value"]
        
        # Get the list of similar values for this table/column pair
        similar_values = similar_predicate_values.get(table_name, {}).get(column_name, [])
        
        # If the original value is NOT found in similar values, we need to rewrite
        if original_value not in similar_values:
            all_found = False
            break
    
    # If all original values were found in similar values, we don't need to rewrite
    if all_found:
        return "restrict_select_columns"
    
    print(predicate_values)
    print(similar_predicate_values)
    # If any original value was not found in similar values, we should rewrite
    return "gen_sql_query_2"
    


async def sql_execution(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    sql_queries = state.get("sql_queries", [])
    if not sql_queries:
        raise ValueError("SQL queries are required")
    sql_query = sql_queries[-1]
    state["db_output"] = await database.run_no_throw(sql_query, include_columns=True)
    return state


def build_sql_assistant_pipeline(
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> CompiledStateGraph:
    builder = StateGraph(SQLAssistantState)
    # Add nodes
    builder.add_node(
        "link_schema",
        partial(
            link_schema,
            chat_model=chat_model,
            database=database
        )
    )
    builder.add_node(
        "gen_sql_query_1",
        partial(
            generate_sql_query, 
            chat_model=chat_model,
            database=database
        )
    )
    builder.add_node(
        "get_predicate_values", 
        partial(
            get_predicate_values, 
            database=database
        )
    )
    builder.add_node(
        "get_similar_predicate_values", 
        partial(
            get_similar_predicate_values, 
            database=database
        )
    )
    builder.add_node(
        "gen_sql_query_2",
        partial(
            generate_sql_query,
            chat_model=chat_model,
            database=database
        )
    )
    builder.add_node(
        "restrict_select_columns",
        partial(
            restrict_select_columns,
            database=database
        )
    )
    builder.add_node(
        "sql_execution", 
        partial(
            sql_execution,
            database=database
        )
    )
    builder.add_node(
        "answer_generation", 
        partial(
            generate_answer,
            chat_model=chat_model,
            database=database
        )
    )

    # Add edges
    builder.add_edge(START, "link_schema")
    builder.add_edge("link_schema", "gen_sql_query_1")
    builder.add_edge("gen_sql_query_1", "get_predicate_values")
    builder.add_edge("get_predicate_values", "get_similar_predicate_values")
    builder.add_conditional_edges(
        "get_similar_predicate_values",
        retry_condition,
    )
    builder.add_edge("gen_sql_query_2", "restrict_select_columns")
    builder.add_edge("restrict_select_columns", "sql_execution")
    builder.add_edge("sql_execution", "answer_generation")
    builder.add_edge("answer_generation", END)

    return builder.compile()

In [None]:
sql_assistant = build_sql_assistant_pipeline(get_llm_model(), db)

# display(Image(sql_assistant.get_graph().draw_mermaid_png()))

In [None]:
# state = await sql_assistant.ainvoke({"conversation": [
#     HumanMessage("Hello"),
#     AIMessage("Em là trợ lý ảo Guso có thể hỗ trợ anh/chị về các sản phẩm và dịch vụ của BDS Guru. Chào anh/chị ạ! Nếu anh/chị có bất kỳ câu hỏi hay nhu cầu nào, em rất sẵn sàng giúp đỡ ạ!"),
#     HumanMessage("Có bao nhiêu nhà đang được cho thuê nhỉ"),
#     AIMessage("Tổng số nhà đang được cho thuê là 111 nhà ạ."),
#     HumanMessage("Tôi muốn thuê một căn 2 phòng ngủ, giá dưới 6tr 1 tháng"),
#     AIMessage("Có 1 căn hộ 2 phòng ngủ với giá thuê dưới 6 triệu đồng/tháng ạ."),
#     HumanMessage("Thế có văn phòng nào giá dưới 6tr ở HN không"),
# ]})

state = await sql_assistant.ainvoke({"conversation": [
    HumanMessage("hello có văn phòng cho thuê dưới 10tr ko, văn phòng nhé"),
    AIMessage("Không tìm thấy văn phòng cho thuê có giá dưới 10 triệu đồng/tháng trong cơ sở dữ liệu."),
    HumanMessage("thế dưới 30tr thì sao"),
]})

 50%|█████     | 1/2 [00:02<00:02,  2.27s/it]

The latest customer query is asking about office spaces available for rent under 30 million VND. The table 'BĐS Bán 500' contains information about real estate properties for sale, not for rent. Therefore, the table is not relevant to the query about renting office space.


100%|██████████| 2/2 [00:05<00:00,  2.71s/it]

The latest customer query is asking about office spaces for rent under 30 million VND per month. The table 'BĐS Cho thuê 500' contains information about rental properties, including 'Giá thuê (triệu/tháng)' which represents the monthly rent in millions of VND. Since the customer is inquiring about rental prices and the table includes relevant price data, the table is related to the query. Additional columns such as 'Loại BĐS' (property type) and 'Diện tích (m²)' (area) are also relevant for context.





DEBUG: Found Aliases: {'BĐS Cho thuê 500': 'BĐS Cho thuê 500'}
DEBUG: Active Tables: {'BĐS Cho thuê 500': 'BĐS Cho thuê 500'}


InternalServerError: Error code: 500 - {'detail': '400: {"object":"error","message":"This model\'s maximum context length is 8092 tokens. However, you requested 13472 tokens in the messages, Please reduce the length of the messages. None","type":"BadRequestError","param":null,"code":400}'}

In [None]:
print(state["final_answer"])

Không tìm thấy văn phòng cho thuê có giá dưới 10 triệu đồng/tháng trong cơ sở dữ liệu.


In [None]:
print(state["sql_queries"])

['SELECT "Giá thuê (triệu/tháng)", "ID", "Loại BĐS", "Địa chỉ" \nFROM "BĐS Cho thuê 500" \nWHERE "Loại BĐS" = \'Văn phòng\' \n  AND "Giá thuê (triệu/tháng)" < 10;']


## Full Langgraph workflow - Multi-turn V1 (Beta)

In [None]:
class SQLAssistantState(TypedDict):
    conversation: List[AnyMessage]
    rewritten_message: str
    sample_values: Dict[str, Dict[str, List[Any]]]
    linked_schema: Dict[str, Dict[str, str]]
    sql_queries: List[str]
    db_output: Dict[str, Any]
    final_answer: str


def format_conversation(conversation: List[AnyMessage]) -> str:
    formatted_conversation = ""
    end_index = len(conversation) - 1 
    for ind in range(len(conversation) - 1, -1, -1):
        if conversation[ind].type == "human":
            end_index = ind
            break
    for message in conversation[:end_index]:
        if message.type == "human":
            formatted_conversation += f"Customer: {message.content}\n"
        elif message.type == "ai":
            formatted_conversation += f"Support Team: {message.content}\n"
    
    formatted_conversation += f"\nLatest Customer Message: {conversation[end_index].content}"
    return formatted_conversation

In [None]:
MESSAGE_REWRITING_TEMPLATE = """
### Role
You are an expert Context Extractor for a database chatbot. Your task is to analyze a conversation between a "Customer" and a "Support Team" and identify the **background context** required to understand the Customer's LATEST message.

### Rules
1. The context must be fully interpretable in isolation, requiring no access to the conversation history to understand. You must identify the core subject, ALL active references and constraints from the dialogue and synthesize them into the context. **Explicitly resolve** all pronouns and relative references by substituting them with the specific entities, dates, IDs, feature, values, etc. mentioned previously.
2. Only include the context that is related to the Customer's LATEST message. If there is no relevant context, return an empty string.
3. The context must sound like the customer are re-describing the context for the support team to understand.
4. Output a JSON object inside a json markdown code block using this format:
```json
{{
    "context": "the relevant and specific context in Vietnamese"
}}
```

### Conversation:
{formatted_conversation}
""".strip()


_message_rewriting_chain_cache: Dict[int, Runnable] = {}
def get_message_rewriting_chain(chat_model: BaseChatModel) -> Runnable:
    # Use model instance ID as cache key (since ChatOpenAI objects aren't hashable)
    chat_model_id = id(chat_model)
    
    if chat_model_id not in _message_rewriting_chain_cache:
        _message_rewriting_chain_cache[chat_model_id] = (
            ChatPromptTemplate([("human", MESSAGE_REWRITING_TEMPLATE)])
            | chat_model
            | StrOutputParser()
        )
    
    return _message_rewriting_chain_cache[chat_model_id]


async def rewrite_message(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
) -> Dict[str, Dict[str, str]]:
    conversation = state.get("conversation")
    if not conversation:
        raise ValueError("conversation is required")
    rewritten_message = await get_message_rewriting_chain(chat_model).ainvoke({
        "formatted_conversation": format_conversation(conversation)
    })
    state["rewritten_message"] = rewritten_message
    return state

In [None]:
# prompt = MESSAGE_REWRITING_TEMPLATE.format(formatted_conversation=format_conversation([
#     HumanMessage("Hello"),
#     AIMessage("Em là trợ lý ảo Guso có thể hỗ trợ anh/chị về các sản phẩm và dịch vụ của BDS Guru. Chào anh/chị ạ! Nếu anh/chị có bất kỳ câu hỏi hay nhu cầu nào, em rất sẵn sàng giúp đỡ ạ!"),
#     HumanMessage("Có bao nhiêu nhà đang được cho thuê nhỉ"),
#     AIMessage("Tổng số nhà đang được cho thuê là 111 nhà ạ."),
#     HumanMessage("Tôi muốn thuê một căn 2 phòng ngủ, giá dưới 6tr 1 tháng"),
#     AIMessage("Có 1 căn hộ 2 phòng ngủ với giá thuê dưới 6 triệu đồng/tháng ạ."),
#     HumanMessage("ok xin thêm thông tin với"),
# ]))
# tmp = llm.invoke(prompt)
# print(tmp.content)

In [None]:
async def get_sample_values(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    rewritten_message = state.get("rewritten_message")
    if not rewritten_message:
        state["sample_values"] = {}
        return state
    state["sample_values"] = await database.search_similar_values_from_message(
        rewritten_message,
        k=5
    )
    return state

In [None]:
SCHEMA_LINKING_TEMPLATE = """
You are an expert in SQL schema linking. 
Given a {dialect} table schema (DDL) and a conversation history, determine if the table is relevant to the latest customer query.

Your task:
1. Analyze the table schema and the conversation history. Focus on the latest customer message, using previous messages for context (e.g., to resolve references). Evaluate the Table Name and Table Comment to see if the general topic matches the query. Answer "Y" (Yes) or "N" (No) regarding the table's relevance to the latest query.
2. If the answer is "Y", list ALL columns that are semantically related. 
   - You do NOT need to identify the exact columns for the final SQL query. 
   - You MUST include all columns that provide context, identifiers, or potential join keys related to the entities in the query.

Output must be a valid JSON object inside a ```json code block using this format:
```json
{{
    "explanation": "Explanation of the decision",
    "is_related": "Y or N",
    "columns": ["column name 1", "column name 2"]
}}
```

Table Schema (DDL):
{table_info}

Conversation History:
{formatted_conversation}
""".strip()


# Cache for schema linking chains keyed by model instance ID
_schema_linking_chain_cache: Dict[int, Runnable] = {}
def get_schema_linking_chain(chat_model: BaseChatModel) -> Runnable:
    # Use model instance ID as cache key (since ChatOpenAI objects aren't hashable)
    chat_model_id = id(chat_model)
    
    if chat_model_id not in _schema_linking_chain_cache:
        _schema_linking_chain_cache[chat_model_id] = (
            ChatPromptTemplate([("human", SCHEMA_LINKING_TEMPLATE)])
            | chat_model
            | JsonOutputParser()
        )
    
    return _schema_linking_chain_cache[chat_model_id]


async def _link_schema_one(
    conversation: List[AnyMessage],
    table_name: str,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
    column_sample_values: Optional[Dict[str, List[str]]] = None,
    allowed_col_names: Optional[List[str]] = None,
) -> Dict[str, Any]:
    try:
        column_names = database.get_column_names(table_name)
        if isinstance(column_names, list) and len(column_names) <= 5:
            return {
                "input_item": {
                    "table_name": table_name,
                    "conversation": conversation,
                    "allowed_col_names": allowed_col_names,
                    "column_sample_values": column_sample_values
                },
                "filtered_schema": (table_name, column_names),
                "error": None
            }
        table_info = database.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=allowed_col_names,
            column_sample_values=column_sample_values,
            sample_count=3
        )
        result = await get_schema_linking_chain(chat_model).ainvoke({
            "table_info": table_info, 
            "formatted_conversation": format_conversation(conversation), 
            "dialect": database.dialect
        })
        print(result["explanation"])
        if "is_related" not in result or result["is_related"] not in ["Y", "N"]:
            raise ValueError("Invalid response from schema linking chain")
        if result["is_related"] == "Y" and not result.get("columns"):
            result["columns"] = ["ROWID"]

        if result["is_related"] == "N":
            return {
                "input_item": {
                    "table_name": table_name,
                    "conversation": conversation,
                    "allowed_col_names": allowed_col_names,
                    "column_sample_values": column_sample_values
                },
                "filtered_schema": None,
                "error": None
            }
        else:
            return {
                "input_item": {
                    "table_name": table_name,
                    "conversation": conversation,
                    "allowed_col_names": allowed_col_names,
                    "column_sample_values": column_sample_values
                },
                "filtered_schema": (table_name, result["columns"]),
                "error": None
            }
    except Exception as e:
        return {
            "input_item": {
                "table_name": table_name, 
                "conversation": conversation,
                "allowed_col_names": allowed_col_names,
                "column_sample_values": column_sample_values
            },
            "filtered_schema": None,
            "error": str(e)
        }


async def link_schema(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> Dict[str, Dict[str, str]]:
    conversation = state.get("conversation")
    if not conversation:
        raise ValueError("conversation is required")
    sample_values = state.get("sample_values", {})
    max_retries = 1
    # queue = []
    # for table in  database.get_usable_table_names():
    #     for col_group in database.get_column_groups(table):
    #         queue.append({
    #             "table_name": table,
    #             "allowed_col_names": col_group,
    #             "conversation": conversation
    #         })
    queue = [
        {
            "table_name": table_name, 
            "conversation": conversation,
            "column_sample_values": sample_values.get(table_name),
        } 
        for table_name in database.get_usable_table_names()
    ]
    successful_results = []
    for _ in range(max_retries):
        tasks = [_link_schema_one(chat_model=chat_model, database=database, **input_item) for input_item in queue]
        results = await tqdm_asyncio.gather(*tasks)
        successful_results.extend([
            res for res in results if res["error"] is None
        ])
        failed_items = [
            res["input_item"] for res in results if res["error"] is not None
        ]
        queue = failed_items
        if not queue:
            break
    
    linked_schema = [
        result["filtered_schema"] 
        for result in successful_results 
        if result["filtered_schema"]
    ]
    # Return per-table mapping: column_name -> datatype
    final_schema: Dict[str, Dict[str, str]] = {}
    for table_name, col_names in linked_schema:
        table_schema = final_schema.setdefault(table_name, {})
        for col_name in col_names:
            col_type = database.get_column_datatype(
                table_name,
                col_name,
                default="NULL",
            )
            if col_type != "NULL":
                table_schema[col_name] = col_type

    state["linked_schema"] = final_schema
    return state

In [None]:
SQL_GEN_TEMPLATE = """
### DATE INFORMATION:
Today is {date}

### Instructions:
You write SQL queries for a {dialect} database. The Support Team is querying the database to answer Customer questions, and your task is to assist by generating valid SQL queries strictly adhering to the database schema provided. Translate the latest customer message into a **single valid {dialect} query**, using the conversation history for context (e.g., resolving pronouns or follow-up filters).

**Table Schema**:
{table_infos}

### Guidelines:
1.  Schema Adherence: Use only tables, columns, and relationships explicitly listed in the provided schema. Do not make assumptions about missing or inferred columns/tables.
2.  {dialect}-Specific Syntax: Use only {dialect} syntax. Be aware that {dialect} has limited built-in date/time functions compared to other sql dialects.
3.  Conditions: Always include default conditions for filtering invalid data, e.g., `deleted_at IS NULL` and `status != 'cancelled'` if relevant. Ensure these conditions match the query's intent unless explicitly omitted in the customer's request.
4.  Reserved Keywords and Case Sensitivity: Escape reserved keywords or case-sensitive identifiers using double quotes (" "), e.g., "order".

If the customer's question is ambiguous or unclear, you must make your best reasonable guess based on the schema. Ensure the query is optimized, precise, and error-free. Output SQL should be written in a sql markdown code block. For example:
```sql
SELECT column1, column2 FROM table WHERE condition;
```

### Conversation:
{formatted_conversation}
""".strip()


_sql_markdown_re = re.compile(r"```sql\s*([\s\S]*?)\s*```", re.DOTALL)
def parse_sql_output(msg_content: str) -> str:
    try:
        match = _sql_markdown_re.findall(msg_content)
        if match:
            return match[-1].strip()
        else:
            raise ValueError("No SQL query found in the content")
    except Exception:
        return msg_content


def preprocess_for_sql_query_generation(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> List[AnyMessage]:
    linked_schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not linked_schema:
        raise ValueError("linked_schema not found in the input")
    conversation = state.get("conversation")
    if not conversation:
        raise ValueError("conversation not found in the input")
    formatted_conversation = format_conversation(conversation)
    table_infos = "\n\n".join([
        database.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=list(col_types.keys()),
            sample_count=5,
            column_sample_values=state.get("sample_values", {}).get(table_name),
        )
        for table_name, col_types in linked_schema.items()
    ])
    return [HumanMessage(SQL_GEN_TEMPLATE.format(
        date=get_today_date_en(),
        dialect=database.dialect,
        table_infos=table_infos,
        formatted_conversation=formatted_conversation,
    ))]


_sql_query_generation_chain_cache: Dict[tuple[int, int], Runnable] = {}
def get_sql_query_generation_chain(
    chat_model: BaseChatModel, database: SQLiteDatabase
) -> Runnable:
    chat_model_id, database_id = id(chat_model), id(database)
    if (chat_model_id, database_id) not in _sql_query_generation_chain_cache:
        _sql_query_generation_chain_cache[(chat_model_id, database_id)] = (
            RunnableLambda(partial(preprocess_for_sql_query_generation, database=database))
            | chat_model
            | StrOutputParser()
            | parse_sql_output
        )
    
    return _sql_query_generation_chain_cache[(chat_model_id, database_id)]


async def generate_sql_query(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    if not state.get("sql_queries"):
        state["sql_queries"] = []
    sql_gen_chain = get_sql_query_generation_chain(chat_model, database)
    sql_query = await sql_gen_chain.ainvoke(state)
    state["sql_queries"].append(sql_query)
    return state

In [None]:
def restrict_select_columns(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    """
    Replaces SELECT * with SELECT t.col1, t.col2 based on filtered_schema.
    """
    sql_queries: List[str] = state.get("sql_queries")
    if not sql_queries:
        raise ValueError("SQL queries are required")
    sql_query = sql_queries[-1]
    if not sql_query:
        raise ValueError("SQL query is required")
    schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not schema:
        raise ValueError("Schema is required")
    parsed = parse_one(sql_query, read=database.dialect.lower())
    
    # ---------------------------------------------------------
    # 1. Build Alias Map (Map Alias -> Real Table Name)
    # ---------------------------------------------------------
    # We need to know the order of tables to expand * correctly
    active_tables_ordered = [] 
    alias_map = {}

    def register_table(table_node):
        real_name = table_node.name
        alias = table_node.alias if table_node.alias else real_name
        
        # Only register if we haven't seen this alias yet
        if alias not in alias_map:
            alias_map[alias] = real_name
            active_tables_ordered.append(alias)

    # Scan FROM
    for from_node in parsed.find_all(exp.From):
        for table in from_node.find_all(exp.Table):
            register_table(table)

    # Scan JOINs
    for join_node in parsed.find_all(exp.Join):
        register_table(join_node.this)

    print(f"DEBUG: Active Tables: {alias_map}")

    # ---------------------------------------------------------
    # 2. Helper to Generate Column Expressions
    # ---------------------------------------------------------
    def get_columns_for_table(table_alias):
        real_name = alias_map.get(table_alias)
        if not real_name or real_name not in schema:
            return [] # Table not in our allowed schema, return nothing (or handle error)
        
        # Create sqlglot Column objects: alias.column_name
        cols = schema[real_name].keys()
        return [
            exp.Column(
                this=exp.Identifier(this=col, quoted=True),
                table=exp.Identifier(this=table_alias, quoted=True)
            ) for col in cols
        ]

    # ---------------------------------------------------------
    # 3. Rewrite SELECT Expressions
    # ---------------------------------------------------------
    # We only want to transform the main SELECT statement(s)
    for select_node in parsed.find_all(exp.Select):
        new_expressions = []
        
        for expression in select_node.expressions:
            # Case A: Naked * (SELECT *)
            if isinstance(expression, exp.Star) and not isinstance(expression, exp.Count):
                # Expand columns for ALL active tables in the query
                for alias in active_tables_ordered:
                    expanded_cols = get_columns_for_table(alias)
                    new_expressions.extend(expanded_cols)
            
            # Case B: Qualified * (SELECT t.*)
            elif isinstance(expression, exp.Column) and isinstance(expression.this, exp.Star):
                # Extract the table alias (e.g., 't' from 't.*')
                table_alias = expression.table
                expanded_cols = get_columns_for_table(table_alias)
                new_expressions.extend(expanded_cols)
                
            # Case C: Regular column or other expression (Keep it)
            else:
                new_expressions.append(expression)

        # Replace the old expressions with the new expanded list
        if new_expressions:
            select_node.set("expressions", new_expressions)

    restricted_sql_query = parsed.sql(dialect=database.dialect.lower())
    def normalize_sql_query(sql_query: str) -> str:
        return re.sub(r"\s+", " ", sql_query).strip().strip(";").lower()
    if normalize_sql_query(restricted_sql_query) != normalize_sql_query(sql_query):
        state["sql_queries"].append(restricted_sql_query)
    return state

In [None]:
QUERY_DATABASE_TOOL = json.dumps({
    'type': 'function',
    'function': {
        'name': 'query_database',
        'description': 'Thực hiện câu truy vấn {{dialect}} và trả về kết quả',
        'parameters': {
            'properties': {
                'sql_query': {
                    'description': 'Câu truy vấn {{dialect}}',
                    'type': 'string'
                }
            },
            'required': ['sql_query'],
            'type': 'object'
        }
    }
})


ANSWER_GEN_TEMPLATE = """
### THÔNG TIN NGÀY THÁNG:
Hôm nay là {date}

### NHIỆM VỤ:
Bạn là một trợ lý phân tích dữ liệu chuyên nghiệp. Nhiệm vụ của bạn là đưa ra câu trả lời bằng **Tiếng Việt** rõ ràng, chính xác và súc tích cho câu hỏi của người dùng, dựa hoàn toàn vào kết quả cơ sở dữ liệu (database results) được cung cấp.

**Lược đồ bảng (Table Schema)**:
{table_infos}

### CÁC NGUYÊN TẮC HƯỚNG DẪN:

1.  **Chính xác và Tuân thủ dữ liệu**:
    *   Câu trả lời phải dựa **TUYỆT ĐỐI** vào phần "Kết quả từ Database".
    *   Không được tự suy diễn hoặc đưa vào các kiến thức bên ngoài không có trong dữ liệu.
    *   Nếu kết quả trả về là rỗng (empty), hãy thông báo lịch sự rằng không tìm thấy dữ liệu phù hợp với yêu cầu.

2.  **Định dạng câu trả lời**:
    *   **Trả lời trực tiếp**: Đi thẳng vào vấn đề.
    *   **Danh sách/Bảng**: Nếu kết quả có nhiều dòng, hãy trình bày dưới dạng danh sách gạch đầu dòng hoặc bảng Markdown cho dễ đọc.
    *   **Số liệu tổng hợp**: Nếu kết quả là một con số duy nhất (tổng, đếm, trung bình), hãy viết thành một câu hoàn chỉnh (Ví dụ: "Tổng doanh thu là 50.000.000 VNĐ").

3.  **Trình bày dữ liệu (Formatting)**:
    *   **Con số**: Sử dụng dấu phân cách hàng nghìn (ví dụ: 1.000 hoặc 1,000 tùy theo ngữ cảnh, nhưng phải nhất quán).
    *   **Tiền tệ**: Thêm đơn vị tiền tệ phù hợp nếu có (ví dụ: VNĐ, $, USD).
    *   **Ngày tháng**: Chuyển đổi sang định dạng ngày tháng Tiếng Việt tự nhiên (ví dụ: "Ngày 01 tháng 01 năm 2024").

4.  **Ngữ cảnh và Thuật ngữ**:
    *   Sử dụng "Truy vấn SQL" để hiểu ngữ cảnh lọc dữ liệu (ví dụ: nếu SQL có `WHERE status = 'active'`, hãy nói rõ đây là các đơn hàng có trạng thái là "đang hoạt động").
    *   Sử dụng ngôn ngữ kinh doanh/đời thường. **Không** nhắc đến tên bảng kỹ thuật (như `tbl_users`, `col_price`) hoặc cú pháp code trong câu trả lời cuối cùng.

5.  **Văn phong**:
    *   Chuyên nghiệp, khách quan và hữu ích.
    *   Tránh các câu máy móc như mà hãy trả lời tự nhiên như một con người.

**Đầu ra**:
Chỉ xuất ra câu trả lời cuối cùng bằng Tiếng Việt (sử dụng Markdown).
""".strip()


class QueryDatabaseInput(BaseModel):
    sql_query: str = Field(description="Câu truy vấn SQLite")


@tool("query_database", args_schema=QueryDatabaseInput)
def query_database(sql_query: str) -> str:
    """Thực hiện câu truy vấn SQLite và trả về kết quả"""
    return ""


def preprocess_for_answer_generation(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> List[AnyMessage]:
    conversation = state.get("conversation")
    if not conversation:
        raise ValueError("conversation not found in the input")
    linked_schema: Dict[str, Dict[str, str]] = state.get("linked_schema")
    if not linked_schema:
        raise ValueError("linked_schema not found in the input")
    db_output: Dict[str, Any] = state.get("db_output", {})
    sql_queries: List[str] = state.get("sql_queries", [])
    if not sql_queries:
        raise ValueError("sql_queries not found in the input")
    sql_query = sql_queries[-1]
    if db_output.get("error", "Error") is not None:
        raise ValueError("No valid database result found")
    db_result = db_output.get("result", [])
    
    table_infos = "\n\n".join([
        database.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=list(col_types.keys()),
            sample_count=5,
            column_sample_values=state.get("sample_values", {}).get(table_name, None),
        )
        for table_name, col_types in linked_schema.items()
    ])
    tool_call = {
        "name": "query_database",
        "arguments": {"sql_query": sql_query}
    }
    system_message = SystemMessage(content=ANSWER_GEN_TEMPLATE.format(
        date=get_today_date_vi(),
        table_infos=table_infos,
    ))
    
    sql_conversation = [system_message] + conversation
    sql_conversation.append(AIMessage('<tool_call>\n' + json.dumps(tool_call, ensure_ascii=False) + '\n</tool_call>'))
    sql_conversation.append(HumanMessage(content=str(db_result)))
    return sql_conversation


_answer_generation_chain_cache: Dict[tuple[int, int], Runnable] = {}
def get_answer_generation_chain(chat_model: BaseChatModel, database: SQLiteDatabase) -> Runnable:
    chat_model_id, database_id = id(chat_model), id(database)
    openai_tool_schema = QUERY_DATABASE_TOOL.replace("{{dialect}}", database.dialect)
    if (chat_model_id, database_id) not in _answer_generation_chain_cache:
        _answer_generation_chain_cache[(chat_model_id, database_id)] = (
            RunnableLambda(partial(preprocess_for_answer_generation, database=database))
            | chat_model.bind(tools=[openai_tool_schema])
            | StrOutputParser()
        )
    
    return _answer_generation_chain_cache[(chat_model_id, database_id)]


async def generate_answer(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    answer_chain = get_answer_generation_chain(chat_model, database)
    answer = await answer_chain.ainvoke(state)
    state["final_answer"] = answer
    return state

In [None]:
async def sql_execution(
    state: SQLAssistantState,
    database: SQLiteDatabase,
) -> SQLAssistantState:
    sql_queries = state.get("sql_queries", [])
    if not sql_queries:
        raise ValueError("SQL queries are required")
    sql_query = sql_queries[-1]
    state["db_output"] = await database.run_no_throw(sql_query, include_columns=True)
    return state


def build_sql_assistant(
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> CompiledStateGraph:
    builder = StateGraph(SQLAssistantState)
    # Add nodes
    builder.add_node(
        "rewrite_message",
        partial(
            rewrite_message,
            chat_model=chat_model,
        )
    )
    builder.add_node(
        "get_sample_values",
        partial(
            get_sample_values,
            database=database
        )
    )
    builder.add_node(
        "link_schema",
        partial(
            link_schema,
            chat_model=chat_model,
            database=database
        )
    )
    builder.add_node(
        "gen_sql_query",
        partial(
            generate_sql_query, 
            chat_model=chat_model,
            database=database
        )
    )
    builder.add_node(
        "restrict_select_columns",
        partial(
            restrict_select_columns,
            database=database
        )
    )
    builder.add_node(
        "sql_execution", 
        partial(
            sql_execution,
            database=database
        )
    )
    # builder.add_node(
    #     "answer_generation", 
    #     partial(
    #         generate_answer,
    #         chat_model=chat_model,
    #         database=database
    #     )
    # )

    # Add edges
    builder.add_edge(START, "rewrite_message")
    builder.add_edge("rewrite_message", "get_sample_values")
    builder.add_edge("get_sample_values", "link_schema")
    builder.add_edge("link_schema", "gen_sql_query")
    builder.add_edge("gen_sql_query", "restrict_select_columns")
    builder.add_edge("restrict_select_columns", "sql_execution")
    # builder.add_edge("sql_execution", "answer_generation")
    # builder.add_edge("answer_generation", END)
    builder.add_edge("sql_execution", END)

    return builder.compile()


In [None]:
sql_assistant = build_sql_assistant(llm, db)

In [None]:
state = await sql_assistant.ainvoke({"conversation": [
    # HumanMessage("hello có văn phòng cho thuê dưới 10tr ko, văn phòng nhé"),
    # AIMessage("Không tìm thấy văn phòng cho thuê có giá dưới 10 triệu đồng/tháng trong cơ sở dữ liệu."),
    # HumanMessage("thế dưới 30tr thì sao"),
    HumanMessage("xin thông tin các văn phòng giá dưới 6tr 1 tháng ở HCM"),
]})

 50%|█████     | 1/2 [00:04<00:04,  4.72s/it]

The latest customer query is asking for information about office spaces in HCM with a price below 6 million VND per month. The table 'BĐS Cho thuê 500' contains data about rental properties, including the 'Loại BĐS' column which can identify office spaces and the 'Giá thuê (triệu/tháng)' column which indicates the monthly rent. Additionally, the 'Tỉnh/TP' column can be used to filter for HCM. These columns are semantically related to the query.


100%|██████████| 2/2 [00:04<00:00,  2.50s/it]

The latest customer query is asking for information about office spaces in HCM with a price below 6 million VND per month. The table 'BĐS Bán 500' represents real estate properties for sale, not rental or office spaces. The table contains columns related to property type (e.g., 'Loại BĐS'), location (e.g., 'Quận/Huyện', 'Tỉnh/TP'), and price (e.g., 'Giá (tỷ VNĐ)', 'Giá/m²'). However, none of the columns directly relate to rental properties, office spaces, or monthly pricing. Therefore, the table is not relevant to the customer's query.





DEBUG: Active Tables: {'BĐS Cho thuê 500': 'BĐS Cho thuê 500'}


In [None]:
input_req = {
    "current_message": "Có dự án nào tương tự nhưng giá tốt hơn không?",
    "conversation_history": [
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin này. Anh/chị vui lòng liên hệ tổng đài BDS Guru 1900 1881 để được hỗ trợ chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Có hỗ trợ vay ngân hàng không?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về chính sách ưu đãi/chiết khấu. Anh/chị vui lòng liên hệ tổng đài BDS Guru 1900 1881 để được hỗ trợ chi tiết nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Cần đóng bao nhiêu tiền ban đầu?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về mức phí đặt cọc hoặc số tiền phải trả ban đầu. Anh/chị vui lòng liên hệ tổng đài BDS Guru qua số 1900 1881 để được tư vấn chi tiết nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Dự án có sổ hồng/sổ đỏ không?",
            "role": "user"
        },
        {
            "content": "1. Một số dự án có giấy tờ pháp lý như sau:\n      + *Sổ đỏ chính chủ*\n      + *Sổ hồng riêng*\n   - Tuy nhiên, nhiều tài sản không thuộc bất kỳ dự án cụ thể nào (*\"Không thuộc dự án\"* ).\n   - Để biết chi tiết về một dự án cụ thể, anh/chị vui lòng cung cấp tên dự án hoặc liên hệ tổng đài BDS Guru 1900 1881 ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Dự án có những tiện ích nội khu gì?",
            "role": "user"
        },
        {
            "content": "Dưới đây là các tiện ích nội khu của các dự án được ghi nhận:  \n- The Garden Hill\n      + Hồ bơi\n      + Tầng hầm\n      + Gym\n- Vinhomes Central Park\n      + Sân tennis\n      + Phòng sinh hoạt cộng đồng\n      + Sân golf\n      + Hồ bơi\n- Gateway Thảo Điền\n      + Công viên\n      + Sân thượng\n      + Thang máy\n      + Camera an ninh\n      + An ninh 24/7\n- Saigon Pearl\n      + Sân vườn\n      + Trường học\n      + Tầng hầm\n      + Thang máy\n      + Sân thượng\n      + Khu vui chơi trẻ em\n      + Camera an ninh\n      + Ban công\n- The Gold View\n      + Khu BBQ\n      + Bãi đỗ xe\n      + Sân golf\n      + Sân vườn\n      + Sân tennis\n      + Gym\n      + Trường học\n   Nếu anh/chị cần thông tin chi tiết hơn về một dự án cụ thể, vui lòng cung cấp tên dự án để em hỗ trợ thêm nhé! 😊",
            "role": "assistant"
        },
        {
            "content": "Khu này có ngập nước / kẹt xe không?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa có thông tin cụ thể về tình trạng ngập nước hay ùn tắc giao thông ở khu vực này. Anh/chị vui lòng gọi trực tiếp tổng đài BDS Guru 1900 1881 để được hỗ trợ đầy đủ và chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Có thể xem nhà/xem căn mẫu không?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về việc xem nhà/xem căn mẫu. Anh/chị vui lòng liên hệ tổng đài BDS Guru qua số 1900 1881 để được hướng dẫn chi tiết và chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Phí quản lý hàng tháng bao nhiêu?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về phí quản lý hàng tháng. Anh/chị vui lòng liên hệ tổng đài BDS Guru qua số 1900 1881 để được tư vấn chi tiết và chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Bên bạn có chBên bạn có chăm sóc sau bán không?ăm sóc sau bán không?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về dịch vụ chăm sóc sau bán. Anh/chị vui lòng liên hệ hotline của BDS Guru 1900 1881 để được hỗ trợ chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Nên đầu tư ngắn hạn hay dài hạn?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về chiến lược đầu tư ngắn hạn hay dài hạn. Đây là lĩnh vực chuyên sâu và cần sự đánh giá riêng biệt từ chuyên gia tài chính. Anh/chị vui lòng liên hệ tổng đài BDS Guru 1900 1881 để được tư vấn chi tiết và chính xác nhất ạ! 😊",
            "role": "assistant"
        }
    ]
}

conversation = [
    HumanMessage(msg["content"]) if msg["role"] == "user" else AIMessage(msg["content"])
    for msg in input_req["conversation"]
]
# conversation.append(HumanMessage(input_req["current_message"]))

In [None]:
input_req = {
    "conversation": [
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin này. Anh/chị vui lòng liên hệ tổng đài BDS Guru 1900 1881 để được hỗ trợ chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Có hỗ trợ vay ngân hàng không?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về chính sách ưu đãi/chiết khấu. Anh/chị vui lòng liên hệ tổng đài BDS Guru 1900 1881 để được hỗ trợ chi tiết nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Cần đóng bao nhiêu tiền ban đầu?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về mức phí đặt cọc hoặc số tiền phải trả ban đầu. Anh/chị vui lòng liên hệ tổng đài BDS Guru qua số 1900 1881 để được tư vấn chi tiết nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Dự án có sổ hồng/sổ đỏ không?",
            "role": "user"
        },
        {
            "content": "1. Một số dự án có giấy tờ pháp lý như sau:\n      + *Sổ đỏ chính chủ*\n      + *Sổ hồng riêng*\n   - Tuy nhiên, nhiều tài sản không thuộc bất kỳ dự án cụ thể nào (*\"Không thuộc dự án\"* ).\n   - Để biết chi tiết về một dự án cụ thể, anh/chị vui lòng cung cấp tên dự án hoặc liên hệ tổng đài BDS Guru 1900 1881 ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Dự án có những tiện ích nội khu gì?",
            "role": "user"
        },
        {
            "content": "Dưới đây là các tiện ích nội khu của các dự án được ghi nhận:  \n- The Garden Hill\n      + Hồ bơi\n      + Tầng hầm\n      + Gym\n- Vinhomes Central Park\n      + Sân tennis\n      + Phòng sinh hoạt cộng đồng\n      + Sân golf\n      + Hồ bơi\n- Gateway Thảo Điền\n      + Công viên\n      + Sân thượng\n      + Thang máy\n      + Camera an ninh\n      + An ninh 24/7\n- Saigon Pearl\n      + Sân vườn\n      + Trường học\n      + Tầng hầm\n      + Thang máy\n      + Sân thượng\n      + Khu vui chơi trẻ em\n      + Camera an ninh\n      + Ban công\n- The Gold View\n      + Khu BBQ\n      + Bãi đỗ xe\n      + Sân golf\n      + Sân vườn\n      + Sân tennis\n      + Gym\n      + Trường học\n   Nếu anh/chị cần thông tin chi tiết hơn về một dự án cụ thể, vui lòng cung cấp tên dự án để em hỗ trợ thêm nhé! 😊",
            "role": "assistant"
        },
        {
            "content": "Khu này có ngập nước / kẹt xe không?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa có thông tin cụ thể về tình trạng ngập nước hay ùn tắc giao thông ở khu vực này. Anh/chị vui lòng gọi trực tiếp tổng đài BDS Guru 1900 1881 để được hỗ trợ đầy đủ và chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Có thể xem nhà/xem căn mẫu không?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về việc xem nhà/xem căn mẫu. Anh/chị vui lòng liên hệ tổng đài BDS Guru qua số 1900 1881 để được hướng dẫn chi tiết và chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Phí quản lý hàng tháng bao nhiêu?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về phí quản lý hàng tháng. Anh/chị vui lòng liên hệ tổng đài BDS Guru qua số 1900 1881 để được tư vấn chi tiết và chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Bên bạn có chBên bạn có chăm sóc sau bán không?ăm sóc sau bán không?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về dịch vụ chăm sóc sau bán. Anh/chị vui lòng liên hệ hotline của BDS Guru 1900 1881 để được hỗ trợ chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Nên đầu tư ngắn hạn hay dài hạn?",
            "role": "user"
        },
        {
            "content": "Dạ, em xin lỗi anh/chị hiện tại em chưa hỗ trợ thông tin về chiến lược đầu tư ngắn hạn hay dài hạn. Đây là lĩnh vực chuyên sâu và cần sự đánh giá riêng biệt từ chuyên gia tài chính. Anh/chị vui lòng liên hệ tổng đài BDS Guru 1900 1881 để được tư vấn chi tiết và chính xác nhất ạ! 😊",
            "role": "assistant"
        },
        {
            "content": "Có dự án nào tương tự nhưng giá tốt hơn không?",
            "role": "user"
        }
    ],
}

conversation = [
    HumanMessage(msg["content"]) if msg["role"] == "user" else AIMessage(msg["content"])
    for msg in input_req["conversation"]
]
# conversation.append(HumanMessage(input_req["current_message"]))

In [None]:
# print(MESSAGE_REWRITING_TEMPLATE.format(
#     formatted_conversation=format_conversation(conversation)
# ))

rewritten_message = "Tôi muốn tìm các dự án tương tự như The Garden Hill, Vinhomes Central Park, Gateway Thảo Điền, Saigon Pearl, The Gold View nhưng có giá tốt hơn."

In [None]:
sample_values = await db.search_similar_values_from_message(
    rewritten_message,
    k=5
)

In [None]:
table_infos = "\n\n".join([
    db.get_table_info_no_throw(
        table_name,
        get_col_comments=True,
        sample_count=5,
        column_sample_values=sample_values.get(table_name, None),
    ) if table_name != "tables_metadata" else ""
    for table_name in db.get_usable_table_names()
]).strip()

print(SQL_GEN_TEMPLATE.format(
    date=get_today_date_en(),
    dialect=db.dialect,
    table_infos=table_infos,
    formatted_conversation=format_conversation(conversation),
))

### DATE INFORMATION:
Today is Friday, December 26th, 2025

### Instructions:
You write SQL queries for a SQLite database. The Support Team is querying the database to answer Customer questions, and your task is to assist by generating valid SQL queries strictly adhering to the database schema provided. Translate the latest customer message into a **single valid SQLite query**, using the conversation history for context (e.g., resolving pronouns or follow-up filters).

**Table Schema**:
CREATE TABLE "BĐS Bán 500" (
	"Bãi đỗ xe" TEXT	/* Thông tin về khả năng đỗ xe (bao gồm số lượng và loại phương tiện). Một vài (không phải tất cả) giá trị trong cột "Bãi đỗ xe": "Có", "1 ô tô", "Nhiều xe máy", "2 ô tô", "Không",... */, 
	"Chiều dài (m)" REAL	/* Độ dài chiều dài của bất động sản tính theo mét. Một vài (không phải tất cả) giá trị trong cột "Chiều dài (m)": "3.9", "8.6", "16.4", "7.7", "32.6",... */, 
	"Chiều ngang (m)" REAL	/* Độ dài chiều ngang của bất động sản tính theo mét. Một vài (khô

In [None]:
tmp = llm_reasoning.invoke([HumanMessage(
    SQL_GEN_TEMPLATE.format(
        date=get_today_date_en(),
        dialect=db.dialect,
        table_infos=table_infos,
        formatted_conversation=format_conversation([HumanMessage("Tôi muốn tìm các dự án tương tự như The Garden Hill, Vinhomes Central Park, Gateway Thảo Điền, Saigon Pearl, The Gold View nhưng có giá tốt hơn.")]),
    )
)])

In [None]:
print(tmp.content)

<think>
Okay, let's tackle this customer query. The user wants projects similar to The Garden Hill, Vinhomes Central Park, Gateway Thảo Điền, Saigon Pearl, The Gold View but with better prices. 

First, I need to figure out which table to use. The customer mentioned "dự án" (projects) in the context of real estate, so looking at the schema, the "BĐS Bán 500" table has a "Dự án" column. The "BĐS Cho thuê 500" table also has a "Dự án" column, but since the user is asking about projects in general, not specifically for rent, I'll focus on the "BĐS Bán 500" table.

Next, the user wants projects similar to the listed ones. So I need to filter rows where "Dự án" is in that list. Then, they want "giá tốt hơn" (better price). The "Giá (tỷ VNĐ)" column in "BĐS Bán 500" is the price. But how to define "better price"? Probably lower than the listed projects. However, the user didn't specify exact values, so maybe we can assume that "giá tốt hơn" means lower than the average or specific prices of 

In [None]:
tmp = await db.run_no_throw("""
SELECT
  "Dự án" AS project,
  COUNT(*) AS listings,
  ROUND(AVG("Giá (tỷ VNĐ)"), 2) AS avg_price_ty,
  MAX("Tiện ích" LIKE '%Hồ bơi%') AS has_ho_boi,
  MAX("Tiện ích" LIKE '%Tầng hầm%') AS has_tang_ham,
  MAX("Tiện ích" LIKE '%Gym%') AS has_gym,
  GROUP_CONCAT(DISTINCT "Tiện ích") AS sample_tien_ich
FROM "BĐS Bán 500"
WHERE "Dự án" IS NOT NULL
  AND "Dự án" <> 'The Garden Hill'
  AND (
    "Tiện ích" LIKE '%Hồ bơi%' OR
    "Tiện ích" LIKE '%Tầng hầm%' OR
    "Tiện ích" LIKE '%Gym%'
  )
GROUP BY "Dự án"
HAVING AVG("Giá (tỷ VNĐ)") < (
  SELECT AVG("Giá (tỷ VNĐ)")
  FROM "BĐS Bán 500"
  WHERE "Dự án" = 'The Garden Hill' AND "Giá (tỷ VNĐ)" IS NOT NULL
)
ORDER BY avg_price_ty ASC
LIMIT 10;

""", include_columns=True)
for row in tmp["result"]:
    print(row)


{'project': 'Vista Verde', 'listings': 1, 'avg_price_ty': 2.35, 'has_ho_boi': 0, 'has_tang_ham': 1, 'has_gym': 1, 'sample_tien_ich': 'Trường học, Sân tennis, Khu vui chơi trẻ em, Khu BBQ, Tầng hầm, Gym, Ban công, Sân vườn'}
{'project': 'Diamond Island', 'listings': 1, 'avg_price_ty': 7.88, 'has_ho_boi': 1, 'has_tang_ham': 0, 'has_gym': 0, 'sample_tien_ich': 'Ban công, Trường học, Hồ bơi'}
{'project': 'Vinhomes Ocean Park', 'listings': 2, 'avg_price_ty': 10.26, 'has_ho_boi': 0, 'has_tang_ham': 0, 'has_gym': 1, 'sample_tien_ich': 'Khu BBQ, Gym, Công viên, Siêu thị, Thang máy, Bãi đỗ xe, Sân vườn,An ninh 24/7, Ban công, Gym, Sân vườn, Trường học'}
{'project': 'Royal City', 'listings': 1, 'avg_price_ty': 10.37, 'has_ho_boi': 0, 'has_tang_ham': 0, 'has_gym': 1, 'sample_tien_ich': 'Gym, An ninh 24/7, Bệnh viện, Sân golf, Bãi đỗ xe, Sân thượng'}
{'project': 'Lạc Hồng Lotus', 'listings': 2, 'avg_price_ty': 10.77, 'has_ho_boi': 1, 'has_tang_ham': 1, 'has_gym': 1, 'sample_tien_ich': 'Khu BBQ, Gy

## Full Langgraph workflow - Multi-turn V2

In [13]:
class SQLAssistantState(TypedDict):
    conversation: List[AnyMessage]
    linked_schema: Dict[str, Dict[str, str]]
    relevant_rows: List[Dict[str, Any]]
    db_output: Dict[str, Any]


def format_conversation(conversation: List[AnyMessage]) -> str:
    formatted_conversation = ""
    end_index = len(conversation) - 1 
    for ind in range(len(conversation) - 1, -1, -1):
        if conversation[ind].type == "human":
            end_index = ind
            break
    for message in conversation[:end_index]:
        if message.type == "human":
            formatted_conversation += f"Customer: {message.content}\n"
        elif message.type == "ai":
            formatted_conversation += f"Support Team: {message.content}\n"
    
    formatted_conversation += f"\nLatest Customer Message: {conversation[end_index].content}"
    return formatted_conversation

In [14]:
SCHEMA_LINKING_TEMPLATE = """
You are an expert in SQL schema linking. 
Given a {dialect} table schema (DDL) and a conversation history, determine if the table is relevant to the latest customer query.

Your task:
1. Analyze the table schema and the conversation history. Focus on the latest customer message, using previous messages for context (e.g., to resolve references). Evaluate the Table Name and Table Comment to see if the general topic matches the query. Answer "Y" (Yes) or "N" (No) regarding the table's relevance to the latest query.
2. If the answer is "Y", list ALL columns that are semantically related. 
   - You do NOT need to identify the exact columns for the final SQL query. 
   - You MUST include all columns that provide context, identifiers, or potential join keys related to the entities in the query.

Output must be a valid JSON object inside a json code block using this format:
```json
{{
    "reasoning": "Reasoning of the decision",
    "is_related": "Y or N",
    "columns": ["column name 1", "column name 2"]
}}
```

Table Schema (DDL):
{table_info}

Conversation History:
{formatted_conversation}
""".strip()


# Cache for schema linking chains keyed by model instance ID
_schema_linking_chain_cache: Dict[int, Runnable] = {}
def get_schema_linking_chain(chat_model: BaseChatModel) -> Runnable:
    # Use model instance ID as cache key (since ChatOpenAI objects aren't hashable)
    chat_model_id = id(chat_model)
    
    if chat_model_id not in _schema_linking_chain_cache:
        _schema_linking_chain_cache[chat_model_id] = (
            ChatPromptTemplate([("human", SCHEMA_LINKING_TEMPLATE)])
            | chat_model
            | JsonOutputParser()
        )
    
    return _schema_linking_chain_cache[chat_model_id]


async def _link_schema_one(
    conversation: List[AnyMessage],
    table_name: str,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
    allowed_col_names: Optional[List[str]] = None,
) -> Dict[str, Any]:
    try:
        column_names = database.get_column_names(table_name)
        if isinstance(column_names, list) and len(column_names) <= 5:
            return {
                "input_item": {
                    "table_name": table_name,
                    "conversation": conversation,
                    "allowed_col_names": allowed_col_names
                },
                "filtered_schema": (table_name, column_names),
                "error": None
            }

        table_info = database.get_table_info_no_throw(
            table_name,
            get_col_comments=True,
            allowed_col_names=allowed_col_names,
            sample_count=3
        )
        result = await get_schema_linking_chain(chat_model).ainvoke({
            "table_info": table_info, 
            "formatted_conversation": format_conversation(conversation), 
            "dialect": database.dialect
        })
        
        if "is_related" not in result or result["is_related"] not in ["Y", "N"]:
            raise ValueError("Invalid response from schema linking chain")
        if result["is_related"] == "Y" and not result.get("columns"):
            result["columns"] = ["ROWID"]

        if result["is_related"] == "N":
            return {
                "input_item": {
                    "table_name": table_name,
                    "conversation": conversation,
                    "allowed_col_names": allowed_col_names
                },
                "filtered_schema": None,
                "error": None
            }
        else:
            return {
                "input_item": {
                    "table_name": table_name, 
                    "conversation": conversation, 
                    "allowed_col_names": allowed_col_names
                },
                "filtered_schema": (table_name, result["columns"]),
                "error": None
            }
    except Exception as e:
        return {
            "input_item": {
                "table_name": table_name, 
                "conversation": conversation, 
                "allowed_col_names": allowed_col_names
            },
            "filtered_schema": None,
            "error": str(e)
        }


async def link_schema(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> Dict[str, Dict[str, str]]:
    conversation = state.get("conversation")
    if not conversation:
        raise ValueError("conversation is required")
    max_retries = 1
    # queue = []
    # for table in  database.get_usable_table_names():
    #     for col_group in database.get_column_groups(table):
    #         queue.append({
    #             "table_name": table,
    #             "allowed_col_names": col_group,
    #             "conversation": conversation
    #         })
    queue = [
        {"table_name": table_name, "conversation": conversation} 
        for table_name in database.get_usable_table_names()
    ]
    successful_results = []
    for _ in range(max_retries):
        tasks = [_link_schema_one(chat_model=chat_model, database=database, **input_item) for input_item in queue]
        results = await asyncio.gather(*tasks)
        successful_results.extend([
            res for res in results if res["error"] is None
        ])
        failed_items = [
            res["input_item"] for res in results if res["error"] is not None
        ]
        queue = failed_items
        if not queue:
            break
    
    linked_schema = [
        result["filtered_schema"] 
        for result in successful_results 
        if result["filtered_schema"]
    ]
    # Return per-table mapping: column_name -> datatype
    final_schema: Dict[str, Dict[str, str]] = {}
    for table_name, col_names in linked_schema:
        table_schema = final_schema.setdefault(table_name, {})
        for col_name in col_names:
            col_type = database.get_column_datatype(
                table_name,
                col_name,
                default="NULL",
            )
            if col_type != "NULL":
                table_schema[col_name] = col_type

    state["linked_schema"] = final_schema
    return state

In [None]:
RERANK_ROWS_TEMPLATE = """
### Role
You are an expert Database Row Filter and Re-ranker. Your task is to analyze a conversation and a list of candidate rows retrieved from the table `{table_name}`. You must identify which rows strictly satisfy the user's intent and constraints.

### Table Info
- **Table Name**: {table_name}
- **Description**: {table_description}

### Rules
1. Analyze Intent: Read the **Conversation** to understand what the "Customer" is looking for. Pay attention to the **LATEST** message but use previous messages to resolve context (filters, exact values, numerical ranges, categories).
2. Verify Rows: The "Candidate Rows" section contains raw data retrieved via vector search. Many of these are **NOISE**. You must verify the data in each row against the user's constraints.
3. Strict ID Extraction: You must only return the `rowid` of rows that are relevant. If a row is ambiguous or does not match, ignore it. 
4. Output Format: Output a single JSON object inside a json markdown code block.
```json
{{
    "reasoning": "Reasoning of the decision",
    "relevant_row_ids": [ row_id_1, row_id_2, ... ] # list of row ids as integers
}}
```
If no rows match, return an empty list `[]`.

### Conversation
{formatted_conversation}

### Candidate Rows
{formatted_rows}
""".strip()


_rows_reranking_chain_cache: Dict[int, Runnable] = {}
def get_rows_reranking_chain(chat_model: BaseChatModel) -> Runnable:
    # Use model instance ID as cache key (since ChatOpenAI objects aren't hashable)
    chat_model_id = id(chat_model)
    
    if chat_model_id not in _rows_reranking_chain_cache:
        _rows_reranking_chain_cache[chat_model_id] = (
            ChatPromptTemplate([("human", RERANK_ROWS_TEMPLATE)])
            | chat_model
            | JsonOutputParser()
        )
    
    return _rows_reranking_chain_cache[chat_model_id]


def iter_batch(_iterable: Iterable, batch_size: int):
    for i in range(0, len(_iterable), batch_size):
        yield _iterable[i:i+batch_size]


async def _rerank_one(
    conversation: List[AnyMessage],
    table_name: str,
    row_ids: List[int],
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> Dict[str, Any]:
    try:
        
        rows = await database.run_no_throw(
            f"SELECT rowid, * FROM \"{table_name}\" WHERE rowid IN ({', '.join(map(str, row_ids))});",
            include_columns=True
        )
        formatted_rows = "\n".join(json.dumps(row, ensure_ascii=False) for row in rows["result"])
        formatted_conversation = format_conversation(conversation)
        table_overview = database.get_table_overview()
        table_description = ""
        for table in table_overview:
            if table["name"] == table_name:
                table_description = table.get("summary", "")
                break

        result = await get_rows_reranking_chain(chat_model).ainvoke({
            "table_name": table_name, 
            "table_description": table_description, 
            "formatted_conversation": formatted_conversation, 
            "formatted_rows": formatted_rows
        })

        if "relevant_row_ids" not in result:
            raise ValueError("Invalid response from rows re-ranking chain")
        
        relevant_row_ids = list(set(result["relevant_row_ids"]))
        if len(relevant_row_ids) > len(row_ids):
            raise ValueError("More relevant rows than candidate rows")
        for rowid in relevant_row_ids:
            if rowid not in row_ids:
                raise ValueError(f"Row ID {rowid} not in candidate rows")

        return {
            "input_item": {
                "table_name": table_name,
                "conversation": conversation,
                "row_ids": row_ids,
            },
            "filtered_row_ids": relevant_row_ids,
            "error": None
        }
    except Exception as e:
        return {
            "input_item": {
                "table_name": table_name,
                "conversation": conversation,
                "row_ids": row_ids,
            },
            "filtered_row_ids": None,
            "error": str(e)
        }


async def rerank_rows(
    state: SQLAssistantState,
    chat_model: BaseChatModel,
    database: SQLiteDatabase,
) -> Dict[str, Dict[str, str]]:
    conversation = state.get("conversation")
    if not conversation:
        raise ValueError("conversation is required")
    linked_schema = state.get("linked_schema")
    if not linked_schema:
        raise ValueError("linked_schema is required")
    filtered_tables = list(linked_schema.keys())
    max_retries = 1
    queue = []
    for table_name in filtered_tables:
        all_row_ids = await database.run_no_throw(f"select rowid from \"{table_name}\"")
        for batch_row_ids in iter_batch(all_row_ids["result"], 3):
            queue.append({
                "table_name": table_name,
                "conversation": conversation,
                "row_ids": [item[0] for item in batch_row_ids],
            })
    successful_results = []
    for _ in range(max_retries):
        tasks = [_rerank_one(chat_model=chat_model, database=database, **input_item) for input_item in queue]
        results = await tqdm_asyncio.gather(*tasks)
        successful_results.extend([
            res for res in results if res["error"] is None
        ])
        failed_items = [
            res["input_item"] for res in results if res["error"] is not None
        ]
        queue = failed_items
        if not queue:
            break
    
    filtered_row_ids = []
    for result in successful_results:
        if result.get("filtered_row_ids"):
            filtered_row_ids.extend(result["filtered_row_ids"])
    filtered_row_ids = list(set(filtered_row_ids))
    filtered_rows = await database.run_no_throw(
        f"SELECT rowid, * FROM \"{table_name}\" WHERE rowid IN ({', '.join(map(str, filtered_row_ids))});",
        include_columns=True
    )

    state["relevant_rows"] = filtered_rows["result"]
    return state

In [27]:
state = await rerank_rows({
    "conversation": [
        HumanMessage("Tôi muốn tìm kiếm bất động sản có diện tích 155m2")
    ],
    "linked_schema": {
        "BĐS Bán 500": ["Diện tích (m²)", "Giá (tỷ VNĐ)", "Quận/Huyện", "Tỉnh/TP", "Hướng nhà", "Nội thất", "Địa chỉ"]
    }
}, llm, db)

100%|██████████| 167/167 [00:41<00:00,  4.00it/s]


In [28]:
state

{'conversation': [HumanMessage(content='Tôi muốn tìm kiếm bất động sản có diện tích 155m2', additional_kwargs={}, response_metadata={})],
 'linked_schema': {'BĐS Bán 500': ['Diện tích (m²)',
   'Giá (tỷ VNĐ)',
   'Quận/Huyện',
   'Tỉnh/TP',
   'Hướng nhà',
   'Nội thất',
   'Địa chỉ']},
 'relevant_rows': [{'rowid': 45,
   'ID': 'BDS-S00045',
   'Dự án': 'Không thuộc dự án',
   'Giá (tỷ VNĐ)': 24.47,
   'Giá/m²': '158.9 triệu',
   'Chiều ngang (m)': 6.0,
   'Chiều dài (m)': 14.5,
   'Giá/m²_so_sanh': 158.9,
   'Giá/m²_don_vi': 'triệu',
   'Số phòng ngủ': 4,
   'Ngày hết hạn': '19/12/2025',
   'Số phòng tắm': 1,
   'Số tầng': 2,
   'Hướng nhà': 'Nam',
   'Hướng ban công': 'N/A',
   'Nội thất': 'Nội thất đầy đủ',
   'Pháp lý': 'Giấy tờ hợp lệ',
   'Tình trạng': 'Đang hoàn thiện',
   'Năm xây dựng': '2022',
   'Chủ đầu tư': 'Tư nhân',
   'Bãi đỗ xe': '2 ô tô',
   'Email': 'contact45@batdongsan.vn',
   'Hoa hồng': '2%',
   'Ghi chú': 'Chính chủ',
   'Diện tích (m²)': 154,
   'Loại BĐS': 'Vi

In [30]:
tmp = await db.run_no_throw("""
SELECT rowid, *
FROM "BĐS Cho thuê 500"
WHERE "Tiện ích lân cận" LIKE '%Trường học%'
""", include_columns=True)
for row in tmp["result"]:
    print(json.dumps(row, ensure_ascii=False))

{"rowid": 1, "ID": "BDS-R00001", "Dự án": "Không thuộc dự án", "Giá thuê (triệu/tháng)": 34.2, "Giá/m²/tháng": "388,636 đ", "Giá/m²/tháng_số": 388636.0, "Số phòng ngủ": 4, "Ngày có thể chuyển vào": "19/12/2025", "Số phòng tắm": 1, "Số tầng": 1, "Hướng": "Nam", "Nội thất": "Nội thất đầy đủ", "Phường/Xã": "Phường 6", "Ngày có thể chuyển vào_ngày": 19, "Ngày có thể chuyển vào_tháng": 12, "Ngày có thể chuyển vào_năm": 2025, "Tình trạng": "Sắp trống", "Cho phép nuôi thú cưng": "Thỏa thuận", "Bếp": "Khu bếp đầy đủ", "Điều hòa": "2 máy", "Nóng lạnh": "Có", "Bãi đỗ xe": "Không có", "Thời hạn thuê tối thiểu": "Tối thiểu 1 năm", "Phí dịch vụ": "Không", "Tiền điện nước": "Theo đồng hồ", "Internet": "200k/tháng", "Người liên hệ": "Anh Quang", "SĐT liên hệ": "0905321402", "Zalo": "Có", "Email": "rent1@batdongsan.vn", "Ghi chú": "Gần chợ", "Diện tích (m²)": 88, "Loại BĐS": "Shophouse", "Quận/Huyện": "Quận 3", "Tiêu đề": "Cho thuê Shophouse 88m² tại Quận 3, Vũng Tàu", "Tỉnh/TP": "Vũng Tàu", "Địa chỉ"

In [49]:
tmp = await db.run_no_throw("""
SELECT rowid, *
FROM "BĐS Cho thuê 500"
WHERE rowid in (70,71,72)
""", include_columns=True)
for row in tmp["result"]:
    print(json.dumps(row, ensure_ascii=False))

{"rowid": 70, "ID": "BDS-R00070", "Dự án": "Không thuộc dự án", "Giá thuê (triệu/tháng)": 7.1, "Giá/m²/tháng": "473,333 đ", "Giá/m²/tháng_số": 473333.0, "Số phòng ngủ": 0, "Ngày có thể chuyển vào": "28/10/2025", "Số phòng tắm": 2, "Số tầng": 1, "Hướng": "Nam", "Nội thất": "Một phần nội thất", "Phường/Xã": "Phường 6", "Ngày có thể chuyển vào_ngày": 28, "Ngày có thể chuyển vào_tháng": 10, "Ngày có thể chuyển vào_năm": 2025, "Tình trạng": "Đang có người thuê", "Cho phép nuôi thú cưng": "Có", "Bếp": "Có bếp riêng", "Điều hòa": "2 máy", "Nóng lạnh": "Không", "Bãi đỗ xe": "Miễn phí", "Thời hạn thuê tối thiểu": "1 năm", "Phí dịch vụ": "Không", "Tiền điện nước": "Điện 4k, nước 100k/người", "Internet": "Tự lắp", "Người liên hệ": "Chị Quang", "SĐT liên hệ": "0917296779", "Zalo": "Có", "Email": "rent70@batdongsan.vn", "Ghi chú": "Giá cả thương lượng", "Diện tích (m²)": 15, "Loại BĐS": "Phòng trọ", "Quận/Huyện": "Quận 2", "Tiêu đề": "Cho thuê Phòng trọ 15m² tại Quận 2, Cần Thơ", "Tỉnh/TP": "Cần Th

In [50]:
print(db.get_table_overview()[2]["summary"])

Bảng này chứa thông tin về các bất động sản cho thuê tại Việt Nam, bao gồm giá thuê (triệu/tháng), diện tích (m²), loại hình BĐS (Studio, Phòng trọ, Mặt bằng kinh doanh, Shophouse), địa chỉ chi tiết (đường, phường, quận, tỉnh), khoảng cách đến trung tâm, điều kiện thanh toán (tiền cọc, thời hạn thuê tối thiểu), tiện ích kèm theo (điều hòa, bãi đỗ xe, an ninh), và thông tin liên hệ. Các thuộc tính quan trọng giúp xác định đối tượng thuê, giá cả, vị trí, và điều kiện hợp đồng.


In [53]:
llm_kwargs = {
    "temperature": 0.6,
    "top_p": 0.95,
    "presence_penalty": 1,
    "extra_body": {
        'chat_template_kwargs': {'enable_thinking': True},
        "top_k": 20,
        "mip_p": 0,
    }
}
print(json.dumps(llm_kwargs, ensure_ascii=False))

{"temperature": 0.6, "top_p": 0.95, "presence_penalty": 1, "extra_body": {"chat_template_kwargs": {"enable_thinking": true}, "top_k": 20, "mip_p": 0}}


In [51]:
tmp = """
### Role
You are an expert Database Row Filter and Re-ranker. Your task is to analyze a conversation and a list of candidate rows retrieved from the table `BĐS Bán 500`. You must identify which rows strictly satisfy the user's intent and constraints.

### Table Info
- **Table Name**: BĐS Cho thuê 500
- **Description**: Bảng này chứa thông tin về các bất động sản cho thuê tại Việt Nam, bao gồm giá thuê (triệu/tháng), diện tích (m²), loại hình BĐS (Studio, Phòng trọ, Mặt bằng kinh doanh, Shophouse), địa chỉ chi tiết (đường, phường, quận, tỉnh), khoảng cách đến trung tâm, điều kiện thanh toán (tiền cọc, thời hạn thuê tối thiểu), tiện ích kèm theo (điều hòa, bãi đỗ xe, an ninh), và thông tin liên hệ. Các thuộc tính quan trọng giúp xác định đối tượng thuê, giá cả, vị trí, và điều kiện hợp đồng.

### Rules
1. Analyze Intent: Read the **Conversation** to understand what the "Customer" is looking for. Pay attention to the **LATEST** message but use previous messages to resolve context (filters, exact values, numerical ranges, categories).
2. Verify Rows: The "Candidate Rows" section contains raw data retrieved via vector search. You must verify the data in each row against the user's constraints.
3. Strict ID Extraction: You must only return the `rowid` of rows that are relevant. If a row is ambiguous or does not match, ignore it. 
4. Output Format: Output a single JSON object inside a json markdown code block.
```json
{
    "reasoning": "Reasoning of the decision",
    "relevant_row_ids": [ row_id_1, row_id_2, ... ] # list of row ids as integers
}
```
If no rows match, return an empty list `[]`.

### Conversation
Customer: hi
Support Team: Em là trợ lý ảo Guso, có thể hỗ trợ anh/chị về các sản phẩm và dịch vụ của BDS Guru ạ! Em chưa rõ anh/chị đang quan tâm đến việc mua, thuê hay bán bất động sản chứ? Nếu anh/chị cần hỗ trợ tìm kiếm căn hộ, đất nền hay tư vấn pháp lý, em sẽ đồng hành cùng anh/chị từng bước nhé! 😊
Customer: hiện tại bên Guso đang cung cấp những dịch vụ nào
Support Team: Em là trợ lý ảo Guso, có thể hỗ trợ anh/chị về các sản phẩm và dịch vụ của BDS Guru ạ! Em chưa rõ anh/chị đang quan tâm đến việc mua, thuê hay bán bất động sản chứ? Nếu anh/chị cần hỗ trợ tìm kiếm căn hộ, đất nền hay tư vấn pháp lý, em sẽ đồng hành cùng anh/chị từng bước nhé! 😊
Customer: Guru có những sản phẩm nào cho thuê không ạ
Support Team: Dưới đây là danh sách các sản phẩm cho thuê mà BDS Guru đang cung cấp:  
- Shophouse
      + Ví dụ: BDS-R00001 (88m⁲ tại Quận 3, Vũng Tàu) với giá thuê 34.2 triệu/tháng .
      + BDS-R00003 (173m⁲ tại Quận 3, Vũng Tàu) với giá thuê 18.5 triệu/tháng .
- Căn hộ dịch vụ
      + Ví dụ: BDS-R00002 (30m⁲ tại Quận 2, Biên Hòa) với giá thuê 28.5 triệu/tháng .
- Phòng trọ
      + Ví dụ: BDS-R00004 (35m⁲ tại Quận 5, Vũng Tàu) với giá thuê 4.1 triệu/tháng .
- Biệt thự
      + Ví dụ: BDS-R00005 (157m⁲ tại Quận 2, Nha Trang) với giá thuê 136.6 triệu/tháng .
   Các bất động sản này có đa dạng diện tích và vị trí tại TP.HCM (Vũng Tàu, Nha Trang), phù hợp nhu cầu thuê ở hoặc kinh doanh . Nếu anh/chị cần hỗ trợ chọn lựa cụ thể hơn, em có thể gợi ý dựa trên ngân sách hoặc khu vực mong muốn! 😊

Latest Customer Message: có căn hộ nào gần trường học không

### Candidate Rows
{"rowid": 70, "ID": "BDS-R00070", "Dự án": "Không thuộc dự án", "Giá thuê (triệu/tháng)": 7.1, "Giá/m²/tháng": "473,333 đ", "Giá/m²/tháng_số": 473333.0, "Số phòng ngủ": 0, "Ngày có thể chuyển vào": "28/10/2025", "Số phòng tắm": 2, "Số tầng": 1, "Hướng": "Nam", "Nội thất": "Một phần nội thất", "Phường/Xã": "Phường 6", "Ngày có thể chuyển vào_ngày": 28, "Ngày có thể chuyển vào_tháng": 10, "Ngày có thể chuyển vào_năm": 2025, "Tình trạng": "Đang có người thuê", "Cho phép nuôi thú cưng": "Có", "Bếp": "Có bếp riêng", "Điều hòa": "2 máy", "Nóng lạnh": "Không", "Bãi đỗ xe": "Miễn phí", "Thời hạn thuê tối thiểu": "1 năm", "Phí dịch vụ": "Không", "Tiền điện nước": "Điện 4k, nước 100k/người", "Internet": "Tự lắp", "Người liên hệ": "Chị Quang", "SĐT liên hệ": "0917296779", "Zalo": "Có", "Email": "rent70@batdongsan.vn", "Ghi chú": "Giá cả thương lượng", "Diện tích (m²)": 15, "Loại BĐS": "Phòng trọ", "Quận/Huyện": "Quận 2", "Tiêu đề": "Cho thuê Phòng trọ 15m² tại Quận 2, Cần Thơ", "Tỉnh/TP": "Cần Thơ", "Địa chỉ": "981 Đường B17, Quận 2", "Địa chỉ_đường": "Đường B17", "An ninh": "Ban ngày", "Thang máy": "N/A", "Ban công/Sân thượng": "Không", "Tiện ích lân cận": "Nhà thuốc, Chợ, Hồ bơi, ATM, Bệnh viện, Công viên", "Khoảng cách tới trung tâm": "6.0 km", "Tiền cọc": "7.1 triệu (1 tháng)", "Khoảng cách tới trung tâm_số": 6.0, "Khoảng cách tới trung tâm_đơn vị": "km", "Tiền cọc_số": 7.1, "Tiền cọc_Số tháng": "1 tháng"}
{"rowid": 71, "ID": "BDS-R00071", "Dự án": "Không thuộc dự án", "Giá thuê (triệu/tháng)": 330.7, "Giá/m²/tháng": "97,236 đ", "Giá/m²/tháng_số": 97236.0, "Số phòng ngủ": 0, "Ngày có thể chuyển vào": "25/11/2025", "Số phòng tắm": 1, "Số tầng": 2, "Hướng": "Tây Bắc", "Nội thất": "Nội thất cơ bản", "Phường/Xã": "Phường 13", "Ngày có thể chuyển vào_ngày": 25, "Ngày có thể chuyển vào_tháng": 11, "Ngày có thể chuyển vào_năm": 2025, "Tình trạng": "Trống", "Cho phép nuôi thú cưng": "Thỏa thuận", "Bếp": "Bếp chung", "Điều hòa": "Có sẵn", "Nóng lạnh": "Không", "Bãi đỗ xe": "Có phí", "Thời hạn thuê tối thiểu": "Tối thiểu 1 năm", "Phí dịch vụ": "Không", "Tiền điện nước": "Điện 4k, nước 100k/người", "Internet": "Miễn phí", "Người liên hệ": "Chị Quang", "SĐT liên hệ": "0852329041", "Zalo": "Không", "Email": "rent71@batdongsan.vn", "Ghi chú": "Yên tĩnh", "Diện tích (m²)": 3401, "Loại BĐS": "Nhà xưởng", "Quận/Huyện": "Quận 5", "Tiêu đề": "Cho thuê Nhà xưởng 3401m² tại Quận 5, Vũng Tàu", "Tỉnh/TP": "Vũng Tàu", "Địa chỉ": "778 Đường A7, Quận 5", "Địa chỉ_đường": "Đường A7", "An ninh": "Cơ bản", "Thang máy": "Không", "Ban công/Sân thượng": "Sân thượng", "Tiện ích lân cận": "ATM, Quán cafe, Siêu thị, Nhà thuốc, Công viên", "Khoảng cách tới trung tâm": "14.4 km", "Tiền cọc": "330.7 triệu (1 tháng)", "Khoảng cách tới trung tâm_số": 14.4, "Khoảng cách tới trung tâm_đơn vị": "km", "Tiền cọc_số": 330.7, "Tiền cọc_Số tháng": "1 tháng"}
{"rowid": 72, "ID": "BDS-R00072", "Dự án": "Không thuộc dự án", "Giá thuê (triệu/tháng)": 8.4, "Giá/m²/tháng": "118,310 đ", "Giá/m²/tháng_số": 118310.0, "Số phòng ngủ": 4, "Ngày có thể chuyển vào": "05/11/2025", "Số phòng tắm": 2, "Số tầng": 1, "Hướng": "Nam", "Nội thất": "Một phần nội thất", "Phường/Xã": "Phường 1", "Ngày có thể chuyển vào_ngày": 5, "Ngày có thể chuyển vào_tháng": 11, "Ngày có thể chuyển vào_năm": 2025, "Tình trạng": "Mới sửa chữa", "Cho phép nuôi thú cưng": "Thỏa thuận", "Bếp": "Không có bếp", "Điều hòa": "Có sẵn", "Nóng lạnh": "Máy nóng lạnh", "Bãi đỗ xe": "1 ô tô + xe máy", "Thời hạn thuê tối thiểu": "Tối thiểu 1 năm", "Phí dịch vụ": "2.0 triệu/tháng", "Tiền điện nước": "Điện 3.5k, nước 20k", "Internet": "200k/tháng", "Người liên hệ": "Anh Trang", "SĐT liên hệ": "0818161096", "Zalo": "Không", "Email": "rent72@batdongsan.vn", "Ghi chú": "Chính chủ", "Diện tích (m²)": 71, "Loại BĐS": "Căn hộ chung cư", "Quận/Huyện": "Quận 4", "Tiêu đề": "Cho thuê Căn hộ chung cư 71m² tại Quận 4, Hải Phòng", "Tỉnh/TP": "Hải Phòng", "Địa chỉ": "266 Đường C2, Quận 4", "Địa chỉ_đường": "Đường C2", "An ninh": "Cơ bản", "Thang máy": "N/A", "Ban công/Sân thượng": "Ban công", "Tiện ích lân cận": "Chợ, ATM, Quán cafe, Nhà thuốc, Công viên, Gym", "Khoảng cách tới trung tâm": "2.7 km", "Tiền cọc": "25.2 triệu (3 tháng)", "Khoảng cách tới trung tâm_số": 2.7, "Khoảng cách tới trung tâm_đơn vị": "km", "Tiền cọc_số": 25.2, "Tiền cọc_Số tháng": "3 tháng"}
""".strip()

print(llm.invoke(tmp))

content='```json\n{\n    "reasoning": "The latest customer message is asking for a \'căn hộ\' (apartment) that is \'gần trường học\' (near a school). The candidate rows provided are for different types of real estate. Row 72 is for a \'Căn hộ chung cư\' (apartment), which matches the \'căn hộ\' requirement. However, none of the rows explicitly mention proximity to a school. Since the user\'s intent is to find an apartment near a school and no row satisfies this condition, no row is relevant.",\n    "relevant_row_ids": []\n}\n```' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 129, 'prompt_tokens': 2950, 'total_tokens': 3079, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_provider': 'openai', 'model_name': 'Qwen3-8B', 'system_fingerprint': None, 'id': 'chatcmpl-6d643d0e0e8b40e9ba85d7f4019499d2', 'finish_reason': 'stop', 'logprobs': None} id='lc_run--019b6d4f-92a5-7351-9c41-b3563609e711-0' usage_metadata={'input_toke

In [54]:
llm_kwargs = {
    "reasoning": {"effort": "none"},
}
print(json.dumps(llm_kwargs, ensure_ascii=False))

{"reasoning": {"effort": "none"}}


# Generate table description

In [None]:
tmp = await db.run_no_throw("""
SELECT * 
FROM "BĐS Bán 500" 
WHERE "Diện tích (m²)" = 155
  AND LOWER("Quận/Huyện") = 'quận 1' 
  AND LOWER("Tỉnh/TP") = 'biên hòa' 
  AND "Giá (tỷ VNĐ)" = 24.94 
  AND "Hướng nhà" = 'Đông Nam' 
  AND LOWER("Nội thất") = 'nội thất đầy đủ' 
  AND "Địa chỉ" = '771 Đường C16, Quận 1';
""")
print(tmp)

{'result': [('BDS-S00327', 'Không thuộc dự án', 24.94, '160.9 triệu', 5.8, 15.7, 160.9, 'triệu', 4, '07/08/2025', 3, 4, 'Đông Nam', 'N/A', 'Nội thất đầy đủ', 'Giấy tờ hợp lệ', 'Nhà mới xây', '2014', 'Tư nhân', '2 ô tô', 'contact327@batdongsan.vn', '1%', 'Vị trí đẹp', 155, 'Nhà mặt tiền', 'Phường 15', 'Quận 1', 'Nhà mặt tiền 155m² tại Quận 1, Biên Hòa', 'Biên Hòa', '771 Đường C16, Quận 1', 'C16', 'View công viên', 'Sân golf, An ninh 24/7, Bệnh viện, Sân thượng, Trường học, Khu BBQ, Gym, Khu vui chơi trẻ em', 4.7, '08/06/2025', 'Anh Dũng', '0839252747')], 'error': None}


In [None]:
from src.prompts import SQL_GEN_TEMPLATE, SCHEMA_LINKING_TEMPLATE
from src.sql_assistant_v1.utils import format_conversation
from src.utils import get_today_date_en

In [None]:
test_dataset = []
with open("/Users/vinhnguyen/Projects/ext-chatbot/resources/logs/batdongsan_test.jsonl", "r") as f:
    for line in f:
        test_dataset.append(json.loads(line))

In [None]:
ind = 1
openai_conversation = test_dataset[ind]["conversation_history"] + [{"role": "user", "content": test_dataset[ind]["current_message"]}]
langchain_conversation = [
    HumanMessage(msg["content"]) if msg["role"] == "user" else AIMessage(msg["content"])
    for msg in openai_conversation
]
formatted_conversation = format_conversation(langchain_conversation)
table_ind = 1
table_infos = "\n\n".join([
    db.get_table_info_no_throw(
        table_name,
        get_col_comments=True,
        sample_count=3,
    )
    for table_name in db.get_usable_table_names()
][table_ind:table_ind+1])
print(SCHEMA_LINKING_TEMPLATE.format(
    table_info=table_infos,
    formatted_conversation=formatted_conversation,
    dialect=db.dialect
))


You are an expert in SQL schema linking. 
Given a SQLite table schema (DDL) and a conversation history, determine if the table is relevant to the latest customer query.

Your task:
1. Analyze the table schema and the conversation history. Focus on the latest customer message, using previous messages for context (e.g., to resolve references). Evaluate the Table Name and Table Comment to see if the general topic matches the query. Answer "Y" (Yes) or "N" (No) regarding the table's relevance to the latest query.
2. If the answer is "Y", list ALL columns that are semantically related. 
   - You do NOT need to identify the exact columns for the final SQL query. 
   - You MUST include all columns that provide context, identifiers, or potential join keys related to the entities in the query.

Output must be a valid JSON object inside a ```json code block using this format:
```json
{
    "is_related": "Y or N",
    "columns": ["column name 1", "column name 2"]
}
```

Table Schema (DDL):
CREATE

In [None]:
# 

In [None]:
db_output = await db.run_no_throw("""
SELECT *
FROM "BĐS Bán 500"
WHERE
  lower("Ghi chú") LIKE '%vay%'
  OR lower("Ghi chú") LIKE '%hỗ trợ%'
  OR lower("Ghi chú") LIKE '%ho tro%'
  OR lower("Ghi chú") LIKE '%hỗtrợ%'
  OR lower("Tiêu đề") LIKE '%vay%'
  OR lower("Pháp lý") LIKE '%vay%';
""", include_columns=True)
for row in db_output["result"]:
    print(row)

In [None]:
TABLE_DESCRIPTION_PROMPT = """
Bạn là một chuyên gia quản trị dữ liệu (Data Steward). Nhiệm vụ của bạn là phân tích cấu trúc và dữ liệu mẫu của một bảng (table) để tạo ra mô tả tóm tắt (metadata).

Mô tả này sẽ được sử dụng bởi một AI Router để quyết định xem câu hỏi của người dùng có liên quan đến bảng này hay không.

HÃY PHÂN TÍCH DỰA TRÊN TÊN VÀ MỘT VÀI HÀNG DỮ LIỆU CỦA BẢNG:
**Business Name**:
{business_name}

**Sheet Name**:
{sheet_name}

**Table Data Snippet**:
{table_data_snippet}
...

YÊU CẦU ĐẦU RA (Định dạng JSON):
Hãy trả về một JSON object duy nhất đặt trong ```json ...``` với các trường sau:
- "human_name": Tên ngắn gọn, dễ hiểu cho người dùng.
- "summary": Phần mô tả bảng bằng tiếng Việt. Nêu rõ bảng này chứa thông tin về **đối tượng gì** (Entity) và **thuộc tính quan trọng nào**.
""".strip()

sheet_name = "Tổng quan"
with open(f"/Users/vinhnguyen/Projects/ext-chatbot/resources/processed_data/batdongsan_1/{sheet_name}.json", "r") as f:
    data = json.load(f)
table_data_snippet = random.sample(data["transformed_data"], min(5, len(data["transformed_data"])))
table_data_snippet = "\n".join(str(row) for row in table_data_snippet)

In [None]:
print(TABLE_DESCRIPTION_PROMPT.format(
        business_name="Batdongsan.com.vn by PropertyGuru",
        sheet_name=sheet_name,
        table_data_snippet=table_data_snippet
))

Bạn là một chuyên gia quản trị dữ liệu (Data Steward). Nhiệm vụ của bạn là phân tích cấu trúc và dữ liệu mẫu của một bảng (table) để tạo ra mô tả tóm tắt (metadata).

Mô tả này sẽ được sử dụng bởi một AI Router để quyết định xem câu hỏi của người dùng có liên quan đến bảng này hay không.

HÃY PHÂN TÍCH DỰA TRÊN TÊN VÀ MỘT VÀI HÀNG DỮ LIỆU CỦA BẢNG:
**Business Name**:
Batdongsan.com.vn by PropertyGuru

**Sheet Name**:
Tổng quan

**Table Data Snippet**:
{'Thông tin': 'Số lượng chi nhánh', 'Chi tiết': '5 chi nhánh chính'}
{'Thông tin': 'Tổng BĐS cho thuê', 'Chi tiết': '500 bất động sản'}
{'Thông tin': 'Cập nhật lần cuối', 'Chi tiết': '24/10/2025 02:49'}
{'Thông tin': 'Tên công ty', 'Chi tiết': 'Batdongsan.com.vn - PropertyGuru Vietnam'}
{'Thông tin': 'Địa chỉ trụ sở chính', 'Chi tiết': 'Tầng 31, Keangnam Hanoi Landmark, Phạm Hùng, Nam Từ Liêm, Hà Nội'}
...

YÊU CẦU ĐẦU RA (Định dạng JSON):
Hãy trả về một JSON object duy nhất đặt trong ```json ...``` với các trường sau:
- "human_name": Tên

In [None]:
tmp = (llm_reasoning | JsonOutputParser()).invoke(
    TABLE_DESCRIPTION_PROMPT.format(
        business_name="Batdongsan.com.vn by PropertyGuru",
        sheet_name=sheet_name,
        table_data_snippet=table_data_snippet
    )
)
print(tmp)

{'human_name': 'Bản tin Tổng quan Công ty Batdongsan', 'summary': 'Bảng này chứa thông tin tổng quan về công ty Batdongsan.com.vn, bao gồm tên công ty, số lượng chi nhánh, tổng bất động sản cho thuê, ngày cập nhật cuối cùng và địa chỉ trụ sở chính.'}


In [None]:
import sqlite3


# function to create table description
def create_table_metadata(
    rows: List[Tuple[str, str, str]],
    db_path: str,
    table_name: str = "tables_metadata",
    if_exists: Literal["replace", "append", "fail"] = "replace"
) -> str:
    """
    Create a SQLite table to store table metadata using EAV (Entity-Attribute-Value) structure.
    
    Args:
        rows: List of tuples, each containing (entity, attribute, value)
              - entity: The table name or identifier (e.g., "Bất Động Sản Cho Thuê")
              - attribute: The metadata key (e.g., "human_name", "summary")
              - value: The metadata value
        db_path: Path to the SQLite database file
        table_name: Name of the metadata table (default: "table_metadata")
        if_exists: What to do if table exists: "replace" (drop and recreate), 
                  "append" (add to existing), or "fail" (raise error)
    
    Returns:
        The name of the created table
    
    Example:
        rows = [
            ("Bất Động Sản Cho Thuê", "human_name", "Bất Động Sản Cho Thuê"),
            ("Bất Động Sản Cho Thuê", "summary", "Bảng này chứa thông tin chi tiết...")
        ]
        create_table_metadata(rows, "database.db", "table_metadata", "replace")
    """
    # Validate if_exists parameter
    if if_exists not in ["replace", "append", "fail"]:
        raise ValueError(f"Invalid if_exists value: {if_exists}. Must be 'replace', 'append', or 'fail'")
    
    # Validate rows format
    for i, row in enumerate(rows):
        if not isinstance(row, tuple) or len(row) != 3:
            raise ValueError(f"Row {i} must be a tuple of 3 elements (entity, attribute, value), got: {row}")
    
    # Create table SQL with EAV structure
    create_table_sql = f'''
    CREATE TABLE IF NOT EXISTS "{table_name}" (
        entity TEXT NOT NULL,      -- table name or identifier
        attribute TEXT NOT NULL,   -- metadata key (e.g., "human_name", "summary")
        value TEXT,                -- metadata value
        PRIMARY KEY (entity, attribute)
    )
    '''.strip()
    
    # Connect to database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    try:
        # Handle if_exists option
        if if_exists == "replace":
            cursor.execute(f'DROP TABLE IF EXISTS "{table_name}"')
        elif if_exists == "append":
            # Check if table exists, if not create it
            cursor.execute('''
                SELECT name FROM sqlite_master 
                WHERE type='table' AND name=?
            ''', (table_name,))
            if not cursor.fetchone():
                # Table doesn't exist, create it
                cursor.execute(create_table_sql)
        elif if_exists == "fail":
            # Check if table exists
            cursor.execute('''
                SELECT name FROM sqlite_master 
                WHERE type='table' AND name=?
            ''', (table_name,))
            if cursor.fetchone():
                raise ValueError(f"Table '{table_name}' already exists")
            cursor.execute(create_table_sql)
        
        # Create table if it doesn't exist (for append case where table might already exist)
        cursor.execute(create_table_sql)
        
        # Prepare insert statement (using INSERT OR REPLACE to handle duplicates)
        insert_sql = f'''
        INSERT OR REPLACE INTO "{table_name}" (entity, attribute, value)
        VALUES (?, ?, ?)
        '''
        
        # Insert data
        if rows:
            cursor.executemany(insert_sql, rows)
        
        conn.commit()
        return table_name
        
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()


In [None]:
# rows = [
#     (sheet_name, "summary", tmp["summary"])
# ]
# create_table_metadata(
#     rows, 
#     "/Users/vinhnguyen/Projects/ext-chatbot/resources/database/batdongsan.db", 
#     "tables_metadata", 
#     "append"
# )

'tables_metadata'

In [None]:
rows = [
    ("BĐS Bán 500", "data_source", "sql"),
    ("BĐS Cho thuê 500", "data_source", "sql"),
    ("Chi nhánh", "data_source", "vector"),
    ("Tổng quan", "data_source", "vector"),
    ("Thống kê BĐS Bán", "data_source", "vector"),
    ("Thống kê BĐS Thuê", "data_source", "vector"),
]
create_table_metadata(
    rows, 
    "/Users/vinhnguyen/Projects/ext-chatbot/resources/database/batdongsan.db", 
    "tables_metadata", 
    "append"
)

'tables_metadata'

In [None]:
from typing import List, Dict


def get_tables_overview(
    db_path: str,
    table_name: str = "tables_metadata",
) -> List[Dict[str, str]]:
    """Fetch combined metadata (name, data_source, summary) for all tables.

    Reads from the EAV-style metadata table and returns a list of dicts:
    [{"name": <table_name>, "data_source": <data_source>, "summary": <summary>}, ...].
    
    Args:
        db_path: Path to the SQLite database file.
        table_name: Name of the metadata table (default: "tables_metadata").
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        # Get all metadata rows for the attributes we care about
        query = f'''
        SELECT entity, attribute, value
        FROM "{table_name}"
        WHERE attribute IN ("summary", "data_source")
        '''
        cursor.execute(query)
        rows = cursor.fetchall()

        # Aggregate into a per-entity dict
        by_entity: Dict[str, Dict[str, str]] = {}
        for entity, attribute, value in rows:
            if entity not in by_entity:
                by_entity[entity] = {"name": entity, "data_source": None, "summary": None}
            if attribute == "summary":
                by_entity[entity]["summary"] = value
            elif attribute == "data_source":
                by_entity[entity]["data_source"] = value

        # Convert to list, filtering out entities that don't have at least a name
        result: List[Dict[str, str]] = list(by_entity.values())
        return result

    finally:
        conn.close()

In [None]:
table_overview = get_tables_overview(
    "/Users/vinhnguyen/Projects/ext-chatbot/resources/database/batdongsan.db",
    "tables_metadata"
)
print(table_overview)

[{'name': 'Chi nhánh', 'data_source': 'vector', 'summary': "Bảng 'Chi nhánh' chứa thông tin về các chi nhánh của Batdongsan.com.vn, bao gồm tên chi nhánh, địa chỉ đầy đủ, quận/huyện, thành phố, số điện thoại và email liên hệ. Mỗi chi nhánh được gán mã STT và phân bổ tại các thành phố lớn như TP.HCM, Hà Nội, Đà Nẵng, Cần Thơ và Hải Phòng."}, {'name': 'BĐS Bán 500', 'data_source': 'sql', 'summary': 'Bảng chứa thông tin về bất động sản đang được bán, bao gồm giá cả (tỷ VNĐ và triệu/m²), diện tích (m²), vị trí (đường, quận, tỉnh), loại hình (nhà liền kề, resort, đất thổ cư...), năm xây dựng, tiện ích, pháp lý, tình trạng nhà, thông tin liên hệ và các đặc điểm khác như hướng nhà, số phòng, bãi đỗ xe, view. Dữ liệu có thể dùng để phân tích thị trường bất động sản và hỗ trợ tìm kiếm tài sản phù hợp.'}, {'name': 'BĐS Cho thuê 500', 'data_source': 'sql', 'summary': 'Bảng này chứa thông tin về các bất động sản cho thuê tại Việt Nam, bao gồm giá thuê (triệu/tháng), diện tích (m²), loại hình BĐS (

# Router

In [None]:
ROUTER_TEMPLATE = """
You are an Intent Router. Your goal is to determine which data source contains the information needed to answer the latest Customer message, based strictly on the provided descriptions of the available data.

### Data Sources
1. **sql** or **vector**: Use this if the customer's request matches the descriptions in the [SQL DATABASE SUMMARIES] or [VECTOR DATABASE SUMMARIES] sections.
2. **none**: Use this if the request does not relate to any of the provided descriptions (e.g., general conversation, greetings, meta-comments about the chat, or general knowledge).

### Routing Logic
- **Historical Context**: Use the [CONVERSATION HISTORY] to understand what the customer is referring to.
- **Description Matching**: Compare the customer's core intent with the specific summaries provided below. Choose the source that is most likely to contain the answer.
- **Ambiguity**: If a query could potentially be answered by both, prioritize the source whose description matches the specific action (e.g., if the customer wants "analysis" or "totals," lean towards `sql`; if they want "descriptions" or "policies," lean towards `vector`).

### Available Data Descriptions

[SQL DATABASE SUMMARIES]
{sql_summaries}

[VECTOR DATABASE SUMMARIES]
{vector_summaries}

---
### Conversation:
{formatted_conversation}

### Instruction
Based ONLY on the summaries above and the conversation history, output a single JSON object inside a ```json ... ``` block without any explanation or preamble:
```json
{{
  "choice": "sql" or "vector" or "none"
}}
```
""".strip()

In [None]:
router_chain = (
    ChatPromptTemplate(["human", ROUTER_TEMPLATE])
    | llm
    | JsonOutputParser()
    | itemgetter("choice")
)
def route(conversation: List[AnyMessage]) -> dict:
    formatted_conversation = format_conversation(conversation)
    table_overview = db.get_table_overview()
    sql_summaries = "\n".join(str({
        "name": table["name"],
        "summary": table["summary"]
    }) for table in table_overview if table["data_source"] == "sql")
    vector_summaries = "\n".join(str({
        "name": table["name"],
        "summary": table["summary"]
    }) for table in table_overview if table["data_source"] == "vector")
    return router_chain.invoke({
        "sql_summaries": sql_summaries,
        "vector_summaries": vector_summaries,
        "formatted_conversation": formatted_conversation
    })

In [None]:
tmp = route([
    HumanMessage("hello có văn phòng cho thuê dưới 10tr ko, văn phòng nhé"),
    AIMessage("Không tìm thấy văn phòng cho thuê có giá dưới 10 triệu đồng/tháng trong cơ sở dữ liệu."),
    HumanMessage("ok tks nha"),
])

In [None]:
tmp

'none'