In [1]:
from langchain_community.utilities.sql_database import SQLDatabase


In [2]:
# Open the SQLite database file through LangChain's SQLDatabase wrapper.
sqlite_uri = "sqlite:///my_db.db"
db = SQLDatabase.from_uri(sqlite_uri)

In [3]:
# Display the wrapper object to confirm it was created.
db

<langchain_community.utilities.sql_database.SQLDatabase at 0x7a54ae13aef0>

In [4]:
# Show the SQL dialect the wrapper reports for this connection.
db.dialect

'sqlite'

In [5]:
# List the table names the wrapper exposes.
db.get_usable_table_names()

['customer', 'employee', 'orders']

In [6]:
pip install langchain-groq

Note: you may need to restart the kernel to use updated packages.


In [7]:
from dotenv import load_dotenv
import os
# Load variables from a .env file, then read the key from the process environment.
# The lookup yields None when API_KEY is not set.
load_dotenv()
api_key = os.environ.get("API_KEY")

In [8]:
from langchain_groq import ChatGroq

In [9]:
# Chat model client used by every chain in this notebook.
llm = ChatGroq(
    model="llama3-70b-8192",
    api_key=api_key,
)

In [10]:
# Smoke test: send a plain prompt to check that the client and credentials work.
llm.invoke("hy how are you")

AIMessage(content='Hi! I\'m just an AI, I don\'t have feelings or emotions like humans do, so I don\'t have good or bad days. I\'m always "on" and ready to help with any questions or tasks you may have! How can I assist you today?', additional_kwargs={}, response_metadata={'token_usage': {'completion_tokens': 56, 'prompt_tokens': 14, 'total_tokens': 70, 'completion_time': 0.127905353, 'prompt_time': 0.01037029, 'queue_time': 0.26856034, 'total_time': 0.138275643}, 'model_name': 'llama3-70b-8192', 'system_fingerprint': 'fp_bf16903a67', 'service_tier': 'on_demand', 'finish_reason': 'stop', 'logprobs': None}, id='run--aa19431e-1999-467d-90a8-6422aa280233-0', usage_metadata={'input_tokens': 14, 'output_tokens': 56, 'total_tokens': 70})

In [11]:
# The model responds correctly at this point.

# LangChain tools

In [12]:
from langchain_community.agent_toolkits import SQLDatabaseToolkit

In [13]:
# Bundle the database wrapper and the chat model into LangChain's SQL toolkit.
toolkit = SQLDatabaseToolkit(
    db=db,
    llm=llm,
)

In [14]:
# Get the list of tool objects provided by the toolkit.
tools = toolkit.get_tools()

In [15]:
# Inspect the tool objects and their descriptions.
tools

