In [None]:
# /home/mapleleaf/LCJRepos/projects/mlinfo_kb_platform/refData/codes/ipynb/ingest_spec_20250821.ipynb


In [1]:
import os
import uuid
import pandas as pd
import duckdb
from pymilvus import (
    connections,
    utility,
    FieldSchema,
    CollectionSchema,
    DataType,
    Collection,
)
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# /home/mapleleaf/LCJRepos/projects/mlinfo_kb_platform/refData/codes/ipynb/ingest_spec_20250821.ipynb
# ```python?code_reference&code_event_index=2


# [Checklist Item 2: Define Constants]
# --- Milvus Configuration ---
MILVUS_HOST = "localhost"
MILVUS_PORT = "19530"
COLLECTION_NAME = "new_pc_v2"

# --- DuckDB Configuration ---
DUCKDB_PATH = "./nb_spec_v2.db"

# --- Embedding Model Configuration ---
MODEL_NAME = 'all-MiniLM-L6-v2'
DIMENSION = 384  # Dimension for all-MiniLM-L6-v2

# --- Chunking Configuration ---
PARENT_CHUNK_SIZE = 1024
PARENT_CHUNK_OVERLAP = 128
CHILD_CHUNK_SIZE = 256
CHILD_CHUNK_OVERLAP = 32

# [Checklist Item 3: Implement Milvus Connection and Setup Function]
def setup_milvus_collection():
    """Sets up the Milvus connection and collection, recreating it if it exists."""
    print("Connecting to Milvus...")
    connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)

    if utility.has_collection(COLLECTION_NAME):
        print(f"Collection '{COLLECTION_NAME}' already exists. Dropping it.")
        utility.drop_collection(COLLECTION_NAME)

    print(f"Creating collection '{COLLECTION_NAME}'...")
    #一般主鍵的加法# a.主鍵 (Primary Key)
    fields = [
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True)
    ]
    #加入parent and child data
    fields.append(FieldSchema(name="doc_id", dtype=DataType.INT64))
    # fields = [
    #     # FieldSchema(name="uuid", dtype=DataType.VARCHAR, is_primary=True, max_length=36),
    #     FieldSchema(name="doc_id", dtype=DataType.INT64),
    #     FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=512),
    #     FieldSchema(name="parent_text", dtype=DataType.VARCHAR, max_length=65535),
    #     FieldSchema(name="chunk_text", dtype=DataType.VARCHAR, max_length=1000),
    #     FieldSchema(name="query_vector", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION)
    # ]
    schema = CollectionSchema(fields, description="Parent-Child Chunk Collection")
    collection = Collection(COLLECTION_NAME, schema)

    print("Creating index on 'query_vector'...")
    index_params = {
        "metric_type": "L2",
        "index_type": "HNSW",
        "params": {"M": 8, "efConstruction": 64}
    }
    collection.create_index(field_name="query_vector", index_params=index_params)
    collection.load()
    return collection

# [Checklist Item 4: Implement DuckDB Setup Function]
def setup_duckdb_table():
    """Sets up the DuckDB connection and creates the documents table."""
    print(f"Setting up DuckDB at '{DUCKDB_PATH}'...")
    con = duckdb.connect(DUCKDB_PATH)
    con.execute("""
        CREATE TABLE IF NOT EXISTS documents (
            id BIGINT,
            source VARCHAR,
            content VARCHAR
        );
    """)
    return con

# [Checklist Item 5 & 6: Implement Data Loading, Processing, Iteration, and Chunking]
def process_files(collection, db_con):
    """
    Loads data from CSV files, processes it, creates chunks, generates embeddings,
    and inserts data into Milvus and DuckDB.
    """
    print(f"Loading sentence transformer model '{MODEL_NAME}'...")
    model = SentenceTransformer(MODEL_NAME)

    # csv_files = [f for f in os.listdir('.') if f.startswith('data_') and f.endswith('.csv')]
    # csv_files = [f for f in os.listdir('.') if f.endswith('.csv')]
    # print(f"Found {len(csv_files)} CSV files to process.")
    csv_files = [f for f in os.listdir('./datasrc/') if f.endswith('.csv')]
    print(f"Found {len(csv_files)} CSV files to process.")

    doc_id_counter = 0
    child_chunks_for_embedding = []
    data_for_milvus = []

    parent_splitter = RecursiveCharacterTextSplitter(
        chunk_size=PARENT_CHUNK_SIZE, chunk_overlap=PARENT_CHUNK_OVERLAP
    )
    child_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHILD_CHUNK_SIZE, chunk_overlap=CHILD_CHUNK_OVERLAP
    )

    for filename in tqdm(csv_files, desc="Processing Files"):
        try:
            print(f"current processing file:{filename}")
            df = pd.read_csv(filename)
            # Combine all columns into a single text document for each row
            df['combined_content'] = df.apply(
                lambda row: ' | '.join(row.dropna().astype(str)), axis=1
            )

            for _, row in tqdm(df.iterrows(), total=len(df), desc=f"Rows in {filename}", leave=False):
                content = row['combined_content']
                if not content or content.isspace():
                    continue

                doc_id_counter += 1
                source = filename

                # Insert original content into DuckDB
                db_con.execute("INSERT INTO documents VALUES (?, ?, ?)", (doc_id_counter, source, content))

                parent_chunks = parent_splitter.split_text(content)

                for parent_chunk_text in parent_chunks:
                    child_chunks = child_splitter.split_text(parent_chunk_text)
                    for child_chunk_text in child_chunks:
                        milvus_record = {
                            "uuid": str(uuid.uuid4()),
                            "doc_id": doc_id_counter,
                            "source": source,
                            "parent_text": parent_chunk_text,
                            "chunk_text": child_chunk_text
                        }
                        data_for_milvus.append(milvus_record)
                        child_chunks_for_embedding.append(child_chunk_text)
                
        except Exception as e:
            print(f"Error processing file {filename}: {e}")
        print("success")
    # [Checklist Item 7: Implement Batch Embedding]
