## 로컬 환경에서 PDF 파일 RAG 검색하기 3단계

__step4__
- PDF 문서 여러개 로드하여 임베딩 후 csv 파일로 저장
- csv 파일을 랭체인 FAISS 인덱싱하여 인덱스 파일 생성 및 저장
- 랭체인 프레임워크 적용하여 llm 검색 까지 구현
- 파일 3개 인덱싱 후 1개 추가하여 llm 검색 하기 구현

- 추가 파일 인덱스 병합하는 코드 

In [34]:
import os
import csv
import json
import ast
import faiss
import numpy as np
import pandas as pd
from langchain.document_loaders import DirectoryLoader, PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
from langchain_core.embeddings import Embeddings
from sentence_transformers import SentenceTransformer

# 1. 문서 로드
loader = DirectoryLoader(
    'data',                         # data 폴더
    glob='*.pdf',                   # 모든 pdf 파일
    loader_cls=PyMuPDFLoader
)
docs = loader.load()

# 2. 문서 분할
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=50)
split_documents = text_splitter.split_documents(docs)

# 3. 임베딩을 위한 클래스 생성
class KoSentenceTransformerEmbeddings(Embeddings):
    def __init__(self, model_name):
        self.model = SentenceTransformer(model_name)
    
    def embed_documents(self, texts):
        return self.model.encode(texts, convert_to_numpy=True).tolist()
    
    def embed_query(self, text):
        return self.model.encode([text], convert_to_numpy=True).tolist()[0]

embedding_model = KoSentenceTransformerEmbeddings("jhgan/ko-sroberta-multitask")

