In [None]:
import json
import os
import subprocess
from langchain_community.document_loaders import UnstructuredHTMLLoader
from pathlib import Path
import base64
import http.client
from tqdm import tqdm
import requests

In [None]:
# Download every page listed in clovastudiourl.txt into a local folder and
# record which local file each URL was saved to, so later cells can map a
# parsed document back to its original URL.
url_to_filename_map = {}

with open("clovastudiourl.txt", "r") as file:
    # Skip blank lines (e.g. a trailing newline) so they never reach wget as
    # an empty URL, which would abort the whole run because of check=True.
    urls = [line.strip() for line in file if line.strip()]

folder_path = "clovastudioguide"

# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(folder_path, exist_ok=True)

for url in urls:
    # Use the last path component as the file name; strip a trailing slash
    # first so "https://host/page/" is saved as "page.html" rather than ".html".
    filename = url.rstrip("/").split("/")[-1] + ".html"
    file_path = os.path.join(folder_path, filename)
    subprocess.run(
        [
            "wget",
            # No shell is involved when args is a list, so the value must not
            # be wrapped in quotes: they would be sent as part of the header.
            "--user-agent=MyCrawler/1.0 (+http://mycrawler.com/info)",
            "-O",
            file_path,
            url,
        ],
        check=True,
    )
    url_to_filename_map[url] = filename

with open("url_to_filename_map.json", "w") as map_file:
    json.dump(url_to_filename_map, map_file)

In [None]:
# Adjust this path so it points at the folder holding the downloaded HTML files.
html_files_dir = Path('/Users/user/Desktop/raghigh/rag_html/forwiki/clovastudioguide')

html_files = [*html_files_dir.glob("*.html")]

# One entry per HTML file; each entry is whatever the loader returns for it.
clovastudiodatas = []

for html_file in html_files:
    clovastudiodatas.append(UnstructuredHTMLLoader(str(html_file)).load())
    print(f"Processed {html_file}")

In [None]:
# Rewrite each document's 'source' metadata from the local file path back to
# the URL it was downloaded from, using the mapping saved by the crawl step.
with open("url_to_filename_map.json", "r") as map_file:
    url_to_filename_map = json.load(map_file)

# Invert URL -> filename into filename -> URL for lookup by file name.
filename_to_url_map = {v: k for k, v in url_to_filename_map.items()}

for doc_list in clovastudiodatas:
    for doc in doc_list:
        # Path(...).name extracts the file name with the platform's own
        # separator rules; splitting on "/" by hand returns the whole path
        # when the loader was given a backslash-separated path.
        extracted_filename = Path(doc.metadata["source"]).name
        if extracted_filename in filename_to_url_map:
            doc.metadata["source"] = filename_to_url_map[extracted_filename]
        else:
            print(f"Warning: {extracted_filename}에 해당하는 URL을 찾을 수 없습니다.")

In [None]:
# Flatten the per-file lists of documents into one list, keeping their order.
clovastudiodatas_flattened = []
for per_file_docs in clovastudiodatas:
    clovastudiodatas_flattened.extend(per_file_docs)

