In [2]:
from kafka import KafkaAdminClient

try:
    client = KafkaAdminClient(bootstrap_servers="localhost:9092")  # Adjust the broker
    print("Kafka broker is reachable!")
except Exception as e:
    print(f"Error: {e}")


Kafka broker is reachable!


In [45]:
from kafka import KafkaProducer
import json

# Kafka configuration
KAFKA_BROKER = "localhost:9092"  # Update if using Docker
TOPIC = "test_logs"

# Initialize Kafka producer
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # Serialize to JSON
    acks="all",  # Ensure all replicas acknowledge the message
    retries=5,  # Retry sending up to 5 times
    request_timeout_ms=20000,  # Wait up to 20 seconds for broker response
)

# Send a message
try:
    future = producer.send(TOPIC, value={"type": "test", "message": "Hello from Python"})
    result = future.get(timeout=10)  # Block until the message is acknowledged
    print(f"Message sent to {result.topic}, partition {result.partition}, offset {result.offset}")
except Exception as e:
    print(f"Error sending message: {e}")
finally:
    producer.close()


Message sent to test_logs, partition 0, offset 0


In [3]:
from kafka import KafkaConsumer
import json

# Kafka broker address
KAFKA_BROKER = "localhost:9092"  # Update this if running inside Docker

# Kafka topic to consume messages from
TOPIC = "industry_logs"  # Replace with your desired topic name

# Initialize Kafka consumer
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=KAFKA_BROKER,
    auto_offset_reset='earliest',  # Start reading at the earliest message
    enable_auto_commit=True,       # Automatically commit offsets
    # group_id='my-group',           # Consumer group ID (use the same for all consumers in the group)
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))  # Deserialize JSON messages
)

print(f"Listening for messages on topic: {TOPIC}")

try:
    for message in consumer:
        print(f"Received message: {message.value}")
except KeyboardInterrupt:
    print("Consumer stopped.")
finally:
    consumer.close()  # Close the consumer connection


Listening for messages on topic: industry_logs
Consumer stopped.