[QuerySQLDatabaseTool(description="Input to this tool is a detailed and correct SQL query, output is a result from the database. If the query is not correct, an error message will be returned. If an error is returned, rewrite the query, check the query, and try again. If you encounter an issue with Unknown column 'xxxx' in 'field list', use sql_db_schema to query the correct table fields.", db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7a54ae13aef0>),
 InfoSQLDatabaseTool(description='Input to this tool is a comma-separated list of tables, output is the schema and sample rows for those tables. Be sure that the tables actually exist by calling sql_db_list_tables first! Example Input: table1, table2, table3', db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7a54ae13aef0>),
 ListSQLDatabaseTool(db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7a54ae13aef0>),
 QuerySQLCheckerTool(description='Use this tool to double check

In [16]:
# Print the name of every tool the toolkit provides.
# The loop variable is deliberately not called `tool`: a later cell imports the
# `tool` decorator from langchain_core.tools, and a module-level loop variable
# with the same name would overwrite that decorator if this cell were re-run.
for toolkit_tool in tools:
    print(toolkit_tool.name)

sql_db_query
sql_db_schema
sql_db_list_tables
sql_db_query_checker


In [17]:
# Pick a single tool out of the list by its name; None when nothing matches.
sql_db_query = next(
    (candidate for candidate in tools if candidate.name == "sql_db_query"),
    None,
)

In [18]:
# Display the selected query tool.
sql_db_query

QuerySQLDatabaseTool(description="Input to this tool is a detailed and correct SQL query, output is a result from the database. If the query is not correct, an error message will be returned. If an error is returned, rewrite the query, check the query, and try again. If you encounter an issue with Unknown column 'xxxx' in 'field list', use sql_db_schema to query the correct table fields.", db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7a54ae13aef0>)

In [19]:
# Select the table-listing tool by name; None when nothing matches.
list_table_tool = next(
    (candidate for candidate in tools if candidate.name == "sql_db_list_tables"),
    None,
)

In [20]:
# Invoke the tool with an empty string as input; the result is the table names
# as one comma-separated string.
list_table_tool.invoke("")

'customer, employee, orders'

In [21]:
# Select the schema tool by name. No default is given to next(), so a missing
# tool raises StopIteration here.
schema_tool_db = next(
    candidate for candidate in tools if candidate.name == "sql_db_schema"
)

In [22]:
# Ask the schema tool about the `customer` table. print() is used so the
# multi-line text (CREATE TABLE statement plus sample rows) renders line by line.
print(schema_tool_db.invoke("customer"))


CREATE TABLE customer (
	customer_id INTEGER, 
	first_name TEXT NOT NULL, 
	email TEXT NOT NULL, 
	phone TEXT, 
	PRIMARY KEY (customer_id), 
	UNIQUE (email)
)

/*
3 rows from customer table:
customer_id	first_name	email	phone
1	a	a@gmail.com	123456
2	b	b@gmail.com	234567
3	c	c@gmail.com	345678
*/


In [23]:
# LangChain cannot talk to the database file directly.
# We created a database wrapper (`db`) above and will use it in the workflow.

In [24]:
# tool  decorator will convert any function into a tool
from langchain_core.tools import tool
@tool
def db_query_tool(query: str) -> str:
    """
    Execute a SQL query against the database and return the result.
    If the query is invalid or returns no result, an error message will be returned.
    """
    # NOTE: the docstring above is what the model sees as this tool's description,
    # so it is kept short and free of typos.
    # run_no_throw does not raise on a bad query; per the LangChain docs it returns
    # the error text instead, and an empty string when no rows come back.
    result = db.run_no_throw(query)
    if not result:
        # Empty result: tell the model what to do next rather than only "failed".
        return "Error: Query failed. Please rewrite your query and try again."
    return result

In [25]:
# Exercise the custom tool directly with a raw SQL string.
db_query_tool.invoke("select * from Employee;")

"[(1, 'Mohit', 'mohit@gmail.com', '1-9-2025', 50000), (2, 'X', 'x@gmail.com', '1-9-2025', 10000), (3, 'Y', 'y@gmail.com', '1-9-2025', 20000), (4, 'Z', 'z@gmail.com', '1-9-2025', 30000), (5, 'w', 'w@gmail.com', '1-9-2025', 40000)]"

In [26]:
# Run the same query through the wrapper's own run() for comparison with the tool.
db.run("select * from Employee;")

"[(1, 'Mohit', 'mohit@gmail.com', '1-9-2025', 50000), (2, 'X', 'x@gmail.com', '1-9-2025', 10000), (3, 'Y', 'y@gmail.com', '1-9-2025', 20000), (4, 'Z', 'z@gmail.com', '1-9-2025', 30000), (5, 'w', 'w@gmail.com', '1-9-2025', 40000)]"

In [28]:
pip install langgraph

Collecting langgraph
  Using cached langgraph-0.6.6-py3-none-any.whl.metadata (6.8 kB)
Collecting langgraph-checkpoint<3.0.0,>=2.1.0 (from langgraph)
  Using cached langgraph_checkpoint-2.1.1-py3-none-any.whl.metadata (4.2 kB)
Collecting langgraph-prebuilt<0.7.0,>=0.6.0 (from langgraph)
  Using cached langgraph_prebuilt-0.6.4-py3-none-any.whl.metadata (4.5 kB)
Collecting langgraph-sdk<0.3.0,>=0.2.2 (from langgraph)
  Using cached langgraph_sdk-0.2.4-py3-none-any.whl.metadata (1.5 kB)
Collecting xxhash>=3.5.0 (from langgraph)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting ormsgpack>=1.10.0 (from langgraph-checkpoint<3.0.0,>=2.1.0->langgraph)
  Downloading ormsgpack-1.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (43 kB)
Using cached langgraph-0.6.6-py3-none-any.whl (153 kB)
Using cached langgraph_checkpoint-2.1.1-py3-none-any.whl (43 kB)
Using cached langgraph_prebuilt-0.6.4-py3-none-any.whl (2

In [37]:
# imports
from typing import Annotated, Literal
from langchain_core.messages import AIMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from typing_extensions import TypedDict
from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import AnyMessage, add_messages
from typing import Any
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda, RunnableWithFallbacks



In [33]:
# Query check 

from langchain_core.prompts import ChatPromptTemplate

# System prompt for the query-checking step.
query_check_system = """Double check the sqlite query for common mistakes, including:
- using NOT IN with NULL values
- using UNION when UNION ALL should have been used
- using BETWEEN for exclusive ranges
- data types mismatch in predicators
- properly quoting identifiers
- using the correct number of arguments for functions
- casting to the correct data types
- using the proper coloums for joins

if there are any of the above mistakes, rewrite the query, if no mistake just reproduce
the original query.

you will call the appropriate tool to execute the query after running this check.
"""

# The checker model is given db_query_tool as its only tool, so any tool call
# made by this step can only target db_query_tool.
query_check_promt = ChatPromptTemplate.from_messages(
    [
        ("system", query_check_system),
        ("placeholder", "{messages}"),
    ]
)
query_check = query_check_promt | llm.bind_tools([db_query_tool])

# Try the chain on a sample query.
query_check.invoke({"messages": [("user", "SELECT * FROM EMPLOYEE;")]})

AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'r00wqe7ym', 'function': {'arguments': '{"query":"SELECT * FROM EMPLOYEE;"}', 'name': 'db_query_tool'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 69, 'prompt_tokens': 1016, 'total_tokens': 1085, 'completion_time': 0.14508881, 'prompt_time': 0.082195937, 'queue_time': 0.269577089, 'total_time': 0.227284747}, 'model_name': 'llama3-70b-8192', 'system_fingerprint': 'fp_bf16903a67', 'service_tier': 'on_demand', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run--860d7cfa-dea7-4f11-86a4-737a9980ef77-0', tool_calls=[{'name': 'db_query_tool', 'args': {'query': 'SELECT * FROM EMPLOYEE;'}, 'id': 'r00wqe7ym', 'type': 'tool_call'}], usage_metadata={'input_tokens': 1016, 'output_tokens': 69, 'total_tokens': 1085})

In [36]:
# Query generation

# Schema-only "tool": it carries no logic and is bound to the model with
# llm.bind_tools([SubmitFinalAnswer]) further down in this cell.
# NOTE(review): the class docstring and the Field description are presumably sent
# to the model as part of the tool schema, so their wording is left untouched.
class SubmitFinalAnswer(BaseModel):
    """Submit the final answer to the user based on the query result."""
    # Required string field (`...` marks it as having no default).
    final_answer: str = Field(..., description="The Final Answer to the user")


# System prompt for the query-generation step.
# The tool is referred to by its exact bound name (SubmitFinalAnswer) so the
# instruction matches the tool the model is actually given.
query_gen_system = """
You are a SQL expert with strong attention to detail.

Given an input question, output a syntactically correct SQLite query to run, then look at the results of the query and return the answer.
Do not call any tool besides SubmitFinalAnswer to submit the final answer.

When generating the query:

Output the SQL query that answers the input question without a tool call.

Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most 5 results.
You can order the results by a relevant column to return the most interesting examples in the database.

Never query for all the columns from a specific table, only ask for the relevant columns given the question.

If you get an error while executing the query, rewrite the query and try again.

If you get an empty result set, you should try to rewrite the query and try again.

If you have enough information to answer the input question, simply invoke the appropriate
tool to submit the final answer to the user.

Do not make any DML statements (INSERT, UPDATE, DELETE, DROP, etc.) to the database. Do not return any SQL query in the final answer, only the answer itself.

"""


# Chain: system prompt + conversation so far -> model that can only call SubmitFinalAnswer.
query_gen_promt = ChatPromptTemplate.from_messages(
    [
        ("system", query_gen_system),
        ("placeholder", "{messages}"),
    ]
)
query_gen = query_gen_promt | llm.bind_tools([SubmitFinalAnswer])

In [None]:
# Define the state that the workflow is built on

In [38]:
class State(TypedDict):
    """State schema handed to ``StateGraph`` when the workflow is created.

    It has a single key, ``messages``: a list of messages annotated with
    ``add_messages``.
    NOTE(review): presumably ``add_messages`` is the reducer that merges the
    messages returned by a node into the existing list instead of replacing
    it -- confirm against the LangGraph documentation.
    """
    messages: Annotated[list[AnyMessage], add_messages]

In [None]:
def first_tool_call(state:State):
    """Placeholder: not implemented yet.

    Takes the graph state, ignores it and returns ``None``.
    TODO(review): presumably meant to emit the very first tool call of the
    workflow -- decide which tool and implement.
    """
    return None

In [40]:
def handle_tool_error(state: State):
    """Convert a failed tool invocation into messages the model can react to.

    Designed to be used as a fallback runnable that is invoked with
    ``exception_key="error"``: in that case the raised exception is available
    under ``state["error"]`` next to the usual ``"messages"`` key.

    Args:
        state: Mapping holding ``"messages"`` and, when used as a fallback,
            ``"error"``.

    Returns:
        ``{"messages": [...]}`` with one ``ToolMessage`` per tool call found on
        the last message. The list is empty when there is no last message or
        it carries no tool calls.
    """
    error = state.get("error")
    history = state.get("messages") or []
    last_message = history[-1] if history else None
    # Only AI messages carry tool calls; be tolerant of any other message type.
    pending_calls = getattr(last_message, "tool_calls", None) or []
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {error!r}\n please fix your mistakes.",
                # Each pending call needs a reply that carries the same id.
                tool_call_id=call["id"],
            )
            for call in pending_calls
        ]
    }

