In [None]:
import os
import re
import unicodedata
from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from funcs.logic import process_pdfs_to_faiss_with_positions
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph
from funcs.logic import get_qa_score
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from docx import Document as DocxDocument
from tqdm import tqdm  # 진행 상황 표시용

source_dir = "path/to/sources"
output_dir = "data/case2"
# embedding_model = "intfloat/multilingual-e5-small"
embedding_model = "intfloat/multilingual-e5-large-instruct"
chunk_size = 500  # 각 청크의 크기 (문자 수)
chunk_overlap = 50  # 청크 간 겹침 크기 (문자 수)

In [None]:
# 1️⃣ Word 문서에서 텍스트 추출하고 전처리하는 함수
def extract_and_preprocess_text_from_docx(file_path):
    try:
        doc = DocxDocument(file_path)
        # 문단 텍스트 추출 (빈 줄 제외)
        paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]

        # 문단을 개행문자로 결합
        text = "\n".join(paragraphs)

        # 텍스트 전처리
        text = preprocess_text(text)

        return text
    except Exception as e:
        print(f"⚠️ {file_path} 파일 처리 중 오류 발생: {str(e)}")
        return ""

# 2️⃣ 텍스트 전처리 함수
def preprocess_text(text):
    # 한글 유니코드 정규화 (NFC)
    text = unicodedata.normalize('NFC', text)

    # 불필요한 특수 문자 및 공백 제거
    text = re.sub(r'[\u200b\u200c\u200d\u2060\ufeff]', '', text)  # 보이지 않는 특수 문자 제거
    text = re.sub(r'\s+', ' ', text)  # 연속된 공백 제거

    # 각 줄 앞뒤 공백 제거
    lines = [line.strip() for line in text.split('\n')]
    text = '\n'.join(lines)

    # 빈 줄 제거 (연속된 줄바꿈)
    text = re.sub(r'\n\s*\n', '\n', text)

    return text.strip()

# 3️⃣ 텍스트를 청크로 분할하는 함수
def create_chunks_from_texts(texts, metadata_list, chunk_size=500, chunk_overlap=50):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", ". ", "? ", "! ", " ", ""],
        length_function=len
    )

    documents = []
    for text, metadata in zip(texts, metadata_list):
        # 텍스트가 있는 경우에만 청킹 진행
        if text:
            chunks = text_splitter.split_text(text)
            # 각 청크별로 문서 생성
            for i, chunk in enumerate(chunks):
                chunk_metadata = metadata.copy()  # 원본 메타데이터 복사
                chunk_metadata["chunk_id"] = i    # 청크 ID 추가
                documents.append(Document(page_content=chunk, metadata=chunk_metadata))

    return documents

# 4️⃣ 메인 함수: 여러 개의 Word 파일을 읽어와서 벡터 DB 생성
def process_word_documents_to_vector_db(doc_dir, output_dir,
                                        embedding_model_name="intfloat/multilingual-e5-small",
                                        chunk_size=500, chunk_overlap=50):
    # Word 파일 목록 수집
    word_files = [f for f in os.listdir(doc_dir) if f.endswith(".docx")]

    if not word_files:
        print("⚠️ 처리할 Word 파일이 없습니다.")
        return

    print(f"🔍 총 {len(word_files)}개의 Word 파일을 처리합니다...")

    # 텍스트 추출 및 전처리
    texts = []
    metadata_list = []

    for file_name in tqdm(word_files, desc="파일 처리 중"):
        file_path = os.path.join(doc_dir, file_name)
        text = extract_and_preprocess_text_from_docx(file_path)

        if text:
            texts.append(text)
            # 메타데이터 생성 (파일명, 경로 등)
            metadata = {
                "source": file_name,
                "full_path": file_path,
                "file_type": "docx"
            }
            metadata_list.append(metadata)

    # 텍스트 청킹
    print(f"📄 텍스트를 청크 크기 {chunk_size}(겹침 {chunk_overlap})로 분할합니다...")
    documents = create_chunks_from_texts(texts, metadata_list, chunk_size, chunk_overlap)

    if not documents:
        print("⚠️ 처리할 문서가 없습니다.")
        return

    print(f"✅ {len(documents)}개의 청크가 생성되었습니다.")

    # 임베딩 모델 로드
    print(f"🧠 임베딩 모델 '{embedding_model_name}'을 로드합니다...")
    embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)

    # FAISS 벡터 DB 생성
    print("🔢 벡터 DB를 생성합니다...")
    vector_db = FAISS.from_documents(documents, embedding_model)

    # 벡터 DB 저장
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    vector_db.save_local(output_dir)
    print(f"✅ '{output_dir}'에 벡터 데이터베이스 저장 완료")

    return vector_db

