# Introduction to Simple RAG

Retrieval-Augmented Generation (RAG) is a hybrid approach that combines information retrieval with generative models. It enhances the performance of language models by incorporating external knowledge, which improves accuracy and factual correctness.

In a Simple RAG setup, we follow these steps:

1. **Data Ingestion**: Load and preprocess the text data.
2. **Chunking**: Break the data into smaller chunks to improve retrieval performance.
3. **Embedding Creation**: Convert the text chunks into numerical representations using an embedding model.
4. **Semantic Search**: Retrieve relevant chunks based on a user query.
5. **Response Generation**: Use a language model to generate a response based on retrieved text.

This notebook implements a Simple RAG approach, evaluates the model’s response, and explores various improvements.

## Setting Up the Environment
We begin by importing necessary libraries.

In [None]:
import os
import numpy as np
from openai import OpenAI
from dotenv import load_dotenv
import json

# .env 파일 로드
load_dotenv()

API_KEY = os.environ.get('api_key')

## Extracting Text from a PDF File
To implement RAG, we first need a source of textual data. In this case, we extract text from a PDF file using the PyMuPDF library.

In [28]:

import google.generativeai as genai

def extract_text_from_pdf(pdf_path):
    # API 키 설정
    genai.configure(api_key=API_KEY)
    client = genai.GenerativeModel('gemini-2.0-flash-lite')

    # PDF 파일 업로드
    with open(pdf_path, "rb") as file:
        file_data = file.read()


    prompt = "Extract all text from the provided PDF file."
    response = client.generate_content([
        {"mime_type": "application/pdf", "data": file_data},
        prompt
    ],generation_config={
            "max_output_tokens": 10  # 최대 출력 토큰 수 설정 (예: 8192 토큰, 약 24,000~32,000자)
    })
    return response.text

In [29]:
def load_text_file(pdf_path):

    # text 파일 로드
    with open(pdf_path, "r", encoding="utf-8") as txt_file:
        text = txt_file.read()

    return text

## Chunking the Extracted Text
Once we have the extracted text, we divide it into smaller, overlapping chunks to improve retrieval accuracy.

In [32]:
def chunk_text(text, n, overlap):
    """
    Chunks the given text into segments of n characters with overlap.

    Args:|
    text (str): The text to be chunked.
    n (int): The number of characters in each chunk.
    overlap (int): The number of overlapping characters between chunks.

    Returns:
    List[str]: A list of text chunks.
    """
    chunks = []  # Initialize an empty list to store the chunks
    
    # Loop through the text with a step size of (n - overlap)
    for i in range(0, len(text), n - overlap):
        # Append a chunk of text from index i to i + n to the chunks list
        chunks.append(text[i:i + n])

    return chunks  # Return the list of text chunks

## Setting Up the OpenAI API Client
We initialize the OpenAI client to generate embeddings and responses.

In [33]:
# Initialize the OpenAI client with the base URL and API key
client = OpenAI(
    base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
    api_key=API_KEY
)

## Extracting and Chunking Text from a PDF File
Now, we load the PDF, extract text, and split it into chunks.

In [None]:
# Define the path to the PDF file
text_path = "./data_creation/pdf_data/(1) 2024 달라지는 세금제도.txt"

# Extract text from the PDF file
extracted_text = load_text_file(text_path)

# Chunk the extracted text into segments of 1000 characters with an overlap of 200 characters
text_chunks = chunk_text(extracted_text, 512, 50)

# Print the number of text chunks created
print("Number of text chunks:", len(text_chunks))

# Print the first text chunk
print("\nFirst text chunk:")
print(text_chunks[0])

In [None]:
len(text_chunks)

## Creating Embeddings for Text Chunks
Embeddings transform text into numerical vectors, which allow for efficient similarity search.

