# 07. 고급 Text2SQL 최적화 기법

이 노트북에서 다루는 내용:
- 사내 문서 임베딩 시스템 구축
- 렉시콘 및 도메인 용어 사전 관리
- 스키마 설명 확장
- Few-shot 예제 자동 관리
- 통합 최적화 파이프라인 구현
- 성능 모니터링 및 개선

## 1. 라이브러리 임포트

In [None]:
import sys
sys.path.append('/workspace')

import os
import pandas as pd
from pathlib import Path
from typing import List, Dict, Any

from src.utils.db_utils import DatabaseConnection, get_database_context
from src.utils.document_loader import DocumentLoader
from src.utils.embedding_utils import generate_embeddings, store_document
from src.utils.lexicon_manager import LexiconManager
from src.utils.schema_enhancer import SchemaEnhancer
from src.utils.example_manager import ExampleManager
from src.utils.text2sql_utils import Text2SQLGenerator

print("✓ 라이브러리 임포트 완료")

## 2. 기본 컴포넌트 초기화

In [None]:
# 데이터베이스 연결
db = DatabaseConnection()
print("✓ 데이터베이스 연결 완료")

# Text2SQL 생성기
try:
    generator = Text2SQLGenerator(llm_provider="ollama", model_name="llama2")
    print("✓ Text2SQL 생성기 초기화 완료")
except Exception as e:
    print(f"⚠ Text2SQL 생성기 초기화 실패: {e}")
    print("Ollama가 실행 중인지 확인하세요")

## 3. 사내 문서 임베딩 시스템

사내 문서(PDF, Word, Excel)를 로드하고 임베딩하여
RAG 시스템에 활용할 수 있도록 준비합니다.

### 3.1 문서 로더 초기화

In [None]:
# 문서 로더 초기화
doc_loader = DocumentLoader()

# 예제 문서 경로 (실제 환경에서는 사내 문서 경로로 변경)
company_docs_path = "/workspace/data/company_docs"

# 디렉토리가 없으면 생성
os.makedirs(company_docs_path, exist_ok=True)

print(f"문서 경로: {company_docs_path}")
print("✓ 문서 로더 초기화 완료")

### 3.2 문서 로드 및 파싱

다양한 형식의 문서를 로드합니다.

In [None]:
# 문서 로드 예제
documents = []

try:
    # 디렉토리 내 모든 문서 로드
    if os.path.exists(company_docs_path):
        for file_path in Path(company_docs_path).rglob("*"):
            if file_path.is_file():
                ext = file_path.suffix.lower()
                if ext in ['.pdf', '.docx', '.doc', '.xlsx', '.xls', '.txt']:
                    try:
                        docs = doc_loader.load(str(file_path))
                        documents.extend(docs)
                        print(f"✓ 로드 완료: {file_path.name} ({len(docs)}개 문서)")
                    except Exception as e:
                        print(f"⚠ 로드 실패: {file_path.name} - {e}")
    
    print(f"\n총 {len(documents)}개 문서 로드됨")
    
    # 문서 샘플 확인
    if documents:
        print("\n[문서 샘플]")
        print(f"내용: {documents[0].page_content[:200]}...")
        print(f"메타데이터: {documents[0].metadata}")
    else:
        print("\n⚠ 로드된 문서가 없습니다.")
        print(f"문서를 {company_docs_path} 디렉토리에 추가하세요.")
        
except Exception as e:
    print(f"문서 로드 중 오류: {e}")

### 3.3 문서 청킹 전략

긴 문서를 작은 청크로 분할하여 임베딩 효율을 높입니다.

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# 텍스트 스플리터 초기화
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    length_function=len,
    separators=["\n\n", "\n", ". ", " ", ""]
)

# 문서 청킹
chunks = []
if documents:
    for doc in documents:
        split_docs = text_splitter.split_documents([doc])
        chunks.extend(split_docs)

print(f"총 {len(chunks)}개 청크 생성")

# 청크 샘플 확인
if chunks:
    print("\n[청크 샘플]")
    for i, chunk in enumerate(chunks[:3]):
        print(f"\n청크 {i+1}:")
        print(f"  길이: {len(chunk.page_content)} 문자")
        print(f"  내용: {chunk.page_content[:150]}...")

