In [1]:
# -*- coding: utf-8 -*-
# 멀티법령 RAG 전처리/인덱싱 (PIPA, 신용정보법, 전자서명법, 정보통신망법)

import os, re, json, math
from dataclasses import dataclass, asdict
from typing import List, Dict, Tuple, Optional

import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

from PyPDF2 import PdfReader
from langchain_community.embeddings import HuggingFaceEmbeddings
import faiss

# ===== 사용자 환경 상수 =====
LLM_ID = "nlpai-lab/KULLM3"
EMB_MODEL = "jhgan/ko-sroberta-multitask"   # 경량/호환성 위주
CHUNK_TOKENS = 600
CHUNK_OVERLAP = 32
CTX_TOKEN_BUDGET = 600
TOP_K = 4
SEED = 42
torch.manual_seed(SEED)

# ===== 법령 설정: 파일 경로 + 제거/정규식 패턴 =====
LAW_CONFIG = {
    # 개인정보 보호법
    "pipa": {
        "law_name": "개인정보 보호법",
        "pdf_path": "../data/개인정보 보호법(법률)(제19234호)(20250313).pdf",
        "drop_patterns": [
            r'법제처\s+\d+\s+국가법령정보센터\s*개인정보\s*보호법',
            r'법제처\s+\d+\s+국가법령정보센터',
            r'국가법령정보센터\s*개인정보\s*보호법',
            r'법제처|국가법령정보센터',
            r'<[^>]+>',         # <개정 …>, <신설 …>
            r'\[[^\]]+\]',      # [본조신설 …]
        ],
    },
    # 신용정보의 이용 및 보호에 관한 법률
    "ciupa": {
        "law_name": "신용정보법",
        "pdf_path": "../data/신용정보의 이용 및 보호에 관한 법률(법률)(제20304호)(20240814).pdf",
        "drop_patterns": [
            r'법제처\s+\d+\s+국가법령정보센터\s*신용정보.*법',
            r'법제처|국가법령정보센터',
            r'<[^>]+>', r'\[[^\]]+\]',
        ],
    },
    # 전자서명법
    "es_act": {
        "law_name": "전자서명법",
        "pdf_path": "../data/전자서명법(법률)(제18479호)(20221020).pdf",
        "drop_patterns": [
            r'법제처\s+\d+\s+국가법령정보센터\s*전자서명법',
            r'법제처|국가법령정보센터',
            r'<[^>]+>', r'\[[^\]]+\]',
        ],
    },
    # 정보통신망 이용촉진 및 정보보호 등에 관한 법률
    "icn_act": {
        "law_name": "정보통신망법",
        "pdf_path": "../data/정보통신망 이용촉진 및 정보보호 등에 관한 법률(법률)(제20678호)(20250722).pdf",
        "drop_patterns": [
            r'법제처\s+\d+\s+국가법령정보센터\s*정보통신망.*법',
            r'법제처|국가법령정보센터',
            r'<[^>]+>', r'\[[^\]]+\]',
        ],
    },
}

In [2]:
# ===== 토크나이저: 토큰 길이 계산/청킹 =====
llm_tokenizer = AutoTokenizer.from_pretrained(LLM_ID)
if llm_tokenizer.pad_token is None:
    llm_tokenizer.pad_token = llm_tokenizer.eos_token
llm_tokenizer.padding_side = "right"

def token_len(s: str) -> int:
    return len(llm_tokenizer(s, add_special_tokens=False)["input_ids"])

def split_sentences_ko(text: str) -> List[str]:
    # 고정폭 lookbehind: '다.' 또는 일반 종결부호
    text = re.sub(r'\s+', ' ', text).strip()
    if not text:
        return []
    return re.split(r'(?<=다\.)\s+|(?<=[.?!。！？])\s+', text)




In [3]:
# ===== 공통 정제 =====
def normalize_common(text: str) -> str:
    # 한자 제거
    text = re.sub(r'[\u4e00-\u9fff]', '', text)
    # circled numbers → (n)
    circled = '①②③④⑤⑥⑦⑧⑨⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳'
    for idx, c in enumerate(circled, 1):
        text = text.replace(c, f'({idx})')
    # 공백/빈 괄호 정리
    text = re.sub(r'\s+', ' ', text).strip()
    text = re.sub(r'\(\s*\)', '', text)
    return text

