In [1]:
from langchain_core.documents import Document

# Sample corpus: (blurb text, genre) pairs for ten fictional novels.
BOOK_BLURBS = [
    (
        "In 'The Whispering Walls' by Ava Moreno, a young journalist named Sophia uncovers a decades-old conspiracy hidden within the crumbling walls of an ancient mansion, where the whispers of the past threaten to destroy her own sanity.",
        "Mystery",
    ),
    (
        "In 'The Last Refuge' by Ethan Blackwood, a group of survivors must band together to escape a post-apocalyptic wasteland, where the last remnants of humanity cling to life in a desperate bid for survival.",
        "Post-Apocalyptic",
    ),
    (
        "In 'The Memory Thief' by Lila Rose, a charismatic thief with the ability to steal and manipulate memories is hired by a mysterious client to pull off a daring heist, but soon finds themselves trapped in a web of deceit and betrayal.",
        "Heist/Thriller",
    ),
    (
        "In 'The City of Echoes' by Julian Saint Clair, a brilliant detective must navigate a labyrinthine metropolis where time is currency, and the rich can live forever, but at a terrible cost to the poor.",
        "Science Fiction",
    ),
    (
        "In 'The Starlight Serenade' by Ruby Flynn, a shy astronomer discovers a mysterious melody emanating from a distant star, which leads her on a journey to uncover the secrets of the universe and her own heart.",
        "Science Fiction/Romance",
    ),
    (
        "In 'The Shadow Weaver' by Piper Redding, a young orphan discovers she has the ability to weave powerful illusions, but soon finds herself at the center of a deadly game of cat and mouse between rival factions vying for control of the mystical arts.",
        "Fantasy",
    ),
    (
        "In 'The Lost Expedition' by Caspian Grey, a team of explorers ventures into the heart of the Amazon rainforest in search of a lost city, but soon finds themselves hunted by a ruthless treasure hunter and the treacherous jungle itself.",
        "Adventure",
    ),
    (
        "In 'The Clockwork Kingdom' by Augusta Wynter, a brilliant inventor discovers a hidden world of clockwork machines and ancient magic, where a rebellion is brewing against the tyrannical ruler of the land.",
        "Steampunk/Fantasy",
    ),
    (
        "In 'The Phantom Pilgrim' by Rowan Welles, a charismatic smuggler is hired by a mysterious organization to transport a valuable artifact across a war-torn continent, but soon finds themselves pursued by deadly assassins and rival factions.",
        "Adventure/Thriller",
    ),
    (
        "In 'The Dreamwalker's Journey' by Lyra Snow, a young dreamwalker discovers she has the ability to enter people's dreams, but soon finds herself trapped in a surreal world of nightmares and illusions, where the boundaries between reality and fantasy blur.",
        "Fantasy",
    ),
]

# One Document per blurb; the genre is stored under the "category" metadata key.
docs = [
    Document(page_content=blurb, metadata={"category": genre})
    for blurb, genre in BOOK_BLURBS
]


In [8]:
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
# Instantiate the BGE-M3 embedding function with its default arguments.
# This single instance is later handed to both DenseEmbeddings and
# SparseEmbeddings, which read its "dense" and "sparse" outputs respectively.
model = BGEM3EmbeddingFunction()

Fetching 30 files: 100%|██████████| 30/30 [00:00<00:00, 94254.02it/s]


In [15]:
from langchain_milvus import Milvus
from langchain_milvus.utils.sparse import BaseSparseEmbedding
from langchain_core.embeddings import Embeddings
from typing import List, Dict, Any
import scipy.sparse as sp

class DenseEmbeddings(Embeddings):
    """LangChain ``Embeddings`` adapter over the ``"dense"`` output of an encoder.

    The wrapped ``model`` must provide ``encode_documents(texts)`` and
    ``encode_queries(texts)``, each returning a mapping with a ``"dense"`` entry
    that holds one vector per input text.

    Vectors are converted to plain Python ``float`` lists so the returned values
    match the annotated return types whatever array-like type the encoder yields
    (presumably NumPy arrays -- not visible from this notebook).
    """

    def __init__(self, model):
        # Encoder shared with the sparse adapter; stored as-is.
        self.ef = model

    @staticmethod
    def _as_float_list(vector) -> List[float]:
        """Return ``vector`` (any iterable of numbers) as a list of Python floats."""
        return [float(value) for value in vector]

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed each text in ``texts``; returns one dense vector per text."""
        dense_vectors = self.ef.encode_documents(texts)["dense"]
        return [self._as_float_list(vector) for vector in dense_vectors]

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string and return its dense vector."""
        # The encoder works on batches, so wrap the query and unwrap the result.
        return self._as_float_list(self.ef.encode_queries([text])["dense"][0])

