In [None]:
import re
import os
import json
import pdfplumber
from dotenv import load_dotenv
from pypdf import PdfReader, PdfWriter

from langchain_upstage import UpstageEmbeddings

In [None]:
# Read settings from the local .env file; override=True lets values from the
# file replace variables that are already set in the process environment.
load_dotenv(".env", override=True)
# Key passed to the Upstage embedding client (None when the variable is unset).
UPSTAGE_API_KEY = os.environ.get("UPSTAGE_API_KEY")

In [None]:
# Source PDF read by the two page-splitting cells below (text pages / table pages).
input_path = "./assets/ewha/ewha.pdf"

In [None]:
# Write ewha_text.pdf: a copy of the source PDF without the pages listed in
# exclude_pages, leaving only the pages treated as plain text.

reader = PdfReader(input_path)
writer = PdfWriter()

# Positions in reader.pages (0-based) that are left out of the text-only PDF.
exclude_pages = {19, *range(38, 52)}

for i, page in enumerate(reader.pages):
    if i in exclude_pages:
        continue
    writer.add_page(page)

with open("./assets/ewha/ewha_text.pdf", "wb") as f:
    writer.write(f)

In [None]:
# Write ewha_table.pdf: only the pages listed in keep_pages, in that order.

reader = PdfReader(input_path)
writer = PdfWriter()

# Positions in reader.pages (0-based) that are copied into the table-only PDF.
keep_pages = [19, *range(38, 52)]

for page_num in keep_pages:
    if page_num >= len(reader.pages):
        # Index past the end of the document: silently skipped.
        continue
    writer.add_page(reader.pages[page_num])

with open("./assets/ewha/ewha_table.pdf", "wb") as f:
    writer.write(f)

### .txt

1. 장 단위로 쪼갬
2. 수작업으로 각 장 내부의 조 단위로 쪼개어 chapters.txt 파일로 저장
3. 각 장 내부에서 chunk_text(문자 단위 슬라이딩 윈도우, max_char=500, overlap=100)로 쪼개어 ewha_chunk_text.jsonl 파일로 저장
4. embedding 후 faiss vectorstore에 저장

In [None]:
# Chunking for the text-only PDF

# A line consisting solely of digits, hyphens and whitespace (e.g. "12", "- 12 -").
_PAGE_NUMBER_LINE = re.compile(r'^[\d\-\s]+$')

# Chapter heading "제N장 <title>"; the title runs to the end of the line.
# NOTE(review): the pattern is not anchored to the start of a line, so an
# in-text reference of the same shape would also open a new chapter — confirm
# this does not happen in the source document.
_CHAPTER_HEADING = re.compile(r'제(\d+)장\s+([^\n]+)')


def _read_pdf_text(pdf_path):
    """Return the text of every page that yields text, each followed by a newline."""
    with pdfplumber.open(pdf_path) as pdf:
        page_texts = [page.extract_text() for page in pdf.pages]
    # Pages for which extract_text() returns nothing are skipped.
    return "".join(f"{page_text}\n" for page_text in page_texts if page_text)


def _drop_page_number_lines(text):
    """Strip every line and drop blank lines and page-number-only lines."""
    stripped_lines = (line.strip() for line in text.split('\n'))
    return '\n'.join(
        line
        for line in stripped_lines
        if line and not _PAGE_NUMBER_LINE.match(line)
    )


def _split_into_chapters(text):
    """Cut *text* at each chapter heading and return one dict per chapter."""
    matches = list(_CHAPTER_HEADING.finditer(text))
    # A chapter body ends where the next heading starts; the last one runs to
    # the end of the text.
    end_positions = [m.start() for m in matches[1:]] + [len(text)]

    chapters = []
    for match, end_pos in zip(matches, end_positions):
        body = text[match.end():end_pos].strip()
        chapters.append({
            'chapter_num': match.group(1),
            'chapter_title': match.group(2).strip(),
            # Collapsing every whitespace run (newlines included) into a single
            # space leaves the body on one line, so no further blank-line
            # cleanup is needed afterwards.
            'content': re.sub(r'\s+', ' ', body),
        })
    return chapters


def extract_chapters_from_pdf(pdf_path):
    """
    Extract the text of a PDF and split it into chapters.

    The text of all pages is concatenated, blank lines and lines that contain
    only digits/hyphens/whitespace (page numbers) are removed, and the result
    is cut at every "제N장 <title>" heading. Text that precedes the first
    heading is not returned.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        list: One dict per heading, in document order, with the keys
            'chapter_num' (the digits of the heading, as a string),
            'chapter_title' (rest of the heading line, stripped) and
            'content' (chapter body with whitespace runs collapsed to single
            spaces). Empty when no heading is found.
    """
    full_text = _read_pdf_text(pdf_path)
    cleaned_text = _drop_page_number_lines(full_text)
    return _split_into_chapters(cleaned_text)


