1. Setting up a Kafka Producer:
   a) Write a Python program to create a Kafka producer.
   b) Configure the producer to connect to a Kafka cluster.
   c) Implement logic to send messages to a Kafka topic.


In [None]:
from confluent_kafka import Producer

# Topic the producer publishes to
topic = 'your_topic_name'  # Replace with the desired Kafka topic

# Broker address(es) used for the initial connection to the cluster
bootstrap_servers = 'localhost:9092'  # Replace with the appropriate Kafka broker addresses

# Producer client bound to the cluster configured above
producer = Producer({
    'bootstrap.servers': bootstrap_servers,
})

def delivery_report(err, msg):
    """Print the outcome of one produce request.

    Passed as the ``callback`` argument of ``producer.produce``.

    Args:
        err: The delivery error, or ``None`` when delivery succeeded.
        msg: The message the report is about; its ``topic()`` and
            ``partition()`` are read only on success.
    """
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
        return
    print(f'Message delivery failed: {err}')

# Send messages to the Kafka topic
messages = [
    'Message 1',
    'Message 2',
    'Message 3',
    # Add more messages as needed
]

for message in messages:
    # produce() only enqueues the message; delivery happens in the background
    producer.produce(topic, value=message, callback=delivery_report)
    # Serve delivery callbacks for already-sent messages without blocking
    producer.poll(0)

# Block until every queued message has been delivered (or has failed) and
# its delivery callback has run
producer.flush()

# No producer.close() here: flush() is the shutdown step for a producer, and
# calling close() raises AttributeError on confluent-kafka releases whose
# Producer does not define it.



2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.
   b) Configure the consumer to connect to a Kafka cluster.
   c) Implement logic to consume messages from a Kafka topic.


In [None]:
from confluent_kafka import Consumer, KafkaException

# The error-code constants (e.g. _PARTITION_EOF) live on KafkaError, not on
# the KafkaException class imported above.
from confluent_kafka import KafkaError

# Configure the Kafka cluster
bootstrap_servers = 'localhost:9092'  # Replace with the appropriate Kafka broker addresses
group_id = 'your_consumer_group_id'  # Replace with a unique consumer group ID
topic = 'your_topic_name'  # Replace with the desired Kafka topic

# Create a Kafka consumer
consumer = Consumer({
    'bootstrap.servers': bootstrap_servers,
    'group.id': group_id
})

# Subscribe to the Kafka topic
consumer.subscribe([topic])

# Consume messages from the Kafka topic until interrupted (Ctrl-C)
try:
    while True:
        message = consumer.poll(timeout=1.0)

        # poll() returns None when nothing arrived within the timeout
        if message is None:
            continue

        error = message.error()
        if error:
            if error.code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print(f'Reached end of partition {message.topic()} [{message.partition()}]')
            else:
                # Error occurred
                print(f'Error occurred: {error.str()}')
            continue

        # Process the consumed message; value() is None for messages that
        # carry no payload, so only decode real bytes
        payload = message.value()
        text = payload.decode() if payload is not None else None
        print(f'Received message: {text}')

except KeyboardInterrupt:
    pass
finally:
    # Always close the consumer, even if an unexpected exception ends the loop
    consumer.close()



3. Creating and Managing Kafka Topics:
   a) Write a Python program to create a new Kafka topic.
   b) Implement functionality to list existing topics.
   c) Develop logic to delete an existing Kafka topic.


In [None]:
from confluent_kafka.admin import AdminClient, NewTopic, KafkaException

# Broker address(es) used for the initial connection to the cluster
bootstrap_servers = 'localhost:9092'  # Replace with the appropriate Kafka broker addresses

# Admin client used by the topic-management helpers below
admin_client = AdminClient({
    'bootstrap.servers': bootstrap_servers,
})

