# Lab. 2-1 Text2SQL Implementation (Function Calling)

#### 이 실습에서는 Function Calling 기법으로 Text2SQL을 구현하는 방법을 알아보겠습니다. 

#### Amazon Bedrock에서는 Converse API에서 Tool Use 기능을 통해 Function Calling을 제공하고 있습니다.

### Agent vs Function Calling
- Function Calling 역시 ReAct 과정을 통해 Final Answer에 도달하기까지의 과정을 LLM이 결정하도록 한다는 점에서 Agent와 유사한 구현 방식입니다.
- 하지만, 프롬프트를 통해 External Function의 존재를 알리지만, 모델이 이를 직접 실행하는 것이 아니라 어떤 Argument를 이용해 Tool을 사용할 것인지만 결정하도록 한다는 것이 Function Calling의 특징입니다.
- 결과적으로, LLM의 결정을 바탕으로 하되, 개발자가 이를 Custom 로직으로 변형하여 이행하도록 코드에 구현할 수 있습니다.

#### **여기부터는 Tool Use라는 이름으로 통일하여 설명합니다.**

### 필요 라이브러리 설치 (Bedrock Converse API 활용이 가능한 boto3 라이브러리 버전)

In [None]:
!pip install "boto3>=1.34.116"
!pip install -q langchain langchain-aws langchain-core langchain-community

In [None]:
import boto3
import logging
from botocore.config import Config
from botocore.exceptions import ClientError

#### Amazon Bedrock 클라이언트 생성

In [None]:
region_name = 'us-west-2'
retry_config = Config(
    region_name=region_name,
    retries={
        "max_attempts": 10,
        "mode": "standard",
    },
)
client = boto3.client("bedrock-runtime", region_name=region_name, config=retry_config)

## Step 1: Text2SQL을 위한 Tool 정의

LLM에게 제공할 Tool 구현 내용을 정의합니다. 실제로 이 함수의 내용들을 LLM에게 전달하는 것은 아니고, 함수의 스펙 정보만 전달하게 됩니다.

여기에서는 Text2SQL 작업에 활용할 다음 함수들을 정의하도록 했습니다.
- list_db_tables
- desc_table_ciolumns
- query_checker
- query_executor

In [None]:
from typing import List, Dict
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_aws import ChatBedrock
import pandas as pd
import ast

def list_db_tables(uri: str) -> Dict[str, str]:
    try:
        engine = create_engine(uri)
        db = SQLDatabase(engine)
        
        table_names = db.get_usable_table_names()
        tables_dict = {table_name: "desc" for table_name in table_names}
        return tables_dict
    except SQLAlchemyError as e:
        print(f"Error: {e}")
        return {}

def desc_table_columns(uri: str, tables: List[str]) -> Dict[str, List[str]]:
    try:
        engine = create_engine(uri)
        db = SQLDatabase(engine)
        
        metadata = db._metadata
        metadata.reflect(bind=engine, only=tables)
        
        table_columns = {}
        
        for table in tables:
            if table in metadata.tables:
                table_obj = metadata.tables[table]
                column_names = [col.name for col in table_obj.columns]
                table_columns[table] = column_names
            else:
                table_columns[table] = []

        return table_columns
    except SQLAlchemyError as e:
        print(f"Error: {e}")
        return {}

def query_checker(query: str, dialect: str, model_id="anthropic.claude-3-sonnet-20240229-v1:0"):
    chat = ChatBedrock(
        model_id=model_id,
        region_name=region_name,
        model_kwargs={"temperature": 0.1},
    )
    message = [
        SystemMessage(
            content="""
            Double check the {dialect} query above for common mistakes, including:
            - Using NOT IN with NULL values
            - Using UNION when UNION ALL should have been used
            - Using BETWEEN for exclusive ranges
            - Data type mismatch in predicates
            - Properly quoting identifiers
            - Using the correct number of arguments for functions
            - Casting to the correct data type
            - Using the proper columns for joins

            If there are any of the above mistakes, rewrite the query. If there are no mistakes, just reproduce the original query.

            Output the final SQL query only. """.format(dialect=dialect)
        ),
        HumanMessage(
            content=query
        )
    ]
    res = chat.invoke(message).content
    return res

def query_executor(uri: str, query: str, output_columns: List[str]):
    engine = create_engine(uri)
    db = SQLDatabase(engine)

    data = db.run_no_throw(query)
    data = ast.literal_eval(data)
    if data:
        df = pd.DataFrame(data, columns=output_columns)  
        return df.to_csv(index=False)
    else:
        return None

### System Prompt 정의

Tool Use에서도 (Agent에서 그랬던 것처럼) LLM이 전체 작업을 사용자 의도에 맞춰 진행할 수 있도록 글로벌 프롬프트를 제공합니다.

하지만, Tool Use에서는 (Agent와 달리) ReAct 과정에서의 프롬프트 포맷이나 Tool 목록 등을 프롬프트에 직접 정의할 필요는 없습니다.