def clean_text_by_config(text: str, drop_patterns: List[str]) -> str:
    for pat in drop_patterns:
        text = re.sub(pat, '', text)
    return normalize_common(text)

# ===== 조문 단위 분리 =====
ARTICLE_HEADER_PATTERN = r'(제\d+조(?:의\d+)?\([^)]+\))'  # 제X조(제목) / 제X조의Y(제목)

def split_articles(raw_text: str) -> List[Tuple[str, str, str]]:
    parts = re.split(ARTICLE_HEADER_PATTERN, raw_text)
    out = []
    for i in range(1, len(parts), 2):
        header = parts[i]
        body = (parts[i+1] if i+1 < len(parts) else "").strip().replace("\n", " ")
        m = re.match(r'(제\d+조(?:의\d+)?)[(]([^)]+)[)]', header)
        if not m:
            continue
        article_id = m.group(1)          # 제xx조 / 제xx조의y
        article_title = m.group(2)       # (제목)
        out.append((article_id, article_title, body))
    return out

def chunk_article(article_body: str, header: str) -> List[str]:
    prefix = header.strip() + "\n"
    sents = split_sentences_ko(article_body) or [article_body]
    chunks, cur, cur_toks = [], [], token_len(prefix)
    for s in sents:
        tl = token_len(s)
        if cur_toks + tl > CHUNK_TOKENS and cur:
            chunks.append(prefix + " ".join(cur))
            # overlap: 마지막 문장 유지
            keep = cur[-1] if CHUNK_OVERLAP > 0 and cur else ""
            cur = [keep] if keep else []
            cur_toks = token_len(prefix) + (token_len(keep) if keep else 0)
        cur.append(s); cur_toks += tl
    if cur:
        chunks.append(prefix + " ".join(cur))
    return chunks


In [4]:
# ===== 데이터 클래스 =====
@dataclass
class LawDoc:
    text: str
    meta: Dict

# ===== 임베딩 =====
embeddings = HuggingFaceEmbeddings(
    model_name=EMB_MODEL,
    model_kwargs={"device": "cuda" if torch.cuda.is_available() else "cpu"},
    encode_kwargs={
        "normalize_embeddings": True,
        "batch_size": 128,
        "convert_to_numpy": True,
        "convert_to_tensor": False
    }
)

# ===== 인덱스 (FAISS HNSW) =====
def build_faiss_hnsw(vectors: np.ndarray, m: int = 32, ef_search: int = 32) -> faiss.IndexHNSWFlat:
    dim = vectors.shape[1]
    idx = faiss.IndexHNSWFlat(dim, m)
    idx.hnsw.efSearch = ef_search
    idx.add(vectors.astype(np.float32))
    return idx

# ===== 파이프라인: 1) 로드 → 2) 정제 → 3) 조문분리 → 4) 청킹 → 5) 임베딩/인덱스
def load_pdf_text(pdf_path: str) -> str:
    reader = PdfReader(pdf_path)
    text = ""
    for p in reader.pages:
        t = p.extract_text() or ""
        text += t + "\n"
    return text

def preprocess_law(law_id: str, cfg: Dict) -> List[LawDoc]:
    raw = load_pdf_text(cfg["pdf_path"])
    cleaned = clean_text_by_config(raw, cfg["drop_patterns"])
    articles = split_articles(cleaned)

    docs: List[LawDoc] = []
    # 시행일 파싱(있으면): [시행 YYYY. M. D.]가 머리말에 있는 경우가 많으나 PDF마다 다름 → 필요 시 별도 파서
    effective_date = None  # 필요 시 추출 로직 추가

    for article_id, title, body in articles:
        header = f'{cfg["law_name"]} {article_id}({title})'
        chunks = chunk_article(body, header)
        for ch in chunks:
            meta = {
                "law_id": law_id,
                "law_name": cfg["law_name"],
                "article_id": article_id,
                "article_title": title,
                "effective_date": effective_date,   # None 가능
                "tok_len": token_len(ch),
                "source_uri": cfg.get("pdf_path"),
                "version": None
            }
            docs.append(LawDoc(text=ch, meta=meta))
    return docs

