## 의미 기반 Chunking 소개

Text chunking은 RAG(Retrieval-Augmented Generation) 시스템에서 꼭 필요한 과정으로, 긴 텍스트를 더 의미 있는 단위로 나누어 검색 정확도를 높이는 데 사용됩니다.
고정된 길이로 나누는 방식과 달리, **의미 기반 chunking**은 이어지는 문장 간의 **유사도**를 기준으로 텍스트를 나눕니다.

#### 분할 기준 방법

* **백분위(Percentile)**: 모든 문장 간 유사도 차이 중 X번째 백분위 값을 구하고, 이보다 큰 폭으로 유사도가 떨어지는 지점을 기준으로 분할합니다.
* **표준편차(Standard Deviation)**: 전체 평균보다 X 표준편차만큼 유사도가 급격히 떨어지는 지점을 기준으로 분할합니다.
* **사분위 범위(IQR)**: Q3(3사분위) - Q1(1사분위)로 계산된 IQR 값을 기준으로, 이보다 큰 차이가 발생하는 지점을 분할 지점으로 사용합니다.

이 노트북에서는 **percentile method**을 활용한 의미 기반 chunking을 구현하고, 예시 텍스트를 이용해 성능을 평가합니다.


## 환경 설정하기
필요한 라이브러리를 가져옵니다.

In [None]:
import os
import numpy as np
from dotenv import load_dotenv

# .env 파일 로드
load_dotenv()

API_KEY = os.environ.get('api_key')
gemini_API_KEY = os.environ.get('gemini_api_key')

## OpenAI API 클라이언트 설정하기

In [None]:
from openai import OpenAI

client_openai = OpenAI(api_key = API_KEY)

## PDF 파일에서 텍스트 추출하기
RAG를 구현하려면 먼저 텍스트 데이터 소스가 필요합니다. 저는 gemini를 이용해 pdf에서 텍스트를 추출하는 방식을 사용합니다.  
만약 txt 형태로 파일이 존재한다면 `load_text_file` 함수를 사용하면됩니다.

In [None]:
import google.generativeai as genai

def extract_text_from_pdf(pdf_path):
    # API 키 설정
    genai.configure(api_key=gemini_API_KEY)
    client = genai.GenerativeModel('gemini-2.0-flash-lite')

    # PDF 파일 업로드
    with open(pdf_path, "rb") as file:
        file_data = file.read()


    prompt = "Extract all text from the provided PDF file."
    response = client.generate_content([
        {"mime_type": "application/pdf", "data": file_data},
        prompt
    ],generation_config={
            "max_output_tokens": 8192  # 최대 출력 토큰 수 설정 (예: 8192 토큰, 약 24,000~32,000자)
    })
    return response.text

In [None]:
# 이미 text 파일로 저장되어 있다면 load_text_file 함수를 사용하면 됩니다.
def load_text_file(pdf_path):

    # text 파일 로드
    with open(pdf_path, "r", encoding="utf-8") as txt_file:
        text = txt_file.read()

    return text

txt_path = "./data_creation/pdf_data/(1) 2024 달라지는 세금제도.txt"

extracted_text = load_text_file(txt_path)
print(extracted_text[:500])

## 문장 단위 임베딩 생성
텍스트를 문장 단위로 나눈 뒤, 각 문장에 대한 임베딩을 생성합니다.

In [None]:
import torch
from sentence_transformers import SentenceTransformer
def create_embeddings(embedding_model, texts, device='cuda', batch_size=16):
    """
    SentenceTransformer 모델을 사용하여 텍스트의 임베딩 생성

    Args:
        embedding_model: 임베딩을 생성할 SentenceTransformer 모델입니다.
        texts (list): 임베딩을 생성할 입력 텍스트 리스트입니다.
        device (str): 모델을 실행할 장치 ('cuda' for GPU, 'cpu' for CPU).
        batch_size (int): 한번에 처리할 텍스트의 개수

    Returns:
        np.ndarray: 임베딩
    """
    # 모델이 지정된 장치에 있는지 확인합니다.
    embedding_model = embedding_model.to(device)
    
    # 지정된 배치 크기로 임베딩을 생성합니다.
    embeddings = embedding_model.encode(
        texts,
        device=device,
        batch_size=batch_size,  # 메모리 사용량을 줄이기 위해 더 작은 배치 크기를 사용합니다.
        show_progress_bar=True  # 인코딩 진행 상태를 모니터링하기 위해 진행 표시줄 출력
    )
    
    return embeddings

# GPU 사용 가능 여부 확인
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")

# 모델 로드
model = "BAAI/bge-m3"
embedding_model = SentenceTransformer(model)

text_chunks = extracted_text.split(". ")

# 임베딩 생성
embeddings = create_embeddings(embedding_model, text_chunks, device=device, batch_size=4)

print(f"Generated {len(embeddings)} sentence embeddings.")

## 문장간 유사도 차이 계산하기
연속된 문장 사이의 코사인 유사도를 계산합니다.

