In [30]:
import os
from getpass import getpass
from urllib.parse import quote_plus

from sqlalchemy import create_engine, text

# Connection settings are taken from the standard libpq environment variables
# (PGUSER, PGPASSWORD, PGHOST, PGPORT, PGDATABASE). Every setting except the
# password falls back to the value this notebook used before; the password is
# never stored in the notebook and is prompted for when PGPASSWORD is unset.
_pg_user = os.environ.get("PGUSER", "postgres")
_pg_host = os.environ.get("PGHOST", "localhost")
_pg_port = os.environ.get("PGPORT", "5432")
_pg_database = os.environ.get("PGDATABASE", "company_db")
_pg_password = os.environ.get("PGPASSWORD") or getpass("PostgreSQL password: ")

# quote_plus keeps characters such as "@" or "/" inside the credentials from
# being parsed as URL delimiters.
engine = create_engine(
    f"postgresql+psycopg2://{quote_plus(_pg_user)}:{quote_plus(_pg_password)}"
    f"@{_pg_host}:{_pg_port}/{_pg_database}"
)

In [31]:
import json
import os
from getpass import getpass

from openai import OpenAI
from sqlalchemy import create_engine, text

# ==============================
# OPENROUTER API KEY
# ==============================
# The key is read from the OPENROUTER_API_KEY environment variable and, when
# that is unset, requested interactively so it never ends up in the saved
# notebook.
# NOTE: a literal key used to be hard-coded in this cell; treat that key as
# compromised and revoke it.

if not os.environ.get("OPENROUTER_API_KEY"):
    os.environ["OPENROUTER_API_KEY"] = getpass("OpenRouter API key: ")

client = OpenAI(
    api_key=os.environ["OPENROUTER_API_KEY"],
    base_url="https://openrouter.ai/api/v1",
)

print("✅ OpenRouter client ready")

✅ OpenRouter client ready


In [32]:
def get_schema_metadata():
    """Describe every column of the ``public`` schema.

    Runs a query against ``information_schema.columns`` through the
    module-level ``engine``.

    Returns:
        list[dict]: one dict per column with the keys ``table_name``,
        ``column_name`` and ``data_type``, ordered by table name and then by
        the column's ordinal position.
    """
    columns_sql = """
    SELECT table_name, column_name, data_type
    FROM information_schema.columns
    WHERE table_schema = 'public'
    ORDER BY table_name, ordinal_position;
    """

    columns = []
    with engine.connect() as connection:
        for record in connection.execute(text(columns_sql)):
            columns.append(dict(record._mapping))
    return columns

In [33]:
from decimal import Decimal
from datetime import date, datetime

def serialize_value(value):
    """Convert a database value into something ``json.dumps`` can encode.

    ``date``/``datetime`` values become ISO-8601 strings and ``Decimal``
    values become ``float``; every other value is returned unchanged.
    """
    if isinstance(value, (date, datetime)):
        return value.isoformat()
    return float(value) if isinstance(value, Decimal) else value


def query_postgres(sql_query: str):
    """Run a single read-only SELECT statement and return its rows.

    The SQL text comes from the language model, so it is treated as untrusted:

    * it must be a string that starts with ``SELECT``;
    * it must contain exactly one statement -- stacked statements such as
      ``select 1; drop table t; commit`` would otherwise pass the prefix check;
    * it is executed inside a PostgreSQL read-only transaction, so a write
      that slips through the textual checks is refused by the server.

    Args:
        sql_query: SQL text of one SELECT statement (a trailing ``;`` is fine).

    Returns:
        list[dict]: one dict per result row, values passed through
        ``serialize_value``; or ``{"error": message}`` when the query is
        rejected or the database raises.
    """
    if not isinstance(sql_query, str):
        return {"error": "Only SELECT queries allowed"}

    # A trailing ";" is harmless; an inner ";" means a second statement.
    # (This also rejects a ";" inside a string literal -- deliberately strict.)
    statement = sql_query.strip().rstrip(";").rstrip()

    if not statement.lower().startswith("select"):
        return {"error": "Only SELECT queries allowed"}
    if ";" in statement:
        return {"error": "Only a single SELECT statement is allowed"}

    try:
        with engine.connect() as conn:
            # Must be set before the transaction starts.
            conn = conn.execution_options(postgresql_readonly=True)
            with conn.begin():
                result = conn.execute(text(statement))
                return [
                    {key: serialize_value(value) for key, value in row._mapping.items()}
                    for row in result
                ]

    except Exception as e:
        # Errors are reported as data so the model can read them and retry.
        return {"error": str(e)}