# 4. 임베딩 저장 함수
# 처리할 문서 리스트, 임베딩 모델, 저장할 폴더 경로 
def save_embeddings_to_csv(documents, embedding_model, folder_path):
    os.makedirs(folder_path, exist_ok=True)                             # 폴더가 없으면 생성, 있으면 넘어감
    
    file_docs = {}                                                      # pdf 파일별로 임베딩 결과를 저장할 딕셔너리 초기화 
    for doc in documents:
        file_name = os.path.basename(doc.metadata['source']).replace('.pdf', '')        # 원본 파일에서 제목만 추출 
        if file_name not in file_docs:
            file_docs[file_name] = []
        file_docs[file_name].append(doc)
    
    for file_name, docs in file_docs.items():
        full_path = os.path.join(folder_path, f"{file_name}.csv")
        
        embeddings = embedding_model.embed_documents([doc.page_content for doc in docs])
        
        with open(full_path, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(["document", "embedding"])
            
            for doc, embedding in zip(docs, embeddings):
                writer.writerow([doc.page_content, json.dumps(embedding)])  # JSON 형식으로 저장
        
        print(f"✅ 임베딩 데이터가 {full_path} 파일에 저장되었습니다.")
    
save_embeddings_to_csv(split_documents, embedding_model, 'csv/')

# 5. CSV에서 임베딩 로드 및 FAISS 인덱스 생성
# csv 파일들이 들어있는 경로 
def load_csv_embeddings(folder_path):
    data_dict = {}
    
    for filename in os.listdir(folder_path):
        if filename.endswith(".csv"):
            file_path = os.path.join(folder_path, filename)
            df = pd.read_csv(file_path)
            
            if "document" in df.columns and "embedding" in df.columns:
                documents, embeddings, metadatas = [], [], []
                for _, row in df.iterrows():
                    text = row["document"]
                    try:
                        embedding = json.loads(row["embedding"])  # JSON 로드 방식으로 변경
                        if isinstance(embedding, list):
                            embeddings.append(np.array(embedding, dtype=np.float32))
                            documents.append(text)
                            metadatas.append({"source": filename})
                        else:
                            print(f"⚠️ {filename}의 임베딩 형식이 올바르지 않습니다!")
                    except Exception as e:
                        print(f"⚠️ {filename}에서 임베딩 변환 오류: {e}")
                data_dict[filename.replace('.csv', '')] = (documents, embeddings, metadatas)
    
    return data_dict

# FAISS 인덱스 생성 및 저장

def create_faiss_indexes(data_dict, save_path):
    os.makedirs(save_path, exist_ok=True)
    
    for file_name, (documents, embeddings, metadatas) in data_dict.items():
        vector_store = FAISS.from_texts(texts=documents, embedding=embedding_model, metadatas=metadatas)
        faiss_index_path = os.path.join(save_path, file_name)
        vector_store.save_local(faiss_index_path)
        print(f" {file_name}에 대한 FAISS 인덱스 저장 완료: {faiss_index_path}")

# 실행
if __name__ == "__main__":
    data_folder = "./csv"
    faiss_index_folder = "./faiss_index"
    
    data_dict = load_csv_embeddings(data_folder)
    print(f"📄 {len(data_dict)}개의 파일 로드 완료!")
    
    if data_dict:
        create_faiss_indexes(data_dict, faiss_index_folder)
    else:
        print("⚠️ 문서 또는 임베딩 데이터가 없습니다. FAISS 인덱스를 생성할 수 없습니다!")


✅ 임베딩 데이터가 csv/SPRI_AI_Brief_2023년12월호_F.csv 파일에 저장되었습니다.
✅ 임베딩 데이터가 csv/AI기반_인파분석플랫폼구축_제안서.csv 파일에 저장되었습니다.
📄 2개의 파일 로드 완료!
 SPRI_AI_Brief_2023년12월호_F에 대한 FAISS 인덱스 저장 완료: ./faiss_index/SPRI_AI_Brief_2023년12월호_F
 AI기반_인파분석플랫폼구축_제안서에 대한 FAISS 인덱스 저장 완료: ./faiss_index/AI기반_인파분석플랫폼구축_제안서


In [37]:
def search_faiss_index(query, index_folder, file_name, top_k=5):
    index_path = os.path.join(index_folder, file_name)
    
    # FAISS 인덱스 로드 시 allow_dangerous_deserialization 옵션 추가
    vector_store = FAISS.load_local(index_path, embedding_model, allow_dangerous_deserialization=True)
    
    query_embedding = embedding_model.embed_query(query)
    results = vector_store.similarity_search_by_vector(query_embedding, k=top_k)
    
    print(f"🔍 '{query}' 검색 결과:")
    for i, result in enumerate(results):
        print(f"{i+1}. {result.page_content} (출처: {result.metadata['source']})")
    
    return results

# search_faiss_index("포항에서 열리는 축제의 이름은 무엇인가?", "./faiss_index", "AI기반_인파분석플랫폼구축_제안서")
# search_faiss_index("삼성에서 만든 AI의 이름은 무엇인가?", "./faiss_index", "SPRI_AI_Brief_2023년12월호_F")
search_faiss_index("경기 임산부 친환경농산물 지원 사업에 대해 설명해줘", "./faiss_index", "2024년 경기도 인구영향평가_편집본")

🔍 '경기 임산부 친환경농산물 지원 사업에 대해 설명해줘' 검색 결과:
1. - 20 -
구분
사업명
전문가 총평
16
경기도 어린이 
건강과일 공급
ž
제철과일 제공을 통해 어린이들의 식생활에 개선에 기여한다고 할 수 있음
ž
영유아 시기에 만들어진 식습관은 평생동안 영향을 미치기에 제철 과일 간식 제공을 
통해 올바른 식습관 형성에 도움을 주는 정책은 지속적으로 이루어져야 한다고 생각
됨
17
경기임산부 
친환경농산물 
지원
ž
본 사업은 임산부의 친환경농산물 지원으로 출산하는 여성을 격려하고 건강증진을 도
모하고 또한 출산친화적인 환경을 조성하는 사업임
ž
친환경 농산물 지원과 같은 정책이 임산부와 태아의 건강에 긍정적인 영향을 미칠 수 
있는지 장기적 관점으로 모니터링하고, 참여자의 만족도를 고려하여 우리 농산물 공
급에 대한 사업을 확장해 나갈 방안에 대한 논의가 필요함
18
친환경 등 
우수농산물 
영유아 
공공급식 지원
ž
영유아기 아동들의 영양공급은 신체적 발달뿐만 아니라, 인지적, 심리적, 사회적 발달
에도 영향을 미치는 것으로 연구됨
19
경기 
고립은둔청년 
지원
ž
사회적 문제로 부상하고 있는 고립·은둔 청년 문제에 대한 선제적 지원으로서 의미가 
있고, 고립·은둔 청년에 대한 실태조사와 상담 프로그램 등 방안을 마련하려는 노력을 높
이 평가함
ž
다만, 대응 방안을 상담 제공으로 국한하지 않고, 지속적으로 추적 및 관리하여 지역
사회 대응 시스템을 구축하는 형태로 발전시킬 수 있기를 기대함
20
경기청년 
갭이어 
프로그램 지원
ž
경기청년 갭이어 프로그램은 청년들이 자기 개발과 사회적 경험을 쌓을 수 있는 기회
를 제공하면서, 장기적으로 지역사회의 소득 안정성에 긍정적인 영향을 미칠 수 있다
고 보여짐
ž
다만 청년 진로 개발 프로그램에 대한 공급이 이미 어느 수준 충족되는 면이 있고, 
우리 사회가 당면한 저출산 고령화 및 인구구조 변동의 문제와 직접적인 관련성이 적
으므로 일부 개선이 필요하다고 보여짐
21
경기청년 
역량강화 기회

[Document(id='8f46c860-a689-4eed-b426-186bd5b4fbae', metadata={'source': '2024년 경기도 인구영향평가_편집본.csv'}, page_content='- 20 -\n구분\n사업명\n전문가 총평\n16\n경기도 어린이 \n건강과일 공급\nž\n제철과일 제공을 통해 어린이들의 식생활에 개선에 기여한다고 할 수 있음\nž\n영유아 시기에 만들어진 식습관은 평생동안 영향을 미치기에 제철 과일 간식 제공을 \n통해 올바른 식습관 형성에 도움을 주는 정책은 지속적으로 이루어져야 한다고 생각\n됨\n17\n경기임산부 \n친환경농산물 \n지원\nž\n본 사업은 임산부의 친환경농산물 지원으로 출산하는 여성을 격려하고 건강증진을 도\n모하고 또한 출산친화적인 환경을 조성하는 사업임\nž\n친환경 농산물 지원과 같은 정책이 임산부와 태아의 건강에 긍정적인 영향을 미칠 수 \n있는지 장기적 관점으로 모니터링하고, 참여자의 만족도를 고려하여 우리 농산물 공\n급에 대한 사업을 확장해 나갈 방안에 대한 논의가 필요함\n18\n친환경 등 \n우수농산물 \n영유아 \n공공급식 지원\nž\n영유아기 아동들의 영양공급은 신체적 발달뿐만 아니라, 인지적, 심리적, 사회적 발달\n에도 영향을 미치는 것으로 연구됨\n19\n경기 \n고립은둔청년 \n지원\nž\n사회적 문제로 부상하고 있는 고립·은둔 청년 문제에 대한 선제적 지원으로서 의미가 \n있고, 고립·은둔 청년에 대한 실태조사와 상담 프로그램 등 방안을 마련하려는 노력을 높\n이 평가함\nž\n다만, 대응 방안을 상담 제공으로 국한하지 않고, 지속적으로 추적 및 관리하여 지역\n사회 대응 시스템을 구축하는 형태로 발전시킬 수 있기를 기대함\n20\n경기청년 \n갭이어 \n프로그램 지원\nž\n경기청년 갭이어 프로그램은 청년들이 자기 개발과 사회적 경험을 쌓을 수 있는 기회\n를 제공하면서, 장기적으로 지역사회의 소득 안정성에 긍정적인 영향을 미칠

In [None]:
from langchain_core.prompts import PromptTemplate
from langchain_community.llms import Ollama
from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS

import os

# ✅ Step 6: 프롬프트 생성
prompt = PromptTemplate.from_template(
    """You are an assistant for question-answering tasks. 
Use the following pieces of retrieved context to answer the question. 
If you don't know the answer, just say that you don't know. 
Answer in Korean, and make sure the answer ends with '입니다'.

#Context: 
{context}

#Question:
{question}

#Answer(Ensure the response ends with '입니다'):"""
)

# Step 7: 언어 모델 (LLM) 생성
llm = Ollama(model="llama3.2")  

# FAISS 인덱스 로드 (보안 옵션 추가)
faiss_index_folder = "./faiss_index"
faiss_indexes = {}

for file_name in os.listdir(faiss_index_folder):
    index_path = os.path.join(faiss_index_folder, file_name)
    if os.path.isdir(index_path):
        faiss_indexes[file_name] = FAISS.load_local(
            index_path,
            embedding_model,
            allow_dangerous_deserialization=True  # 보안 옵션 추가
        )

# 모든 인덱스를 검색하는 함수
def multi_index_search(question, faiss_indexes):
    all_docs = []
    
    # 각 FAISS 인덱스에서 검색
    for index_name, index in faiss_indexes.items():
        retriever = index.as_retriever(search_type="similarity", search_kwargs={"k": 3})  # 🔹 FAISS 검색 최적화
        docs = retriever.get_relevant_documents(question)  # 각 인덱스에서 검색된 문서
        for doc in docs:
            doc.metadata["source"] = index_name  # 출처 정보 추가
        all_docs.extend(docs)

    if not all_docs:
        return "⚠️ 관련 문서를 찾을 수 없습니다."
    
    # FAISS 검색 결과를 기반으로 RetrievalQA 생성
    retriever = FAISS.from_documents(all_docs, embedding_model).as_retriever()

    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,  # 올바른 retriever 전달
        chain_type="stuff",
        return_source_documents=True,
        chain_type_kwargs={"prompt": prompt}
    )
    
    response = qa_chain.invoke({"query": question})  # context 제거 후 수정
    
    answer = response.get("result", "⚠️ 답변을 생성할 수 없습니다.")  # key 수정
    sources = "\n".join(set([doc.metadata.get("source", "알 수 없음") for doc in all_docs]))

    return f" 답변: {answer}\n 출처: {sources}"

# 사용 예시
question = "경기임산부 친환경농산물 지원 사업에 대한 전문가 의견을 알려줘"
print(multi_index_search(question, faiss_indexes))


In [35]:
# 새롭게 추가할 폴더명
new_folder = "data/gg"

# 1. 새 폴더의 문서 로드
new_loader = DirectoryLoader(
    new_folder,                     # aaa 폴더
    glob="*.pdf",                   # 모든 pdf 파일
    loader_cls=PyMuPDFLoader
)
new_docs = new_loader.load()

# 2. 문서 분할 (기존 방식 사용)
new_split_documents = text_splitter.split_documents(new_docs)

# 3. 새로운 문서 임베딩을 csv/aaa 폴더에 저장
save_embeddings_to_csv(new_split_documents, embedding_model, "csv/gg")


✅ 임베딩 데이터가 csv/gg/2024년 경기도 인구영향평가_편집본.csv 파일에 저장되었습니다.


In [36]:
# 새로 추가된 aaa 폴더의 CSV 파일을 불러오기
new_data_dict = load_csv_embeddings("./csv/gg")

# 기존 FAISS 인덱스 폴더
faiss_index_folder = "./faiss_index"

# 새로 추가된 인덱스들을 기존 인덱스에 병합하는 함수
def append_faiss_indexes(existing_index_folder, new_data_dict):
    # 기존 인덱스 불러오기
    existing_indexes = {}

    for file_name in os.listdir(existing_index_folder):
        index_path = os.path.join(existing_index_folder, file_name)
        if os.path.isdir(index_path):
            existing_indexes[file_name] = FAISS.load_local(
                index_path,
                embedding_model,
                allow_dangerous_deserialization=True
            )

    # 새로운 데이터 처리
    for file_name, (documents, embeddings, metadatas) in new_data_dict.items():
        new_faiss_index = FAISS.from_texts(
            texts=documents,
            embedding=embedding_model,
            metadatas=metadatas
        )
        
        # 기존 인덱스가 있으면 병합
        if file_name in existing_indexes:
            existing_indexes[file_name].merge_from(new_faiss_index)
            print(f"기존 인덱스 {file_name}에 새로운 데이터 병합 완료!")
        else:
            existing_indexes[file_name] = new_faiss_index
            print(f"새로운 인덱스 {file_name} 생성 완료!")

    # 저장 (덮어쓰기)
    for file_name, index in existing_indexes.items():
        index_path = os.path.join(existing_index_folder, file_name)
        index.save_local(index_path)
        print(f"{file_name} FAISS 인덱스 업데이트 완료!")

# 실행
append_faiss_indexes(faiss_index_folder, new_data_dict)


기존 인덱스 2024년 경기도 인구영향평가_편집본에 새로운 데이터 병합 완료!
2040_seoul FAISS 인덱스 업데이트 완료!
2024년 경기도 인구영향평가_편집본 FAISS 인덱스 업데이트 완료!
SPRI_AI_Brief_2023년12월호_F FAISS 인덱스 업데이트 완료!
AI기반_인파분석플랫폼구축_제안서 FAISS 인덱스 업데이트 완료!
