In [1]:
import sqlite3
from pprint import pprint


conn = sqlite3.connect("Chinook_Sqlite.sqlite")
cursor = conn.cursor()

question = "가장 많이 판매된 앨범 이름은?"

sql = """
SELECT Album.Title, COUNT(InvoiceLine.InvoiceLineId) AS SalesCount
FROM InvoiceLine
JOIN Track ON InvoiceLine.TrackId = Track.TrackId
JOIN Album ON Track.AlbumId = Album.AlbumId
GROUP BY Album.AlbumId
ORDER BY SalesCount DESC
LIMIT 1;
"""

cursor.execute(sql)
result = cursor.fetchall()

print(question)
pprint(result)

conn.close()

가장 많이 판매된 앨범 이름은?
[('Minha Historia', 27)]


In [2]:
from langgraph.prebuilt import create_react_agent
# from openai import AzureOpenAI
import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI
from pydantic import BaseModel
from langchain.tools import tool
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import re
from langchain_core.messages import HumanMessage

load_dotenv()

# Azure OpenAI 클라이언트 초기화
llm_selector = AzureChatOpenAI(
    azure_deployment="gpt-4o",
    azure_endpoint=os.getenv("SKCC_AZURE_ENDPOINT"),
    api_key=os.getenv("SKCC_AZURE_OPENAI_KEY"),
    api_version="2024-02-15-preview",
    temperature=0.7
)
import json
with open('Chinook_Column_Config.json', 'r', encoding='utf-8') as f:

    json_data = json.load(f)

global full_schema_json

full_schema_json = json_data

@dataclass
class TableSchema:
    table_name: str
    columns: List[str]
    descriptions: Dict[str, str] = None
    samples: List[Dict[str, Any]] = None

def _tokenize(s: str) -> List[str]:
    return re.findall(r"[A-Za-z0-9_]+", s.lower())

def _score(text: str, q_tokens: List[str]) -> int:
    base_score = sum(1 for qt in q_tokens if qt in text.lower())

    keyword_map = {
        "판매": ["invoice", "invoiceline", "order", "payment", "track", "album"],
        "앨범": ["album", "track", "invoice"],
        "음악": ["track", "artist", "album"],
        "고객": ["customer", "invoice"],
        "직원": ["employee", "supportrep"],
    }

    # 관계 기반 가중치 (JOIN에서 함께 자주 등장하는 테이블)
    relation_bonus_map = {
        "invoice": ["invoiceline", "track"],
        "track": ["album", "invoiceline"],
        "album": ["track", "artist"],
    }

    semantic_bonus = 0
    for qt in q_tokens:
        for k, synonyms in keyword_map.items():
            if qt == k and any(word in text.lower() for word in synonyms):
                semantic_bonus += 2

    # 테이블 이름이 관계망 안에 포함될 경우 추가 점수
    for key, related in relation_bonus_map.items():
        if key in text.lower() and any(r in text.lower() for r in related):
            semantic_bonus += 1

    return base_score + semantic_bonus
    #tks = _tokenize(text)
    #return sum(1 for qt in q_tokens if qt in tks)

def _table_text_blob(ts: TableSchema) -> str:
    col_part = " ".join(ts.columns)
    desc_part = " ".join([(ts.descriptions or {}).get(c, "") for c in ts.columns])
    return f"{ts.table_name} {col_part} {desc_part}".lower()

    #col_part = " ".join(ts.columns)
    #desc_part = " ".join([(ts.descriptions or {}).get(c, "") for c in ts.columns])
    #return f"{ts.table_name} {col_part} {desc_part}"

class FindTablesInput(BaseModel):
    query: str
    top_k: int = 5

class FindTablesOutput(BaseModel):
    table_name: str
    score: float

