# Loading Data from Kafka into LlamaIndex

This notebook demonstrates how to use the `KafkaReader` class to load data from a Kafka topic into LlamaIndex, index it (optionally in a Chroma vector store), and run queries against the indexed data.

## Prerequisites

Before running this notebook, make sure you have the following:

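- A running Kafka cluster reachable at the configured bootstrap servers (`localhost:9092` in the examples below), with the target topic created and already populated with messages.
- The `llama-index` and `confluent-kafka` libraries installed.
- For the vector-store examples: `chromadb`, `llama-index-vector-stores-chroma`, and `llama-index-embeddings-huggingface`. The final example also needs `llama-index-llms-ollama` and a running Ollama server with the `mistral` model available.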

## Imports

First, let's import the necessary classes:

```python
# from llama_index.core import VectorStoreIndex  # uncomment to build an index
from llama_index.readers.kafka import KafkaReader
```

### Simple Example

In [None]:
from llama_index.readers.kafka import KafkaReader

# Configure the KafkaReader
bootstrap_servers = ["localhost:9092"]
topics = ["llama_index_test_topic"]
group_id = "llama_index_test_group"
security_protocol = "PLAINTEXT"

# Create an instance of KafkaReader
reader = KafkaReader(
    bootstrap_servers=bootstrap_servers,
    topics=topics,
    group_id=group_id,
    security_protocol=security_protocol,
)

# Load data from Kafka. The try/finally guarantees reader.close() runs even
# if loading (or the optional indexing/querying below) raises, so the reader
# is never left open.
try:
    documents = reader.load()

    # Create an index using the loaded documents
    # index = VectorStoreIndex.from_documents(documents)

    # Query the index
    # query_engine = index.as_query_engine()
    # response = query_engine.query("What is the main topic of the documents?")

    # print(response)
finally:
    # Close the Kafka reader
    reader.close()

In [None]:
import os
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.readers.kafka import KafkaReader
from IPython.display import Markdown, display
import chromadb

# Kafka configuration
bootstrap_servers = ["localhost:9092"]
topics = ["my_topic"]
group_id = "my_group"

# Load data from Kafka
def load_data_from_kafka():
    """Read the configured Kafka topics and return the loaded documents.

    Uses the module-level ``bootstrap_servers``, ``topics`` and ``group_id``.
    The reader is always closed before this function returns (or if loading
    raises), so repeated calls do not leave readers open.

    Returns:
        list: The documents produced by ``KafkaReader.load()``, materialized
        into a list so they remain usable after the reader is closed.
    """
    reader = KafkaReader(
        bootstrap_servers=bootstrap_servers,
        topics=topics,
        group_id=group_id,
    )
    try:
        # Materialize before closing in case load() produces items lazily.
        return list(reader.load())
    finally:
        reader.close()

# Create Chroma client and collection.
# get_or_create_collection (instead of create_collection) lets this cell be
# re-run in the same kernel without an "already exists" error. Note that a
# re-run reuses the collection, so previously indexed records remain in it.
chroma_client = chromadb.Client()
chroma_collection = chroma_client.get_or_create_collection("my_collection")

# Define embedding function
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")

# Build index
def build_index(data):
    """Embed *data* into the module-level Chroma collection and return the index.

    Args:
        data: Documents to index, e.g. the result of ``load_data_from_kafka``.

    Returns:
        VectorStoreIndex: An index whose vectors are stored in
        ``chroma_collection`` and computed with the module-level
        ``embed_model``.
    """
    store_ctx = StorageContext.from_defaults(
        vector_store=ChromaVectorStore(chroma_collection=chroma_collection)
    )
    return VectorStoreIndex.from_documents(
        data,
        storage_context=store_ctx,
        embed_model=embed_model,
    )

# Query the index
def query_index(index, question="What is the main topic of the documents?"):
    """Ask *question* of *index* and render the answer as Markdown.

    Args:
        index: A LlamaIndex index exposing ``as_query_engine()``.
        question: Natural-language query. Defaults to the original demo
            question, so existing ``query_index(index)`` calls are unchanged.

    Returns:
        The query engine's response object, so callers can inspect it
        further instead of only seeing the rendered text.
    """
    query_engine = index.as_query_engine()
    response = query_engine.query(question)
    display(Markdown(f"Query Response: {response}"))
    return response

# Update metadata
def update_metadata(chroma_collection):
    """Tag one stored record with ``source="Kafka"`` and print its new metadata.

    The first record returned by ``chroma_collection.get(limit=1)`` has its
    metadata merged with ``{"source": "Kafka"}``: existing keys are kept and
    any existing ``source`` is overwritten. The record is then re-read by its
    id, so the printed metadata is guaranteed to belong to the record that
    was changed.

    Args:
        chroma_collection: A Chroma collection (anything exposing ``get`` and
            ``update`` with Chroma's keyword signatures).
    """
    first = chroma_collection.get(limit=1)
    if not first["ids"]:
        # Nothing was indexed (e.g. the topic was empty); indexing [0] would
        # raise IndexError.
        print("Collection is empty; no metadata to update.")
        return

    doc_id = first["ids"][0]
    # A record stored without metadata may come back as None.
    new_metadata = {**(first["metadatas"][0] or {}), "source": "Kafka"}
    chroma_collection.update(ids=[doc_id], metadatas=[new_metadata])

    updated_doc = chroma_collection.get(ids=[doc_id])
    print("Updated Metadata:", updated_doc["metadatas"][0])

# Delete document
def delete_document(chroma_collection):
    """Delete one record from *chroma_collection*, printing the count before and after.

    The record removed is the first one returned by ``get(limit=1)``. If the
    collection is empty, a message is printed and nothing is deleted.

    Args:
        chroma_collection: A Chroma collection (anything exposing ``get``,
            ``count`` and ``delete`` with Chroma's keyword signatures).
    """
    first = chroma_collection.get(limit=1)
    print("Count before deletion:", chroma_collection.count())
    if not first["ids"]:
        # Indexing [0] on an empty result would raise IndexError.
        print("Collection is empty; nothing to delete.")
        return
    chroma_collection.delete(ids=[first["ids"][0]])
    print("Count after deletion:", chroma_collection.count())

def main():
    """Run the demo end to end.

    Loads documents from Kafka, indexes them in Chroma, queries the index,
    then updates and deletes a stored record. Returns early with a message
    when Kafka yields no documents, since the later steps would have nothing
    to operate on.
    """
    data = load_data_from_kafka()
    if not data:
        print("No documents were loaded from Kafka; nothing to index.")
        return

    index = build_index(data)
    query_index(index)
    update_metadata(chroma_collection)
    delete_document(chroma_collection)


if __name__ == "__main__":
    main()

In [None]:
import os
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import StorageContext, VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.core.embeddings import resolve_embed_model
from llama_index.llms.ollama import Ollama
from llama_index.readers.kafka import KafkaReader
import chromadb

# Kafka configuration
bootstrap_servers = ["localhost:9092"]
topics = ["my_topic"]
group_id = "my_group"

# Load data from Kafka
def load_data_from_kafka():
    """Read the configured Kafka topics and return the loaded documents.

    Uses the module-level ``bootstrap_servers``, ``topics`` and ``group_id``.
    The reader is always closed before this function returns (or if loading
    raises), so repeated calls do not leave readers open.

    Returns:
        list: The documents produced by ``KafkaReader.load()``, materialized
        into a list so they remain usable after the reader is closed.
    """
    reader = KafkaReader(
        bootstrap_servers=bootstrap_servers,
        topics=topics,
        group_id=group_id,
    )
    try:
        # Materialize before closing in case load() produces items lazily.
        return list(reader.load())
    finally:
        reader.close()

# Create Chroma client and collection.
# A separate collection name is used here: in-process chromadb.Client()
# instances may share state, and the previous example already creates
# "my_collection" with bge-base (768-dim) embeddings. Chroma would reject the
# bge-small (384-dim) vectors configured below if they were added to it.
# get_or_create_collection also lets this cell be re-run without an
# "already exists" error.
chroma_client = chromadb.Client()
chroma_collection = chroma_client.get_or_create_collection("my_collection_local")

# Configure local embedding model and LLM
Settings.embed_model = resolve_embed_model("local:BAAI/bge-small-en-v1.5")
Settings.llm = Ollama(model="mistral", request_timeout=30.0)

# Build index
def build_index(data):
    """Embed *data* into the module-level Chroma collection and return the index.

    No embed model is passed explicitly; the index uses the global
    ``Settings.embed_model`` configured above.

    Args:
        data: Documents to index, e.g. the result of ``load_data_from_kafka``.

    Returns:
        VectorStoreIndex: An index whose vectors are stored in
        ``chroma_collection``.
    """
    store_ctx = StorageContext.from_defaults(
        vector_store=ChromaVectorStore(chroma_collection=chroma_collection)
    )
    return VectorStoreIndex.from_documents(data, storage_context=store_ctx)

# Query the index
def query_index(index, question="What is the main topic of the documents?"):
    """Ask *question* of *index*, print the answer, and return it.

    Args:
        index: A LlamaIndex index exposing ``as_query_engine()``.
        question: Natural-language query. Defaults to the original demo
            question, so existing ``query_index(index)`` calls are unchanged.

    Returns:
        The query engine's response object, so callers can inspect it
        further instead of only seeing the printed text.
    """
    query_engine = index.as_query_engine()
    response = query_engine.query(question)
    print("Query Response:", response)
    return response

# Update metadata
def update_metadata(chroma_collection):
    """Tag one stored record with ``source="Kafka"`` and print its new metadata.

    The first record returned by ``chroma_collection.get(limit=1)`` has its
    metadata merged with ``{"source": "Kafka"}``: existing keys are kept and
    any existing ``source`` is overwritten. The record is then re-read by its
    id, so the printed metadata is guaranteed to belong to the record that
    was changed.

    Args:
        chroma_collection: A Chroma collection (anything exposing ``get`` and
            ``update`` with Chroma's keyword signatures).
    """
    first = chroma_collection.get(limit=1)
    if not first["ids"]:
        # Nothing was indexed (e.g. the topic was empty); indexing [0] would
        # raise IndexError.
        print("Collection is empty; no metadata to update.")
        return

    doc_id = first["ids"][0]
    # A record stored without metadata may come back as None.
    new_metadata = {**(first["metadatas"][0] or {}), "source": "Kafka"}
    chroma_collection.update(ids=[doc_id], metadatas=[new_metadata])

    updated_doc = chroma_collection.get(ids=[doc_id])
    print("Updated Metadata:", updated_doc["metadatas"][0])

# Delete document
def delete_document(chroma_collection):
    """Delete one record from *chroma_collection*, printing the count before and after.

    The record removed is the first one returned by ``get(limit=1)``. If the
    collection is empty, a message is printed and nothing is deleted.

    Args:
        chroma_collection: A Chroma collection (anything exposing ``get``,
            ``count`` and ``delete`` with Chroma's keyword signatures).
    """
    first = chroma_collection.get(limit=1)
    print("Count before deletion:", chroma_collection.count())
    if not first["ids"]:
        # Indexing [0] on an empty result would raise IndexError.
        print("Collection is empty; nothing to delete.")
        return
    chroma_collection.delete(ids=[first["ids"][0]])
    print("Count after deletion:", chroma_collection.count())

def main():
    """Run the fully local demo end to end.

    Loads documents from Kafka, indexes them in Chroma, queries the index
    with the configured Ollama LLM, then updates and deletes a stored record.
    Returns early with a message when Kafka yields no documents, since the
    later steps would have nothing to operate on.
    """
    data = load_data_from_kafka()
    if not data:
        print("No documents were loaded from Kafka; nothing to index.")
        return

    index = build_index(data)
    query_index(index)
    update_metadata(chroma_collection)
    delete_document(chroma_collection)


if __name__ == "__main__":
    main()