In [None]:
# 모델 가속화 및 메모리 관리
%pip install accelerate
%pip install bitsandbytes

%pip install 'autoawq>=0.1.7'  
%pip install datasets
%pip install torch==2.2.0
%pip install torchvision==0.17.0
%pip install transformers -U
%pip install sentence-transformers 

# 벡터 데이터베이스 관련 라이브러리
# %pip install faiss-cpu
%pip install faiss-gpu
 
# 데이터
%pip install tiktoken
%pip install pymupdf4llm 
%pip install pandas 

# langchain
%pip install langchain 
%pip install langchain-community 
%pip install langchain-huggingface
%pip install langchain-anthropic

%pip install python-dotenv 
%pip install rank_bm25

In [None]:
import os
import json
import unicodedata
import re
import pandas as pd
import pymupdf4llm
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from accelerate import Accelerator
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.chains import create_stuff_documents_chain, create_retrieval_chain
from langchain.schema import Document
from langchain.prompts import ChatPromptTemplate
from langchain.utils import CacheBackedEmbeddings
from langchain.store import LocalFileStore
from tqdm import tqdm
import torch


# 비동기 이벤트 루프를 재설정하거나 중첩할 수 있도록 허용
nest_asyncio.apply()

# 로깅 설정
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)

# CUDA 메모리 할당이 필요할 때마다 메모리 블록을 확장
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# 시드 설정
seed = 42
random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)
set_seed(seed)

In [None]:
# Config 클래스
class Config:
    def __init__(self, llm_name: str, embedding_name: str, concept: str) -> None:        
        # 모델별 설정 딕셔너리
        self.model_config = {
            "meta-llama/Meta-Llama-3.1-8B-Instruct": {
                "quantization_config": None,
                "torch_dtype": "auto",
                "max_token": 256,
            },
            "rtzr/ko-gemma-2-9b-it": {
                "quantization_config": self.get_quantization_config(),
                "torch_dtype": "auto",
                "max_token": 450,
            },
            "jjjguz/Llama-3.1-Korean-8B-Instruct-v1": {
                "quantization_config": None,
                "torch_dtype": "auto",
                "max_token": 256,
            },
            "mindsignal/rtzr-ko-gemma-2-9b-it-4bit-financesinfo-ver1": {
                "quantization_config": self.get_quantization_config(),
                "torch_dtype": "auto",
                "max_token": 450,
            },
            "mindsignal/upstage-SOLAR-10.7B-Instruct-v1.0-4bit-financesinfo-ver1": {
                "quantization_config": self.get_quantization_config(),
                "torch_dtype": torch.float16,
                "max_token": 512
            },
            "upstage/SOLAR-10.7B-Instruct-v1.0": {
                "quantization_config": None,
                "torch_dtype": "auto",
                "max_token": 256,
            }
        }
        
        # Large Language Model
        self.llm_name = llm_name
        self.llm_config = self.model_config[self.llm_name]
        self.llm_obj = self.setup_llm()
        
        # 임베딩 모델 설정
        self.embedding_name = embedding_name
        self.embedding_obj = self.setup_embedding()
        
        # document
        self.chunk_size = 512
        self.chunk_overlap = 32
        self.bm25_w = 0.5
        self.faiss_w = 0.5
        
        # Data Path
        self.base_directory = "open/"
        self.train_csv_path = os.path.join(self.base_directory, "train.csv")
    
    def to_json(self):
        return json.dumps(self.__dict__)
    
    def get_quantization_config(self):
        """4-bit 양자화 설정을 반환하는 함수"""
        return BitsAndBytesConfig(
            load_in_4bit=True,  # 4-bit 양자화
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16  # 연산에 사용할 데이터 타입
        )
        
    def setup_embedding(self):
        """ 임베딩 모델 설정 """
        embed_id = self.embedding_name
        
        model_kwargs = {'device': 'cuda'}
        encode_kwargs = {'normalize_embeddings': True}
        embd = HuggingFaceEmbeddings(
            model_name=embed_id,
            model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs
        )
        
        store = LocalFileStore("./cache/")
        
        # Cache Embedding 사용
        cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
            underlying_embeddings=embd, 
            document_embedding_cache=store, 
            namespace=embed_id
        )
        
        return cached_embeddings
    
    def setup_llm(self):
        """LLM 설정 및 파이프라인 구성"""
        model_id = self.llm_name
        
        # 토크나이저 로드 및 설정
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        tokenizer.use_default_system_prompt = False
        
        # 모델 로드 및 양자화 설정 적용
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            device_map="auto",
            quantization_config=self.llm_config["quantization_config"],
            trust_remote_code=True
        )
        
        # 모델을 여러 GPU에 할당
        accelerator = Accelerator()
        model = accelerator.prepare(model)
        
        print(f"#### [ model ] ####\n{model}\n###################")
        
        # 스트리머를 설정하여 토큰이 생성될 때마다 출력
        streamer = TextStreamer(tokenizer)

        # HuggingFacePipeline 객체 생성
        text_generation_pipeline = pipeline(
            model=model,
            tokenizer=tokenizer,
            task="text-generation",
            return_full_text=False,
            max_new_tokens=450,
            streamer=streamer,
        )

        hf = HuggingFacePipeline(pipeline=text_generation_pipeline)

        return hf