@tool("find_relevant_tables", args_schema=FindTablesInput, return_direct=False)
def find_relevant_tables(query: str,  top_k: int = 5) -> List[FindTablesOutput]:
    """
    사용자 질문(query)과 DB 스키마(db_schema)를 입력받아 관련도가 높은 테이블 top_k개를 반환
    반환 형식: [{"table_name": "...", "score": 7}, ...]
    """

    q_tokens = _tokenize(query)
    scored = []

    for t in full_schema_json:
        ts = TableSchema(
            table_name=t["table_name"],
            columns=t.get("column_names", []),
            descriptions={c: d for c, d in zip(t.get("column_names", []), t.get("description", []) or [])},
            samples=t.get("sample_rows", []),
        )
        sc = _score(_table_text_blob(ts), q_tokens)
        scored.append({"table_name": ts.table_name, "score": sc})

    scored.sort(key=lambda x: x["score"], reverse=True)
    return scored[:top_k]
    # q_tokens = _tokenize(query)
    # scored = []
    # for t in schema_data:
    #     ts = TableSchema(
    #         table_name=t["table_name"],
    #         columns=t.get("column_names", []),
    #         descriptions={c: d or "" for c, d in zip(t.get("column_names", []), t.get("description", []) or [])},
    #         samples=t.get("sample_rows", [])
    #     )
    #     sc = _score(_table_text_blob(ts), q_tokens)
    #     scored.append({"table_name": ts.table_name, "score": sc})
    # scored.sort(key=lambda x: x["score"], reverse=True)
    # return scored[:top_k]

# ⚠️ get_table_columns 도구에도 Optional db_schema 적용
class GetColumnsInput(BaseModel):
    table_name: str

@tool("get_table_columns", args_schema=GetColumnsInput, return_direct=False)
def get_table_columns(table_name: str) -> Dict[str, Any]:
    """
    단일 테이블의 컬럼, 타입, 설명을 반환.
    """
    for t in full_schema_json:
        if t["table_name"].lower() == table_name.lower():
            return {
                "table_name": t["table_name"],
                "columns": t.get("column_names", []),
                "types": t.get("column_types", []),
                "descriptions": t.get("description", []),
                "samples": t.get("sample_rows", [])
            }
    return {"table_name": table_name, "error": "not found"}

SELECTOR_SYSTEM_PROMPT = """
당신은 SQLite Text-to-SQL용 MAC-SQL Selector Agent입니다.
당신의 임무는 **사용자 질문과 전체 DB 스키마(JSON 리스트)**를 기반으로 관련된 테이블과 컬럼만 선택하는 것입니다.

아래는 Chinook Database의 주요 관계(Join 관계)입니다:
- Album.ArtistId → Artist.ArtistId
- Track.AlbumId → Album.AlbumId
- Track.GenreId → Genre.GenreId
- InvoiceLine.TrackId → Track.TrackId
- InvoiceLine.InvoiceId → Invoice.InvoiceId
- Invoice.CustomerId → Customer.CustomerId

**힌트**
- "판매", "주문", "결제", "구매" 관련 질문 → Invoice 또는 InvoiceLine 관련
- "앨범", "음악", "트랙", "곡", "아티스트" 관련 질문 → Album, Track, Artist 관련
- "고객", "국가", "도시" 관련 → Customer, Invoice

당신은 다음 두 가지 도구를 사용할 수 있습니다:
1) find_relevant_tables: 관련 테이블 Top-K 후보 검색
2) get_table_columns: 특정 테이블의 컬럼 및 설명 확인

모든 결과는 아래 형식의 JSON으로만 반환해야 합니다:
{
  "selected_tables": ["TBL1", "TBL2", ...],
  "selected_columns": {
    "TBL1": ["col_a", "col_b"],
    "TBL2": ["col_x"]
  },
  "rationale": "왜 이 테이블과 컬럼을 선택했는지 한국어로 설명",
  "confidence": 0.0_to_1.0
}

규칙:
- 정확성을 우선합니다 (적지만 핵심적인 테이블만 선택)
- DB 스키마에 존재하지 않는 테이블/컬럼은 절대 생성하지 마세요
- 출력은 반드시 JSON 형식만 (백틱, 문장 불가)
- 모든 설명은 한국어로 작성하세요.
"""

selector_tools = [find_relevant_tables, get_table_columns]

selector_agent = create_react_agent(
    llm_selector,
    tools=selector_tools,
    prompt=SELECTOR_SYSTEM_PROMPT,
)