In [None]:
system_prompts = [
    {
        "text":"""
            You are a helpful assistant tasked with answering user queries efficiently.
            Use the provided tools to progress towards answering the question. 
            Based on the user's question, compose a SQLite query if necessary, examine the results, and then provide an answer. 
            Provide a final answer to the user's question with specific data and include the SQL query used to obtain it within a Markdown code block. 
        """
    }
]

### Tool 모듈 별 테스트

위에 구현한 함수들이 의도에 맞게 잘 동작하는지 기능을 테스트합니다.

In [None]:
input_text = "2022년 매출 상위 10개 국가를 알려줘"
uri="sqlite:///../Chinook.db"
dialect = "sqlite"

In [None]:
tables = list_db_tables(uri)
print("tables:", tables)

In [None]:
tables = ["Invoice", "Customer"]
columns = desc_table_columns(uri, tables)
print("columns:", columns)

In [None]:
query = """SELECT c.Country, SUM(i.Total) AS TotalSales  
FROM Invoice i  
JOIN Customer c ON i.CustomerId = c.CustomerId  
WHERE strftime('%Y', i.InvoiceDate) = '2022'  
GROUP BY c.Country  
ORDER BY TotalSales DESC  
LIMIT 10;"""

final_query = query_checker(query=query, dialect=dialect)
answer = query_executor(uri, final_query, output_columns=['Country', 'TotalSales'])
print(answer)

### Tool 스펙 정의 (ToolConfig)

- Tool에 대한 스펙은 다음과 같은 JSON 문서로 작성되어 모델 호출 시 전달됩니다.
- 아래 내용들이 **ToolConfig**에 정의되어야 합니다.
    - **Tool Name** : LLM이 Tool Use를 결정했을 때, 어떤 Tool을 사용할 차례인지 이 이름을 리턴합니다. 
    - **Description** : 어떤 경우에 해당 Tool을 사용할 것인지 LLM의 판단 기준이 됩니다.
    - **inputSchema** : LLM은 사용할 Tool을 선택하면서, inputSchema의 형식과 목적에 맞는 argument를 함께 생성합니다.

In [None]:
tool_config = {
    "tools": [
        {
            "toolSpec": {
                "name": "list_tables",
                "description": "Get tables names and descriptions.",
                "inputSchema": {
                    "json": {
                        "type": "object",
                        "properties": {
                            "uri": {
                                "type": "string",
                                "description": "database uri for which you want to access"
                            }
                        },
                        "required": [
                            "uri"
                        ]
                    }
                }
            }
        },
        {
            "toolSpec": {
                "name": "desc_columns",
                "description": """
                Input is a list of tables, output is the description about the DB schemas and sample rows for those tables.
                Use this tool before generating a query. Be sure that the tables actually exist by using 'list_tables' tool first!
                """,
                "inputSchema": {
                    "json": {
                        "type": "object",
                        "properties": {
                            "uri": {
                                "type": "string",
                                "description": "database uri for which you want to access"
                            },
                            "tables": {
                                "type": "array",
                                "items": {
                                    "type": "string"
                                },
                                "description": "list of table names for which you want to get column descriptions"
                            }
                        },
                        "required": [
                            "uri",
                            "tables"
                        ]
                    }
                }
            }
        },
        {
            "toolSpec": {
                "name": "query_checker",
                "description": """
                Use an LLM to check if a query is correct.
                Always use this tool before executing a query with sql_db_query!
                """,
                "inputSchema": {
                    "json": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "The SQL query to check"
                            },
                            "dialect": {
                                "type": "string",
                                "description": "The SQL dialect of the database"
                            }
                        },
                        "required": [
                            "query",
                            "dialect"
                        ]
                    }
                }
            }
        },
        {
            "toolSpec": {
                "name": "query_executor",
                "description": """
                Execute a SQL query against the database and get back the result.
                If the query is not correct, an error message will be returned. If an error is returned, rewrite the query, check the query, and try again.
                Only one statement can be executed at a time, so if multiple queries need to be executed, use this tool repeatedly.
                """,
                "inputSchema": {
                    "json": {
                        "type": "object",
                        "properties": {
                            "uri": {
                                "type": "string",
                                "description": "database uri for which you want to access"
                            },
                            "query": {
                                "type": "string",
                                "description": "The SQL query to execute"
                            },
                            "output_columns": {
                                "type": "array",
                                "descriptions": "The column names expected in the output"
                            }
                        },
                        "required": [
                            "uri",
                            "query",
                            "output_columns"
                        ]
                    }
                }
            }
        }
    ]
}


### Converse API (Tool Use) 응답의 Parsing 구문 정의

이해를 돕기 위해, Tool Use에서 Tool 호출 상황을 예시로 들어보겠습니다.

테이블 목록을 조회한 LLM이 Text2SQL 쿼리 작성에 어떤 테이블을 사용할 것인지 결정을 끝냈다면, 이제 해당 테이블에 어떤 컬럼들이 있는지 참조해야 할 것입니다.