In [41]:
def create_tool_node_with_fallback(tools: list) -> RunnableWithFallbacks[Any, dict]:
    """Build a tool-executing graph node that falls back to ``handle_tool_error``.

    Args:
        tools: Tools the node is allowed to execute.

    Returns:
        A ``ToolNode`` over ``tools`` wrapped with a fallback. If the node
        raises, ``handle_tool_error`` is called with the original input plus the
        exception stored under the ``"error"`` key.
    """
    # Local import: ToolNode is not part of the notebook's import cell.
    from langgraph.prebuilt import ToolNode

    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)],
        exception_key="error",
    )
    

In [42]:
def query_gen_node(state: State):
    """Run the query-generation chain on the current state.

    The model behind ``query_gen`` may only call ``SubmitFinalAnswer``. If it
    calls anything else, an error ``ToolMessage`` is appended for that call so
    the model can correct itself on the next turn.

    Args:
        state: Graph state with the conversation under ``"messages"``.

    Returns:
        ``{"messages": [...]}`` holding the model's reply followed by any error
        messages for wrongly called tools.
    """
    reply = query_gen.invoke(state)
    allowed_tool = SubmitFinalAnswer.__name__
    wrong_tool_errors = [
        ToolMessage(
            content=(
                f"Error: The wrong tool was called: {call['name']}. "
                "Please fix your mistakes. Remember to only call "
                f"{allowed_tool} to submit the final answer. "
                "Generated queries should be outputted WITHOUT a tool call."
            ),
            tool_call_id=call["id"],
        )
        for call in (getattr(reply, "tool_calls", None) or [])
        if call["name"] != allowed_tool
    ]
    return {"messages": [reply, *wrong_tool_errors]}