In [None]:
class SegmentationExecutor:
    """Minimal HTTPS client for the CLOVA Studio segmentation endpoint.

    Args:
        host: Host name passed to ``http.client.HTTPSConnection``.
        api_key: Value sent as the ``X-NCP-CLOVASTUDIO-API-KEY`` header.
        api_key_primary_val: Value sent as the ``X-NCP-APIGW-API-KEY`` header.
        request_id: Value sent as the ``X-NCP-CLOVASTUDIO-REQUEST-ID`` header.
    """

    def __init__(self, host, api_key, api_key_primary_val, request_id):
        self._host = host
        self._api_key = api_key
        self._api_key_primary_val = api_key_primary_val
        self._request_id = request_id

    def _send_request(self, completion_request):
        """POST ``completion_request`` as JSON and return the decoded JSON reply.

        The connection is closed even when the request, the read, or the JSON
        decoding raises, so a failed call does not leak a socket.
        """
        headers = {
            "Content-Type": "application/json; charset=utf-8",
            "X-NCP-CLOVASTUDIO-API-KEY": self._api_key,
            "X-NCP-APIGW-API-KEY": self._api_key_primary_val,
            "X-NCP-CLOVASTUDIO-REQUEST-ID": self._request_id
        }

        conn = http.client.HTTPSConnection(self._host)
        try:
            conn.request(
                "POST",
                "/testapp/v1/api-tools/segmentation/{app-id}", # If using Service App, change 'testapp' to 'serviceapp', and corresponding app id.
                json.dumps(completion_request),
                headers
            )
            response = conn.getresponse()
            return json.loads(response.read().decode(encoding="utf-8"))
        finally:
            conn.close()

    def execute(self, completion_request):
        """Run one segmentation request.

        Returns:
            ``res["result"]["topicSeg"]`` from the reply when the reply's
            status code is ``"20000"``.

        Raises:
            ValueError: If the status code is anything else; the message is the
                full decoded reply. A reply that is not valid JSON also
                surfaces as ``json.JSONDecodeError`` (a ``ValueError``).
        """
        res = self._send_request(completion_request)
        if res["status"]["code"] == "20000":
            return res["result"]["topicSeg"]
        else:
            raise ValueError(f"{res}")
 
 
if __name__ == "__main__":
    # The '<...>' values are placeholders to be replaced with real credentials.
    segmentation_executor = SegmentationExecutor(
        host="clovastudio.apigw.ntruss.com",
        api_key='<api_key>',
        api_key_primary_val='<api_key_primary_val>',
        request_id='<request_id>'
    )

    # One {"source", "text"} record per segment returned for each document.
    chunked_html = []

    for htmldata in tqdm(clovastudiodatas_flattened):
        try:
            request_data = {
                "postProcessMaxSize": 100,
                "alpha": -100,
                "segCnt": -1,
                "postProcessMinSize": -1,
                "text": htmldata.page_content,
                "postProcess": True
            }

            response_data = segmentation_executor.execute(request_data)
            # Each segment is a list of strings; join it into one paragraph.
            result_data = [' '.join(segment) for segment in response_data]

        except json.JSONDecodeError as e:
            print(f"JSON decoding failed: {e}")
            # Skip this document: otherwise result_data would be undefined
            # (first iteration) or still hold the previous document's chunks.
            continue
        except Exception as e:
            print(f"An error occurred: {e}")
            continue

        for paragraph in result_data:
            chunked_document = {
                "source": htmldata.metadata["source"],
                "text": paragraph
            }
            chunked_html.append(chunked_document)

print(len(chunked_html))

In [None]:
class EmbeddingExecutor:
    """Minimal HTTPS client for the CLOVA Studio embedding endpoint.

    Args:
        host: Host name passed to ``http.client.HTTPSConnection``.
        api_key: Value sent as the ``X-NCP-CLOVASTUDIO-API-KEY`` header.
        api_key_primary_val: Value sent as the ``X-NCP-APIGW-API-KEY`` header.
        request_id: Value sent as the ``X-NCP-CLOVASTUDIO-REQUEST-ID`` header.
    """

    def __init__(self, host, api_key, api_key_primary_val, request_id):
        self._host = host
        self._api_key = api_key
        self._api_key_primary_val = api_key_primary_val
        self._request_id = request_id

    def _send_request(self, completion_request):
        """POST ``completion_request`` as JSON and return the decoded JSON reply.

        The connection is closed even when the request, the read, or the JSON
        decoding raises, so a failed call does not leak a socket.
        """
        headers = {
            "Content-Type": "application/json; charset=utf-8",
            "X-NCP-CLOVASTUDIO-API-KEY": self._api_key,
            "X-NCP-APIGW-API-KEY": self._api_key_primary_val,
            "X-NCP-CLOVASTUDIO-REQUEST-ID": self._request_id
        }

        conn = http.client.HTTPSConnection(self._host)
        try:
            conn.request(
                "POST",
                "/testapp/v1/api-tools/embedding/clir-emb-dolphin/{app-id (앱 식별자)}", # If using Service App, change 'testapp' to 'serviceapp', and corresponding app id.
                json.dumps(completion_request),
                headers
            )
            response = conn.getresponse()
            return json.loads(response.read().decode(encoding="utf-8"))
        finally:
            conn.close()

    def execute(self, completion_request):
        """Run one embedding request.

        Returns:
            ``res["result"]["embedding"]`` from the reply when the reply's
            status code is ``"20000"``.

        Raises:
            ValueError: If the status code is anything else; the message carries
                the status code and the status message (or "Unknown error").
        """
        res = self._send_request(completion_request)
        if res["status"]["code"] == "20000":
            return res["result"]["embedding"]
        else:
            error_code = res["status"]["code"]
            error_message = res.get("status", {}).get("message", "Unknown error")
            raise ValueError(f"오류 발생: {error_code}: {error_message}")
 
 