In [None]:
class DataIngestionPipeline:
    def __init__(self, config: Config):
        self.config = config
        
    def normalize_path(self, path):
        """ Path 유니코드 정규화 """
        normalized_path = unicodedata.normalize('NFD', path)
        logger.debug(f"정규화된 경로: {normalized_path}")
        return normalized_path
    
    def preprocess_text(self, text: str) -> str:
        """ 텍스트 전처리 함수 """
        # 불필요한 공백 제거
        text = text.strip()
        # 마침표 뒤 및 "----" 전후의 줄바꿈을 제외한 모든 줄바꿈을 제거
        text = re.sub(r'(?<!\.)(?<!-----)(\n|\r\n)(?!-----)', ' ', text)
        # 공백 처리 및 불필요한 줄바꿈 제거
        text = re.sub(r'\s+', ' ', text)
        return text
    
    def process_pdf(self, file_path: str) -> list:
        """ PDF 파일 로드, 텍스트 추출 및 전처리 후 노드 생성 """
        logger.info(f"PDF 처리 중: {file_path}")
        md_content = pymupdf4llm.to_markdown(file_path)
        
        md_content = self.preprocess_text(md_content)
                
        headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
        ]
        
        md_header_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on, strip_headers=False)
        md_chunks = md_header_splitter.split_text(md_content)
        
        # 텍스트 데이터를 문서 청크로 분할
        text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=self.config.chunk_size, chunk_overlap=self.config.chunk_overlap
        )
        chunks = text_splitter.split_documents(md_chunks)
        logger.info(f"텍스트를 {len(chunks)}개의 청크로 분할 완료.")
        
        return chunks
    
    def init_vector_db(self, pdf_files):
        """ PDF 파일 리스트를 처리하고 벡터 데이터베이스 및 리트리버를 초기화 """
        pdf_databases = {}
        unique_paths = df['Source_path'].unique()
        
        for path in tqdm(unique_paths, desc="Processing PDFs"):
            normalized_path = self.normalize_path(path)
            full_path = os.path.normpath(os.path.join(self.config.base_directory, normalized_path.lstrip('./'))) if not os.path.isabs(normalized_path) else normalized_path
            
            pdf_title = os.path.splitext(os.path.basename(full_path))[0]
            print(f"Processing {pdf_title}...")
            
            # PDF 처리 및 벡터 DB 생성
            chunks = self.process_pdf(full_path)

            vector_store = FAISS.from_documents(chunks, embedding=self.config.embedding_obj)
            
            bm25_retriever = BM25Retriever.from_documents(chunks)
            faiss_retriever = vector_store.as_retriever()
    
            retriever = EnsembleRetriever(
                retrievers=[bm25_retriever, faiss_retriever],
                weights=[self.config.bm25_w, self.config.faiss_w],
                search_type="mmr",
            )

            # 결과 저장
            pdf_databases[pdf_title] = {
                'vector_store': vector_store,
                'retriever': retriever
            }
            
        return pdf_databases

