## document chunking

In [None]:
import json
from langchain_experimental.text_splitter import SemanticChunker
from sentence_transformers import SentenceTransformer

# embedding 모델
model = SentenceTransformer("jhgan/ko-sroberta-multitask")

class CustomEmbeddings:
    def embed_documents(self, documents):
        return model.encode(documents)

text_splitter = SemanticChunker(CustomEmbeddings())

output_file = '/data/ephemeral/home/data/document_chunk.jsonl'

with open('/data/ephemeral/home/data/documents.jsonl', 'r', encoding='utf-8') as infile, open(output_file, 'w', encoding='utf-8') as outfile:
    for line in infile:

        data = json.loads(line)
        docid = data['docid']
        src = data['src']
        content = data['content']

        chunks = text_splitter.split_text(content)

        for chunk in chunks:
            output_data = {
                "docid": docid,
                "src": src,
                "content": chunk
            }

            outfile.write(json.dumps(output_data, ensure_ascii=False) + '\n')

print(f"청킹 완료 :'{output_file}'")

## Elasticsearch

In [None]:
# 엘라스틱서치의 데몬 인스턴스 만들기
# Nori 설치 이전에 데몬을 생성하면 Nori가 바로 사용할 수 없음, 이때는 데몬 재실행 필요

import os
from elasticsearch import Elasticsearch, helpers
import numpy as np
import pandas as pd
import json
from subprocess import Popen, PIPE, STDOUT

es_server = Popen(['/data/ephemeral/home/elasticsearch-8.8.0/bin/elasticsearch'],
                  stdout=PIPE, stderr=STDOUT,
                  preexec_fn=lambda: os.setuid(1)  # as daemon
                 )

# 인스턴스를 로드하는 데 약간의 시간이 걸림
import time
time.sleep(30)


In [None]:
# 데몬이 구동되었는지 확인 (세개의 daemon process가 있어야 함)
!ps -ef | grep elasticsearch

In [None]:
username = 'elastic'

# 위 명령 실행 결과의 마지막 부분인 PASSWORD elastic 값으로 교체 필요
password = 'PASSWORD elastic'

es = Elasticsearch(['https://localhost:9200'], basic_auth=(username, password), ca_certs="/data/ephemeral/home/elasticsearch-8.8.0/config/certs/http_ca.crt")

resp = dict(es.info())

resp

In [None]:
import os
import json
from elasticsearch import Elasticsearch, helpers
from sentence_transformers import SentenceTransformer

In [None]:
# Sentence Transformer 모델 초기화
# model = SentenceTransformer("snunlp/KR-SBERT-V40K-klueNLI-augSTS")
model = SentenceTransformer("jhgan/ko-sroberta-multitask")

# SetntenceTransformer를 이용하여 임베딩 생성
def get_embedding(sentences):
    return model.encode(sentences)


# 주어진 문서의 리스트에서 배치 단위로 임베딩 생성
def get_embeddings_in_batches(docs, batch_size=100):
    batch_embeddings = []
    for i in range(0, len(docs), batch_size):
        batch = docs[i:i + batch_size]
        contents = [doc["content"] for doc in batch]
        embeddings = get_embedding(contents)
        batch_embeddings.extend(embeddings)
        print(f'batch {i}')
    return batch_embeddings


# 새로운 index 생성
def create_es_index(index, settings, mappings):
    # 인덱스가 이미 존재하는지 확인
    if es.indices.exists(index=index):
        # 인덱스가 이미 존재하면 설정을 새로운 것으로 갱신하기 위해 삭제
        es.indices.delete(index=index)
    # 지정된 설정으로 새로운 인덱스 생성
    es.indices.create(index=index, settings=settings, mappings=mappings)


# 지정된 인덱스 삭제
def delete_es_index(index):
    es.indices.delete(index=index)


# Elasticsearch 헬퍼 함수를 사용하여 대량 인덱싱 수행
def bulk_add(index, docs):
    # 대량 인덱싱 작업을 준비
    actions = [
        {
            '_index': index,
            '_source': doc
        }
        for doc in docs
    ]
    return helpers.bulk(es, actions)


# 역색인을 이용한 검색
def sparse_retrieve(query_str, size):
    query = {
        "match": {
            "content": {
                "query": query_str
            }
        }
    }
    return es.search(index="test", query=query, size=size, sort="_score")


# Vector 유사도를 이용한 검색
def dense_retrieve(query_str, size):
    # 벡터 유사도 검색에 사용할 쿼리 임베딩 가져오기
    query_embedding = get_embedding([query_str])[0]

    # KNN을 사용한 벡터 유사성 검색을 위한 매개변수 설정
    knn = {
        "field": "embeddings",
        "query_vector": query_embedding.tolist(),
        "k": size,
        "num_candidates": 100
    }

    # 지정된 인덱스에서 벡터 유사도 검색 수행
    return es.search(index="test", knn=knn)

