In [1]:
import os
import re
import duckdb
import pandas as pd
from pymilvus import connections, utility, Collection, CollectionSchema, FieldSchema, DataType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings

In [2]:
!pip install duckdb



In [4]:
# --- Configuration ---
# Host and port of the Milvus server that main() connects to.
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
# DuckDB database file; main() removes any existing file at this path before
# writing the `specs` table.
DUCKDB_FILE = "sales_rag_app/db/sales_specs.db"
# Milvus collection name; main() drops an existing collection with this name
# and creates it again.
COLLECTION_NAME = "sales_notebook_specs"
# Model name handed to HuggingFaceEmbeddings. main() declares the Milvus vector
# field with dim=384, so the model's output size has to match that value.
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
# Directory scanned for the *.txt spec files.
DATA_DIR = "data"

In [6]:
# --- 增強版文本解析函數 ---
def parse_spec_file_enhanced(file_path):
    """
    Parse a multi-model ``.txt`` spec file into flat per-model records.

    The file is read as UTF-8 and scanned twice:

    1. Every ``key: value`` line contributes model-name candidates: the key is
       split on ``" / "``, ``","`` and ``"&"``, hyphens are removed from each
       piece, and a piece counts as a model when it contains both an uppercase
       ASCII letter and a digit. If no model is found this way, the part of
       the file name before the first ``_`` becomes the only model.
    2. ``[Section]`` lines set the current section. Each following
       ``key: value`` line yields one record per affected model. A key that
       lists two or more known models (``ModelA / ModelB: ...``) is stored
       under the feature name ``"Configuration"`` for exactly those models;
       every other key is stored under its own name for all models in the file.

    Blank lines, lines without a colon, and lines seen while no non-empty
    section is active are skipped in the second pass.

    Args:
        file_path: Path of the spec file to parse.

    Returns:
        list[dict]: Records with the keys ``model_name``, ``section``,
        ``feature`` and ``value``, in file order. For a single line the
        records follow the order of the models in the key (multi-model lines)
        or sorted model-name order (all other lines), so the result is
        identical from run to run.
    """
    def split_model_candidates(key):
        # Hyphens are removed, so "AB-12" and "AB12" normalize to the same name.
        return [
            piece.strip().replace('-', '')
            for piece in re.split(r' / |,|&', key)
            if piece.strip()
        ]

    with open(file_path, 'r', encoding='utf-8') as f:
        content_lines = [raw_line.strip() for raw_line in f]

    # Pass 1: collect every model name mentioned in any key of the file.
    models_found = set()
    for line in content_lines:
        if ':' not in line:
            continue
        key = line.split(':', 1)[0].strip()
        for candidate in split_model_candidates(key):
            # Heuristic: a model name has at least one uppercase letter and one digit.
            if re.search(r'[A-Z]', candidate) and re.search(r'[0-9]', candidate):
                models_found.add(candidate)

    # No explicit model in the file: fall back to the file-name prefix,
    # e.g. '326' for '326_AllModels.txt'.
    if not models_found:
        models_found.add(os.path.basename(file_path).split('_')[0])

    # Sorted once so that "applies to every model" lines emit a stable order.
    all_models_sorted = sorted(models_found)

    # Pass 2: turn each spec line into one record per affected model.
    records = []
    current_section = None
    for line in content_lines:
        if not line:
            continue

        section_match = re.match(r'^\[(.*)\]$', line)
        if section_match:
            current_section = section_match.group(1).strip()
            continue

        if not current_section or ':' not in line:
            continue

        key, value = map(str.strip, line.split(':', 1))
        candidates = split_model_candidates(key)

        if len(candidates) > 1 and all(c in models_found for c in candidates):
            # The key itself is a list of models sharing one configuration.
            # dict.fromkeys de-duplicates while keeping the order of the key.
            models_affected = list(dict.fromkeys(candidates))
            feature_name = "Configuration"
        else:
            # Any other key is treated as a feature that applies to every model.
            models_affected = all_models_sorted
            feature_name = key

        for model in models_affected:
            records.append({
                "model_name": model,
                "section": current_section,
                "feature": feature_name,
                "value": value
            })

    return records

