# 상법 RAG 시스템 (PDF 파싱 → 벡터 임베딩 → LLM 질의응답)

이 노트북은 상법 PDF 문서를 파싱하고, 벡터 임베딩을 생성한 후 RAG(Retrieval-Augmented Generation) 시스템을 구축하여 질의응답을 수행합니다.

1. PDF → JSON 파싱
2. 임베딩 생성 및 FAISS 인덱스 구축
3. RAG + LLM 질의응답

## 환경 변수 설정
- `OPENAI_API_KEY`: OpenAI API 키
- `OPENAI_MODEL`: 사용할 OpenAI 모델 (기본값: gpt-4-turbo-preview)

In [None]:
# 필요한 라이브러리 설치 (필요한 경우)
# %pip install -q openai sentence-transformers faiss-cpu pypdf2 python-dotenv

import os
import json
import math
import numpy as np
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime

# OpenAI
from openai import OpenAI
from dotenv import load_dotenv

# 임베딩
from sentence_transformers import SentenceTransformer
import faiss

# PDF 처리
from PyPDF2 import PdfReader

# 환경변수 로드
load_dotenv()

# OpenAI 클라이언트 설정
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
if not client.api_key:
    raise ValueError("OPENAI_API_KEY 환경변수가 설정되지 않았습니다.")

In [None]:
# 기본 설정
WORKSPACE_DIR = Path(__file__).parent.absolute()
PDF_PATH = WORKSPACE_DIR / "상법(법률)(제20991호)(20250722).pdf"
INDEX_DIR = WORKSPACE_DIR / "kcc_index_json"
INDEX_DIR.mkdir(parents=True, exist_ok=True)

# 모델 설정
EMB_MODEL = "text-embedding-3-large"            # 임베딩 모델
LLM_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4-turbo-preview")  # 답변 생성 모델

# 기타 설정
BATCH_SIZE = 128
TOP_K = 10

print(f"작업 경로: {WORKSPACE_DIR}")
print(f"PDF 파일 존재: {PDF_PATH.exists()}")
print(f"인덱스 경로: {INDEX_DIR}")
print(f"사용 모델: {EMB_MODEL}, {LLM_MODEL}")

## 1. PDF 파싱

PDF 문서를 읽어서 다음과 같은 구조의 JSON으로 변환합니다:
```json
{
  "meta": {
    "title_ko": "상법",
    "source_file": "상법(법률)(제20991호)(20250722).pdf",
    "note": "부칙 제외, 본문 줄바꿈 제거, '제n조의m' 지원"
  },
  "total_articles": 877,
  "articles": [
    {
      "article_id": "1",
      "article_number_base": 1,
      "article_number_suffix": null,
      "title": "상사적용법규",
      "body": "상사에 관하여...",
      "part": null,
      "chapter": null,
      "section": null
    }
  ]
}
```

In [None]:
def parse_pdf_to_text(pdf_path: Path) -> str:
    """PDF 파일을 텍스트로 변환"""
    reader = PdfReader(str(pdf_path))
    pages = []
    for p in reader.pages:
        try:
            text = p.extract_text() or ""
        except Exception:
            text = ""
        pages.append(text)
    return "\n".join(pages)

def extract_main_text(raw_text: str) -> str:
    """부칙 이전까지의 본문만 추출"""
    import re
    m = re.search(r"(?:^|\n)\s*부칙\s*(?:\n|$)", raw_text)
    if m:
        return raw_text[:m.start()].strip()
    return raw_text.strip()

def parse_articles(text: str) -> List[Dict]:
    """본문을 조문 단위로 파싱"""
    import re
    
    # 조문 구분 패턴
    rx_art = re.compile(r"""
        (?:제\s*)?(\d+)               # 기본 조문 번호
        (?:\s*의\s*(\d+))?           # 옵션: 의2, 의3 등
        \s*조                        # '조' 글자
        (?:\s*\(([^)]+)\))?         # 옵션: (제목)
        \s*(.+?)                    # 본문 (non-greedy)
        (?=\s*제\s*\d+\s*조|\s*$)   # 다음 조문 시작 또는 끝
    """, re.VERBOSE | re.DOTALL)
    
    blocks = []
    for m in rx_art.finditer(text):
        base = int(m.group(1))
        suffix = int(m.group(2)) if m.group(2) else None
        title = m.group(3)
        body = m.group(4).replace("\n", " ").strip()
        
        art_id = str(base) if suffix is None else f"{base}의{suffix}"
        blocks.append({
            "article_id": art_id,
            "article_number_base": base,
            "article_number_suffix": suffix,
            "title": title,
            "body": body,
            "part": None,  # TODO: 편/장/절 구조 파싱 (필요시)
            "chapter": None,
            "section": None
        })
    
    return blocks