class SparseEmbeddings(BaseSparseEmbedding):
    """Sparse-embedding adapter over the ``"sparse"`` output of an encoder.

    The wrapped ``model`` must provide ``encode_documents(texts)`` and
    ``encode_queries(texts)``, each returning a mapping with a ``"sparse"`` entry.
    That entry is expected to be a SciPy sparse matrix/array with one row per
    input text (or a 1-D sparse array for a single text). Each row is turned into
    a ``{column_index: weight}`` dictionary.
    """

    def __init__(self, model):
        # Encoder shared with the dense adapter; stored as-is.
        self.ef = model

    def embed_query(self, text: str) -> Dict[int, float]:
        """Embed a single query string as a ``{index: weight}`` dictionary."""
        # The encoder works on batches, so the query is wrapped in a list.
        sparse = self.ef.encode_queries([text])["sparse"]
        return self._sparse_to_dict(sparse)

    def embed_documents(self, texts: List[str]) -> List[Dict[int, float]]:
        """Embed every text in ``texts``; returns one dictionary per text."""
        sparse_arrays = self.ef.encode_documents(texts)["sparse"]

        # A 1-D result represents a single document.
        if sparse_arrays.ndim == 1:
            return [self._sparse_to_dict(sparse_arrays)]

        # 2-D result: one row per document. Normalise to csr_matrix so that
        # per-row extraction is cheap and uniformly supported.
        if not sp.isspmatrix_csr(sparse_arrays):
            sparse_arrays = sp.csr_matrix(sparse_arrays)

        return [
            self._sparse_to_dict(self._get_row(sparse_arrays, i))
            for i in range(sparse_arrays.shape[0])
        ]

    def _get_row(self, sparse_matrix, row_idx):
        """Return row ``row_idx`` of ``sparse_matrix`` as a 1 x N sparse object."""
        if hasattr(sparse_matrix, "getrow"):
            return sparse_matrix.getrow(row_idx)
        # Objects without ``getrow``: a one-row slice keeps the result 2-D.
        return sparse_matrix[row_idx:row_idx + 1]

    def _sparse_to_dict(self, sparse_array: Any) -> Dict[int, float]:
        """Convert one sparse vector into a ``{column_index: weight}`` dictionary.

        Accepts a 1-D sparse array, a single-row 2-D sparse matrix/array, or a
        dense array-like. The input is meant to hold ONE vector: if a 2-D input
        has several rows, entries sharing a column are merged into one key.

        Explicitly stored zeros are dropped, and duplicate entries for the same
        column are summed (the usual meaning of duplicates in COO storage).
        """
        if hasattr(sparse_array, "tocoo"):
            coo = sparse_array.tocoo()
        else:
            # Dense fallback: coo_matrix turns 1-D input into a single row.
            coo = sp.coo_matrix(sparse_array)

        # Read the coordinate arrays directly instead of indexing the COO
        # object: COO containers are not subscriptable in every SciPy release,
        # while ``col``/``data`` are available for both 1-D and 2-D COO.
        weights: Dict[int, float] = {}
        for col, val in zip(coo.col, coo.data):
            value = float(val)
            if value != 0.0:
                key = int(col)
                weights[key] = weights.get(key, 0.0) + value
        return weights


from uuid import uuid4

# Two embedding adapters built on the same encoder, plus the names of the
# vector fields they populate.
# NOTE(review): presumably the two lists are matched by position
# (dense -> "dense", sparse -> "sparse") -- confirm against langchain_milvus docs.
embedding_functions = [DenseEmbeddings(model), SparseEmbeddings(model)]
vector_fields = ["dense", "sparse"]

vectorstore = Milvus(
    collection_name="docs",
    embedding_function=embedding_functions,
    vector_field=vector_fields,
    consistency_level="Bounded",
    drop_old=True,
)

# One random UUID string per document, used as its primary id.
ids = [str(uuid4()) for _ in docs]

# Last expression of the cell, so its return value is displayed as the output.
vectorstore.add_documents(documents=docs, ids=ids)

Chunks:   0%|          | 0/3 [00:00<?, ?it/s]

Chunks: 100%|██████████| 3/3 [00:00<00:00, 31.70it/s]
Chunks: 100%|██████████| 3/3 [00:00<00:00, 31.43it/s]


