# KafkaLoader

In [None]:
from langchain_community.document_loaders import KafkaDocumentLoader

## Producing Test Messages with a Kafka Producer
In the LangChain integration with Kafka, a Kafka producer is used to send messages to a Kafka topic. The cell below uses `confluent_kafka.Producer` to publish a few JSON-encoded test records to a topic.

In [None]:
from confluent_kafka import Producer
import json

# Kafka broker configuration: a single local broker reached over PLAINTEXT.
# To target a multi-broker cluster instead, list every broker in
# 'bootstrap.servers' as a comma-separated string, for example
# 'broker:29092,broker2:29092,broker3:29092'.
broker_config = {
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'my-producer',
    'security.protocol': 'PLAINTEXT',
}

# Build the producer from the configuration above.
producer = Producer(broker_config)

def delivery_report(err, msg):
    """Print the outcome of a single message delivery.

    Used as the ``callback`` argument of ``producer.produce`` in this cell.

    Args:
        err: ``None`` when delivery succeeded; otherwise the delivery error,
            which is included in the printed failure message.
        msg: The message the report refers to; its ``topic()`` and
            ``partition()`` are printed on success.
    """
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
        return
    print(f'Message delivery failed: {err}')

# Test data: each record is serialized to JSON before being produced.
data = [
    {'name': 'John', 'age': 30},
    {'name': 'Alice', 'age': 25},
    {'name': 'Bob', 'age': 35},
]

# Send messages to Kafka
for item in data:
    producer.produce(
        'Langchain_test_topic',
        value=json.dumps(item).encode('utf-8'),
        callback=delivery_report,
    )
    # Serve pending delivery callbacks without blocking. Without regular
    # poll() calls, callbacks only fire during flush() and the producer's
    # local queue can fill up on larger batches.
    producer.poll(0)

# Flush the producer: block until all outstanding messages are delivered
# (or have failed), which also fires any remaining delivery callbacks.
producer.flush()

## Example: Using the Apache Kafka Document Loader with LangChain

In [None]:
# Import necessary modules
from typing import Any, Callable, Iterator, Mapping, Optional
from confluent_kafka import Consumer, KafkaException, KafkaError
from langchain_core.documents import Document
from langchain_community.document_loaders import KafkaDocumentLoader

# Define a record handler function
def record_handler(record: Any, id: Optional[str]) -> Document:
    """Convert one consumed Kafka record into a LangChain ``Document``.

    Args:
        record: The record object handed over by ``KafkaDocumentLoader``.
            It must expose ``value()``, ``offset()`` and ``timestamp()``;
            presumably a ``confluent_kafka.Message`` (confirm against the
            loader).
        id: Identifier supplied by the loader; not used by this handler.

    Returns:
        A ``Document`` whose ``page_content`` is the UTF-8-decoded record
        value (an empty string when the value is ``None``) and whose
        metadata holds the record's offset and timestamp.
    """
    raw_value = record.value()
    # A null value (e.g. a tombstone record) has no bytes to decode; calling
    # ``.decode`` on ``None`` would raise AttributeError.
    content = raw_value.decode('utf-8') if raw_value is not None else ''
    return Document(
        page_content=content,
        # NOTE(review): if ``record`` is a confluent_kafka.Message,
        # ``timestamp()`` returns a ``(timestamp_type, timestamp)`` tuple.
        metadata={'offset': record.offset(), 'timestamp': record.timestamp()},
    )

# Initialize KafkaDocumentLoader. The topic matches the one the producer cell
# above writes to ('Langchain_test_topic'); with a different topic the test
# messages produced there would not be read.
kafka_loader = KafkaDocumentLoader(
    bootstrap_servers='localhost:9092',
    topic='Langchain_test_topic',
    group_id='test_group',
    auto_offset_reset='earliest',
    record_handler=record_handler,
)

try:
    # Load documents
    documents = kafka_loader.load()
finally:
    # Close the loader even if load() raises.
    kafka_loader.close()

# Print the first document, guarding against an empty result so the cell
# does not fail with IndexError.
if documents:
    print(documents[0])
else:
    print('No documents were loaded from the topic.')