In [25]:
from __future__ import annotations
from typing import Annotated, List, TypedDict, Optional, Any, Dict
import sqlite3, textwrap

from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, SystemMessage
from langchain_community.chat_models import ChatLlamaCpp
from langgraph.graph import StateGraph, START, END

In [26]:
# Global variables
DB_PATH = "Data/dummy_data.db"

# Set up local LLM usimg Llama Cpp
chat = ChatLlamaCpp(
    model_path="./LlamaCppModels/Llama-3.2-1B-Instruct/Llama-3.2-1B-Instruct-Q6_K_L.gguf",
    n_ctx=4096,
    n_threads=8,
    temperature=0.4,
    model_kwargs={"chat_format": "llama-3"},
    verbose=False,
)

llama_context: n_batch is less than GGML_KQ_MASK_PAD - increasing to 64
llama_context: n_ctx_per_seq (4096) < n_ctx_train (131072) -- the full capacity of the model will not be utilized
ggml_metal_init: skipping kernel_get_rows_bf16                     (not supported)
ggml_metal_init: skipping kernel_set_rows_bf16                     (not supported)
ggml_metal_init: skipping kernel_mul_mv_bf16_f32                   (not supported)
ggml_metal_init: skipping kernel_mul_mv_bf16_f32_c4                (not supported)
ggml_metal_init: skipping kernel_mul_mv_bf16_f32_1row              (not supported)
ggml_metal_init: skipping kernel_mul_mv_bf16_f32_l4                (not supported)
ggml_metal_init: skipping kernel_mul_mv_bf16_bf16                  (not supported)
ggml_metal_init: skipping kernel_mul_mv_id_bf16_f32                (not supported)
ggml_metal_init: skipping kernel_mul_mm_bf16_f32                   (not supported)
ggml_metal_init: skipping kernel_mul_mm_id_bf16_f16                

In [27]:
# State carries the running chat, the planned SQL, any fetched rows, and potential errors.
class ChatState(TypedDict):
    messages: Annotated[List[AnyMessage], "Running chat transcript"]
    sql: Optional[str]
    rows: Optional[List[Dict[str, Any]]]
    error: Optional[str]

# The schema inspector produces a compact, LLM-friendly description of tables, columns, and foreign keys.
def inspect_schema(db_path: str) -> str:
    con = sqlite3.connect(db_path)
    con.row_factory = sqlite3.Row
    cur = con.cursor()

    cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' "
        "AND name NOT LIKE 'sqlite_%' ORDER BY name;"
    )
    tables = [r["name"] for r in cur.fetchall()]

    parts: List[str] = []
    for t in tables:
        cur.execute(f"PRAGMA table_info({t});")
        cols = cur.fetchall()
        cols_desc = ", ".join(f"{c['name']} {c['type']}" for c in cols)
        parts.append(f"- {t}({cols_desc})")

        cur.execute(f"PRAGMA foreign_key_list({t});")
        fks = cur.fetchall()
        for fk in fks:
            parts.append(f"    ↳ FK {fk['from']} → {fk['table']}({fk['to']})")

    con.close()
    if not parts:
        return "(No user tables found.)"
    return "Tables:\n" + "\n".join(parts)

# The guardrail only allows SELECT or WITH…SELECT and appends a LIMIT when missing.
def ensure_readonly_select(sql: str, default_limit: int = 50) -> str:
    s = sql.strip().rstrip(";")
    lowered = s.lower()
    if not (lowered.startswith("select") or lowered.startswith("with ")):
        raise ValueError("Only read-only SELECT queries are allowed.")
    if " limit " not in lowered:
        s += f" LIMIT {default_limit}"
    return s + ";"

# The executor runs the safe query and returns rows as dictionaries.
def run_select(db_path: str, sql: str) -> List[Dict[str, Any]]:
    con = sqlite3.connect(db_path)
    con.row_factory = sqlite3.Row
    cur = con.cursor()
    cur.execute(sql)
    rows = [dict(r) for r in cur.fetchall()]
    con.close()
    return rows

