In [None]:
# 필요한 라이브러리 설치
!pip install -qU transformers sentence-transformers datasets bitsandbytes
!pip install pymupdf4llm==0.0.10
!pip install orjson==3.9.15
!pip install -qU langchain chromadb langchain-chroma langchain_community
!pip install langchain-huggingface
!pip install peft # LoRA 튜닝을 위한 라이브러리
!pip install accelerate
!pip install trl

In [None]:
# 필요한 라이브러리 임포트
import os
import re
import unicodedata
import warnings
import numpy as np
import pandas as pd
import torch
import pymupdf4llm
from datetime import datetime
from tqdm.notebook import tqdm
from transformers import AutoTokenizer, LlamaForCausalLM, BitsAndBytesConfig, pipeline
from peft import PeftModel
from datasets import Dataset
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings, HuggingFacePipeline
from langchain_chroma import Chroma
from langchain.schema.runnable import RunnablePassthrough
from langchain.retrievers import ContextualCompressionRetriever
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 경고 메시지 무시
warnings.filterwarnings("ignore")

In [None]:
# Google Drive를 마운트하여 파일에 접근할 수 있도록 설정
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# PDF 파일을 처리하여 문서 청크로 분할하는 함수
def process_pdf(file_path, chunk_size=512, chunk_overlap=32):
    """
    PDF 파일을 읽어와서 마크다운으로 변환한 후,
    주어진 청크 크기와 오버랩을 기준으로 문서 청크로 분할합니다.
    """
    # PDF 파일을 마크다운으로 변환
    pdf = pymupdf4llm.to_markdown(file_path)
    pdf = re.sub(r'(?<!\n)\n\n(?!\n)', '\n', pdf)  # 약간의 전처리
    doc = [Document(page_content=pdf, metadata={"source": file_path})]
    
    # PDF를 청크로 분할
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    chunks = splitter.split_documents(doc)
    return chunks

# 문서를 벡터 데이터베이스로 변환하는 함수
def create_vector_db(document, embeddings, file_path):
    """
    주어진 문서를 벡터 데이터베이스로 변환하여 저장합니다.
    """
    db = Chroma.from_documents(documents=document, embedding=embeddings, persist_directory=file_path)
    return db

# 모든 PDF 파일을 처리하고 벡터 데이터베이스를 생성하는 함수
def create_pdf_databases(base_directory, embeddings, chunk_size, chunk_overlap):
    """
    주어진 디렉토리의 모든 PDF 파일을 처리하여 벡터 데이터베이스를 생성합니다.
    """
    pdf_databases = {}
    for p in tqdm(os.listdir(base_directory), desc="Creating Vectordb for each PDF"):
        print(p)
        doc = process_pdf(base_directory + p, chunk_size, chunk_overlap)

        # 벡터 데이터베이스 저장 경로 설정
        if 'train_source' in base_directory:
            file_path = base_directory[:base_directory.find('train_source/')]
        elif 'test_source' in base_directory:
            file_path = base_directory[:base_directory.find('test_source/')]

        vectordb_path = file_path + 'pymupdf4llm/' + p[:p.find('.pdf')]

        # 기존 벡터 데이터베이스가 있는지 확인하고 로드
        if os.path.exists(vectordb_path):
            print(f"Vector DB already exists at {vectordb_path}, loading existing DB...")
            vectordb = Chroma(persist_directory=vectordb_path, embedding_function=embeddings)
        else:
            print(f"Creating new Vector DB at {vectordb_path}...")
            vectordb = create_vector_db(doc, embeddings, vectordb_path)

        pdf_databases[unicodedata.normalize('NFC', p.split('.pdf')[0])] = {
            'db': vectordb,
            'doc': doc
        }

    return pdf_databases

In [None]:
# Embedding 모델 설정
model_name = "BAAI/bge-m3"
model_kwargs = {"device": "cuda"}
encode_kwargs = {"normalize_embeddings": True}

bge_embeddings = HuggingFaceEmbeddings(
    model_name=model_name,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs
)

# PDF 데이터베이스 생성
base_directory = '/content/drive/MyDrive/재정정보 AI 검색 알고리즘 경진대회/test_source/'
pdf_databases = create_pdf_databases(base_directory, bge_embeddings, chunk_size=500, chunk_overlap=50)

# 테스트 데이터 로드
test_df = pd.read_csv('/content/drive/MyDrive/재정정보 AI 검색 알고리즘 경진대회/test.csv')

In [None]:
# LLM 파이프라인 설정 함수
def setup_llm_pipeline():
    """
    LoRA 튜닝된 LLM 모델과 함께 text-generation 파이프라인을 설정합니다.
    """
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16
    )

    model_id = "I-BRICKS/Cerebro_BM_solar_v01"
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.use_default_system_prompt = False

    model = LlamaForCausalLM.from_pretrained(
        model_id,
        quantization_config=bnb_config,
        device_map="auto",
        trust_remote_code=True
    )

    adapter_name = "/content/drive/MyDrive/재정정보 AI 검색 알고리즘 경진대회/pymupdf4llm/finetune/checkpoint-248"
    model = PeftModel.from_pretrained(model, adapter_name, is_trainable=False)

    text_generation_pipeline = pipeline(
        model=model,
        tokenizer=tokenizer,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.eos_token_id,
        task="text-generation",
        return_full_text=False,
        max_new_tokens=512,
        repetition_penalty=1.0,
        do_sample=False
    )

    hf = HuggingFacePipeline(pipeline=text_generation_pipeline)
    return hf

hf = setup_llm_pipeline()
reranker_model = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-v2-m3")

# 문서 내용을 포맷팅하는 함수
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

In [None]:
# 테스트 질문에 대한 답변을 생성하는 과정
sub_list = []
for _, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Answering Questions"):
    retriever = pdf_databases[unicodedata.normalize('NFC', row['Source'])]['db'].as_retriever(search_kwargs={"k":5})

    # 질문에 대한 답변을 생성하기 위해 압축기와 검색기를 설정합니다.
    compressor = HuggingFaceCrossEncoder(model=reranker_model, top_n=2)
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=compressor, base_retriever=retriever
    )

    # 질문에 대한 답변을 생성하기 위한 프롬프트 템플릿 설정
    template = """
    다음 정보를 바탕으로 질문에 답하세요:
    {context}

    질문: {question}

    주어진 질문에만 답변하세요. 문장으로 답변해주세요. 답변할 때 질문의 주어를 써주세요.
    답변:
    """

    prompt = ChatPromptTemplate.from_template(template)

    # RAG 체인 설정
    rag_chain = (
        {
            "context": compression_retriever | format_docs,
            "question": RunnablePassthrough(),
        }
        | prompt
        | hf
        | StrOutputParser()
    )

    question = row['Question']
    print('------------------------')
    print('질문: ', question)
    print('------------------------')
    answer = rag_chain.invoke(question).strip()
    print('답변: ', answer)

    sub_list.append(answer)

In [None]:
# 제출 파일 생성
submission_df = pd.DataFrame({
    'Question': test_df['Question'],
    'Answer': sub_list
})
submission_df.to_csv('/content/drive/MyDrive/재정정보 AI 검색 알고리즘 경진대회/submission.csv', index=False)

# 현재 날짜와 시간으로 파일 이름 생성
now = datetime.now()
formatted_time = now.strftime("%y%m%d_%H%M%S")
file_name = f'submission_{formatted_time}.csv'

# 파일을 UTF-8 인코딩으로 저장
submission_df.to_csv(file_name, index=False, encoding='UTF-8')

print(f"파일이 저장되었습니다: {file_name}")