In [1]:
import os  # 운영체제와 상호작용하기 위한 모듈, 파일 및 디렉토리 작업을 수행할 수 있음

import json  # JSON 데이터 처리 모듈, JSON 형식의 데이터를 파이썬 객체로 변환하거나 그 반대 작업을 수행할 수 있음

from elasticsearch import Elasticsearch, helpers  # Elasticsearch 클라이언트와 헬퍼 함수 모듈 가져오기
# Elasticsearch: 분산형 검색 및 분석 엔진과 상호작용하기 위한 클라이언트
# helpers: Elasticsearch와의 상호작용을 간소화하기 위한 헬퍼 함수, 예: 대량 데이터 인덱싱을 위한 bulk() 함수 포함

from sentence_transformers import SentenceTransformer  # 문장 임베딩을 위한 Transformer 모델 가져오기
# SentenceTransformer: 사전 훈련된 Transformer 모델을 사용하여 문장을 고차원 벡터로 변환하는 기능 제공
# 문서 유사도 계산, 검색 시스템, 문장 분류 등 다양한 NLP 작업에 활용 가능
import google.generativeai as genai
import json
from openai import OpenAI
import numpy as np

In [4]:
# # Sentence Transformer 모델 초기화 (한국어 임베딩 생성 가능한 어떤 모델도 가능)
# model = SentenceTransformer("snunlp/KR-SBERT-V40K-klueNLI-augSTS")


  return self.fget.__get__(instance, owner)()


In [2]:
# SetntenceTransformer를 이용하여 임베딩 생성
def get_embedding(sentences):
    client = OpenAI(
    base_url="https://api.upstage.ai/v1/solar"
    )
    batch_embeddings = []
    query_result = client.embeddings.create(
    model = "embedding-query",
    input = sentences
        )
    for query_embedding in query_result.data:
        batch_embeddings.append(query_embedding.embedding)
    return np.array(batch_embeddings).astype('float32')

# 주어진 문서의 리스트에서 배치 단위로 임베딩 생성
def get_embeddings_in_batches(docs, batch_size=100):
    batch_embeddings = []
    for i in range(0, len(docs), batch_size):
        batch = docs[i:i + batch_size]
        contents = [doc["content"] for doc in batch]
        embeddings = get_embedding(contents)
        batch_embeddings.extend(embeddings)
    return batch_embeddings


In [3]:
es_username = "elastic"
es_password = ""

# Elasticsearch client 생성
es = Elasticsearch(['https://localhost:9200'], basic_auth=(es_username, es_password), ca_certs="/home/code/elasticsearch-8.15.2/config/certs/http_ca.crt")

In [4]:
# 색인을 위한 설정 설정
settings = {
    "analysis": {
        "analyzer": {
            "nori": {  # 사용자 정의 분석기 이름
                "type": "custom",  # 분석기의 유형
                "tokenizer": "nori_tokenizer",  # 사용할 토크나이저
                "decompound_mode": "mixed",  # 복합어 분해 모드 설정 (mixed는 복합어를 분해하고, 그렇지 않은 경우는 그대로 유지)
                "filter": ["nori_posfilter"]  # 적용할 필터 리스트
            }
        },
        "filter": {
            "nori_posfilter": {  # 사용자 정의 필터 이름
                "type": "nori_part_of_speech",  # 품사 기반 필터 유형
                # 어미, 조사, 구분자, 줄임표, 지정사, 보조 용언 등을 제거하기 위한 설정
                "stoptags": [
                    "E",  # 어미
                    "J",  # 조사
                    "SC",  # 구분자
                    "SE",  # 줄임표
                    "SF",  # 지정사
                    "VCN",  # 연결 용언
                    "VCP",  # 보조 용언
                    "VX"   # 보조 용언
                ]
            }
        }
    }
}