# PDF 파싱 실행
raw_text = parse_pdf_to_text(PDF_PATH)
main_text = extract_main_text(raw_text)
articles = parse_articles(main_text)

# JSON 생성
output = {
    "meta": {
        "title_ko": "상법",
        "source_file": PDF_PATH.name,
        "note": "부칙 제외, 본문 줄바꿈 제거, '제n조의m' 지원"
    },
    "total_articles": len(articles),
    "articles": articles
}

# 저장
out_json = INDEX_DIR.parent / "상법_파싱.json"
with out_json.open("w", encoding="utf-8") as f:
    json.dump(output, f, ensure_ascii=False, indent=2)

print(f"총 {len(articles)}개 조문 파싱 완료")
print(f"저장 경로: {out_json} ({out_json.stat().st_size:,} bytes)")

## 2. 벡터 임베딩 생성

파싱된 JSON에서 조문별 검색용 데이터를 구성하고 FAISS 인덱스를 생성합니다:

1. 조문별 별칭(alias) 생성
2. 텍스트 정규화
3. 임베딩 생성
4. FAISS 인덱스 구축 및 저장

In [None]:
def make_aliases(base: int, suf: Optional[int], title: Optional[str]) -> List[str]:
    """조문 번호와 제목으로부터 다양한 검색용 별칭 생성"""
    aliases = []
    # 한국어 기본형
    if suf is None:
        ko_core = f"제{base}조"
    else:
        ko_core = f"제{base}조의{suf}"
    aliases.append(ko_core)
    aliases.append(f"상법 {ko_core}")
    aliases.append(f"상법 {base}조" if suf is None else f"상법 {base}조의{suf}")
    aliases.append(f"{base}조" if suf is None else f"{base}조의{suf}")
    if title:
        aliases.append(f"{ko_core}({title})")
        aliases.append(f"상법 {ko_core}({title})")
    
    # 하이픈 표기
    hyf = f"{base}" if suf is None else f"{base}-{suf}"
    aliases.append(hyf)
    aliases.append(f"제{hyf}조")
    aliases.append(f"상법 제{hyf}조")
    
    # 영어식
    en_core = f"Article {base}" if suf is None else f"Article {base}-{suf}"
    aliases.append(en_core)
    aliases.append(f"KCC {en_core}")
    aliases.append(f"Korean Commercial Code {en_core}")
    
    # 중복 제거 + 안정 정렬
    seen = set(); out = []
    for a in aliases:
        if a not in seen:
            seen.add(a); out.append(a)
    return out

def build_text(rec: Dict) -> str:
    """검색용 텍스트 구성"""
    base = rec.get("article_number_base")
    suf  = rec.get("article_number_suffix")
    head = f"상법 제{base}조" if suf is None else f"상법 제{base}조의{suf}"
    if rec.get("title"):
        head += f" {rec['title']}"
    body = rec.get("body","").replace("\n", " ").strip()
    return f"[{head}] {body}".strip()

# JSON 로드
with open(INDEX_DIR.parent / "상법_파싱.json", "r", encoding="utf-8") as f:
    data = json.load(f)
    
articles = data.get("articles", [])
print(f"로드된 조문 수: {len(articles)}")

# 검색용 데이터 구성
docs = []
for rec in articles:
    base = rec.get("article_number_base")
    suf  = rec.get("article_number_suffix")
    art_id = rec.get("article_id") or (str(base) if suf is None else f"{base}의{suf}")
    title = rec.get("title")

    aliases = make_aliases(base, suf, title)
    text = build_text(rec)
    
    docs.append({
        "id": f"KCC-{art_id}",
        "article_id": art_id,
        "title": title,
        "aliases": aliases,
        "text": text,
        "raw_text": rec.get("body", ""),
        "meta": {
            "law": "상법",
            "source_file": data.get("meta",{}).get("source_file",""),
            "part": rec.get("part"),
            "chapter": rec.get("chapter"),
            "section": rec.get("section"),
        }
    })