# The planner system prompt asks for a single SQL statement with strict read-only constraints.
SYS_PLANNER = SystemMessage(content=textwrap.dedent("""\
You are a careful SQL planning assistant for a SQLite database.
Produce exactly one SQL statement that answers the user question using only SELECT or WITH…SELECT.
Use table and column names exactly as given by the schema.
Apply a reasonable LIMIT when the question does not require full detail.
Do not include explanations or any text besides the SQL itself.
"""))

# Optional static schema hint. Replace with inspect_schema(DB_PATH) if you want live inspection.
SCHEMA = """\
Tables:
- dummy_data(igef TEXT, test TEXT, test_result TEXT)

Available values in columns:
- test_result: OK, NOK
"""

# The planner asks the model for a single SQL statement and stores it in state.
def plan_sql(state: ChatState) -> ChatState:
    schema = SCHEMA  # or: inspect_schema(DB_PATH)
    user_msg = next((m for m in reversed(state["messages"]) if isinstance(m, HumanMessage)), None)
    question = user_msg.content if user_msg else "Show something useful from the database."

    plan_prompt = (
        f"Database schema:\n{schema}\n\n"
        f"User question:\n{question}\n\n"
        f"Return only the SQL statement."
    )

    resp = chat.invoke([SYS_PLANNER, HumanMessage(content=plan_prompt)])
    sql = (resp.content or "").strip()
    return {**state, "sql": sql, "error": None}

# The executor applies the read-only guardrail and runs the query against SQLite.
def execute_sql(state: ChatState) -> ChatState:
    if state.get("error"):
        return state
    try:
        safe_sql = ensure_readonly_select(state["sql"] or "")
        rows = run_select(DB_PATH, safe_sql)
        return {**state, "sql": safe_sql, "rows": rows, "error": None}
    except Exception as e:
        return {**state, "rows": None, "error": f"SQL execution error: {e}"}

# The responder prints the exact SQL and a compact preview of the first few rows.
def respond(state: ChatState) -> ChatState:
    if state.get("error"):
        msg = f"⚠️ {state['error']}"
        return {
            "messages": state["messages"] + [AIMessage(content=msg)],
            "sql": None,
            "rows": None,
            "error": state["error"],
        }

    rows = state.get("rows") or []
    sql = state.get("sql") or ""
    preview = rows[:10]

    if preview:
        headers = list(preview[0].keys())
        lines = [" | ".join(headers), " | ".join(["---"] * len(headers))]
        for r in preview:
            lines.append(" | ".join(str(r.get(h, "")) for h in headers))
        table_md = "\n".join(lines)
    else:
        table_md = "_No rows returned._"

    answer = (
        f"**SQL**\n"
        f"```sql\n{sql}\n```\n\n"
        f"**Preview ({len(preview)} of {len(rows)} rows)**\n"
        f"{table_md}"
    )
    return {"messages": state["messages"] + [AIMessage(content=answer)], "sql": sql, "rows": rows, "error": None}

# The graph wires the three nodes into a single planning-execution-response pass.
builder = StateGraph(ChatState)
builder.add_node("plan_sql", plan_sql)
builder.add_node("execute_sql", execute_sql)
builder.add_node("respond", respond)

builder.add_edge(START, "plan_sql")
builder.add_edge("plan_sql", "execute_sql")
builder.add_edge("execute_sql", "respond")
builder.add_edge("respond", END)

app = builder.compile()

In [28]:
initial = ChatState(messages=[HumanMessage(content="Show the 5 IGEFs where the most tests failed.")],
                    sql=None, sql_explanation=None, rows=None, error=None)
result = app.invoke(initial)
print(result["messages"][-1].content)

**SQL**
```sql
SELECT dummy_data.igef FROM dummy_data WHERE test_result = 'NOK' LIMIT 5;
```

**Preview (5 of 5 rows)**
igef
---
igef_186
igef_479
igef_133
igef_869
igef_2