In [5]:
# 색인을 위한 매핑 설정 (역색인 필드, 임베딩 필드 모두 설정)
mappings = {
    "properties": {
        "content": {  # 텍스트 필드 설정
            "type": "text",  # 필드 타입을 텍스트로 설정
            "analyzer": "nori"  # nori 분석기를 사용하여 텍스트 분석
        },
        "embeddings": {  # 임베딩 필드 설정
            "type": "dense_vector",  # 필드 타입을 밀집 벡터로 설정
            "dims": 4096,  # 벡터 차원 수 (예: BERT와 같은 모델에서 생성된 임베딩 차원)
            "index": True,  # 색인 가능 여부 설정 (True로 설정하면 검색이 가능)
              # 유사도 측정 방법 설정 (L2 정규화를 사용하여 유사도 계산)
        }
    }
}

In [6]:
# 새로운 index 생성
def create_es_index(index, settings, mappings):
    # 인덱스가 이미 존재하는지 확인
    if es.indices.exists(index=index):
        # 인덱스가 이미 존재하면 설정을 새로운 것으로 갱신하기 위해 삭제
        es.indices.delete(index=index)
    # 지정된 설정으로 새로운 인덱스 생성
    es.indices.create(index=index, settings=settings, mappings=mappings)


In [7]:

# 지정된 인덱스 삭제
def delete_es_index(index):
    es.indices.delete(index=index)

In [8]:
# Elasticsearch 헬퍼 함수를 사용하여 대량 인덱싱 수행
def bulk_add(index, docs):
    # 대량 인덱싱 작업을 준비
    actions = [
        {
            '_index': index,
            '_source': doc
        }
        for doc in docs
    ]
    return helpers.bulk(es, actions)


In [9]:
# 지정된 인덱스 삭제
def delete_es_index(index):
    es.indices.delete(index=index)

In [11]:
# 역색인을 이용한 검색
def sparse_retrieve(query_str, size):
    query = {
        "match": {
            "content": {
                "query": query_str
            }
        }
    }
    return es.search(index="test", query=query, size=size, sort="_score")


def dense_retrieve(query_str, size):
    # 벡터 유사도 검색에 사용할 쿼리 임베딩 가져오기
    query_embedding = get_embedding([query_str])[0]

    # KNN을 사용한 벡터 유사성 검색을 위한 매개변수 설정
    knn = {
        "field": "embeddings",
        "query_vector": query_embedding.tolist(),
        "k": size,
        "num_candidates": 100
    }
    body={
        "size" : size,
        "query": {
            "script_score": {
                "query": {
                    "match_all": {}  # 모든 문서를 대상으로 검색
                },
                "script": {
                    "source": "cosineSimilarity(params.query_vector, 'embeddings') + 1.0",
                    "params": {
                        "k" : size,
                        "query_vector": query_embedding.tolist()
                    }
                }
            }
        }
    }

    # 지정된 인덱스에서 벡터 유사도 검색 수행
    return es.search(index="test", body=body)

In [12]:
create_es_index("test", settings, mappings)

# 문서의 content 필드에 대한 임베딩 생성
index_docs = []
with open("/home/data/documents.jsonl") as f:
    docs = [json.loads(line) for line in f]
embeddings = get_embeddings_in_batches(docs)
                
# 생성한 임베딩을 색인할 필드로 추가
for doc, embedding in zip(docs, embeddings):
    doc["embeddings"] = embedding.tolist()
    index_docs.append(doc)

# 'test' 인덱스에 대량 문서 추가
ret = bulk_add("test", index_docs)


