### ü§ñ LLM and AI Agent Development Courses  
**‚ú® All courses available for just ‚Çπ399 INR / $9.99 USD ‚ú®**

| Course | Description | Enroll |
|--------|-------------|---------|
| **üéØ Master OpenAI Agent Builder** | Build and deploy AI agents visually using OpenAI Agent Builder, ChatKit, RAG, Chatbot, AI Assistant with MCP, AWS, RDS MySQL | [Enroll Now](https://kgptalkie.com/agent-builder) |
| **üî• MCP Mastery** | Build MCP servers & clients with Python, Streamlit, ChromaDB, LangChain, LangGraph agents, and Ollama integrations | [Enroll Now](https://kgptalkie.com/mcp) |
| **üìä Private Agentic RAG with LangChain** | Step-by-Step Guide to RAG with LangChain v1, LangGraph, and Ollama (Qwen3, Gemma3, DeepSeek-R1, LLAMA, FAISS) | [Enroll Now](https://kgptalkie.com/agentic-rag) |
| **üîß Master LangGraph and LangChain** | Agentic RAG and Chatbot, AI Agent with LangChain v1, Qwen3, Gemma3, DeepSeek-R1, LLAMA 3.2, FAISS Vector Database | [Enroll Now](https://kgptalkie.com/langgraph) |
| **‚ö° Master Langchain and Ollama** | Master Langchain v1, Local LLM Projects with Ollama, Qwen3, Gemma3, DeepSeek-R1, LLAMA 3.2, Complete Integration Guide | [Enroll Now](https://kgptalkie.com/langchain) |
| **üî¨ Fine Tuning LLM** | Learn transformer architecture fundamentals and fine-tune LLMs with custom datasets | [Enroll Now](https://kgptalkie.com/fine-tuning-llm) |

---

### üåê Join the Community & Stay Connected

- üîó **Join the Discord Community:** https://discord.gg/RFjwbkNa  
- üì∫ **Subscribe on YouTube (63K+ learners):** http://www.youtube.com/@KGPTalkie

---


# SQL Agent with LangGraph

**Intelligent database query agent using LangGraph workflow**

This notebook implements a SQL agent that can:
1. **Understand natural language questions** and convert them to SQL queries
2. **Validate queries for safety** before execution (prevents SQL injection)
3. **Execute queries** on an employees database
4. **Fix errors automatically** if queries fail
5. **Provide detailed answers** with proper formatting

## Key Features:
- **Multi-step workflow**: Schema retrieval ‚Üí Query generation ‚Üí Validation ‚Üí Execution
- **Error handling**: Automatically fixes failed queries up to 3 attempts
- **Safety-first**: Only SELECT queries allowed, validates for SQL injection patterns
- **Tool-based architecture**: Dedicated tools for each step of the SQL workflow

## Tools Available:
- `get_database_schema`: Get table structure and column information
- `generate_sql_query`: Convert natural language to SQL
- `validate_sql_query`: Check query safety and syntax
- `execute_sql_query`: Run validated queries
- `fix_sql_error`: Automatically correct failed queries

Database: **employees_db** (SQLite) with employee records, salaries, departments, and titles.

https://github.com/fracpete/employees-db-sqlite

In [None]:
from typing_extensions import TypedDict, Annotated
from typing import List
import os
import re 
import operator

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_ollama import ChatOllama
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

from langchain_community.utilities import SQLDatabase

from dotenv import load_dotenv
load_dotenv()

In [None]:
# =============================================================================
# Configuration
# =============================================================================

# LLM_MODEL = "qwen3"
LLM_MODEL = "gpt-oss"
BASE_URL = "http://localhost:11434"

llm = ChatOllama(model=LLM_MODEL, base_url=BASE_URL, reasoning=True)

response = llm.invoke("Hello, how are you?")
response.pretty_print()

In [None]:
response.additional_kwargs['reasoning_content']

In [None]:
# =============================================================================
# Database Setup
# =============================================================================

db = SQLDatabase.from_uri('sqlite:///db/employees_db-full-1.0.6.db')

tables = db.get_usable_table_names()


SCHEMA = db.get_table_info()

In [None]:
# print(SCHEMA)

### SQL Tools

In [None]:
# =============================================================================
# SQL Tools
# =============================================================================

@tool
def get_database_schema(table_name: str = None):
    """Get database schema information for SQL query generation.
    Use this first to understand table structure before creating queries."""

    if table_name:
        tables = db.get_usable_table_names()
        if table_name.lower() in [t.lower() for t in tables]:
            result = db.get_table_info([table_name])
            return result
        
        else:
            return f"Error: Table '{table_name}' not found. Available tables: '{', '.join(tables)}'"
    else:
        return SCHEMA

In [None]:
# get_database_schema.invoke('ht')

In [None]:
@tool
def generate_sql_query(question: str, schema_info: str=None):
    """Generate a SQL SELECT query from a natural language question using database schema.
        Always use this after getting schema information."""
    
    schema_to_use = schema_info if schema_info else SCHEMA

    prompt = f"""Based on this database schema:
                {schema_to_use}

                Generate a SQL query to answer this question: {question}

                Rules:
                - Use only SELECT statements
                - Include only existing columns and tables
                - Add appropriate WHERE, GROUP BY, ORDER BY clauses as needed
                - Limit results to 10 rows unless specified otherwise
                - Use proper SQL syntax for SQLite

                Return only the SQL query, nothing else."""
    
    response = llm.invoke(prompt)
    sql_query = response.content.strip()
    print(f"[TOOL] Generated SQL Query: {sql_query[:30]}...")
    return sql_query

In [None]:
generate_sql_query.invoke("how many employees are there?")

In [None]:
# 'SELECT COUNT(*) FROM employees;'

# ```sql
# 'SELECT COUNT(*) FROM employees;'
# ```

In [None]:
@tool
def validate_sql_query(query: str):
    """Validate SQL query for safety and syntax before execution.
        Returns 'Valid: <query>' if safe or 'Error: <message>' if unsafe."""
    
    clean_query = query.strip()

    # remove sql code block
    clean_query = re.sub(r'```sql\s*', '', clean_query, flags=re.IGNORECASE)
    clean_query = re.sub(r'```\s*', '', clean_query, flags=re.IGNORECASE)

    clean_query = clean_query.strip().rstrip(';')

    if not clean_query.lower().startswith('select'):
        return "Error: only 'select' statements are allowed."
    
    # Check 2: Block dangerous SQL keywords
    dangerous_keywords = ['INSERT', 'UPDATE', 'DELETE', 'ALTER', 'DROP', 'CREATE', 'TRUNCATE']
    query_upper = clean_query

    for keyword in dangerous_keywords:
        if keyword in query_upper:
            return f"Error: {keyword} operations are not allowed."
        
    print("[TOOL] Your sql query is validated. Passed!")
    return clean_query


In [None]:
validate_sql_query.invoke('SELECT COUNT(*) FROM employees;')

In [None]:
@tool
def execute_sql_query(sql_query: str):
    """Execute a validated SQL query and return results.
    Only use this after validating the query for safety."""


    query = validate_sql_query.invoke(sql_query)
    if query.startswith('Error:'):
        return f"Query '{sql_query}' validation failed with Error: {query}"
    
    result = db.run(query)

    if result:
        return  f"Query Results: {result}"
    
    else:
        return f"Query Executed Sucessfully but No Result was Found!"
    
    


In [None]:
execute_sql_query.invoke('SELECT COUNT(*) FROM employees;')

In [None]:
@tool
def fix_sql_error(original_query: str, error_message: str, question):
    """Fix a failed SQL query by analyzing the error and generating a corrected version.
        Use this when validation or execution fails."""
    
    fix_prompt = f"""The following SQL query failed:
                    Query: {original_query}
                    Error: {error_message}
                    Original Question: {question}

                    Database Schema:
                    {SCHEMA}

                    Analyze the error and provide a corrected SQL query that:
                    1. Fixes the specific error mentioned
                    2. Still answers the original question
                    3. Uses only valid table and column names from the schema
                    4. Follows SQLite syntax rules

                    Return only the corrected SQL query, nothing else."""
    
    response = llm.invoke(fix_prompt)
    query = response.content.strip()

    print(f"[TOOL] Generated fixed SQL Query.")

    return query

### MySQL Agent Creation

In [None]:
# =============================================================================
# State and Tools
# =============================================================================
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]

tools = [
    get_database_schema,
    generate_sql_query,
    execute_sql_query,
    fix_sql_error
]

llm_with_tools = llm.bind_tools(tools)

In [None]:
llm_with_tools

In [None]:
# =============================================================================
# LangGraph Agent Node
# =============================================================================

def agent_node(state: AgentState):
    
    system_prompt = f"""You are an expert SQL analyst working with an employees database.

                    Database Schema:
                    {SCHEMA}

                    Your workflow for answering questions:
                    1. Use `get_database_schema` first to understand available tables and columns (if needed)
                    2. Use `generate_sql_query` to create SQL based on the question
                    3. Use `execute_sql_query` to run the validated query
                    4. If there's an error, use `fix_sql_error` to correct it and try again (up to 3 times)

                    Rules:
                    - Always follow the workflow step by step
                    - If a query fails, use the fix tool and try again
                    - Provide clear, informative answers
                    - Be precise with table and column names
                    - Handle errors gracefully and try to fix them
                    - If you fail after 3 attempts, explain what went wrong

                    Available tools:
                    - get_database_schema: Get table structure info
                    - generate_sql_query: Create SQL from question
                    - execute_sql_query: Run the query
                    - fix_sql_error: Fix failed queries

                    Remember: Always validate queries before executing them for safety."""
    

    messages = [SystemMessage(system_prompt)] + state['messages']

    response = llm_with_tools.invoke(messages)

    return {'messages': [response]}

In [None]:
llm_with_tools.invoke('how many employees are there?')

In [None]:
# =============================================================================
# Router Logic
# =============================================================================

def should_continue(state: AgentState):

    last_message = state['messages'][-1]

    if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
        print("[TOOL] Calling the tool")
        for tc in last_message.tool_calls:
            print(f"Callling tool: '{tc['name']}' with Args: '{tc['args']}'")

        return 'tools'

    else:
        print("[AGENT] Agent is processing your request...")
        return END



In [None]:
# =============================================================================
# Build Graph
# =============================================================================

def create_sql_agent():

    builder = StateGraph(AgentState)

    # add nodes
    builder.add_node('agent', agent_node)
    builder.add_node('tools', ToolNode(tools))

    # add edges
    builder.add_edge(START, 'agent')
    builder.add_edge('tools', 'agent')

    # add router
    builder.add_conditional_edges('agent', should_continue, ['tools', END])

    graph = builder.compile()

    return graph

agent = create_sql_agent()
agent

In [None]:
query = "how many employees are there"
result = agent.invoke({'messages': [query]})

In [None]:
db.run('SELECT COUNT(*) FROM employees;')

In [None]:
result['messages'][-1].pretty_print()

In [None]:
query = "What is the average salary of each department show me top 5 department?"
result = agent.invoke({'messages': [query]})
result['messages'][-1].pretty_print()

In [None]:
query = "Show me the top 5 highest paid employees with their title and salaries?"
result = agent.invoke({'messages': [query]})
result['messages'][-1].pretty_print()