if __name__ == "__main__":
    # The '<...>' values are placeholders to be replaced with real credentials.
    embedding_executor = EmbeddingExecutor(
        host="clovastudio.apigw.ntruss.com",
        api_key='<api_key>',
        api_key_primary_val='<api_key_primary_val>',
        request_id='<request_id>'
    )

    # Attach an "embedding" entry to every chunk record, in place.
    for chunked_document in tqdm(chunked_html):
        try:
            request_data = {
                "text": chunked_document['text']
            }
            response_data = embedding_executor.execute(request_data)
        except ValueError as e:
            print(f"Embedding API Error. {e}")
            # Skip this chunk: otherwise response_data would be undefined
            # (first iteration) or still hold the previous chunk's embedding.
            continue
        except Exception as e:
            print(f"Unexpected error: {e}")
            continue

        chunked_document["embedding"] = response_data

In [None]:
# Collect the distinct vector lengths among the chunks that carry an embedding.
dimension_set = {
    len(chunk["embedding"]) for chunk in chunked_html if "embedding" in chunk
}

print("임베딩된 벡터들의 차원:", dimension_set)

In [None]:
# Spot-check one chunk record by position (displayed as the cell output).
# Raises IndexError if fewer than 401 chunks were produced.
chunked_html[400]

In [None]:
class CompletionExecutor:
    """Client for the ``/testapp/v1/chat-completions/HCX-003`` endpoint.

    NOTE(review): a later cell in this notebook defines another class with the
    same name, which replaces this one once that cell runs.

    Args:
        host: Base URL (scheme and host) the endpoint path is appended to.
        api_key: Value sent as the ``X-NCP-CLOVASTUDIO-API-KEY`` header.
        api_key_primary_val: Value sent as the ``X-NCP-APIGW-API-KEY`` header.
        request_id: Value sent as the ``X-NCP-CLOVASTUDIO-REQUEST-ID`` header.
    """

    def __init__(self, host, api_key, api_key_primary_val, request_id):
        self._host = host
        self._api_key = api_key
        self._api_key_primary_val = api_key_primary_val
        self._request_id = request_id

    def execute(self, completion_request, response_type="stream"):
        """Send a chat-completion request and return the answer.

        Args:
            completion_request: JSON-serialisable request body.
            response_type: ``"stream"`` reads the reply line by line and keeps
                the longest ``message.content`` seen on any ``data:`` line;
                ``"single"`` returns the reply body decoded as JSON.

        Returns:
            The longest streamed message content (``str``) for ``"stream"``,
            the decoded JSON body for ``"single"``, and ``""`` for any other
            ``response_type``.
        """
        headers = {
            "X-NCP-CLOVASTUDIO-API-KEY": self._api_key,
            "X-NCP-APIGW-API-KEY": self._api_key_primary_val,
            "X-NCP-CLOVASTUDIO-REQUEST-ID": self._request_id,
            "Content-Type": "application/json; charset=utf-8",
            "Accept": "text/event-stream"
        }

        final_answer = ""

        with requests.post(
            self._host + "/testapp/v1/chat-completions/HCX-003",
            headers=headers,
            json=completion_request,
            stream=True
        ) as r:
            if response_type == "stream":
                longest_line = ""
                for line in r.iter_lines():
                    if line:
                        decoded_line = line.decode("utf-8")
                        if decoded_line.startswith("data:"):
                            event_data = json.loads(decoded_line[len("data:"):])
                            message_content = event_data.get("message", {}).get("content", "")
                            if len(message_content) > len(longest_line):
                                longest_line = message_content
                final_answer = longest_line
            elif response_type == "single":
                final_answer = r.json()  # Assumption: a single response comes back as JSON.

        # Without this return the method always yielded None and the collected
        # answer was discarded.
        return final_answer