In [7]:
# def specs_to_dataframe(specs, model_name):
#     """將解析後的規格轉換為 DataFrame"""
#     records = []
#     for section, details in specs.items():
#         if isinstance(details, dict):
#             for feature, value in details.items():
#                 # 將列表值轉換為字串
#                 value_str = ", ".join(value) if isinstance(value, list) else value
#                 records.append([model_name, section, feature, value_str])
#     return pd.DataFrame(records, columns=['model_name', 'section', 'feature', 'value'])

In [8]:
# --- 主執行流程 ---
def main():
    """
    Populate DuckDB and Milvus from the ``*.txt`` spec files in ``DATA_DIR``.

    Step 1 parses every file with ``parse_spec_file_enhanced`` and writes the
    de-duplicated records to a ``specs`` table in the DuckDB file
    ``DUCKDB_FILE``. Any existing file at that path is removed first and the
    parent directory is created when missing.

    Step 2 splits the raw text of the same files into chunks
    (``chunk_size=500``, ``chunk_overlap=50``), embeds them with
    ``EMBEDDING_MODEL`` and inserts them into a re-created Milvus collection
    ``COLLECTION_NAME``, then builds an IVF_FLAT / L2 index and loads it.

    Progress and warnings are printed. When ``DATA_DIR`` is missing or holds
    no ``.txt`` file, an error is printed and neither store is touched.
    Both the DuckDB connection and the Milvus connection are released even if
    a step raises.

    Returns:
        None
    """
    # --- 1. Structured data (DuckDB) ---
    print("--- 正在處理結構化規格資料並存入 DuckDB ---")

    # A missing directory is reported like an empty one instead of crashing;
    # sorting keeps the processing order stable across platforms.
    if os.path.isdir(DATA_DIR):
        txt_files = sorted(f for f in os.listdir(DATA_DIR) if f.endswith('.txt'))
    else:
        txt_files = []
    if not txt_files:
        print("錯誤：在 'data' 目錄中找不到任何 .txt 檔案。請將您的 `*_AllModels.txt` 檔案放入其中。")
        return

    print(f"找到 {len(txt_files)} 個 txt 規格檔案: {', '.join(txt_files)}")

    all_db_records = []
    for filename in txt_files:
        file_path = os.path.join(DATA_DIR, filename)
        print(f"  -> 正在解析: {filename}")
        all_db_records.extend(parse_spec_file_enhanced(file_path))

    if not all_db_records:
        print("警告：未從 .txt 檔案中解析出任何結構化資料。")
    else:
        final_df = pd.DataFrame(all_db_records).drop_duplicates()

        # duckdb.connect cannot create missing parent directories itself.
        db_dir = os.path.dirname(DUCKDB_FILE)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        # Start from an empty database so the table never mixes old and new rows.
        if os.path.exists(DUCKDB_FILE):
            os.remove(DUCKDB_FILE)

        con = duckdb.connect(database=DUCKDB_FILE, read_only=False)
        try:
            # DuckDB resolves `final_df` in the SQL to the local DataFrame above.
            con.execute("CREATE TABLE specs AS SELECT * FROM final_df")
        finally:
            con.close()
        print(f"成功將 {len(final_df)} 筆規格資料存入 DuckDB。")

    # --- 2. Raw text for semantic search (Milvus) ---
    print("\n--- 正在處理所有 .txt 文檔資料並存入 Milvus ---")
    connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)
    try:
        if utility.has_collection(COLLECTION_NAME):
            print(f"找到舊的 Collection '{COLLECTION_NAME}'，正在刪除...")
            utility.drop_collection(COLLECTION_NAME)

        fields = [
            FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=200),
            # dim has to equal the output size of EMBEDDING_MODEL.
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384)
        ]
        schema = CollectionSchema(fields, "銷售筆電規格知識庫")
        collection = Collection(COLLECTION_NAME, schema)

        # Same file list as step 1, so both stores describe the same inputs.
        files_to_process_milvus = txt_files
        print(f"準備為 Milvus 處理 {len(files_to_process_milvus)} 個檔案: {', '.join(files_to_process_milvus)}")

        # One splitter serves every file; its settings do not depend on the file.
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        all_milvus_docs = []
        for filename in files_to_process_milvus:
            file_path = os.path.join(DATA_DIR, filename)
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            for i, chunk in enumerate(text_splitter.split_text(content)):
                all_milvus_docs.append({
                    # The primary key is the file name plus the chunk index within it.
                    "pk": f"{filename}_{i}",
                    "text": chunk,
                    "source": filename
                })

        print(f"共讀取並分割成 {len(all_milvus_docs)} 個文本區塊。")

        if not all_milvus_docs:
            print("警告：沒有要存入 Milvus 的資料。")
            return

        print("正在產生嵌入向量...")
        embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
        texts_to_embed = [doc['text'] for doc in all_milvus_docs]
        vectors = embeddings.embed_documents(texts_to_embed)

        # Column-wise data in the same order as the schema fields.
        entities = [
            [doc['pk'] for doc in all_milvus_docs],
            texts_to_embed,
            [doc['source'] for doc in all_milvus_docs],
            vectors
        ]

        print("正在將資料插入 Milvus...")
        collection.insert(entities)
        collection.flush()

        print("正在為向量創建索引 (IVF_FLAT)...")
        index_params = {
            "metric_type": "L2",
            "index_type": "IVF_FLAT",
            "params": {"nlist": 128}
        }
        collection.create_index("embedding", index_params)
        collection.load()

        print(f"成功將 {len(all_milvus_docs)} 筆資料導入 Milvus Collection '{COLLECTION_NAME}'。")
        print("\n資料導入完成！")
    finally:
        # Runs on success, on the early return above, and on any exception.
        connections.disconnect("default")