#nl_question = "각 국가의 최신 통화코드를 알려줘"
#nl_question = "How did each salesperson's annual total sales compare to their annual sales quota? Provide the difference between their total sales and the quota for each year, organized by salesperson and year."
#nl_question = "가장 많이 판매된 앨범 이름은?"





#result = selector_agent.invoke({
#    "messages": [
#        HumanMessage(content=f"query:\n{nl_question}")
#    ]
#})

C:\Users\worb1\AppData\Local\Temp\ipykernel_23808\4175551566.py:189: LangGraphDeprecatedSinceV10: create_react_agent has been moved to `langchain.agents`. Please update your import to `from langchain.agents import create_agent`. Deprecated in LangGraph V1.0 to be removed in V2.0.
  selector_agent = create_react_agent(


In [None]:

llm_decomposer = AzureChatOpenAI(
    azure_deployment="gpt-4o",
    azure_endpoint=os.getenv("SKCC_AZURE_ENDPOINT"),
    api_key=os.getenv("SKCC_AZURE_OPENAI_KEY"),
    api_version="2024-02-15-preview",
    temperature=0.4
)


DECOMPOSER_SYSTEM_PROMPT = """
당신은 Text-to-SQL 시스템의 Decomposer Agent입니다.
입력된 자연어 질문을 SQL로 작성하기 위한 **논리적 단계(step)** 로 분해하세요.

입력으로 주어진 'related_tables'를 참고하여 어떤 데이터가 연결되어야 하는지 고려하세요.

출력 형식(JSON만 허용):
{
  "decomposition_steps": [
    {"step": 1, "subquery": "첫 번째 단계 설명"},
    {"step": 2, "subquery": "두 번째 단계 설명"},
    ...
  ],
  "reasoning": "왜 이런 단계로 분해했는지",
  "confidence": 0.0_to_1.0
}

규칙:
- 모든 단계는 SQL 작성의 논리적 순서(집계 → 정렬 → 필터 → 출력)에 따라 작성하세요.
- 불필요한 말이나 백틱(``) 없이, JSON만 출력하세요.
- 관련 테이블 간의 조인을 명확히 고려하세요.
- 모든 설명은 한국어로 작성하세요.
"""

decomposer_agent = create_react_agent(
    llm_decomposer,
    tools=[],  
    prompt=DECOMPOSER_SYSTEM_PROMPT,
)


C:\Users\worb1\AppData\Local\Temp\ipykernel_23808\2913010290.py:34: LangGraphDeprecatedSinceV10: create_react_agent has been moved to `langchain.agents`. Please update your import to `from langchain.agents import create_agent`. Deprecated in LangGraph V1.0 to be removed in V2.0.
  decomposer_agent = create_react_agent(


In [22]:
llm_composer = AzureChatOpenAI(
    azure_deployment="gpt-4o",
    azure_endpoint=os.getenv("SKCC_AZURE_ENDPOINT"),
    api_key=os.getenv("SKCC_AZURE_OPENAI_KEY"),
    api_version="2024-02-15-preview",
    temperature=0.4
)

COMPOSER_PROMPT = """
당신은 Text-to-SQL Composer Agent입니다.
주어진 단계(decomposition_steps)와 관련 테이블 정보를 기반으로 SQL 쿼리를 작성하세요.
출력은 반드시 JSON 형식만으로 출력하며, 코드블록(```)이나 'json' 문자열은 절대 포함하지 마세요.

출력 형식(JSON만 허용):
{
  "sql": "SELECT ...",
  "rationale": "쿼리 구성 이유",
  "confidence": 0.0~1.0
}

규칙:
- Oracle SQL 문법 기준 (대문자 키워드)
- FROM, JOIN, GROUP BY, ORDER BY 순서로 구성
- Table alias 사용 가능 (ex: a, t, i)
- 질문이 '가장 많이 판매된'이면 반드시 GROUP BY + COUNT + ORDER BY + LIMIT 1
"""
composer_agent = create_react_agent(llm_composer, tools=[], prompt=COMPOSER_PROMPT)


C:\Users\worb1\AppData\Local\Temp\ipykernel_23808\486159412.py:27: LangGraphDeprecatedSinceV10: create_react_agent has been moved to `langchain.agents`. Please update your import to `from langchain.agents import create_agent`. Deprecated in LangGraph V1.0 to be removed in V2.0.
  composer_agent = create_react_agent(llm_composer, tools=[], prompt=COMPOSER_PROMPT)


In [23]:
llm_refiner = AzureChatOpenAI(
    azure_deployment="gpt-4o",
    azure_endpoint=os.getenv("SKCC_AZURE_ENDPOINT"),
    api_key=os.getenv("SKCC_AZURE_OPENAI_KEY"),
    api_version="2024-02-15-preview",
    temperature=0.4
)

REFINER_PROMPT = """
당신은 SQL Refiner Agent입니다.
입력된 SQL 쿼리를 점검하여 문법적 오류나 누락된 조인, GROUP BY, LIMIT 등을 개선하세요.

출력 형식(JSON만):
{
  "refined_sql": "...",
  "changes": ["어떤 개선을 했는지 목록"],
  "confidence": 0.0~1.0
}
"""
refiner_agent = create_react_agent(llm_refiner, tools=[], prompt=REFINER_PROMPT)

C:\Users\worb1\AppData\Local\Temp\ipykernel_23808\1210848913.py:20: LangGraphDeprecatedSinceV10: create_react_agent has been moved to `langchain.agents`. Please update your import to `from langchain.agents import create_agent`. Deprecated in LangGraph V1.0 to be removed in V2.0.
  refiner_agent = create_react_agent(llm_refiner, tools=[], prompt=REFINER_PROMPT)


In [24]:

nl_question = "가장 많이 판매된 앨범 이름은?"
selector_result = selector_agent.invoke(
    {
   "messages": [
       HumanMessage(content=f"query:\n{nl_question}")
   ]
}
)


selected_tables = json.loads(selector_result["messages"][-1].content)["selected_tables"]


decomposer_result = decomposer_agent.invoke({
    "messages": [
        HumanMessage(content=f"query: {nl_question}\nrelated_tables: {selected_tables}")
    ]
})

dec_steps = json.loads(decomposer_result["messages"][-1].content)
print("#1 ", dec_steps)

composer_result = composer_agent.invoke({"messages": [
    HumanMessage(content=f"query: {nl_question}\nrelated_tables: {selected_tables}\ndecomposition_steps: {dec_steps}")
]})
print("#2 ", composer_result)

sql = json.loads(composer_result["messages"][-1].content)["sql"]

refiner_result = refiner_agent.invoke({"messages": [
    HumanMessage(content=f"sql: {sql}\nquestion: {nl_question}")
]})

#1  {'decomposition_steps': [{'step': 1, 'subquery': 'InvoiceLine 테이블에서 각 TrackId의 판매 횟수를 COUNT로 집계합니다.'}, {'step': 2, 'subquery': 'Track 테이블과 InvoiceLine 테이블을 TrackId를 기준으로 조인하여 각 트랙의 판매 횟수를 확인합니다.'}, {'step': 3, 'subquery': 'Album 테이블과 Track 테이블을 AlbumId를 기준으로 조인하여 각 앨범의 판매 횟수를 계산합니다.'}, {'step': 4, 'subquery': '각 앨범의 판매 횟수를 기준으로 정렬하고 가장 많이 판매된 앨범의 이름을 출력합니다.'}], 'reasoning': '가장 많이 판매된 앨범을 찾기 위해서는 먼저 각 트랙의 판매 횟수를 집계한 후, 이를 앨범 단위로 합산해야 합니다. 트랙과 앨범은 TrackId와 AlbumId를 통해 연결되므로 조인을 통해 데이터를 결합합니다. 마지막으로 판매 횟수를 기준으로 정렬하여 최다 판매 앨범을 식별합니다.', 'confidence': 0.95}
#2  {'messages': [HumanMessage(content="query: 가장 많이 판매된 앨범 이름은?\nrelated_tables: ['InvoiceLine', 'Track', 'Album']\ndecomposition_steps: {'decomposition_steps': [{'step': 1, 'subquery': 'InvoiceLine 테이블에서 각 TrackId의 판매 횟수를 COUNT로 집계합니다.'}, {'step': 2, 'subquery': 'Track 테이블과 InvoiceLine 테이블을 TrackId를 기준으로 조인하여 각 트랙의 판매 횟수를 확인합니다.'}, {'step': 3, 'subquery': 'Album 테이블과 Track 테이블을 AlbumId를 기준으로 조인하여 각 앨범의 판매 횟수를 계산합니다.'}, {'step': 4, 

In [30]:
print("COMPOSER RESULT : ", composer_result['messages'][-1].content)
print("REFINER RESULT : ", refiner_result['messages'][-1].content)

COMPOSER RESULT :  {
  "sql": "SELECT a.Title AS AlbumName FROM Album a JOIN Track t ON a.AlbumId = t.AlbumId JOIN InvoiceLine il ON t.TrackId = il.TrackId GROUP BY a.AlbumId, a.Title ORDER BY COUNT(il.InvoiceLineId) DESC FETCH FIRST 1 ROWS ONLY",
  "rationale": "이 쿼리는 InvoiceLine 테이블에서 각 트랙의 판매 횟수를 집계한 후, Track 테이블과 Album 테이블을 조인하여 각 앨범의 판매 횟수를 계산합니다. GROUP BY와 COUNT를 사용하여 앨범별 판매 횟수를 집계하고, ORDER BY와 FETCH FIRST 1 ROWS ONLY를 통해 가장 많이 판매된 앨범 이름을 출력합니다.",
  "confidence": 0.95
}
REFINER RESULT :  {
  "refined_sql": "SELECT a.Title AS AlbumName FROM Album a JOIN Track t ON a.AlbumId = t.AlbumId JOIN InvoiceLine il ON t.TrackId = il.TrackId GROUP BY a.AlbumId, a.Title ORDER BY COUNT(il.InvoiceLineId) DESC LIMIT 1",
  "changes": ["FETCH FIRST 1 ROWS ONLY를 LIMIT 1로 변경하여 더 일반적인 SQL 문법으로 수정"],
  "confidence": 0.95
}


In [None]:
"SELECT "
"a.Title AS AlbumName "
"FROM Album a "
"JOIN Track t ON a.AlbumId = t.AlbumId "
"JOIN InvoiceLine il ON t.TrackId = il.TrackId "
"GROUP BY a.AlbumId, a.Title "
"ORDER BY COUNT(il.InvoiceLineId) "
"DESC LIMIT 1"

"""
SELECT 
a.Title AS AlbumName 
FROM Album a 
JOIN Track t ON a.AlbumId = t.AlbumId 
JOIN InvoiceLine il ON t.TrackId = il.TrackId 
GROUP BY a.AlbumId, a.Title 
ORDER BY COUNT(il.InvoiceLineId) 
DESC LIMIT 1
"""

In [28]:
import sqlite3
from pprint import pprint


conn = sqlite3.connect("Chinook_Sqlite.sqlite")
cursor = conn.cursor()

question = "가장 많이 판매된 앨범 이름은?"

sql = """
SELECT Album.Title, COUNT(InvoiceLine.InvoiceLineId) AS SalesCount
FROM InvoiceLine
JOIN Track ON InvoiceLine.TrackId = Track.TrackId
JOIN Album ON Track.AlbumId = Album.AlbumId
GROUP BY Album.AlbumId
ORDER BY SalesCount DESC
LIMIT 1;
"""

cursor.execute(sql)
result = cursor.fetchall()

print(question)
pprint(result)

conn.close()

가장 많이 판매된 앨범 이름은?
[('Minha Historia', 27)]


In [29]:
import sqlite3
from pprint import pprint


conn = sqlite3.connect("Chinook_Sqlite.sqlite")
cursor = conn.cursor()

question = "가장 많이 판매된 앨범 이름은?"

sql = """
SELECT 
a.Title AS AlbumName 
FROM Album a 
JOIN Track t ON a.AlbumId = t.AlbumId 
JOIN InvoiceLine il ON t.TrackId = il.TrackId 
GROUP BY a.AlbumId, a.Title 
ORDER BY COUNT(il.InvoiceLineId) 
DESC LIMIT 1;
"""

cursor.execute(sql)
result = cursor.fetchall()

print(question)
pprint(result)

conn.close()

가장 많이 판매된 앨범 이름은?
[('Minha Historia',)]