In [34]:
def _function_tool(name, description, properties, required=None):
    """Build one function-tool entry in the format passed as ``tools=``."""
    parameters = {"type": "object", "properties": properties}
    if required is not None:
        parameters["required"] = required
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": parameters,
        },
    }


# Tool catalogue offered to the model; the names must match the branches of
# execute_tool.
tools = [
    _function_tool(
        "get_schema_metadata",
        "Read database schema using SQL metadata tables",
        {},
    ),
    _function_tool(
        "query_postgres",
        "Execute read-only SQL query",
        {"sql_query": {"type": "string", "description": "SQL SELECT query"}},
        required=["sql_query"],
    ),
]

In [35]:
def execute_tool(name, arguments):
    """Dispatch a tool call requested by the model to its implementation.

    Args:
        name: tool name as sent by the model.
        arguments: decoded JSON arguments of the call. Anything that is not a
            dict (for example ``None``) is treated as "no arguments".

    Returns:
        The tool's result, or ``{"error": message}`` when the tool is unknown
        or a required argument is missing. Problems are returned rather than
        raised so the caller can hand them back to the model.
    """
    if name == "get_schema_metadata":
        return get_schema_metadata()

    if name == "query_postgres":
        sql_query = arguments.get("sql_query") if isinstance(arguments, dict) else None
        # Model-produced arguments are not guaranteed to follow the schema.
        if not isinstance(sql_query, str) or not sql_query.strip():
            return {"error": "Missing required argument: sql_query"}
        return query_postgres(sql_query)

    return {"error": "Unknown tool"}

In [36]:
# System message placed first in the conversation built by autonomous_agent.
# It tells the model to read the schema through get_schema_metadata before it
# writes SQL and to issue SELECT statements only. These rules are instructions
# to the model, not an enforcement mechanism.
SYSTEM_PROMPT = """
You are an autonomous PostgreSQL analysis agent.

STRICT RULES:
1. You MUST first call get_schema_metadata to understand the database structure.
2. Never assume table or column names.
3. After seeing schema, generate a SELECT query.
4. Only SELECT queries allowed.
5. Never use INSERT, UPDATE, DELETE, DROP.
6. After receiving SQL results, explain clearly.

You must reason step by step.
"""

In [37]:
def autonomous_agent(user_question, max_iterations=6):
    """Answer a question by letting the model call the database tools in a loop.

    Each iteration sends the conversation to the model. If the model asks for
    tool calls, every call is executed and its result appended as a ``tool``
    message; otherwise the model's text is the final answer.

    Args:
        user_question: natural-language question about the database.
        max_iterations: upper bound on model round-trips.

    Returns:
        The model's final answer, or a warning string when the iteration
        budget runs out before the model stops calling tools.
    """
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_question}
    ]

    for i in range(max_iterations):

        response = client.chat.completions.create(
            model="openai/gpt-4o-mini",
            messages=messages,
            tools=tools,
            tool_choice="auto",
            temperature=0
        )

        message = response.choices[0].message

        # No tool call requested: this is the final answer.
        if not message.tool_calls:
            return message.content

        messages.append(message)

        for tool_call in message.tool_calls:
            tool_name = tool_call.function.name

            print(f"\n🔧 Iteration {i+1}: Calling {tool_name}")

            # Arguments are model-generated text: they may be empty or not
            # valid JSON. Every tool_call still needs a matching tool message,
            # so a parse failure is reported back to the model, not raised.
            try:
                arguments = json.loads(tool_call.function.arguments or "{}")
            except json.JSONDecodeError as exc:
                result = {"error": f"Invalid tool arguments: {exc}"}
            else:
                result = execute_tool(tool_name, arguments)

            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                # default=str keeps values json cannot encode natively
                # (time, UUID, ...) from aborting the whole run.
                "content": json.dumps(result, default=str)
            })

    return "⚠ Max iterations reached."