print(f"검색용 문서 생성: {len(docs)}개")
print("샘플 문서:", {k:docs[0][k] for k in ["id","article_id","title"]})

In [None]:
# 임베딩 생성
model = SentenceTransformer("intfloat/multilingual-e5-base")

def encode_texts(texts: List[str], batch_size: int = 128, normalize: bool = True) -> np.ndarray:
    """텍스트 리스트를 임베딩 벡터로 변환"""
    emb = model.encode(texts, batch_size=batch_size, 
                      convert_to_numpy=True, show_progress_bar=True)
    if normalize:
        faiss.normalize_L2(emb)
    return emb

# 코퍼스 구성
CORPUS = [d["text"] for d in docs]
IDS = [d["id"] for d in docs]
METAS = [{k:d[k] for k in ["article_id","title","aliases","raw_text","meta"]} for d in docs]

# 임베딩 및 인덱스 생성
print("임베딩 생성 중...")
EMB = encode_texts(CORPUS, batch_size=BATCH_SIZE, normalize=True)
index = faiss.IndexFlatIP(EMB.shape[1])
index.add(EMB)
print(f"인덱스 크기: {index.ntotal}, 차원: {EMB.shape[1]}")

# 저장
print("인덱스 저장 중...")
faiss.write_index(index, str(INDEX_DIR / "kcc.index"))
np.save(INDEX_DIR / "ids.npy", np.array(IDS))
with (INDEX_DIR / "metas.json").open("w", encoding="utf-8") as f:
    json.dump(METAS, f, ensure_ascii=False, indent=2)

print(f"인덱스 저장 완료: {INDEX_DIR}")

## 3. RAG + LLM 질의응답 시스템

생성된 임베딩과 FAISS 인덱스를 활용하여 다음과 같은 과정으로 질의응답을 수행합니다:

1. 질의어 임베딩 → 코사인 유사도 기반 1차 검색
2. LLM 기반 후보 재정렬 (정답 근거성 평가)
3. 선택된 근거를 바탕으로 LLM 답변 생성

In [None]:
# 검색 유틸리티
RX_ART_NUM = re.compile(r"(?:제\s*)?(\d+)(?:\s*조\s*의\s*(\\d+)|\\s*조)")

def normalize_query(q: str) -> str:
    """검색어 정규화 (조문 번호 처리)"""
    qn = q.strip()
    m = re.search(r"(\d+)\s*의\s*(\d+)", qn)
    extra = []
    if m:
        extra.append(f"{m.group(1)}-{m.group(2)}")
        extra.append(f"제{m.group(1)}조의{m.group(2)}")
    m2 = RX_ART_NUM.search(qn)
    if m2:
        base = m2.group(1); suf = m2.group(2)
        if suf:
            extra += [f"{base}의{suf}", f"{base}-{suf}", 
                     f"제{base}조의{suf}", f"Article {base}-{suf}"]
        else:
            extra += [f"{base}", f"제{base}조", f"Article {base}"]
    if extra:
        qn = qn + " " + " ".join(sorted(set(extra)))
    return qn

def retrieve(query: str, top_k: int = 10) -> List[Dict]:
    """임베딩 기반 1차 검색"""
    qv = client.embeddings.create(model=EMB_MODEL, input=[query]).data[0].embedding
    qv = np.array(qv, dtype=np.float32)
    D, I = index.search(qv.reshape(1, -1), top_k)
    I, D = I[0], D[0]
    
    results = []
    for i, s in zip(I, D):
        if i < 0:  # FAISS의 패딩
            continue
        m = METAS[i]
        results.append({
            "index": int(i),
            "score": float(s),
            "text": m["raw_text"],
            "meta": m["meta"]
        })
    return results