In [None]:
# 역색인 + Vector 유사도 혼합
def hybrid_retrieve(query_str, size):
    # 벡터 유사도 검색에 사용할 쿼리 임베딩 가져오기
    query_embedding = get_embedding([query_str])[0]
    body = {
        "query": {
            "match": {
                "content": {
                    "query": query_str,
                    # "boost": 0.0030
                    "boost" : 0.0025 
                }
            }
        },
        "knn": {
            "field": "embeddings",
            "query_vector": query_embedding.tolist(),
            "k": 5,
            "num_candidates": 50,
            "boost": 1
        },
        "collapse":{
            "field": "docid.keyword",  # keyword 타입으로 설정된 필드 사용
            "inner_hits": {
                "name": "top_hits",
                "size": 1,
                "sort": [
                    {"_score": "desc"}
                    ]
                }
            },
        "size": size
    }
    # 지정된 인덱스에서 벡터 유사도 검색 수행
    return es.search(index="test", body=body)

In [None]:

# 색인을 위한 setting 설정
settings = {
    "analysis": {
        "analyzer": {
            "nori": {
                "type": "custom",
                "tokenizer": "nori_tokenizer",
                "decompound_mode": "mixed",
                "filter": ["nori_posfilter"]
            }
        },
        "filter": {
            "nori_posfilter": {
                "type": "nori_part_of_speech",
                # 어미, 조사, 구분자, 줄임표, 지정사, 보조 용언 등
                "stoptags": ["E", "J", "SC", "SE", "SF", "VCN", "VCP", "VX"]
            }
        }
    }
}

# 색인을 위한 mapping 설정 (역색인 필드, 임베딩 필드 모두 설정)
mappings = {
    "properties": {
        # 역색인 필드 
        "content": {"type": "text", "analyzer": "nori"}, 
        # 임베딩 필드 
        "embeddings": {
            "type": "dense_vector",
            "dims": 768,
            "index": True,
            "similarity": "l2_norm"
        }
    }
}

In [None]:
create_es_index("test", settings, mappings)

In [None]:
# Document Chunking content 필드에 대한 임베딩 생성
index_docs = []
with open("../data/document_chunk.jsonl") as f:
#with open("/data/ephemeral/home/data/merged_documents.jsonl") as f:
    docs = [json.loads(line) for line in f]
embeddings = get_embeddings_in_batches(docs)

""" 
docs[0] -> 
{'docid': '42508ee0-c543-4338-878e-d98c6babee66',
 'src': 'ko_mmlu__nutrition__test',
 'content': '건강한 사람이 에너지 균형을 평형 상태로 유지하는 것은 중요합니다. ....
 }
"""

In [None]:
# doc = docs[0], len(embedding) = 768
# doc_key = docid, src, content
for doc, embedding in zip(docs, embeddings):  
    doc['embeddings'] = embedding.tolist() 
    index_docs.append(doc)
    
# index_docs_key = docid, src, content, embeddings 

In [None]:
ret = bulk_add("test", index_docs)

In [None]:
ret

In [None]:
test_query = "네트웍 계층 중 IP에 대해 설명해줘."
# 역색인을 사용하는 검색 예제
search_result_retrieve = hybrid_retrieve(test_query, 10)

# 결과 출력 테스트
for rst in search_result_retrieve['hits']['hits']:
    print('score:', rst['_score'], 'docid: ',rst["_source"]["docid"], 'source:', rst['_source']["content"])

In [None]:
# RAG를 구현
from openai import OpenAI
import traceback

# OpenAI API 키를 환경변수에 설정
os.environ["OPENAI_API_KEY"] = "OPENAI_API_KEY"

client = OpenAI()
# 사용할 모델
llm_model = "gpt-4o"

# RAG 구현에 필요한 Question Answering을 위한 LLM  프롬프트
persona_qa = """
## Role: 과학 상식 전문가

## Instructions
- 사용자의 이전 메시지 정보 및 주어진 Reference 정보를 활용하여 간결하게 답변을 생성한다.
- 주어진 검색 결과 정보로 대답할 수 없는 경우는 정보가 부족해서 답을 할 수 없다고 대답한다.
- 한국어로 답변을 생성한다.
"""

# RAG 구현에 필요한 질의 분석 및 검색 이외의 일반 질의 대응을 위한 LLM 프롬프트
persona_function_calling = """
## Role: 과학 상식 전문가

## Instruction
- 사용자가 대화를 통해 지식에 관한 주제로 질문하면 반드시 search api를 호출할 수 있어야 한다.
- 지식과 관련되지 않은 나머지 대화 메시지에는 함수 호출 없이 적절한 대답을 생성한다.
"""



In [None]:
# Function calling에 사용할 함수 정의
tools = [
    {
        "type": "function",
        "function": {
            "name": "search",
            "description": "search relevant documents",
            "parameters": {
                "properties": {
                    "long_question": {
                        "type": "string",
                        # "description": "Final query suitable for use in search from the user messages history"
                        # "description": "Keywords suitable for use in search engine."
                        # "description": "User's question in Korean, including all the keywords in the user messages"
                        "description": "User's question in Korean. Full message if the user message is single-turn."}
                        #"description" : "User's question in Korean. Full message if the user message is single-turn. Additionally, categorize the question and infer user's intent."}
                },
                "required": ["long_question"],
                "type": "object"
            }
        }
    },
]