# Run the full ingestion when this code executes as the main module
# (and not when it is imported from elsewhere).
if __name__ == "__main__":
    main()

--- 正在處理結構化規格資料並存入 DuckDB ---
找到 5 個 txt 規格檔案: 819_AllModels.txt, 656_AllModels.txt, 839_AllModels.txt, 958_AllModels.txt, 326_AllModels.txt
  -> 正在解析: 819_AllModels.txt
  -> 正在解析: 656_AllModels.txt
  -> 正在解析: 839_AllModels.txt
  -> 正在解析: 958_AllModels.txt
  -> 正在解析: 326_AllModels.txt
成功將 23448 筆規格資料存入 DuckDB。

--- 正在處理所有 .txt 文檔資料並存入 Milvus ---
找到舊的 Collection 'sales_notebook_specs'，正在刪除...
準備為 Milvus 處理 5 個檔案: 819_AllModels.txt, 656_AllModels.txt, 839_AllModels.txt, 958_AllModels.txt, 326_AllModels.txt
共讀取並分割成 118 個文本區塊。
正在產生嵌入向量...


  from .autonotebook import tqdm as notebook_tqdm


正在將資料插入 Milvus...
正在為向量創建索引 (IVF_FLAT)...
成功將 118 筆資料導入 Milvus Collection 'sales_notebook_specs'。

資料導入完成！


In [None]:
import os
import re
import duckdb
import pandas as pd
from pymilvus import connections, utility, Collection, CollectionSchema, FieldSchema, DataType
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings

# --- Configuration ---
# Host and port of the Milvus server that main() connects to.
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
# DuckDB database file; main() removes any existing file at this path before
# writing the `specs` table.
DUCKDB_FILE = "sales_rag_app/db/sales_specs.db"
# Milvus collection name; main() drops an existing collection with this name
# and creates it again.
COLLECTION_NAME = "sales_notebook_specs"
# Model name handed to HuggingFaceEmbeddings. main() declares the Milvus vector
# field with dim=384, so the model's output size has to match that value.
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
# Directory scanned for the *.txt spec files.
DATA_DIR = "data"

