# Amazon Bedrock과 LangChain을 이용한 "비즈니스 데이터 분석을 위한 자연어 기반 BI"

---

이 노트북에서는 비즈니스 데이터 분석을 위한 자연어 기반 BI 구현하기 위한 생성형 AI 기반의 Text-to-SQL 기능을 위해 [Amazon Bedrock](https://aws.amazon.com/bedrock/)과 [LangChain](https://www.langchain.com/)을 사용합니다.

Text-to-SQL 기능을 이용한다면 자연어만을 사용해서 구조화된 정보를 보유하고 있는 SQL 데이터베이스에 LLM을 연결하여 특정 비즈니스 데이터에 대한 답을 얻을 수 있습니다. 이때 LLM은 질문에 대한 답을 추출을 위해 데이터베이스에 초점을 맞추게 되며 SQL을 통해 데이터베이스에서 제공하는 정확한 정보를 반환할 수 있습니다. LangChain은 LLM을 다른 데이터소스로 연결할 수 있도록 하는 기능을 제공하여 개발자는 복잡한 애플리케이션을 쉽게 만들 수 있습니다.

<img src="images/sql-agent.png" width="800"/>

### Text-to-SQL를 이용하여 아래와 같은 사례에 적용할 수 있습니다.
* 데이터베이스 데이터를 기반으로 질문에 답할 수 있는 챗봇 만들기
* 자연어 기반의 복잡한 쿼리 생성
* 비기술 인력 및 이해관계자에 대한 데이터베이스 접근성 확대

이 노트북에서는 사전에 생성된 Amazon Redshift Serverless를 사용하며 데이터베이스와 LLM 연결을 위해 LangChain을 사용합니다.
노트북은 아래와 같은 구성으로 이루어져있습니다.
1. Amazon Redshift Serverless 접속과 데이터 확인
2. LangChain의 SQLDatabaseChain 사용 

Amazon Bedrock과 LangChain을 이용한 Text-to-SQL 구조는 아래 다이어그램을 참조하세요.

<img src="images/rs-architecture.png" width="800"/>

데이터에 대한 질문을 위한 SQL 쿼리를 작성하기 위해서는 원하는 데이터와 조건 등을 지정하는 명령문을 전달합니다. 명령문은 프롬프트와 함께 전달되고 해당되는 데이터베이스에 맞는 SQL 문으로 변환됩니다. SQL문이 실행된 결과 데이터는 다시 LLM에 전달되고 프롬프트에 따라 LLM은 자연어로 결과를 제공해줍니다. 이때 자연어를 이용한 질의를 통해 복잡한 조인까지도 생성할 수 있어 SQL 쿼리를 생성하고 그 결과를 분석하는 것이 정말 쉬워집니다.

Image credits: 

---

In [None]:
%pip install --upgrade pip
%pip install -qU redshift_connector sqlalchemy-redshift
# %pip install -qU 'sagemaker>=2.15.0' 'PyAthena==1.10.7' 'SQLAlchemy==1.3.13'

In [None]:
import os
import boto3

In [None]:
my_region = os.environ["AWS_DEFAULT_REGION"]   # E.g. "us-east-1"
os.environ["BEDROCK_ENDPOINT_URL"] = f"https://bedrock-runtime.{my_region}.amazonaws.com"  # E.g. "https://..."

STACK_NAME = 'genai-workshop' # Cloudformation의 Stack Name을 입력합니다.

In [None]:
# Redshift Serverless로 연결을 위한 Endpoint를 Cloudformation에서 읽어옵니다.

cf_client = boto3.client('cloudformation')
response = cf_client.describe_stacks()

for output in response["Stacks"]:
    stackName = output["StackName"]
    if stackName.find('Redshift') > 0:
        response = cf_client.describe_stacks(StackName=stackName)
        for output in response["Stacks"][0]["Outputs"]:
            keyName = output["OutputKey"]
            if keyName == "ServerlessWorkgroup":
                WORK_GROUP = output["OutputValue"]
            if keyName == "ServerlessNamespace":
                NAME_SPACE = output["OutputValue"]

rs_client = boto3.client('redshift-serverless')
response = rs_client.get_workgroup(
    workgroupName=WORK_GROUP
)

redshift_endpoint = response['workgroup']['endpoint']['address']

redshift_endpoint

# Redshift Serverless 테이블 생성과 데이터 로딩

### 본 노트북에서 사용되는 일반적인 리테일 회사에서 사용될 수 있는 가상의 데이터를 참고했습니다.
- region (P: R_REGIONKEY): 지역 정보; 지역 이름과 설명
- nation (P: N_NATIONKEY): 국가 정보; 국가 이름, 지역 키, 설명
- customer (P: C_CUSTKEY): 고객 정보; 고객의 이름, 주소, 국가 키, 전화번호, 계정 잔액 
- orders (P: O_ORDERKEY): 주문 정보; 주문 상태, 총 가격, 주문일자, 우선 순위; 주문키(distkey), 주문일자(sortkey)
- part (P: P_PARTKEY): 제품 정보; 제품 이름, 제조사, 브랜드, 유형, 크기, 가격
- supplier (P: S_SUPPKEY): 공급업체 정보; 업체 이름, 주소, 국가 키, 전화번호, 계정 잔액
- lineitem (P: L_ORDERKEY): 주문 라인 항목 정보; 주문 제품, 공급업체, 라인 번호, 수량, 확장 가격, 할인, 세금; 주문키(distkey), 수령 일자(sortkey)
- partsupp (P: PS_PARTKEY, PS_SUPPKEY): 제품-공급업체 관계 정보; 제품의 가용 수량, 공급 비용, 설명

## 중요!: 본 실습에 필요한 Redshift Serverless 권한 적용을 위해서는 사용할 IAM Role을 Default Type으로 지정해야합니다.

아래와 같이 Redshift Serverless의 Namespace에 "Security and encryption"에서 사용할 Role
<img src="images/namespace.png" width="800"/>

<img src="images/make-default.png" width="800"/>

In [None]:
import redshift_connector

conn = redshift_connector.connect(
     host=redshift_endpoint,
     database='dev',
     port=5439,
     user="admin",
     password="Admin1234!"
)

conn.autocommit = True
cursor = conn.cursor()

## Redshift Serverless에 테이블 생성

In [None]:
cursor.execute("DROP TABLE IF EXISTS region")

create_statement = """
CREATE TABLE region (
  R_REGIONKEY bigint NOT NULL,
  R_NAME varchar(25),
  R_COMMENT varchar(152))
diststyle all;
"""
cursor.execute(create_statement)


In [None]:
cursor.execute("DROP TABLE IF EXISTS nation")

create_statement = """
CREATE TABLE nation (
  N_NATIONKEY bigint NOT NULL,
  N_NAME varchar(25),
  N_REGIONKEY bigint,
  N_COMMENT varchar(152))
diststyle all;
"""
cursor.execute(create_statement)

In [None]:
cursor.execute("DROP TABLE IF EXISTS customer")

create_statement = """
create table customer (
  C_CUSTKEY bigint NOT NULL,
  C_NAME varchar(25),
  C_ADDRESS varchar(40),
  C_NATIONKEY bigint,
  C_PHONE varchar(15),
  C_ACCTBAL decimal(18,4),
  C_MKTSEGMENT varchar(10),
  C_COMMENT varchar(117))
diststyle all;
"""
cursor.execute(create_statement)

In [None]:
cursor.execute("DROP TABLE IF EXISTS orders")

create_statement = """
create table orders (
  O_ORDERKEY bigint NOT NULL,
  O_CUSTKEY bigint,
  O_ORDERSTATUS varchar(1),
  O_TOTALPRICE decimal(18,4),
  O_ORDERDATE Date,
  O_ORDERPRIORITY varchar(15),
  O_CLERK varchar(15),
  O_SHIPPRIORITY Integer,
  O_COMMENT varchar(79))
distkey (O_ORDERKEY)
sortkey (O_ORDERDATE);
"""
cursor.execute(create_statement)

In [None]:
cursor.execute("DROP TABLE IF EXISTS part")

create_statement = """
create table part (
  P_PARTKEY bigint NOT NULL,
  P_NAME varchar(55),
  P_MFGR  varchar(25),
  P_BRAND varchar(10),
  P_TYPE varchar(25),
  P_SIZE integer,
  P_CONTAINER varchar(10),
  P_RETAILPRICE decimal(18,4),
  P_COMMENT varchar(23))
diststyle all;
"""
cursor.execute(create_statement)

In [None]:
cursor.execute("DROP TABLE IF EXISTS supplier")

create_statement = """
create table supplier (
  S_SUPPKEY bigint NOT NULL,
  S_NAME varchar(25),
  S_ADDRESS varchar(40),
  S_NATIONKEY bigint,
  S_PHONE varchar(15),
  S_ACCTBAL decimal(18,4),
  S_COMMENT varchar(101))
diststyle all;                                                              
"""
cursor.execute(create_statement)

In [None]:
cursor.execute("DROP TABLE IF EXISTS lineitem")

create_statement = """
create table lineitem (
  L_ORDERKEY bigint NOT NULL,
  L_PARTKEY bigint,
  L_SUPPKEY bigint,
  L_LINENUMBER integer NOT NULL,
  L_QUANTITY decimal(18,4),
  L_EXTENDEDPRICE decimal(18,4),
  L_DISCOUNT decimal(18,4),
  L_TAX decimal(18,4),
  L_RETURNFLAG varchar(1),
  L_LINESTATUS varchar(1),
  L_SHIPDATE date,
  L_COMMITDATE date,
  L_RECEIPTDATE date,
  L_SHIPINSTRUCT varchar(25),
  L_SHIPMODE varchar(10),
  L_COMMENT varchar(44))
distkey (L_ORDERKEY)
sortkey (L_RECEIPTDATE);
"""
cursor.execute(create_statement)

In [None]:
cursor.execute("DROP TABLE IF EXISTS partsupp")

create_statement = """
create table partsupp (
  PS_PARTKEY bigint NOT NULL,
  PS_SUPPKEY bigint NOT NULL,
  PS_AVAILQTY integer,
  PS_SUPPLYCOST decimal(18,4),
  PS_COMMENT varchar(199))
diststyle even;
"""
cursor.execute(create_statement)

## Redshift Serverlessd에 데이터 로딩

In [None]:
load_data_statement = """
COPY region FROM 's3://redshift-immersionday-labs/data/region/region.tbl.lzo'
iam_role default
region 'us-west-2' lzop delimiter '|' COMPUPDATE PRESET;
"""
cursor.execute(load_data_statement)

Note: S3부터 데이터 Copy를 위해서는 반드시 Set Default IAM Role in Redshift Serverless을 수행해야합니다.

In [None]:
load_data_statement = """
COPY nation FROM 's3://redshift-immersionday-labs/data/nation/nation.tbl.'
iam_role default
region 'us-west-2' lzop delimiter '|' COMPUPDATE PRESET;
"""
cursor.execute(load_data_statement)

In [None]:
load_data_statement = """
copy customer from 's3://redshift-immersionday-labs/data/customer/customer.tbl.'
iam_role default
region 'us-west-2' lzop delimiter '|' COMPUPDATE PRESET;
"""
cursor.execute(load_data_statement)

In [None]:
load_data_statement = """
copy orders from 's3://redshift-immersionday-labs/data/orders/orders.tbl.'
iam_role default
region 'us-west-2' lzop delimiter '|' COMPUPDATE PRESET;
"""
cursor.execute(load_data_statement)

In [None]:
load_data_statement = """
copy part from 's3://redshift-immersionday-labs/data/part/part.tbl.'
iam_role default
region 'us-west-2' lzop delimiter '|' COMPUPDATE PRESET;
"""
cursor.execute(load_data_statement)

In [None]:
load_data_statement = """
copy supplier from 's3://redshift-immersionday-labs/data/supplier/supplier.json' manifest
iam_role default
region 'us-west-2' lzop delimiter '|' COMPUPDATE PRESET;
"""
cursor.execute(load_data_statement)

In [None]:
load_data_statement = """
copy lineitem from 's3://redshift-immersionday-labs/data/lineitem-part/'
iam_role default
region 'us-west-2' gzip delimiter '|' COMPUPDATE PRESET;
"""
cursor.execute(load_data_statement)

In [None]:
load_data_statement = """
copy partsupp from 's3://redshift-immersionday-labs/data/partsupp/partsupp.tbl.'
iam_role default
region 'us-west-2' lzop delimiter '|' COMPUPDATE PRESET;
"""
cursor.execute(load_data_statement)

Redshift Serverless 데이터 로딩 확인

In [None]:
%pip install --quiet ipython-sql
%pip install --quiet --upgrade SQLAlchemy==1.*
%pip install -qU psycopg2-binary

import pandas as pd
import sqlalchemy

username = "admin" 
password = "Admin1234!" 
host = redshift_endpoint 
port = "5439"
mydatabase = "dev"

redshift_uri = f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{mydatabase}"

engine = sqlalchemy.create_engine(redshift_uri)
df = pd.read_sql('select * from orders limit 10', engine)
df.head()

# LangChain SQLDatabase 설정

In [None]:
%pip install -qU langchain langchain-experimental

In [None]:
import os
import sys
import boto3

from langchain.llms.bedrock import Bedrock

BEDROCK_ENDPOINT_URL = os.environ["BEDROCK_ENDPOINT_URL"] 

session = boto3.Session(
    profile_name=os.environ.get("AWS_PROFILE")
) 

bedrock = session.client(
    service_name='bedrock-runtime', 
    region_name=os.environ.get("AWS_DEFAULT_REGION"),
    endpoint_url=BEDROCK_ENDPOINT_URL
) 

# - create the Anthropic Model
llm = Bedrock(model_id="anthropic.claude-v2", client=bedrock, model_kwargs={'max_tokens_to_sample':1000})

In [None]:
from langchain import SQLDatabase
from langchain_experimental.sql import SQLDatabaseChain

db = SQLDatabase.from_uri(redshift_uri)

In [None]:
print(db.table_info)

## 프롬프트에 따라 사용자 질문을 기반으로 SQL 쿼리를 작성하고 실행하는 체인을 생성하는 코드 블록

In [None]:
PROMPT = """ 
Human: Given an input question, create a syntactically correct postgresql query to run,
then look at the results of the query execution and return only the answer.  
The query: {question}

Assistant:"""

db_chain = SQLDatabaseChain.from_llm(llm=llm, db=db, verbose=True) # verbose=True로 실행되는 SQL을 상세하게 보여줍니다.

question = "공급업체의 평균 계정 잔액은 어떻게 돼?"

db_chain.run(PROMPT.format(question=question))

#### 쿼리 실행없이 SQL문 확인

In [None]:
from langchain.chains import create_sql_query_chain

chain = create_sql_query_chain(llm, db)
response = chain.invoke({"question":"공급업체의 평균 계정 잔액은 어떻게 돼?"})

print(response)

#### 데이터베이스에서 SQL문 실행

In [None]:
sql_statement = "SELECT AVG(s_acctbal) AS avg_acct_balance FROM supplier"
df = pd.read_sql(sql_statement, engine)
df.head()

In [None]:
question = "어떤 국가의 고객이 가장 많이 주문했나요?"

db_chain.run(PROMPT.format(question=question))

In [None]:
question = "주문 수가 가장 많은 국가는 어디인가요?"

db_chain.run(PROMPT.format(question=question))

In [None]:
question = "고객의 평균 계정 잔액은 어떻게 되나요?"

db_chain.run(PROMPT.format(question=question))

In [None]:
question = "어떤 고객이 가장 많은 주문을 했나요?"

db_chain.run(PROMPT.format(question=question))

In [None]:
sql_statement = "select * from customer  where c_name like 'Customer#009100001'"
df = pd.read_sql(sql_statement, engine)
df.head()

In [None]:
query_statement = """
SELECT c_name 
FROM customer JOIN orders ON o_custkey = c_custkey
GROUP BY c_name
ORDER BY COUNT(*) DESC
LIMIT 1
"""
df = pd.read_sql(query_statement, engine)
df.head()

In [None]:
question = "마케팅 세그먼트별 고객이 주문하는 양은 어떻게 다른가요? "

db_chain.run(PROMPT.format(question=question))

In [None]:
question = "마케팅 세그먼트별 고객이 주문하는 양을 알려주세요"

db_chain.run(PROMPT.format(question=question))

In [None]:
question = "주문 상태별 주문 수와 총 매출은 어떻게 되나요?"

db_chain.run(PROMPT.format(question=question))

In [None]:
query_statement = """
SELECT o_orderstatus, COUNT(*) AS order_count, SUM(o_totalprice) AS total_price
FROM orders 
GROUP BY o_orderstatus;
"""
df = pd.read_sql(query_statement, engine)
df.head()

In [None]:
question = "주문 금액이 가장 큰 주문 금액과 주문 정보 알려주세요"

db_chain.run(PROMPT.format(question=question))

In [None]:
query_statement = """
SELECT o_orderkey, o_totalprice, o_orderdate, o_orderpriority 
FROM orders
ORDER BY o_totalprice DESC
LIMIT 1;
"""
df = pd.read_sql(query_statement, engine)
df.head()

In [None]:
question = "주문일자에 따른 주문량 및 매출의 월별 추이는 어떻게 되나요?"

db_chain.run(PROMPT.format(question=question))

In [None]:
query_statement = """
SELECT
  DATE_TRUNC('month', o_orderdate) AS month,
  SUM(l_quantity) AS total_qty,
  SUM(l_extendedprice * (1 - l_discount)) AS total_amount
FROM orders o
JOIN lineitem l ON o.o_orderkey = l.l_orderkey  
GROUP BY 1
ORDER BY 1;
"""
df = pd.read_sql(query_statement, engine)
df.head(100)

In [None]:
db_chain.run(PROMPT.format(question="주문일자에 따른 주문량 및 매출의 월별 추이는 어떻게 되나요?"))

In [None]:
query_statement = """
SELECT EXTRACT(MONTH FROM o_orderdate) AS order_month, 
       SUM(l_extendedprice * (1 - l_discount)) AS revenue,
       COUNT(o_orderkey) AS order_count
FROM orders o
JOIN lineitem l ON o.o_orderkey = l.l_orderkey
GROUP BY order_month
ORDER BY order_month;
"""
df = pd.read_sql(query_statement, engine)
df.head(12)

In [None]:
db_chain.run(PROMPT.format(question="어떤 주문 우선 순위가 가장 일반적인가요?"))

In [None]:
db_chain.run(PROMPT.format(question="주문 수가 가장 많은 점원은 어떻게 되나요?"))

In [None]:
db_chain.run(PROMPT.format(question="어떤 제품 카테고리가 가장 많이 주문되었나요?"))

In [None]:
db_chain.run(PROMPT.format(question="가장 비싼 제품은 무엇이며, 어떤 주문에서 판매되었나요? "))

In [None]:
db_chain.run(PROMPT.format(question="제품 크기에 따른 주문량은 어떻게 다른가요?"))

In [None]:
db_chain.run(PROMPT.format(question="가장 많이 재주문된 제품은?"))

In [None]:
db_chain.run(PROMPT.format(question="어떤 공급업체가 가장 많은 제품을 공급하였나요?"))

In [None]:
db_chain.run(PROMPT.format(question="공급업체의 평균 계정 잔액은 어떻게 되나요?"))

In [None]:
db_chain.run(PROMPT.format(question="공급업체의 평균 공급 비용은 어떻게 되나요?"))

In [None]:
db_chain.run(PROMPT.format(question="어떤 주문 라인 항목이 가장 높은 할인을 받았나요?"))

In [None]:
db_chain.run(PROMPT.format(question="주문 라인 항목의 평균 수량은 어떻게 되나요?"))

In [None]:
db_chain.run(PROMPT.format(question="어떤 배송 방법이 가장 많이 사용되었나요?"))

In [None]:
PROMPT = """ 
Human: Given an input question, create a syntactically correct postgresql query to run,
then look at the results of the query and return the detailed answer in Korean.  
The query: {question}

Assistant:"""

In [None]:
db_chain.run(PROMPT.format(question="어떤 배송 방법이 가장 많이 사용되었나요?"))

In [None]:
PROMPT = """ 
Human: Given an input question, create a syntactically correct postgresql query to run,
then look at the results of the query and return the detailed answer as much as possible based on the result.  
The query: {question}

Assistant:"""

db_chain.run(PROMPT.format(question="어떤 배송 방법이 가장 많이 사용되었나요?"))