def create_topic(topic_name, num_partitions, replication_factor):
    """Create a Kafka topic and print whether the operation succeeded.

    ``AdminClient.create_topics`` is asynchronous: it returns a dict mapping
    each topic name to a future. The outcome is only known once the future's
    ``result()`` is called, so this function waits on it before reporting.

    Args:
        topic_name: Name of the topic to create.
        num_partitions: Number of partitions for the new topic.
        replication_factor: Replication factor for the new topic.
    """
    new_topic = NewTopic(
        topic_name,
        num_partitions=num_partitions,
        replication_factor=replication_factor,
    )

    try:
        futures = admin_client.create_topics([new_topic])
        for future in futures.values():
            # result() returns None on success and raises KafkaException
            # when the broker rejects the request (e.g. topic already exists)
            future.result()
        print(f'Topic "{topic_name}" created successfully.')
    except KafkaException as e:
        print(f'Failed to create topic "{topic_name}": {e}')

def list_topics():
    """Print the name of every topic in the cluster metadata, one per line."""
    metadata = admin_client.list_topics()
    print('Existing topics:')
    # Iterating the ``topics`` mapping yields the topic names
    for name in metadata.topics:
        print(name)

def delete_topic(topic_name):
    """Delete a Kafka topic and print whether the operation succeeded.

    ``AdminClient.delete_topics`` is asynchronous: it returns a dict mapping
    each topic name to a future. The outcome is only known once the future's
    ``result()`` is called, so this function waits on it before reporting.

    Args:
        topic_name: Name of the topic to delete.
    """
    try:
        futures = admin_client.delete_topics([topic_name])
        for future in futures.values():
            # result() returns None on success and raises KafkaException
            # when the broker rejects the request (e.g. unknown topic)
            future.result()
        print(f'Topic "{topic_name}" deleted successfully.')
    except KafkaException as e:
        print(f'Failed to delete topic "{topic_name}": {e}')

# Usage example: create a topic, show the topic list, then remove the topic.

# Step 1: create a topic with three partitions and a single replica
create_topic(
    'your_topic_name',
    num_partitions=3,
    replication_factor=1,
)

# Step 2: print the topics the cluster currently reports
list_topics()

# Step 3: delete the topic created in step 1
delete_topic('your_topic_name')



4. Producing and Consuming Messages:
   a) Write a Python program to produce messages to a Kafka topic.
   b) Implement logic to consume messages from the same Kafka topic.
   c) Test the end-to-end flow of message production and consumption.


In [None]:
from confluent_kafka import Producer, Consumer, KafkaException

# Configure the Kafka cluster
bootstrap_servers = 'localhost:9092'  # Replace with the appropriate Kafka broker addresses
topic = 'your_topic_name'  # Replace with the desired Kafka topic

# Create a Kafka producer
producer = Producer({'bootstrap.servers': bootstrap_servers})

# Create a Kafka consumer
consumer = Consumer({
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'your_consumer_group_id',  # Replace with a unique consumer group ID
    # A group with no committed offsets otherwise starts at the end of the
    # log and would never see the messages produced before it subscribed.
    'auto.offset.reset': 'earliest',
})

def delivery_report(err, msg):
    """Print the outcome of one produce request.

    Passed as the ``callback`` argument of ``producer.produce``.

    Args:
        err: The delivery error, or ``None`` when delivery succeeded.
        msg: The message the report is about; its ``topic()`` and
            ``partition()`` are read only on success.
    """
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
        return
    print(f'Message delivery failed: {err}')

def consume_messages():
    """Subscribe to ``topic`` and print each received message until Ctrl-C.

    Uses the module-level ``consumer`` and ``topic``. Polls with a one-second
    timeout, prints partition-EOF and other error events instead of raising,
    and returns when a ``KeyboardInterrupt`` is received. The consumer is
    left open; the caller is responsible for closing it.
    """
    # Imported locally because the error-code constants (e.g. _PARTITION_EOF)
    # live on KafkaError, which the cell's import line does not bring in;
    # KafkaException has no such attribute.
    from confluent_kafka import KafkaError

    # Subscribe to the Kafka topic
    consumer.subscribe([topic])

    try:
        while True:
            message = consumer.poll(timeout=1.0)

            # poll() returns None when nothing arrived within the timeout
            if message is None:
                continue

            error = message.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(f'Reached end of partition {message.topic()} [{message.partition()}]')
                else:
                    # Error occurred
                    print(f'Error occurred: {error.str()}')
                continue

            # Process the consumed message; value() is None for messages
            # that carry no payload, so only decode real bytes
            payload = message.value()
            text = payload.decode() if payload is not None else None
            print(f'Received message: {text}')

    except KeyboardInterrupt:
        pass