def llm_rerank(query: str, candidates: List[Dict], take: int = 6, 
               model: str = LLM_MODEL) -> List[Dict]:
    """LLM 기반 후보 재정렬"""
    # 컨텍스트 구성
    bullet = []
    for i, c in enumerate(candidates):
        snippet = c["text"].replace("\n", " ").strip()
        if len(snippet) > 600:
            snippet = snippet[:600] + " ..."
        bullet.append(f"[{i}] {snippet}")
    
    # LLM 프롬프트
    prompt = f"""당신은 문서검색 재랭킹을 담당하는 평가자입니다.
사용자 질문: {query}

아래 후보 발췌문들 중에서, 질문에 대한 **정답 근거**가 가장 잘 담긴 순서로 재정렬하세요.
가능하면 숫자/연도/표 제목 일치 여부를 중시하세요.

후보:
{chr(10).join(bullet)}

출력 형식:
- 상위 {take}개의 인덱스만 콤마로 나열 (예: 3,0,5,1,2,4)
- 다른 설명은 쓰지 말 것
"""
    msg = [
        {"role":"system","content":"You are a rigorous re-ranker that only outputs indices."},
        {"role":"user","content": prompt}
    ]
    
    # LLM 호출 및 파싱
    resp = client.chat.completions.create(model=model, messages=msg, temperature=0)
    line = resp.choices[0].message.content.strip()
    idxs = []
    for tok in line.replace(" ", "").split(","):
        if tok.isdigit():
            idxs.append(int(tok))
    
    # 안전장치
    if not idxs:
        idxs = list(range(min(take, len(candidates))))
    idxs = idxs[:take]
    
    # 재정렬된 결과 반환
    return [candidates[i] for i in idxs]

def build_context_snippets(chunks: List[Dict], max_chars: int = 1800) -> str:
    """LLM 답변용 컨텍스트 구성"""
    ctx_lines = []
    for rank, c in enumerate(chunks, start=1):
        tag = f"[E{rank}]"
        snippet = c["text"].strip()
        if len(snippet) > 1200:
            snippet = snippet[:1200] + " ..."
        ctx_lines.append(f"{tag} {snippet}")
    return "\n\n".join(ctx_lines)

def answer_with_rag_llm(query: str, retrieve_k: int = 12, rerank_take: int = 6, 
                       model: str = LLM_MODEL, temperature: float = 0.0, 
                       debug: bool = False) -> Dict:
    """RAG + LLM 기반 질의응답"""
    # 1) 초기 검색
    cand = retrieve(query, top_k=retrieve_k)
    
    # 2) LLM 재정렬
    top = llm_rerank(query, cand, take=rerank_take, model=model)
    
    # 3) 컨텍스트 구성
    ctx = build_context_snippets(top)
    
    # 4) 답변 생성
    prompt = f"""당신은 정확한 회계/법률 문서 QA 어시스턴트입니다.
아래 '근거 발췌문'만을 **사실 근거**로 사용하여 질문에 답하세요.
근거에 없는 내용은 추정하지 말고, '근거 부족'이라고 밝혀주세요.
필요하면 답변 문장 끝에 [E1], [E2]처럼 근거 번호를 달아주세요.

[질문]
{query}

[근거 발췌문]
{ctx}
"""
    msgs = [
        {"role":"system","content":"You are a precise, Korean-speaking assistant for financial/legal documents."},
        {"role":"user","content": prompt}
    ]
    resp = client.chat.completions.create(
        model=model, messages=msgs, temperature=temperature
    )
    answer = resp.choices[0].message.content.strip()
    
    return {
        "answer": answer,
        "evidence": top,
        "raw_candidates": cand if debug else None
    }

## 4. 데모

실제 질의응답을 수행해봅니다.

In [None]:
# 예시 질의
q = "상법에서 소상인의 정의는 무엇입니까?"
out = answer_with_rag_llm(q, debug=True)

print("=== 답변 ===")
print(out["answer"])

print("\n=== 근거 ===")
for i, e in enumerate(out["evidence"], start=1):
    title = (e.get("meta", {}) or {}).get("title", "")
    print(f"[E{i}] score={e['score']:.4f}, title={title}")

def preview_text(evidence_list: List[Dict], which: int = 1):
    """근거 원문 확인"""
    i = max(1, min(which, len(evidence_list))) - 1
    print(evidence_list[i]["text"])

# 첫 번째 근거 원문 확인
preview_text(out["evidence"], which=1)