In [None]:
# 출력 파일 생성 파트1 (답변 중간파일 생성 all_data.pkl)
# llm 변경후 사용.

In [None]:
# 1. 데이터 파일 읽기
import pandas as pd
from dotenv import load_dotenv

load_dotenv()

df = pd.read_csv("./data/summary_one.csv")
df.tail()


In [None]:
# 2. 정보 확인
df.info()

In [None]:
# 3. 디비 생성 클래스

import faiss
import torch
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.docstore.in_memory import InMemoryDocstore

from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams

def create_empty_faiss(embeddings, use_cosine=True):
    """빈 FAISS 벡터스토어 생성"""
    dimension = len(embeddings.embed_query("test"))
    print(f"dimension={dimension}")
    if use_cosine:
        index = faiss.IndexFlatIP(dimension)
        normalize = True
    else:
        index = faiss.IndexFlatL2(dimension)
        normalize = False
    
    return FAISS(
        embedding_function=embeddings,
        index=index,
        docstore=InMemoryDocstore(),
        index_to_docstore_id={},
        normalize_L2=normalize
    )

def create_empty_qdrant(embeddings):

    client = QdrantClient(":memory:")

    client.create_collection(
        collection_name="demo_collection",
        vectors_config=VectorParams(size=3072, distance=Distance.COSINE),
    )

    return QdrantVectorStore(
        client=client,
        collection_name="demo_collection",
        embedding=embeddings,
    )  

def get_embeddinggemma_300m(opt):
    return HuggingFaceEmbeddings(
        model_name="google/embeddinggemma-300m",
        encode_kwargs={"prompt_name": opt}
    )

def get_qwen3_embedding_4b():
    return HuggingFaceEmbeddings(
        model_name="Qwen/Qwen3-Embedding-4B",
        model_kwargs={
            "device": "cuda"
        },
        encode_kwargs={
            "normalize_embeddings": True
        }
    )

def get_bge_m3():
    return HuggingFaceEmbeddings(
        model_name="BAAI/bge-m3",
        #model_name="Qwen/Qwen3-Embedding-0.6B",
        model_kwargs={"device": "cuda"} ,
        encode_kwargs={"normalize_embeddings": True})

class ScienceRAG:
    def __init__(self):

        self.embeddings = get_bge_m3()
        
        self.vectorstore = create_empty_faiss(self.embeddings, use_cosine=True) 
        
        # 요약 체인
        #self.summary_chain = science_summary_chain
        #self.summary_chain = two_step_chain
    
    def add_documents(self, df):
        """문서를 요약하여 벡터DB에 저장"""
        
        texts = df['summary'].tolist()
        metadatas = [
            {
                'docid': row['docid'],
                'content': row['content']
            } 
            for _, row in df.iterrows()
        ]
        ids = df['docid'].tolist()


        # 2. 검색용 요약을 벡터DB에 저장
        self.vectorstore.add_texts(
            texts,
            metadatas=metadatas,
            ids = ids
        )
        
       
    def search(self, query: str, k: int = 3):
        """요약된 내용으로 검색"""
        results = self.vectorstore.similarity_search(query, k=k)
        return results

In [None]:
# 4. 디비 생성 

science_rag = ScienceRAG()

science_rag.add_documents(df)

In [None]:
# test
query = "공에 힘이 주어졌을 때 공이 어떻게 움직이는지 과학적으로 설명해줘."
docs = science_rag.vectorstore.similarity_search_with_score(query, k=3)
docs

In [None]:
# 5. 질의문 생성 체인

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_ollama import ChatOllama

llm = ChatOllama(model="alibayram/Qwen3-30B-A3B-Instruct-2507")

# 프롬프트 
convertFormat = """
당신은 문자열 포맷 마이그레이션 전문가 입니다. <message> 에서 content 내용만 문자열로 출력합니다. 만일 content가 여러개가 있으면 전체적인 문맥을 파악하여 질문을 만들어서 출력합니다.

<message>
{message}
</message>

[example]

    <message>{{"role": "user", "content": "피를 맑게 하고 몸 속의 노폐물을 없애는 역할을 하는 기관은?"}}</message> 
    output:피를 맑게 하고 몸 속의 노폐물을 없애는 역할을 하는 기관은? 

    <message>{{"role": "user", "content": "이란 콘트라 사건이 뭐야"}}, {{"role": "assistant", "content": "이란-콘트라 사건은 로널드 레이건 집권기인 1986년에 레이건 행정부와 CIA가 적성국이었던 이란에게 무기를 몰래 수출한 대금으로 니카라과의 우익 성향 반군 콘트라를 지원하면서 동시에 반군으로부터 마약을 사들인 후 미국에 판매하다가 발각되어 큰 파장을 일으킨 사건입니다."}}, {{"role": "user", "content": "이 사건이 미국 정치에 미친 영향은?"}}</message>
    output:1986년에 발생한 이란 콘트라 사건이 미국 정치에 미친 영향은? 


output:
[Your output here - NOTHING ELSE]
"""

# 프롬프트 객체 생성
convertFormat_prompt = PromptTemplate(
    input_variables=["message"],
    template=convertFormat
)

# 출력 파서 (문자열)
output_parser = StrOutputParser()

# LCEL 체인 구성
convertFormat_chain = (
    convertFormat_prompt 
    | llm 
    | output_parser
)

In [None]:
# 6. 과학상식 체크 체인

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_ollama import ChatOllama


# 프롬프트 
selectYn = """
당신은 세상의 모든 상식과 지식에 정통한 전문가 입니다. 만약 <message> 가 세상의 상식 또는 지식에 관련된 질문이라면  Y 아니라면 N으로 답해주세요. 너에 관하여 물어보는건 세상의 상식 또는 지식에 관련된 질문이 아니야!

<message>
{message}
</message>

당신은 반드시 Y 뜨는 N으로 답해야 합니다.
Answer:
"""

