<a href="https://colab.research.google.com/github/variouscafe/variouscafe.github.io/blob/master/building_a_db_query_chatbot_app_based_on_langchain.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

목표 : 자연어 입력해서 SQL문을 만든다음, 이를 실행시켜 그 결과를 자연어로 사용자에게 전달하는 시나리오.

In [None]:
import urllib.request

urllib.request.urlretrieve(
    "https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_Sqlite.sql",
    filename="Chinook_Sqlite.sql",
)

('Chinook_Sqlite.sql', <http.client.HTTPMessage at 0x782bb6d98ac0>)

# 데이터베이스 구축

In [None]:
import sqlite3

# 데이터베이스 연결 생성 (데이터베이스가 없으면 새로 만듭니다)
conn = sqlite3.connect('Chinook.db')

with open("Chinook_Sqlite.sql", 'r') as file:
    script = file.read()

# 스크립트 실행
conn.executescript(script)

conn.close()

# LLM 환경 설정

In [None]:
!pip install openai # openai 라이브러리를 설치합니다.
!pip install langchain

Collecting openai
  Downloading openai-1.14.3-py3-none-any.whl (262 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m262.9/262.9 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
Collecting httpx<1,>=0.23.0 (from openai)
  Downloading httpx-0.27.0-py3-none-any.whl (75 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m75.6/75.6 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
Collecting httpcore==1.* (from httpx<1,>=0.23.0->openai)
  Downloading httpcore-1.0.4-py3-none-any.whl (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.8/77.8 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting h11<0.15,>=0.13 (from httpcore==1.*->httpx<1,>=0.23.0->openai)
  Downloading h11-0.14.0-py3-none-any.whl (58 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.3/58.3 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: h11, httpcore, httpx, openai
Successfully installed h11-0.14.0 httpcore-1.0.4 ht

In [None]:
import os

In [None]:
from langchain.chat_models import ChatOpenAI
from google.colab import userdata
os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')

chat_model = ChatOpenAI()

  warn_deprecated(


# DB 연결

In [None]:
from langchain.utilities import SQLDatabase

db = SQLDatabase.from_uri("sqlite:///Chinook.db")

In [None]:
# 인자가 없으면 에러가 납니다.
def get_schema(_):

    print("###")
    print(_)
    print("in get_schema")
    print("###")

    return db.get_table_info()

In [None]:
get_schema(_)

###
('Chinook_Sqlite.sql', <http.client.HTTPMessage object at 0x782bb6d98ac0>)
in get_schema
###


'\nCREATE TABLE "Album" (\n\t"AlbumId" INTEGER NOT NULL, \n\t"Title" NVARCHAR(160) NOT NULL, \n\t"ArtistId" INTEGER NOT NULL, \n\tPRIMARY KEY ("AlbumId"), \n\tFOREIGN KEY("ArtistId") REFERENCES "Artist" ("ArtistId")\n)\n\n/*\n3 rows from Album table:\nAlbumId\tTitle\tArtistId\n1\tFor Those About To Rock We Salute You\t1\n2\tBalls to the Wall\t2\n3\tRestless and Wild\t2\n*/\n\n\nCREATE TABLE "Artist" (\n\t"ArtistId" INTEGER NOT NULL, \n\t"Name" NVARCHAR(120), \n\tPRIMARY KEY ("ArtistId")\n)\n\n/*\n3 rows from Artist table:\nArtistId\tName\n1\tAC/DC\n2\tAccept\n3\tAerosmith\n*/\n\n\nCREATE TABLE "Customer" (\n\t"CustomerId" INTEGER NOT NULL, \n\t"FirstName" NVARCHAR(40) NOT NULL, \n\t"LastName" NVARCHAR(20) NOT NULL, \n\t"Company" NVARCHAR(80), \n\t"Address" NVARCHAR(70), \n\t"City" NVARCHAR(40), \n\t"State" NVARCHAR(40), \n\t"Country" NVARCHAR(40), \n\t"PostalCode" NVARCHAR(10), \n\t"Phone" NVARCHAR(24), \n\t"Fax" NVARCHAR(24), \n\t"Email" NVARCHAR(60) NOT NULL, \n\t"SupportRepId" INTEG

# 자연어를 SQL문으로 변환

In [None]:
from langchain.prompts import ChatPromptTemplate

template = """Based on the table schema below, write a SQL query that would answer the user's question:
{schema}

Question: {question}
SQL Query:"""

# "SQL Query:"가 왜 필요할까요?
# 답변유도문 + 응답형식지정

prompt_template = ChatPromptTemplate.from_template(template)

In [None]:
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough

sql_response_chain = (
        RunnablePassthrough.assign(schema = get_schema) # 기존 dict에다가 schema 키와 값을 추가한다.
        | prompt_template
        | chat_model
        | StrOutputParser()
    )

In [None]:
sql_response_chain.invoke({"question": "How many employees are there?"})

###
{'question': 'How many employees are there?'}
in get_schema
###


'SELECT COUNT(EmployeeId) as num_employees\nFROM Employee;'

In [None]:
sql_response_chain.invoke({"question": "총 고객의 수는?"})

###
{'question': '총 고객의 수는?'}
in get_schema
###


'SELECT COUNT(CustomerId) AS TotalCustomers\nFROM Customer;'

In [None]:
sql_response_chain.invoke({"question": "가장 매출이 높은 음반은?"})

###
{'question': '가장 매출이 높은 음반은?'}
in get_schema
###


'SELECT Album.Title, SUM(InvoiceLine.UnitPrice * InvoiceLine.Quantity) AS TotalRevenue\nFROM Album\nJOIN Track ON Album.AlbumId = Track.AlbumId\nJOIN InvoiceLine ON Track.TrackId = InvoiceLine.TrackId\nGROUP BY Album.AlbumId\nORDER BY TotalRevenue DESC\nLIMIT 1;'

# SQL문의 실행결과를 사용자에게 잘 전달하는 체인 구성

예를들어 실행결과 '8' 이렇게 나왔을때, 실행결과를 잘 설명하려면?
답변: 8입니다.

편지봉투
- 총 직원수는 몇명입니까?
- Table Schema
- SELECT COUNT(EmployeeId) AS TotalEmployees FROM Employee;
- 8

답변: 총 직원수는 8명입니다.

즉 필요한 변수는 총 4개입니다.

In [None]:
template = """Based on the table schema below, question, sql query, and sql response, write a natural language response in Korean:
{schema}

Question: {question}
SQL Query: {query}
SQL Response: {response}"""

prompt_response = ChatPromptTemplate.from_template(template)

In [None]:
from langchain.schema.output_parser import StrOutputParser

'''
sql_response_chain = (
        RunnablePassthrough.assign(schema = get_schema) # 기존 dict에다가 schema 키와 값을 추가한다.
        | prompt_template
        | chat_model
        | StrOutputParser()
    )
'''

full_chain = (
    # {"question":"..."}
    RunnablePassthrough.assign(query = sql_response_chain)
    # {"question":"...", "query":"..."} > x
    | RunnablePassthrough.assign(
        schema = get_schema,
        response = lambda x: db.run(x["query"]),
    )
    # {"question":"...", "query":"...", "schema":"...", "response":"..."}
    | prompt_response
    | chat_model
    | StrOutputParser()
)

In [None]:
ans = full_chain.invoke({"question": "How many employees are there?"})

###
{'question': 'How many employees are there?'}
in get_schema
###
###
{'question': 'How many employees are there?', 'query': 'SELECT COUNT(*) AS EmployeeCount FROM Employee;'}
in get_schema
###


In [None]:
ans

'직원은 총 8명입니다.'

In [None]:
from langchain.schema.output_parser import StrOutputParser
from operator import itemgetter
from langchain.schema.runnable import RunnableLambda

full_chain = (
    RunnablePassthrough.assign(query = sql_response_chain)
    # {"question":"...", "query":"..."} > x
    | RunnablePassthrough.assign(
        schema = get_schema,
        response = itemgetter("query") | RunnableLambda(db.run),
    )
    | prompt_response
    | chat_model
    | StrOutputParser()
)

In [None]:
def langchain_conversation_chain(input_text):

    output = full_chain.invoke({"question": input_text})

    #print(output)

    return output

In [None]:
def chat_with_user(user_message):
    ai_message = langchain_conversation_chain(user_message)
    return ai_message

while True:
    user_message = input("USER > ")
    if user_message.lower() == "quit":
        break
    ai_message = chat_with_user(user_message)
    print(f" A I > {ai_message}")

USER > 가장 많이 구매한 고객은?
###
{'question': '가장 많이 구매한 고객은?'}
in get_schema
###
###
{'question': '가장 많이 구매한 고객은?', 'query': 'SELECT c.CustomerId, c.FirstName, c.LastName, COUNT(*) as TotalPurchases\nFROM Customer c\nJOIN Invoice i ON c.CustomerId = i.CustomerId\nGROUP BY c.CustomerId\nORDER BY TotalPurchases DESC\nLIMIT 1;'}
in get_schema
###
 A I > 가장 많이 구매한 고객은 루이스 곤살베스입니다.
USER > quit


In [None]:
from langchain.schema.output_parser import StrOutputParser

def print_string(in_text):

    print(in_text)

    return in_text

full_chain_debug = (
    RunnablePassthrough.assign(query=sql_response_chain)
    | print_string
    | RunnablePassthrough.assign(
        schema=get_schema,
        response=lambda x: db.run(x["query"]),
    )
    | print_string
    | prompt_response
    | print_string
    | chat_model
    | print_string
    | StrOutputParser()
)

In [None]:
def langchain_conversation_chain(input_text):

    output = full_chain_debug.invoke({"question": input_text})

    #print(output)

    return output

In [None]:
def chat_with_user(user_message):
    ai_message = langchain_conversation_chain(user_message)
    return ai_message

while True:
    user_message = input("USER > ")
    if user_message.lower() == "quit":
        break
    ai_message = chat_with_user(user_message)
    print(f" A I > {ai_message}")