#  추천 시스템 - 데이터베이스 저장 및 Text2SQL 실행

---

## 환경 설정 및 준비

`(1) Env 환경변수`

In [None]:
from dotenv import load_dotenv
load_dotenv()

`(2) 기본 라이브러리`

In [None]:
import os
from glob import glob

from pprint import pprint
import json

## ETF 데이터 로드

- ETF 목록: CSV 다운로드 ( http://data.krx.co.kr/contents/MDC/MDI/mdiLoader/index.cmd?menuId=MDC020103010901 )


In [None]:
import pandas as pd
import numpy as np

# ETF 목록
etf_data = pd.read_csv('data/etf_list.csv', encoding='cp949')
etf_data.head()

## **Text2SQL** 

- 자연어로 된 질문을 **SQL 쿼리**로 자동 변환

- 데이터베이스 스키마 기반으로 **정확한 쿼리** 생성

- 개발자가 아닌 사용자도 **데이터베이스 검색** 가능

- 데이터베이스 접근성을 높이는 자연어 인터페이스 기술

- 실습 데이터: ETF 목록 데이터 ('`data/etf_list.csv`')


### 1) **SQLite Database**에 저장

- ETF 데이터를 CSV에서 읽어 SQLite **데이터베이스 테이블** 생성

- **데이터 타입** 최적화: 종목코드(INTEGER PRIMARY KEY), 수익률/총보수(FLOAT), 문자열(TEXT)

- 기본 **통계 분석**: 전체 ETF 수, 운용사 수, 평균 수익률/총보수 등 산출

In [None]:
# ETF 목록
etf_data.head(2)

In [None]:
# 열 이름 변경
etf_data.columns = ['종목코드', '종목명', '상장일', '분류체계', '운용사', '수익률_최근1년', '기초지수', '추적오차',
       '순자산총액', '괴리율', '변동성', '복제방법', '총보수', '과세유형']

In [None]:
import pandas as pd

# 데이터 타입 변환
def convert_to_numeric_safely(value):
    try:
        return pd.to_numeric(value)
    except:
        return None

# 각 컬럼의 데이터 타입 변환
etf_data['종목코드'] = etf_data['종목코드'].apply(lambda x: str(x).strip())
etf_data['수익률_최근1년'] = etf_data['수익률_최근1년'].apply(convert_to_numeric_safely)
etf_data['추적오차'] = etf_data['추적오차'].apply(convert_to_numeric_safely)
etf_data['순자산총액'] = etf_data['순자산총액'].apply(convert_to_numeric_safely)
etf_data['괴리율'] = etf_data['괴리율'].apply(convert_to_numeric_safely)
etf_data['총보수'] = etf_data['총보수'].apply(convert_to_numeric_safely)

# 문자열 데이터 정리
string_columns = ['종목명', '상장일', '분류체계', '운용사', '기초지수', '변동성', '복제방법', '과세유형']
for col in string_columns:
    etf_data[col] = etf_data[col].astype(str).apply(lambda x: x.strip())


# 데이터 타입 확인
etf_data.info()

In [None]:
import pandas as pd
import sqlite3

# SQLite 데이터베이스 생성
conn = sqlite3.connect('etf_database.db')
cursor = conn.cursor()

# 테이블 삭제 (if exists)
cursor.execute("DROP TABLE IF EXISTS ETFs")

# 테이블 생성
cursor.execute("""
CREATE TABLE ETFs (
    종목코드 TEXT PRIMARY KEY,
    종목명 TEXT,
    상장일 TEXT,
    분류체계 TEXT,
    운용사 TEXT,
    수익률_최근1년 REAL,
    기초지수 TEXT,
    추적오차 REAL,
    순자산총액 REAL,
    괴리율 REAL,
    변동성 TEXT,
    복제방법 TEXT,
    총보수 REAL,
    과세유형 TEXT
)
""")

# 데이터 삽입
for _, row in etf_data.iterrows():
    try:
        cursor.execute("""
        INSERT INTO ETFs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            str(row['종목코드']),
            str(row['종목명']),
            str(row['상장일']),
            str(row['분류체계']),
            str(row['운용사']),
            float(row['수익률_최근1년']) if pd.notna(row['수익률_최근1년']) else None,
            str(row['기초지수']),
            float(row['추적오차']) if pd.notna(row['추적오차']) else None,
            float(row['순자산총액']) if pd.notna(row['순자산총액']) else None,
            float(row['괴리율']) if pd.notna(row['괴리율']) else None,
            str(row['변동성']),
            str(row['복제방법']),
            float(row['총보수']) if pd.notna(row['총보수']) else None,
            str(row['과세유형'])
        ))
    except Exception as e:
        print(f"Error inserting row: {row}")
        print(f"Error message: {str(e)}")
        continue

# 변경사항 저장
conn.commit()

# 데이터베이스 상태 확인
cursor.execute("SELECT COUNT(*) FROM ETFs")
etf_count = cursor.fetchone()[0]
print(f"\n=== 데이터베이스 생성 완료 ===")
print(f"ETF 개수: {etf_count}")

In [None]:
# 데이터 확인
print("\n=== 데이터 샘플 ===")
cursor.execute("SELECT * FROM ETFs LIMIT 1")
columns = [description[0] for description in cursor.description]
row = cursor.fetchone()
if row:
    for col, val in zip(columns, row):
        print(f"{col}: {val}")

In [None]:
# 데이터베이스 종료
conn.close()

### 2) **LangChain**에 연동

- **LangChain**과 ETF DB 연동으로 자연어 쿼리 처리 가능

- **GPT**와 **Gemini** 모델을 활용한 SQL 쿼리 자동 생성

- 한국어 응답을 위한 **QA Chain** 구성 및 쿼리 실행 도구 설정

`(1) DB 스키마 확인`
   - 작업의 첫 단계로 테이블 목록 확인 필요
   - 각 테이블의 **구조와 관계** 파악을 위한 스키마 정보 검토

In [None]:
from langchain_community.utilities import SQLDatabase

# SQLite 데이터베이스 연결
db = SQLDatabase.from_uri("sqlite:///etf_database.db")

# 사용 가능한 테이블 목록 출력
tables = db.get_usable_table_names()
print(tables)

In [None]:
# 테이블 스키마 정보 출력
print(db.get_table_info())

In [None]:
# 기본 쿼리 실행
query = "SELECT * FROM ETFs LIMIT 5"
result = db.run(query)
print(result)

`(2) SQL Chain 설정`
- **SQL Chain** 구성을 위한 **GPT**와 **Gemini** 모델 통합 설정
- 정규식을 활용한 **SQL 쿼리 자동 추출** 기능 구현
- 모델 간 **연계 프로세스** 최적화로 쿼리 생성 효율 향상
- AI 모델 기반 SQL 쿼리 자동화 가능

In [None]:
from langchain_openai import ChatOpenAI
from langchain_classic.chains import create_sql_query_chain

# SQL Chain 설정
gpt_llm = ChatOpenAI(model="gpt-4.1-mini")
gpt_sql = create_sql_query_chain(llm=gpt_llm, db=db)

# 쿼리 실행
test_question = "상위 5개 운용사별 ETF 개수는 몇 개인가요?"
gpt_generated_sql = gpt_sql.invoke({'question':test_question})

print(f"Answer (GPT):\n{gpt_generated_sql}")
print("-"*100)

In [None]:
# from langchain_google_genai import ChatGoogleGenerativeAI
# gemini_llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
# gemini_sql = create_sql_query_chain(llm=gemini_llm, db=db)
# gemini_generated_sql = gemini_sql.invoke({'question':test_question}, max_retries=3 )
# print(f"Answer (Gemini):\n{gemini_generated_sql}")
# print("-"*100)

## **SQL QA** 

- **SQL 기반 Q&A 시스템**은 구조화된 데이터를 LLM으로 처리하는 특수 사례
    - **자연어 처리**를 통한 SQL 쿼리 자동 변환 구현
    - 데이터베이스에서 **쿼리 실행 및 결과 추출** 
    - 추출된 데이터를 활용한 **자연어 답변 생성**

- **Chain**과 **Agent** 두 가지 구현 방식으로 자연어 질문에 대한 답변 제공

- **LangGraph**를 활용한 반복적 데이터베이스 쿼리 실행 가능

-참조: https://python.langchain.com/docs/tutorials/sql_qa/

### 1) **Chain** 방식

- **단순 실행 흐름**으로 예측 가능한 결과 도출

- **단일 쿼리** 실행만으로 해결 가능한 단순 질의에 최적화

- 명확한 **입력-출력 구조**로 안정적 동작 보장

`(1) State 상태 정의`

- **LangGraph 상태 관리**를 위한 TypedDict 구조 정의

- 입력 **질문**, **쿼리**, **결과**, **답변**의 4가지 핵심 상태 추적

- 상태 데이터의 단계별 전달로 실행 흐름 제어

In [None]:
from typing import TypedDict

# 상태 정보를 저장하는 State 클래스
class State(TypedDict):
    question: str  # 입력 질문
    query: str     # 생성된 쿼리
    result: str    # 쿼리 결과
    answer: str    # 생성된 답변

`(2) 프롬프트 템플릿`

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from typing import TypedDict, Annotated

# 프롬프트 템플릿 생성
query_prompt_template = ChatPromptTemplate.from_messages([
    ("system", """
    Given an input question, create a syntactically correct {dialect} query to run to help find the answer. 
    Unless the user specifies in his question a specific number of examples they wish to obtain, 
    always limit your query to at most {top_k} results.
      
    You can order the results by a relevant column to return the most interesting examples in the database.
    Never query for all the columns from a specific table, only ask for a the few relevant columns given the question.
    Pay attention to use only the column names that you can see in the schema description. 
    Be careful to not query for columns that do not exist.
      
    Also, pay attention to which column is in which table.
    Only use the following tables:
    {table_info}
    """),
    ("user", """
    Question:
    {input}
    """)
])

# 프롬프트 템플릿 출력
for m in query_prompt_template.messages:
   m.pretty_print()

In [None]:
# required 입력 필드 확인 (dialect, input, table_info, top_k)
query_prompt_template.input_schema.model_json_schema()

`(3) SQL 쿼리 생성`

In [None]:
from langchain_openai import ChatOpenAI

# llm 모델 생성
llm = ChatOpenAI(model="gpt-4.1-mini")

In [None]:
from typing import Annotated, TypedDict

class QueryOutput(TypedDict):
    """Generated SQL query."""
    query: Annotated[str, ..., "Syntactically valid SQL query."]


def write_query(state: State):
    """Generate SQL query to fetch information."""
    prompt = query_prompt_template.invoke(
        {
            "dialect": db.dialect,
            "top_k": 10,
            "table_info": db.get_table_info(),
            "input": state["question"],
        }
    )
    structured_llm = llm.with_structured_output(QueryOutput)
    result = structured_llm.invoke(prompt)
    return {"query": result["query"]}

In [None]:
# 쿼리 실행
response = write_query({"question": "총보수가 0.1% 이하인 ETF는 무엇인가요?"}) 

response

`(4) SQL 쿼리 실행`

In [None]:
from langchain_community.tools import QuerySQLDatabaseTool

def execute_query(state: State):
    """Execute SQL query."""
    execute_query_tool = QuerySQLDatabaseTool(db=db)
    return {"result": execute_query_tool.invoke(state["query"])}

In [None]:
execute_query({"query": response["query"]})

`(5) RAG 답변 생성`

In [None]:
def generate_answer(state: State):
    """Answer question using retrieved information as context."""
    prompt = (
        "Given the following user question, corresponding SQL query, "
        "and SQL result, answer the user question.\n\n"
        f'Question: {state["question"]}\n'
        f'SQL Query: {state["query"]}\n'
        f'SQL Result: {state["result"]}'
    )
    response = llm.invoke(prompt)
    return {"answer": response.content}

In [None]:
question = "총보수가 0.1% 이하인 ETF는 무엇인가요?"

query = write_query({"question": question})["query"]  #type: ignore
result = execute_query({"query": query})["result"]    #type: ignore

response = generate_answer({
    "question": question,
    "query": query,
    "result": result,
})   #type: ignore

print(response["answer"])

`(6) LangGraph 통합`

- langgraph 설치 (pip 또는 uv)

In [None]:
from langgraph.graph import START, StateGraph 

graph_builder = StateGraph(State)

graph_builder.add_node("write_query", write_query)
graph_builder.add_node("execute_query", execute_query)
graph_builder.add_node("generate_answer", generate_answer)

graph_builder.add_edge(START, "write_query")
graph_builder.add_edge("write_query", "execute_query")
graph_builder.add_edge("execute_query", "generate_answer")  
graph = graph_builder.compile()

In [None]:
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
for step in graph.stream(
    {"question": "총보수가 0.1% 이하인 ETF는 모두 몇 개인가요?"}, stream_mode="updates"
):
    print(step)

### 2) **Agent** 방식

- **반복 쿼리 실행**으로 복잡한 데이터 분석 가능

- 오류 발생 시 **자동 복구 메커니즘** 작동

- **DB 스키마 기반**의 정확한 답변 생성

- 다단계 추론이 필요한 **복잡한 질의** 처리 가능

`(1) 필요한 도구 준비`

- **ReAct 에이전트**와 **SQLDatabaseToolkit** 통합으로 고급 쿼리 처리
- 다중 쿼리 실행, **오류 복구**, 스키마 기반 응답 기능 제공
- 쿼리 생성, 구문 검사, 테이블 정보 조회 등 **다양한 도구** 활용

In [None]:
from langchain_community.agent_toolkits import SQLDatabaseToolkit

toolkit = SQLDatabaseToolkit(db=db, llm=llm)

tools = toolkit.get_tools()

tools

`(2) 프롬프트 템플릿`

In [None]:
system_message = """
You are an agent designed to interact with a SQL database.
Given an input question, create a syntactically correct {dialect} query to run, 
then look at the results of the query and return the answer.
Unless the user specifies a specific number of examples they wish to obtain, 
always limit your query to at most {top_k} results.
You can order the results by a relevant column to return the most interesting examples in the database.
Never query for all the columns from a specific table, only ask for the relevant columns given the question.
You have access to tools for interacting with the database.
Only use the below tools. Only use the information returned by the below tools to construct your final answer.
You MUST double check your query before executing it. If you get an error while executing a query, rewrite the query and try again.

DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database.

To start you should ALWAYS look at the tables in the database to see what you can query.
Do NOT skip this step.
Then you should query the schema of the most relevant tables.
""".format(dialect=db.dialect, top_k=5)

print(system_message)

`(3) ReAct 에이전트 초기화`

In [None]:
from langchain.agents import create_agent

# SQL 에이전트 생성 
agent = create_agent(
    model=llm,
    tools=toolkit.get_tools(),
    system_prompt=system_message
)

# 입력 스키마 출력 (선택사항)
agent.get_input_jsonschema()

`(4) ReAct 에이전트 실행`

In [None]:
question = "총보수가 0.1% 이하인 ETF는 모두 몇 개인가요?"

# 에이전트 실행
for step in agent.stream(
    {"messages": [{"role": "user", "content": question}]},
    stream_mode="values",
):
    step["messages"][-1].pretty_print()