In [29]:
import os
from dotenv import load_dotenv

# .env 파일 로드
load_dotenv()

# 환경 변수 읽기
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
db_pass = os.getenv("DB_PASSWORD")

In [30]:
import google.generativeai as genai
import json

# 1. API 키 설정 (https://aistudio.google.com/ 에서 발급)
genai.configure(api_key=GEMINI_API_KEY)

# 2. 모델 설정 (구조화된 JSON 응답을 위해 1.5 Flash 이상 권장)
model = genai.GenerativeModel('gemini-2.5-flash-lite',
                             generation_config={"response_mime_type": "application/json"})

In [31]:
# 3. 시스템 프롬프트 정의
SYSTEM_PROMPT = """
당신은 사용자의 질문을 분석하여 SMQ(Semantic Meta Query) JSON으로 변환하는 분석가입니다.
반드시 아래 JSON 구조로만 답변하세요.

[JSON 구조]
{
  "table": "데이터가 포함된 테이블명 (customers, transactions, loans 중 선택)",
  "metrics": ["계산해야 할 값 (가입자수, 총액, 평균금액 등)"],
  "filters": [
    {"column": "필터링할 항목명", "operator": "=", "value": "값"}
  ],
  "group_by": ["그룹화할 항목명 (필요 시)"]
}
"""

In [37]:
def ask_to_smq(user_query):
    # Gemini에게 전달할 메시지 구성
    prompt = f"{SYSTEM_PROMPT}\n\n사용자 질문: {user_query}"
    
    response = model.generate_content(prompt)
    
    # 결과가 JSON 문자열이므로 파이썬 객체로 변환
    return json.loads(response.text)

# 테스트 실행
try:
    user_query = "작년에 정기예금에 가입한 VIP 고객은 몇 명이야?"
    smq_result = ask_to_smq(user_query)
    print("--- 생성된 SMQ (Raw) ---")
    print(json.dumps(smq_result, indent=2, ensure_ascii=False))
except Exception as e:
    print(f"오류 발생: {e}")

--- 생성된 SMQ (Raw) ---
{
  "table": "customers",
  "metrics": [
    "가입자수"
  ],
  "filters": [
    {
      "column": "가입일",
      "operator": "=",
      "value": "작년"
    },
    {
      "column": "상품",
      "operator": "=",
      "value": "정기예금"
    },
    {
      "column": "고객등급",
      "operator": "=",
      "value": "VIP"
    }
  ],
  "group_by": []
}


In [39]:
import json

class QUVIProcessor:
    def __init__(self, seeds_path):
        with open(seeds_path, 'r', encoding='utf-8') as f:
            self.seeds = json.load(f)

    def apply_seeds(self, smq):
        # 1. 테이블명 매핑
        table_name = smq.get('table')
        if table_name in self.seeds['concepts']:
            smq['table'] = self.seeds['concepts'][table_name]

        # 2. 필터 컬럼명 및 값 치환
        for f in smq.get('filters', []):
            # 컬럼명 치환 (예: '가입일' -> 'join_date')
            if f['column'] in self.seeds['concepts']:
                f['column'] = self.seeds['concepts'][f['column']]
            
            # 값 치환 (예: 'VIP' -> '01' / '정기예금' -> '0102001')
            val = str(f.get('value'))
            if val in self.seeds['codes']:
                f['value'] = self.seeds['codes'][val]
            
            # '작년' 같은 시간 표현 처리 (임시 로직 - SQL 조립 시 더 정교하게 처리 가능)
            if f['value'] == "작년":
                # 예시: 가입일 >= '2025-01-01' AND 가입일 <= '2025-12-31'
                # 여기서는 필드값만 일단 2025로 치환하거나, 나중에 SQL 엔진에서 처리하도록 둡니다.
                f['value'] = '2025' 
        
        # 3. 메트릭 치환 (예: '가입자수' -> 'COUNT(DISTINCT cust_id)')
        new_metrics = []
        for m in smq.get('metrics', []):
            if m in self.seeds['metrics']:
                new_metrics.append(self.seeds['metrics'][m])
            elif m in self.seeds['concepts']: # 만약 메트릭에 그냥 '금액'이라고 왔을 경우
                new_metrics.append(self.seeds['concepts'][m])
            else:
                new_metrics.append(m)
        smq['metrics'] = new_metrics
        
        # 4. Group By 컬럼명 치환
        new_groups = []
        for g in smq.get('group_by', []):
            if g in self.seeds['concepts']:
                new_groups.append(self.seeds['concepts'][g])
            else:
                new_groups.append(g)
        smq['group_by'] = new_groups
        
        return smq

# --- 실제 실행 흐름 ---
# 1. Gemini가 생성한 원본 smq_result가 있다고 가정
# smq_result = ask_to_smq("작년에 정기예금에 가입한 VIP 고객은 몇 명이야?")

processor = QUVIProcessor('seeds.json')
final_smq = processor.apply_seeds(smq_result)