In [None]:
class CompletionExecutor:
    """Streaming client for the ``/testapp/v1/chat-completions/HCX-003`` endpoint.

    Args:
        host: Base URL (scheme and host) the endpoint path is appended to.
        api_key: Value sent as the ``X-NCP-CLOVASTUDIO-API-KEY`` header.
        api_key_primary_val: Value sent as the ``X-NCP-APIGW-API-KEY`` header.
        request_id: Value sent as the ``X-NCP-CLOVASTUDIO-REQUEST-ID`` header.
    """

    def __init__(self, host, api_key, api_key_primary_val, request_id):
        self._host = host
        self._api_key = api_key
        self._api_key_primary_val = api_key_primary_val
        self._request_id = request_id

    def execute(self, completion_request):
        """Send a chat-completion request and return the last streamed content.

        Reads the reply line by line. Each ``data:`` line is decoded as JSON and
        its ``message.content`` replaces the previous one; reading stops at the
        first line containing ``"data":"[DONE]"``.

        Returns:
            The ``message.content`` of the last ``data:`` line seen before the
            stop marker, or ``""`` if there was none.

        Raises:
            KeyError: If a ``data:`` line (other than the stop marker) has no
                ``message.content``.
        """
        headers = {
            "X-NCP-CLOVASTUDIO-API-KEY": self._api_key,
            "X-NCP-APIGW-API-KEY": self._api_key_primary_val,
            "X-NCP-CLOVASTUDIO-REQUEST-ID": self._request_id,
            "Content-Type": "application/json; charset=utf-8",
            "Accept": "text/event-stream"
        }

        # Content of the most recent 'data:' line in the stream.
        last_data_content = ""

        # The context manager closes the streamed response on every exit path;
        # breaking out of the loop early would otherwise leave it open.
        with requests.post(
            self._host + "/testapp/v1/chat-completions/HCX-003",
            headers=headers,
            json=completion_request,
            stream=True
        ) as response:
            for line in response.iter_lines():
                if line:
                    decoded_line = line.decode("utf-8")
                    if '"data":"[DONE]"' in decoded_line:
                        break
                    if decoded_line.startswith("data:"):
                        last_data_content = json.loads(decoded_line[5:])["message"]["content"]

        return last_data_content

In [None]:
# Helper that embeds the user's query text; defined before the chat function.
def query_embed(text: str):
    """Return the embedding that the module-level ``embedding_executor`` produces for ``text``."""
    return embedding_executor.execute({"text": text})

In [None]:
import json
from typing import List, Dict, Any
from langchain.chains import RetrievalQA
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.schema import Document