In [38]:
# Demo run of the tool-calling agent on a sample question.
question = "Which department has the highest total salary expense?"
result = autonomous_agent(question)

# Banner first, then the answer; the emitted text is the same as printing
# each piece with its own print() call.
print("\n==============================", "FINAL ANSWER", sep="\n")
print("==============================\n", result, sep="\n")


🔧 Iteration 1: Calling get_schema_metadata

🔧 Iteration 2: Calling query_postgres

🔧 Iteration 3: Calling query_postgres

FINAL ANSWER

The department with the highest total salary expense is **Research & Development**, with a total salary expense of **$6,036,284**. 

This was determined by summing the monthly income of all employees grouped by their respective departments and identifying the department with the highest total.


In [43]:
import os
import json
from decimal import Decimal
from datetime import date, datetime
from openai import OpenAI
from sqlalchemy import create_engine, text


In [68]:
def get_schema_metadata():
    """List the columns of all tables in the ``public`` schema.

    Returns:
        list[dict]: dicts with ``table_name``, ``column_name`` and
        ``data_type``, sorted by table and then by ordinal position, as read
        from ``information_schema.columns`` via the module-level ``engine``.
    """
    statement = text("""
    SELECT table_name, column_name, data_type
    FROM information_schema.columns
    WHERE table_schema = 'public'
    ORDER BY table_name, ordinal_position;
    """)
    with engine.connect() as conn:
        return list(map(lambda row: dict(row._mapping), conn.execute(statement)))

In [69]:
def generate_query_plan(user_question, schema_metadata):
    """Ask the model for a structured (non-SQL) query plan.

    Args:
        user_question: natural-language question to answer.
        schema_metadata: JSON-serialisable schema description; it is embedded
            in the prompt as indented JSON.

    Returns:
        The raw text of the model's reply. The prompt requests a JSON object,
        but nothing here parses or checks it.
    """
    schema_text = json.dumps(schema_metadata, indent=2)

    prompt = f"""
You are a PostgreSQL structured query planner.

Schema (JSON):
{schema_text}

User question:
{user_question}

Return ONLY valid JSON:

{{
  "table": "",
  "select_columns": [],
  "aggregations": [
      {{
        "function": "",
        "column": "",
        "alias": ""
      }}
  ],
  "filters": [
      {{
        "column": "",
        "operator": "=",
        "value": ""
      }}
  ],
  "group_by": [],
  "order_by": {{
      "type": "column|aggregation",
      "value": ""
  }},
  "order_direction": "ASC|DESC",
  "limit": 10,
  "explanation": ""
}}

Rules:
- Use EXACT table and column names.
- Do NOT write SQL.
- Do NOT invent columns.
- If no aggregation needed, return empty list.
- If no filters, return empty list.
"""

    # Single-turn request: the whole task is carried by one user message.
    planner_messages = [{"role": "user", "content": prompt}]
    completion = client.chat.completions.create(
        temperature=0,
        model="openai/gpt-4o-mini",
        messages=planner_messages,
    )

    first_choice = completion.choices[0]
    return first_choice.message.content

In [70]:
def _strip_code_fence(raw):
    """Remove a surrounding Markdown code fence (``` or ```json) from ``raw``."""
    if not isinstance(raw, str):
        return raw
    body = raw.strip()
    if not body.startswith("```"):
        return body
    body = body[3:]
    if body.endswith("```"):
        body = body[:-3]
    # The opening fence may carry a language tag ("json") on its own line.
    head, newline, rest = body.partition("\n")
    if newline and (not head.strip() or head.strip().isalnum()):
        body = rest
    return body.strip()


def _plan_entry_field(entry, field):
    """Return ``entry[field]`` when ``entry`` is a dict that has it, else ``None``."""
    return entry.get(field) if isinstance(entry, dict) else None


