## Objective:

Build an agent that converts natural language queries into SQL, executes them, and returns formatted results. The agent:

1.   Uses the OpenAI API to convert natural language queries into valid, syntactically correct SQL.
2.   Executes generated SQL safely and returns correct, formatted results from the database.
3.   Implements proper validation (e.g., blocks dangerous statements like DROP, DELETE, etc.).


In [1]:
# Read the value stored under 'OPENAI_API_KEY' through Colab's userdata helper.
from google.colab import userdata
# NOTE(review): `openai_api_key` is not referenced by any later visible cell;
# the key is fetched again and assigned to `openai.api_key` further down.
openai_api_key = userdata.get('OPENAI_API_KEY')

In [2]:
!pip install openai==0.28

Collecting openai==0.28
  Downloading openai-0.28.0-py3-none-any.whl.metadata (13 kB)
Downloading openai-0.28.0-py3-none-any.whl (76 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m76.5/76.5 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: openai
  Attempting uninstall: openai
    Found existing installation: openai 2.15.0
    Uninstalling openai-2.15.0:
      Successfully uninstalled openai-2.15.0
Successfully installed openai-0.28.0


In [4]:
import sqlite3

# Part 1: Setting Up the Database


def setup_database(db_path='company.db'):
    """Create the sample schema and seed rows in a SQLite database.

    Creates the ``employees`` and ``departments`` tables if they do not
    already exist and inserts two sample rows into each. Seeding uses
    ``INSERT OR IGNORE``, so calling this function repeatedly does not
    duplicate rows or fail on the primary-key constraint.

    Args:
        db_path: Path of the SQLite file to create or open. Defaults to
            ``'company.db'``, the file the other cells connect to.
    """
    employee_rows = [
        (1, 'John Doe', 'Engineering', 75000),
        (2, 'Jane Smith', 'Marketing', 65000),
    ]
    department_rows = [
        (1, 'Engineering', 1000000),
        (2, 'Marketing', 500000),
    ]

    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()

        # Create sample tables
        c.execute('''
            CREATE TABLE IF NOT EXISTS employees (
                id INTEGER PRIMARY KEY,
                name TEXT,
                department TEXT,
                salary REAL
            )
        ''')

        c.execute('''
            CREATE TABLE IF NOT EXISTS departments (
                id INTEGER PRIMARY KEY,
                name TEXT,
                budget REAL
            )
        ''')

        # Insert sample data with bound parameters, one batch per table
        c.executemany(
            "INSERT OR IGNORE INTO employees VALUES (?, ?, ?, ?)",
            employee_rows,
        )
        c.executemany(
            "INSERT OR IGNORE INTO departments VALUES (?, ?, ?)",
            department_rows,
        )

        conn.commit()
    finally:
        # Close even when a statement fails so the connection is not leaked.
        conn.close()

# Test Part 1
# Test database setup


if __name__ == "__main__":
    # Build (or re-open) the sample database, then dump both tables so the
    # seeded rows can be checked by eye.
    setup_database()
    conn = sqlite3.connect('company.db')
    c = conn.cursor()

    table_dumps = (
        ("Employees table:", "SELECT * FROM employees"),
        ("\nDepartments table:", "SELECT * FROM departments"),
    )
    for heading, select_all in table_dumps:
        print(heading)
        c.execute(select_all)
        print(c.fetchall())

    conn.close()

Employees table:
[(1, 'John Doe', 'Engineering', 75000.0), (2, 'Jane Smith', 'Marketing', 65000.0)]

Departments table:
[(1, 'Engineering', 1000000.0), (2, 'Marketing', 500000.0)]


In [6]:
import openai
import sqlite3
#from config import OPENAI_API_KEY

# Configure the openai client with the value stored under 'OPENAI_API_KEY',
# read through Colab's userdata helper.
from google.colab import userdata
openai.api_key = userdata.get('OPENAI_API_KEY')

# Part 2: Creating the SQL Generator

# define schema to guide SQL generation


def get_schema():
    """Return the text description of the database schema.

    The text lists the ``employees`` and ``departments`` tables with their
    columns and types; ``generate_sql`` embeds it in the system prompt.
    """
    schema_text = """
    Table: employees
    Columns:
    - id (INTEGER PRIMARY KEY)
    - name (TEXT)
    - department (TEXT)
    - salary (REAL)

    Table: departments
    Columns:
    - id (INTEGER PRIMARY KEY)
    - name (TEXT)
    - budget (REAL)
    """
    return schema_text

# Generate SQL query from natural language


def generate_sql(question):
    """Ask the chat model to translate a natural-language question into SQL.

    The schema text from ``get_schema()`` is embedded in the system prompt.
    The reply is post-processed in two steps: Markdown code-fence markers
    are removed, then the remaining non-blank lines are stripped and joined
    with single spaces. Nothing else is filtered, so any other text the
    model writes around the query stays in the returned string.

    Args:
        question: The user's request in plain language.

    Returns:
        The model's reply collapsed onto a single line.
    """
    system_prompt = f"""You are a SQL expert. Use this schema:\n{get_schema()}
            Return ONLY the SQL query without any explanation or markdown formatting."""
    chat_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Generate SQL for: {question}"},
    ]
    response = openai.ChatCompletion.create(model="gpt-4", messages=chat_messages)
    reply = response.choices[0].message.content.strip()

    # Drop code-fence markers; the language-tagged forms go first so the bare
    # fence does not leave a dangling "sql" behind.
    for fence in ('```sql', '```SQL', '```'):
        reply = reply.replace(fence, '')

    # Collapse to one line: strip every line, skip the empty ones.
    stripped_lines = (line.strip() for line in reply.split('\n'))
    return ' '.join(line for line in stripped_lines if line)