In [None]:
import torch
from sentence_transformers import SentenceTransformer
def create_embeddings(embedding_model, texts, device='cuda', batch_size=16):
    """
    Creates embeddings for the given text using the specified SentenceTransformer model.

    Args:
        embedding_model: The SentenceTransformer model to create embeddings.
        texts (list): List of input texts for which embeddings are to be created.
        device (str): Device to run the model on ('cuda' for GPU, 'cpu' for CPU).
        batch_size (int): Batch size for encoding to manage GPU memory.

    Returns:
        np.ndarray: The embeddings generated by the model.
    """
    # Ensure model is on the specified device
    embedding_model = embedding_model.to(device)
    
    # Create embeddings with specified batch size
    embeddings = embedding_model.encode(
        texts,
        device=device,
        batch_size=batch_size,  # Smaller batch size to reduce memory usage
        show_progress_bar=True  # Progress bar to monitor encoding
    )
    
    return embeddings

# Check if GPU is available
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")

# Load model
model = "BAAI/bge-m3"
embedding_model = SentenceTransformer(model)


# Generate embeddings with reduced batch size
embeddings = create_embeddings(embedding_model, text_chunks, device=device, batch_size=4)

print(f"Generated {len(embeddings)} sentence embeddings.")

## Performing Semantic Search
We implement cosine similarity to find the most relevant text chunks for a user query.

In [17]:
def cosine_similarity(vec1, vec2):
    """
    Calculates the cosine similarity between two vectors.

    Args:
    vec1 (np.ndarray): The first vector.
    vec2 (np.ndarray): The second vector.

    Returns:
    float: The cosine similarity between the two vectors.
    """
    # Compute the dot product of the two vectors and divide by the product of their norms
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

In [18]:
def semantic_search(query, text_chunks, embeddings, k=5):
    """
    Performs semantic search on the text chunks using the given query and embeddings.

    Args:
    query (str): The query for the semantic search.
    text_chunks (List[str]): A list of text chunks to search through.
    embeddings (List[dict]): A list of embeddings for the text chunks.
    k (int): The number of top relevant text chunks to return. Default is 5.

    Returns:
    List[str]: A list of the top k most relevant text chunks based on the query.
    """
    # Create an embedding for the query
    query_embedding = create_embeddings(embedding_model, [query])[0]
    similarity_scores = []  # Initialize a list to store similarity scores

    # Calculate similarity scores between the query embedding and each text chunk embedding
    for i, chunk_embedding in enumerate(embeddings):
        similarity_score = cosine_similarity(np.array(query_embedding), np.array(chunk_embedding))
        similarity_scores.append((i, similarity_score))  # Append the index and similarity score

    # Sort the similarity scores in descending order
    similarity_scores.sort(key=lambda x: x[1], reverse=True)
    # Get the indices of the top k most similar text chunks
    top_indices = [index for index, _ in similarity_scores[:k]]
    # Return the top k most relevant text chunks
    return [text_chunks[index] for index in top_indices]


## Running a Query on Extracted Chunks

In [None]:
import pandas as pd
# Load the validation data from a JSON file
df = pd.read_csv('./data_creation/rag_val_new_post.csv')
df.head()

In [None]:
# Extract the first query from the validation data
query = df['query'][0]
# Perform semantic search to find the top 2 most relevant text chunks for the query
top_chunks = semantic_search(query, text_chunks, embeddings, k=3)

# Print the query
print("Query:", query)

# Print the top 2 most relevant text chunks
for i, chunk in enumerate(top_chunks):
    print(f"Context {i + 1}:\n{chunk}\n=====================================")

## Generating a Response Based on Retrieved Chunks

In [None]:


def generate_response(system_prompt, user_message, model='gemma-3-1b-it'):
    """
    Generates a response from the AI model based on the system prompt and user message.

    Args:
    system_prompt (str): The system prompt to guide the AI's behavior.
    user_message (str): The user's message or query.
    model (str): The model to be used for generating the response. Default is "meta-llama/Llama-2-7B-chat-hf".

    Returns:
    dict: The response from the AI model.
    """


    response = client.chat.completions.create(
    model=model,
    messages=  [{"role": "user", "content": system_prompt + '\n\n' +user_message}],
    temperature=0.1,
    top_p=0.9,
    max_tokens=1024,
    )

    return response