# --- 增強版文本解析函數 ---
def parse_spec_file_enhanced(file_path):
    """
    Parse a multi-model ``.txt`` spec file into flat per-model records.

    The file is read as UTF-8 and scanned twice:

    1. Every ``key: value`` line contributes model-name candidates: the key is
       split on ``" / "``, ``","`` and ``"&"``, hyphens are removed from each
       piece, and a piece counts as a model when it contains both an uppercase
       ASCII letter and a digit. If no model is found this way, the part of
       the file name before the first ``_`` becomes the only model.
    2. ``[Section]`` lines set the current section. Each following
       ``key: value`` line yields one record per affected model. A key that
       lists two or more known models (``ModelA / ModelB: ...``) is stored
       under the feature name ``"Configuration"`` for exactly those models;
       every other key is stored under its own name for all models in the file.

    Blank lines, lines without a colon, and lines seen while no non-empty
    section is active are skipped in the second pass.

    Args:
        file_path: Path of the spec file to parse.

    Returns:
        list[dict]: Records with the keys ``model_name``, ``section``,
        ``feature`` and ``value``, in file order. For a single line the
        records follow the order of the models in the key (multi-model lines)
        or sorted model-name order (all other lines), so the result is
        identical from run to run.
    """
    def split_model_candidates(key):
        # Hyphens are removed, so "AB-12" and "AB12" normalize to the same name.
        return [
            piece.strip().replace('-', '')
            for piece in re.split(r' / |,|&', key)
            if piece.strip()
        ]

    with open(file_path, 'r', encoding='utf-8') as f:
        content_lines = [raw_line.strip() for raw_line in f]

    # Pass 1: collect every model name mentioned in any key of the file.
    models_found = set()
    for line in content_lines:
        if ':' not in line:
            continue
        key = line.split(':', 1)[0].strip()
        for candidate in split_model_candidates(key):
            # Heuristic: a model name has at least one uppercase letter and one digit.
            if re.search(r'[A-Z]', candidate) and re.search(r'[0-9]', candidate):
                models_found.add(candidate)

    # No explicit model in the file: fall back to the file-name prefix,
    # e.g. '326' for '326_AllModels.txt'.
    if not models_found:
        models_found.add(os.path.basename(file_path).split('_')[0])

    # Sorted once so that "applies to every model" lines emit a stable order.
    all_models_sorted = sorted(models_found)

    # Pass 2: turn each spec line into one record per affected model.
    records = []
    current_section = None
    for line in content_lines:
        if not line:
            continue

        section_match = re.match(r'^\[(.*)\]$', line)
        if section_match:
            current_section = section_match.group(1).strip()
            continue

        if not current_section or ':' not in line:
            continue

        key, value = map(str.strip, line.split(':', 1))
        candidates = split_model_candidates(key)

        if len(candidates) > 1 and all(c in models_found for c in candidates):
            # The key itself is a list of models sharing one configuration.
            # dict.fromkeys de-duplicates while keeping the order of the key.
            models_affected = list(dict.fromkeys(candidates))
            feature_name = "Configuration"
        else:
            # Any other key is treated as a feature that applies to every model.
            models_affected = all_models_sorted
            feature_name = key

        for model in models_affected:
            records.append({
                "model_name": model,
                "section": current_section,
                "feature": feature_name,
                "value": value
            })

    return records