In [14]:
def cosine_similarity(vec1, vec2):
    """
    두 벡터 간의 코사인 유사도를 계산합니다.

    Args:
    vec1 (np.ndarray): 첫 번째 벡터입니다.
    vec2 (np.ndarray): 두 번째 벡터입니다.

    Returns:
    float: 두 벡터 간의 코사인 유사도입니다.
    """
    # 두 벡터의 내적을 계산하고 두 벡터의 크기의 곱으로 나눕니다.
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

# 연속된 문장 사이의 코사인 유사도를 계산합니다.
similarities = [cosine_similarity(embeddings[i], embeddings[i + 1]) for i in range(len(embeddings) - 1)]

## 시맨틱 청킹 구현
breakpoints을 찾는 세 가지 방법을 구현합니다.

In [None]:
def compute_breakpoints(similarities, method="percentile", threshold=90):
    """
    유사도가 감소하는 부분을 기준으로 청킹 지점을 계산합니다.

    Args:
    similarities (List[float]): 문장 간 유사도 점수 목록
    method (str): 'percentile', 'standard_deviation', 'interquartile'
    threshold (float): 임계값 

    Returns:
    List[int]: 청킹 지점의 인덱스 목록입니다.
    """
    # 선택된 방법에 따라 임계값 계산
    if method == "percentile":
        # 유사도 점수의 X번째 백분위 값 계산
        threshold_value = np.percentile(similarities, threshold)
    elif method == "standard_deviation":
        # 유사도 점수의 평균과 표준편차 계산
        mean = np.mean(similarities)
        std_dev = np.std(similarities)
        # 임계값을 평균에서 X 표준편차만큼 뺌
        threshold_value = mean - (threshold * std_dev)
    elif method == "interquartile":
        # 첫 번째와 세 번째 사분위수(Q1과 Q3) 계산
        q1, q3 = np.percentile(similarities, [25, 75])
        # IQR 규칙을 사용하여 임계값 계산
        threshold_value = q1 - 1.5 * (q3 - q1)
    else:
        # 유효하지 않은 방법이 제공되면 오류 발생
        raise ValueError("Invalid method. Choose 'percentile', 'standard_deviation', or 'interquartile'.")

    # 유사도가 임계값 아래로 떨어지는 인덱스 식별
    return [i for i, sim in enumerate(similarities) if sim < threshold_value]

# 백분위 방법을 사용하여 임계값 90에서 청킹 지점을 계산합니다.
breakpoints = compute_breakpoints(similarities, method="percentile", threshold=90)

## 텍스트를 시맨틱 청크로 분할
계산된 청크 기준으로 텍스트를 분할합니다.

In [None]:
def split_into_chunks(sentences, breakpoints):
    """
    문장을 semantic chunk단위로 분할

    Args:
    sentences (List[str]): 문장 리스트
    breakpoints (List[int]): breakpoint 리스트

    Returns:
    List[str]: 청크 리스트
    """
    chunks = []  # 청크 리스트
    start = 0  # 시작 인덱스

    # breakpoint를 하나씩 증가시키며 청크 생성
    for bp in breakpoints:
        # 시작 인덱스부터 현재 breakpoint까지의 문장 청크 추가
        chunks.append(". ".join(sentences[start:bp + 1]) + ".")
        start = bp + 1  # 시작 인덱스를 breakpoint 다음 문장으로 업데이트

    # 남은 문장을 마지막 청크로 추가
    chunks.append(". ".join(sentences[start:]))
    return chunks  # 청크 리스트 반환

# 청크 생성
text_chunks = split_into_chunks(text_chunks, breakpoints)

# 청크 개수 출력
print(f"Number of semantic chunks: {len(text_chunks)}")

# 청크 출력
print("\nFirst text chunk:")
print(text_chunks[0])


## 시맨틱 청크 임베딩 생성
유저 쿼리로 검색을 하기 위해 각 청크에 대한 임베딩 생성

In [None]:
# 임베딩 생성
embeddings = create_embeddings(embedding_model, text_chunks, device=device, batch_size=4)

print(f"Generated {len(embeddings)} sentence embeddings.")

## Semantic Search
코사인 유사도를 구현하여 사용자 쿼리에 가장 관련성이 높은 텍스트 청크를 찾습니다.

In [None]:
def semantic_search(query, text_chunks, chunk_embeddings, k=5):
    """
    주어진 쿼리와 임베딩을 이용해 텍스트 chunk에 대해 Semantic Search 수행

    Args:
    query (str): 사용자 쿼리
    text_chunks (List[str]): 검색할 텍스트 청크 리스트
    embeddings (List[dict]): 텍스트 청크에 대한 임베딩 리스트
    k (int): 상위 k개의 관련 텍스트 청크를 반환

    Returns:
    List[str]: 쿼리와 가장 유사한 상위 k개의 텍스트 청크 리스트
    """
    # 쿼리에 대한 임베딩 생성
    query_embedding = create_embeddings(embedding_model, [query])[0]
    
    # 쿼리 임베딩과 각 청크 임베딩 간의 코사인 유사도 계산
    similarities = [cosine_similarity(query_embedding, emb) for emb in chunk_embeddings]
    # 가장 유사한 상위 k개의 청크의 인덱스 추출
    top_indices = np.argsort(similarities)[-k:][::-1]
    
    # 가장 유사한 상위 k개의 청크 반환
    return [text_chunks[i] for i in top_indices]

