In [None]:
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import os
import pandas as pd
from dotenv import load_dotenv

# 환경 변수 로드
load_dotenv()

# BigQuery 연결 정보
PROJECT_ID = os.getenv('GCP_PROJECT_ID')
DATASET_ID = os.getenv('BIGQUERY_DATASET_ID')
CREDENTIALS_PATH = os.getenv('LLM_CREDENTIALS')


# BigQuery URL 생성 (credentials_path를 URL에 포함)
DATABASE_URL = f"bigquery://{PROJECT_ID}/{DATASET_ID}?credentials_path={CREDENTIALS_PATH}"

print(DATABASE_URL)
# 엔진 생성
engine = create_engine(DATABASE_URL)

# 세션 생성
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base 클래스 생성
Base = declarative_base()

def get_db():
    """
    데이터베이스 세션을 생성하고 반환하는 함수
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def execute_query(query: str, params: dict = None):
    """
    BigQuery 쿼리를 실행하는 함수
    
    Args:
        query (str): 실행할 SQL 쿼리
        params (dict, optional): 쿼리 파라미터
    
    Returns:
        list: 쿼리 결과
    """
    with engine.connect() as connection:
        result = connection.execute(text(query), params or {})
        return [dict(zip(result.keys(), row)) for row in result]

def execute_many(query: str, params_list: list):
    """
    여러 개의 파라미터로 쿼리를 실행하는 함수
    
    Args:
        query (str): 실행할 SQL 쿼리
        params_list (list): 쿼리 파라미터 리스트
    
    Returns:
        None
    """
    with engine.connect() as connection:
        connection.execute(text(query), params_list)
        connection.commit()

# 사용 예시
if __name__ == "__main__":
    # 단일 쿼리 실행 예시
    result = execute_query(f"""
        SELECT *
        FROM `{PROJECT_ID}.{DATASET_ID}.customers` limit 10
    """)
    # display(pd.DataFrame(result))

bigquery://sparatallm/sql?credentials_path=C:/Users/snowg/OneDrive/Documents/Github/llmbot/credentials/sparatallm-77020e47659e.json


  Base = declarative_base()


Unnamed: 0,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
0,50814dd40158d233103f675c340c68b1,b3bbe51a0caf36d05239cdb5124a2421,1008,sao paulo,SP
1,d829dcbbc1ddabd3a91c20b64e2ce5d8,5c3a15e958c7a36e722408cd2ce8b34e,1044,sao paulo,SP
2,9380c4d3c24f6c62c443c8da0f2f0ecf,c645339c7f927e64c0cbe6079255fc7a,1107,sao paulo,SP
3,128664a3f38d78795c698c380488a5dd,5254c0eec4d1c586e4e6b59fba3d33ea,1131,sao paulo,SP
4,4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
5,1a1aca3aea66ecc5fa26e1994aad30d6,04baa28903e22974e6213e632b09c193,1152,sao paulo,SP
6,de27e7ae8adefd14b55e07db50c5c037,a50381d8ac7b457ced2071af9ef0caed,1224,sao paulo,SP
7,22ea9677582d0e8c4ceeaa2a306fd486,120f875c7db31f3a391a0feacbe05fa5,1227,sao paulo,SP
8,dcb6e7bc4e65d815025d0aa14349a4ca,a1864cdd58debf5031958069ee937742,1230,sao paulo,SP
9,3f5939e06e89efd9e9ca8ef3ab9d68c7,6b7e1090d106aaee31d5ed12d2bbc2af,1310,sao paulo,SP


cloud sql(MySQL) 연결 및 DDL

In [15]:
# MySQL 연결 및 데이터 업로드

import os
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

# 환경 변수 로드
load_dotenv()

# MySQL 연결 정보
MYSQL_USER = os.getenv('MYSQL_USER')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD')
MYSQL_HOST = os.getenv('MYSQL_HOST')  # 공개 IP 주소
MYSQL_PORT = os.getenv('MYSQL_PORT')  # 기본 TCP 포트
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE')  # 데이터베이스 이름

# MySQL 연결
mysql_url = f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"
mysql_engine = create_engine(mysql_url)

# 테이블 생성 SQL
create_customers_table = """
CREATE TABLE IF NOT EXISTS customers (
    customer_id VARCHAR(32) PRIMARY KEY,
    customer_unique_id VARCHAR(32),
    customer_zip_code_prefix VARCHAR(5),
    customer_city VARCHAR(50),
    customer_state CHAR(2)
)
"""

create_orders_table = """
CREATE TABLE IF NOT EXISTS orders (
    order_id VARCHAR(32) PRIMARY KEY,
    customer_id VARCHAR(32),
    order_status VARCHAR(10),
    order_purchase_timestamp DATETIME,
    order_approved_at DATETIME,
    order_delivered_carrier_date DATETIME, 
    order_delivered_customer_date DATETIME,
    order_estimated_delivery_date DATETIME
)
"""

create_payments_table = """
CREATE TABLE IF NOT EXISTS payments (
    order_id VARCHAR(32),
    payment_sequential INT,
    payment_type VARCHAR(20),
    payment_installments INT, 
    payment_value DECIMAL(10,2)
)
"""

try:
    # CSV 파일 읽기
    customers_df = pd.read_csv('../answer/customers.csv')
    orders_df = pd.read_csv('../answer/orders.csv')
    payments_df = pd.read_csv('../answer/payments.csv')

    with mysql_engine.connect() as conn:
        # 테이블 삭젠
        # 외래키 옵션 설정 시, 의존성이 걸려 삭제 가 안될 수 있음 그런 경우 다음 SET FORKEN_KEY체크 필요
        # conn.execute(text("SET FOREIGN_KEY_CHECKS=0"))
        conn.execute(text("DROP TABLE IF EXISTS payments"))
        conn.execute(text("DROP TABLE IF EXISTS orders"))
        conn.execute(text("DROP TABLE IF EXISTS customers"))
        # conn.execute(text("SET FOREIGN_KEY_CHECKS=1"))
                
        # 테이블 생성
        conn.execute(text(create_customers_table))
        conn.execute(text(create_orders_table))
        conn.execute(text(create_payments_table))
        
        # 데이터 삽입 (replace 옵션으로 변경)
        customers_df.to_sql('customers', con=mysql_engine, if_exists='replace', index=False)
        orders_df.to_sql('orders', con=mysql_engine, if_exists='replace', index=False)
        payments_df.to_sql('payments', con=mysql_engine, if_exists='replace', index=False)
        
        conn.commit()

    print("데이터 업로드가 완료되었습니다.")
    
except Exception as e:
    print(f"에러가 발생했습니다: {str(e)}")


데이터 업로드가 완료되었습니다.


In [14]:
# MySQL 쿼리 실행 템플릿
query = """
SELECT *
FROM orders
LIMIT 5
"""

try:
    with mysql_engine.connect() as conn:
        # 쿼리 실행
        result = conn.execute(text(query))
        
        # 결과를 DataFrame으로 변환
        df = pd.DataFrame(result.fetchall(), columns=result.keys())
        
        # 결과 출력
        print("쿼리 결과:")
        display(df)

except Exception as e:
    print(f"쿼리 실행 중 에러가 발생했습니다: {str(e)}")


쿼리 결과:


Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00


In [1]:
import os
import sys

# 현재 작업 디렉토리의 상위 디렉토리로 이동
current_dir = os.getcwd()  # 현재 작업 디렉토리
parent_dir = os.path.dirname(current_dir)  # 상위 디렉토리
os.chdir(parent_dir)  # 작업 디렉토리 변경

# Python 경로에 상위 디렉토리 추가 (모듈 import를 위해)
sys.path.append(parent_dir)

print(os.getcwd())
from core.mysql_engine import check_query_result


qr = list()

qr1= """
SELECT customer_id, count(*) AS total_orders
FROM orders
WHERE customer_id IN
(
SELECT DISTINCT customer_id 
FROM  orders
WHERE order_estimated_delivery_date < order_delivered_customer_date
)
GROUP BY customer_id
ORDER BY total_orders DESC
LIMIT 1;
"""

qr2 = """
select payment_type,
       total_payment_value,
       round(total_payment_value/sum(total_payment_value) over()*100,2) as payment_percentage
