In [None]:
!pip install -q torch pyarrow python-dotenv langchain-huggingface langchain-text-splitters huggingface_hub langchain-community pymilvus langchain-experimental

In [None]:
import os
import torch
import re
import json
import shutil
import pyarrow.parquet as pq
from typing import List, Dict, Any, Optional, Tuple
from dotenv import load_dotenv
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from huggingface_hub import login
from langchain_community.vectorstores import Zilliz
from langchain_experimental.text_splitter import SemanticChunker

In [None]:
def semantic_chunk(
    text: str,
    embedding_model: Any,
    breakpoint_threshold_amount: float = 90,
    fallback_chunk_size: int = 1000,
    fallback_chunk_overlap: int = 200,
) -> List[str]:
    """Split ``text`` into semantically coherent chunks.

    Uses LangChain's ``SemanticChunker`` with a percentile breakpoint threshold
    computed from ``embedding_model``. If semantic chunking raises for any
    reason, falls back to fixed-size ``RecursiveCharacterTextSplitter`` chunks so
    the caller still gets usable chunks.

    Args:
        text: The document text to split. ``None``/empty/whitespace-only input
            yields ``[]`` without touching the embedding model.
        embedding_model: LangChain embeddings object handed to ``SemanticChunker``.
        breakpoint_threshold_amount: Percentile used as the semantic breakpoint
            threshold (default 90, matching the original behaviour).
        fallback_chunk_size: ``chunk_size`` for the fallback character splitter.
        fallback_chunk_overlap: ``chunk_overlap`` for the fallback character splitter.

    Returns:
        A list of chunk strings (possibly empty). Never ``None``.
    """
    if not text or not text.strip():
        return []

    try:
        splitter = SemanticChunker(
            embedding_model,
            breakpoint_threshold_type="percentile",
            breakpoint_threshold_amount=breakpoint_threshold_amount,
        )
        return [doc.page_content for doc in splitter.create_documents([text])]
    except Exception as e:
        # Deliberate best-effort: log and degrade to plain character chunking
        # instead of losing the document.
        print(f"Warning: Semantic chunking failed: {e}")

    # RecursiveCharacterTextSplitter comes from the notebook's top-level imports.
    fallback_splitter = RecursiveCharacterTextSplitter(
        chunk_size=fallback_chunk_size,
        chunk_overlap=fallback_chunk_overlap,
    )
    return fallback_splitter.split_text(text)

In [None]:
# Secrets are read from the environment (or a local .env file loaded by
# python-dotenv) rather than hard-coded, so they don't leak when the notebook is
# shared or committed. An empty string means "not set", which lets the
# credential checks further down actually detect a missing value.
load_dotenv()
ZILLIZ_CLOUD_URI = os.getenv("ZILLIZ_CLOUD_URI", "")
ZILLIZ_CLOUD_API_KEY = os.getenv("ZILLIZ_CLOUD_API_KEY", "")
HF_TOKEN = os.getenv("HF_TOKEN", "")

# Placeholders -- replace before running.
# NOTE: the `__main__` cell reassigns SOURCE_PARQUET_FILE to a Kaggle input path.
SOURCE_PARQUET_FILE = "Your_File_Path"
COLLECTION_NAME = "Your_COLLECTION_NAME"


EMBEDDING_MODEL_NAME = "google/embeddinggemma-300m"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

READ_BATCH_SIZE = 1000   # rows read from the Parquet file per iteration
WRITE_BATCH_SIZE = 4000  # buffered chunk count that triggers an upload to Zilliz

# Optional cap on the number of read batches; None processes the whole file.
MAX_BATCHES_TO_PROCESS = None

print(f"--- [Step 1] Settings defined. Pushing to Zilliz Collection: {COLLECTION_NAME} ---")
print(f"Using device: {DEVICE}")
if MAX_BATCHES_TO_PROCESS:
    print(f"--- ⚠️ SAFETY LIMIT ENABLED: Will process a maximum of {MAX_BATCHES_TO_PROCESS} batches. ---")