## 테스트

In [None]:
import pandas as pd

# 평가 데이터 로드하기
df = pd.read_csv('./data_creation/rag_val_new_post.csv')
df.head()

In [None]:
# 평가 데이터에서 첫 번째 쿼리 추출
query = df['query'][0]

# 시맨틱 검색 수행
top_chunks = semantic_search(query, text_chunks, embeddings, k=3)

# 쿼리 출력
print("Query:", query)

# 쿼리와 연관있는 3개의 chunk 출력
for i, chunk in enumerate(top_chunks):
    print(f"Context {i + 1}:\n{chunk}\n=====================================")

## 검색된 청크를 기반으로 response 생성하기

In [None]:
def generate_response(system_prompt, user_message ,model_name='gpt-4.1-nano'):
    
    response = client_openai.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": system_prompt},# Define the system prompt for the AI assistant
            {"role": "user", "content": user_message}
        ],
        temperature=0.1,
        top_p=0.9,
        max_tokens=1024,
    )
    # print(response.choices[0].message.content)

    return response

# 사용자 프롬프트
user_prompt = "\n".join([f"Context {i + 1}:\n{chunk}\n=====================================\n" for i, chunk in enumerate(top_chunks)])
user_prompt = f"{user_prompt}\n질문: {query}"

# 시스템 프롬프트
system_prompt = "당신은 제공된 Context에 기반하여 답변하는 AI 어시스턴트입니다. 답변이 컨텍스트에서 직접 도출될 수 없는 경우, 다음 문장을 사용하세요: '해당 질문에 답변할 충분한 정보가 없습니다.'"

# 응답 생성
ai_response = generate_response(system_prompt, user_prompt)

In [None]:
print(ai_response.choices[0].message.content)

## 생성 응답 평가하기
생성된 답변과 예상 답변을 비교하여 평가합니다. 답변 평가시에는 LLM을 사용합니다.

In [None]:
# 평가를 위한 시스템 프롬프트
evaluate_system_prompt = "You are an intelligent evaluation system tasked with assessing the AI assistant's responses. If the AI assistant's response is very close to the true response, assign a score of 1. If the response is incorrect or unsatisfactory in relation to the true response, assign a score of 0. If the response is partially aligned with the true response, assign a score of 0.5."

# 사용자 쿼리, 생성된 답변, 참고(정답) 답변, 평가 시스템 프롬프트를 결합하여 평가 프롬프트 생성
evaluation_prompt = f"User Query: {query}\nAI Response:\n{ai_response.choices[0].message.content}\nTrue Response: {df['generation_gt'][0]}\n{evaluate_system_prompt}"

# 평가 응답 생성
evaluation_response = generate_response(evaluate_system_prompt, evaluation_prompt,'gpt-4.1-mini')

# 평가 응답 출력
print(evaluation_response.choices[0].message.content)

## 전체 추론 및 평가

In [None]:
# 생성된 답변과 실제 답변을 비교하여 점수를 부여합니다.
result = []

for i in range(len(df)):
    query = df['query'][i]
    top_chunks = semantic_search(query, text_chunks, embeddings, k=3)

    # 답변 생성용 시스템 프롬프트
    user_prompt = "\n".join([f"Context {i + 1}:\n{chunk}\n=====================================\n" for i, chunk in enumerate(top_chunks)])
    user_prompt = f"{user_prompt}\n질문: {query}"

    # 답변 생성용 유저 프롬프트
    system_prompt = "당신은 제공된 Context에 기반하여 답변하는 AI 어시스턴트입니다. 답변이 컨텍스트에서 직접 도출될 수 없는 경우, 다음 문장을 사용하세요: '해당 질문에 답변할 충분한 정보가 없습니다.'"

    # response 생성
    ai_response = generate_response(system_prompt, user_prompt, 'gpt-4.1-mini')

    print(ai_response.choices[0].message.content)

    # 평가 시스템 프롬프트
    evaluate_system_prompt = "You are an intelligent evaluation system tasked with assessing the AI assistant's responses. If the AI assistant's response is very close to the true response, assign a score of 1. If the response is incorrect or unsatisfactory in relation to the true response, assign a score of 0. If the response is partially aligned with the true response, assign a score of 0.5. \n The answer should only output numbers and should not output any words."

    # 사용자 쿼리, 생성된 답변, 참고(정답) 답변, 평가 시스템 프롬프트를 결합하여 평가 프롬프트 생성
    evaluation_prompt = f"User Query: {query}\nAI Response:\n{ai_response.choices[0].message.content}\nTrue Response: {df['generation_gt'][i]}\n{evaluate_system_prompt}"

    # 평가 응답 생성
    evaluation_response = generate_response(evaluate_system_prompt, evaluation_prompt,'gpt-4.1-mini')

    # 평가 응답 출력
    print(evaluation_response.choices[0].message.content)
    result.append(evaluation_response.choices[0].message.content)

## 점수 계산

In [None]:
import numpy as np
result = [float(x.replace('\n',''))for x in result]
print(np.mean(result))