# Test SQL generation
if __name__ == "__main__":
    # Smoke-test the generator: print each question, then the SQL the model
    # returns for it.
    test_questions = [
        "List all employees",
        "Show departments with budgets over 750000",
    ]

    for question in test_questions:
        print(f"\nQuestion: {question}")
        generated = generate_sql(question)
        print(f"Generated SQL: {generated}")

# Part 3: Implementing Query Execution
# validate SQL query before executing


def validate_sql(sql):
    """Check that *sql* is a single read-only query and return it unchanged.

    The check is deliberately conservative:

    * the statement must begin with ``SELECT`` or ``WITH``, which rejects
      ``DROP``, ``ALTER``, ``CREATE``, ``PRAGMA``, ``ATTACH`` and every
      other non-query statement;
    * only one statement is allowed; a ``;`` anywhere except at the very
      end is rejected (this also rejects a ``;`` inside a string literal);
    * the write keywords ``DROP``, ``DELETE``, ``UPDATE``, ``INSERT`` and
      ``REPLACE INTO`` are rejected anywhere in the text, because a ``WITH``
      prefix may legally introduce a data-modifying statement.

    Keywords are matched as whole words, so identifiers such as
    ``updated_at`` and the ``replace()`` function are accepted. A keyword
    inside a string literal is still rejected.

    Args:
        sql: The SQL text to check.

    Returns:
        The original ``sql`` string, unmodified.

    Raises:
        ValueError: If the text is not a single read-only query.
    """
    # Function-scope import: the surrounding cell does not import ``re``.
    import re

    # Ignore surrounding whitespace and trailing semicolons for the checks;
    # the caller still gets the original text back.
    statement = sql.strip().rstrip(';').strip()

    # Any semicolon left now separates a second statement.
    if ';' in statement:
        raise ValueError("Only SELECT queries are allowed")

    if not re.match(r'(?:select|with)\b', statement, re.IGNORECASE):
        raise ValueError("Only SELECT queries are allowed")

    forbidden = r'\b(?:drop|delete|update|insert)\b|\breplace\s+into\b'
    if re.search(forbidden, statement, re.IGNORECASE):
        raise ValueError("Only SELECT queries are allowed")

    return sql


# execute the SQL on SQLite database

def execute_query(sql):
    """Validate *sql*, run it against ``company.db`` and return the rows.

    Validation happens before the connection is opened, so a ``ValueError``
    from ``validate_sql`` propagates to the caller. Any exception raised
    while the statement runs is caught and reported as a string instead.

    Args:
        sql: The SQL text to execute.

    Returns:
        The list of row tuples from ``fetchall()``, or a string of the form
        ``"Error: <message>"`` if execution failed.
    """
    checked_sql = validate_sql(sql)
    connection = sqlite3.connect('company.db')
    try:
        rows = connection.execute(checked_sql).fetchall()
    except Exception as exc:
        return f"Error: {str(exc)}"
    else:
        return rows
    finally:
        # Runs on both the success and the error path.
        connection.close()

# format the query results cleanly