In [None]:

def initialize_vector_store(
    collection_name: str,
    embedding_function: HuggingFaceEmbeddings,
    uri: Optional[str] = None,
    token: Optional[str] = None,
    drop_old: bool = False,
) -> Optional[Zilliz]:
    """Connect to (or create) a Zilliz Cloud collection as a LangChain vector store.

    Args:
        collection_name: Name of the Zilliz collection to use.
        embedding_function: Embeddings used by the store to embed texts.
        uri: Zilliz Cloud endpoint. ``None`` (default) uses the module-level
            ``ZILLIZ_CLOUD_URI``.
        token: Zilliz Cloud API key. ``None`` (default) uses the module-level
            ``ZILLIZ_CLOUD_API_KEY``.
        drop_old: Forwarded to ``Zilliz(drop_old=...)``. Defaults to ``False`` so
            re-runs keep the existing collection, as before.

    Returns:
        The connected ``Zilliz`` store, or ``None`` when a credential is empty or
        construction raises. Errors are printed, not raised; callers are expected
        to check for ``None``.
    """
    # Explicit arguments win; fall back to the notebook's settings otherwise.
    uri = ZILLIZ_CLOUD_URI if uri is None else uri
    token = ZILLIZ_CLOUD_API_KEY if token is None else token

    print(f"Initializing Zilliz connection for collection: {collection_name}...")
    if not uri or not token:
        print("FATAL ERROR: Zilliz credentials not found.")
        return None
    try:
        vector_store = Zilliz(
            embedding_function=embedding_function,
            collection_name=collection_name,
            connection_args={'uri': uri, 'token': token},
            auto_id=True,
            drop_old=drop_old,
        )
        print("Zilliz vector store connected/created successfully!")
        return vector_store
    except Exception as e:
        print(f"FATAL ERROR initializing Zilliz vector store: {e}")
        return None



In [None]:
def load_embedding_model(device: str) -> Optional[HuggingFaceEmbeddings]:
    """Construct the HuggingFace embedding model named by ``EMBEDDING_MODEL_NAME``.

    Args:
        device: Device string placed into ``model_kwargs`` (the settings cell
            passes ``"cuda"`` or ``"cpu"``).

    Returns:
        The ``HuggingFaceEmbeddings`` instance, or ``None`` if construction
        raised (the error is printed rather than propagated).
    """
    print(f"Attempting to load embedding model ({EMBEDDING_MODEL_NAME}) on device: {device}...")

    # Forwarded to the underlying model constructor. truncate_dim=256 presumably
    # truncates output vectors to 256 dims (sentence-transformers option) -- confirm
    # against the installed version.
    model_options = {'device': device, 'truncate_dim': 256}
    # Ask the encoder to L2-normalize every embedding it returns.
    encode_options = {'normalize_embeddings': True}

    try:
        model = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL_NAME,
            model_kwargs=model_options,
            encode_kwargs=encode_options,
        )
    except Exception as err:
        print(f"FATAL ERROR loading embedding model: {err}")
        return None

    print("Embedding model loaded successfully.")
    return model



In [None]:
from langchain_experimental.text_splitter import SemanticChunker
import pyarrow.parquet as pq

def _chunk_articles(
    text_splitter: Any,
    contents: List[str],
    metadatas: List[Dict[str, Any]],
) -> Tuple[List[Any], int]:
    """Chunk a batch of articles; return (chunk Documents, number of articles chunked).

    The whole batch is tried in one ``create_documents`` call first. If that
    raises, each article is retried on its own so a single problematic article
    does not discard the rest of the batch (at the cost of re-embedding).
    """
    try:
        return text_splitter.create_documents(contents, metadatas=metadatas), len(contents)
    except Exception as e:
        print(f"⚠️ Error chunking batch: {e} -- retrying article by article...")

    docs: List[Any] = []
    chunked = 0
    for content, meta in zip(contents, metadatas):
        try:
            docs.extend(text_splitter.create_documents([content], metadatas=[meta]))
            chunked += 1
        except Exception as e:
            print(f"⚠️ Skipping article {meta.get('title')!r}: {e}")
    return docs, chunked