In [38]:
# RAG 구현에 필요한 질의 분석 및 검색 이외의 일반 질의 대응을 위한 LLM 프롬프트
persona_function_calling = """
## Role: 검색을 사용할 것인지, 대화를 생성할 것인 판단하는 역할 

## Instruction
* **주요 기능:** 사용자의 질문을 분석하여 관련된 정보를 검색하고 지식에 관련된 질문은 searchapi를 사용하여 검색을 하고, 일상 질문은 바로 생성합니다.
* **검색 범위:** 인문학, 사회과학, 자연과학, 공학 등 모든 분야를 포괄합니다.
* **검색 기준:**
    * **관련성:** 사용자의 질문과 가장 관련성이 높은 정보를 우선적으로 제공합니다.
    * **판단기준** 성적인 암시인 경우 같은 부적절한 것 같은 경우도 스스로 판단하지 말고, 검색이 가능하면 검색을 한다. "우울하다" 같이 감정을 나눈다면 
    적절한 대답을 한국말로 생성한다.
* **응답 형식:**
    * **요약:** 검색 결과를 요약하여 간결하게 제공합니다.
"""

In [40]:
persona_qa = """
## Role: function_calling으로 생성된 답변을 최종적으로 판단하여 답을 한국말로 생성한다.

## Instruction
* **주요 기능:** 사용자와 자연스러운 대화를 이어가고, 다양한 주제에 대한 질문에 답변합니다.
* **지식 기반:** `function_calling` 모듈을 통해 얻은 정보를 활용하여 정확한 답변을 제공합니다.
* *감정 이해:** 사용자의 감정을 이해하고, 적절한 공감과 위로를 표현합니다.
"""


In [23]:
tools = {
  "name": "search",
  "description": "관련 문서를 검색합니다. 캐주얼한 대화를 제외한 모든 질문이나 요청 시 이 함수를 호출하세요. 예: '지구 자전의 원인은?', '세종대왕에 대해 알려줘.'",
  "parameters": {
    "type_": "OBJECT",
    "properties": {
      "standalone_query": {
        "type_": "STRING",
        "description": "사용자 메시지 기록을 기반으로 문서 검색에 적합한 최종 쿼리. 항상 한국어로 작성하세요."
      }
    },
    "required": ["standalone_query"]
  }
}

In [17]:
from openai import OpenAI
import traceback

llm_model = genai.GenerativeModel('gemini-1.5-pro', tools=tools)
genai.configure(api_key="")

In [19]:
import json

def answer_question(eval_filename, output_filename):
    count = 0  
    with open(eval_filename) as f, open(output_filename, "w") as of:
        for line in f:
            response = {"standalone_query": "", "topk": [], "references": [], "answer": ""}
            j = json.loads(line)
            last_j= j['msg'].pop()['parts']
            chat = llm_model.start_chat(
                history=j['msg']
            )
            try:
                result = chat.send_message(last_j+persona_function_calling)
            except Exception as e:
                traceback.print_exc()
                return response

            if result.candidates[0].content.parts[0].function_call:
                function_call = result.candidates[0].content.parts[0].function_call
                standalone_query = function_call.args["standalone_query"]
                search_result_retrieve = dense_retrieve(standalone_query+"?", 3)
                response["standalone_query"] = standalone_query
                retrieve_context = []
                for i, rst in enumerate(search_result_retrieve['hits']['hits']):
                    retrieve_context.append(rst["_source"]["content"])
                    response["topk"].append(rst["_source"]["docid"])
                    response["references"].append({"score": rst["_score"], "content": rst["_source"]["content"]})
                    content = json.dumps(retrieve_context)
                    j['msg'].append({"role": "model", "parts": content})
                    last_j= j['msg'].pop()['parts']
                try:
                    qresult = chat.send_message(last_j+persona_qa)
                except Exception as e:
                    traceback.print_exc()
                    response["answer"] = qresult.candidates[0].content.parts[0].text
                # print(response)
        
            else:
                response["answer"] = result.candidates[0].content.parts[0].text
            count += 1  # 카운터 증가
            output = {"eval_id": j["eval_id"], "standalone_query": response["standalone_query"], "topk": response["topk"], "answer": response["answer"], "references": response["references"]}
            of.write(f'{json.dumps(output, ensure_ascii=False)}\n')
            print(f"Output {count}: {response}")  # 출력 번호와 함께 출력
    print(f"Total number of outputs: {count}")  # 총 출력 횟수 출력
        