def validate_plan(plan_json, schema_metadata):
    """Parse a model-generated query plan and check it against the schema.

    Args:
        plan_json: JSON text of the plan; a Markdown code fence around it is
            tolerated.
        schema_metadata: iterable of dicts carrying ``table_name`` and
            ``column_name``.

    Returns:
        ``(True, plan_dict)`` when the plan is a JSON object naming a known
        table and only columns of that table in ``select_columns``,
        ``aggregations``, ``filters``, ``group_by`` and a column-type
        ``order_by``; otherwise ``(False, message)``. Malformed plans are
        reported through the return value, never raised.
    """
    try:
        plan = json.loads(_strip_code_fence(plan_json))
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers None input.
        return False, "Invalid JSON"

    if not isinstance(plan, dict):
        return False, "Invalid plan: expected a JSON object"

    tables = {}
    for row in schema_metadata:
        tables.setdefault(row["table_name"], []).append(row["column_name"])

    table = plan.get("table")
    if not isinstance(table, str) or table not in tables:
        return False, f"Invalid table: {table}"

    valid_columns = set(tables[table])

    aggregations = plan.get("aggregations") or []
    filters = plan.get("filters") or []

    # (label used in the error message, column names to check)
    column_checks = (
        ("select", plan.get("select_columns") or []),
        ("aggregation", [_plan_entry_field(agg, "column") for agg in aggregations]),
        ("filter", [_plan_entry_field(flt, "column") for flt in filters]),
        ("group_by", plan.get("group_by") or []),
    )
    for label, names in column_checks:
        for name in names:
            if not isinstance(name, str) or name not in valid_columns:
                return False, f"Invalid {label} column: {name}"

    # A column-type ORDER BY may name a real column or an aggregation alias.
    order_by = plan.get("order_by")
    if isinstance(order_by, dict) and order_by.get("type") == "column":
        order_value = order_by.get("value")
        aliases = {_plan_entry_field(agg, "alias") for agg in aggregations}
        if order_value and (
            not isinstance(order_value, str)
            or (order_value not in valid_columns and order_value not in aliases)
        ):
            return False, f"Invalid order_by column: {order_value}"

    return True, plan

In [71]:
# Single-argument aggregate functions the builder is willing to emit.
_ALLOWED_AGGREGATES = frozenset({
    "AVG", "COUNT", "MAX", "MIN", "SUM",
    "STDDEV", "STDDEV_POP", "STDDEV_SAMP",
    "VARIANCE", "VAR_POP", "VAR_SAMP",
    "BOOL_AND", "BOOL_OR", "EVERY", "ARRAY_AGG",
})

# Comparison operators the builder is willing to emit in WHERE clauses.
_ALLOWED_OPERATORS = frozenset({
    "=", "!=", "<>", "<", "<=", ">", ">=",
    "LIKE", "NOT LIKE", "ILIKE", "NOT ILIKE",
    "IN", "NOT IN", "IS", "IS NOT",
})


def _quote_identifier(name):
    """Double-quote an identifier, doubling any embedded double quote."""
    return '"' + str(name).replace('"', '""') + '"'


