In [1]:
# !pip install pdfplumber

In [4]:
import sys
print(sys.executable)

C:\Users\Admin\anaconda3\envs\llm\python.exe


In [2]:
# !{sys.executable} -m pip install pdfplumber

In [7]:
import pdfplumber
print(pdfplumber.__version__)

0.11.9


In [29]:
import os
import re
import csv
import uuid
from typing import List, Dict
from docx import Document

# =========================
# (1) 설정값
# =========================
MAX_LEN = 1500  # ✅ 1500자 기준 청킹 상한선

# 타입별 우선순위(너가 정한 룰)
PRIORITY_MAP = {
    "law": 1,   # 주택임대차보호법/법률 쪽
    "rule": 2,  # 시행령/시행규칙/대법원규칙
    "case": 4,  # 판례(지금은 안 함)
}


# 내지, 삭제 등 실질 내용이 없는 조문 제거
def is_meaningful_text(text: str) -> bool:
    
    t = text.strip()

    # 너무 짧으면 의미 없음
    if len(t) < 20:
        return False

    # 생략/삭제 표식
    if t in ["내지", "삭제"]:
        return False

    # 전문개정 표시 제거
    if t.startswith("전문개정"):
        return False

    return True


# =========================
# (2) DOCX 읽기
# =========================
def load_docx_text(file_path: str) -> str:
    """
    docx 문서의 모든 문단을 읽어서 한 덩어리 문자열로 합침.
    - 빈 줄/공백만 있는 문단은 제거
    """
    doc = Document(file_path)
    paras = []
    for p in doc.paragraphs:
        t = p.text.strip()
        if t:
            paras.append(t)
    return "\n".join(paras)


# =========================
# (3) '제n조' 단위로 조문 분리
# =========================
def split_by_article(full_text: str) -> Dict[str, str]:
    """
    전체 텍스트를 조문 키(예: 제3조, 제3조의2) 기준으로 나눔.
    반환: { "제3조": "본문...", "제3조의2": "본문..." }
    """
    # '제숫자조' 또는 '제숫자조의숫자' 패턴
    pattern = re.compile(r"(제\d+조(?:의\d+)?)")

    parts = pattern.split(full_text)
    # parts 구조 예:
    # [조문이전텍스트, "제1조", 본문, "제2조", 본문, ...]
    articles = {}

    # (1,3,5...)가 조문명, 그 다음이 본문
    for i in range(1, len(parts), 2):
        article = parts[i].strip()
        body = parts[i + 1].strip() if i + 1 < len(parts) else ""
        if body:
            articles[article] = body

    return articles


# =========================
# (4) 항(①②③...) 단위 분리
# =========================
def split_by_clause(article_body: str) -> List[Dict[str, str]]:
    """
    조문 본문을 '①②③...' 항 단위로 나눔.
    - 항이 없으면 chunk_id='본문'으로 하나만 반환
    반환: [{"chunk_id": "항①", "text": "..."}, ...] 또는 [{"chunk_id":"본문","text":"..."}]
    """
    pattern = re.compile(r"(①|②|③|④|⑤|⑥|⑦|⑧|⑨|⑩)")
    parts = pattern.split(article_body)

    # 항 표시가 없으면 통으로 반환
    if len(parts) == 1:
        return [{"chunk_id": "본문", "text": article_body.strip()}]

    clauses = []
    for i in range(1, len(parts), 2):
        mark = parts[i]              # 예: "①"
        text = parts[i + 1].strip()  # 해당 항의 내용

        if text:  # 빈 항 방지
            clauses.append({
                "chunk_id": f"항{mark}",  # 예: "항①"
                "text": text
            })
    return clauses


# =========================
# (5) 1500자 기준으로 추가 분할
# =========================
def split_by_length(text: str, base_chunk_id: str) -> List[Dict[str, str]]:
    """
    text가 MAX_LEN 초과면 잘라서 여러 청크로 분할.
    chunk_id 규칙:
    - 1500자 이하: base_chunk_id 그대로
    - 초과: base_chunk_id_1, base_chunk_id_2, ...
    """
    if len(text) <= MAX_LEN:
        return [{"chunk_id": base_chunk_id, "text": text}]

    chunks = []
    start = 0
    idx = 1

    while start < len(text):
        end = start + MAX_LEN
        piece = text[start:end]

        chunks.append({
            "chunk_id": f"{base_chunk_id}_{idx}",
            "text": piece
        })

        start = end
        idx += 1

    return chunks


# =========================
# (6) docx 1개 → 청킹 결과 리스트
# =========================
def chunk_docx(file_path: str) -> List[Dict[str, str]]:
    """
    docx 한 개를 읽어서:
    - 제n조로 분리
    - 항(①②③)으로 분리
    - 1500자 초과하면 추가 분할
    결과: [{"article":..., "chunk_id":..., "text":...}, ...]
    """
    full_text = load_docx_text(file_path)
    articles = split_by_article(full_text)

    results = []

    for article, body in articles.items():
        clauses = split_by_clause(body)

        for clause in clauses:
            # 항/본문 단위 텍스트를 1500자 기준으로 재분할
            pieces = split_by_length(clause["text"], clause["chunk_id"])

            for piece in pieces:
                if not is_meaningful_text(piece["text"]):
                    continue
                results.append({
                    "article": article,
                    "chunk_id": piece["chunk_id"],
                    "text": piece["text"]
                })

    return results