### 3.4 임베딩 생성 및 저장

청크를 임베딩하고 데이터베이스에 저장합니다.

In [None]:
# 임베딩 생성 및 저장
if chunks:
    print("임베딩 생성 중...")
    stored_count = 0
    
    for i, chunk in enumerate(chunks):
        try:
            # 임베딩 생성
            embedding = generate_embeddings(chunk.page_content)
            
            # 데이터베이스에 저장
            doc_id = store_document(
                db=db,
                content=chunk.page_content,
                metadata=chunk.metadata,
                embedding=embedding
            )
            
            stored_count += 1
            
            if (i + 1) % 10 == 0:
                print(f"  진행: {i + 1}/{len(chunks)} 청크 처리됨")
                
        except Exception as e:
            print(f"  ⚠ 청크 {i+1} 처리 실패: {e}")
    
    print(f"\n✓ {stored_count}개 청크 임베딩 및 저장 완료")
else:
    print("저장할 청크가 없습니다.")

## 4. 렉시콘 및 도메인 용어 사전

비즈니스 용어를 기술 용어로 매핑하여
자연어 질의의 이해도를 향상시킵니다.

### 4.1 용어 매핑 테이블 생성

In [None]:
# 렉시콘 테이블 초기화 SQL 파일 실행
sql_file = "/workspace/scripts/init_lexicon.sql"

try:
    with open(sql_file, 'r', encoding='utf-8') as f:
        sql_script = f.read()
    
    # SQL 스크립트 실행
    db.execute_raw(sql_script)
    print("✓ 용어 매핑 테이블 생성 완료")
    
except Exception as e:
    print(f"⚠ SQL 실행 중 오류: {e}")
    print("테이블이 이미 존재할 수 있습니다.")

# 테이블 구조 확인
print("\n[term_mappings 테이블 구조]")
query = """
SELECT column_name, data_type 
FROM information_schema.columns 
WHERE table_name = 'term_mappings'
ORDER BY ordinal_position
"""
result = db.execute_query(query)
df = pd.DataFrame(result)
print(df.to_string(index=False))

### 4.2 렉시콘 매니저 초기화 및 용어 등록

In [None]:
# 렉시콘 매니저 초기화
lexicon = LexiconManager(db)
print("✓ 렉시콘 매니저 초기화 완료")

# 현재 등록된 용어 확인
query = "SELECT business_term, technical_terms, category FROM term_mappings LIMIT 10"
result = db.execute_query(query)
df = pd.DataFrame(result)

print("\n[등록된 용어 샘플]")
print(df.to_string(index=False))

### 4.3 추가 용어 등록

In [None]:
# 사용자 정의 용어 추가 예제
custom_terms = [
    {
        'business_term': '재택근무',
        'technical_terms': ['remote_work', 'work_from_home'],
        'synonyms': ['원격근무', '홈오피스'],
        'description': '사무실이 아닌 집에서 근무하는 형태',
        'category': 'hr'
    },
    {
        'business_term': '실적',
        'technical_terms': ['performance', 'achievement'],
        'synonyms': ['성과', '업적'],
        'description': '업무 수행의 결과나 성과',
        'category': 'sales'
    }
]

# 용어 추가
for term in custom_terms:
    try:
        lexicon.add_term(
            business_term=term['business_term'],
            technical_terms=term['technical_terms'],
            synonyms=term['synonyms'],
            description=term['description'],
            category=term['category']
        )
        print(f"✓ '{term['business_term']}' 용어 등록 완료")
    except Exception as e:
        print(f"⚠ '{term['business_term']}' 등록 실패: {e}")

### 4.4 용어 검색 및 변환

In [None]:
# 용어 매핑 테스트
test_queries = [
    "매출이 가장 높은 직원을 보여줘",
    "평균 급여가 얼마인지 알려줘",
    "부서별 직원 수를 세어줘"
]

print("[용어 매핑 결과]\n")
for query in test_queries:
    mapped_query = lexicon.map_terms(query)
    print(f"원본: {query}")
    print(f"매핑: {mapped_query}")
    print("-" * 60)

