# Notebook: Connect and Write to Milvus DB in watsonx.data

This notebook demonstrates how to interact with an existing Milvus database within the watsonx.data

**Key Steps:**

- Fetching Wikipedia articles from the Wikipedia API
- Embedding text using the Multilingual-E5-Large model
- Writing text with embeddings to Milvus

**Extending the Notebook:**

This notebook serves as a starting point for building a custom data ingestion pipeline. You can enhance it by:

- Scraping a website of your choice
- Using alternative embedding models available in WatsonX

**Important Note:**

When using different embedding models, ensure that they are available in WatsonX.

## Imports

In [1]:
from pymilvus import connections, DataType, Collection, FieldSchema, CollectionSchema, utility, db
from llama_index.core import StorageContext
import pandas as pd
import numpy as np
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
import warnings
warnings.filterwarnings("ignore")
import dotenv
from dotenv import load_dotenv
import wikipedia
import requests
import re
load_dotenv()

  from .autonotebook import tqdm as notebook_tqdm


True

# Credentials 
Define the paratmers in a .env

For the Lab, the credentials might be provided for you.

- **ProjectID and Access Token**: From a watsonx.ai project. You **often** need them but **not** for this notebook.
- **API-Key**: IBM Cloud/ IAM Access Management
- **Miluvs-Password**: API Key
- **Milvus-Host**: From watsonx.data/infrastructure/miluvs-instance

In [2]:
import os
host = os.getenv("MILVUS_HOST", None)
port = os.getenv("MILVUS_PORT", None)
password = os.getenv("MILVUS_PASSWORD", None)
user = os.getenv("MILVUS_USER", None)
print(f"Host: {host}, Port: {port}, Password: {password}, User: {user}")

Host: 102092af-5474-4a42-8dc2-35bb05ffdd0e.cvgfjtof0l91rq0joaj0.lakehouse.appdomain.cloud, Port: 31574, Password: oLDjQJic9EgszqIbNKUCR4_pxvr1L-DGKlpxkQ7p7zJK, User: ibmlhapikey


# Connect to Milvus

In [3]:
connections.connect(user=user, password=password, host=host, port=port, secure=True)
# test connection
print(connections.list_connections())
print(connections.get_connection_addr(alias="default"))
print(db.list_database())

[('default', <pymilvus.client.grpc_handler.GrpcHandler object at 0x00000221BE001CA0>)]
{'secure': True, 'address': '102092af-5474-4a42-8dc2-35bb05ffdd0e.cvgfjtof0l91rq0joaj0.lakehouse.appdomain.cloud:31574', 'user': 'ibmlhapikey'}
['default', 'group_01', 'group_02', 'group_03']


## To Disconnect

In [4]:
# connections.disconnect("default")
print(connections.list_connections())

[('default', <pymilvus.client.grpc_handler.GrpcHandler object at 0x00000221BE001CA0>)]


## To Delete Collection

In [29]:
#utility.drop_collection('docling_helvetia')

---
# Set Collection Name and Get Wiki Data

Lets check all the existing collections in the default database

In [30]:
collections = utility.list_collections()
for collection in collections:
    # Initialize the Collection object
    collection = Collection(collection)
    # Print collection name
    print(f"Collection: {collection.name}\n")
    # Print collection schema
    print(f"Schema: {collection.schema}\n")

Name the collection and define the wiki-article you want to fetch.

In [7]:
# Set collection name
COLLECTION_NAME = "docling_helvetia"  #name
# Set wiki article name
# wiki_title = 'wiki-page-title' # 'wiki-page-title'

- Some patterns are removed from the article
- You can use the print statements to see how the article looks


In [8]:
# Demonstration of combining Google Cloud Storage and Qdrant
from docling_core.transforms.chunker.hybrid_chunker import HybridChunker
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode, AcceleratorOptions, \
    AcceleratorDevice, TesseractCliOcrOptions
from docling.document_converter import DocumentConverter
from docling.document_converter import PdfFormatOption
from docling_core.transforms.chunker import HierarchicalChunker
from docling.datamodel.settings import settings
from dotenv import load_dotenv
from langchain_docling import DoclingLoader

# Docling loader
def docling_loader(file_path, use_ocr_override: bool = False):
    pipeline_options = PdfPipelineOptions()
    pipeline_options.accelerator_options = AcceleratorOptions(
        num_threads=8, device=AcceleratorDevice.CUDA
    )
    # Set OCR based on the parameter (default: OCR disabled)
    pipeline_options.do_ocr = use_ocr_override

    pipeline_options.do_table_structure = True
    pipeline_options.table_structure_options.do_cell_matching = True  # uses text cells predicted from table structure model
    pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE

    doc_converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
        }
    )
    # Enable the profiling to measure the time spent
    # settings.debug.profile_pipeline_timings = True

    # return DoclingLoader(chunker=HybridChunker(tokenizer="intfloat/multilingual-e5-large", max_tokens=512), file_path=file_path, converter=doc_converter)
    return DoclingLoader(chunker=HierarchicalChunker(), file_path=file_path, converter=doc_converter)