# 프롬프트 객체 생성
selectYn_prompt = PromptTemplate(
    input_variables=["message"],
    template=selectYn
)

# 출력 파서 (문자열)
output_parser = StrOutputParser()

# LCEL 체인 구성
selectYn_chain = (
    selectYn_prompt 
    | llm 
    | output_parser
)

In [None]:
# 7. 답변 생성 체인

from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_ollama import ChatOllama


# 프롬프트 
answer = """
당신은 과학 상식 전문가 입니다. <message> 의 질문에 대해서 주어진 <reference> 정보를 활용하여 간결하게 답변을 생성합니다.

    - 주어진 검색 결과 정보로 대답할 수 없는 경우는 정보가 부족해서 답을 할 수 없다고 대답합니다.
    - 한국어로 답변을 생성합니다..

<message>
{message}
</message>

<reference>
{reference}
</reference>

Answer:
"""

# 프롬프트 객체 생성
answer_prompt = PromptTemplate(
    input_variables=["message"],
    template=answer
)

# 출력 파서 (문자열)
output_parser = StrOutputParser()

# LCEL 체인 구성
answer_chain = (
    answer_prompt 
    | llm 
    | output_parser
)

In [None]:
# 8. 데이터 검색 함수

def query_db(message, k=30):

    docs = science_rag.vectorstore.similarity_search_with_relevance_scores(message, k=k)

    content = []
    docid= []
    reference = []
    
    for doc, score in docs:
        
        content.append(doc.metadata['content'])
        docid.append(doc.metadata['docid'])
        reference.append({"score": float(score), "docid": doc.metadata['docid'], "content": doc.metadata['content']})

    return content[:3], docid, reference

In [None]:
# test

message = convertFormat_chain.invoke({"message": '{"role": "user", "content": "python 공부중인데..."}, {"role": "assistant", "content": "네 꼭 필요한 언어라서 공부해 두면 좋습니다."}, {"role": "user", "content": "숫자 계산을 위한 operator 우선순위에 대해 알려줘."}'})
print(message)
result = selectYn_chain.invoke({"message": "니가 대답을 잘해줘서 너무 신나!"})
result

In [None]:
# 9. 메인 로직

import torch
import gc
import json
from FlagEmbedding import FlagReranker
from FlagEmbedding import FlagLLMReranker

# 답변 데이터 생성
def answer_question(messages):
    # 함수 출력 초기화
    
    message = convertFormat_chain.invoke({"message": messages})
    result = selectYn_chain.invoke({"message": message})

    context = {"message":message}
    response =  context | {"standalone_query": "", "topk": [], "references": [], "answer": ""}
    
    if result == "Y":
        context["reference"], response["topk"], response["references"] = query_db(message)
    else:
        context["reference"] = ""
        response["topk"] = []
        response["references"] = []
    
    response["answer"] = answer_chain.invoke(context)

    return response

def reranking(data):

    reranker = FlagLLMReranker('BAAI/bge-reranker-v2-gemma', use_fp16=True)

    message = data["message"]
    references = data["references"]
    pairs = [[message, doc['content']] for doc in references]
    rerank_scores = reranker.compute_score(pairs)

    print("*"*50)
    print(pairs)
    print(references)
    print(rerank_scores)


    results = list(zip(references, rerank_scores))
    results.sort(key=lambda x: x[1], reverse=True) 
    final_result = results[:3]
    print(f"final_result={final_result}")

    data['topk'] = [ doc['docid'] for doc, socre in final_result]
    data["references"] = [ {"score": float(score), "content": doc['content']} for doc, score in final_result ]

    return data

def eval_rag(eval_filename):
    
    all_data = []

    with open(eval_filename) as f:
        idx = 0

        for line in f:

            #if idx == 20: break
            
            j = json.loads(line)
            print(f'Test {idx}\nQuestion: {j["msg"]}')
            response = answer_question(j["msg"])
            print(f'Answer: {response["answer"]}\n')

            # 대회 score 계산은 topk 정보를 사용, answer 정보는 LLM을 통한 자동평가시 활용
            output = {"eval_id": j["eval_id"], "message": response["message"], "standalone_query": response["standalone_query"], "topk": response["topk"], "answer": response["answer"], "references": response["references"]}
            #print(output)
            all_data.append(output)
            idx += 1

        
    return all_data    

In [None]:
""" def eval_rag2(all_data, output_filename):

    with open(output_filename, "w") as of:
        idx = 0
        for data in all_data:
            print(f"data={data}")
            if len(data["topk"]) > 0:
                response = reranking(data)

            print(f'response: {response}\n')

            # 대회 score 계산은 topk 정보를 사용, answer 정보는 LLM을 통한 자동평가시 활용
            output = {"eval_id": response["eval_id"], "standalone_query": response["standalone_query"], "topk": response["topk"], "answer": response["answer"], "references": response["references"]}
            print(output)
            of.write(f'{json.dumps(output, ensure_ascii=False)}\n')
            idx += 1     """

In [None]:
# 10. 메인 

# 평가 데이터에 대해서 결과 생성
all_data = eval_rag("./data/eval.jsonl")
all_data

In [None]:
# 올라마 프로세스 종료
import subprocess

subprocess.run(['pkill', '-9', '-f', 'ollama'])

# GPU 메모리 확인
subprocess.run(['nvidia-smi'])


In [None]:
# 리스트 저장
import pickle

with open('./data/all_data.pkl', 'wb') as f:
    pickle.dump(all_data, f)

In [None]:
#eval_rag2(all_data, "./data/sample_submission.csv")