# [Build a Question/Answering system over SQL data](https://python.langchain.com/docs/tutorials/sql_qa/)

データベース内の表形式データに対して、LLMがSQLなどのDSLでクエリを実行し、回答を生成するQ&Aシステムを作成する基本的な方法について説明します。  
チェーンとエージェントの両方を使用した実装について説明します。


大まかに言えば、これらのシステムの手順は次のとおりです。

1. **質問をDSLクエリに変換** : モデルはユーザー入力をSQLクエリに変換します。
1. **SQLクエリの実行** : クエリを実行します。
1. **質問への回答** : モデルはクエリの実行結果を使用して、ユーザー入力に応答します。

<img src="../../../docs/img/03_qa_with_sql/qa_with_sql_01.png" width="700px">

# ■ 前準備

In [1]:
!wget https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_Sqlite.sql -O langchain-tutorial/resources/Chinook_Sqlite.sql > /dev/null &2>1
!rm -f langchain-tutorial/resources/Chinook.db
!sqlite3 langchain-tutorial/resources/Chinook.db < langchain-tutorial/resources/Chinook_Sqlite.sql

In [2]:
from langchain_community.utilities import SQLDatabase

db = SQLDatabase.from_uri("sqlite:///langchain-tutorial/resources/Chinook.db")
print(db.dialect)
print(db.get_usable_table_names())
db.run("SELECT * FROM Artist LIMIT 10;")


sqlite
['Album', 'Artist', 'Customer', 'Employee', 'Genre', 'Invoice', 'InvoiceLine', 'MediaType', 'Playlist', 'PlaylistTrack', 'Track']


"[(1, 'AC/DC'), (2, 'Accept'), (3, 'Aerosmith'), (4, 'Alanis Morissette'), (5, 'Alice In Chains'), (6, 'Antônio Carlos Jobim'), (7, 'Apocalyptica'), (8, 'Audioslave'), (9, 'BackBeat'), (10, 'Billy Cobham')]"

# ■ チェインの実装

In [17]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

## 1. ユーザー入力をSQLに変換する

**※ 正確なSQLが出力されない問題がある**