def save_chapters_to_txt(chapters, output_path="./assets/ewha/chapters.txt"):
    """
    Write the extracted chapters to a UTF-8 text file.

    Each chapter becomes a "제N장: <title>" heading line, followed by its
    content on the next line and one empty line. A summary line is printed
    once the file has been written.

    Args:
        chapters: Sequence of dicts with the keys 'chapter_num',
            'chapter_title' and 'content'.
        output_path: Destination file; overwritten if it already exists.
    """
    with open(output_path, 'w', encoding='utf-8') as f:
        f.writelines(
            f"제{entry['chapter_num']}장: {entry['chapter_title']}\n"
            f"{entry['content']}\n\n"
            for entry in chapters
        )

    print(f"총 {len(chapters)}개 장이 {output_path}에 저장되었습니다.")

if __name__ == "__main__":
    # Extract the chapters of the text-only PDF, print a short summary of each
    # one, then save them all to the default chapters.txt location.
    pdf_path = "./assets/ewha/ewha_text.pdf"

    chapters = extract_chapters_from_pdf(pdf_path)

    for chapter in chapters:
        # Heading, content length and the first 100 characters as a preview.
        print(
            f"\n제{chapter['chapter_num']}장: {chapter['chapter_title']}",
            f"내용 길이: {len(chapter['content'])}자",
            f"내용 미리보기: {chapter['content'][:100]}...",
            sep="\n",
        )

    save_chapters_to_txt(chapters)

    print(f"\n총 {len(chapters)}개의 장이 추출되었습니다.")

./assets/ewha/chapters.txt의 각 장 내부를 수작업으로 직접 조 단위로 쪼갬.

In [None]:

# ---------------------------
# 1) Split text by chapter
# ---------------------------
def split_by_chapter(text):
    """
    Split *text* into (chapter heading, chapter body) pairs.

    A heading is "제N장", optionally followed by colons/whitespace and the rest
    of that line. Any text that precedes the first heading is discarded.

    Args:
        text: Full document text.

    Returns:
        list[tuple[str, str]]: (heading, body) pairs in document order, both
        stripped of surrounding whitespace. Empty when *text* contains no
        heading.
    """
    pattern = r"(제\d+장[:\s]*[^\n]*)"
    parts = re.split(pattern, text)

    # With a single capturing group, re.split always returns
    # [preamble, heading, body, heading, body, ...]. Index 0 is therefore never
    # a heading — even when the preamble happens to start with "제" — and must
    # always be dropped, otherwise every heading/body pair shifts by one.
    headings = parts[1::2]
    bodies = parts[2::2]

    return [
        (heading.strip(), body.strip())
        for heading, body in zip(headings, bodies)
    ]

# ---------------------------
# 2) Chunking function
# ---------------------------
def chunk_text(text, max_char=500, overlap=100):
    """
    Cut *text* into fixed-size character windows that overlap each other.

    Windows start every ``max_char - overlap`` characters and are at most
    ``max_char`` characters long. A window is emitted for every start position
    inside the text, so the last chunk can be shorter than ``max_char`` and
    may lie entirely inside the previous chunk.

    Args:
        text: String to cut.
        max_char: Maximum chunk length in characters; must be positive.
        overlap: Number of characters shared by consecutive chunks; must be
            smaller than ``max_char``.

    Returns:
        list[str]: Chunks in order of appearance; empty for empty *text*.

    Raises:
        ValueError: If ``max_char`` is not positive or ``overlap`` is not
            smaller than ``max_char`` (the window would never advance).
    """
    if max_char <= 0:
        raise ValueError("max_char must be a positive integer")
    if overlap >= max_char:
        raise ValueError("overlap must be smaller than max_char")

    # Distance between consecutive window starts; positive thanks to the
    # checks above, which guarantees termination.
    step = max_char - overlap
    return [
        text[start:start + max_char]
        for start in range(0, len(text), step)
    ]

# ---------------------------
# 3) Chunking per chapter
# ---------------------------
def chunk_by_chapter(text, max_char=500, overlap=100):
    """
    Split *text* into chapters and cut every chapter body into chunks.

    Args:
        text: Full document text containing chapter headings.
        max_char: Maximum chunk length, forwarded to chunk_text.
        overlap: Overlap between consecutive chunks, forwarded to chunk_text.

    Returns:
        list[dict]: One record per chunk with the keys "chapter" (heading of
        the chapter the chunk comes from), "chunk_id" (index of the chunk
        within its chapter, restarting at 0 for each chapter) and "text".
    """
    return [
        {
            "chapter": heading,
            "chunk_id": position,
            "text": piece
        }
        for heading, chapter_body in split_by_chapter(text)
        for position, piece in enumerate(chunk_text(chapter_body, max_char, overlap))
    ]