# =========================
# (7) CSV 저장
# =========================
def write_csv(output_path: str, rows: List[Dict]):
    """
    rows(list of dict)를 지정 스키마로 CSV 저장.
    """
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    fieldnames = ["id", "category", "priority", "article", "chunk_id", "source", "text"]

    with open(output_path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


# =========================
# (8) 폴더 처리: law/rule → csv
# =========================
def process_docx_folder(input_dir: str, output_csv: str, source_type: str):
    """
    input_dir 안의 docx 전부 처리해서 output_csv로 저장.
    source_type: "law" 또는 "rule"
    """
    priority = PRIORITY_MAP[source_type]
    all_rows = []

    for file_name in os.listdir(input_dir):
        if not file_name.lower().endswith(".docx"):
            continue

        file_path = os.path.join(input_dir, file_name)

        # docx 한 파일 청킹 수행
        chunks = chunk_docx(file_path)

        # 청크마다 메타데이터 붙여서 rows 생성
        for c in chunks:
            all_rows.append({
                "id": str(uuid.uuid4()),
                "category": source_type,     # ✅ 여기만 바꾼다
                "priority": priority,
                "article": c["article"],
                "chunk_id": c["chunk_id"],
                "source": file_name,
                "text": c["text"],
            })

    # CSV로 저장
    write_csv(output_csv, all_rows)


# =========================
# (9) 실행 엔트리
# =========================
if __name__ == "__main__":
    # ✅ 너 프로젝트 구조에 맞게 여기만 경로 조정하면 됨
    LAW_DIR = "data/raw/law"
    RULE_DIR = "data/raw/rule"
    OUT_LAW  = r"data\processed\law\law.csv"
    OUT_RULE = r"data\processed\rule\rule.csv"

    # law.csv 생성
    process_docx_folder(LAW_DIR, OUT_LAW, source_type="law")
    print(f"[OK] law.csv 생성 완료 → {OUT_LAW}")

    # rule.csv 생성
    process_docx_folder(RULE_DIR, OUT_RULE, source_type="rule")
    print(f"[OK] rule.csv 생성 완료 → {OUT_RULE}")

    # case는 PDF라서 다음 단계에서 처리할 거라 지금은 스킵
    print("[SKIP] case/pdf 처리는 다음 단계에서 진행")

[OK] law.csv 생성 완료 → data\processed\law\law.csv
[OK] rule.csv 생성 완료 → data\processed\rule\rule.csv
[SKIP] case/pdf 처리는 다음 단계에서 진행


In [3]:
# 검증

# import pandas as pd

# df = pd.read_csv(r"C:\ai\source\프로젝트 2\data\processed\law\law.csv")

# df[df["text"].str.contains("내지", na=False)]

In [4]:
import os
import re
import uuid
import pdfplumber
from typing import List, Dict

MAX_LEN = 1500

# =========================
# (CASE-1) PDF 전체 텍스트 로드
# =========================
def load_pdf_text(file_path: str) -> str:
    """
    PDF를 페이지 순서대로 읽어 전체 텍스트로 합침
    """
    texts = []
    with pdfplumber.open(file_path) as pdf:
        for page in pdf.pages:
            t = page.extract_text()
            if t:
                texts.append(t.strip())
    return "\n".join(texts)


# =========================
# (CASE-2) 사례 단위 분리
# =========================
def split_by_case(text: str) -> Dict[str, str]:
    """
    '사례 1', '사례 2' 기준으로 텍스트 분리
    반환: { '사례 1': '내용...', '사례 2': '내용...' }
    """
    pattern = re.compile(r"(사례\s*\n?\s*\d+)")
    parts = pattern.split(text)

    cases = {}
    for i in range(1, len(parts), 2):
        case_title = parts[i].strip()
        body = parts[i + 1].strip() if i + 1 < len(parts) else ""
        if body:
            cases[case_title] = body

    return cases

# ============================

def normalize_case_title(case_title: str) -> str:
    """
    '사례\\n1', '사례  2' → '사례 1'
    """
    num = re.findall(r"\d+", case_title)
    if not num:
        return case_title.strip()
    return f"사례 {num[0]}"


# =========================
# (CASE-3) 1500자 기준 분할
# =========================
def split_by_length(text: str, case_no: int) -> List[Dict[str, str]]:
    """
    사례 본문을 1500자 기준으로 분할
    """
    chunks = []

    if len(text) <= MAX_LEN:
        return [{
            "chunk_id": f"case_{case_no}_1",
            "text": text
        }]

    start = 0
    idx = 1
    while start < len(text):
        chunks.append({
            "chunk_id": f"case_{case_no}_{idx}",
            "text": text[start:start + MAX_LEN]
        })
        start += MAX_LEN
        idx += 1

    return chunks


# =========================
# (CASE-4) case 폴더 → case.csv
# =========================
def process_case_folder(input_dir: str, output_csv: str):
    rows = []

    for file_name in os.listdir(input_dir):
        if not file_name.lower().endswith(".pdf"):
            continue

        file_path = os.path.join(input_dir, file_name)
        full_text = load_pdf_text(file_path)
        cases = split_by_case(full_text)

        for case_title, body in cases.items():
            norm_title = normalize_case_title(case_title)
            case_no = int(re.findall(r"\d+", norm_title)[0])
        
            pieces = split_by_length(body, case_no)

            for piece in pieces:
                # 너무 짧은 조각 제거
                if len(piece["text"].strip()) < 50:
                    continue

                rows.append({
                    "id": str(uuid.uuid4()),
                    "category": "case",
                    "priority": 4,
                    "article": norm_title,          # ← 사례 1, 사례 2
                    "chunk_id": piece["chunk_id"],  # ← case_1_1 이런 식으로 일치
                    "source": file_name,
                    "text": piece["text"]
                })

    # CSV 저장
    import csv
    os.makedirs(os.path.dirname(output_csv), exist_ok=True)
    with open(output_csv, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=["id", "category", "priority", "article", "chunk_id", "source", "text"]
        )
        writer.writeheader()
        writer.writerows(rows)

In [39]:
CASE_DIR = r"C:\ai\source\프로젝트 2\data\raw\case"
OUT_CASE = r"C:\ai\source\프로젝트 2\data\processed\case\case.csv"

process_case_folder(CASE_DIR, OUT_CASE)
print(f"[OK] case.csv 생성 완료 → {OUT_CASE}")

[OK] case.csv 생성 완료 → C:\ai\source\프로젝트 2\data\processed\case\case.csv


In [40]:
import pandas as pd

case_path = r"C:\ai\source\프로젝트 2\data\processed\case\case.csv"
df_case = pd.read_csv(case_path)

df_case.head()

Unnamed: 0,id,category,priority,article,chunk_id,source,text
0,243e0786-a330-4475-a2cd-f7750119749b,case,4,사례 1,case_1_1,2025전세피해지원사례집.pdf,전세대출 연체 문제\n주요\n전세피해주택에서 퇴거하고자 하나 임대인이 연락 두절(또...
1,0e5820dc-2fc3-4122-8d35-e32b91540fc0,case,4,사례 2,case_2_1,2025전세피해지원사례집.pdf,전세피해 임차인 무이자 대출 지원 3 생업상 주거이전 문제\n전세 계약기간이 끝났지...
2,33692535-ffd9-4531-8729-a05c2dd6dd10,case,4,사례 2,case_2_2,2025전세피해지원사례집.pdf,"은 임차인이\n전세대출을 상환하기 어려운 경우, 대출보증기관(HUG, HF, SGI..."
3,675ef49b-5f37-46b0-b5e4-795e410b025a,case,4,사례 4,case_4_1,2025전세피해지원사례집.pdf,깡통주택 매수 문제 5 보증금반환소송 등 소송비용 문제\n임대인이 보증금을 돌려줄 ...
4,c6674abe-c69d-4bd1-897b-c6d358b5623c,case,4,사례 4,case_4_2,2025전세피해지원사례집.pdf,"택을 경매신청 하기 위해서는 주택에 전세권을 설정하였거나, 보증금반환청구소송\n등을..."


In [44]:
import pandas as pd

# =========================
# 설정
# =========================
FILES = [
    "data/processed/law/law.csv",
    "data/processed/rule/rule.csv",
    "data/processed/case/case.csv",
]

LAW_PRIORITY_MAP = {
    "주택임대차보호법": 1,
    "시행령": 2,
    "시행규칙/대법원규칙": 3,
    "민법": 4,
    "사례/판례/피해상담": 50,
}

# =========================
# law_type 판별 함수
# =========================
def classify_law_type(source, article):
    source = str(source)
    article = str(article)

    if "사례" in article or "판례" in source or "사례집" in source:
        return "사례/판례/피해상담"

    if "주택임대차보호법" in source and "법률" in source:
        return "주택임대차보호법"

    if "시행령" in source:
        return "시행령"

    if "시행규칙" in source or "대법원규칙" in source:
        return "시행규칙/대법원규칙"

    if "민법" in source:
        return "민법"

    return "사례/판례/피해상담"


# =========================
# 실행
# =========================
for file in FILES:
    df = pd.read_csv(file)

    df["law_type"] = df.apply(
        lambda row: classify_law_type(row.get("source"), row.get("article")),
        axis=1
    )

    df["priority"] = df["law_type"].map(LAW_PRIORITY_MAP)

    out = file.replace(".csv", "_labeled.csv")
    df.to_csv(out, index=False, encoding="utf-8-sig")

    print(f"✅ {out} 완료")

✅ data/processed/law/law_labeled.csv 완료
✅ data/processed/rule/rule_labeled.csv 완료
✅ data/processed/case/case_labeled.csv 완료
