In [None]:
import os
import uuid
import json
import pandas as pd
from qdrant_client import QdrantClient, models
from fastembed import TextEmbedding
from tqdm import tqdm
from qdrant_client.models import PointStruct, SparseVector
from fastembed import SparseTextEmbedding

In [2]:
# --- Configuration ---
# Embedding models. DENSE_EMBEDDING_SIZE must equal the output dimension of
# DENSE_EMBEDDING_MODEL_NAME, because it sizes the "dense" vectors of every collection.
DENSE_EMBEDDING_MODEL_NAME = "BAAI/bge-small-en-v1.5"
DENSE_EMBEDDING_SIZE = 384

SPARSE_EMBEDDING_MODEL_NAME = "prithivida/Splade_PP_en_v1"

# Qdrant connection. Defaults target a local instance; set QDRANT_HOST / QDRANT_PORT
# in the environment to point the notebook at another deployment without editing code.
QDRANT_HOST = os.environ.get("QDRANT_HOST", "localhost")
QDRANT_PORT = int(os.environ.get("QDRANT_PORT", "6333"))

client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)

def create_or_recreate_collection(name: str, indexes: dict, use_hnsw_optimization: bool = False):
    """Drop (if present) and create a Qdrant collection with dense + sparse vectors.

    The collection gets a named "dense" vector (cosine, stored on disk) and a named
    "sparse" vector. Payload is stored on disk. One payload index is created per
    entry of ``indexes``.

    Args:
        name: Collection name. An existing collection with this name is deleted
            first, so all of its points are lost.
        indexes: Mapping of payload field name -> payload index schema
            (e.g. ``models.KeywordIndexParams``).
        use_hnsw_optimization: If True, configure HNSW with ``m=0`` / ``payload_m=16``
            (see comment below) instead of the server defaults.
    """
    # Ask explicitly instead of catching every exception from get_collection:
    # a blanket `except Exception` also hid connection/auth failures and then
    # failed later with a less obvious error.
    if client.collection_exists(collection_name=name):
        print(f"Collection '{name}' already exists. Recreating it for a clean slate.")
        client.delete_collection(collection_name=name)

    print(f"Creating collection '{name}'...")

    hnsw_config = None
    if use_hnsw_optimization:
        print(f"Applying HNSW optimization for collection '{name}'.")
        # Multitenancy layout: m=0 disables the collection-wide HNSW graph and
        # payload_m=16 builds graphs per indexed payload value instead, so queries
        # should filter on an indexed field (e.g. tenant_id) to use the index.
        hnsw_config = models.HnswConfigDiff(
            payload_m=16,
            m=0,
            on_disk=True,
        )

    client.create_collection(
        collection_name=name,
        vectors_config={
            "dense": models.VectorParams(
                size=DENSE_EMBEDDING_SIZE,
                distance=models.Distance.COSINE,
                on_disk=True,
            ),
        },
        sparse_vectors_config={
            "sparse": models.SparseVectorParams(
                index=models.SparseIndexParams(
                    on_disk=False,
                )
            )
        },
        hnsw_config=hnsw_config,
        on_disk_payload=True,
    )

    print(f"Creating payload indexes for '{name}'...")
    for field_name, field_schema in indexes.items():
        # field_schema carries per-field options such as is_tenant / on_disk.
        client.create_payload_index(
            collection_name=name,
            field_name=field_name,
            field_schema=field_schema,
        )
    print(f"Collection '{name}' created successfully.")

In [3]:
def _keyword_index(**options):
    """Return an on-disk keyword payload index; extra ``options`` (e.g. is_tenant) pass through."""
    return models.KeywordIndexParams(type='keyword', on_disk=True, **options)


# Payload indexes per collection. tenant_id is flagged as the tenant key.
# NOTE(review): customer_id is also flagged is_tenant=True — confirm that tenant
# optimization is intended for a second field, and that customer_id payload values
# are strings (a keyword index will not match numeric values).
user_data_indexes = {
    "tenant_id": _keyword_index(is_tenant=True),
    "customer_id": _keyword_index(is_tenant=True),
}

kb_indexes = {
    "tenant_id": _keyword_index(is_tenant=True),
    "tags": _keyword_index(),
    "source_type": _keyword_index(),
}

# Recreate both collections (any existing data is dropped) with the tenant-oriented HNSW layout.
create_or_recreate_collection("user_data", indexes=user_data_indexes, use_hnsw_optimization=True)
create_or_recreate_collection("knowledge_base", indexes=kb_indexes, use_hnsw_optimization=True)

Collection 'user_data' already exists. Recreating it for a clean slate.
Creating collection 'user_data'...
Applying HNSW optimization for collection 'user_data'.
Creating payload indexes for 'user_data'...
Collection 'user_data' created successfully.
Collection 'knowledge_base' already exists. Recreating it for a clean slate.
Creating collection 'knowledge_base'...
Applying HNSW optimization for collection 'knowledge_base'.
Creating payload indexes for 'knowledge_base'...
Collection 'knowledge_base' created successfully.