print("\n--- Seeds 매핑 후 SMQ (Final) ---")
print(json.dumps(final_smq, indent=2, ensure_ascii=False))


--- Seeds 매핑 후 SMQ (Final) ---
{
  "table": "customers",
  "metrics": [
    "COUNT(DISTINCT cust_id)"
  ],
  "filters": [
    {
      "column": "join_date",
      "operator": "=",
      "value": "2025"
    },
    {
      "column": "prod_cd",
      "operator": "=",
      "value": "0102001"
    },
    {
      "column": "grade_cd",
      "operator": "=",
      "value": "01"
    }
  ],
  "group_by": []
}


In [40]:
import psycopg2
from psycopg2.extras import RealDictCursor

class QUVIQueryEngine:
    def __init__(self, db_config):
        self.db_config = db_config

    def build_sql(self, smq):
        # 기본 테이블 설정
        base_table = smq.get('table')
        metrics = ", ".join(smq.get('metrics', ['*']))
        
        # JOIN 로직 판단: 만약 고객과 거래 정보가 모두 필요하다면 JOIN 문 생성
        # 여기서는 학습을 위해 항상 JOIN이 가능한 구조로 기본 틀을 잡습니다.
        from_clause = f"{base_table}"
        
        # 특정 컬럼이 포함되어 있을 때 자동으로 JOIN 추가
        # 예: 테이블이 customers인데 prod_cd(상품) 조건이 들어온 경우
        all_columns = [f['column'] for f in smq.get('filters', [])] + smq.get('metrics', [])
        
        if base_table == 'customers' and any(col in all_columns for col in ['prod_cd', 'tx_date', 'amt']):
            from_clause = "customers c JOIN transactions t ON c.cust_id = t.cust_id"
            # 테이블 별칭(Alias)에 맞춰 컬럼명 보정
            metrics = metrics.replace("cust_id", "c.cust_id")
        
        where_clauses = []
        for f in smq.get('filters', []):
            col = f['column']
            # JOIN 시 컬럼 모호성 해결을 위한 별칭 부여
            if from_clause.count('JOIN') > 0:
                if col in ['cust_id', 'grade_cd', 'join_date']: col = f"c.{col}"
                elif col in ['prod_cd', 'tx_date', 'amt']: col = f"t.{col}"
            
            op = f['operator']
            val = str(f['value'])
            
            # 날짜 처리 (이전 로직 유지)
            if 'date' in col:
                if len(val) == 4 and val.isdigit():
                    where_clauses.append(f"{col} BETWEEN '{val}-01-01' AND '{val}-12-31'")
                    continue

            if isinstance(f['value'], str):
                where_clauses.append(f"{col} {op} '{val}'")
            else:
                where_clauses.append(f"{col} {op} {val}")
        
        where_str = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
        sql = f"SELECT {metrics} FROM {from_clause} {where_str}"
        return sql

    def execute_query(self, sql):
        try:
            conn = psycopg2.connect(**self.db_config)
            cur = conn.cursor(cursor_factory=RealDictCursor)
            cur.execute(sql)
            results = cur.fetchall()
            cur.close()
            conn.close()
            return results
        except Exception as e:
            return f"DB 실행 오류: {e}"

# 1. DB 접속 정보 설정 (본인의 환경에 맞게 수정)
db_config = {
    "host": "localhost",
    "database": "postgres",
    "user": "postgres",
    "password": db_pass, # DBeaver 접속 시 비밀번호
    "port": "5432"
}

# 2. 엔진 초기화
engine = QUVIQueryEngine(db_config)

# 3. SQL 변환 및 실행
# (앞 단계에서 완성된 final_smq를 사용합니다)
final_sql = engine.build_sql(final_smq)
print(f"--- 생성된 SQL ---\n{final_sql}\n")

query_result = engine.execute_query(final_sql)
print("--- 실행 결과 ---")
print(query_result)

--- 생성된 SQL ---
SELECT COUNT(DISTINCT c.cust_id) FROM customers c JOIN transactions t ON c.cust_id = t.cust_id WHERE c.join_date BETWEEN '2025-01-01' AND '2025-12-31' AND t.prod_cd = '0102001' AND c.grade_cd = '01'

--- 실행 결과 ---
[RealDictRow([('count', 41)])]


In [41]:
def reverse_mapping(results, seeds):
    # seeds['codes']의 키와 값을 뒤집어서 역매핑용 딕셔너리 생성
    inv_codes = {v: k for k, v in seeds['codes'].items()}
    
    for row in results:
        for key, value in row.items():
            if str(value) in inv_codes:
                row[key] = inv_codes[str(value)]
    return results

# 역매핑 실행
final_display = reverse_mapping(query_result, processor.seeds)
print("\n--- 최종 사용자 반환 데이터 ---")
print(final_display)


--- 최종 사용자 반환 데이터 ---
[RealDictRow([('count', 41)])]