['fb403565-182e-499d-8364-6030dffe1509',
 '801a0472-71b7-44c3-a89b-ac32ccf2e75d',
 '2eb5da22-b40b-4be0-9879-75d2e7609e3d',
 '412ef4f1-f3af-4299-b4fc-eb5c3332c22f',
 'a3b68209-3fcf-4d7f-8b9e-7e54293fa04b',
 'fd5421cc-3ba2-476e-84a8-674957e78173',
 'a97c9946-7e2c-4cb0-a487-b368df9e5cfb',
 '6bd4f6d7-b250-4f44-ac8d-f676d68dee6a',
 '98a8275c-be37-4b6f-aab5-38ea9cc17107',
 'a1f62b8b-e13f-4a1d-8c40-985e731735ee']

In [14]:
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain_core.prompts import PromptTemplate
from langchain.chat_models import init_chat_model

# Chat model used as the final step of the chain, served from a local endpoint.
# The first argument presumably follows a "<provider>:<model>" convention
# (provider "ollama", model "gpt-oss:20b-cloud") -- confirm in the LangChain docs.
# NOTE(review): Ollama's stock port is 11434; confirm that 11234 is intentional
# (e.g. a proxy or custom bind address) and not a typo.
llm = init_chat_model(
  "ollama:gpt-oss:20b-cloud",
  base_url = "http://localhost:11234"
)

PROMPT_TEMPLATE = """
Human: You are an AI assistant, and provides answers to questions by using fact based and statistical information when possible.
Use the following pieces of information to provide a concise answer to the question enclosed in <question> tags.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
<context>
{context}
</context>

<question>
{question}
</question>

The response should be specific and use statistics or numbers when possible.

Assistant:"""

prompt = PromptTemplate(
    template=PROMPT_TEMPLATE, input_variables=["context", "question"]
)

# First-stage retrieval settings. k=10 equals the size of the sample corpus,
# so every document is handed to the cross-encoder for rescoring.
retriever_search_kwargs = {"k": 10, "ranker_type": "rrf"}
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs=retriever_search_kwargs,
)

# Second-stage scorer: a cross-encoder that rates (query, passage) pairs.
cross_encoder_model_kwargs = {"device": "cpu"}  # device to run on, e.g. 'cpu' or 'cuda:0'
ce_rf = HuggingFaceCrossEncoder(
    model_name="cross-encoder/ms-marco-MiniLM-L-6-v2",
    model_kwargs=cross_encoder_model_kwargs,
)

def rerank(inputs: dict, top_k: int = 5):
    """Re-score retrieved documents with the cross-encoder and keep the best ones.

    Args:
        inputs: Mapping with ``"question"`` (the query string) and ``"context"``
            (the retrieved documents; each must expose ``page_content`` and a
            mutable ``metadata`` dict).
        top_k: Maximum number of documents kept after re-scoring.

    Returns:
        A dict with ``"context"`` (page contents of the kept documents joined by
        blank lines, best first) and ``"question"`` (the query, unchanged).

    Side effects:
        Stores each document's score under ``metadata["ce_score"]``.
    """
    query = inputs["question"]
    candidates = inputs["context"]

    # Nothing retrieved: skip the cross-encoder, which is not guaranteed to
    # accept an empty batch, and return an empty context instead.
    if not candidates:
        return {"context": "", "question": query}

    pairs = [(query, doc.page_content) for doc in candidates]
    scores = ce_rf.score(pairs)

    # Keep the score on the document so it can be inspected downstream.
    for doc, score in zip(candidates, scores):
        doc.metadata["ce_score"] = float(score)

    best = sorted(
        candidates,
        key=lambda doc: doc.metadata["ce_score"],
        reverse=True,
    )[:top_k]

    return {
        "context": "\n\n".join(doc.page_content for doc in best),
        "question": query,
    }

# Wrap the plain function so it can be composed with the `|` operator.
reranker = RunnableLambda(rerank)

# Fan the incoming query string out to two branches: the retriever produces
# "context", while the passthrough forwards the query itself as "question".
retrieve_and_forward = {
    "context": retriever,
    "question": RunnablePassthrough(),
}

# retrieve -> rerank with the cross-encoder -> fill the prompt -> ask the LLM
chain = retrieve_and_forward | reranker | prompt | llm

query = """
Find fantasy novel involving magic abilities, illusions, and a young protagonist caught between rival factions.
"""