def _sql_literal(value):
    """Render a Python value as a SQL literal (strings get ``'`` doubled)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # must precede the int check: bool is an int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, str):
        # NOTE(review): relies on standard_conforming_strings being on, so a
        # backslash has no special meaning inside the literal -- confirm.
        return "'" + value.replace("'", "''") + "'"
    if isinstance(value, (list, tuple)):
        if not value:
            raise ValueError("Empty value list in filter")
        return "(" + ", ".join(_sql_literal(item) for item in value) + ")"
    raise ValueError(f"Unsupported filter value: {value!r}")


def _aggregate_expression(agg):
    """Render ``FUNC("column")`` for one aggregation entry of the plan."""
    func = str(agg["function"]).upper()
    if func not in _ALLOWED_AGGREGATES:
        raise ValueError(f"Unsupported aggregation function: {agg['function']!r}")
    return f"{func}({_quote_identifier(agg['column'])})"


def _filter_clause(flt):
    """Render one WHERE condition from a filter entry of the plan."""
    column = _quote_identifier(flt["column"])
    operator = " ".join(str(flt["operator"]).upper().split())
    value = flt["value"]

    if operator not in _ALLOWED_OPERATORS:
        raise ValueError(f"Unsupported filter operator: {flt['operator']!r}")

    if value is None:
        # "= NULL" never matches in SQL; use the IS [NOT] NULL forms.
        if operator in ("=", "IS"):
            return f"{column} IS NULL"
        if operator in ("!=", "<>", "IS NOT"):
            return f"{column} IS NOT NULL"
        raise ValueError(f"Operator {operator} cannot be used with a null value")

    if (operator in ("IN", "NOT IN")) != isinstance(value, (list, tuple)):
        raise ValueError(f"Operator {operator} does not fit value {value!r}")
    if operator in ("IS", "IS NOT") and not isinstance(value, bool):
        raise ValueError(f"Operator {operator} needs a boolean or null value")

    return f"{column} {operator} {_sql_literal(value)}"


def build_sql_from_plan(plan):
    """Turn a validated query plan into a single SELECT statement.

    The plan originates from the language model, so nothing from it is pasted
    into the SQL verbatim: identifiers and string values are quoted with their
    quote characters doubled, and the aggregation function, comparison
    operator and sort direction must come from fixed allow-lists.

    Args:
        plan: dict with ``table`` and optionally ``select_columns``,
            ``aggregations`` (``function``/``column``/``alias``), ``filters``
            (``column``/``operator``/``value``), ``group_by``, ``order_by``
            (``type``/``value``), ``order_direction`` and ``limit``.

    Returns:
        str: the SQL text.

    Raises:
        KeyError: ``table`` or a required field of an aggregation/filter
            entry is missing.
        ValueError: an operator, function, direction, limit or filter value
            is not supported.
    """
    table = _quote_identifier(plan["table"])
    aggregations = plan.get("aggregations") or []

    select_parts = [_quote_identifier(col) for col in plan.get("select_columns") or []]
    for agg in aggregations:
        expr = _aggregate_expression(agg)
        alias = agg.get("alias")
        if alias:
            expr += f" AS {_quote_identifier(alias)}"
        select_parts.append(expr)

    # An empty select list would produce invalid SQL; fall back to all columns.
    select_clause = ", ".join(select_parts) if select_parts else "*"
    sql = f"SELECT {select_clause} FROM {table}"

    # WHERE
    filters = plan.get("filters") or []
    if filters:
        sql += " WHERE " + " AND ".join(_filter_clause(flt) for flt in filters)

    # GROUP BY
    group_by = plan.get("group_by") or []
    if group_by:
        sql += " GROUP BY " + ", ".join(_quote_identifier(col) for col in group_by)

    # ORDER BY
    order_by = plan.get("order_by")
    if not isinstance(order_by, dict):
        order_by = {}
    order_value = order_by.get("value")
    if order_value:
        order_expr = None
        if order_by.get("type") == "column":
            order_expr = _quote_identifier(order_value)
        elif order_by.get("type") == "aggregation":
            # Order by the aggregate expression whose alias was named; when no
            # alias matches, no ORDER BY is emitted.
            for agg in aggregations:
                if agg.get("alias") == order_value:
                    order_expr = _aggregate_expression(agg)
                    break
        if order_expr is not None:
            # The direction is only checked when it is actually emitted.
            direction = str(plan.get("order_direction") or "ASC").upper()
            if direction not in ("ASC", "DESC"):
                raise ValueError(f"Unsupported order direction: {plan.get('order_direction')!r}")
            sql += f" ORDER BY {order_expr} {direction}"

    # LIMIT
    limit = plan.get("limit")
    if limit:
        limit_value = int(limit)
        if limit_value < 0:
            raise ValueError(f"Limit must not be negative: {limit!r}")
        sql += f" LIMIT {limit_value}"

    return sql

In [72]:
def serialize_value(value):
    """Make a single result value JSON-friendly.

    ``Decimal`` is converted to ``float``, ``date``/``datetime`` to an
    ISO-8601 string; any other value passes through untouched.
    """
    converted = value
    if isinstance(value, Decimal):
        converted = float(value)
    elif isinstance(value, (date, datetime)):
        converted = value.isoformat()
    return converted


def execute_sql(sql_query):
    """Execute one read-only SELECT statement and return its rows.

    The SQL is assembled from a model-generated plan, so it is treated as
    untrusted: it must be a string starting with ``SELECT``, must not contain
    a second statement, and is run inside a PostgreSQL read-only transaction
    so the server itself refuses writes.

    Args:
        sql_query: SQL text of a single SELECT (a trailing ``;`` is accepted).

    Returns:
        list[dict]: one dict per row with values passed through
        ``serialize_value``; or ``{"error": message}`` when the query is
        rejected or the database raises.
    """
    if not isinstance(sql_query, str):
        return {"error": "Only SELECT allowed"}

    # Strip an optional trailing ";". Any ";" left over separates statements.
    # (A ";" inside a string literal is rejected too -- deliberately strict.)
    statement = sql_query.strip().rstrip(";").rstrip()

    if not statement.lower().startswith("select"):
        return {"error": "Only SELECT allowed"}
    if ";" in statement:
        return {"error": "Only a single SELECT statement allowed"}

    try:
        with engine.connect() as conn:
            # Read-only mode has to be requested before the transaction begins.
            conn = conn.execution_options(postgresql_readonly=True)
            with conn.begin():
                return [
                    {key: serialize_value(val) for key, val in row._mapping.items()}
                    for row in conn.execute(text(statement))
                ]
    except Exception as e:
        # Reported as data so the caller can pass the failure on to reflect().
        return {"error": str(e)}

In [73]:
def reflect(user_question, sql_result):
    """Have the model explain a query result in plain language.

    Args:
        user_question: the question that was asked.
        sql_result: the value to explain; it is interpolated into the prompt
            with its default string formatting.

    Returns:
        The text of the model's reply.
    """
    prompt = f"""