- The docs will be split into chunks, which will be vectorized using an embedding model before writing it into Milvus.
- This process is necessary even when you write your own web scraping script, as you'll need to split the retrieved text before embedding it

In [9]:
def load_document_with_conditional_ocr(file_path, text_length_threshold: int = 10):
    # First, try loading without OCR.
    loader = docling_loader(file_path, use_ocr_override=False)
    chunks = loader.load()

    # Check if we got any chunks and whether they contain enough text.
    total_text_length = sum(
        len(chunk.page_content) for chunk in chunks if hasattr(chunk, "page_content") and chunk.page_content)
    if not chunks or total_text_length < text_length_threshold:
        print("Detected empty or minimal text output. Reprocessing with OCR enabled...")
        # Re-run with OCR enabled.
        loader = docling_loader(file_path, use_ocr_override=True)
        chunks = loader.load()
    else:
        print("Text detected without OCR; skipping OCR step.")

    return chunks


from pathlib import Path

path = Path(r"../.data/AVB_HELVETIA/2-0071 06.68_DE.pdf")  #A01-S_DE.pdf")
files_path = []
# for file in path.iterdir():
chunks = load_document_with_conditional_ocr(path)
# Print the chunks that will be embedded later
for i, chunk in enumerate(chunks):
    print(f"Chunk {i + 1}:")
    print(chunk)
    print("-" * 40)

Text detected without OCR; skipping OCR step.
Chunk 1:
page_content='PATRIA
SCHWEIZERISCHE LEBENSVERSICHERUNGS-GESELLSCHAFT AUF GEGENSEITIGKEIT BASEL' metadata={'source': WindowsPath('../.data/AVB_HELVETIA/2-0071 06.68_DE.pdf'), 'dl_meta': {'schema_name': 'docling_core.transforms.chunker.DocMeta', 'version': '1.0.0', 'doc_items': [{'self_ref': '#/texts/1', 'parent': {'$ref': '#/groups/0'}, 'children': [], 'content_layer': 'body', 'label': 'text', 'prov': [{'page_no': 1, 'bbox': {'l': 42.48, 't': 754.35, 'r': 448.262, 'b': 734.195, 'coord_origin': 'BOTTOMLEFT'}, 'charspan': [0, 73]}]}], 'headings': ['PATRIA'], 'origin': {'mimetype': 'application/pdf', 'binary_hash': 8325773841557957054, 'filename': '2-0071 06.68_DE.pdf'}}}
----------------------------------------
Chunk 2:
page_content='Artikel 1  Umfang des Anspruches
- 1.Auf  die  versicherten  Leistungen  besteht  Anspruch, wenn  der  Versicherte  infolge  von  Krankheit,  Zerfall der geistigen und körperlichen  Kräfte oder  infolge v

# Prepare Chunks and Create Embeddings

intfloat/multilingual-e5-large expects a "passage: " prefix before each text passage.

In [16]:
import json
from pathlib import Path

# Function to make metadata JSON serializable (reuse from previous context)
def make_serializable(metadata_dict):
    serializable_meta = {}
    if not isinstance(metadata_dict, dict): # Handle cases where input isn't a dict
        return str(metadata_dict) if isinstance(metadata_dict, Path) else metadata_dict

    for key, value in metadata_dict.items():
        if isinstance(value, Path):
            serializable_meta[key] = str(value) # Convert Path to string
        elif isinstance(value, dict):
            serializable_meta[key] = make_serializable(value) # Recursively process nested dicts
        elif isinstance(value, list):
             serializable_meta[key] = [make_serializable(item) for item in value] # Recursively process list items
        else:
            # Assume other types are directly serializable
            serializable_meta[key] = value
    return serializable_meta

# Add  "passage: " to beginning of each chunk for e5-large
input_texts = ["passage: " + chunk.page_content for chunk in chunks]
docling_metadata = [chunk.metadata for chunk in chunks]
# Make metadata serializable
serializable_metadata = [make_serializable(meta) for meta in docling_metadata]

