# RAG 기본 구조 이해하기

## 1. 사전작업(Pre-processing) - 1~4 단계

![rag-1.png](./assets/rag-1.png)

![rag-1-graphic](./assets/rag-graphic-1.png)

사전 작업 단계에서는 데이터 소스를 Vector DB (저장소) 에 문서를 로드-분할-임베딩-저장 하는 4단계를 진행합니다.

- 1단계 문서로드(Document Load): 문서 내용을 불러옵니다.
- 2단계 분할(Text Split): 문서를 특정 기준(Chunk) 으로 분할합니다.
- 3단계 임베딩(Embedding): 분할된(Chunk) 를 임베딩하여 저장합니다.
- 4단계 벡터DB 저장: 임베딩된 Chunk 를 DB에 저장합니다.

## 2. RAG 수행(RunTime) - 5~8 단계

![rag-2.png](./assets/rag-2.png)

![](./assets/rag-graphic-2.png)

- 5단계 검색기(Retriever): 쿼리(Query) 를 바탕으로 DB에서 검색하여 결과를 가져오기 위하여 리트리버를 정의합니다. 리트리버는 검색 알고리즘이며(Dense, Sparse) 리트리버로 나뉘게 됩니다. Dense: 유사도 기반 검색, Sparse: 키워드 기반 검색
- 6단계 프롬프트: RAG 를 수행하기 위한 프롬프트를 생성합니다. 프롬프트의 context 에는 문서에서 검색된 내용이 입력됩니다. 프롬프트 엔지니어링을 통하여 답변의 형식을 지정할 수 있습니다.
- 7단계 LLM: 모델을 정의합니다.(GPT-3.5, GPT-4, Claude, etc..)
- 8단계 Chain: 프롬프트 - LLM - 출력 에 이르는 체인을 생성합니다.

## 실습에 활용한 문서

 '01. 온보딩 프로세스 - 기반기술.pdf

B팀에서 작성한 문서를 pdf로 변환한 파일입니다.


## 환경설정


API KEY 를 설정합니다.


In [13]:
# API 키를 환경변수로 관리하기 위한 설정 파일
from dotenv import load_dotenv

# API 키 정보 로드
load_dotenv()

True

## RAG 기본 파이프라인(1~8단계)


In [5]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

아래는 기본적인 RAG 구조 이해를 위한 뼈대코드(skeleton code) 입니다.

각 단계별 모듈의 내용을 앞으로 상황에 맞게 변경하면서 문서에 적합한 구조를 찾아갈 수 있습니다.

(각 단계별로 다양한 옵션을 설정하거나 새로운 기법을 적용할 수 있습니다.)

In [6]:
# 단계 1: 문서 로드(Load Documents)
loader = PyMuPDFLoader("data/01. 온보딩 프로세스 - 기반기술.pdf")
docs = loader.load()
print(f"문서의 페이지수: {len(docs)}")

문서의 페이지수: 11


페이지의 내용을 출력합니다.

In [9]:
print(docs[10].page_content)

3.4. Hazelcast (Cache, IMDG- In-Memory Data Grid) 
분산형 메모리 스토어에요 
DWorks 는 Spring Cache 저장소로 사용하고 있어요 
Spring Cache 는 분산 캐싱을 추상화하여 제공하며, @Cacheable, @CacheEvict, 
@CachePut 과 같은 Annotation 을 사용하여 캐싱기능을 쉽게 구현 가능하게해줘요 
영구적인 데이터 저장소는 DB/Elasticsearch 를 이용하고, 성능향상을 위한 캐싱처리는 
Hazelcast 를 이용해요. 
 
C팀에서 준비한 자료도 함께 보아주세요! 
 
Part_02_기술요소.docx


`metadata` 를 확인합니다.

In [10]:
docs[10].__dict__

{'id': None,
 'metadata': {'producer': 'Skia/PDF m134 Google Docs Renderer',
  'creator': '',
  'creationdate': '',
  'source': 'data/01. 온보딩 프로세스 - 기반기술.pdf',
  'file_path': 'data/01. 온보딩 프로세스 - 기반기술.pdf',
  'total_pages': 11,
  'format': 'PDF 1.4',
  'title': '01. 온보딩 프로세스 - 기반기술',
  'author': '',
  'subject': '',
  'keywords': '',
  'moddate': '',
  'trapped': '',
  'modDate': '',
  'creationDate': '',
  'page': 10},
 'page_content': '3.4. Hazelcast (Cache, IMDG- In-Memory Data Grid) \n분산형 메모리 스토어에요 \nDWorks 는 Spring Cache 저장소로 사용하고 있어요 \nSpring Cache 는 분산 캐싱을 추상화하여 제공하며, @Cacheable, @CacheEvict, \n@CachePut 과 같은 Annotation 을 사용하여 캐싱기능을 쉽게 구현 가능하게해줘요 \n영구적인 데이터 저장소는 DB/Elasticsearch 를 이용하고, 성능향상을 위한 캐싱처리는 \nHazelcast 를 이용해요. \n \nC팀에서 준비한 자료도 함께 보아주세요! \n \nPart_02_기술요소.docx',
 'type': 'Document'}