# Create the user prompt based on the top chunks
user_prompt = "\n".join([f"Context {i + 1}:\n{chunk}\n=====================================\n" for i, chunk in enumerate(top_chunks)])
user_prompt = f"{user_prompt}\n질문: {query}"


# Define the system prompt for the AI assistant
system_prompt = "당신은 제공된 Context에 기반하여 엄격히 답변하는 AI 어시스턴트입니다. 답변이 컨텍스트에서 직접 도출될 수 없는 경우, 다음 문장을 사용하세요: '해당 질문에 답변할 충분한 정보가 없습니다.'"
# Generate AI response
ai_response = generate_response(system_prompt, user_prompt)

## Evaluating the AI Response
We compare the AI response with the expected answer and assign a score.

In [None]:
# Define the system prompt for the evaluation system
evaluate_system_prompt = "You are an intelligent evaluation system tasked with assessing the AI assistant's responses. If the AI assistant's response is very close to the true response, assign a score of 1. If the response is incorrect or unsatisfactory in relation to the true response, assign a score of 0. If the response is partially aligned with the true response, assign a score of 0.5."

# Create the evaluation prompt by combining the user query, AI response, true response, and evaluation system prompt
evaluation_prompt = f"User Query: {query}\nAI Response:\n{ai_response.choices[0].message.content}\nTrue Response: {df['generation_gt'][0]}\n{evaluate_system_prompt}"

# Generate the evaluation response using the evaluation system prompt and evaluation prompt
evaluation_response = generate_response(evaluate_system_prompt, evaluation_prompt,'gemini-2.0-flash')

# Print the evaluation response
print(evaluation_response.choices[0].message.content)

## 전체 추론

In [None]:
## Evaluating the AI Response
# We compare the AI response with the expected answer and assign a score.
result = []

for i in range(len(df)):
    query = df['query'][i]
    top_chunks = semantic_search(query, text_chunks, embeddings, k=3)

    # Define the system prompt for the AI assistant
    system_prompt = "당신은 제공된 Context에 기반하여 엄격히 답변하는 AI 어시스턴트입니다. 답변이 컨텍스트에서 직접 도출될 수 없는 경우, 다음 문장을 사용하세요: '해당 질문에 답변할 충분한 정보가 없습니다.'"
        
    # Create the user prompt based on the top chunks
    user_prompt = "\n".join([f"Context {i + 1}:\n{chunk}\n=====================================\n" for i, chunk in enumerate(top_chunks)])
    user_prompt = f"{user_prompt}\n질문: {query}"

    # Generate AI response
    ai_response = generate_response(system_prompt, user_prompt)

    print(ai_response.choices[0].message.content)
    
    # Define the system prompt for the evaluation system
    evaluate_system_prompt = "You are an intelligent evaluation system tasked with assessing the AI assistant's responses. If the AI assistant's response is very close to the true response, assign a score of 1. If the response is incorrect or unsatisfactory in relation to the true response, assign a score of 0. If the response is partially aligned with the true response, assign a score of 0.5. \n The answer should only output numbers and should not output any words."

    # Create the evaluation prompt by combining the user query, AI response, true response, and evaluation system prompt
    evaluation_prompt = f"User Query: {query}\nAI Response:\n{ai_response.choices[0].message.content}\nTrue Response: {df['generation_gt'][i]}\n{evaluate_system_prompt}"

    # Generate the evaluation response using the evaluation system prompt and evaluation prompt
    evaluation_response = generate_response(evaluate_system_prompt, evaluation_prompt,'gemini-2.0-flash')

    # Print the evaluation response
    print(evaluation_response.choices[0].message.content)
    result.append(evaluation_response.choices[0].message.content)


## 점수 계산

In [None]:
import numpy as np
result = [float(x.replace('\n',''))for x in result]
print(np.mean(result))