def format_results(results, columns=None):
    """Render query results as plain text.

    Args:
        results: Normally the list of row tuples returned by
            ``cursor.fetchall()``. Anything that is not a list (for example
            the ``"Error: ..."`` string produced by ``execute_query``) is
            returned as ``str(results)``.
        columns: Optional sequence of column names, aligned by position
            with the values in each row. Float values in a column whose
            name contains ``salary`` or ``budget`` (case-insensitive) are
            rendered as currency, e.g. ``$75,000.00``. Without it no value
            is rendered as currency, because the rows themselves carry no
            column names.

    Returns:
        ``"No results found"`` for an empty list; one value per line when
        the first row has a single column; otherwise one tab-separated line
        per row, with floats shown to two decimal places.
    """
    if not isinstance(results, list):
        return str(results)
    if not results:
        return "No results found"

    # If it's a single column result
    if len(results[0]) == 1:
        return "\n".join([str(row[0]) for row in results])

    # Rows that are not tuples are printed with their default representation.
    if not isinstance(results[0], tuple):
        return "\n".join([str(row) for row in results])

    # Positions of monetary columns. This is decided from the column names,
    # never from the row values: data that merely contains the word
    # "salary" must not switch on currency formatting.
    money_positions = set()
    if columns is not None:
        money_positions = {
            position
            for position, column_name in enumerate(columns)
            if any(key in str(column_name).lower() for key in ("salary", "budget"))
        }

    formatted_rows = []
    for row in results:
        row_items = []
        for position, item in enumerate(row):
            if isinstance(item, float):
                if position in money_positions:
                    row_items.append(f"${item:,.2f}")
                else:
                    row_items.append(f"{item:.2f}")
            else:
                row_items.append(str(item))
        formatted_rows.append("\t".join(row_items))
    return "\n".join(formatted_rows)


# Test Part 3
# Test query execution
if __name__ == "__main__":
    # Run two hand-written queries through execute_query and show the raw
    # return value for each.
    test_sql = [
        "SELECT * FROM employees WHERE salary > 70000",
        "SELECT d.name, COUNT(e.id) FROM departments d LEFT JOIN employees e ON d.name = e.department GROUP BY d.name",
    ]

    for sql in test_sql:
        print(f"\nExecuting SQL: {sql}")
        outcome = execute_query(sql)
        print(f"Results: {outcome}")

# Part 4: Creating the Complete Agent
# core agent: ask, generate, validate, run and respond

def query_agent(question):
    """Answer a natural-language question against the database.

    Generates SQL for the question, prints it, validates it, executes it
    and formats the outcome. Any exception raised along the way, including
    the ``ValueError`` from ``validate_sql``, is turned into an error string
    instead of propagating.

    Args:
        question: The user's request in plain language.

    Returns:
        The formatted query results, or ``"Error: <message>"`` on failure.
    """
    try:
        generated = generate_sql(question)
        print(f"Generated SQL: {generated}\n")
        rows = execute_query(validate_sql(generated))
        answer = format_results(rows)
    except Exception as err:
        answer = f"Error: {str(err)}"
    return answer


# Test the complete agent
if __name__ == "__main__":
    # End-to-end run of the agent; the last entry is a destructive request
    # that validation is expected to reject.
    test_questions = [
        "What is the average salary in each department?",
        "Which department has the highest budget?",
        "List all employees earning more than 70000",
        "DROP TABLE employees",
    ]

    for question in test_questions:
        print(f"\nQuestion: {question}")
        answer = query_agent(question)
        print(f"Answer: {answer}")


Question: List all employees
Generated SQL: SELECT * FROM employees;

Question: Show departments with budgets over 750000
Generated SQL: SELECT * FROM departments WHERE budget > 750000;

Executing SQL: SELECT * FROM employees WHERE salary > 70000
Results: [(1, 'John Doe', 'Engineering', 75000.0)]

Executing SQL: SELECT d.name, COUNT(e.id) FROM departments d LEFT JOIN employees e ON d.name = e.department GROUP BY d.name
Results: [('Engineering', 1), ('Marketing', 1)]

Question: What is the average salary in each department?
Generated SQL: SELECT department, AVG(salary) FROM employees GROUP BY department;

Answer: Engineering	75000.00
Marketing	65000.00

Question: Which department has the highest budget?
Generated SQL: SELECT name FROM departments ORDER BY budget DESC LIMIT 1;

Answer: Engineering

Question: List all employees earning more than 70000
Generated SQL: SELECT * FROM employees WHERE salary > 70000;

Answer: 1	John Doe	Engineering	75000.00

Question: DROP TABLE employees
Gene