In [None]:
# 1️⃣ FAISS 벡터 DB 로드
qa_tokenizer = AutoTokenizer.from_pretrained("Dongjin-kr/ko-reranker")
qa_model = AutoModelForSequenceClassification.from_pretrained("Dongjin-kr/ko-reranker")


# 메인 함수 실행
vector_db = process_word_documents_to_vector_db(
    source_dir,
    output_dir,
    embedding_model,
    chunk_size,
    chunk_overlap
)


In [None]:
# LangGraph pipeline 실행
class QAState(TypedDict):
    question: str
    retrieved_docs: Optional[List]
    reranked_docs: Optional[List]
    top_docs: Optional[List]
    answer: Optional[str]


def retrieve_documents(state: QAState):
    question = state["question"]
    retrieved_docs = vector_db.similarity_search_with_score(question, k=30)
    return {"retrieved_docs": retrieved_docs}


def rerank_documents(state: QAState):
    question = state["question"]
    retrieved_docs = state["retrieved_docs"]

    scored_docs = []
    _score = []
    for doc, _ in retrieved_docs:
        score = get_qa_score(question, doc.page_content, qa_tokenizer, qa_model)
        scored_docs.append((doc, score))
        _score.append(score)

    _average = sum(_score) / len(_score)

    reranked_docs = sorted(scored_docs, key=lambda x: x[1], reverse=True)
    # top_docs = [doc for doc, _ in reranked_docs[:10]]
    top_docs = []
    for doc, score in reranked_docs[:10]:
        doc.metadata["score"] = score
        if score > _average:
            top_docs.append(doc)

    return {
        "reranked_docs": reranked_docs,
        "top_docs": top_docs,
    }





In [None]:
graph = StateGraph(QAState)

graph.add_node("retrieve_documents", retrieve_documents)
graph.add_node("rerank_documents", rerank_documents)

graph.set_entry_point("retrieve_documents")
graph.add_edge("retrieve_documents", "rerank_documents")


graph.set_finish_point("retrieve_documents")

qa_graph = graph.compile()


In [None]:
import csv


# CSV 파일을 읽어 딕셔너리 리스트로 저장
data_list = []
with open("path/to/sources", newline='', encoding="utf-8-sig") as csvfile:
    reader = csv.DictReader(csvfile)  # 각 행을 딕셔너리로 변환
    for row in reader:
        op = {
            'code':row['code'],
            'item':row['item'],
            'exp' : row['exp'],
              }
        result = qa_graph.invoke({"question": f"{row['item']}"})

        op['result'] = result
        data_list.append(op)

In [None]:
import pandas as pd


result_list = []
for _documents in data_list:
    for _docs in _documents['result']['top_docs']:
        _dummy = {
            "code":_documents['code'],
            "item":_documents['item'],
            "page_content": _docs.page_content,
            "score": _docs.metadata['score'],
        }
        result_list.append(_dummy)


df = pd.DataFrame(result_list)



mean_scores = df.groupby('code')['score'].mean().reset_index()

import seaborn as sns
import matplotlib.pyplot as plt




# 박스 플롯 그리기
plt.figure(figsize=(12, 6))
sns.boxplot(x='code', y='score', data=df)
plt.xticks(rotation=90)
plt.title('Score Distribution by Code')
plt.show()

In [None]:
result_list

In [None]:
df

In [None]:
data_list