page_numbers = []
chapters = [] # Example: Extract from 'headings' if relevant
file_names = []
for meta in serializable_metadata:
    # Page Number Extraction (Example)
    page_no = -1 # Default value
    try:
        # Adjust path based on your actual metadata structure
        page_no = meta.get('dl_meta', {}).get('doc_items', [{}])[0].get('prov', [{}])[0].get('page_no', -1)
    except (IndexError, KeyError, AttributeError):
        pass # Keep default if extraction fails
    page_numbers.append(page_no)

    # Chapter Extraction (Example - using 'headings')
    chapter_name = "Unknown" # Default value
    try:
        # Assumes 'headings' list exists and contains the relevant chapter
        headings = meta.get('dl_meta', {}).get('headings', [])
        if headings:
            chapter_name = headings[0] # Take the first heading as chapter
    except (KeyError, AttributeError):
        pass
    # Ensure chapter name does not exceed max_length if defined in schema
    chapters.append(chapter_name) # Use schema max_length

    file_name= "Unknown" # Default value
    try:
        # Adjust path based on your actual metadata structure
        file_name = meta.get('dl_meta', {}).get('origin', [{}]).get('filename', "Unknown")
    except (IndexError, KeyError, AttributeError):
        pass # Keep default if extraction fails
    file_names.append(file_name)

# Placeholder values - replace with actual extraction logic
product_names = ["Helvetia Product X"] * len(chunks) # Example
product_years = ["2024"] * len(chunks) # Example
company_entity_names = ["Helvetia"] * len(chunks) # Example


print (f"Input texts: {input_texts}\n\nPage: {page_numbers}\nChapters:{chapters}\nFile_name:{file_names}\n Metadata: {docling_metadata}\n\n")
# Embed the Chunks using SentenceTransformer with intfloat/multilingual-e5-large
model = SentenceTransformer("intfloat/multilingual-e5-large")
embeddings = model.encode(input_texts, normalize_embeddings=True)
dim = embeddings.shape[1]

Input texts: ['passage: PATRIA\nSCHWEIZERISCHE LEBENSVERSICHERUNGS-GESELLSCHAFT AUF GEGENSEITIGKEIT BASEL', 'passage: Artikel 1  Umfang des Anspruches\n- 1.Auf  die  versicherten  Leistungen  besteht  Anspruch, wenn  der  Versicherte  infolge  von  Krankheit,  Zerfall der geistigen und körperlichen  Kräfte oder  infolge von  Körperverletzung  ganz oder  teilweise  nicht  mehr imstande ist, seinen Beruf oder eine andere seiner  Lebensstellung und seinen  Fähigkeiten angemessene Erwerbstätigkeit auszuüben.\n- 2. Ist  der  Versicherte  nur  teilweise  arbeitsunfähig,  so  entsprechen  die  Leistungen  dem  Grade  der  Arbeitsunfähigkeit.  Eine Arbeitsunfähigkeit  von weniger  als  25%  begründet  keinen  Anspruch  auf  Leistungen; beträgt  die  Arbeitsunfähigkeit  mindestens 75%,  so  wird die  volle Leistung  gewährt.', 'passage: Artikel 2  Beginn der Leistungen  -  Rentenzahlung  -  Kapitalzahlung\n- 1. a)  Die  Leistungen werden entsprechend der vereinbarten  Wartefrist  nach  einer  u

---
# Write Data into Milvus

## Check if Collection exists, Define Schema

Before proceeding, we need to check if the collection already exists in the Milvus database.

**Check if Collection Existst:**

If the collection already exists, you can drop it using the command provided earlier in this notebook.

**Define Collection Schema:**

If the collection does not exist, we define the schema for the collection.
The embeddings will be stored in vector and the assosiated text chunks in text.

watsonx.data **expects** the following fields:**"vector"** (index) and **"text"**.

In [18]:
# Check if collection already exists and create fields
if COLLECTION_NAME in utility.list_collections():
    collection = Collection(COLLECTION_NAME)
    print(f"Collection '{COLLECTION_NAME}' already exists. Rename or drop the collection first.")
else:
    print(f"Collection '{COLLECTION_NAME}' does not exist. Proceed.")
    id_field = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True)
    vector_field = FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=dim)
    text_field = FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535)
    metadata_field = FieldSchema(name="metadata", dtype=DataType.JSON)
    page_number = FieldSchema(name="page_number", dtype=DataType.INT64)
    file_name = FieldSchema(name="file_name", dtype=DataType.VARCHAR, max_length=2048)
    product_name = FieldSchema(name="product_name", dtype=DataType.VARCHAR, max_length=256)
    product_year = FieldSchema(name="product_year", dtype=DataType.VARCHAR, max_length=256)
    chapter = FieldSchema(name="chapter", dtype=DataType.VARCHAR, max_length=2048)
    company_entity = FieldSchema(name="company_entity", dtype=DataType.VARCHAR, max_length=256) #Helvetia, National, Phoenix
    schema = CollectionSchema(
        fields=[id_field, vector_field, text_field,  metadata_field, page_number, file_name, product_name, product_year, chapter, company_entity],
    description="Collection for storing text chunks and their embeddings",
    enable_dynamic_field=True)
    collection = Collection(name=COLLECTION_NAME, schema=schema)