## 5. 스키마 설명 확장

데이터베이스 스키마에 상세한 설명을 추가하여
LLM의 SQL 생성 정확도를 높입니다.

### 5.1 스키마 설명 테이블 생성

In [None]:
# 스키마 설명 테이블 초기화
sql_file = "/workspace/scripts/init_schema_descriptions.sql"

try:
    with open(sql_file, 'r', encoding='utf-8') as f:
        sql_script = f.read()
    
    db.execute_raw(sql_script)
    print("✓ 스키마 설명 테이블 생성 완료")
    
except Exception as e:
    print(f"⚠ SQL 실행 중 오류: {e}")
    print("테이블이 이미 존재할 수 있습니다.")

# 테이블 확인
query = "SELECT table_name, korean_name, description FROM table_descriptions"
result = db.execute_query(query)
df = pd.DataFrame(result)

print("\n[테이블 설명]")
print(df.to_string(index=False))

### 5.2 스키마 인핸서 초기화

In [None]:
# 스키마 인핸서 초기화
enhancer = SchemaEnhancer(db)
print("✓ 스키마 인핸서 초기화 완료")

### 5.3 컬럼 설명 조회

In [None]:
# employees 테이블의 컬럼 설명 조회
query = """
SELECT column_name, korean_name, description, example_values
FROM column_descriptions
WHERE table_name = 'employees'
ORDER BY column_name
"""
result = db.execute_query(query)
df = pd.DataFrame(result)

print("[employees 테이블 컬럼 설명]")
for _, row in df.iterrows():
    print(f"\n• {row['column_name']} ({row['korean_name']})")
    print(f"  설명: {row['description']}")
    if row['example_values']:
        print(f"  예시: {', '.join(row['example_values'][:3])}")

### 5.4 확장된 스키마 정보 가져오기

In [None]:
# 확장된 스키마 정보 조회
enhanced_schema = enhancer.get_enhanced_schema(tables=['employees', 'departments', 'sales'])

print("[확장된 스키마 정보]")
print("=" * 70)
print(enhanced_schema)

## 6. Few-shot 예제 자동 관리

질의-SQL 쌍을 저장하고 유사한 질의에 대해
적절한 예제를 자동으로 제공합니다.

### 6.1 예제 데이터베이스 구축

In [None]:
# Few-shot 예제 테이블 초기화
sql_file = "/workspace/scripts/init_examples.sql"

try:
    with open(sql_file, 'r', encoding='utf-8') as f:
        sql_script = f.read()
    
    db.execute_raw(sql_script)
    print("✓ Few-shot 예제 테이블 생성 완료")
    
except Exception as e:
    print(f"⚠ SQL 실행 중 오류: {e}")
    print("테이블이 이미 존재할 수 있습니다.")

# 예제 통계 확인
query = """
SELECT query_category, COUNT(*) as count, AVG(success_rate) as avg_success_rate
FROM query_examples
GROUP BY query_category
ORDER BY count DESC
"""
result = db.execute_query(query)
df = pd.DataFrame(result)

print("\n[예제 통계]")
print(df.to_string(index=False))

### 6.2 예제 매니저 초기화

In [None]:
# 예제 매니저 초기화
example_mgr = ExampleManager(db)
print("✓ 예제 매니저 초기화 완료")

### 6.3 예제 추가

In [None]:
# 사용자 정의 예제 추가
custom_examples = [
    {
        'natural_query': '가장 최근에 입사한 직원 5명을 보여줘',
        'sql_query': 'SELECT * FROM employees ORDER BY hire_date DESC LIMIT 5',
        'category': 'sorting',
        'difficulty': 'easy',
        'tags': ['order by', 'limit', 'recent'],
        'description': '최근 입사자 조회'
    },
    {
        'natural_query': '각 지역별 매출 합계를 높은 순으로 정렬해서 보여줘',
        'sql_query': 'SELECT region, SUM(amount) as total_sales FROM sales GROUP BY region ORDER BY total_sales DESC',
        'category': 'aggregation',
        'difficulty': 'medium',
        'tags': ['sum', 'group by', 'order by'],
        'description': '지역별 매출 집계 및 정렬'
    }
]