In [43]:
def should_continue(state:State):
    """Placeholder: not implemented yet.

    Takes the graph state, ignores it and returns ``None``.
    TODO(review): presumably meant to pick the next step of the workflow. The
    possible targets depend on node names that are not registered on the graph
    yet -- implement once they are.
    """
    return None

In [44]:
def model_check_query(state: State):
    """Pass the most recent message through the query-check chain.

    Args:
        state: Graph state with the conversation under ``"messages"``. The
            last entry is the one that gets checked.

    Returns:
        ``{"messages": [reply]}`` where ``reply`` is the output of
        ``query_check`` for that single message.
    """
    latest_message = state["messages"][-1]
    # The chain only needs the query under review, not the whole history.
    checked = query_check.invoke({"messages": [latest_message]})
    return {"messages": [checked]}

In [45]:
# Let's create the state workflow

In [46]:
# Create the graph builder, using State as its state schema.
workflow = StateGraph(State)

In [49]:
# NOTE(review): placeholder calls. StateGraph.add_node() requires a `node`
# argument, so the first bare call fails with the TypeError shown in this cell's
# output and the remaining calls never run.
# TODO: pass a node (and presumably a node name) to every call before running.
workflow.add_node()
workflow.add_node()
workflow.add_node()
workflow.add_node()
workflow.add_node()
workflow.add_node()
workflow.add_node()


TypeError: StateGraph.add_node() missing 1 required positional argument: 'node'

In [None]:
# NOTE(review): placeholder calls without arguments; this cell has not been run.
# TODO: supply the start and end node of each edge. Presumably a bare
# add_edge() is rejected the same way the bare add_node() calls are -- confirm.
workflow.add_edge()
workflow.add_edge()
workflow.add_edge()
workflow.add_edge()
workflow.add_edge()
workflow.add_edge()
workflow.add_edge()
workflow.add_edge()
workflow.add_edge()

In [None]:
# Compile the graph builder into a runnable app.
# NOTE(review): no nodes or edges are registered at this point (the cells above
# are placeholders), so this cell has not been run.
app = workflow.compile()

In [None]:
# NOTE(review): called without an input.
# TODO: pass the initial state, presumably a dict with a "messages" key matching
# the State schema -- confirm.
app.invoke()