def _push_chunks(
    vector_store: Any,
    texts: List[str],
    metadatas: List[Dict[str, Any]],
    error_label: str,
) -> bool:
    """Upload chunks via ``vector_store.add_texts``; return True on success, False (logged) on failure."""
    try:
        vector_store.add_texts(texts=texts, metadatas=metadatas)
        return True
    except Exception as e:
        print(f"❌ {error_label}: {e}")
        return False


def process_parquet_to_store(
    parquet_file_path: str,
    vector_store: Zilliz,
    embedding_model: HuggingFaceEmbeddings,
    max_batches: Optional[int] = None,
    start_from_batch: int = 0,
    min_content_length: int = 50,
) -> None:
    """Stream articles from a Parquet file, semantically chunk them, and upload to Zilliz.

    Rows are read ``READ_BATCH_SIZE`` at a time. Each row's ``content`` is skipped
    if missing or shorter than ``min_content_length`` characters. Otherwise it is
    split with a ``SemanticChunker`` built on ``embedding_model``, and each chunk
    is tagged with ``{"title": ..., "source": "wikipedia"}`` metadata. Chunks are
    buffered and uploaded whenever the buffer reaches ``WRITE_BATCH_SIZE``, and
    once more at the end.

    Args:
        parquet_file_path: Path to the source Parquet file. ``title`` and
            ``content`` columns are read if present; missing ones are tolerated.
        vector_store: Destination store; only ``add_texts`` is called on it.
        embedding_model: Embeddings used by the SemanticChunker.
        max_batches: Stop once the 1-based batch number (counted from the start
            of the file, skipped batches included) exceeds this. ``None``/0 = no limit.
        start_from_batch: 1-based batch number to resume at. Earlier batches are
            still read from disk but neither chunked nor uploaded; 0 or 1
            processes everything.
        min_content_length: Articles with shorter ``content`` are ignored.

    Errors are printed rather than raised. After a fatal error or a failed final
    upload, a ``start_from_batch`` value that resumes without losing data is
    printed.
    """
    print("⚙️ Initializing Semantic Chunker...")
    try:
        text_splitter = SemanticChunker(
            embedding_model,
            breakpoint_threshold_type="percentile",
            breakpoint_threshold_amount=90,
        )
    except Exception as e:
        print(f"FATAL: Failed to init SemanticChunker: {e}")
        return

    pending_texts: List[str] = []
    pending_metas: List[Dict[str, Any]] = []
    total_rows_processed = 0
    # Highest batch number whose chunks are all confirmed uploaded. Resuming at
    # last_flushed_batch + 1 neither skips nor re-uploads data, even though
    # chunks are buffered across several read batches before being pushed.
    last_flushed_batch = max(start_from_batch - 1, 0)

    print(f"📂 Opening Parquet file: {parquet_file_path}")

    try:
        parquet_file = pq.ParquetFile(parquet_file_path)
        # iter_batches() yields READ_BATCH_SIZE-row batches, not row groups, so
        # estimate the batch count from the total row count.
        num_rows = parquet_file.metadata.num_rows
        estimated_batches = (num_rows + READ_BATCH_SIZE - 1) // READ_BATCH_SIZE

        batches = parquet_file.iter_batches(batch_size=READ_BATCH_SIZE)
        for current_batch_num, batch in enumerate(batches, start=1):
            # Resume support: skip batches already handled by a previous run.
            if current_batch_num < start_from_batch:
                # Log every 50th skipped batch so a long skip visibly progresses.
                if current_batch_num % 50 == 0:
                    print(f"⏩ Skipping Batch {current_batch_num} (Already processed)...")
                continue

            # Check the limit before announcing the batch, so no "Processing"
            # line is printed for a batch that is never processed.
            if max_batches and current_batch_num > max_batches:
                print(f"🛑 Reached safety limit of {max_batches} batches.")
                break

            print(f"--- Processing Batch {current_batch_num}/~{estimated_batches} ---")

            batch_contents: List[str] = []
            batch_metadatas: List[Dict[str, Any]] = []
            for row in batch.to_pylist():
                content = row.get("content", "")
                if not content or len(content) < min_content_length:
                    continue
                batch_contents.append(content)
                # `or` also covers a present-but-null title, which .get()'s
                # default would pass through as None.
                batch_metadatas.append(
                    {"title": row.get("title") or "No Title", "source": "wikipedia"}
                )

            if batch_contents:
                docs, chunked = _chunk_articles(text_splitter, batch_contents, batch_metadatas)
                pending_texts.extend(doc.page_content for doc in docs)
                pending_metas.extend(doc.metadata for doc in docs)
                total_rows_processed += chunked

            if len(pending_texts) >= WRITE_BATCH_SIZE:
                print(f"🚀 PUSHING {len(pending_texts)} chunks to Zilliz...")
                if _push_chunks(vector_store, pending_texts, pending_metas,
                                "Error uploading to Zilliz"):
                    pending_texts, pending_metas = [], []
                # On failure the buffer is kept and retried at the next flush.
                # NOTE(review): if add_texts fails after a partial insert, the
                # retry may duplicate some chunks.

            if not pending_texts:
                last_flushed_batch = current_batch_num

        # Upload whatever is still buffered.
        if pending_texts:
            print(f"🚀 PUSHING final {len(pending_texts)} chunks...")
            if not _push_chunks(vector_store, pending_texts, pending_metas,
                                "Error uploading final batch"):
                print(f"   Re-run with start_from_batch={last_flushed_batch + 1} "
                      f"to upload the remaining chunks.")

        print(f"\n✅ Done! Processed {total_rows_processed} articles (started from batch {start_from_batch}).")

    except Exception as e:
        print(f"🔥 Critical Error: {e}")
        print(f"   Re-run with start_from_batch={last_flushed_batch + 1} to resume without losing data.")