# 예제 추가
for example in custom_examples:
    try:
        example_id = example_mgr.add_example(
            natural_query=example['natural_query'],
            sql_query=example['sql_query'],
            category=example['category'],
            difficulty=example['difficulty'],
            tags=example['tags'],
            description=example['description']
        )
        print(f"✓ 예제 추가 완료 (ID: {example_id})")
    except Exception as e:
        print(f"⚠ 예제 추가 실패: {e}")

### 6.4 예제에 임베딩 추가

In [None]:
# 임베딩이 없는 예제에 임베딩 추가
query = "SELECT id, natural_language_query FROM query_examples WHERE embedding IS NULL"
result = db.execute_query(query)

print(f"임베딩 추가 대상: {len(result)}개 예제")

for row in result[:10]:  # 처음 10개만 처리
    try:
        embedding = generate_embeddings(row['natural_language_query'])
        
        update_query = """
        UPDATE query_examples 
        SET embedding = %s::vector 
        WHERE id = %s
        """
        db.execute_query(update_query, params=(embedding.tolist(), row['id']))
        print(f"✓ 예제 {row['id']} 임베딩 추가 완료")
    except Exception as e:
        print(f"⚠ 예제 {row['id']} 임베딩 실패: {e}")

### 6.5 유사 예제 검색

In [None]:
# 유사 예제 검색 테스트
test_query = "부서별로 평균 급여를 계산해줘"

print(f"질의: {test_query}\n")

try:
    similar_examples = example_mgr.search_similar_examples(
        query=test_query,
        limit=3
    )
    
    print("[유사 예제]")
    for i, ex in enumerate(similar_examples, 1):
        print(f"\n{i}. 유사도: {ex['similarity']:.3f}")
        print(f"   질의: {ex['natural_query']}")
        print(f"   SQL: {ex['sql_query']}")
        
except Exception as e:
    print(f"유사 예제 검색 실패: {e}")

## 7. 통합 최적화 파이프라인

모든 최적화 기법을 결합한 통합 파이프라인을 구현합니다.

### 7.1 통합 파이프라인 함수 구현