def build_indices(all_docs: List[LawDoc]):
    # (a) 법령별 인덱스
    per_law_docs: Dict[str, List[LawDoc]] = {}
    for d in all_docs:
        per_law_docs.setdefault(d.meta["law_id"], []).append(d)

    indices = {}
    for law_id, docs in per_law_docs.items():
        mat = np.array(embeddings.embed_documents([d.text for d in docs]), dtype=np.float32)
        indices[f"faiss_hnsw_{law_id}"] = {
            "index": build_faiss_hnsw(mat, m=32, ef_search=32),
            "docs": docs
        }

    # (b) 글로벌 인덱스
    mat_all = np.array(embeddings.embed_documents([d.text for d in all_docs]), dtype=np.float32)
    indices["faiss_hnsw_all"] = {
        "index": build_faiss_hnsw(mat_all, m=32, ef_search=32),
        "docs": all_docs
    }
    return indices

  embeddings = HuggingFaceEmbeddings(
  return self.fget.__get__(instance, owner)()


In [5]:
all_docs: List[LawDoc] = []
for law_id, cfg in LAW_CONFIG.items():
    if not os.path.exists(cfg["pdf_path"]):
        print(f"[WARN] PDF not found: {cfg['pdf_path']}")
        continue
    docs = preprocess_law(law_id, cfg)
    all_docs.extend(docs)
    print(f"[OK] {cfg['law_name']} → chunks: {len(docs)}")

indices = build_indices(all_docs)
print("[OK] built indices:", list(indices.keys()))

[OK] 개인정보 보호법 → chunks: 217
[OK] 신용정보법 → chunks: 181
[OK] 전자서명법 → chunks: 29
[OK] 정보통신망법 → chunks: 151
[OK] built indices: ['faiss_hnsw_pipa', 'faiss_hnsw_ciupa', 'faiss_hnsw_es_act', 'faiss_hnsw_icn_act', 'faiss_hnsw_all']


In [6]:
# ===== 라우팅 규칙 =====
ARTICLE_PTRN = re.compile(r"제\d+조(?:의\d+)?")  # 조문 표기
LAW_HINTS = {
    "pipa": ("개인정보", "개인 정보", "개인정보보호법"),
    "ciupa": ("신용정보",  "신용정보법"),
    "es_act": ("전자서명", "전자서명법"),
    "icn_act": ("정보통신망", "통신망", "정보통신망법"),
}

def detect_law_id(query: str) -> Optional[str]:
    q = query.lower()
    for law_id, kws in LAW_HINTS.items():
        if any(kw.lower() in q for kw in kws):
            return law_id
    return None

def route_is_domain(query: str) -> bool:
    # 법/금융/보안 도메인 간단 라우터 (검색 여부 판단용)
    domain_kws = ("법", "조(", "과징금", "처벌", "보안", "침해", "금융", "증권", "자본시장", "개인정보", "신용정보", "전자서명", "정보통신망")
    q = query.lower()
    return any(kw in q for kw in domain_kws) or bool(ARTICLE_PTRN.search(query))

def choose_index(indices: dict, query: str):
    """
    1) 질의에서 법령 단서 -> 해당 법 인덱스 우선
    2) 조문 패턴만 있거나 단서가 불분명 -> 글로벌 인덱스
    3) 아무 단서도 없으면 None (베이스모델 경로)
    """
    law_id = detect_law_id(query)
    if law_id:
        key = f"faiss_hnsw_{law_id}"
        if key in indices:
            return indices[key]  # {"index": ..., "docs": ...}
    # 법령 단서 없지만 도메인성/조문 표기는 있는 경우 글로벌
    if route_is_domain(query) and "faiss_hnsw_all" in indices:
        return indices["faiss_hnsw_all"]
    return None  # 베이스모델 직행


In [7]:
# ===== 검색 with 점수 (해당 인덱스에서) =====
def faiss_search_with_scores_from_index(index_entry: dict, query: str, top_k: int = TOP_K):
    # embeddings는 상위 스코프에서 로드되었다고 가정
    qv = np.array(embeddings.embed_query(query), dtype=np.float32).reshape(1, -1)
    D, I = index_entry["index"].search(qv, top_k)     # L2 거리 (정규화 벡터 가정)
    cos = 1.0 - (D[0] / 2.0)                          # L2 -> cosine
    out = []
    docs = index_entry["docs"]
    for idx, i in enumerate(I[0]):
        ii = int(i)
        if ii >= 0:
            out.append((docs[ii], float(cos[idx])))
    return out  # [(LawDoc, cosine), ...]

# ===== 컨텍스트 패킹 (질문 주신 코드 재사용 + tok_len 캐시) =====
def pack_context(docs_in, token_budget=CTX_TOKEN_BUDGET):
    acc, used = [], 0
    for d in docs_in:
        tl = d.meta.get("tok_len", None)
        if tl is None:
            tl = token_len(d.text); d.meta["tok_len"] = tl
        if used + tl <= token_budget:
            acc.append(d.text); used += tl
        else:
            remain = token_budget - used
            if remain > 50:
                ids = llm_tokenizer(d.text, add_special_tokens=False)["input_ids"][:remain]
                acc.append(llm_tokenizer.decode(ids))
            break
    return "\n\n".join(acc)


In [8]:
# ===== 프롬프트 =====
SYSTEM_PROMPT = (
    "당신은 한국 법령, 금융, 보안 도메인 Q/A를 담당하는 도우미입니다. "
    "아는 사실만 간결하게 답하고, 모르면 '알 수 없습니다'라고 말하세요."
)

def build_prompt(query: str, context: Optional[str]) -> Tuple[str, int]:
    """컨텍스트 유무에 따라 간단 프롬프트와 max_len을 반환"""
    if context and context.strip():
        prompt = (
            "아래 컨텍스트를 우선 사용해 정확히 답하세요. 불충분하면 아는 범위에서만 간결히 답하세요.\n\n"
            f"=== 컨텍스트 ===\n{context}\n=== 끝 ===\n"
            f"질문: {query}"
        )
        return f"{SYSTEM_PROMPT}\n\n{prompt}", 3072
    else:
        prompt = (
            "당신은 한국어로 간결하고 정확하게 답하는 도우미입니다. "
            "사실에 근거해 답하고, 모르면 '알 수 없습니다'라고 말하세요.\n\n"
            f"질문: {query}"
        )
        return prompt, 2048

# ===== 생성 토큰 상한 =====
def dynamic_max_new_tokens(question: str) -> int:
    lines = [ln.strip() for ln in question.split("\n") if ln.strip()]
    opt_cnt = sum(bool(re.match(r"^\d+(\s|[.)])", ln)) for ln in lines)
    return 96 if opt_cnt >= 2 else 192

# ===== 메인: 다중 인덱스 기반 텍스트 생성 =====
def generate_answer_with_indices(query: str, indices: dict) -> str:
    """
    indices: build_indices() 반환 구조
    - 라우팅 → 해당 인덱스에서 top-k 검색(점수 포함)
    - 최고 유사도 임계치 미만이면 컨텍스트 없이 베이스모델 생성(Adaptive RAG)
    """
    # 0) 우선, 일반상식/비도메인은 곧장 베이스모델
    if not route_is_domain(query):
        prompt, max_len = build_prompt(query, context=None)
        inputs = llm_tokenizer(prompt, return_tensors="pt", truncation=True, max_length=max_len, padding=False)
        inputs = {k: v.to(llm_model.device) for k, v in inputs.items()}
        with torch.inference_mode():
            out = llm_model.generate(**inputs,
                                     max_new_tokens=dynamic_max_new_tokens(query),
                                     do_sample=False, temperature=0.2,
                                     eos_token_id=llm_tokenizer.eos_token_id,
                                     pad_token_id=llm_tokenizer.pad_token_id)
        gen = out[0][inputs["input_ids"].shape[1]:]
        return llm_tokenizer.decode(gen, skip_special_tokens=True).strip()

    # 1) 인덱스 선택
    idx_entry = choose_index(indices, query)

    # 2) 인덱스가 없으면 베이스모델
    if idx_entry is None:
        prompt, max_len = build_prompt(query, context=None)
        inputs = llm_tokenizer(prompt, return_tensors="pt", truncation=True, max_length=max_len, padding=False)
        inputs = {k: v.to(llm_model.device) for k, v in inputs.items()}
        with torch.inference_mode():
            out = llm_model.generate(**inputs,
                                     max_new_tokens=dynamic_max_new_tokens(query),
                                     do_sample=False, temperature=0.2,
                                     eos_token_id=llm_tokenizer.eos_token_id,
                                     pad_token_id=llm_tokenizer.pad_token_id)
        gen = out[0][inputs["input_ids"].shape[1]:]
        return llm_tokenizer.decode(gen, skip_special_tokens=True).strip()

    # 3) 선택 인덱스에서 검색 + 점수
    scored = faiss_search_with_scores_from_index(idx_entry, query, top_k=TOP_K)
    best_cos = max((s for _, s in scored), default=0.0)

    # 4) 임계치: 충분히 유사할 때만 컨텍스트 사용 (속도 최적화)
    THRESH = 0.70
    use_context = best_cos >= THRESH and len(scored) > 0

    context = pack_context([d for d, _ in scored], token_budget=CTX_TOKEN_BUDGET) if use_context else None
    prompt, max_len = build_prompt(query, context)

    # 5) LLM 생성
    inputs = llm_tokenizer(prompt, return_tensors="pt", truncation=True, max_length=max_len, padding=False)
    inputs = {k: v.to(llm_model.device) for k, v in inputs.items()}
    with torch.inference_mode():
        out = llm_model.generate(
            **inputs,
            max_new_tokens=dynamic_max_new_tokens(query),
            do_sample=False,
            temperature=0.2,
            eos_token_id=llm_tokenizer.eos_token_id,
            pad_token_id=llm_tokenizer.pad_token_id,
        )
    gen = out[0][inputs["input_ids"].shape[1]:]
    return llm_tokenizer.decode(gen, skip_special_tokens=True).strip()


In [9]:
# ---------------- LLM 로드 & 생성 ----------------
llm_model = AutoModelForCausalLM.from_pretrained(
    LLM_ID,
    device_map="auto",
    load_in_4bit=True,
    torch_dtype=torch.float16
)
if torch.cuda.is_available():
    torch.backends.cuda.matmul.allow_tf32 = True
    try:
        # llm_model.config.attn_implementation = "sdpa"
        llm_model.config.attn_implementation = "flash_attention_2"
        # llm_model.config.attn_implementation = "eager"
    except Exception:
        pass
llm_model.eval()
torch.set_grad_enabled(False)

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


Loading checkpoint shards:   0%|          | 0/5 [00:00<?, ?it/s]

<torch.autograd.grad_mode.set_grad_enabled at 0x754948a82c20>

In [17]:
# q = """개인정보보호법 제22조의2에 따라 만 14세 미만 아동의 개인정보를 처리하기 위해 필요한 절차로 옳은 것은?
# 1 아동의 학교의 동의를 받아야 한다.
# 2 법정대리인의 동의를 받아야 한다.
# 3 아동 본인의 동의만 받으면 된다.
# 4 아동의 친구의 동의를 받아야 한다."""

# q = """고려대학교에 대해서 설명해줘."""

# q = """정보통신망법 제44조의2에 따르면, 정보통신망을 통해 공개된 정보로 인해 사생활 침해나 명예훼손이 발생한 경우, 침해를 받은 자가 정보통신서비스 제공자에게 요청할 수 있는 조치는 무엇인가?
# 1 정보의 공개 및 공유
# 2 정보의 수정 및 재배포
# 3 정보의 접근권 제한 및 열람 기록 보관
# 4 정보의 삭제 또는 반박내용의 게재
# 5 정보의 암호화 및 보호"""

# q = """정보통신망법 제22조의2에 따라 이동통신단말장치의 운영체제를 제작하여 공급하는 자가 해야 할 조치로 옳은 것은?
# 1 이용자의 동의 없이 접근권한 설정
# 2 접근권한 철회 기능 구현
# 3 접근권한 설정에 대한 이용자 통보 생략
# 4 모든 접근권한에 대한 동의 철회 불가
# 5 운영체제 내 접근권한 요청 기록을 영구적으로 저장하지 않아야 한다."""

q = """신용정보법 제36조에 따르면, 신용정보주체가 상거래 거절의 근거가 된 신용정보에 대해 이의를 제기할 수 있는 기간은?
1 60일 이내
2 90일 이내
3 30일 이내
4 45일 이내"""

print(generate_answer_with_indices(q, indices))

정답: 1 60일 이내