# --- 主執行流程 ---
def main():
    """
    Populate DuckDB and Milvus from the ``*.txt`` spec files in ``DATA_DIR``.

    Step 1 parses every file with ``parse_spec_file_enhanced`` and writes the
    de-duplicated records to a ``specs`` table in the DuckDB file
    ``DUCKDB_FILE``. Any existing file at that path is removed first and the
    parent directory is created when missing.

    Step 2 splits the raw text of the same files into chunks
    (``chunk_size=500``, ``chunk_overlap=50``), embeds them with
    ``EMBEDDING_MODEL`` and inserts them into a re-created Milvus collection
    ``COLLECTION_NAME``, then builds an IVF_FLAT / L2 index and loads it.

    Progress and warnings are printed. When ``DATA_DIR`` is missing or holds
    no ``.txt`` file, an error is printed and neither store is touched.
    Both the DuckDB connection and the Milvus connection are released even if
    a step raises.

    Returns:
        None
    """
    # --- 1. Structured data (DuckDB) ---
    print("--- 正在處理結構化規格資料並存入 DuckDB ---")

    # A missing directory is reported like an empty one instead of crashing;
    # sorting keeps the processing order stable across platforms.
    if os.path.isdir(DATA_DIR):
        txt_files = sorted(f for f in os.listdir(DATA_DIR) if f.endswith('.txt'))
    else:
        txt_files = []
    if not txt_files:
        print("錯誤：在 'data' 目錄中找不到任何 .txt 檔案。請將您的 `*_AllModels.txt` 檔案放入其中。")
        return

    print(f"找到 {len(txt_files)} 個 txt 規格檔案: {', '.join(txt_files)}")

    all_db_records = []
    for filename in txt_files:
        file_path = os.path.join(DATA_DIR, filename)
        print(f"  -> 正在解析: {filename}")
        all_db_records.extend(parse_spec_file_enhanced(file_path))

    if not all_db_records:
        print("警告：未從 .txt 檔案中解析出任何結構化資料。")
    else:
        final_df = pd.DataFrame(all_db_records).drop_duplicates()

        # duckdb.connect cannot create missing parent directories itself.
        db_dir = os.path.dirname(DUCKDB_FILE)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        # Start from an empty database so the table never mixes old and new rows.
        if os.path.exists(DUCKDB_FILE):
            os.remove(DUCKDB_FILE)

        con = duckdb.connect(database=DUCKDB_FILE, read_only=False)
        try:
            # DuckDB resolves `final_df` in the SQL to the local DataFrame above.
            con.execute("CREATE TABLE specs AS SELECT * FROM final_df")
        finally:
            con.close()
        print(f"成功將 {len(final_df)} 筆規格資料存入 DuckDB。")

    # --- 2. Raw text for semantic search (Milvus) ---
    print("\n--- 正在處理所有 .txt 文檔資料並存入 Milvus ---")
    connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)
    try:
        if utility.has_collection(COLLECTION_NAME):
            print(f"找到舊的 Collection '{COLLECTION_NAME}'，正在刪除...")
            utility.drop_collection(COLLECTION_NAME)

        fields = [
            FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=200),
            # dim has to equal the output size of EMBEDDING_MODEL.
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384)
        ]
        schema = CollectionSchema(fields, "銷售筆電規格知識庫")
        collection = Collection(COLLECTION_NAME, schema)

        # Same file list as step 1, so both stores describe the same inputs.
        files_to_process_milvus = txt_files
        print(f"準備為 Milvus 處理 {len(files_to_process_milvus)} 個檔案: {', '.join(files_to_process_milvus)}")

        # One splitter serves every file; its settings do not depend on the file.
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        all_milvus_docs = []
        for filename in files_to_process_milvus:
            file_path = os.path.join(DATA_DIR, filename)
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            for i, chunk in enumerate(text_splitter.split_text(content)):
                all_milvus_docs.append({
                    # The primary key is the file name plus the chunk index within it.
                    "pk": f"{filename}_{i}",
                    "text": chunk,
                    "source": filename
                })

        print(f"共讀取並分割成 {len(all_milvus_docs)} 個文本區塊。")

        if not all_milvus_docs:
            print("警告：沒有要存入 Milvus 的資料。")
            return

        print("正在產生嵌入向量...")
        embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
        texts_to_embed = [doc['text'] for doc in all_milvus_docs]
        vectors = embeddings.embed_documents(texts_to_embed)

        # Column-wise data in the same order as the schema fields.
        entities = [
            [doc['pk'] for doc in all_milvus_docs],
            texts_to_embed,
            [doc['source'] for doc in all_milvus_docs],
            vectors
        ]

        print("正在將資料插入 Milvus...")
        collection.insert(entities)
        collection.flush()

        print("正在為向量創建索引 (IVF_FLAT)...")
        index_params = {
            "metric_type": "L2",
            "index_type": "IVF_FLAT",
            "params": {"nlist": 128}
        }
        collection.create_index("embedding", index_params)
        collection.load()

        print(f"成功將 {len(all_milvus_docs)} 筆資料導入 Milvus Collection '{COLLECTION_NAME}'。")
        print("\n資料導入完成！")
    finally:
        # Runs on success, on the early return above, and on any exception.
        connections.disconnect("default")

# Run the full ingestion when this code executes as the main module
# (and not when it is imported from elsewhere).
if __name__ == "__main__":
    main()