def load_loan_data(file_path: str) -> List[Dict[str, Any]]:
    """Load a loan-product JSON file and join base records with their options.

    The file is expected to contain ``result.baseList`` and
    ``result.optionList``. Every (base, option) pair that shares the same
    ``fin_prdt_cd`` produces one merged row; option keys override base keys.

    Args:
        file_path: Path to the UTF-8 encoded JSON file.

    Returns:
        One new dict per matching (base, option) pair, ordered by base record
        and then by option. Base records without any option produce no row.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        data_raw = json.load(file)
    data_list = data_raw['result']['baseList']
    data_option = data_raw['result']['optionList']
    new_data = []
    for data in data_list:
        for option in data_option:
            if data["fin_prdt_cd"] == option["fin_prdt_cd"]:
                # Build a fresh dict per pair. Reusing the base dict made every
                # row of a product with several options the same object, so all
                # of them ended up showing the last option's values.
                new_data.append({**data, **option})
    return new_data

# Turn merged loan records into one text Document per record.
def create_documents(loan_data: List[Dict[str, Any]]) -> List[Document]:
    """Render every loan record as a single-line summary wrapped in a ``Document``.

    The minimum and maximum rate fall back to ``'N/A'`` when absent; the other
    fields are required and raise ``KeyError`` if missing.
    """
    return [
        Document(
            page_content=(
                f"상품명: {product['fin_prdt_nm']}, "
                f"회사명: {product['kor_co_nm']}, "
                f"최소 연이율: {product.get('lend_rate_min', 'N/A')}, "
                f"최대 연이율: {product.get('lend_rate_max', 'N/A')}, "
                f"대출 한도: {product['loan_lmt']}, "
                f"설명: {product['loan_inci_expn']}"
            )
        )
        for product in loan_data
    ]

# Build the loan-product corpus: load the merged records, render them as
# Documents, and index them in a FAISS vector store using OpenAI embeddings.
loan_data = load_loan_data('fss_test.json')
documents = create_documents(loan_data)
vectorstore = FAISS.from_documents(documents, OpenAIEmbeddings())

# Set up the RetrievalQA chain
# NOTE(review): `llm` is not defined in any cell visible in this notebook;
# confirm it is created before this cell runs, otherwise this raises NameError.
retrieval_qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(),
    return_source_documents=True
)

In [None]:
def html_chat(realquery: str) -> str:
    """Answer ``realquery`` with the completion model, grounded on retrieved references.

    Steps:
        1. Run a RetrievalQA chain over the module-level ``vectorstore`` to get
           the documents relevant to the query.
        2. Put each retrieved document's text and source into the prompt as a
           ``reference`` system message, after the instruction message.
        3. Send the conversation to ``CompletionExecutor`` and return its reply.

    Args:
        realquery: The user's question.

    Returns:
        Whatever ``CompletionExecutor.execute`` returns for the built request.
    """
    retrieval_qa = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vectorstore.as_retriever(),
        return_source_documents=True
    )

    results = retrieval_qa({"query": realquery})

    # With return_source_documents=True the chain returns a dict whose
    # retrieved documents are under "source_documents". The earlier code
    # indexed results[0] and read Milvus-style hit.distance / hit.entity
    # attributes, which this result does not have. Only the documents are used
    # here; the chain's own generated answer is not.
    reference = [
        {"source": doc.metadata.get("source"), "text": doc.page_content}
        for doc in results["source_documents"]
    ]

    completion_executor = CompletionExecutor(
        host="https://clovastudio.stream.ntruss.com",
        api_key='<api_key>',
        api_key_primary_val='<api_key_primary_val>',
        request_id='<request_id>'
    )

    # Instruction message: answer only from the references, cite the source
    # URL, and refuse when the question is unrelated to the references.
    preset_texts = [
        {
            "role": "system",
            "content": "- 너의 역할은 사용자의 질문에 reference를 바탕으로 답변하는거야. \n- 너가 가지고있는 지식은 모두 배제하고, 주어진 reference의 내용만을 바탕으로 답변해야해. \n- 답변의 출처가 되는 html의 내용인 'source'도 답변과 함께 {url:}의 형태로 제공해야해. \n- 만약 사용자의 질문이 reference와 관련이 없다면, {제가 가지고 있는 정보로는 답변할 수 없습니다.}라고만 반드시 말해야해."
        }
    ]

    for ref in reference:
        preset_texts.append(
            {
                "role": "system",
                "content": f"reference: {ref['text']}, url: {ref['source']}"
            }
        )

    preset_texts.append({"role": "user", "content": realquery})

    request_data = {
        "messages": preset_texts,
        "topP": 0.6,
        "topK": 0,
        "maxTokens": 1024,
        "temperature": 0.5,
        "repeatPenalty": 1.2,
        "stopBefore": [],
        "includeAiFilters": False
    }

    # Return the answer generated by the LLM.
    response_data = completion_executor.execute(request_data)

    return response_data