def produce_messages():
    """Queue the sample messages on ``topic`` and wait for their delivery.

    Uses the module-level ``producer`` and ``topic``; each message's outcome
    is reported through ``delivery_report``.
    """
    # Add more messages as needed
    payloads = ('Message 1', 'Message 2', 'Message 3')

    for payload in payloads:
        producer.produce(topic, value=payload, callback=delivery_report)

    # Block until the queued messages have been delivered
    producer.flush()

# Produce messages (produce_messages() flushes before returning, so nothing
# is left queued in the producer afterwards)
produce_messages()

# Consume messages until interrupted, then always close the consumer
try:
    consume_messages()
finally:
    consumer.close()

# No producer.close() here: flush() is the shutdown step for a producer, and
# calling close() raises AttributeError on confluent-kafka releases whose
# Producer does not define it.



5. Working with Kafka Consumer Groups:
   a) Write a Python program to create a Kafka consumer within a consumer group.
   b) Implement logic to handle messages consumed by different consumers within the same group.
   c) Observe the behavior of consumer group rebalancing when adding or removing consumers.


In [None]:
from confluent_kafka import Consumer, KafkaException

# Topic to read from and the consumer group this process joins
topic = 'your_topic_name'  # Replace with the desired Kafka topic
group_id = 'your_consumer_group_id'  # Replace with the desired consumer group ID

# Broker address(es) used for the initial connection to the cluster
bootstrap_servers = 'localhost:9092'  # Replace with the appropriate Kafka broker addresses

# Consumer client that is a member of the group configured above
consumer = Consumer({
    'group.id': group_id,
    'bootstrap.servers': bootstrap_servers,
})

def message_handler(message):
    """Print a consumed message tagged with this consumer's group member id.

    Args:
        message: A successfully consumed message (no error set).
    """
    # Consumer has no id() method; memberid() returns the group member id
    # for this client, which distinguishes consumers sharing one group.
    member = consumer.memberid()

    # value() is None for messages that carry no payload, so only decode
    # real bytes
    payload = message.value()
    text = payload.decode() if payload is not None else None
    print(f'Consumer {member} received message: {text}')

# The error-code constants (e.g. _PARTITION_EOF) live on KafkaError, not on
# the KafkaException class imported above.
from confluent_kafka import KafkaError


def _on_assign(_client, partitions):
    # Rebalance callback: partitions newly assigned to this consumer
    print(f'Partitions assigned: {[(p.topic, p.partition) for p in partitions]}')


def _on_revoke(_client, partitions):
    # Rebalance callback: partitions being taken away from this consumer
    print(f'Partitions revoked: {[(p.topic, p.partition) for p in partitions]}')


# Subscribe to the Kafka topic; the callbacks make group rebalances visible
# when consumers join or leave the group
consumer.subscribe([topic], on_assign=_on_assign, on_revoke=_on_revoke)

try:
    while True:
        message = consumer.poll(timeout=1.0)

        # poll() returns None when nothing arrived within the timeout
        if message is None:
            continue

        error = message.error()
        if error:
            if error.code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print(f'Reached end of partition {message.topic()} [{message.partition()}]')
            else:
                # Error occurred
                print(f'Error occurred: {error.str()}')
            continue

        # Process the consumed message
        message_handler(message)

except KeyboardInterrupt:
    pass
finally:
    # Always close the consumer so it leaves the group promptly, which lets
    # the remaining members rebalance without waiting for a session timeout
    consumer.close()