-  [create_sql_query_chain](https://python.langchain.com/api_reference/langchain/chains/langchain.chains.sql_database.query.create_sql_query_chain.html)


```python
create_sql_query_chain(
  llm: BaseLanguageModel,
  db: SQLDatabase,
  prompt: BasePromptTemplate | None = None, k: int = 5
) -> Runnable[SQLInput | SQLInputWithTables | Dict[str, Any], str][source]
```


In [22]:
from langchain.chains import create_sql_query_chain

chain = create_sql_query_chain(llm=llm, db=db)
response = chain.invoke({"question": "社員は全部で何人ですか"})
response

'```sql\nSELECT COUNT("EmployeeId") AS "TotalEmployees" FROM "Employee";\n```'

In [None]:
# NOTE: responseにコードブロックが入ってくるので除外しないといけない、、、
# db.run(response)
sql = response.replace("```sql", "").replace("```", "")
db.run(sql)

'[(8,)]'

In [25]:
# SQLのクエリを生成するためのプロンプトを表示
chain.get_prompts()[0].pretty_print()

You are a SQLite expert. Given an input question, first create a syntactically correct SQLite query to run, then look at the results of the query and return the answer to the input question.
Unless the user specifies in the question a specific number of examples to obtain, query for at most 5 results using the LIMIT clause as per SQLite. You can order the results to return the most informative data in the database.
Never query for all columns from a table. You must query only the columns that are needed to answer the question. Wrap each column name in double quotes (") to denote them as delimited identifiers.
Pay attention to use only the column names you can see in the tables below. Be careful to not query for columns that do not exist. Also, pay attention to which column is in which table.
Pay attention to use date('now') function to get the current date, if the question involves "today".

Use the following format:

Question: Question here
SQLQuery: SQL Query to run
SQLResult: Result

## 2. クエリの実行

LLMが生成したSQLを実行します

In [26]:
from langchain_community.tools.sql_database.tool import QuerySQLDataBaseTool

execute_query = QuerySQLDataBaseTool(db=db)
write_query = create_sql_query_chain(llm=llm, db=db)
chain = write_query | execute_query
result = chain.invoke({"question": "従業員は何名ですか"})
result


'Error: (sqlite3.OperationalError) near "```sql\nSELECT COUNT("EmployeeId") AS "EmployeeCount" FROM "Employee";\n```": syntax error\n[SQL: ```sql\nSELECT COUNT("EmployeeId") AS "EmployeeCount" FROM "Employee";\n```]\n(Background on this error at: https://sqlalche.me/e/20/e3q8)'

## 3. 質問への回答

SQLの実行結果を受けて質問に回答します。

In [43]:
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from langchain_core.runnables import RunnablePassthrough

answer_template = """
以下のユーザーからの質問、対応するSQLクエリ、およびSQL結果が与えられている場合、ユーザーからの質問に答えてください。

Question: {question}
SQL Query: {query}
SQL Result: {result}
Answer:
"""
answer_prompt = ChatPromptTemplate([
    HumanMessagePromptTemplate.from_template(answer_template),
])

chain = (
    RunnablePassthrough
        .assign(query=write_query)
        .assign(result=(itemgetter("query") | execute_query))
    | answer_prompt
    | llm
    | StrOutputParser()
)


In [44]:
chain.invoke({"question": "従業員は何名ですか。"})

'従業員の数を取得するためのSQLクエリにエラーが発生しています。クエリの形式が正しくないため、実行できませんでした。正しいクエリは以下のようになります。\n\n```sql\nSELECT COUNT("EmployeeId") AS "EmployeeCount" FROM "Employee";\n```\n\nこのクエリを実行することで、従業員の数を取得できます。従業員の数を知りたい場合は、正しい形式でクエリを再実行してください。'

動作確認

In [42]:
# assign(query=write_query):
#   write_queryの結果をqueryに代入
# assign(result=(itemgetter("query") | execute_query)):
#   query属性の値をexecute_queryチェインにの入力として、出力をresultに代入
chain = RunnablePassthrough.assign(query=write_query).assign(result=(itemgetter("query") | execute_query))
chain.invoke({"question": "従業員は何名ですか？"})

{'question': '従業員は何名ですか？',
 'query': 'SQLQuery: SELECT COUNT("EmployeeId") AS "EmployeeCount" FROM "Employee"',
 'result': 'Error: (sqlite3.OperationalError) near "SQLQuery": syntax error\n[SQL: SQLQuery: SELECT COUNT("EmployeeId") AS "EmployeeCount" FROM "Employee"]\n(Background on this error at: https://sqlalche.me/e/20/e3q8)'}

# ■ エージェントの実装

エージェントの利用には以下のようなメリットがあります。

- データベースのスキーマだけでなく、データベースのコンテンツ (特定のテーブルの説明など) に基づいて質問に答えることができます。
- 生成されたクエリを実行し、トレースバックをキャッチして正しく再生成することで、エラーから回復できます。
- ユーザーの質問に答えるために必要な回数だけデータベースをクエリできます。
- 関連するテーブルからスキーマのみを取得することでトークンを節約します。

In [45]:
from langchain_openai import ChatOpenAI
from langchain_community.agent_toolkits import SQLDatabaseToolkit

llm = ChatOpenAI(model="gpt-4o-mini")

toolkit = SQLDatabaseToolkit(db=db, llm=llm)

tools = toolkit.get_tools()

tools

[QuerySQLDataBaseTool(description="Input to this tool is a detailed and correct SQL query, output is a result from the database. If the query is not correct, an error message will be returned. If an error is returned, rewrite the query, check the query, and try again. If you encounter an issue with Unknown column 'xxxx' in 'field list', use sql_db_schema to query the correct table fields.", db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7c9e84336000>),
 InfoSQLDatabaseTool(description='Input to this tool is a comma-separated list of tables, output is the schema and sample rows for those tables. Be sure that the tables actually exist by calling sql_db_list_tables first! Example Input: table1, table2, table3', db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7c9e84336000>),
 ListSQLDatabaseTool(db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7c9e84336000>),
 QuerySQLCheckerTool(description='Use this tool to double check

エージェント用のシステムプロンプトを作成します。

In [47]:
from langchain_core.messages import SystemMessage

SQL_PREFIX = """You are an agent designed to interact with a SQL database.
Given an input question, create a syntactically correct SQLite query to run, then look at the results of the query and return the answer.
Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most 5 results.
You can order the results by a relevant column to return the most interesting examples in the database.
Never query for all the columns from a specific table, only ask for the relevant columns given the question.
You have access to tools for interacting with the database.
Only use the below tools. Only use the information returned by the below tools to construct your final answer.
You MUST double check your query before executing it. If you get an error while executing a query, rewrite the query and try again.

DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database.

To start you should ALWAYS look at the tables in the database to see what you can query.
Do NOT skip this step.
Then you should query the schema of the most relevant tables."""

system_message = SystemMessage(content=SQL_PREFIX)

エージェントの初期化

In [50]:
from langchain_core.messages import HumanMessage
from langgraph.prebuilt import create_react_agent

agent_executor = create_react_agent(model=llm, tools=tools, messages_modifier=system_message)

  agent_executor = create_react_agent(model=llm, tools=tools, messages_modifier=system_message)


In [None]:
query = " どの国の顧客が最も消費しましたか？"
output_stream = agent_executor.stream({"messages": [HumanMessage(content=query)]}, stream_mode="values")

for e in output_stream:
    e["messages"][-1].pretty_print()


 どの国の顧客が最も消費しましたか？
Tool Calls:
  sql_db_list_tables (call_LEidpYhFwGT8O3SdDTjMppuN)
 Call ID: call_LEidpYhFwGT8O3SdDTjMppuN
  Args:
Name: sql_db_list_tables

Album, Artist, Customer, Employee, Genre, Invoice, InvoiceLine, MediaType, Playlist, PlaylistTrack, Track
Tool Calls:
  sql_db_schema (call_s27AE4VRm3hOGJEu729PlTps)
 Call ID: call_s27AE4VRm3hOGJEu729PlTps
  Args:
    table_names: Customer
  sql_db_schema (call_WTeRNIdVKf3F1UtVOpRzUfiJ)
 Call ID: call_WTeRNIdVKf3F1UtVOpRzUfiJ
  Args:
    table_names: Invoice
Name: sql_db_schema


CREATE TABLE "Invoice" (
	"InvoiceId" INTEGER NOT NULL, 
	"CustomerId" INTEGER NOT NULL, 
	"InvoiceDate" DATETIME NOT NULL, 
	"BillingAddress" NVARCHAR(70), 
	"BillingCity" NVARCHAR(40), 
	"BillingState" NVARCHAR(40), 
	"BillingCountry" NVARCHAR(40), 
	"BillingPostalCode" NVARCHAR(10), 
	"Total" NUMERIC(10, 2) NOT NULL, 
	PRIMARY KEY ("InvoiceId"), 
	FOREIGN KEY("CustomerId") REFERENCES "Customer" ("CustomerId")
)

/*
3 rows from Invoice table:
InvoiceId

In [62]:
query = "テーブル一覧を教えて"
output_stream = agent_executor.stream({"messages": [HumanMessage(content=query)]}, stream_mode="values")

for e in output_stream:
    e["messages"][-1].pretty_print()


テーブル一覧を教えて
Tool Calls:
  sql_db_list_tables (call_uAly3279y9Q9tKa9587IgjMi)
 Call ID: call_uAly3279y9Q9tKa9587IgjMi
  Args:
Name: sql_db_list_tables

Album, Artist, Customer, Employee, Genre, Invoice, InvoiceLine, MediaType, Playlist, PlaylistTrack, Track

データベースには以下のテーブルがあります：

1. Album
2. Artist
3. Customer
4. Employee
5. Genre
6. Invoice
7. InvoiceLine
8. MediaType
9. Playlist
10. PlaylistTrack
11. Track

何か特定のテーブルについて知りたいことがありますか？


In [63]:
query = "PlaylistTrackテーブルを説明して"
output_stream = agent_executor.stream({"messages": [HumanMessage(content=query)]}, stream_mode="values")

for e in output_stream:
    e["messages"][-1].pretty_print()


PlaylistTrackテーブルを説明して
Tool Calls:
  sql_db_list_tables (call_bLSsY9kNsDLOgViMNcMmTa3S)
 Call ID: call_bLSsY9kNsDLOgViMNcMmTa3S
  Args:
Name: sql_db_list_tables

Album, Artist, Customer, Employee, Genre, Invoice, InvoiceLine, MediaType, Playlist, PlaylistTrack, Track
Tool Calls:
  sql_db_schema (call_gIN17kTvuBJZhSJyVdcRLR55)
 Call ID: call_gIN17kTvuBJZhSJyVdcRLR55
  Args:
    table_names: PlaylistTrack
Name: sql_db_schema


CREATE TABLE "PlaylistTrack" (
	"PlaylistId" INTEGER NOT NULL, 
	"TrackId" INTEGER NOT NULL, 
	PRIMARY KEY ("PlaylistId", "TrackId"), 
	FOREIGN KEY("TrackId") REFERENCES "Track" ("TrackId"), 
	FOREIGN KEY("PlaylistId") REFERENCES "Playlist" ("PlaylistId")
)

/*
3 rows from PlaylistTrack table:
PlaylistId	TrackId
1	3402
1	3389
1	3390
*/

`PlaylistTrack`テーブルは、プレイリストとトラックの関係を管理するためのテーブルです。このテーブルの構造は以下の通りです：

- **PlaylistId**: プレイリストのID (整数型, NOT NULL)
- **TrackId**: トラックのID (整数型, NOT NULL)

このテーブルの主キーは、`PlaylistId`と`TrackId`の組み合わせです。これにより、同じプレイリストに同じトラックが重複して追加されることを

## 値のバリエーションが多いカラムのフィルタリング

住所、曲名、アーティストなどの固有名詞を含む列をフィルタリングするには、データを正しくフィルタリングするために、まずスペルを再確認する必要があります。

これを実現するには、データベース内に存在するすべての固有名詞を含むベクトル ストアを作成します。  
その後、ユーザーが質問に固有名詞を含めるたびに、エージェントがそのベクトル ストアをクエリして、その単語の正しいスペルを検索します。  
このようにして、エージェントはターゲット クエリを構築する前に、ユーザーがどのエンティティを参照しているかを確実に理解できます。

まず、必要なエンティティごとに一意の値が必要であり、その結果を要素のリストに解析する関数を定義します。

In [67]:
from pprint import pprint
import ast
import re


def query_as_list(db, query):
    res = db.run(query)
    res = [el for sub in ast.literal_eval(res) for el in sub if el]
    res = [re.sub(r"\b\d+\b", "", string).strip() for string in res]
    return list(set(res))


artists = query_as_list(db, "SELECT Name FROM Artist")
pprint(artists[:5])
albums = query_as_list(db, "SELECT Title FROM Album")
pprint(albums[:5])

['Chicago Symphony Chorus, Chicago Symphony Orchestra & Sir Georg Solti',
 'Kiss',
 'O Terço',
 'Emanuel Ax, Eugene Ormandy & Philadelphia Orchestra',
 'Luiz Melodia']
['Unplugged',
 'Motley Crue Greatest Hits',
 'Balls to the Wall',
 'Rock In Rio [CD1]',
 'SCRIABIN: Vers la flamme']


ベクトルストアに取得した値を登録

In [68]:
from langchain.agents.agent_toolkits import create_retriever_tool
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

vector_db = FAISS.from_texts(artists + albums, OpenAIEmbeddings())
retriever = vector_db.as_retriever(search_kwargs={"k": 5})
description = """
フィルタリングする値を検索するために使用します。入力は固有名詞のおおよそのスペルです。
有効な固有名詞です。検索に最も類似した名詞を使用します。
"""
retriever_tool = create_retriever_tool(retriever=retriever, name="search_proper_nouns", description=description)

エージェントが「Alice Chains」のようなアーティストに基づいてフィルターを作成する必要があると判断した場合、最初に取得ツールを使用して列の関連する値を観察できます。

In [None]:
print(retriever_tool.invoke("Alice Chains"))

Alice In Chains

Alanis Morissette

Pearl Jam

Pearl Jam

Audioslave


In [None]:
system = """You are an agent designed to interact with a SQL database.
Given an input question, create a syntactically correct SQLite query to run, then look at the results of the query and return the answer.
Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most 5 results.
You can order the results by a relevant column to return the most interesting examples in the database.
Never query for all the columns from a specific table, only ask for the relevant columns given the question.
You have access to tools for interacting with the database.
Only use the given tools. Only use the information returned by the tools to construct your final answer.
You MUST double check your query before executing it. If you get an error while executing a query, rewrite the query and try again.

DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database.

You have access to the following tables: {table_names}

If you need to filter on a proper noun, you must ALWAYS first look up the filter value using the "search_proper_nouns" tool!
Do not try to guess at the proper name - use this function to find similar ones.""".format(
    table_names=db.get_usable_table_names()
)

system_message = SystemMessage(content=system)

tools.append(retriever_tool)

agent = create_react_agent(llm, tools, messages_modifier=system_message)