In [None]:
if __name__ == "__main__":
    print(f"--- Starting VDB Knowledge Base Creation on {DEVICE} ---")
    # 1-based batch to resume from after an interrupted run (0 = from the start).
    START_FROM_BATCH = 0
    # NOTE: overrides the placeholder SOURCE_PARQUET_FILE from the settings cell.
    SOURCE_PARQUET_FILE = "/kaggle/input/medical-wiki-rag/filtered_medical_wiki.parquet"

    if HF_TOKEN:
        login(token=HF_TOKEN)
        print("Hugging Face login successful.")
    else:
        print("Warning: HF_TOKEN not found.")

    # exit() would shut down the Jupyter kernel (and report status 0 from a
    # plain script); SystemExit(<message>) only aborts this cell in IPython and
    # exits with a non-zero status from a script.
    embed_model = load_embedding_model(device=DEVICE)
    if not embed_model:
        raise SystemExit("Stopping script: embedding model could not be loaded.")

    vdb_knowledge = initialize_vector_store(COLLECTION_NAME, embed_model)
    if not vdb_knowledge:
        raise SystemExit("Stopping script: Zilliz vector store could not be initialized.")

    process_parquet_to_store(
        SOURCE_PARQUET_FILE,
        vdb_knowledge,
        embed_model,
        max_batches=MAX_BATCHES_TO_PROCESS,
        start_from_batch=START_FROM_BATCH,
    )

    print("--- Script Finished ---")
    print(f"All processed data has been pushed to Zilliz Cloud Collection: {COLLECTION_NAME}")