# ---------------------------
# 4) Run the chunking and save the result as JSONL
# ---------------------------

# NOTE: this rebinds input_path, which the PDF-splitting cells earlier in the
# notebook also read; reset it to the PDF path before re-running those cells.
input_path = "./assets/ewha/chapters.txt"
output_path = "./assets/ewha/ewha_chunk_text.jsonl"

with open(input_path, "r", encoding="utf-8") as f:
    text = f.read()

chunks = chunk_by_chapter(text, max_char=500, overlap=100)

# One JSON object per line; ensure_ascii=False keeps the Korean text readable.
with open(output_path, "w", encoding="utf-8") as f:
    f.writelines(
        json.dumps(record, ensure_ascii=False) + "\n"
        for record in chunks
    )

print(f"저장 완료! → {output_path}")


### table
- Claude.ai로 ewha_table.pdf의 표 내용을 텍스트로 변환하여 ewha_table.jsonl 파일 생성

In [None]:
# Input and output file paths
input_file = './assets/ewha/ewha_table.jsonl'
output_file = './assets/ewha/ewha_chunk_table.jsonl'

# First chunk_id given to a table record.
# NOTE(review): presumably chosen to continue after the text chunks — confirm
# the value whenever the text chunking output changes.
start_chunk_id = 46
chunk_id = start_chunk_id

with open(input_file, 'r', encoding='utf-8') as f_in, \
     open(output_file, 'w', encoding='utf-8') as f_out:

    for line in f_in:
        line = line.strip()
        if not line:
            # The input file is produced outside this notebook; a blank line
            # would make json.loads raise, so skip it.
            continue

        data = json.loads(line)

        table_type = data.get('table_type', '')
        college = data.get('college', '')
        content = data.get('content', '')

        # Prefix the content with the table type and, when present, the college.
        if college:
            text = f"{table_type} {college} {content}"
        else:
            text = f"{table_type} {content}"

        # Each record carries the same "text" key as ewha_chunk_text.jsonl;
        # chunk_id is nested under "metadata" here, whereas that file stores it
        # at the top level. The Document-loading cell accepts both layouts.
        new_data = {
            'text': text,
            'metadata': {
                'chunk_id': chunk_id
            }
        }

        f_out.write(json.dumps(new_data, ensure_ascii=False) + '\n')

        # Advance to the next id
        chunk_id += 1

print(f"변환 완료! 총 {chunk_id - start_chunk_id}개의 레코드가 변환되었습니다.")
print(f"출력 파일: {output_file}")
        

In [None]:
import json
from langchain_core.documents import Document
from langchain_community.vectorstores import FAISS

# Directory the FAISS index is saved to.
faiss_save_dir = "./faiss/faiss_ewha"

# ---------------------------
# 1) Convert the JSONL records into a list of Documents
# ---------------------------

docs = []

# JSONL files to load, in this order
jsonl_files = [
    "./assets/ewha/ewha_chunk_text.jsonl",
    "./assets/ewha/ewha_chunk_table.jsonl"
]

# Every line of every file becomes one Document
for jsonl_path in jsonl_files:
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            item = json.loads(line)
            text = item["text"]

            # chunk_id is either a top-level key or nested under "metadata";
            # records with neither get empty metadata.
            if "chunk_id" in item:
                metadata = {"chunk_id": item["chunk_id"]}
            elif "metadata" in item and "chunk_id" in item["metadata"]:
                metadata = {"chunk_id": item["metadata"]["chunk_id"]}
            else:
                metadata = {}

            docs.append(Document(page_content=text, metadata=metadata))

print(f"총 Document 개수: {len(docs)}")


In [None]:
# Embedding client handed to FAISS.from_documents when the index is built;
# it is configured with the API key read from the environment.
upstage_embedding_model =  UpstageEmbeddings(
                            model="solar-embedding-1-large-passage",
                            api_key=UPSTAGE_API_KEY)

In [None]:
# ---------------------------
# 2) Build the FAISS vector store
# ---------------------------
# Documents are ordered by their text before indexing.
# NOTE(review): presumably to make the index order reproducible — confirm.
docs = sorted(docs, key=lambda doc: doc.page_content)
db = FAISS.from_documents(documents=docs, embedding=upstage_embedding_model)

# ---------------------------
# 3) Save to disk
# ---------------------------
db.save_local(folder_path=faiss_save_dir)

print(f"FAISS 저장 완료 → {faiss_save_dir}")