## 文字嵌入

In [1]:
import json
from tqdm import tqdm
import time

import tiktoken
from langchain.embeddings.base import Embeddings
from langchain_chroma import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from openai import OpenAI

from src.config.constant import (CHROMA_COLLECTION_NAME, CHROMA_PERSIST_DIR,
                                 PROCESSED_DATA_PATH, PROJECT_ROOT)

In [2]:
# 自訂 LM Studio 文字嵌入類別，繼承自 LangChain 的 Embeddings
class LmStudioEmbeddings(Embeddings):
    def __init__(self, model_name, url):
        """
        初始化 LM Studio Embeddings
        :param model_name: 要使用的嵌入模型名稱
        :param url: LM Studio 本地或遠端 API 的位址
        """
        self.model_name = model_name
        self.url = url
        # 建立 OpenAI 客戶端，連接 LM Studio 的 API
        self.client = OpenAI(base_url=url, api_key="lm-studio")

    def embed_query(self, text: str):
        """
        將單筆文字轉換為向量
        :param text: 要嵌入的文字
        :return: 向量 (list of float)
        """
        response = self.client.embeddings.create(
            input=text,      # 傳入單筆文字
            model=self.model_name  # 指定使用的模型
        )
        # 回傳第一筆 embedding
        return response.data[0].embedding

    def embed_documents(self, texts: list[str]):
        """
        將多筆文字轉換為向量
        :param texts: 要嵌入的文字列表
        :return: 向量列表，每個元素對應一筆文字
        """
        response = self.client.embeddings.create(
            input=texts,     # 傳入多筆文字
            model=self.model_name  # 指定使用的模型
        )
        # 回傳每筆文字的 embedding
        return [x.embedding for x in response.data]


def tiktoken_len(text):
    tokenizer = tiktoken.encoding_for_model("gpt-4")
    tokens = tokenizer.encode(text)
    return len(tokens)

def connect_to_vector_db(collection_name, embeddings):
    return Chroma(
        collection_name=CHROMA_COLLECTION_NAME,     # 向量資料表名稱
        embedding_function=embeddings,              # 指定embedding模型
        persist_directory=CHROMA_PERSIST_DIR        # 向量資料庫存取路徑
    )

In [3]:
# 讀取資料
input_num = 1

input_folder = PROJECT_ROOT / PROCESSED_DATA_PATH.format("document")
input_file = f"document_{input_num}.json"
input_path = input_folder / input_file

with open(input_path, "r", encoding="utf-8") as f:
    data_list = json.load(f)

print(f"讀入{len(data_list)}筆資料")

讀入1951筆資料


In [5]:
# 計算單筆資料文本平均字數
total = 0
for doc in data_list:
    word = doc.get("context", "")
    total += len(word)

token_count = (total / len(data_list)) / 4

print(f"粗估平均token數: {token_count}")

粗估平均token數: 894.2697334700154


In [6]:
# 轉換成Document物件
from langchain_core.documents import Document

doc_list = [Document(page_content=d["context"], metadata=d["metadata"]) for d in data_list]

print(f"已成功轉換{len(doc_list)}筆Document物件資料！")

已成功轉換1951筆Document物件資料！


In [3]:
# 載入Embedding模型
embedding_model = "text-embedding-bge-m3"
embedding_url = "http://192.168.0.109:1234/v1"

embeddings = LmStudioEmbeddings(model_name=embedding_model, url=embedding_url)
print("已建立embedding模型連線")

已建立embedding模型連線


In [None]:
# 進行文本切割
child_splitter = RecursiveCharacterTextSplitter(chunk_size=300,chunk_overlap=70, length_function=tiktoken_len)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=1000,chunk_overlap=250, length_function=tiktoken_len)

total_docs = []

parent_docs = parent_splitter.split_documents(doc_list)         # 父文件不切割，用完整文件列表

for doc in parent_docs:                                         # for迴圈父文件列表
    parent_num = 1
    parent_id = str(doc.metadata.get("steam_appid"))            # 存取steam_appid值
    doc.metadata["doc_id"] = parent_id + f"_p0{str(parent_num)}"                          # 建立doc_id，值為steam_appid
    doc.metadata["parent_id"] = doc.metadata["doc_id"]                      # 建立parent_id，值為steam_appid

    split_docs = child_splitter.split_documents([doc])          # 將文件切割成子文件
    child_num = 1                                                     # 子文件編號計數
    for sdoc in split_docs:                                     # for迴圈遍歷每個切割出來的子文件
        sdoc.metadata["doc_id"] = parent_id + f"_c0{str(num)}"     # 為子文件加上doc_id（子文件的唯一id值），值為steam_appid加編號
        sdoc.metadata["parent_id"] = doc.metadata["doc_id"]                   # 為子文件加上parent_id，值為steam_appid
        num += 1                                                # 子文件編號+1
    total_docs.append(doc)                                      # 將父文件放入總文件列表
    total_docs.extend(split_docs)                               # 將子文件列表也放入總文件列表

print(f"父層文件（原始）數量：{len(parent_docs)}")
print(f"分段後文件數量：{len(total_docs)}")

父層文件（原始）數量：1951
分段後文件數量：10291


## 存入向量資料庫

In [9]:
# 輸入向量資料庫前先進行簡單去重複
ids = [doc.metadata['doc_id'] for doc in total_docs]

# 藉由重新配對doc和ids，將重複值去除
unique_data = {doc_id: doc for doc_id, doc in zip(ids, total_docs)}

# 取得新的ids和docs
unique_ids = list(unique_data.keys())
unique_docs = list(unique_data.values())

print(f"去重前資料筆數: {len(total_docs)}, {len(ids)}")
print(f"去重後資料筆數: {len(unique_docs)}, {len(unique_ids)}")

去重前資料筆數: 10291, 10291
去重後資料筆數: 10287, 10287


In [10]:
# 建立向量資料庫並存入資料
vector_store = connect_to_vector_db(collection_name=CHROMA_COLLECTION_NAME, embeddings=embeddings)

batch_size = 200
total_docs_count = len(total_docs)


for i in tqdm(range(0, len(unique_docs), batch_size), desc="寫入進度"):
    batch_docs = unique_docs[i : i + batch_size]
    batch_ids = unique_ids[i : i + batch_size]

    vector_store.add_documents(
            documents=batch_docs,
            ids=batch_ids
        )
    time.sleep(1)

print(f"已成功將{len(total_docs)}筆資料存入向量資料庫({CHROMA_PERSIST_DIR})")

寫入進度: 100%|██████████| 52/52 [10:26<00:00, 12.05s/it]

已成功將10291筆資料存入向量資料庫(C:\Users\add41\Documents\Data_Engineer\Project\Steam-Games-Database-with-RAG\data\vector)





In [6]:
vector_store = connect_to_vector_db(collection_name=CHROMA_COLLECTION_NAME, embeddings=embeddings)
vector_store.delete_collection()