In [None]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import torch
import numpy as np
import json

# Ranker 모델 로드
model_path = 'Dongjin-kr/ko-reranker'
tokenizer = AutoTokenizer.from_pretrained(model_path)
model_reranker = AutoModelForSequenceClassification.from_pretrained(model_path)
model_reranker.eval()

def exp_normalize(x):
    b = x.max()
    y = np.exp(x - b)
    return y / y.sum()

In [None]:
# 상위 5개 

def rerank_with_model(query, retrieved_docs):
    # Query와 검색된 문서를 쌍으로 만듬
    pairs = [[query, doc["_source"]["content"]] for doc in retrieved_docs]
    
    # Ranker 모델을 통해 점수 계산
    with torch.no_grad():
        inputs = tokenizer(pairs, padding=True, truncation=True, return_tensors='pt', max_length=512)
        scores = model_reranker(**inputs, return_dict=True).logits.view(-1, ).float()
        normalized_scores = exp_normalize(scores.numpy())

    # 점수에 따라 문서를 재정렬하고 상위 5개의 문서만 남김
    reranked_docs = sorted(zip(retrieved_docs, normalized_scores), key=lambda x: x[1], reverse=True)[:5]
    return reranked_docs

In [None]:
# LLM과 검색엔진을 활용한 RAG 구현
def answer_question(messages):
    # 함수 출력 초기화
    response = {"standalone_query": "", "topk": [], "references": [], "answer": ""}

    # 질의 분석 및 검색 이외의 질의 대응을 위한 LLM 활용
    msg = [{"role": "system", "content": persona_function_calling}] + messages
    try:
        result = client.chat.completions.create(
            model=llm_model,
            messages=msg,
            tools=tools,
            #tool_choice={"type": "function", "function": {"name": "search"}},
            temperature=0,
            seed=1,
            timeout=10
        )
    except Exception as e:
        traceback.print_exc()
        return response

    # 검색이 필요한 경우 검색 호출 후 결과를 활용하여 답변 생성
    if result.choices[0].message.tool_calls:
        tool_call = result.choices[0].message.tool_calls[0]
        function_args = json.loads(tool_call.function.arguments)
        standalone_query = function_args.get("long_question")

        # 검색 결과 추출
        # search_result = hybrid_retrieve(standalone_query, 100)
        search_result = hybrid_retrieve(standalone_query, 10)
        # 검색 결과를 Ranker 모델로 재정렬
        reranked_docs = rerank_with_model(standalone_query, search_result['hits']['hits'])

        response["standalone_query"] = standalone_query
        retrieved_context = []
        
        # 중복을 방지하기 위한 set
        #seen_docids = set()
        
        for doc, score in reranked_docs:
            #docid = doc["_source"]["docid"]
            # docid가 이미 추가된 경우는 생략
            #if docid not in seen_docids:
            retrieved_context.append(doc["_source"]["content"])
            response["topk"].append(doc["_source"]["docid"])
            response["references"].append({"score": float(score), "content": doc["_source"]["content"]})
                #seen_docids.add(docid)  # 이미 처리한 docid를 set에 추가
                
        # 재정렬된 문서들을 LLM에 다시 넣어 답변 생성
        # messages.append({"role": "assistant", "content": json.dumps(retrieved_context)})
        # msg = [{"role": "system", "content": persona_qa}] + messages
        # try:
        #     qaresult = client.chat.completions.create(
        #             model=llm_model,
        #             messages=msg,
        #             temperature=0,
        #             seed=1,
        #             timeout=30
        #         )
        # except Exception as e:
        #     traceback.print_exc()
        #     return response
        # response["answer"] = qaresult.choices[0].message.content

    # 검색이 필요하지 않은 경우 바로 답변 생성
    else:
        response["answer"] = result.choices[0].message.content

    return response


In [None]:
# 평가를 위한 파일을 읽어서 각 평가 데이터에 대해서 결과 추출후 파일에 저장
def eval_rag(eval_filename, output_filename):
    with open(eval_filename) as f, open(output_filename, "w") as of:
        idx = 0
        for line in f:
            j = json.loads(line)
            print(f'Test {idx}\nQuestion: {j["msg"]}')
            response = answer_question(j["msg"])
            print(f'Answer: {response["answer"]}\n')

            # 대회 score 계산은 topk 정보를 사용, answer 정보는 LLM을 통한 자동평가시 활용
            output = {"eval_id": j["eval_id"], "standalone_query": response["standalone_query"], "topk": response["topk"], "answer": response["answer"], "references": response["references"]}
            
            of.write(f'{json.dumps(output, ensure_ascii=False)}\n')
            idx += 1

# 평가 데이터에 대해서 결과 생성 - 파일 포맷은 jsonl이지만 파일명은 csv 사용
eval_rag("../data/eval.jsonl", "../submission/[16.3]submission.csv")