In [4]:
# Load the dense and sparse embedding models. Their names come from the configuration
# cell and are not redefined here, so there is a single source of truth.
DENSE_EMBEDDING_MODEL = TextEmbedding(model_name=DENSE_EMBEDDING_MODEL_NAME)
SPARSE_EMBEDDING_MODEL = SparseTextEmbedding(model_name=SPARSE_EMBEDDING_MODEL_NAME)

# Fail fast if the collections' dense vector size does not match what the model emits;
# otherwise every upsert would be rejected by Qdrant.
_dense_probe_dim = len(next(iter(DENSE_EMBEDDING_MODEL.embed(["dimension check"]))))
assert _dense_probe_dim == DENSE_EMBEDDING_SIZE, (
    f"{DENSE_EMBEDDING_MODEL_NAME} produces {_dense_probe_dim}-dim vectors, "
    f"but DENSE_EMBEDDING_SIZE is {DENSE_EMBEDDING_SIZE}"
)

In [5]:
def process_unstructured_files(directory_path, tenant_id, points_list):
    """Read every ``*.json`` file in ``directory_path`` and append embeddable points.

    Each file is expected to hold a JSON array of objects. The part of the filename
    before the first dot becomes ``source_type`` (e.g. ``faqs.json`` -> ``"faqs"``).
    For the known sources (faqs / handbook / policy), the record's label is prefixed
    to its content to form the text that gets embedded. Other sources embed the
    content unchanged. Records without content are skipped.

    Args:
        directory_path: Directory containing the JSON files; non-.json files are ignored.
        tenant_id: Tenant identifier stored in every payload.
        points_list: List that ``(text_to_embed, payload)`` tuples are appended to.

    Returns:
        None; results are appended to ``points_list`` in place.
    """
    # Label/content layout of the embedded text for each known source file.
    templates = {
        "faqs": "Question: {category}\nAnswer: {content}",
        "handbook": "Title: {category}\nContent: {content}",
        "policy": "Policy Type: {category}\nPolicy Description: {content}",
    }

    # Sorted so the ingestion order is the same on every run and platform.
    for filename in sorted(os.listdir(directory_path)):
        if not filename.endswith(".json"):
            continue
        filepath = os.path.join(directory_path, filename)
        source_name = filename.split('.')[0]  # e.g., 'faqs', 'policy'

        # Explicit UTF-8 so non-ASCII text does not depend on the platform default encoding.
        with open(filepath, 'r', encoding="utf-8") as f:
            data = json.load(f)

        for item in data:
            # The record's label is its question, title, or policy type. Default to ""
            # so a missing label is not rendered as the literal text "{}".
            category = item.get(
                "question", item.get("title", item.get("policy_type", ""))
            )
            content = item.get(
                "answer", item.get("content", item.get("description", ""))
            )
            # Empty list is the natural "no tags" value for a keyword-indexed field.
            tags = item.get("tags", [])

            if not content:
                continue

            template = templates.get(source_name)
            if template is None:
                text_to_embed = content
            else:
                text_to_embed = template.format(category=category, content=content)

            payload = {
                "tenant_id": tenant_id,
                "source_type": source_name,
                "tags": tags,
                "content": text_to_embed,
            }
            points_list.append((text_to_embed, payload))

In [6]:
def upsert_in_batch(text, payloads, collection_name, batch_size=64):
    """Embed ``text`` with the dense and sparse models and upsert it batch by batch.

    Embedding is done per batch, so memory stays bounded by ``batch_size``
    rather than by the size of the whole corpus.

    Args:
        text: Sequence of strings to embed; ``text[i]`` pairs with ``payloads[i]``.
        payloads: Sequence of payload dicts, same length as ``text``.
        collection_name: Target collection; it must define "dense" and "sparse" vectors.
        batch_size: Number of points embedded and upserted per request.

    Raises:
        ValueError: If ``text`` and ``payloads`` differ in length (zip would
            otherwise silently drop the extras).

    Each point gets a fresh random UUID, so re-running this against a collection
    that was not recreated adds duplicate points.
    """
    if len(text) != len(payloads):
        raise ValueError(
            f"text has {len(text)} items but payloads has {len(payloads)}"
        )

    for start in tqdm(range(0, len(text), batch_size), desc=f"Upserting into '{collection_name}'"):
        batch_texts = list(text[start:start + batch_size])
        batch_payloads = payloads[start:start + batch_size]

        dense_vectors = DENSE_EMBEDDING_MODEL.embed(batch_texts)
        sparse_vectors = SPARSE_EMBEDDING_MODEL.embed(batch_texts)

        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector={
                    # Convert numpy arrays to plain lists for the client's request model.
                    "dense": dense.tolist(),
                    "sparse": SparseVector(
                        indices=sparse.indices.tolist(),
                        values=sparse.values.tolist(),
                    ),
                },
                payload=payload,
            )
            for dense, sparse, payload in zip(dense_vectors, sparse_vectors, batch_payloads)
        ]

        client.upsert(
            collection_name=collection_name,
            points=points,
            wait=True,
        )