User Question:
{user_question}

SQL Result:
{sql_result}

Explain clearly and concisely.
If result is empty, explain why.
"""

    # Single-turn request carrying the whole prompt as one user message.
    completion = client.chat.completions.create(
        temperature=0,
        model="openai/gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
    )

    first_choice = completion.choices[0]
    return first_choice.message.content

In [74]:
def structured_sql_agent(user_question):
    """Answer a question via plan -> validate -> build SQL -> execute -> explain.

    The model only produces a structured plan; the SQL text itself is built by
    ``build_sql_from_plan``.

    Args:
        user_question: natural-language question about the database.

    Returns:
        str: the model's explanation of the query result, or a message
        starting with ``"Validation failed:"`` / ``"SQL build failed:"`` when
        the plan cannot be used.
    """
    # Step 1: Read schema
    schema_metadata = get_schema_metadata()

    # Step 2: Generate structured plan
    plan_json = generate_query_plan(user_question, schema_metadata)
    print("Generated Plan:\n", plan_json)

    # Step 3: Validate
    valid, plan_or_error = validate_plan(plan_json, schema_metadata)
    if not valid:
        return f"Validation failed: {plan_or_error}"

    plan = plan_or_error

    # Step 4: Build SQL
    # A plan can pass validation and still lack keys the builder needs, or
    # carry values it cannot render. Report that instead of crashing.
    try:
        sql = build_sql_from_plan(plan)
    except (KeyError, TypeError, ValueError) as exc:
        return f"SQL build failed: {exc!r}"
    print("\nBuilt SQL:\n", sql)

    # Step 5: Execute (an {"error": ...} result is passed on to be explained)
    result = execute_sql(sql)

    # Step 6: Reflect
    return reflect(user_question, result)

In [75]:
# Demo run of the plan-based agent on the same sample question.
question = "Which department has the highest total salary expense?"
result = structured_sql_agent(question)

# One call, newline-separated: produces the same text as four print() calls.
print(
    "\n==============================",
    "FINAL ANSWER",
    "==============================\n",
    result,
    sep="\n",
)

Generated Plan:
 {
  "table": "employees",
  "select_columns": [
    "Department"
  ],
  "aggregations": [
    {
      "function": "SUM",
      "column": "MonthlyIncome",
      "alias": "TotalSalaryExpense"
    }
  ],
  "filters": [],
  "group_by": [
    "Department"
  ],
  "order_by": {
    "type": "aggregation",
    "value": "TotalSalaryExpense"
  },
  "order_direction": "DESC",
  "limit": 1,
  "explanation": "This query calculates the total salary expense for each department by summing the MonthlyIncome and orders the results in descending order to find the department with the highest total salary expense."
}

Built SQL:
 SELECT "Department", SUM("MonthlyIncome") AS "TotalSalaryExpense" FROM "employees" GROUP BY "Department" ORDER BY SUM("MonthlyIncome") DESC LIMIT 1

FINAL ANSWER

The SQL result indicates that the department with the highest total salary expense is "Research & Development," with a total salary expense of $6,036,284. This means that among all departments in the data