In [10]:
import os
from typing import List, Dict

from pymilvus import MilvusClient, DataType
from sentence_transformers import SentenceTransformer
from pypdf import PdfReader
from dotenv import load_dotenv
load_dotenv()

# ========= CONFIGURATION =========

# Connection settings come from environment variables (load_dotenv() is called
# right after the imports, so a local .env file is presumably picked up too).
# Hard-coding these values in the notebook is not recommended.
MILVUS_HOST = os.getenv("MILVUS_HOST")        # gRPC host from watsonx.data Milvus service
MILVUS_PORT = os.getenv("MILVUS_PORT", "443") # gRPC port from service
MILVUS_API_KEY = os.getenv("MILVUS_API_KEY")  # IBM Cloud API key

# Paths to the source PDFs (relative to the notebook's working directory)
PUBLIC_PDF_PATH = "./data/offerings_public.pdf"
MANAGERS_PDF_PATH = "./data/offerings_managers_only.pdf"

# Milvus collection names: one collection per audience
PUBLIC_COLLECTION = "offerings_public"
MANAGERS_COLLECTION = "offerings_managers_only"

# Embedding model (384 dimensions)
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
EMBEDDING_DIM = 384  # fixed for this model; must match the vector field's dim

# Debug helpers. Keep them commented out: printing MILVUS_API_KEY would store
# the secret in the saved notebook output.
# print(MILVUS_HOST)
# print(MILVUS_PORT)
# print(MILVUS_API_KEY)


In [11]:
def connect_milvus() -> MilvusClient:
    """
    Open a TLS connection to the Milvus service described by the config cell.

    Uses the module-level MILVUS_HOST, MILVUS_PORT and MILVUS_API_KEY values.
    The login name is taken from the optional MILVUS_USER environment variable
    and falls back to the account that was previously hard-coded here, so
    existing setups keep working unchanged.

    The credentials are passed as separate ``user`` / ``password`` arguments
    instead of being spliced into the URI: the login name itself contains an
    '@', and an API key with URL-special characters would otherwise corrupt
    the URI. It also keeps the secret out of the URI string.

    Returns:
        A connected MilvusClient.

    Raises:
        RuntimeError: if MILVUS_HOST, MILVUS_PORT or MILVUS_API_KEY is unset.
    """
    if not (MILVUS_HOST and MILVUS_PORT and MILVUS_API_KEY):
        raise RuntimeError("Set MILVUS_HOST, MILVUS_PORT and MILVUS_API_KEY first.")

    # Allow the account to be overridden without editing the notebook.
    milvus_user = os.getenv("MILVUS_USER") or "ibmlhapikey_michal.kordyzon@pl.ibm.com"

    client = MilvusClient(
        uri=f"https://{MILVUS_HOST}:{MILVUS_PORT}",
        user=milvus_user,
        password=MILVUS_API_KEY,
        secure=True,           # SaaS Milvus is always TLS
    )
    return client


In [12]:
def load_pdf_text(pdf_path: str) -> str:
    """
    Extract the text of every page of the PDF at ``pdf_path``.

    Pages are joined with a newline; a page for which ``extract_text()``
    returns nothing contributes an empty string.
    """
    per_page = [(page.extract_text() or "") for page in PdfReader(pdf_path).pages]
    return "\n".join(per_page)

def chunk_text(text: str, max_tokens: int = 256, overlap: int = 20) -> list[str]:
    """
    Split ``text`` into overlapping windows of whitespace-separated words.

    Args:
        text: Input text; it is tokenised with ``str.split()``, so any run of
            whitespace separates tokens and is normalised to a single space.
        max_tokens: Maximum number of words per chunk (must be >= 1).
        overlap: Number of trailing words of one chunk repeated at the start
            of the next (must satisfy ``0 <= overlap < max_tokens``).

    Returns:
        ``[]`` for empty/whitespace-only text, a single chunk when the text
        has at most ``max_tokens`` words, otherwise the list of windows. The
        last window ends exactly at the final word.

    Raises:
        ValueError: if ``max_tokens`` or ``overlap`` is out of range. Without
            this check an ``overlap >= max_tokens`` (or a non-positive
            ``max_tokens``) would keep the window start from advancing and
            loop forever, and a negative ``overlap`` would silently skip words.
    """
    if max_tokens < 1:
        raise ValueError(f"max_tokens must be >= 1, got {max_tokens}")
    if not 0 <= overlap < max_tokens:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < max_tokens, got overlap={overlap}, max_tokens={max_tokens}"
        )

    tokens = text.split()
    n = len(tokens)

    # Short (or empty) input: at most one chunk.
    if n <= max_tokens:
        return [" ".join(tokens)] if tokens else []

    # Each window starts `step` words after the previous one; step >= 1 is
    # guaranteed by the validation above, so the loop always makes progress.
    step = max_tokens - overlap
    chunks = []
    for start in range(0, n, step):
        end = min(start + max_tokens, n)
        chunks.append(" ".join(tokens[start:end]))
        if end == n:  # final window emitted; don't produce a redundant tail
            break

    return chunks


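# Earlier version, kept for reference only (superseded by chunk_text above).
# It never terminates: after the final window, start is still moved back to
# end - overlap, which stays below len(tokens).
# def chunk_text(text: str, max_tokens: int = 512, overlap: int = 50) -> List[str]:
#     """
#     Very simple whitespace-word-based chunking (splits on whitespace, not characters).
#     For production, you might switch to tokenizer-based chunking, but this is fine to start.
#     """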
#     tokens = text.split()
#     chunks = []
#     start = 0
#     while start < len(tokens):
#         end = min(start + max_tokens, len(tokens))
#         chunk = " ".join(tokens[start:end]).strip()
#         if chunk:
#             chunks.append(chunk)
#         start = end - overlap  # small overlap to keep context
#         if start < 0:
#             start = 0
#     return chunks



In [13]:
def ensure_collection(client: MilvusClient, collection_name: str, dim: int):
    """
    Make sure ``collection_name`` exists, creating and indexing it if needed.

    If the collection is already present, a message is printed and nothing
    else happens. Otherwise a collection is created with an auto-generated
    INT64 primary key ``id``, VARCHAR fields ``offering_id`` (max 256) and
    ``text`` (max 2048), and a FLOAT_VECTOR field ``embedding`` of dimension
    ``dim``; a FLAT / COSINE index is then created on ``embedding``.
    """
    if client.has_collection(collection_name):
        print(f"Collection '{collection_name}' already exists.")
        return

    # Schema: auto-generated primary key, no dynamic fields.
    schema = client.create_schema(auto_id=True, enable_dynamic_field=False)

    # Fields in declaration order; each entry is passed as keyword arguments.
    field_specs = (
        {"field_name": "id", "datatype": DataType.INT64, "is_primary": True},
        {"field_name": "offering_id", "datatype": DataType.VARCHAR, "max_length": 256},
        {"field_name": "text", "datatype": DataType.VARCHAR, "max_length": 2048},
        {"field_name": "embedding", "datatype": DataType.FLOAT_VECTOR, "dim": dim},
    )
    for spec in field_specs:
        schema.add_field(**spec)

    # The collection is created first, without an index.
    client.create_collection(collection_name=collection_name, schema=schema)
    print(f"Created collection '{collection_name}'.")

    # FLAT needs no extra parameters; COSINE suits sentence-transformers vectors.
    vector_index = client.prepare_index_params()
    vector_index.add_index(
        field_name="embedding",
        index_type="FLAT",
        metric_type="COSINE",
    )
    client.create_index(collection_name=collection_name, index_params=vector_index)
    print(f"Created FLAT index on '{collection_name}.embedding'.")


In [14]:
def embed_chunks(model, chunks: List[str]) -> List[List[float]]:
    """
    Encode ``chunks`` with ``model.encode`` and return plain Python lists.

    The model is asked for a NumPy result with the progress bar enabled; the
    result's ``tolist()`` is returned.
    """
    encode_options = {"convert_to_numpy": True, "show_progress_bar": True}
    return model.encode(chunks, **encode_options).tolist()


def build_insert_payload(
    offering_id: str,
    chunks: List[str],
    embeddings: List[List[float]],
) -> List[Dict]:
    """
    Pair each chunk with its embedding and return one row dict per pair.

    Every row carries the keys ``offering_id``, ``text`` and ``embedding``;
    the primary key ``id`` is left out because the schema generates it.
    ``chunks`` and ``embeddings`` must have the same length.
    """
    assert len(chunks) == len(embeddings)
    return [
        {"offering_id": offering_id, "text": piece, "embedding": vector}
        for piece, vector in zip(chunks, embeddings)
    ]

In [15]:
# Temporarily commented out: superseded by the instrumented ingest_pdf_to_collection in the next cell.
# def ingest_pdf_to_collection(
#     client: MilvusClient,
#     collection_name: str,
#     pdf_path: str,
#     offering_id: str,
#     model,
# ):
#     """Load a PDF, chunk it, embed, and insert into the given collection."""
#     print(f"\nIngesting '{pdf_path}' into collection '{collection_name}' for offering_id='{offering_id}'")

#     full_text = load_pdf_text(pdf_path)
#     chunks = chunk_text(full_text, max_tokens=512, overlap=50)
#     print(f"  - Extracted {len(chunks)} chunks from PDF")

#     embeddings = embed_chunks(model, chunks)
#     print(f"  - Generated {len(embeddings)} embeddings")

#     rows = build_insert_payload(offering_id, chunks, embeddings)

#     res = client.insert(
#         collection_name=collection_name,
#         data=rows,
#     )

#     print(f"  - Inserted {len(res['insert_count']) if isinstance(res, dict) and 'insert_count' in res else len(rows)} rows")

In [16]:
import time

def ingest_pdf_to_collection(
    client: MilvusClient,
    collection_name: str,
    pdf_path: str,
    offering_id: str,
    model,
    max_tokens: int = 256,
    overlap: int = 20,
):
    """
    Load a PDF, chunk it, embed the chunks and insert them into a collection.

    Progress and timings are printed for each step.

    Args:
        client: Connected Milvus client used for the insert.
        collection_name: Target collection.
        pdf_path: Path of the PDF to read.
        offering_id: Value stored in the ``offering_id`` field of every row.
        model: Embedding model passed through to ``embed_chunks``.
        max_tokens: Words per chunk, forwarded to ``chunk_text``
            (default 256, the value previously hard-coded here).
        overlap: Words shared between consecutive chunks, forwarded to
            ``chunk_text`` (default 20, the value previously hard-coded here).

    Returns:
        None. If the PDF yields no chunks, a warning is printed and nothing
        is inserted.
    """
    print(f"\nIngesting '{pdf_path}' into collection '{collection_name}' for offering_id='{offering_id}'")
    # perf_counter is monotonic, so durations are not affected by clock changes.
    t0 = time.perf_counter()

    # 1. Read PDF
    print("  [1] Reading PDF...")
    full_text = load_pdf_text(pdf_path)
    print(f"      PDF length (chars): {len(full_text)}")

    # 2. Chunk
    print("  [2] Chunking text...")
    chunks = chunk_text(full_text, max_tokens=max_tokens, overlap=overlap)
    print(f"      Number of chunks: {len(chunks)}")
    if not chunks:
        print("      WARNING: no chunks produced, skipping insert.")
        return

    # 3. Embeddings
    print("  [3] Embedding chunks...")
    t_embed_start = time.perf_counter()
    embeddings = embed_chunks(model, chunks)
    print(f"      Generated {len(embeddings)} embeddings in {time.perf_counter() - t_embed_start:.2f} s")

    # 4. Build rows
    print("  [4] Building insert payload...")
    rows = build_insert_payload(offering_id, chunks, embeddings)
    print(f"      Rows to insert: {len(rows)}")

    # 5. Insert into Milvus
    print("  [5] Inserting into Milvus...")
    t_ins_start = time.perf_counter()
    res = client.insert(
        collection_name=collection_name,
        data=rows,
        # you can add timeout if needed (if supported in your pymilvus version):
        # timeout=60,
    )
    print(f"      Insert done in {time.perf_counter() - t_ins_start:.2f} s")
    print(f"  [✓] Ingestion completed in {time.perf_counter() - t0:.2f} s")

    # Optional summary: only attempted when the response is mapping-like,
    # instead of swallowing every exception.
    if hasattr(res, "keys"):
        print("      Insert response keys:", list(res.keys()))


In [17]:
# Start from a clean slate: remove both collections if they are present,
# so they can be recreated below.

client = connect_milvus()
print("Connected to IBM Milvus.")

collections_to_drop = [PUBLIC_COLLECTION, MANAGERS_COLLECTION]

for name in collections_to_drop:
    # Nothing to do for a collection that was never created.
    if not client.has_collection(name):
        print(f"Collection '{name}' does not exist, skipping.")
        continue

    print(f"Dropping collection '{name}'...")
    client.drop_collection(name)
    print(f"Collection '{name}' dropped.")

print("Done dropping collections.")


Connected to IBM Milvus.
Dropping collection 'offerings_public'...
Collection 'offerings_public' dropped.
Dropping collection 'offerings_managers_only'...
Collection 'offerings_managers_only' dropped.
Done dropping collections.


In [18]:
# ========= RUN PIPELINE IN NOTEBOOK =========

# Step 1: open the connection to IBM Milvus (SaaS)
client = connect_milvus()
print("Connected to IBM Milvus.")

# Step 2: load the embedding model
print("Loading embedding model...")
model = SentenceTransformer(EMBEDDING_MODEL_NAME)

# Step 3: make sure both collections exist (public first, then managers-only)
for collection_name in (PUBLIC_COLLECTION, MANAGERS_COLLECTION):
    ensure_collection(client, collection_name, EMBEDDING_DIM)

# Steps 4-5: ingest each PDF into its collection. Both use the same
# offering_id, which links the public and managers-only content
# (use a real ID if you have one).
for collection_name, pdf_path in (
    (PUBLIC_COLLECTION, PUBLIC_PDF_PATH),
    (MANAGERS_COLLECTION, MANAGERS_PDF_PATH),
):
    ingest_pdf_to_collection(
        client=client,
        collection_name=collection_name,
        pdf_path=pdf_path,
        offering_id="offering_xyz",
        model=model,
    )

print("\nDone.")


Connected to IBM Milvus.
Loading embedding model...
Created collection 'offerings_public'.
Created FLAT index on 'offerings_public.embedding'.
Created collection 'offerings_managers_only'.


Ignoring wrong pointing object 6 0 (offset 0)
Ignoring wrong pointing object 8 0 (offset 0)
Ignoring wrong pointing object 10 0 (offset 0)
Ignoring wrong pointing object 12 0 (offset 0)
Ignoring wrong pointing object 15 0 (offset 0)
Ignoring wrong pointing object 38 0 (offset 0)


Created FLAT index on 'offerings_managers_only.embedding'.

Ingesting './data/offerings_public.pdf' into collection 'offerings_public' for offering_id='offering_xyz'
  [1] Reading PDF...
      PDF length (chars): 3289
  [2] Chunking text...
      Number of chunks: 2
  [3] Embedding chunks...


Batches: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:01<00:00,  1.98s/it]


      Generated 2 embeddings in 3.21 s
  [4] Building insert payload...
      Rows to insert: 2
  [5] Inserting into Milvus...


Ignoring wrong pointing object 6 0 (offset 0)
Ignoring wrong pointing object 8 0 (offset 0)
Ignoring wrong pointing object 10 0 (offset 0)
Ignoring wrong pointing object 12 0 (offset 0)
Ignoring wrong pointing object 14 0 (offset 0)
Ignoring wrong pointing object 16 0 (offset 0)
Ignoring wrong pointing object 31 0 (offset 0)
Ignoring wrong pointing object 41 0 (offset 0)


      Insert done in 0.50 s
  [✓] Ingestion completed in 3.78 s
      Insert response keys: ['insert_count', 'ids', 'cost']

Ingesting './data/offerings_managers_only.pdf' into collection 'offerings_managers_only' for offering_id='offering_xyz'
  [1] Reading PDF...
      PDF length (chars): 4046
  [2] Chunking text...
      Number of chunks: 3
  [3] Embedding chunks...


Batches: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  2.61it/s]

      Generated 3 embeddings in 0.39 s
  [4] Building insert payload...
      Rows to insert: 3
  [5] Inserting into Milvus...
      Insert done in 0.17 s
  [✓] Ingestion completed in 0.62 s
      Insert response keys: ['insert_count', 'ids', 'cost']

Done.



