# Streaming RAG Demo with LangChain, Milvus, Quix and Mistral

This notebook demonstrates how to build a Retrieval-Augmented Generation (RAG) system that can:
1. Answer questions using a vector database (Milvus)
2. Stream new data from Kafka using Quix
3. Update its knowledge base in real time

We'll use:
- **LangChain**: For orchestrating the RAG pipeline
- **Milvus**: As our vector database
- **Ollama**: For running the LLM locally (`mistral-small` model)
- **Quix** (`quixstreams`): For producing and consuming Kafka messages

## Setup and Imports

First, let's import all necessary libraries:

In [1]:
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_milvus import Milvus
from langchain_ollama.llms import OllamaLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
import json
import logging
import sys
import time

# Configure logging. basicConfig already attaches a stdout handler to the root
# logger; adding a second StreamHandler would print every record twice.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

## Initialize RAG Components

Now we'll set up our RAG system with:
1. Embeddings model for converting text to vectors
2. LLM for generating responses
3. Vector store for storing and retrieving documents
4. RAG prompt template
5. The complete RAG chain
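To make the first and third components concrete, here is a minimal, self-contained sketch of what an embeddings model and a vector store do together: text is turned into vectors, and a query is answered by ranking stored vectors by similarity to the query vector. It is an illustration under stated assumptions, not how LangChain or Milvus work internally: the real `BAAI/bge-small-en-v1.5` model is swapped for a hypothetical bag-of-words `embed` function, Milvus is swapped for a hypothetical in-memory `ToyVectorStore`, and cosine similarity is chosen as the ranking metric.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy 'embedding' (assumption): a bag-of-words term-count vector,
    standing in for a real model such as BAAI/bge-small-en-v1.5."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse term-count vectors."""
    dot = sum(count * b[term] for term, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values()))
    norm_b = math.sqrt(sum(c * c for c in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


class ToyVectorStore:
    """Hypothetical in-memory store: keeps (text, vector) pairs and ranks them by similarity."""

    def __init__(self):
        self.entries = []

    def add_texts(self, texts):
        # Embed each text once, at insertion time
        for text in texts:
            self.entries.append((text, embed(text)))

    def search(self, query: str, k: int = 2):
        # Embed the query with the same function, then return the k most similar texts
        query_vector = embed(query)
        ranked = sorted(self.entries, key=lambda e: cosine(query_vector, e[1]), reverse=True)
        return [text for text, _ in ranked[:k]]


store = ToyVectorStore()
store.add_texts([
    "Climate change poses significant challenges to global ecosystems",
    "Quantum computing promises to transform cryptography",
    "The latest developments in artificial intelligence",
])
print(store.search("artificial intelligence developments", k=1))
# ['The latest developments in artificial intelligence']
```

A real embedding model also places different wordings of the same idea (for example "AI" and "artificial intelligence") close together, which this word-overlap toy cannot do.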

In [None]:
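def format_docs(docs):
    """Join the retrieved Documents into a single context string"""
    return "\n\n".join(doc.page_content for doc in docs)


def setup_rag_components():
    """Initialize the embeddings, LLM, vector store, prompt, and RAG chain"""
    # Local embedding model and Ollama-served LLM (run `ollama pull mistral-small` first)
    embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-en-v1.5")
    llm = OllamaLLM(model="mistral-small")
    
    # Create a fresh Milvus collection. from_texts() needs at least one document
    # to create the collection, so we seed it with a placeholder.
    vector_store = Milvus.from_texts(
        texts=["Initial empty document"],
        embedding=embeddings,
        connection_args={"host": "localhost", "port": "19530"},
        collection_name="streaming_rag_demo",
        drop_old=True  # Discard the collection left over from any previous run
    )
    
    # Create RAG prompt
    template = """Answer the question based only on the following context:

{context}

Question: {question}
Answer:"""
    
    prompt = ChatPromptTemplate.from_template(template)
    
    # Create RAG chain: the retrieved Documents are formatted into plain text
    # for {context}, while the question passes through unchanged
    rag_chain = (
        {"context": vector_store.as_retriever() | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )
    
    return vector_store, rag_chain

# Initialize our components
vector_store, rag_chain = setup_rag_components()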
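Stripped of LangChain, the chain amounts to three steps per question: fan the question out into the two prompt variables (retrieved context, and the question itself via `RunnablePassthrough`), fill the template, and call the model. The plain-Python sketch below shows that data flow. The `run_rag` helper, the stub retriever, and the echo "LLM" are hypothetical stand-ins so the sketch runs without Milvus or Ollama, and joining retrieved texts with blank lines is an assumed formatting choice.

```python
from typing import Callable, List

# The same template used in setup_rag_components
TEMPLATE = """Answer the question based only on the following context:

{context}

Question: {question}
Answer:"""


def format_docs(docs: List[str]) -> str:
    """Join retrieved texts into one context string (assumed: blank-line separated)."""
    return "\n\n".join(docs)


def run_rag(question: str,
            retrieve: Callable[[str], List[str]],
            llm: Callable[[str], str]) -> str:
    # Step 1: fill both prompt variables; the question itself passes through unchanged
    inputs = {"context": format_docs(retrieve(question)), "question": question}
    # Step 2: render the prompt
    prompt = TEMPLATE.format(**inputs)
    # Step 3: call the model; a plain string result is what StrOutputParser yields
    return llm(prompt)


def fake_retrieve(question: str) -> List[str]:
    """Hypothetical stub retriever that always returns the same document."""
    return ["Quantum computing promises to transform cryptography"]


def echo_llm(prompt: str) -> str:
    """Hypothetical stub 'LLM' that returns its prompt, so we can inspect it."""
    return prompt


print(run_rag("What is quantum computing for?", fake_retrieve, echo_llm))
```

Echoing the prompt is a handy way to see exactly what context the model receives, which is often the first thing to check when answers look off.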

## Test Initial RAG System

Let's test our RAG system before adding any real data. Since the vector store holds only the placeholder document, the model should respond that it doesn't have relevant information.

In [None]:
print("Initial Query (before streaming):")
question = "What do you know about artificial intelligence developments?"
print(f"Question: {question}")
print(f"Answer: {rag_chain.invoke(question)}\n")

## Set Up Kafka Producer

Now let's create a producer that will send some sample messages to Kafka. These messages will contain information that our RAG system can learn from.
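Each message is a small JSON object with a `chat_id`, used as the Kafka message key, and a `text` field that ends up in the vector store. Keying by `chat_id` means all messages for one chat land on the same partition, which preserves their relative order. The helpers below are hypothetical names; they simply mirror the encoding done in `send_sample_messages` and the decoding done in `process_stream`, both defined later in this notebook, so the round trip can be checked without a broker.

```python
import json
from typing import Tuple


def encode_message(message: dict) -> Tuple[bytes, bytes]:
    """Serialize as send_sample_messages does: key = chat_id bytes, value = JSON bytes."""
    key = message["chat_id"].encode()
    value = json.dumps(message).encode()
    return key, value


def decode_text(value: bytes) -> str:
    """Recover the text field as process_stream does."""
    return json.loads(value.decode())["text"]


key, value = encode_message(
    {"chat_id": "id3", "text": "Quantum computing promises to transform cryptography"}
)
print(key)                 # b'id3'
print(decode_text(value))  # Quantum computing promises to transform cryptography
```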

## Clean Up the Kafka Topic

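To start from a clean state, we'll delete the `messages` topic if it already exists before sending the sample messages. The broker recreates it on the first write, since the producer enables `allow.auto.create.topics` (the broker must also have automatic topic creation enabled).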

In [4]:
from confluent_kafka.admin import AdminClient  # confluent-kafka is a dependency of quixstreams
from quixstreams.kafka import Producer, Consumer

def cleanup_topic(topic="messages", broker_address="localhost:29092"):
    """Delete the topic if it exists so each run starts from a clean state.

    The topic is recreated automatically when the producer first writes to it.
    """
    print("\nCleaning up Kafka topic...")
    admin = AdminClient({"bootstrap.servers": broker_address})

    # Check cluster metadata rather than subscribing: subscribe() does not fail
    # for a missing topic, and polling here would move the "rag-consumer"
    # group's committed offsets
    existing_topics = admin.list_topics(timeout=10).topics
    if topic not in existing_topics:
        print(f"Topic '{topic}' doesn't exist yet, nothing to clean up")
        return

    # delete_topics() is asynchronous and returns one future per topic
    futures = admin.delete_topics([topic], operation_timeout=30)
    for name, future in futures.items():
        try:
            future.result()  # Block until the broker acknowledges the deletion
            print(f"Deleted topic '{name}'")
        except Exception as e:
            print(f"Failed to delete topic '{name}': {e}")

    time.sleep(2)  # Give the broker a moment to finish removing the topic

In [None]:
cleanup_topic()

In [None]:
from quixstreams.kafka import Producer, Consumer

def send_sample_messages():
    """Send sample messages to Kafka"""
    messages = [
        {"chat_id": "id1", "text": "The latest developments in artificial intelligence have revolutionized how we approach problem solving"},
        {"chat_id": "id2", "text": "Climate change poses significant challenges to global ecosystems and human societies"},
        {"chat_id": "id3", "text": "Quantum computing promises to transform cryptography and drug discovery"},
        {"chat_id": "id4", "text": "Sustainable energy solutions are crucial for addressing environmental concerns"}
    ]
    
    with Producer(
        broker_address="localhost:29092",
        extra_config={"allow.auto.create.topics": "true"},
    ) as producer:
        print("\nSending messages to Kafka...")
        for message in messages:
            print(f'Sending: "{message["text"]}"')
            producer.produce(
                topic="messages",
                key=message["chat_id"].encode(),
                value=json.dumps(message).encode(),
            )
            time.sleep(1)  # Pause between messages to simulate a live stream
        print("\nAll messages sent!")

# Send our sample messages
send_sample_messages()

## Process Streaming Data

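Now we'll consume the messages from Kafka and add them to our vector store. This simulates how a RAG system can keep its knowledge base current from streaming data: new documents become retrievable once they are added, without retraining the LLM.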

In [None]:
def process_stream(vector_store):
    """Consume all available messages from Kafka and add their text to the vector store"""
    consumer = Consumer(
        broker_address="localhost:29092",
        consumer_group="rag-consumer",
        auto_offset_reset="earliest"
    )
    consumer.subscribe(["messages"])
    
    print("\nProcessing messages from Kafka...")
    empty_polls = 0
    max_empty_polls = 5  # Stop after this many consecutive empty polls (up to ~1 s each)
    
    try:
        while empty_polls < max_empty_polls:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                empty_polls += 1
                continue
                
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
                
            try:
                value = json.loads(msg.value().decode())
                text = value["text"]
                print(f"\nReceived message: {text}")
                
                # Embed the text and insert it into the Milvus collection
                vector_store.add_texts([text])
                empty_polls = 0  # Reset counter when we get a message
                
            except Exception as e:
                print(f"Error processing message: {e}")
    finally:
        # Close even if the loop is interrupted, so the consumer leaves the group cleanly
        consumer.close()
    print("\nFinished processing messages from Kafka")

# Process the streaming data
process_stream(vector_store)
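The loop in `process_stream` only stops after `max_empty_polls` consecutive polls return nothing, so with `timeout=1.0` it waits roughly five seconds of silence before returning. The pure-Python sketch below isolates that idle-timeout logic. `drain` and `make_scripted_poll` are hypothetical helpers, and the scripted poll stands in for `consumer.poll()` returning either a message or `None`.

```python
from typing import Callable, Iterable, List, Optional


def make_scripted_poll(items: Iterable[Optional[str]]) -> Callable[[], Optional[str]]:
    """Hypothetical fake poll(): returns scripted items in order, then None forever."""
    iterator = iter(items)
    return lambda: next(iterator, None)


def drain(poll: Callable[[], Optional[str]], max_empty_polls: int = 5) -> List[str]:
    """Collect messages until max_empty_polls consecutive polls return None."""
    received = []
    empty_polls = 0
    while empty_polls < max_empty_polls:
        msg = poll()
        if msg is None:
            empty_polls += 1  # Nothing arrived during this poll
            continue
        received.append(msg)
        empty_polls = 0  # Any message resets the idle counter
    return received


# Two messages, a two-poll gap, one more message, then silence
poll = make_scripted_poll(["msg1", "msg2", None, None, "msg3"])
print(drain(poll, max_empty_polls=3))  # ['msg1', 'msg2', 'msg3']
```

With `max_empty_polls=2`, the same script would stop at the gap and miss `msg3`. That is the trade-off in choosing the threshold: a smaller value returns sooner but is more likely to cut off a slow producer.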

## Test Updated RAG System

Now let's test our RAG system again. This time it should have knowledge from the streamed messages.

In [None]:
# Query about AI
print("Query about AI developments:")
question = "What do you know about artificial intelligence developments?"
print(f"Question: {question}")
print(f"Answer: {rag_chain.invoke(question)}\n")

# Query about climate change
print("Query about climate change:")
question = "What information do you have about climate change?"
print(f"Question: {question}")
print(f"Answer: {rag_chain.invoke(question)}\n")