In [7]:
def _tabular_points(csv_path, tenant, source_type, template):
    """Load one CSV and turn each row into a ``(text_to_embed, payload)`` pair."""
    points = []
    # to_dict("records") keeps each column's own dtype (iterrows upcasts across columns).
    for row in pd.read_csv(csv_path).to_dict("records"):
        text_to_embed = template.format(**row)
        # Row columns go first so the fixed keys win: a CSV column named tenant_id or
        # source_type must never overwrite the tenant isolation key.
        # "text_embeded" key name is kept as-is for existing payload consumers.
        payload = {**row, "tenant_id": tenant, "source_type": source_type, "text_embeded": text_to_embed}
        points.append((text_to_embed, payload))
    return points


def _upload(points, collection_name, batch_size):
    """Split ``(text, payload)`` pairs and upsert them; does nothing for an empty list."""
    if not points:
        return
    texts, payloads = zip(*points)
    upsert_in_batch(texts, payloads, collection_name, batch_size)


def ingest_data(data_path, batch_size = 64, tenants=("ecom", "fintech")):
    """Processes all data sources and uploads them to their respective collections.

    For each tenant, ``<data_path>/<tenant>/crm_records.csv`` and ``helpdesk_logs.csv``
    go to the "user_data" collection. The JSON files in
    ``<data_path>/<tenant>/knowledge_base`` go to the "knowledge_base" collection.

    Args:
        data_path: Root data directory containing one sub-directory per tenant.
        batch_size: Points per embedding/upsert batch.
        tenants: Tenant sub-directory names to ingest.
    """
    user_data_points = []
    kb_points = []

    for tenant in tenants:
        print(f"\n--- Processing data for tenant: {tenant} ---")
        tenant_dir = os.path.join(data_path, tenant)

        user_data_points += _tabular_points(
            os.path.join(tenant_dir, "crm_records.csv"), tenant, "crm",
            "Customer: {name}, Email: {email}",
        )
        user_data_points += _tabular_points(
            os.path.join(tenant_dir, "helpdesk_logs.csv"), tenant, "helpdesk",
            "Ticket: {issue_summary}, Status: {status}",
        )

        process_unstructured_files(os.path.join(tenant_dir, "knowledge_base"), tenant, kb_points)

    # --- Upload to Qdrant ---
    _upload(user_data_points, "user_data", batch_size)
    print(f"\nIngested {len(user_data_points)} points into 'user_data' collection.")

    _upload(kb_points, "knowledge_base", batch_size)
    print(f"Ingested {len(kb_points)} points into 'knowledge_base' collection.")

In [8]:
# Run the full ingestion pipeline against the local data directory (default batch size).
ingest_data("../data")


--- Processing data for tenant: ecom ---

--- Processing data for tenant: fintech ---


100%|██████████| 5/5 [00:00<00:00, 15.98it/s]



Ingested 300 points into 'user_data' collection.


100%|██████████| 4/4 [00:00<00:00, 15.97it/s]

Ingested 225 points into 'knowledge_base' collection.





In [None]:
# Script entry point for running this notebook after export to a .py file.
# Inside a Jupyter kernel __name__ is also "__main__", so a bare __name__ check would
# drop both collections and re-ingest everything a second time on "Run All". IPython
# injects get_ipython into the user namespace, which lets us skip that case.
# NOTE(review): in an exported script the earlier cells also create the collections and
# ingest at top level, so this block repeats that work — consider keeping only one path.
if __name__ == "__main__" and "get_ipython" not in globals():
    user_data_indexes = {
        "tenant_id": models.KeywordIndexParams(
            type='keyword',
            is_tenant=True,
            on_disk=True
        ),
        "customer_id": models.KeywordIndexParams(
            type='keyword',
            is_tenant=True,
            on_disk=True
        )
    }

    kb_indexes = {
        "tenant_id": models.KeywordIndexParams(
            type='keyword',
            is_tenant=True,
            on_disk=True
        ),
        "tags": models.KeywordIndexParams(
            type='keyword',
            on_disk=True
        ),
        "source_type": models.KeywordIndexParams(
            type='keyword',
            on_disk=True
        )
    }

    # Recreate both collections (existing data is dropped), then ingest all tenants.
    create_or_recreate_collection("user_data", indexes=user_data_indexes, use_hnsw_optimization=True)
    create_or_recreate_collection("knowledge_base", indexes=kb_indexes, use_hnsw_optimization=True)

    ingest_data(data_path='../data')