Collection 'docling_helvetia' does not exist. Proceed.


## Insert the Data, Create Index

Now we insert the embeddings and the text into milvus and define the index parameters. 

Refer to the pymilvus documentation for the index parameters.

In [19]:
# Insert .data
import json

# Convert each metadata dict to a JSON string, letting non-serializable objects be turned into strings.
def insert_into_milvus(collection, text_chunks, embeddings, serializable_metadata, page_numbers, file_names, product_names, product_years, chapters, entity):
    data_to_insert = []
    for i in range(len(text_chunks)):
        chunk_data = {
            "vector": embeddings[i].tolist(),  # Ensure individual vector is a list
            "text": input_texts[i],
            "metadata": serializable_metadata[i], # Already serialized JSON-compatible dict
            "page_number": page_numbers[i],
            "file_name": file_names[i],
            "product_name": product_names[i],
            "product_year": product_years[i],
            "chapter": chapters[i],
            "company_entity": entity[i]
        }
        data_to_insert.append(chunk_data)
    collection.insert(data_to_insert)
    collection.flush()

insert_into_milvus(collection, input_texts, embeddings, serializable_metadata, page_numbers, file_names, product_names, product_years, chapters, entity_names)
# Define the index 
index_params = {
    "metric_type": "COSINE",  # or L2 for example
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}
# Create the index for the vector
collection.create_index(field_name="vector", index_params=index_params)
print(f"Index created on field 'vector' with params: {index_params}")

Index created on field 'vector' with params: {'metric_type': 'COSINE', 'index_type': 'IVF_FLAT', 'params': {'nlist': 1024}}


---
# Test the Collection

In [26]:
collection.load()
user_query = "Worum geht's im Art. 70?"
model = SentenceTransformer("intfloat/multilingual-e5-large")
query_embedding = model.encode(user_query).tolist()
search_results = collection.search(
    data=[query_embedding],
    anns_field="vector",
    param={"metric_type": "COSINE", "params": {"nprobe": 10}},
    limit=5,
    output_fields=["text", "metadata", "page_number", "file_name", "product_name", "product_year", "chapter", "entity"],
)

for hits in search_results:
    for hit in hits:
        print(f"Chunk content: {hit.entity.get('text')}")
        print(f"Page Number: {hit.entity.get('page_number')}")
        print(f"File Name: {hit.entity.get('file_name')}")
        print(f"Product Name: {hit.entity.get('product_name')}")
        print(f"Product Year: {hit.entity.get('product_year')}")
        print(f"Chapter: {hit.entity.get('chapter')}")
        print(f"Entity: {hit.entity.get('company_entity')}")
        print(f"Metadata: {hit.entity.get('metadata')}") # Metadata is already requested
        print(f"Distance: {hit.distance}")
        print("---")

Chunk content: passage: Artikel 6  Erlöschen der Zusatzversicherung
a)  wenn  der  Versicherte  an  einem  Krieg  oder  an  kriegsähnlichen  Handlungen  teilnimmt,  ohne  dass  die Schweiz selbst  Krieg  führt  oder  in  kriegsähnliche Handlungen  hineingezogen  ist;
Page Number: 2
File Name: 2-0071 06.68_DE.pdf
Product Name: Helvetia Product X
Product Year: 2024
Chapter: Artikel 6  Erlöschen der Zusatzversicherung
Entity: Helvetia
Metadata: {'source': '..\\.data\\AVB_HELVETIA\\2-0071 06.68_DE.pdf', 'dl_meta': {'schema_name': 'docling_core.transforms.chunker.DocMeta', 'version': '1.0.0', 'doc_items': [{'self_ref': '#/texts/33', 'parent': {'$ref': '#/body'}, 'children': [], 'content_layer': 'body', 'label': 'text', 'prov': [{'page_no': 2, 'bbox': {'l': 61.44, 't': 514.886, 'r': 473.664, 'b': 501.051, 'coord_origin': 'BOTTOMLEFT'}, 'charspan': [0, 199]}]}], 'headings': ['Artikel 6  Erlöschen der Zusatzversicherung'], 'origin': {'mimetype': 'application/pdf', 'binary_hash': 83257738415579