In [None]:
def optimized_text2sql(query: str, verbose: bool = True) -> Dict[str, Any]:
    """
    최적화된 Text2SQL 파이프라인
    
    Args:
        query: 자연어 질의
        verbose: 상세 로그 출력 여부
    
    Returns:
        결과 딕셔너리 (sql, results, metadata)
    """
    result = {
        'original_query': query,
        'normalized_query': None,
        'sql': None,
        'results': None,
        'error': None,
        'metadata': {}
    }
    
    try:
        # 1. 용어 정규화 (렉시콘)
        if verbose:
            print("[1/6] 용어 정규화 중...")
        normalized_query = lexicon.map_terms(query)
        result['normalized_query'] = normalized_query
        if verbose:
            print(f"  → {normalized_query}")
        
        # 2. 관련 문서 검색 (RAG)
        if verbose:
            print("\n[2/6] 관련 문서 검색 중...")
        try:
            from src.utils.embedding_utils import search_similar_documents
            docs = search_similar_documents(db, query, top_k=2)
            result['metadata']['related_docs'] = len(docs)
            if verbose and docs:
                print(f"  → {len(docs)}개 관련 문서 발견")
        except Exception as e:
            if verbose:
                print(f"  → 문서 검색 건너뜀: {e}")
        
        # 3. 유사 예제 검색
        if verbose:
            print("\n[3/6] 유사 예제 검색 중...")
        try:
            similar_examples = example_mgr.search_similar_examples(
                query=normalized_query,
                limit=3
            )
            result['metadata']['similar_examples'] = len(similar_examples)
            if verbose and similar_examples:
                print(f"  → {len(similar_examples)}개 유사 예제 발견")
                for ex in similar_examples:
                    print(f"     • 유사도 {ex['similarity']:.2f}: {ex['natural_query'][:40]}...")
        except Exception as e:
            similar_examples = []
            if verbose:
                print(f"  → 예제 검색 건너뜀: {e}")
        
        # 4. 확장된 스키마 정보 가져오기
        if verbose:
            print("\n[4/6] 스키마 정보 로드 중...")
        enhanced_schema = enhancer.get_enhanced_schema()
        if verbose:
            print("  → 확장된 스키마 정보 로드 완료")
        
        # 5. SQL 생성
        if verbose:
            print("\n[5/6] SQL 생성 중...")
        
        # Few-shot 예제 포맷팅
        few_shot_text = ""
        if similar_examples:
            few_shot_text = "\n\n유사한 예제:\n"
            for ex in similar_examples[:2]:
                few_shot_text += f"Q: {ex['natural_query']}\nSQL: {ex['sql_query']}\n\n"
        
        # 프롬프트 구성
        prompt = f"""
다음 스키마를 기반으로 자연어 질의를 SQL로 변환하세요.

{enhanced_schema}
{few_shot_text}
질의: {normalized_query}

SQL만 반환하세요 (설명 없이):
"""
        
        sql = generator.generate_sql(
            query=normalized_query,
            schema_context=enhanced_schema,
            few_shot_examples=similar_examples
        )
        result['sql'] = sql
        if verbose:
            print(f"  → {sql}")
        
        # 6. SQL 실행
        if verbose:
            print("\n[6/6] SQL 실행 중...")
        results = db.execute_query(sql)
        result['results'] = results
        if verbose:
            print(f"  → {len(results)}개 결과 반환")
        
        # 결과 로깅 (query_history)
        log_query = """
        INSERT INTO query_history 
            (natural_language_query, generated_sql, was_successful, execution_time_ms)
        VALUES (%s, %s, %s, %s)
        """
        db.execute_query(log_query, params=(query, sql, True, 0))
        
    except Exception as e:
        result['error'] = str(e)
        if verbose:
            print(f"\n✗ 오류 발생: {e}")
        
        # 실패 로깅
        try:
            log_query = """
            INSERT INTO query_history 
                (natural_language_query, generated_sql, was_successful, error_message)
            VALUES (%s, %s, %s, %s)
            """
            db.execute_query(log_query, params=(query, result.get('sql'), False, str(e)))
        except:
            pass
    
    return result

print("✓ 통합 파이프라인 함수 정의 완료")

### 7.2 파이프라인 테스트

In [None]:
# 테스트 쿼리 목록
test_queries = [
    "부서별 평균 급여를 보여줘",
    "매출이 가장 높은 직원 상위 5명은?",
    "올해 입사한 직원 수는 몇 명이야?",
    "지역별 매출 합계를 높은 순으로 정렬해줘"
]

# 각 쿼리 테스트
for i, test_query in enumerate(test_queries, 1):
    print("\n" + "="*70)
    print(f"테스트 {i}/{len(test_queries)}: {test_query}")
    print("="*70)
    
    result = optimized_text2sql(test_query, verbose=True)
    
    if result['results'] is not None:
        df = pd.DataFrame(result['results'])
        print("\n[결과]")
        print(df.head(10).to_string(index=False))
    
    print("\n")

## 8. 성능 모니터링 및 개선

시스템의 성능을 추적하고 개선점을 찾습니다.

### 8.1 성공률 추적

In [None]:
# 쿼리 히스토리 통계
query = """
SELECT 
    COUNT(*) as total_queries,
    SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as successful,
    SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failed,
    ROUND(100.0 * SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) / COUNT(*), 2) as success_rate
FROM query_history
WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
"""

try:
    result = db.execute_query(query)
    df = pd.DataFrame(result)
    
    print("[최근 7일 쿼리 통계]")
    print(df.to_string(index=False))
except Exception as e:
    print(f"통계 조회 실패: {e}")

### 8.2 예제 성공률 분석

In [None]:
# 예제별 성능 분석
query = """
SELECT 
    query_category,
    difficulty,
    COUNT(*) as example_count,
    ROUND(AVG(success_rate)::numeric, 3) as avg_success_rate,
    SUM(usage_count) as total_usage
FROM query_examples
GROUP BY query_category, difficulty
ORDER BY avg_success_rate DESC, total_usage DESC
"""

try:
    result = db.execute_query(query)
    df = pd.DataFrame(result)
    
    print("[카테고리별 예제 성능]")
    print(df.to_string(index=False))