#     if not child_chunks_for_embedding:
#         print("No text chunks found to embed.")
#         return 0, 0

#     print(f"\nEmbedding {len(child_chunks_for_embedding)} child chunks in a batch...")
#     embeddings = model.encode(
#         child_chunks_for_embedding,
#         show_progress_bar=True,
#         batch_size=32
#     )

#     # [Checklist Item 8: Implement Milvus Data Insertion]
#     for i, record in enumerate(data_for_milvus):
#         record['query_vector'] = embeddings[i]

#     print("Preparing data for Milvus insertion...")
#     # Reformat data for pymilvus insert
#     entities = [
#         [rec["uuid"] for rec in data_for_milvus],
#         [rec["doc_id"] for rec in data_for_milvus],
#         [rec["source"] for rec in data_for_milvus],
#         [rec["parent_text"] for rec in data_for_milvus],
#         [rec["chunk_text"] for rec in data_for_milvus],
#         [rec["query_vector"] for rec in data_for_milvus],
#     ]

#     print(f"Inserting {len(data_for_milvus)} records into Milvus collection...")
#     # Insert in batches
#     batch_size = 1000
#     for i in tqdm(range(0, len(data_for_milvus), batch_size), desc="Inserting into Milvus"):
#         batch_entities = [field[i:i + batch_size] for field in entities]
#         collection.insert(batch_entities)

#     print("Flushing Milvus collection...")
#     collection.flush()

#     return doc_id_counter, len(data_for_milvus)


# # [Checklist Item 9 & 10: Finalize Script and Cleanup]
# if __name__ == "__main__":
#     milvus_collection = None
#     duckdb_connection = None
#     try:
#         milvus_collection = setup_milvus_collection()
#         duckdb_connection = setup_duckdb_table()
#         total_docs, total_chunks = process_files(milvus_collection, duckdb_connection)
#         print("\n--- Processing Complete ---")
#         print(f"Total original documents processed: {total_docs}")
#         print(f"Total child chunks created and inserted: {total_chunks}")
#         print(f"Data successfully saved to DuckDB ('{DUCKDB_PATH}') and Milvus ('{COLLECTION_NAME}').")

#     except Exception as e:
#         print(f"\nAn error occurred: {e}")
#     finally:
#         if duckdb_connection:
#             duckdb_connection.close()
#             print("DuckDB connection closed.")
#         if utility.has_collection(COLLECTION_NAME):
#             connections.disconnect("default")
#             print("Milvus connection closed.")


### functions unit tests

In [None]:
#

In [10]:
process_files(COLLECTION_NAME,DUCKDB_PATH)

Loading sentence transformer model 'all-MiniLM-L6-v2'...
Found 19 CSV files to process.


Processing Files: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 19/19 [00:00<00:00, 15890.68it/s]

current processing file:data_8329.csv
Error processing file data_8329.csv: [Errno 2] No such file or directory: 'data_8329.csv'
success
current processing file:data_938.csv
Error processing file data_938.csv: [Errno 2] No such file or directory: 'data_938.csv'
success
current processing file:data_529.csv
Error processing file data_529.csv: [Errno 2] No such file or directory: 'data_529.csv'
success
current processing file:data_835.csv
Error processing file data_835.csv: [Errno 2] No such file or directory: 'data_835.csv'
success
current processing file:data_27.csv
Error processing file data_27.csv: [Errno 2] No such file or directory: 'data_27.csv'
success
current processing file:AC01_result.csv
Error processing file AC01_result.csv: [Errno 2] No such file or directory: 'AC01_result.csv'
success
current processing file:data_17.csv
Error processing file data_17.csv: [Errno 2] No such file or directory: 'data_17.csv'
success
current processing file:data_839.csv
Error processing file data




In [12]:
# csv_files = [f for f in os.listdir('../../../data/raw/EM_New TTL_241104_AllTransformedToGoogleSheet/') if f.endswith('.csv')]
# print(f"Found {len(csv_files)} CSV files to process.")

Found 19 CSV files to process.


In [14]:
from pymilvus import Collection

# 連接到 Milvus 服務
# ...（根據你的連線方式填寫 host/port 等）...
# --- 2. 處理並存入非結構化資料 (Milvus) ---
    print("\n--- 正在處理文本資料並存入 Milvus ---")
    connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)

    if utility.has_collection(COLLECTION_NAME):
        print(f"找到舊的 Milvus Collection '{COLLECTION_NAME}'，正在刪除...")
        utility.drop_collection(COLLECTION_NAME)
# 載入集合
collection = Collection(name=COLLECTION_NAME)

# 查詢集合資訊（例如 schema、row count）
print(f"總紀錄數: {collection.num_entities}")
print(f"結構: {collection.schema}")

# 查詢部分數據（例如查前10條）
results = collection.query(
    expr=None,   # 無條件則查全部
    output_fields=["欄位1", "欄位2"],   # 需填你關心的欄位；如未知可用 collection.schema
    limit=10
)
for item in results:
    print(item)

ConnectionNotExistException: <ConnectionNotExistException: (code=1, message=should create connection first.)>