# NL Analytics Copilot (Multi-Agent SQL Assistant)

This notebook presents an **NL Analytics Copilot**, a production-style, multi-agent LLM system that lets users ask natural-language analytics questions and receive **safe, grounded, and interpretable answers** over a relational database.

The system is designed for **analytics and product intelligence use cases**, where correctness, transparency, and query safety are critical. Rather than directly generating SQL from user input, the copilot decomposes each request into structured steps handled by specialized agents.

### How it works
A Python Supervisor orchestrates a set of purpose-built agents:
- **Classifier**: determines whether a question can be answered using the available schema
- **Planner**: converts the question into a structured, schema-aware query plan
- **SQL Writer**: generates read-only, SQLite-compatible SQL
- **SQL Critic**: validates syntax, schema usage, and safety constraints
- **Executor**: runs the query and returns results
- **Visualization Agent**: selects and renders charts when appropriate

All agents communicate through a shared state, enabling full execution tracing and graceful handling of unsupported or ambiguous questions.
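
The shared-state pattern described above can be sketched in a few lines. This is a minimal illustration, not the notebook's Supervisor: the `CopilotState` fields, the `run_pipeline` helper, and the stub agents are all hypothetical names standing in for the LLM-backed agents.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class CopilotState:
    """Shared state passed between agents (field names are illustrative)."""
    question: str
    plan: Optional[dict] = None
    sql: Optional[str] = None
    error: Optional[str] = None
    trace: list = field(default_factory=list)


def run_pipeline(state: CopilotState, steps: list) -> CopilotState:
    """Run (name, agent) steps in order and stop at the first error."""
    for name, agent in steps:
        state = agent(state)
        state.trace.append(name)  # record which agent ran, for tracing
        if state.error:
            break
    return state


# Stub agents standing in for the LLM-backed ones (assumptions for this sketch).
def classifier(state):
    if "weather" in state.question.lower():
        state.error = "Question is not answerable from the schema"
    return state


def planner(state):
    state.plan = {"entities": ["customers"]}
    return state


def writer(state):
    state.sql = "SELECT COUNT(*) FROM customers"
    return state


STEPS = [("classifier", classifier), ("planner", planner), ("writer", writer)]

ok = run_pipeline(CopilotState("How many customers do we have?"), STEPS)
rejected = run_pipeline(CopilotState("What is the weather today?"), STEPS)
print(ok.trace, ok.sql)
print(rejected.trace, rejected.error)
```

Because every agent reads from and writes to the same object, the trace doubles as an audit log, and a rejection by an early agent short-circuits the remaining steps.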

### Design goals
- Safely translate natural-language questions into SQL
- Enforce strict schema awareness and read-only execution
- Support multi-step analytical reasoning (e.g., CTEs and derived metrics)
- Automatically produce meaningful visualizations when appropriate
- Provide transparency through an explicit agent execution trace


In [None]:
!pip install \
  "langchain==1.1.0" \
  "langchain-core==1.1.0" \
  "langchain-classic==1.0.0" \
  "langchain-community==0.4.1" \
  "langchain-openai==1.1.0" \
  "langchain-text-splitters==1.0.0" \
  "langgraph==1.0.4" \
  "langgraph-prebuilt==1.0.5" \
  "langgraph-checkpoint==3.0.1" \
  "langgraph-sdk==0.2.10" \
  "langsmith==0.4.48" \
  python-dotenv \
  networkx \
  matplotlib \
  pydot \
  "requests==2.32.5"


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import json
import logging
import getpass
import pandas as pd
import os
import sqlite3
import matplotlib.pyplot as plt

## Generate the Database & Define Schema

The underlying database represents a **beauty and fashion e-commerce platform**, including customers, orders, order items, products, and acquisition channels.  
The schema is intentionally lightweight to emphasize reasoning, safety, and analytics logic rather than data engineering complexity.
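
Because `order_items` stores neither a price nor a quantity, any spend or revenue figure has to be derived by joining `order_items` to `products`. The sketch below shows that join on a tiny in-memory database with hand-written rows. Counting only `completed` orders as revenue is an assumption made for this example, not a rule stated by the notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE products (product_id INTEGER PRIMARY KEY, category TEXT, price REAL);
CREATE TABLE orders (order_id INTEGER PRIMARY KEY, cust_id TEXT, order_date DATE, status TEXT);
CREATE TABLE order_items (row_id INTEGER PRIMARY KEY, order_id INTEGER, product_id INTEGER);
INSERT INTO products VALUES (1, 'Skincare', 10.0), (2, 'Makeup', 25.0);
INSERT INTO orders VALUES (1001, 'C001', '2025-01-05', 'completed'),
                          (1002, 'C002', '2025-01-06', 'cancelled');
INSERT INTO order_items VALUES (0, 1001, 1), (1, 1001, 2), (2, 1002, 2);
""")

# One order_items row is one unit sold, so revenue is the sum of joined prices.
revenue_by_category = conn.execute("""
SELECT p.category, ROUND(SUM(p.price), 2) AS revenue
FROM order_items AS oi
JOIN orders AS o ON o.order_id = oi.order_id
JOIN products AS p ON p.product_id = oi.product_id
WHERE o.status = 'completed'   -- assumption: only completed orders count
GROUP BY p.category
ORDER BY revenue DESC
""").fetchall()
conn.close()

print(revenue_by_category)
```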


In [None]:
!pip install Faker

In [None]:
import sqlite3, random, datetime

BASE_DIR = "/content/drive/MyDrive/ai_analyst_project/data"
os.makedirs(BASE_DIR, exist_ok=True)

DB_PATH = os.path.join(BASE_DIR, "retail.db")
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

# Drop old tables if re-running
cursor.execute("DROP TABLE IF EXISTS customers;")
cursor.execute("DROP TABLE IF EXISTS products;")
cursor.execute("DROP TABLE IF EXISTS orders;")
cursor.execute("DROP TABLE IF EXISTS order_items;")

#Customers
cursor.execute("""
CREATE TABLE customers (
  cust_id TEXT PRIMARY KEY,
  first_name TEXT,
  last_name TEXT,
  mobile INTEGER,
  state TEXT,
  joined_at DATE,
  acquisition_channel TEXT
);
""")

# Products
cursor.execute("""
CREATE TABLE products (
    product_id INTEGER PRIMARY KEY,
    category TEXT,
    price REAL
);
""")

# Orders
cursor.execute("""
CREATE TABLE orders (
    order_id INTEGER PRIMARY KEY,
    cust_id TEXT,  -- matches customers.cust_id, which is TEXT (e.g. 'C001')
    order_date DATE,
    status TEXT,
    FOREIGN KEY(cust_id) REFERENCES customers(cust_id)
);
""")

# Order Items
cursor.execute("""
CREATE TABLE order_items (
    row_id INTEGER PRIMARY KEY,
    order_id INTEGER,
    product_id INTEGER,
    FOREIGN KEY(order_id) REFERENCES orders(order_id),
    FOREIGN KEY(product_id) REFERENCES products(product_id)
);
""")
conn.commit()


In [None]:
from datetime import date, timedelta
from faker import Faker
from faker.providers import address
fake = Faker('en_US')

# Remove all rows if re-running
conn.execute("DELETE FROM customers;")
conn.execute("DELETE FROM products;")
conn.execute("DELETE FROM orders;")
conn.execute("DELETE FROM order_items;")

# fill customers
customers = []
acq_channels = ["Instagram Ads", "Google Ads", "Organic Search", "Influencer"]
weights = [0.15, 0.10, 0.20, 0.55]
for _ in range(1, 201):
  cust_id = f"C{_:03d}"
  first_name = fake.first_name()
  last_name = fake.last_name()
  mobile = random.randint(2000000000, 9999999999)
  state = fake.state()
  joined_at  = fake.date_between(start_date="-3y", end_date="today")
  acq = random.choices(acq_channels, weights=weights, k=1)[0]
  customers.append((cust_id, first_name, last_name, mobile, state, joined_at, acq))


conn.executemany("""INSERT INTO customers(cust_id, first_name, last_name, mobile, state, joined_at, acquisition_channel)
 VALUES(?,?,?,?,?,?,?); """, customers)
conn.commit()

# fill Products

products = []
categories = ["Skincare", "Haircare", "Makeup", "Fragrance", "Handbags", "Accessories"]
for _ in range(1,16):
  price = random.uniform(5,50)
  products.append((_, categories[0], round(price,2)))

for _ in range(16,27):
  price = random.uniform(6,40)
  products.append((_, categories[1], round(price,2)))

for _ in range(27,46):
  price = random.uniform(7,35)
  products.append((_, categories[2], round(price,2)))

for _ in range(46,52):
  price = random.uniform(45,70)
  products.append((_, categories[3], round(price,2)))

for _ in range(52,58):
  price = random.uniform(20,80)
  products.append((_, categories[4], round(price,2)))

for _ in range(58,67):
  price = random.uniform(12,50)
  products.append((_, categories[5], round(price,2)))

conn.executemany("""INSERT INTO products(product_id, category, price) VALUES(?,?,?); """, products)
conn.commit()


# fill orders

orders = []
cid = [c[0] for c in customers]
pid = [p[0] for p in products]
statuses = ["completed", "cancelled", "returned", "pending"]
weights = [0.78, 0.10, 0.08, 0.04]
for _ in range(1001, 5001):
  order_id = _
  c_id = random.choice(cid)
  joined_at = next(c[5] for c in customers if c[0] == c_id)
  days_ago = random.randint(0, 365)
  order_date = date.today() - timedelta(days=days_ago)
  order_date = max(order_date, joined_at)
  status = random.choices(statuses, weights=weights, k=1)[0]
  orders.append((order_id, c_id, status, order_date.isoformat()))

conn.executemany("""
INSERT INTO orders (order_id, cust_id, status, order_date)
VALUES (?, ?, ?, ?);
""", orders)
conn.commit()


# fill order items
price_map = {row[0]: row[2] for row in products}
row_id = 0
order_item_rows = []
oid = [o[0] for o in orders]
for o_id in oid:
  # Each order gets between 1 and 10 randomly chosen products
  num_of_items = random.randint(1,10)
  for _ in range(num_of_items):
    order_item_rows.append((row_id, o_id, random.choice(pid)))
    row_id+=1

conn.executemany("""
INSERT INTO order_items (row_id, order_id, product_id)
VALUES (?, ?, ?);
""", order_item_rows)

conn.commit()

print(customers[:5])
print(products[:5])
print(orders[:5])
print(order_item_rows[:15])

## Setup Logging & Config
This section initializes shared infrastructure used across the system, including environment
configuration, database connectivity, and structured logging.

In [None]:
os.environ["DATABASE"] = "/content/drive/MyDrive/ai_analyst_project/data/retail.db"
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")

Enter your OpenAI API key: ··········


In [None]:
import logging

# REMOVE all existing handlers for root logger
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("openai").setLevel(logging.WARNING)
logging.getLogger("langchain").setLevel(logging.WARNING)
logging.getLogger("langchain_core").setLevel(logging.WARNING)
logging.getLogger("langchain_community").setLevel(logging.WARNING)
logger = logging.getLogger("agent")
logger.setLevel(logging.INFO)


In [None]:
import sqlite3
from pathlib import Path

def test_db_connection():
    try:
        # Set the path to the database and create a connection
        db_path = os.getenv("DATABASE")
        conn = sqlite3.connect(db_path)

        # Simple test query
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM customers")
        customer_count = cursor.fetchone()[0]

        # Close the database connection and notify the user of a successful test
        print(f"Database connection successful. Found {customer_count} customers.")
        conn.close()
        return True

    except Exception as e:
        print(f"Database connection failed: {e}")
        return False

if __name__ == "__main__":
    test_db_connection()

Database connection successful. Found 200 customers.


## Schema Summarizer
Before processing user questions, the system constructs a summary of the database schema, including tables, columns, and key relationships.

This summarized schema is shared with downstream agents to ensure that all planning and SQL generation remains **schema-aware**. By explicitly grounding agents in the available structure, the copilot avoids hallucinated tables, columns, or joins and can reliably reject unsupported questions.
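
Later cells pass a `schema_context` value into the prompts, but its construction is not shown in this section. One plausible way to derive such a string is sketched below, assuming a snapshot dict with `tables` (each holding a `columns` list) and a `join_graph` of `from`/`to` edges. The helper name `snapshot_to_context` and the output format are hypothetical.

```python
def snapshot_to_context(snapshot: dict) -> str:
    """Render a snapshot dict as compact text suitable for prompt injection."""
    lines = []
    for table, info in sorted(snapshot["tables"].items()):
        cols = ", ".join(
            f"{c['name']} {c['type']}" + (" PK" if c.get("pk") else "")
            for c in info["columns"]
        )
        lines.append(f"{table}({cols})")
    for edge in snapshot.get("join_graph", []):
        lines.append(f"JOIN {edge['from']} -> {edge['to']}")
    return "\n".join(lines)


# Hand-written example snapshot (not reflected from a real database).
example_snapshot = {
    "tables": {
        "orders": {"columns": [
            {"name": "order_id", "type": "INTEGER", "pk": 1},
            {"name": "cust_id", "type": "TEXT", "pk": 0},
        ]},
        "customers": {"columns": [{"name": "cust_id", "type": "TEXT", "pk": 1}]},
    },
    "join_graph": [{"from": "orders.cust_id", "to": "customers.cust_id"}],
}

context = snapshot_to_context(example_snapshot)
print(context)
```

A compact rendering like this keeps prompts short while still listing every table, column, and join path an agent is allowed to use.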

In [None]:
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_classic.chains import LLMChain
from langchain_core.output_parsers import StrOutputParser

In [None]:
from sqlalchemy import create_engine, MetaData, inspect
import json, hashlib, os

class SchemaSummarizer:
  def __init__(self,db_path, cache_path="schema_snapshot.json"):
    # db_path is a SQLAlchemy URL (e.g. "sqlite:///<file path>"), not a bare file path
    self.db_path = db_path
    self.cache_path = cache_path
    self.snapshot = None
    self.engine = create_engine(db_path)
    self.meta = MetaData()

  def hash_schema(self, schema_dict):
    dump = json.dumps(schema_dict, sort_keys=True)
    return hashlib.sha256(dump.encode()).hexdigest()

  def build_snapshot(self):
    """Reflect schema and build JSON snapshot with tables, columns, PKs, FKs"""
    # sqlalchemy.inspect() replaces the deprecated Inspector.from_engine()
    inspector = inspect(self.engine)
    snapshot = {"tables": {}, "join_graph": []}

    for table in inspector.get_table_names():
      cols = []
      for col in inspector.get_columns(table):
        cols.append({
          "name": col["name"],
          "type": str(col["type"]),
          "pk": col.get("primary_key", False),
          "isOptional": col.get("nullable", True),
        })
      snapshot["tables"][table] = {"columns": cols}

      #foreign keys
      for fk in inspector.get_foreign_keys(table):
        for local, remote in zip(fk["constrained_columns"], fk["referred_columns"]):
          snapshot["join_graph"].append({
              "from": f"{table}.{local}",
              "to": f"{fk['referred_table']}.{remote}",
              "table_from": table,
              "table_to": fk["referred_table"],
          })
    snapshot["schema_hash"] = self.hash_schema(snapshot)
    self.snapshot = snapshot
    return snapshot

  def save_cache(self):
    if self.snapshot is None:
      raise ValueError("No snapshot built yet. Call build_snapshot() first")
    with open(self.cache_path, "w") as f:
      json.dump(self.snapshot, f, indent=2)

  def load_cache(self):
    if not os.path.exists(self.cache_path):
      return None
    with open(self.cache_path, "r") as f:
      logger.info("Loading cached schema snapshot from %s", self.cache_path)
      self.snapshot = json.load(f)
    return self.snapshot

  def ensure_snapshot(self):
    snap = self.load_cache()
    if snap is None:
      snap = self.build_snapshot()
      self.save_cache()
    return snap


In [None]:
db_path = os.getenv("DATABASE")
db_url = f"sqlite:///{db_path}"
logger.info("Summarizing schema....")
summarizer = SchemaSummarizer(db_url)
schema_snapshot = summarizer.ensure_snapshot()
print(json.dumps(schema_snapshot, indent=2))

2025-12-11 18:48:12 - INFO - Summarizing schema....


{
  "tables": {
    "customers": {
      "columns": [
        {
          "name": "cust_id",
          "type": "TEXT",
          "pk": 1,
          "isOptional": true
        },
        {
          "name": "first_name",
          "type": "TEXT",
          "pk": 0,
          "isOptional": true
        },
        {
          "name": "last_name",
          "type": "TEXT",
          "pk": 0,
          "isOptional": true
        },
        {
          "name": "mobile",
          "type": "INTEGER",
          "pk": 0,
          "isOptional": true
        },
        {
          "name": "state",
          "type": "TEXT",
          "pk": 0,
          "isOptional": true
        },
        {
          "name": "joined_at",
          "type": "DATE",
          "pk": 0,
          "isOptional": true
        },
        {
          "name": "acquisition_channel",
          "type": "TEXT",
          "pk": 0,
          "isOptional": true
        }
      ]
    },
    "order_items": {
      "columns": [
     



In [None]:
print("total # tables in the DB:", len(schema_snapshot["tables"]))
print("Table names: ", schema_snapshot["tables"].keys())

total # tables in the DB: 4
Table names:  dict_keys(['customers', 'order_items', 'orders', 'products'])


### Schema Metadata
Lightweight column metadata (e.g., enums and semantic meanings) is included where available to help
agents interpret business concepts such as order status or acquisition channel.
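
Enum metadata of this kind can also back a deterministic check, for example to flag a filter value that no row could ever match. The sketch below assumes metadata keyed by `table.column` with a `type` and a `values` list. The `check_enum_value` helper is hypothetical and not part of the notebook.

```python
from typing import Optional

# A small metadata dict in the same shape as the notebook's column metadata.
METADATA = {
    "orders.status": {
        "type": "enum",
        "values": ["completed", "cancelled", "returned", "pending"],
    },
}


def check_enum_value(field: str, value: str, metadata: dict) -> Optional[str]:
    """Return an error message if `value` is not allowed for an enum field."""
    spec = metadata.get(field)
    if spec is None or spec.get("type") != "enum":
        return None  # no constraint recorded for this field
    if value not in spec["values"]:
        return f"'{value}' is not a valid value for {field}"
    return None


valid = check_enum_value("orders.status", "returned", METADATA)
invalid = check_enum_value("orders.status", "refunded", METADATA)
print(valid)
print(invalid)
```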



In [None]:
SCHEMA_METADATA = {
    "orders.status": {
        "type": "enum",
        "values": ["completed", "cancelled", "returned", "pending"],
        "meaning": {
            "returned": "order was returned by the customer",
            "completed": "order was delivered",
            "cancelled": "order was cancelled before fulfillment"
        }
    },
    "customers.acquisition_channel": {
        "type": "enum",
        "values": ["Organic Search", "Instagram Ads", "Google Ads", "Influencer"]
    },
    "products.category": {
        "type": "enum",
        "values": ["Skincare", "Haircare", "Makeup", "Fragrance", "Handbags", "Accessories"]
    }
}


## Classifier Agent
The Classifier determines whether a user question can be answered using the available database
schema. It acts as the first safety gate in the pipeline.

If a question references unsupported concepts, missing data, or non-analytic operations, the
classifier rejects it early with a clear reason. This prevents downstream agents from attempting to
hallucinate or force invalid queries.
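
For the classifier to work as a safety gate, its reply should be parsed so that anything malformed counts as a rejection. A minimal sketch of such fail-closed parsing follows. The `parse_classifier_output` helper is hypothetical; it assumes the reply is meant to be a JSON object with an `is_db_query` boolean and a `reason` string.

```python
import json


def parse_classifier_output(raw: str) -> dict:
    """Parse the classifier reply, rejecting the question on any doubt."""
    # Models sometimes add text around the JSON, so keep only the outermost object.
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        return {"is_db_query": False, "reason": "Invalid JSON"}
    try:
        parsed = json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return {"is_db_query": False, "reason": "Invalid JSON"}
    # Require a real boolean; strings such as "yes" are treated as a rejection.
    if not isinstance(parsed.get("is_db_query"), bool):
        return {"is_db_query": False, "reason": "Missing is_db_query flag"}
    return {
        "is_db_query": parsed["is_db_query"],
        "reason": str(parsed.get("reason", "No reason given")),
    }


print(parse_classifier_output('{"is_db_query": true, "reason": "uses orders"}'))
print(parse_classifier_output("I cannot help with that."))
```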

In [None]:
classifier_prompt = PromptTemplate(
    template = """
    You are a classifier that determines whether a user question can be answered
    using the given SQLite schema and metadata. Give a short reason for your decision.

    schema -
    {schema}

    metadata -
    {metadata}

    question -
    {question}

    Rules -
    - A question is answerable if all required concepts exist in the schema OR the metadata.
    - Set "is_db_query" to true if the question can be answered from this database.
    - Set "is_db_query" to false if it is irrelevant or the required data is not in the database.
    - Output JSON only

    Examples -
    {{"is_db_query": false, "reason": "Asks about weather, not in schema"}}
    {{"is_db_query": false, "reason": "unrelated chit-chat"}}

    """,
    input_variables=['schema', 'metadata', 'question']
)

In [None]:
## TESTING
from tabulate import tabulate

# Format: (question, expected_is_db_query)
test_cases = [
    # Clear DB queries
    ("Top 10 customers by total spend", True),
    ("Show me revenue trends by month", True),
    ("Orders placed in the last 7 days", True),

    # Chit-chat
    ("What’s your name?", False),
    ("Tell me a joke.", False),

    # Irrelevant DB-like queries
    ("How many employees are in the HR database?", False),
    ("What’s the average temperature last month?", False),

    # Ambiguous borderline
    ("Tell me about customers.", True),  # maps to customers table
    ("Give me insights on regions.", True),  # maps to 'state' column in customers
    ("List suppliers by country.", False),   # no suppliers table
]

llm = ChatOpenAI(model_name="gpt-5-nano", temperature=1)
# classifier_chain = LLMChain(llm=llm, prompt=classifier_prompt)
classifier_chain = classifier_prompt | llm | StrOutputParser()

results = []
for question, expected in test_cases:
    # The prompt declares schema, metadata, and question, so all three must be supplied
    raw_output = classifier_chain.invoke({
        "schema": schema_context,
        "metadata": SCHEMA_METADATA,
        "question": question
    })
    try:
        parsed = json.loads(raw_output.strip())
    except Exception:
        parsed = {"is_db_query": False, "reason": "Invalid JSON"}

    actual = parsed.get("is_db_query", False)
    reason = parsed.get("reason", "No reason given")
    passed = (actual == expected)

    results.append([question, expected, actual, reason, "✅" if passed else "❌"])

# Result table
print(tabulate(results, headers=["Question", "Expected", "Actual", "Reason", "Pass/Fail"], tablefmt="grid"))


+--------------------------------------------+------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| Question                                   | Expected   | Actual   | Reason                                                                                                                                               | Pass/Fail   |
| Top 10 customers by total spend            | True       | True     | Can be answered with the schema by joining customers, orders, order_items, and products and summing product prices per customer, then taking top 10. | ✅          |
+--------------------------------------------+------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
| Show me revenue trends by month            | True    

## Planner
The Planner converts a valid natural-language question into a structured, schema-aware query plan.
Rather than writing SQL directly, it identifies the required tables, joins, filters, aggregations,
and grouping logic.

For multi-step analytics, the planner can introduce derived computations (e.g., intermediate
aggregations) that are later translated into SQL. This separation of planning and execution enables
more reliable reasoning and safer query generation.
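
Since the plan is structured data, its table and column references can be checked against the schema before any SQL is written. The sketch below is a hypothetical helper, not part of the notebook. It assumes the schema is available as a mapping from table name to a set of column names, and it only checks plain `table.col` references in `joins` and `filters`, not free-form metric expressions.

```python
def validate_plan(plan: dict, schema: dict) -> list:
    """Return a list of problems; an empty list means every reference resolved."""
    problems = []
    for table in plan.get("entities", []):
        if table not in schema:
            problems.append(f"unknown table: {table}")

    # Collect every table.col reference used in filters and joins.
    refs = [f["field"] for f in plan.get("filters", [])]
    for join in plan.get("joins", []):
        refs += [join["left"], join["right"]]

    for ref in refs:
        table, _, column = ref.partition(".")
        if column not in schema.get(table, set()):
            problems.append(f"unknown column: {ref}")
    return problems


SCHEMA = {
    "customers": {"cust_id", "state", "joined_at"},
    "orders": {"order_id", "cust_id", "order_date", "status"},
}
good_plan = {
    "entities": ["customers", "orders"],
    "joins": [{"left": "orders.cust_id", "right": "customers.cust_id"}],
    "filters": [{"field": "orders.status", "op": "=", "values": ["completed"]}],
}
bad_plan = {
    "entities": ["suppliers"],
    "filters": [{"field": "customers.join_date", "op": "=", "values": []}],
}

print(validate_plan(good_plan, SCHEMA))
print(validate_plan(bad_plan, SCHEMA))
```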


In [None]:
def get_planner_prompt():
  planner_prompt = PromptTemplate(
      template = """
      You are a planner for read-only analytics SQL over an SQLite database.
      Given the schema, optional column metadata, and a user question, produce a JSON plan
      for how to answer the question. Do NOT write SQL, only the plan.

      Return a JSON object with these keys:
      - "entities": [table names...]
      - "joins": [{{"left": "<table.col>", "right": "<table.col>"}}]
      - "filters": [{{"field": "<table.col>", "op": "<=, >=, =, between, in, like>", "values": [..]}}]
      - "metrics": [{{"expr": "<aggregation expression>", "alias": "<name>"}}]
      - "group_by": ["<table.col or expression>", ...]
      - "order_by": [{{"expr": "<field or metric>", "dir": "asc|desc"}}]
      - "derived": [
          {{
            "name": "<cte_name>",
            "description": "<what this intermediate computation is for>",
            "select": [ {{ "expr": "...", "alias": "..." }} ],
            "group_by": ["col1", "col2"]
          }}
        ]
        (omit this field entirely if not needed)
      - "limit": integer or null
      - On failure, return: {{"error": "<short reason>"}}

      Rules:
      - Only support read-only analytics. If the question asks to create, update, or delete data,
        return {{"error": "..."}}.
      - Use table and column names exactly as in the schema.
      - Use the metadata to interpret business concepts (e.g. status values, return rate,
        acquisition_channel, categories).
      - Include a "derived" field only when the final metric cannot be computed
        directly from the base tables such as window function logic, multi-stage joins etc.
      - Never invent columns. All expressions in metrics, filters, or derived/select
        must come from the schema or be simple expressions over schema fields

      Schema -
      {schema}

      Metadata-
      {metadata}

      question -
      {question}
      """,
      input_variables=['schema', "metadata", 'question']
  )
  return planner_prompt

In [None]:
## TESTING PLANNER
llm = ChatOpenAI(model_name="gpt-5-nano", temperature=1)
planner_prompt = get_planner_prompt()
planner_chain = planner_prompt | llm | StrOutputParser()

planner_questions = ["DROP TABLE customers", "Show names of all customers who ordered Electronics in march 2023"]
for q in planner_questions:
  test_plan = planner_chain.invoke({
        "schema": schema_context,
        "metadata": SCHEMA_METADATA,
        "question": q
    })
  print(q)
  try:
    print(json.loads(test_plan))
  except Exception as e:
    print(f"Error occurred: {e}")


DROP TABLE customers
{'error': 'Drop table operations are not permitted.'}
Show names of all customers who ordered Electronics in march 2023
{'entities': ['customers', 'orders', 'order_items', 'products'], 'joins': [{'left': 'orders.cust_id', 'right': 'customers.cust_id'}, {'left': 'orders.order_id', 'right': 'order_items.order_id'}, {'left': 'order_items.product_id', 'right': 'products.product_id'}], 'filters': [{'field': 'orders.order_date', 'op': 'between', 'values': ['2023-03-01', '2023-03-31']}, {'field': 'products.category', 'op': '=', 'values': ['Electronics']}], 'metrics': [], 'group_by': ['customers.cust_id', 'customers.first_name', 'customers.last_name'], 'order_by': []}


## Writer
The SQL Writer translates the structured query plan into executable, SQLite-compatible SQL.
It is constrained to generate read-only queries and may only reference tables and columns explicitly
defined in the plan.

By operating strictly on the planner output, the writer avoids interpreting user intent directly,
reducing the risk of unsafe or incorrect SQL generation.
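
To make the plan-to-SQL mapping concrete, here is a deterministic renderer for the simplest kind of plan: one table, `=` and `between` filters, and an optional limit. It is an illustrative stand-in, not the notebook's LLM-based writer. The function name is hypothetical, and it treats every filter value as a text literal.

```python
def render_simple_plan(plan: dict) -> str:
    """Render a single-table plan with '=' and 'between' filters into SQL."""
    table = plan["entities"][0]
    clauses = []
    for f in plan.get("filters", []):
        # Escape single quotes so values stay inside their string literal.
        values = [str(v).replace("'", "''") for v in f["values"]]
        if f["op"] == "between":
            clauses.append(f"{f['field']} BETWEEN '{values[0]}' AND '{values[1]}'")
        elif f["op"] == "=":
            clauses.append(f"{f['field']} = '{values[0]}'")
        else:
            raise ValueError(f"unsupported operator: {f['op']}")
    sql = f"SELECT {table}.* FROM {table}"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    if plan.get("limit") is not None:
        sql += f" LIMIT {int(plan['limit'])}"
    return sql + ";"


plan = {
    "entities": ["customers"],
    "joins": [],
    "filters": [{"field": "customers.joined_at", "op": "between",
                 "values": ["2023-01-01", "2023-12-31"]}],
    "metrics": [],
    "group_by": [],
    "order_by": [],
}
sql = render_simple_plan(plan)
print(sql)
```

An LLM writer is still useful for joins, CTEs, and aggregate expressions, but a renderer like this shows the contract: everything in the SQL traces back to a field in the plan.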


In [None]:
def get_writer_prompt():
  writer_prompt = PromptTemplate(
      template = """
      You write SQL for SQLite only.
      Convert the given query plan into a valid SQLite query.

      Rules:
      - Use only tables/columns in the plan.
      - If the plan contains a "derived" section:
        • Generate a WITH clause.
        • Each derived block becomes a CTE using its "select" and "group_by" fields.
      - After CTEs (if any), write the final SELECT using:
        • metrics
        • group_by
        • order_by
        • joins
        • filters
      - Never generate DELETE, DROP, UPDATE, INSERT or any DDL/DML statements.
      - Produce SELECT queries only.
      - If "limit" exists, include it in the SQL.
      - Output only SQL, no commentary.

      Query Plan:
      {plan}
      """,
      input_variables=["plan"]
  )
  return writer_prompt

In [None]:
test_plan= {'entities': ['customers'],
 'joins': [],
 'filters': [{'field': 'customers.joined_at',
   'op': 'between',
   'values': ['2023-01-01', '2023-12-31']}],
 'metrics': [],
 'group_by': [],
 'order_by': []}

In [None]:
## Testing
llm = ChatOpenAI(model_name="gpt-5-nano", temperature=1)
writer_prompt = get_writer_prompt()
writer_chain = writer_prompt | llm

output = writer_chain.invoke({"plan": test_plan})
print(output.content)

SELECT customers.* FROM customers WHERE customers.joined_at BETWEEN '2023-01-01' AND '2023-12-31';


## SQL Critic
The SQL Critic validates generated SQL before execution. It enforces safety and correctness by
checking syntax, schema consistency, and compliance with read-only constraints.

Queries that violate these rules are rejected with a short, explicit error message. This agent acts
as a final safeguard before any database execution occurs.
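
An LLM critic can be complemented by a deterministic pre-check that does not depend on model output. The sketch below is a hypothetical helper, not the notebook's critic. It allows a single `SELECT` or `WITH` statement, rejects write keywords, and asks SQLite to compile the query with `EXPLAIN`, which surfaces syntax errors and unknown tables or columns without running the query. The keyword and semicolon checks are deliberately conservative and can reject harmless queries whose string literals contain those tokens.

```python
import re
import sqlite3
from typing import Optional

FORBIDDEN = re.compile(
    r"\b(insert|update|delete|drop|alter|create|attach|detach|pragma|vacuum)\b",
    re.IGNORECASE,
)


def precheck_sql(sql: str, conn: sqlite3.Connection) -> Optional[str]:
    """Return an error string, or None if the query passes the static checks."""
    stripped = sql.strip().rstrip(";").strip()
    if ";" in stripped:
        return "multiple statements are not allowed"
    if not re.match(r"(?i)^(select|with)\b", stripped):
        return "only SELECT queries are allowed"
    if FORBIDDEN.search(stripped):
        return "query contains a forbidden keyword"
    try:
        # EXPLAIN compiles the statement and returns its bytecode listing
        # instead of executing it.
        conn.execute(f"EXPLAIN {stripped}")
    except sqlite3.Error as exc:
        return str(exc)
    return None


demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE customers (cust_id TEXT, joined_at DATE)")
for query in [
    "SELECT cust_id FROM customers;",
    "SELECT join_date FROM customers",
    "DROP TABLE customers",
]:
    print(query, "->", precheck_sql(query, demo))
demo.close()
```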

In [None]:
def get_critic_prompt():
  critic_prompt = PromptTemplate(
      template = """
      You are an SQL Critic for SQLite only. Review SQL queries for safety and correctness.

      Reject SQL if:
      - It is not a SELECT query.
      - It uses non-SQLite functions (DATE_TRUNC, EXTRACT, INTERVAL, DATEADD, DATEDIFF, TO_CHAR).
      - It references tables/columns not in the schema.
      - It mentions enum values not present in the metadata.
      - Its syntax is invalid or unsafe.

      Output valid JSON only, no commentary.

      If valid, output:
      {{"sql": "<cleaned_sql>"}}

      If invalid:
      {{"error": "<reason>"}}

      Schema:
      {schema}

      metadata:
      {metadata}

      SQL to check:
      {sql}

      """,
      input_variables = ['schema', 'metadata', 'sql']
  )
  return critic_prompt

In [None]:
## TESTING
llm = ChatOpenAI(model_name="gpt-5-nano", temperature=1)
critic_prompt = get_critic_prompt()
critic_chain = critic_prompt | llm | StrOutputParser()

writer_queries = ["SELECT * FROM customers WHERE join_date BETWEEN '2023-01-01' AND '2023-12-31'",
                  "SELECT price FROM customers;",
                  "DROP table customers"]
for q in writer_queries:
  print(critic_chain.invoke({"schema": schema_context, "metadata": SCHEMA_METADATA, "sql": q}))

{"error":"Unknown column 'join_date' in 'where clause'; expected 'joined_at'."}
{"error":"Column 'price' does not exist in table 'customers'."}
{"error":"Unsafe: only SELECT queries are allowed; DROP statements are not permitted."}


## Executor
The Executor runs validated SQL queries against the database and returns the column names and
rows, which can then be loaded into a pandas DataFrame.

In [None]:
class Executor:
  def __init__(self, db_path:str):
    self.db_path = db_path
    self.result = None

  def implement_sql(self, sql_query):
    """Run a query and return {"cols", "rows"}, or {"error"} on failure."""
    conn = None
    try:
      conn = sqlite3.connect(self.db_path)
      cursor = conn.cursor()
      cursor.execute(sql_query)
      rows = cursor.fetchall()
      # cursor.description is None for statements that return no result set
      cols = [desc[0] for desc in cursor.description or []]
      return {"cols": cols, "rows": rows}
    except Exception as e:
      return {"error" : str(e)}
    finally:
      # Close the connection even when the query fails
      if conn is not None:
        conn.close()


In [None]:
## TESTING
db_path = os.getenv("DATABASE")
executor = Executor(db_path)

result = executor.implement_sql("select * from customers limit 10")
result_df = pd.DataFrame(result['rows'], columns = result['cols'])
result_df

Unnamed: 0,cust_id,first_name,last_name,mobile,state,joined_at
0,1,Clark,Jerry,2005251729,Washington,2022-09-23
1,2,Graves,Debra,2233694415,Arizona,2023-01-27
2,3,Shaffer,Sandra,3076269399,Arizona,2022-12-06
3,4,Morris,David,3534070738,North Dakota,2024-04-12
4,5,Walker,Harold,3115801552,New Hampshire,2025-08-23
5,6,Medina,Laura,5829227575,Idaho,2022-12-02
6,7,Lloyd,Jessica,6931607876,Louisiana,2023-08-28
7,8,Taylor,Cheryl,2012703457,Montana,2023-03-18
8,9,Hurst,William,8407141283,Connecticut,2023-05-18
9,10,Barron,Alex,2448599204,Nevada,2025-07-24


## Visualizer
The Visualization Agent inspects the query result and determines whether a chart would meaningfully
represent the data. When appropriate, it selects a chart type (e.g., bar or line) and maps result
columns to visual encodings.
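
A rule-based fallback can cover the cases where the LLM's suggestion is missing or unusable. The sketch below is hypothetical and much simpler than the agent described above. It works on the `{"cols", "rows"}` result shape, looks only at Python value types, and distinguishes just `bar`, `scatter`, and `Not Possible`.

```python
from numbers import Number


def suggest_chart(result: dict) -> str:
    """Pick a chart type from column value types; a rough rule-based fallback."""
    cols, rows = result["cols"], result["rows"]
    if len(rows) <= 1 or len(cols) < 2:
        return "Not Possible"
    # A column is numeric if every value is a number (booleans excluded).
    numeric = [
        c for i, c in enumerate(cols)
        if all(isinstance(r[i], Number) and not isinstance(r[i], bool) for r in rows)
    ]
    if len(cols) == 2 and len(numeric) == 2:
        return "scatter"  # two numeric columns
    if 1 <= len(numeric) < len(cols):
        return "bar"      # at least one categorical and one numeric column
    return "Not Possible"


by_category = {"cols": ["category", "revenue"],
               "rows": [("Skincare", 120.0), ("Makeup", 90.5)]}
price_vs_qty = {"cols": ["avg_price", "quantity_sold"],
                "rows": [(200, 50), (150, 80), (300, 30)]}
text_only = {"cols": ["category_name", "description"],
             "rows": [("Skincare", "Creams"), ("Makeup", "Lipsticks")]}

for r in (by_category, price_vs_qty, text_only):
    print(r["cols"], "->", suggest_chart(r))
```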

In [None]:
def get_chart_detection_prompt():
  chart_detection_prompt = PromptTemplate(
      template="""
      You are a visualization planner.
      Given a user's question and the SQL query result summary + example, decide the best chart.

      Question: {question}
      column_summary: {column_summary}
      Example row: {sample_row}

      Choose the most suitable visualization and map x/y columns.
      Valid chart types example: ["bar", "line", "pie", "scatter", "Not Possible"]

      Useful Guidelines:
        - Use "bar" for one categorical + one numeric column.
        - Use "line" for time-series or ordered data.
        - Use "pie" for showing share, distribution or percentage data.
        - Use "scatter" for two numeric columns (to show correlation).
        - If the result has only one row OR only one column then return "Not Possible".
        - Use "Not Possible" when no meaningful visualization fits.

      Output JSON only:
      {{
        "chart_type": "...",
        "x": "...",
        "y": "..."
      }}
      """,
      input_variables=["question", "column_summary", "sample_row"]
  )
  return chart_detection_prompt


In [None]:
from matplotlib import pyplot as plt
import plotly.express as px

class ChartBuilder:
    def __init__(self, llm):
      self.llm = llm

    def detect_visualization_intent(self, question, result):
      col_summary, df = self.summarize_columns(result)

      sample_row = df.iloc[0].to_dict() if not df.empty else {}

      #Ask LLM for an initial guess
      prompt=get_chart_detection_prompt()
      chain = prompt | self.llm | StrOutputParser()
      raw_response = chain.invoke({
          "question": question,
          "column_summary": col_summary,
          "sample_row": sample_row
      })

      try:
          parsed = json.loads(raw_response)
          chart_type = parsed.get("chart_type", "table")
          x = parsed.get("x")
          y = parsed.get("y")
      except Exception:
          chart_type, x, y = "Not Possible", None, None

      # safety checks
      # "number" covers every numeric dtype, not just float64/int64
      numeric_cols = df.select_dtypes(include="number").columns.tolist()
      categorical_cols = df.select_dtypes(exclude="number").columns.tolist()

      if df.shape[0] <= 1:
          chart_type = "Not Possible"
      if df.shape[0] < 3 and chart_type != "pie":
          chart_type = "Not Possible"

      if chart_type not in ["bar", "line", "pie", "scatter"]:
          chart_type = "Not Possible"

      if chart_type == "scatter":
          if x not in numeric_cols or y not in numeric_cols:
                chart_type = "Not Possible"

      if chart_type == "Not Possible":
        agent_log("Visualizer", f"Chart type selected: {chart_type}")
        return "Not Possible", None, None, df

      agent_log("Visualizer", f"Chart type selected: {chart_type}")
      return chart_type, x, y, df

    def summarize_columns(self, result):
      df = pd.DataFrame(result["rows"], columns=result["cols"])
      summary = []
      for c in df.columns:
          dtype = str(df[c].dtype)
          n_unique = df[c].nunique()
          summary.append({"column": c, "dtype": dtype, "unique_values": n_unique})
      return summary, df

    def render_visualization(self, chart_type, df, x=None, y=None):
      default_size = dict(width=500, height=350)
      if chart_type not in ("bar", "line", "pie", "scatter"):
          return None
      # Every supported chart needs both fields; stop here instead of failing on None.title()
      if x is None or y is None:
          agent_log("Visualizer", f"{chart_type.title()} chart requires 'x' and 'y' fields.")
          return None
      if chart_type == "bar":
          # Show the ten largest values
          return px.bar(df.sort_values(y, ascending=False).head(10), x=x, y=y, title=f"{y.title()} by {x.title()}", **default_size)
      if chart_type == "line":
          return px.line(df.sort_values(x), x=x, y=y, title=f"{y.title()} over {x.title()}", **default_size)
      if chart_type == "pie":
          return px.pie(df.sort_values(y, ascending=False).head(10), names=x, values=y, title=f"{y.title()} share by {x.title()}", **default_size)
      # Remaining case is "scatter"; trendline="ols" relies on statsmodels being installed
      return px.scatter(df, x=x, y=y, trendline="ols", title=f"{y.title()} vs {x.title()}", **default_size)

In [None]:
## TESTING

llm = ChatOpenAI(model_name="gpt-5-nano", temperature=1)
chart_builder = ChartBuilder(llm=llm)

tests = [
    {
        "question": "Top 5 products by revenue.",
        "result": {"cols": ["id", "product", "revenue", "category"],
                   "rows": [(1,"iPhone",900, "electronics"),(2,"Galaxy",850, "electronics"),(3,"Pixel",780, "electronics")]}
    },
    {
        "question": "Share of revenue by category.",
        "result": {"cols": ["row", "category","revenue_share"],
                   "rows": [(1001, "Electronics",0.45),(1002, "Home",0.30),(1003, "Clothing",0.25)]}
    },
    {
        "question": "Monthly sales trend in 2024.",
        "result": {"cols": ["month","total_sales"],
                   "rows": [("Jan",5000),("Feb",6000),("Mar",7000),("Apr",6800)]}
    },
    {
        "question": "Compare total sales across states and categories.",
        "result": {"cols": ["state","category","total_sales"],
                   "rows": [("CA","Electronics",10000),("CA","Home",8000),("TX","Electronics",9000)]}
    },
    {
        "question": "Average price vs quantity sold.",
        "result": {"cols": ["avg_price","quantity_sold"],
                   "rows": [(200,50),(150,80),(300,30)]}
    },
    {
    "question": "List all product categories and their descriptions.",
    "result": {
        "cols": ["category_name", "description"],
        "rows": [
            ("Electronics", "Devices and gadgets like phones, TVs, and laptops."),
            ("Home", "Furniture, kitchen appliances, and decor items."),
            ("Clothing", "Men's, women's, and kids' apparel.")
        ]
    }
  }
]

for test in tests:
    chart_type, x, y, df = chart_builder.detect_visualization_intent(
        test["question"], test["result"]
    )

    print(f"\n🧩 Question: {test['question']}")
    print(f"Detected Intent → chart_type: {chart_type}, x: {x}, y: {y},")

    # render_visualization returns None when no chart applies, so guard before calling .show().
    chart_output = chart_builder.render_visualization(chart_type, df, x, y)
    if chart_output is None:
      print("No meaningful visualization possible for this result.")
    else:
      chart_output.show()


🧩 Question: Top 5 products by revenue.
Detected Intent → chart_type: bar, x: product, y: revenue,



🧩 Question: Share of revenue by category.
Detected Intent → chart_type: pie, x: category, y: revenue_share,



🧩 Question: Monthly sales trend in 2024.
Detected Intent → chart_type: line, x: month, y: total_sales,



🧩 Question: Compare total sales across states and categories.
Detected Intent → chart_type: bar, x: state, y: total_sales,



🧩 Question: Average price vs quantity sold.
Detected Intent → chart_type: scatter, x: avg_price, y: quantity_sold,



🧩 Question: List all product categories and their descriptions.
Detected Intent → chart_type: table, x: , y: ,
No meaningful visualization possible for this result.


## Agent State
All agents communicate through a shared, structured state object that captures the evolving context
of a user request as it moves through the pipeline.
This explicit state management allows:
- Transparent execution tracing
- Early exits on failure
- Deterministic agent behavior
- Easier debugging and extension

In [None]:
from typing import TypedDict, Optional, List, Any

class AgentState(TypedDict, total=False):
    question: Optional[str]
    schema: Optional[str]
    metadata: Optional[dict]
    classification: Optional[bool]
    classification_reason: Optional[str]
    plan: Optional[dict]
    plan_error: Optional[str]
    sql: Optional[str]
    final_critic_sql: Optional[str]
    critic_error: Optional[str]
    executor_df: Optional[pd.DataFrame]
    executor_error: Optional[str]
    chart_type: Optional[str]
    chart_figure: Optional[Any]
    insight: Optional[str]
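
Because `AgentState` is declared with `total=False`, a step that never ran simply leaves its keys absent. The sketch below uses a trimmed, hypothetical `MiniState` (not the full `AgentState`) and a hypothetical helper `planner_status` to illustrate how downstream code can tell "skipped" from "failed" by checking which keys are present.

```python
from typing import Optional, TypedDict


class MiniState(TypedDict, total=False):
    # Hypothetical, trimmed-down stand-in for AgentState.
    question: Optional[str]
    plan: Optional[dict]
    plan_error: Optional[str]


def planner_status(state: MiniState) -> str:
    """Classify the planner step from which keys are present in the state."""
    if "plan_error" in state:
        return "failed"
    if "plan" in state:
        return "ok"
    return "skipped"


fresh: MiniState = {"question": "Show revenue by category"}
failed: MiniState = {"question": "q", "plan_error": "unknown column"}
planned: MiniState = {"question": "q", "plan": {"entities": ["orders"]}}

for state in (fresh, failed, planned):
    print(planner_status(state))
```

The same presence checks are what the execution trace relies on later in the notebook.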

## Supervisor Agent
A central Supervisor coordinates all agents and manages shared execution state. It controls the
order of agent invocation, handles early exits on failure, and aggregates outputs into a final user
response.

In [None]:
def schema_to_string(snapshot: dict) -> str:
    lines = []
    for table, meta in snapshot["tables"].items():
        cols = ", ".join([c["name"] for c in meta["columns"]])
        lines.append(f"{table}({cols})")
    return "\n".join(lines)

schema_context = schema_to_string(schema_snapshot)

In [None]:
schema_context

'customers(cust_id, first_name, last_name, mobile, state, joined_at, acquisition_channel)\norder_items(row_id, order_id, product_id)\norders(order_id, cust_id, order_date, status)\nproducts(product_id, category, price)'

In [None]:
def agent_log(step, message):
    logger.info(f"[{step}] {message}")

In [None]:
class Supervisor:
  def __init__(self):
    self.state: AgentState = {}
    self.llm_nano = ChatOpenAI(model_name="gpt-5-nano", temperature=1)

  def handle_query(self, question):
    self.state["question"] = question
    self.state["schema"] = schema_context
    self.state["metadata"] = SCHEMA_METADATA

    if not self.step1_classify():
      return self.state
    if not self.step2_planner():
      return self.state
    if not self.step3_writer():
      return self.state
    if not self.step4_critic():
      return self.state
    if not self.step5_executor():
      return self.state

    self.step6_visualization()
    return self.state


  def step1_classify(self):
    agent_log("Classifier", "Classifying query")
    classifier_chain = classifier_prompt | self.llm_nano | StrOutputParser()
    raw_output = classifier_chain.invoke({
        "schema": self.state["schema"],
        "metadata": self.state["metadata"],
        "question": self.state["question"]
    })
    try:
      parsed = json.loads(raw_output.strip())
      if not isinstance(parsed, dict):
        raise ValueError("expected a JSON object")
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
      # Fail closed: unparseable output is treated as "not a database query".
      parsed = {"is_db_query": False, "reason": "Invalid JSON"}

    is_db_true = bool(parsed.get("is_db_query", False))
    reason = parsed.get("reason", "")
    self.state["classification"] = is_db_true
    self.state["classification_reason"] = reason
    agent_log("Classifier", f"Classification Done <{is_db_true}: {reason}>")
    return is_db_true

  def step2_planner(self):
    agent_log("Planner", "Building Plan")
    planner_prompt = get_planner_prompt()
    planner_chain = planner_prompt | self.llm_nano | StrOutputParser()
    planner_output = planner_chain.invoke({
        "schema": self.state["schema"],
        "metadata": self.state["metadata"],
        "question": self.state["question"]
    })
    try:
      parsed = json.loads(planner_output.strip())
      if "error" in parsed:
        err_message = parsed["error"]
        self.state["plan_error"] = err_message
        agent_log("Planner", f"Invalid plan. ERR <{err_message}>")
        return False
      else:
        self.state["plan"] = parsed
        agent_log("Planner", "Plan generated.")
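    except Exception as e:
      # Record the failure so user_response can report it instead of "no results".
      self.state["plan_error"] = f"Planner output could not be parsed: {e}"
      agent_log("Planner", f"Error in generating plan. ERR <{e}>")
      return False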
    return True

  def step3_writer(self):
    agent_log("Writer", "Generating SQL")
    writer_prompt = get_writer_prompt()
    writer_chain = writer_prompt | self.llm_nano | StrOutputParser()
    writer_output = writer_chain.invoke({"plan": self.state["plan"]})
    # plan = self.state["plan"]
    # writer_output = writer_chain.run(plan=plan)
    self.state["sql"] = writer_output
    agent_log("Writer", "SQL generated successfully")
    return True

  def step4_critic(self):
    agent_log("Critic", "Reviewing SQL")
    critic_prompt = get_critic_prompt()
    # critic_chain=LLMChain(llm=self.llm_nano, prompt=critic_prompt)
    # critic_output = critic_chain.run(schema=self.state["schema"], sql=self.state["sql"])
    critic_chain = critic_prompt | self.llm_nano | StrOutputParser()
    critic_output = critic_chain.invoke({"schema": self.state["schema"],
                                         "metadata": self.state["metadata"],
                                         "sql": self.state["sql"]})
    try:
      parsed = json.loads(critic_output)
      if "error" in parsed:
        self.state["critic_error"] = parsed["error"]
        agent_log("Critic", f"Error in critic results ERR <{parsed['error']}>")
        return False
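      else:
        self.state["final_critic_sql"] = parsed["sql"]
        agent_log("Critic", "Executable SQL approved")
    except Exception as e:
      # Fail closed: without a parsed verdict there is no approved SQL to execute.
      self.state["critic_error"] = f"Could not parse critic output: {e}"
      agent_log("Critic", f"Error running critic ERR <{e}>")
      return False
    return True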

  def step5_executor(self):
    agent_log("Executor", "Running SQL on DB")
    executor = Executor(db_path)
    result = executor.implement_sql(self.state["final_critic_sql"])
    if "error" in result:
      self.state["executor_error"] = result["error"]
      agent_log("Executor", f"Error in executor results {result['error']}")
      return False
    df = pd.DataFrame(result["rows"], columns=result["cols"])
    self.state["executor_df"] = df
    agent_log("Executor", "Finished")
    return True

  def step6_visualization(self):
    agent_log("Visualizer", "Detecting chart type")
    chart_builder = ChartBuilder(llm=self.llm_nano)
    result = self.state.get("executor_df")

    if result is None or result.empty or result.shape[0]<=1:
        agent_log("Visualizer", "No results to visualize")
        self.state["chart_type"] = "Not Possible"
        return False

    question = self.state["question"]
    chart_type, x, y, df = chart_builder.detect_visualization_intent(question, {"cols": result.columns, "rows": result.values.tolist()})

    if chart_type == "Not Possible":
        agent_log("Visualizer", "Chart not possible for this query")
        self.state["chart_type"] = "Not Possible"
        return True

    fig = chart_builder.render_visualization(chart_type, df, x, y)
    self.state["chart_type"] = chart_type
    self.state["chart_figure"] = fig
    return True
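
The early-exit chain in `handle_query` can also be written as a loop over named steps. The following is a minimal sketch of that pattern, not the notebook's implementation: `run_pipeline`, the `steps_ran` key, and the two toy steps are hypothetical stand-ins for the real agents.

```python
from typing import Callable, Dict, List, Tuple

State = Dict[str, object]
Step = Tuple[str, Callable[[State], bool]]


def run_pipeline(state: State, steps: List[Step]) -> State:
    """Run steps in order and stop at the first one that returns False."""
    ran = []
    for name, step in steps:
        ran.append(name)
        if not step(state):
            break
    state["steps_ran"] = ran  # hypothetical key, useful for tracing
    return state


# Hypothetical stand-ins for the real agent steps.
def classify(state: State) -> bool:
    state["classification"] = "rating" not in str(state["question"]).lower()
    return bool(state["classification"])


def plan(state: State) -> bool:
    state["plan"] = {"entities": ["products"]}
    return True


steps = [("classify", classify), ("plan", plan)]
rejected = run_pipeline({"question": "Show average rating per product"}, steps)
accepted = run_pipeline({"question": "Show revenue by product category"}, steps)
print(rejected["steps_ran"])
print(accepted["steps_ran"])
```

A loop like this keeps the ordering in one list, at the cost of hiding the special case that the visualization step in the `Supervisor` never blocks the response.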


## Pipeline Orchestration & Output

### End-to-End Query Execution
This section defines the primary execution entry point for the system.

In [None]:
def run_query(q, title=None):
  supervisor = Supervisor()
  state = supervisor.handle_query(q)
  resp = user_response(state)
  if isinstance(resp, pd.DataFrame):
    if title:
      print(title)
    display(resp.head(20))
  else:
    print(resp)
  fig = state.get("chart_figure")
  if fig is not None:
    fig.show()
  print_agent_trace(state)
  return state

### User-Facing Response Assembly
This component formats the final output presented to the user.

In [None]:
def user_response(state):
  # Use .get throughout so a partially filled state never raises KeyError.
  if not state.get("classification"):
    return f"I cannot answer because: {state.get('classification_reason', '')}"
  if state.get("plan_error"):
    return f"I could not understand how to answer this question.\nReason: {state['plan_error']}"
  if state.get("critic_error"):
    return f"I generated SQL, but it was unsafe or invalid.\nReason: {state['critic_error']}"
  if state.get("executor_error"):
    return f"I attempted to run the SQL but it failed.\nReason: {state['executor_error']}"

  df = state.get("executor_df")
  if df is None or df.empty:
    return "The query ran successfully, but returned no results."

  return df

### Agent Execution Trace
The system exposes a structured trace of agent execution. The trace
indicates which agents ran, which were skipped, and where errors occurred.

In [None]:
def print_agent_trace(state):

    def mark(ok):
        return "✓" if ok else "✗"

    print("\n=== AGENT EXECUTION TRACE ===")

    # Classifier
    cls = state.get("classification")
    reason = state.get("classification_reason", "")
    print(f"[Classifier]   {mark(cls)}  → {reason}")

    # Planner
    if "plan_error" in state:
        print(f"[Planner]      ✗ ")
    elif "plan" in state:
        print(f"[Planner]      ✓  → Plan OK")
    else:
        print(f"[Planner]      -  → Skipped")

    # Writer
    if state.get("sql"):
        print(f"[Writer]       ✓  → SQL generated")
    else:
        print(f"[Writer]       -  → Skipped")

    # Critic
    if "critic_error" in state:
        print(f"[Critic]       ✗")
    elif state.get("final_critic_sql"):
        print(f"[Critic]       ✓  → SQL approved")
    else:
        print(f"[Critic]       -  → Skipped")

    # Executor
    if "executor_error" in state:
        print(f"[Executor]     ✗")
    elif state.get("executor_df") is not None:
        rows = len(state["executor_df"])
        print(f"[Executor]     ✓  → Returned {rows} rows")
    else:
        print(f"[Executor]     -  → Skipped")

    # Visualization
    if state.get("chart_type") == "Not Possible":
        print(f"[Viz]          -  → No chart possible")
    elif "chart_figure" in state:
        print(f"[Viz]          ✓  → {state['chart_type']} chart")
    else:
        print(f"[Viz]          -  → Skipped")


# Curated Analytics Use Cases

The following use cases demonstrate realistic analytics questions commonly asked by product,
growth, and business teams. Each example highlights different reasoning patterns, SQL complexity,
and visualization behavior supported by the copilot.




**Quick Example:** Safe Rejection of Unsupported Queries

Let's first run a query that references a concept
not present in the database schema. Rather than hallucinating an answer, the system correctly
rejects the query and explains why it cannot be answered.

In [None]:
state = run_query("Show average rating per product")

2025-12-11 13:18:01 - INFO - [Classifier] Classifying query
2025-12-11 13:18:06 - INFO - [Classifier] Classification Done <False: Schema lacks ratings data; no rating-related table or column.>


I cannot answer because: Schema lacks ratings data; no rating-related table or column.

=== AGENT EXECUTION TRACE ===
[Classifier]   ✗  → Schema lacks ratings data; no rating-related table or column.
[Planner]      -  → Skipped
[Writer]       -  → Skipped
[Critic]       -  → Skipped
[Executor]     -  → Skipped
[Viz]          -  → Skipped
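
The rejection above depends on the classifier's JSON verdict being parsed, and `step1_classify` treats any unparseable reply as a rejection. LLM replies sometimes wrap JSON in extra text, so a more tolerant parser may reduce false rejections. The sketch below is one possible approach, not part of the notebook: `parse_verdict` and `REJECT` are hypothetical names, and the brace-span heuristic is an assumption about how replies tend to be malformed.

```python
import json

# Fail-closed default, mirroring the fallback used in step1_classify.
REJECT = {"is_db_query": False, "reason": "Invalid JSON"}


def parse_verdict(raw: str) -> dict:
    """Parse the span from the first '{' to the last '}' of an LLM reply as JSON."""
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end < start:
        return dict(REJECT)
    try:
        parsed = json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return dict(REJECT)
    return parsed if isinstance(parsed, dict) else dict(REJECT)


wrapped = 'Here is the verdict:\n{"is_db_query": true, "reason": "uses products"}\nDone.'
print(parse_verdict(wrapped))
print(parse_verdict("not json at all"))
```

Anything that still fails to parse falls back to the same rejection verdict, so the pipeline keeps failing closed.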


## Case 1 - Revenue by Product Category
**Why this matters:**  Understanding revenue contribution by category helps teams evaluate product mix and pricing
strategy.

In [None]:
supervisor = Supervisor()
q = "Show revenue by product category"

state = supervisor.handle_query(q)

# Show results
resp = user_response(state)
if isinstance(resp, pd.DataFrame):
    print("Top categories by revenue:")
    display(resp.head(15))
else:
    print(resp)

# Show chart if available
fig = state.get("chart_figure")
if fig is not None:
    fig.show()

# Show agent trace (user-friendly explanation)
print_agent_trace(state)

2025-12-09 10:21:16 - INFO - [Classifier] Classifying query
2025-12-09 10:21:23 - INFO - [Classifier] Classification Done <True: Targets revenue by product category, computable from products and order_items (and orders for revenue context).>
2025-12-09 10:21:23 - INFO - [Planner] Building Plan
2025-12-09 10:21:41 - INFO - [Planner] Plan generated.
2025-12-09 10:21:41 - INFO - [Writer] Generating SQL
2025-12-09 10:21:47 - INFO - [Writer] SQL generated successfully
2025-12-09 10:21:47 - INFO - [Critic] Reviewing SQL
2025-12-09 10:21:55 - INFO - [Executor] Running SQL on DB
2025-12-09 10:21:55 - INFO - [Executor] Finished
2025-12-09 10:21:55 - INFO - [Visualization] Detecting chart type


Top categories by revenue:


Unnamed: 0,category,revenue
0,Skincare,168266.49
1,Handbags,122748.66
2,Makeup,121383.38
3,Fragrance,119518.48
4,Haircare,91341.36
5,Accessories,77541.32



=== AGENT EXECUTION TRACE ===
[Classifier]   ✓  → Targets revenue by product category, computable from products and order_items (and orders for revenue context).
[Planner]      ✓  → Plan OK
[Writer]       ✓  → SQL generated
[Critic]       ✓  → SQL approved
[Executor]     ✓  → Returned 6 rows
[Viz]          ✓  → bar chart
[Insight]      -  → Skipped


In [None]:
print(state["plan"])
print(state["sql"])
print(state["final_critic_sql"])

{'entities': ['orders', 'order_items', 'products'], 'joins': [{'left': 'orders.order_id', 'right': 'order_items.order_id'}, {'left': 'order_items.product_id', 'right': 'products.product_id'}], 'filters': [], 'metrics': [{'expr': 'SUM(products.price)', 'alias': 'revenue'}], 'group_by': ['products.category'], 'order_by': [{'expr': 'revenue', 'dir': 'desc'}]}
SELECT
  products.category,
  SUM(products.price) AS revenue
FROM
  orders
  JOIN order_items ON orders.order_id = order_items.order_id
  JOIN products ON order_items.product_id = products.product_id
GROUP BY
  products.category
ORDER BY
  revenue DESC;
SELECT products.category, SUM(products.price) AS revenue FROM orders JOIN order_items ON orders.order_id = order_items.order_id JOIN products ON order_items.product_id = products.product_id GROUP BY products.category ORDER BY revenue DESC;


## Case 2 - Monthly Revenue Trend
**Why this matters:**
Reveals revenue growth and seasonality over time, helping teams track performance and identify trends or anomalies.

In [None]:
supervisor = Supervisor()
q = "Show monthly revenue for 2025"

state = supervisor.handle_query(q)

# Show results
resp = user_response(state)
if isinstance(resp, pd.DataFrame):
    print("Monthly Revenue Trend (2025):")
    display(resp.head(15))
else:
    print(resp)

# Show chart if available
fig = state.get("chart_figure")
if fig is not None:
    fig.show()

# Show agent trace (user-friendly explanation)
print_agent_trace(state)

2025-12-10 17:55:21 - INFO - [Classifier] Classifying query
2025-12-10 17:55:24 - INFO - [Classifier] Classification Done <True: Query targets revenue by month using orders, order_items, and products from the schema.>
2025-12-10 17:55:24 - INFO - [Planner] Building Plan
2025-12-10 17:55:42 - INFO - [Planner] Plan generated.
2025-12-10 17:55:42 - INFO - [Writer] Generating SQL
2025-12-10 17:55:52 - INFO - [Writer] SQL generated successfully
2025-12-10 17:55:52 - INFO - [Critic] Reviewing SQL
2025-12-10 17:56:04 - INFO - [Executor] Running SQL on DB
2025-12-10 17:56:04 - INFO - [Executor] Finished
2025-12-10 17:56:04 - INFO - [Visualization] Detecting chart type


Monthly Revenue Trend (2025):


Unnamed: 0,month,revenue
0,2025-01,40235.68
1,2025-02,35875.0
2,2025-03,54222.56
3,2025-04,52566.22
4,2025-05,59379.7
5,2025-06,66312.58
6,2025-07,72152.29
7,2025-08,65970.71
8,2025-09,63242.29
9,2025-10,74210.73



=== AGENT EXECUTION TRACE ===
[Classifier]   ✓  → Query targets revenue by month using orders, order_items, and products from the schema.
[Planner]      ✓  → Plan OK
[Writer]       ✓  → SQL generated
[Critic]       ✓  → SQL approved
[Executor]     ✓  → Returned 12 rows
[Viz]          ✓  → line chart
[Insight]      -  → Skipped


In [None]:
print(state["plan"])
print(state["sql"])
print(state["final_critic_sql"])

{'entities': ['orders', 'order_items', 'products'], 'joins': [{'left': 'orders.order_id', 'right': 'order_items.order_id'}, {'left': 'order_items.product_id', 'right': 'products.product_id'}], 'filters': [{'field': 'orders.order_date', 'op': 'between', 'values': ['2025-01-01', '2025-12-31']}], 'metrics': [{'expr': 'SUM(products.price)', 'alias': 'revenue'}], 'group_by': ["DATE_TRUNC('month', orders.order_date)"], 'order_by': [{'expr': "DATE_TRUNC('month', orders.order_date)", 'dir': 'asc'}]}
SELECT STRFTIME('%Y-%m', orders.order_date) AS month, SUM(products.price) AS revenue
FROM orders
INNER JOIN order_items ON orders.order_id = order_items.order_id
INNER JOIN products ON order_items.product_id = products.product_id
WHERE orders.order_date BETWEEN '2025-01-01' AND '2025-12-31'
GROUP BY STRFTIME('%Y-%m', orders.order_date)
ORDER BY STRFTIME('%Y-%m', orders.order_date) ASC;
SELECT STRFTIME('%Y-%m', orders.order_date) AS month, SUM(products.price) AS revenue FROM orders INNER JOIN order_

## Case 3 - Acquisition Channel Mix
**Why this matters:**
Highlights how customers are acquired across channels, supporting marketing spend allocation and channel strategy.

In [None]:
state = run_query("Show the percentage share of customers by acquisition channel.", title="Acquisition Channel Mix")

2025-12-11 19:18:40 - INFO - [Classifier] Classifying query
2025-12-11 19:18:45 - INFO - [Classifier] Classification Done <True: Relevant: relies on customers.acquisition_channel, which exists in the schema and metadata.>
2025-12-11 19:18:45 - INFO - [Planner] Building Plan
2025-12-11 19:19:40 - INFO - [Planner] Plan generated.
2025-12-11 19:19:40 - INFO - [Writer] Generating SQL
2025-12-11 19:19:56 - INFO - [Writer] SQL generated successfully
2025-12-11 19:19:56 - INFO - [Critic] Reviewing SQL
2025-12-11 19:20:16 - INFO - [Executor] Running SQL on DB
2025-12-11 19:20:16 - INFO - [Executor] Finished
2025-12-11 19:20:16 - INFO - [Visualizater] Detecting chart type
2025-12-11 19:20:24 - INFO - [Visualizer] Chart type selected: pie


Acquisition Channel Mix


Unnamed: 0,channel,channel_count,share_percent
0,Influencer,111,55.5
1,Organic Search,38,19.0
2,Instagram Ads,30,15.0
3,Google Ads,21,10.5



=== AGENT EXECUTION TRACE ===
[Classifier]   ✓  → Relevant: relies on customers.acquisition_channel, which exists in the schema and metadata.
[Planner]      ✓  → Plan OK
[Writer]       ✓  → SQL generated
[Critic]       ✓  → SQL approved
[Executor]     ✓  → Returned 4 rows
[Viz]          ✓  → pie chart


In [None]:
state

{'question': 'Show the percentage share of customers by acquisition channel.',
 'schema': 'customers(cust_id, first_name, last_name, mobile, state, joined_at, acquisition_channel)\norder_items(row_id, order_id, product_id)\norders(order_id, cust_id, order_date, status)\nproducts(product_id, category, price)',
 'metadata': {'orders.status': {'type': 'enum',
   'values': ['completed', 'cancelled', 'returned', 'pending'],
   'meaning': {'returned': 'order was returned by the customer',
    'completed': 'order was delivered',
    'cancelled': 'order was cancelled before fulfillment'}},
  'customers.acquisition_channel': {'type': 'enum',
   'values': ['Organic Search', 'Instagram Ads', 'Google Ads', 'Influencer']},
  'products.category': {'type': 'enum',
   'values': ['Skincare',
    'Haircare',
    'Makeup',
    'Fragrance',
    'Handbags',
    'Accessories']}},
 'classification': True,
 'classification_reason': 'Relevant: relies on customers.acquisition_channel, which exists in the schema

## Case 4 - Return Rate by Category
**Why this matters:**
Helps identify product categories with higher return rates, which can signal issues with product quality, sizing, customer expectations, or fulfillment.

In [None]:
state = run_query("Show return rate for each category", title="Return Rate")

2025-12-11 13:44:38 - INFO - [Classifier] Classifying query
2025-12-11 13:44:45 - INFO - [Classifier] Classification Done <True: Yes. You can compute return rate per category by joining orders (status='returned'), order_items, and products (category).>
2025-12-11 13:44:45 - INFO - [Planner] Building Plan
2025-12-11 13:45:14 - INFO - [Planner] Plan generated.
2025-12-11 13:45:14 - INFO - [Writer] Generating SQL
2025-12-11 13:45:24 - INFO - [Writer] SQL generated successfully
2025-12-11 13:45:24 - INFO - [Critic] Reviewing SQL
2025-12-11 13:45:41 - INFO - [Executor] Running SQL on DB
2025-12-11 13:45:41 - INFO - [Executor] Finished
2025-12-11 13:45:41 - INFO - [Visualization] Detecting chart type


Return Rate


Unnamed: 0,category,return_rate
0,Skincare,0.110715
1,Haircare,0.10572
2,Fragrance,0.102821
3,Makeup,0.098872
4,Accessories,0.092671
5,Handbags,0.08129



=== AGENT EXECUTION TRACE ===
[Classifier]   ✓  → Yes. You can compute return rate per category by joining orders (status='returned'), order_items, and products (category).
[Planner]      ✓  → Plan OK
[Writer]       ✓  → SQL generated
[Critic]       ✓  → SQL approved
[Executor]     ✓  → Returned 6 rows
[Viz]          ✓  → bar chart
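
The SQL generated for this case is not shown in the output, so the following is only one plausible formulation, run against a toy in-memory SQLite database. The table and column names follow the schema string above; the rows are invented for illustration, and defining return rate as the share of order items that belong to returned orders is an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE products (product_id TEXT, category TEXT, price REAL);
CREATE TABLE orders (order_id TEXT, cust_id TEXT, order_date TEXT, status TEXT);
CREATE TABLE order_items (row_id INTEGER, order_id TEXT, product_id TEXT);
INSERT INTO products VALUES ('P1', 'Skincare', 20.0), ('P2', 'Makeup', 15.0);
INSERT INTO orders VALUES
  ('O1', 'C1', '2025-01-05', 'completed'),
  ('O2', 'C1', '2025-01-09', 'returned'),
  ('O3', 'C2', '2025-02-01', 'completed'),
  ('O4', 'C2', '2025-02-03', 'returned');
INSERT INTO order_items VALUES
  (1, 'O1', 'P1'), (2, 'O2', 'P1'), (3, 'O3', 'P1'), (4, 'O4', 'P1'),
  (5, 'O1', 'P2'), (6, 'O3', 'P2'), (7, 'O3', 'P2'), (8, 'O2', 'P2');
""")

# Conditional aggregation: averaging a 0/1 flag yields the returned share per category.
query = """
SELECT p.category,
       AVG(CASE WHEN o.status = 'returned' THEN 1.0 ELSE 0.0 END) AS return_rate
FROM order_items AS oi
JOIN orders AS o ON o.order_id = oi.order_id
JOIN products AS p ON p.product_id = oi.product_id
GROUP BY p.category
ORDER BY return_rate DESC;
"""
rows = conn.execute(query).fetchall()
for category, rate in rows:
    print(category, rate)
```

In the toy data, two of four Skincare items and one of four Makeup items sit on returned orders, which gives rates of 0.5 and 0.25.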


## Case 5 - Same-Day Conversion Rate
**Why this matters:**
Evaluates how effectively the platform converts new sign-ups into buyers on the day they join.

In [None]:
state = run_query("How many customers placed a completed order on the same day they signed up?", title="Same-Day Conversion Rate")

2025-12-11 14:29:30 - INFO - [Classifier] Classifying query
2025-12-11 14:29:38 - INFO - [Classifier] Classification Done <True: Requires joining customers and orders on cust_id and filtering where orders.status = 'completed' and date(orders.order_date) = date(customers.joined_at); all fields and the status value exist in schema/metadata.>
2025-12-11 14:29:38 - INFO - [Planner] Building Plan
2025-12-11 14:29:54 - INFO - [Planner] Plan generated.
2025-12-11 14:29:54 - INFO - [Writer] Generating SQL
2025-12-11 14:30:02 - INFO - [Writer] SQL generated successfully
2025-12-11 14:30:02 - INFO - [Critic] Reviewing SQL
2025-12-11 14:30:10 - INFO - [Executor] Running SQL on DB
2025-12-11 14:30:10 - INFO - [Executor] Finished
2025-12-11 14:30:10 - INFO - [Visualization] Detecting chart type
2025-12-11 14:30:17 - INFO - [Visualization] Chart not possible for this query.


Same-Day Conversion Rate


Unnamed: 0,same_day_completed_customer_count
0,69



=== AGENT EXECUTION TRACE ===
[Classifier]   ✓  → Requires joining customers and orders on cust_id and filtering where orders.status = 'completed' and date(orders.order_date) = date(customers.joined_at); all fields and the status value exist in schema/metadata.
[Planner]      ✓  → Plan OK
[Writer]       ✓  → SQL generated
[Critic]       ✓  → SQL approved
[Executor]     ✓  → Returned 1 rows
[Viz]          -  → No chart possible


## Case 6 - Top 10 Customers by Lifetime Revenue
**Why this matters:** Identifies high-value customers who contribute disproportionately to revenue, enabling targeted retention and loyalty efforts.

In [None]:
state = run_query("Show the top 10 customers by total spend.", title="Top 10 Customers by Lifetime Revenue")

2025-12-11 14:51:58 - INFO - [Classifier] Classifying query
2025-12-11 14:52:06 - INFO - [Classifier] Classification Done <True: Yes. The schema includes customers, orders, order_items, and products; you can compute total spend by joining orders to order_items to products (optionally filtering to completed) and aggregating by customer, then selecting the top 10.>
2025-12-11 14:52:06 - INFO - [Planner] Building Plan
2025-12-11 14:52:29 - INFO - [Planner] Plan generated.
2025-12-11 14:52:29 - INFO - [Writer] Generating SQL
2025-12-11 14:52:38 - INFO - [Writer] SQL generated successfully
2025-12-11 14:52:38 - INFO - [Critic] Reviewing SQL
2025-12-11 14:52:53 - INFO - [Executor] Running SQL on DB
2025-12-11 14:52:53 - INFO - [Executor] Finished
2025-12-11 14:52:53 - INFO - [Visualization] Detecting chart type


Top 10 Customers by Lifetime Revenue


Unnamed: 0,cust_id,first_name,last_name,total_spend
0,C187,Samantha,Fernandez,5339.02
1,C115,Evan,Cline,4820.08
2,C075,Brittany,Williams,4787.52
3,C077,David,Meza,4546.43
4,C197,Natasha,Knight,4514.82
5,C058,Zachary,Mcfarland,4445.48
6,C139,Edgar,Smith,4422.95
7,C143,William,Wagner,4397.98
8,C144,Kara,Ayers,4342.57
9,C189,Teresa,Cantrell,4326.03



=== AGENT EXECUTION TRACE ===
[Classifier]   ✓  → Yes. The schema includes customers, orders, order_items, and products; you can compute total spend by joining orders to order_items to products (optionally filtering to completed) and aggregating by customer, then selecting the top 10.
[Planner]      ✓  → Plan OK
[Writer]       ✓  → SQL generated
[Critic]       ✓  → SQL approved
[Executor]     ✓  → Returned 10 rows
[Viz]          ✓  → bar chart


In [None]:
state

{'question': 'What is the average order value for each product category?',
 'schema': 'customers(cust_id, first_name, last_name, mobile, state, joined_at, acquisition_channel)\norder_items(row_id, order_id, product_id)\norders(order_id, cust_id, order_date, status)\nproducts(product_id, category, price)',
 'metadata': {'orders.status': {'type': 'enum',
   'values': ['completed', 'cancelled', 'returned', 'pending'],
   'meaning': {'returned': 'order was returned by the customer',
    'completed': 'order was delivered',
    'cancelled': 'order was cancelled before fulfillment'}},
  'customers.acquisition_channel': {'type': 'enum',
   'values': ['Organic Search', 'Instagram Ads', 'Google Ads', 'Influencer']},
  'products.category': {'type': 'enum',
   'values': ['Skincare',
    'Haircare',
    'Makeup',
    'Fragrance',
    'Handbags',
    'Accessories']}},
 'classification': True,
 'classification_reason': 'Yes ‚Äî the schema has orders, order_items, and products with price and category;

## Case 7 - Average Order Value (AOV) by Category
**Why this matters:**
Compares purchasing behavior across categories to understand where customers tend to place higher-value orders.

In [None]:
state = run_query("What is the average order value for each product category?", title="Average Order Value (AOV) by Category")

2025-12-11 15:16:35 - INFO - [Classifier] Classifying query
2025-12-11 15:16:48 - INFO - [Classifier] Classification Done <True: Related to database; schema supports orders, order_items, and products with price and category to compute average order value by category.>
2025-12-11 15:16:48 - INFO - [Planner] Building Plan
2025-12-11 15:17:35 - INFO - [Planner] Plan generated.
2025-12-11 15:17:35 - INFO - [Writer] Generating SQL
2025-12-11 15:17:57 - INFO - [Writer] SQL generated successfully
2025-12-11 15:17:57 - INFO - [Critic] Reviewing SQL
2025-12-11 15:18:17 - INFO - [Executor] Running SQL on DB
2025-12-11 15:18:17 - INFO - [Executor] Finished
2025-12-11 15:18:17 - INFO - [Visualization] Detecting chart type


Average Order Value (AOV) by Category


Unnamed: 0,avg_order_value_by_category,category
0,82.040385,Handbags
1,75.804501,Fragrance
2,61.167006,Skincare
3,39.540577,Makeup
4,39.05425,Haircare
5,38.909092,Accessories



=== AGENT EXECUTION TRACE ===
[Classifier]   ✓  → Related to database; schema supports orders, order_items, and products with price and category to compute average order value by category.
[Planner]      ✓  → Plan OK
[Writer]       ✓  → SQL generated
[Critic]       ✓  → SQL approved
[Executor]     ✓  → Returned 6 rows
[Viz]          ✓  → bar chart


In [None]:
state

{'question': 'What is the average order value for each product category?',
 'schema': 'customers(cust_id, first_name, last_name, mobile, state, joined_at, acquisition_channel)\norder_items(row_id, order_id, product_id)\norders(order_id, cust_id, order_date, status)\nproducts(product_id, category, price)',
 'metadata': {'orders.status': {'type': 'enum',
   'values': ['completed', 'cancelled', 'returned', 'pending'],
   'meaning': {'returned': 'order was returned by the customer',
    'completed': 'order was delivered',
    'cancelled': 'order was cancelled before fulfillment'}},
  'customers.acquisition_channel': {'type': 'enum',
   'values': ['Organic Search', 'Instagram Ads', 'Google Ads', 'Influencer']},
  'products.category': {'type': 'enum',
   'values': ['Skincare',
    'Haircare',
    'Makeup',
    'Fragrance',
    'Handbags',
    'Accessories']}},
 'classification': True,
 'classification_reason': 'Related to database; schema supports orders, order_items, and products with price

## Case 8 - 7-Day Activation Funnel
**Why this matters:**
Assesses onboarding effectiveness by measuring how quickly new customers place their first completed order.

In [None]:
state = run_query("What percentage of customers placed their first completed order within 7 days of signing up?", "7-Day Activation Funnel")

2025-12-11 16:19:03 - INFO - [Classifier] Classifying query
2025-12-11 16:19:12 - INFO - [Classifier] Classification Done <True: Yes. Schema has customers.joined_at, orders.order_date, and orders.status with 'completed', enabling calculation of each customer's first completed order date relative to signup and the 7-day window.>
2025-12-11 16:19:12 - INFO - [Planner] Building Plan
2025-12-11 16:20:21 - INFO - [Planner] Plan generated.
2025-12-11 16:20:21 - INFO - [Writer] Generating SQL
2025-12-11 16:20:41 - INFO - [Writer] SQL generated successfully
2025-12-11 16:20:41 - INFO - [Critic] Reviewing SQL
2025-12-11 16:21:04 - INFO - [Executor] Running SQL on DB
2025-12-11 16:21:04 - INFO - [Executor] Finished
2025-12-11 16:21:04 - INFO - [Visualizater] Detecting chart type
2025-12-11 16:21:04 - INFO - [Visualizer] No results to visualize


7-Day Activation Funnel


Unnamed: 0,percentage_within_7_days
0,35.0



=== AGENT EXECUTION TRACE ===
[Classifier]   ✓  → Yes. Schema has customers.joined_at, orders.order_date, and orders.status with 'completed', enabling calculation of each customer's first completed order date relative to signup and the 7-day window.
[Planner]      ✓  → Plan OK
[Writer]       ✓  → SQL generated
[Critic]       ✓  → SQL approved
[Executor]     ✓  → Returned 1 rows
[Viz]          -  → No chart possible


In [None]:
state

{'question': 'What percentage of customers placed their first completed order within 7 days of signing up?',
 'schema': 'customers(cust_id, first_name, last_name, mobile, state, joined_at, acquisition_channel)\norder_items(row_id, order_id, product_id)\norders(order_id, cust_id, order_date, status)\nproducts(product_id, category, price)',
 'metadata': {'orders.status': {'type': 'enum',
   'values': ['completed', 'cancelled', 'returned', 'pending'],
   'meaning': {'returned': 'order was returned by the customer',
    'completed': 'order was delivered',
    'cancelled': 'order was cancelled before fulfillment'}},
  'customers.acquisition_channel': {'type': 'enum',
   'values': ['Organic Search', 'Instagram Ads', 'Google Ads', 'Influencer']},
  'products.category': {'type': 'enum',
   'values': ['Skincare',
    'Haircare',
    'Makeup',
    'Fragrance',
    'Handbags',
    'Accessories']}},
 'classification': True,
 'classification_reason': "Yes. Schema has customers.joined_at, orders.ord

# Try It Yourself
You can now ask your own question!
The system will automatically plan, generate, validate, and execute SQL, and visualize results when appropriate.

In [None]:
print("🔍 Try It Yourself - Ask any analytics question!\n")
user_q = input("Enter your question: ")

state = run_query(user_q, title="Your Query Result")
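
After a run, the returned `state` dictionary can be inspected programmatically. The helper below is a hypothetical sketch, not part of the notebook. It reads only the `question`, `classification`, and `classification_reason` keys that appear in the state dumps above, and the example dictionary is a hand-written, shortened stand-in for a real state.

```python
def summarize_state(state: dict) -> str:
    """Return a one-line summary of the classifier verdict stored in a state dict."""
    question = state.get("question", "<no question>")
    # 'classification' is True when the Classifier judged the question answerable.
    verdict = "answerable" if state.get("classification") else "not answerable"
    reason = state.get("classification_reason", "no reason recorded")
    return f"{question!r}: {verdict} ({reason})"


# Hand-written stand-in for a real state; the reason string is shortened.
example_state = {
    "question": "What is the average order value for each product category?",
    "classification": True,
    "classification_reason": "Related to database",
}

summary = summarize_state(example_state)
print(summary)
```

Using `dict.get` with defaults keeps the helper usable on states from runs that stopped early, such as questions the Classifier rejected.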