from(
select payment_type,
       sum(payment_value) as total_payment_value
from payments
group by payment_type
)a
order by payment_percentage desc;
"""

qr3=    """
    select sum(1) as cnt_users,
       sum(each_orders) as cnt_orders,
       sum(each_payment) as sum_payment,
       avg(each_payment) as arppu
from(
select distinct(o.order_id) as users,
       count(1) as each_orders,
       sum(p.payment_value) as each_payment
from orders o inner join payments p on o.order_id = p.order_id
where order_delivered_customer_date is not null
group by o.order_id
)a
    """

qr4 = """
"""

qr5 =     """
    
    """

qr6 =    """
    
    """
qr.extend([qr1, qr2, qr3, qr4, qr5, qr6])

qr
check_query_result(qr)

c:\Users\snowg\OneDrive\Documents\Github\llmbot
문제 4의 쿼리가 비어있습니다.
문제 5의 쿼리가 비어있습니다.
문제 6의 쿼리가 비어있습니다.

=== 채점 결과 ===
문제1: O

문제1 - 학생 쿼리 결과:
                        customer_id  total_orders
0  d2b091571da224a1b36412c18bc3bbfe             1

문제1 - 정답:
                        customer_id  total_orders
0  d2b091571da224a1b36412c18bc3bbfe             1


문제2: X

문제2 - 학생 쿼리 결과:
  payment_type  total_payment_value  payment_percentage
0  credit_card            115035.82               79.01
1       boleto             23547.38               16.17
2      voucher              4308.27                2.96
3   debit_card              2698.36                1.85

문제2 - 정답:
  payment_type  total_payment_value  payment_percentage
0  credit_card            115035.82               79.01
1       boleto             23547.38               16.17
2      voucher              4308.27                2.96
3   debit_card              2698.36                1.85


문제3: X

문제3 - 학생 쿼리 결과:
   cnt_users  cnt_orders 