In [None]:
# 질문 생성 및 답변 생성 프롬프트 템플릿
question_prompt_template = """ 
다음 문서에서 이해할 수 있는 주요 질문을 생성하세요:

문서 내용: 
{context}
"""
answer_prompt_template = """ 
당신은 재정 정책 전문가입니다. 주어진 정보를 바탕으로, 질문에 대해 정확히 답변하세요.

답변은 간결하고 명확하게 작성하며, 주어진 질문에 대해 핵심만 답변하세요.
단, 주어와 서술어를 사용하여 온전한 문장을 완성시켜 최대한 자연스럽게 답변해야 합니다.

자연스럽게 답변하기 위해 아래 조건을 지켜주세요. :
질문을 답변으로 생성하지 마세요.
아래 정보의 내용을 그대로 출력하지 마세요.
아래 정보의 내용을 참고해서 질문에 올바른 답변을 생성하세요.
답변하기 전에 한번 더 생각해보고 답변하세요.
기호나 공백을 제거하고 깔끔하게 답변하세요.

다음 정보를 바탕으로 질문에 답하세요 : 
{context}
"""
question_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", question_prompt_template),
        ("human", "{input}"),
    ]
)
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", answer_prompt_template),
        ("human", "{input}"),
    ]
)

In [None]:
class FISearch:
    def __init__(self, config: Config, question_prompt, answer_prompt) -> None:
        self.config = config
        self.question_prompt = question_prompt
        self.answer_prompt = answer_prompt

    def normalize_string(self, s):
        """ 유니코드 정규화 """
        return unicodedata.normalize('NFC', s)
    
    def format_docs(self, docs):
        # 문서의 페이지 내용을 이어붙여 반환합니다.
        return "\n\n".join(doc.page_content for doc in docs)

    def generate_questions(self, text):
        """ 문서 내용에서 질문 생성 """
        documents_chain = create_stuff_documents_chain(self.config.llm_obj, prompt=self.question_prompt)
        question_chain = create_retrieval_chain(None, documents_chain)
        result = question_chain.invoke({"input": text})
        questions = result.split("\n")  # Assume questions are separated by newlines
        return questions

    def generate_answers(self, questions, context):
        """ 질문 리스트에 대한 답변 생성 """
        results = []

        # 각 질문에 대해 처리
        for question in questions:
            print(f"질문 : {question}")
            answer_chain = create_stuff_documents_chain(self.config.llm_obj, prompt=self.answer_prompt)
            rag_chain = create_retrieval_chain(None, answer_chain)
            
            # 답변 생성
            result = rag_chain.invoke({"input": question, "context": context})
            print(f"답변: {result}\n================================================")
            
            # 결과 저장
            results.append({
                "Question": question,
                "Answer": result
            })

        return results



In [None]:
if __name__ == "__main__":
    # 초기 세팅
    config = Config(
        llm_name="upstage/SOLAR-10.7B-Instruct-v1.0", 
        embedding_name="intfloat/multilingual-e5-large",
        concept="financial_analysis"
    )
    
    # PDF 파일 리스트 정의
    df = pd.read_csv(config.train_csv_path)
    
    # 파이프라인 초기화
    dipipeline = DataIngestionPipeline(config)
    
    # 벡터 데이터베이스 초기화 - 메모리에 저장
    pdf_databases = dipipeline.init_vector_db(df)
    
    # 검색 및 질문 생성
    search_engine = FISearch(config, question_prompt=question_prompt, answer_prompt=answer_prompt)
    
    all_results = []
    
    for pdf_title, db_info in pdf_databases.items():
        context = search_engine.format_docs(db_info['vector_store'].get_documents())
        questions = search_engine.generate_questions(context)
        answers = search_engine.generate_answers(questions, context)
        
        for answer in answers:
            all_results.append({
                "Source": pdf_title,
                "Source_path": f'./train_source/{pdf_title}',
                "Question": answer["Question"],
                "Answer": answer["Answer"]
            })
    
    # 결과를 제출 양식에 맞게 저장
    submit_df = pd.DataFrame(all_results)
    
    # 결과를 CSV 파일로 저장
    submit_df.to_csv('augmentation_train_data.csv', encoding='UTF-8-sig', index=False)
    
    print("결과가 성공적으로 저장되었습니다.")