In [11]:
# 단계 2: 문서 분할(Split Documents)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
split_documents = text_splitter.split_documents(docs)
print(f"분할된 청크의수: {len(split_documents)}")

분할된 청크의수: 18


In [14]:
# 단계 3: 임베딩(Embedding) 생성
embeddings = OpenAIEmbeddings()

In [16]:
# 단계 4: DB 생성(Create DB) 및 저장
# 벡터스토어를 생성합니다.
vectorstore = Chroma.from_documents(documents=split_documents, embedding=embeddings)

In [19]:
for doc in vectorstore.similarity_search("분산형 메모리 스토어"):
    print("=" * 50)
    print(doc.page_content)

(Document(metadata={'author': '', 'creationDate': '', 'creationdate': '', 'creator': '', 'file_path': 'data/01. 온보딩 프로세스 - 기반기술.pdf', 'format': 'PDF 1.4', 'keywords': '', 'modDate': '', 'moddate': '', 'page': 10, 'producer': 'Skia/PDF m134 Google Docs Renderer', 'source': 'data/01. 온보딩 프로세스 - 기반기술.pdf', 'subject': '', 'title': '01. 온보딩 프로세스 - 기반기술', 'total_pages': 11, 'trapped': ''}, page_content='3.4. Hazelcast (Cache, IMDG- In-Memory Data Grid) \n분산형 메모리 스토어에요 \nDWorks 는 Spring Cache 저장소로 사용하고 있어요 \nSpring Cache 는 분산 캐싱을 추상화하여 제공하며, @Cacheable, @CacheEvict, \n@CachePut 과 같은 Annotation 을 사용하여 캐싱기능을 쉽게 구현 가능하게해줘요 \n영구적인 데이터 저장소는 DB/Elasticsearch 를 이용하고, 성능향상을 위한 캐싱처리는 \nHazelcast 를 이용해요. \n \nC팀에서 준비한 자료도 함께 보아주세요! \n \nPart_02_기술요소.docx'), 0.3318576393976306)
(Document(metadata={'author': '', 'creationDate': '', 'creationdate': '', 'creator': '', 'file_path': 'data/01. 온보딩 프로세스 - 기반기술.pdf', 'format': 'PDF 1.4', 'keywords': '', 'modDate': ''

In [20]:
# 단계 5: 검색기(Retriever) 생성
# 문서에 포함되어 있는 정보를 검색하고 생성합니다.
retriever = vectorstore.as_retriever()

검색기에 쿼리를 날려 검색된 chunk 결과를 확인합니다.

In [26]:
# 검색기에 쿼리를 날려 검색된 chunk 결과를 확인합니다.
docs = retriever.invoke("분산형 메모리 스토어가 뭐야?")

In [22]:
for doc in docs:
    print("=" * 50)
    print(doc.page_content)

3.4. Hazelcast (Cache, IMDG- In-Memory Data Grid) 
분산형 메모리 스토어에요 
DWorks 는 Spring Cache 저장소로 사용하고 있어요 
Spring Cache 는 분산 캐싱을 추상화하여 제공하며, @Cacheable, @CacheEvict, 
@CachePut 과 같은 Annotation 을 사용하여 캐싱기능을 쉽게 구현 가능하게해줘요 
영구적인 데이터 저장소는 DB/Elasticsearch 를 이용하고, 성능향상을 위한 캐싱처리는 
Hazelcast 를 이용해요. 
 
C팀에서 준비한 자료도 함께 보아주세요! 
 
Part_02_기술요소.docx
user 같은 것들이죠 
b.​ 하나 이상의 shard로 구성이 되요 
2.​ shard 
a.​ 데이터를 분산하여 저장하고, 검색작업을 병렬로 처리하기위한 기본 
단위에요, 클러스터내의 물리적인 노드들에 분산되어 저장된답니다 
b.​ Primary shard, Replica Shard 로 구성되며, 예를들어 Primary shard 가 3, 
Replica shard 가 1로 설정된다면 데이터가 3개로 나뉘어져 저장되고, 각각 
1개씩의 Replica shard 로 구성되요
3. 인프라 
3.1. Kafka (메시지 브로커) 
분산처리 환경에서 대용량의 메시지(데이터)를 안전하게 처리하기 위한 분산 스트리밍 
도구에요 
분산 아키텍처, 확장성, 신뢰성있는 데이터 전달등의 특징을 기반으로, 대규모 로그 수집, 
이벤트 소싱등에서 사용된답니다. 
Kafka 를 사용함으로써 서비스 간의 결합도를 낮추고, 서비스의 독립성과 확장성을 
향상시킬 수 있어요! 
구성 
1.​ Broker 
a.​ 클러스터를 구성하는 하나의 노드(서버)를 Broker라고 해요 
2.​ Producer 
a.​ 메시지를 생성하는 주체이며, 메시지는 특정 Topic 으로 보내져요 
b.​ Kafka 클러스터중 하나의 Broker 에게 보내져요 
c.​ Topic 은 메시지의 주소라고 생각하면 됩니다.

In [23]:
# 단계 6: 프롬프트 생성(Create Prompt)
# 프롬프트를 생성합니다.
prompt = PromptTemplate.from_template(
    """You are an assistant for question-answering tasks. 
Use the following pieces of retrieved context to answer the question. 
If you don't know the answer, just say that you don't know. 
Answer in Korean.

#Context: 
{context}

#Question:
{question}

#Answer:"""
)

In [24]:
# 단계 7: 언어모델(LLM) 생성
# 모델(LLM) 을 생성합니다.
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

In [34]:
# 단계 8: 체인(Chain) 생성
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

생성된 체인에 쿼리(질문)을 입력하고 실행합니다.

In [35]:
# 체인 실행(Run Chain)
# 문서에 대한 질의를 입력하고, 답변을 출력합니다.
question = "분산형 메모리 스토어에 대해 설명해줘"
response = chain.invoke(question)
print(response)

[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence] Entering Chain run with input:
[0m{
  "input": "분산형 메모리 스토어에 대해 설명해줘"
}
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence > chain:RunnableParallel<context,question>] Entering Chain run with input:
[0m{
  "input": "분산형 메모리 스토어에 대해 설명해줘"
}
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence > chain:RunnableParallel<context,question> > chain:RunnableSequence] Entering Chain run with input:
[0m{
  "input": "분산형 메모리 스토어에 대해 설명해줘"
}
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence > chain:RunnableParallel<context,question> > chain:RunnablePassthrough] Entering Chain run with input:
[0m{
  "input": "분산형 메모리 스토어에 대해 설명해줘"
}
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableSequence > chain:RunnableParallel<context,question> > chain:RunnablePassthrough] [0ms] Exiting Chain run with output:
[0m{
  "output": "분산형 메모리 스토어에 대해 설명해줘"
}
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableSequence > chain:RunnablePa

## 전체 코드

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# 단계 1: 문서 로드(Load Documents)
loader = PyMuPDFLoader("data/01. 온보딩 프로세스 - 기반기술.pdf")
docs = loader.load()

# 단계 2: 문서 분할(Split Documents)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
split_documents = text_splitter.split_documents(docs)

# 단계 3: 임베딩(Embedding) 생성
embeddings = OpenAIEmbeddings()

# 단계 4: DB 생성(Create DB) 및 저장
# 벡터스토어를 생성합니다.
vectorstore = Chroma.from_documents(documents=split_documents, embedding=embeddings)

# 단계 5: 검색기(Retriever) 생성
# 문서에 포함되어 있는 정보를 검색하고 생성합니다.
retriever = vectorstore.as_retriever()

In [None]:
# 단계 6: 프롬프트 생성(Create Prompt)
# 프롬프트를 생성합니다.
prompt = PromptTemplate.from_template(
    """You are an assistant for question-answering tasks. 
Use the following pieces of retrieved context to answer the question. 
If you don't know the answer, just say that you don't know. 
Answer in Korean.

#Context: 
{context}

#Question:
{question}

#Answer:"""
)

# 단계 7: 언어모델(LLM) 생성
# 모델(LLM) 을 생성합니다.
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

# 단계 8: 체인(Chain) 생성
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

In [None]:
# 체인 실행(Run Chain)
# 문서에 대한 질의를 입력하고, 답변을 출력합니다.
question = "분산형 메모리 스토어가 뭐야?"
response = chain.invoke(question)
print(response)