except Exception as e:
    print(f"분석 실패: {e}")

### 8.3 실패 케이스 분석

In [None]:
# 최근 실패한 쿼리 분석
query = """
SELECT 
    natural_language_query,
    generated_sql,
    error_message,
    created_at
FROM query_history
WHERE NOT was_successful
ORDER BY created_at DESC
LIMIT 10
"""

try:
    result = db.execute_query(query)
    
    if result:
        print("[최근 실패한 쿼리]\n")
        for i, row in enumerate(result, 1):
            print(f"{i}. {row['natural_language_query']}")
            print(f"   SQL: {row['generated_sql']}")
            print(f"   오류: {row['error_message']}")
            print(f"   시간: {row['created_at']}")
            print("-" * 60)
    else:
        print("실패한 쿼리가 없습니다.")
except Exception as e:
    print(f"분석 실패: {e}")

### 8.4 자동 개선 제안

In [None]:
# 미등록 용어 탐지
print("[개선 제안]\n")

# 1. 자주 사용되지만 매핑되지 않은 용어 찾기
try:
    unknown_terms = lexicon.detect_unknown_terms()
    
    if unknown_terms:
        print("1. 렉시콘에 추가 권장 용어:")
        for term in unknown_terms[:5]:
            print(f"   • {term}")
    else:
        print("1. 추가 권장 용어 없음")
except:
    print("1. 용어 분석 불가")

print()

# 2. 낮은 성공률 카테고리 식별
query = """
SELECT query_category, AVG(success_rate) as avg_rate
FROM query_examples
GROUP BY query_category
HAVING AVG(success_rate) < 0.8
ORDER BY avg_rate ASC
"""

try:
    result = db.execute_query(query)
    if result:
        print("2. 개선 필요 카테고리 (성공률 < 80%):")
        for row in result:
            print(f"   • {row['query_category']}: {row['avg_rate']*100:.1f}%")
    else:
        print("2. 모든 카테고리의 성공률이 양호합니다.")
except:
    print("2. 카테고리 분석 불가")

print()

# 3. 임베딩 누락 확인
query = "SELECT COUNT(*) as count FROM query_examples WHERE embedding IS NULL"
try:
    result = db.execute_query(query)
    missing = result[0]['count']
    if missing > 0:
        print(f"3. 임베딩 누락 예제: {missing}개")
        print("   → 섹션 6.4를 참고하여 임베딩을 추가하세요.")
    else:
        print("3. 모든 예제에 임베딩이 설정되어 있습니다.")
except:
    print("3. 임베딩 확인 불가")

## 9. 요약 및 다음 단계

### 구현한 최적화 기법

1. **문서 임베딩 시스템**
   - PDF, Word, Excel 문서 로드
   - 문서 청킹 및 임베딩 생성
   - 벡터 데이터베이스 저장

2. **렉시콘 관리**
   - 비즈니스-기술 용어 매핑
   - 동의어 처리
   - 자동 용어 정규화

3. **스키마 설명 확장**
   - 테이블/컬럼 한글명 및 설명
   - 예시 값 제공
   - 비즈니스 의미 명시

4. **Few-shot 예제 관리**
   - 질의-SQL 쌍 저장
   - 임베딩 기반 유사도 검색
   - 성공률 추적

5. **통합 파이프라인**
   - 모든 기법을 결합한 최적화 워크플로우
   - 자동 로깅 및 모니터링

6. **성능 모니터링**
   - 성공률 추적
   - 실패 케이스 분석
   - 자동 개선 제안

### 다음 단계

- 실제 사내 문서를 `/workspace/data/company_docs`에 추가
- 도메인 특화 용어를 렉시콘에 계속 추가
- 실패한 쿼리를 분석하여 예제로 추가
- A/B 테스트를 통한 최적화 효과 검증
- 프로덕션 환경에 배포

In [None]:
print("✓ 고급 Text2SQL 최적화 기법 실습 완료!")
print("\n다음 노트북:")
print("  - 06_end_to_end.ipynb: 전체 파이프라인 통합 테스트")
print("  - 실제 사내 데이터를 활용한 프로덕션 배포")