이 때 LLM은 `toolUse`라는 stop reason으로 텍스트 생성을 중단하면서, response의 `tool['name']`으로 `desc_columns`이라는 Tool Name을 지정할 것입니다.
답변에는 `desc_columns`라는 Tool 호출에 필요한 `database uri`와 `tables` 같은 파라미터 정보를 응답의 `tool_use['input']`에 포함합니다. (이렇게 LLM이 Tool Use의 response를 구성해내는 방식은 앞서 ToolConfig로 정의한 바 있습니다.)

이렇게 LLM이 제공하는 Tool Use 정보를 Parsing하는 구문을 아래와 같이 사용자 코드에 반영해야 합니다. 아래 코드는 LLM 답변을 스트리밍으로 얻어내는 converse_stream API의 답변 파싱 예제입니다.

In [None]:
import json
import logging

logger = logging.getLogger(__name__)

def stream_messages(bedrock_client, model_id, messages, tool_config):
    #logger.info("Streaming messages with model %s", model_id)
    response = bedrock_client.converse_stream(
        modelId=model_id,
        messages=messages,
        system=system_prompts,
        toolConfig=tool_config
    )

    stop_reason = ""
 
    message = {}
    content = []
    message['content'] = content
    text = ''
    tool_use = {}

    #stream the response into a message.
    for chunk in response['stream']:
        if 'messageStart' in chunk:
            message['role'] = chunk['messageStart']['role']
        elif 'contentBlockStart' in chunk:
            tool = chunk['contentBlockStart']['start']['toolUse']
            tool_use['toolUseId'] = tool['toolUseId']
            tool_use['name'] = tool['name']
        elif 'contentBlockDelta' in chunk:
            delta = chunk['contentBlockDelta']['delta']
            if 'toolUse' in delta:
                if 'input' not in tool_use:
                    tool_use['input'] = ''
                tool_use['input'] += delta['toolUse']['input']
            elif 'text' in delta:
                text += delta['text']
                print(delta['text'], end='')
        elif 'contentBlockStop' in chunk:
            if 'input' in tool_use:
                tool_use['input'] = json.loads(tool_use['input'])
                content.append({'toolUse': tool_use})
                tool_use = {}
            else:
                content.append({'text': text})
                text = ''

        elif 'messageStop' in chunk:
            stop_reason = chunk['messageStop']['stopReason']

    return stop_reason, message

### Router 정의 
- 모델의 응답에서 얻어낸 Tool Name을 활용해서 해당 함수를 호출합니다.

In [None]:
def tool_router(tool, messages):
    print(f"\n<Tool: {tool['name']}>")
    match tool['name']:
        case 'list_tables':
            res = list_db_tables(tool['input']['uri'])
            tool_result = {
                "toolUseId": tool['toolUseId'],
                "content": [{"json": res}]
            }
        case 'desc_columns':
            res = desc_table_columns(tool['input']['uri'], tool['input']['tables'])
            tool_result = {
                "toolUseId": tool['toolUseId'],
                "content": [{"json": res}]
            }
        case 'query_checker':
            res = query_checker(tool['input']['query'], tool['input']['dialect'])
            tool_result = {
                "toolUseId": tool['toolUseId'],
                "content": [{"text": res}]
            }
        case 'query_executor':
            res = query_executor(tool['input']['uri'], tool['input']['query'], tool['input']['output_columns']) 
            tool_result = {
                "toolUseId": tool['toolUseId'],
                "content": [{"text": res}]
            }

    print(f"Result: {tool_result['content'][0]}\n")
    tool_result_message = {"role": "user", "content": [{"toolResult": tool_result}]}

    return tool_result_message

## Step 2: Text2SQL 호출

이제 앞에 정의한 함수들을 종합하여, 자연어 질문을 전달합니다.

LLM이 작업 단계에서 필요한 Tool을 선택하고 이에 대한 Input을 생성하면, 이를 함수로 호출하여 전체 작업이 진행됩니다.

In [None]:
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
input_text = """
2022년 매출 상위 10개 국가를 알아내는 쿼리를 작성해줘달

DB_URI: sqlite:///../Chinook.db
"""

In [None]:
messages = [{
    "role": "user",
    "content": [{"text": input_text}]
}]

stop_reason, message = stream_messages(client, model_id, messages, tool_config)
messages.append(message)

while stop_reason == "tool_use":
    contents = message["content"]
    for c in contents:  
        if "toolUse" not in c:
            continue
        tool_use = c["toolUse"] 
        message = tool_router(tool_use, messages)
        messages.append(message)
    
    stop_reason, message = stream_messages(client, model_id, messages, tool_config)
    messages.append(message)

print(f"\nFinal Response: {message['content'][0]['text']}")

#### 위 실행 과정에서 LLM의 쿼리 작성/검증 정확도에 따라 에러가 발생할 수 있습니다