Setting up a Kafka Producer:
a) Python program to create a Kafka producer:

from kafka import KafkaProducer

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Send messages to a Kafka topic
topic = 'my_topic'
message = b'Hello, Kafka!'
producer.send(topic, message)

# send() is asynchronous; flush() blocks until buffered messages are delivered
producer.flush()


Setting up a Kafka Consumer:
a) Python program to create a Kafka consumer:

from kafka import KafkaConsumer

# Create a Kafka consumer
consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')

# Consume messages from a Kafka topic
for message in consumer:
    print(message.value.decode())


Creating and Managing Kafka Topics:
a) Python program to create a new Kafka topic:



# NewTopic lives in kafka.admin; it is not exported from the top-level kafka package
from kafka.admin import KafkaAdminClient, NewTopic

# Create a Kafka admin client
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Create a new Kafka topic
topic_name = 'my_topic'
partitions = 1
replication_factor = 1
topic = NewTopic(name=topic_name, num_partitions=partitions, replication_factor=replication_factor)
admin_client.create_topics([topic])


b) Functionality to list existing topics:



from kafka import KafkaAdminClient

# Create a Kafka admin client
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')

# List existing Kafka topics
topics = admin_client.list_topics()
for topic in topics:
    print(topic)


c) Logic to delete an existing Kafka topic:

from kafka import KafkaAdminClient

# Create a Kafka admin client
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Delete an existing Kafka topic
topic_name = 'my_topic'
admin_client.delete_topics([topic_name])


Producing and Consuming Messages:
a) Python program to produce messages to a Kafka topic:

from kafka import KafkaProducer

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Send messages to a Kafka topic
topic = 'my_topic'
messages = [b'Message 1', b'Message 2', b'Message 3']
for message in messages:
    producer.send(topic, message)

# send() is asynchronous; flush() blocks until buffered messages are delivered
producer.flush()


b) Logic to consume messages from the same Kafka topic:

from kafka import KafkaConsumer

# Create a Kafka consumer
consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')

# Consume messages from a Kafka topic
for message in consumer:
    print(message.value.decode())


c) Testing the end-to-end flow of message production and consumption:

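Run the consumer program (b) and the producer program (a) in separate terminals, starting the consumer first. As written, the consumer uses the default auto_offset_reset='latest', so it only sees messages produced after it has started; if the producer runs first, nothing is printed. To read messages that were produced before the consumer started, pass auto_offset_reset='earliest' to KafkaConsumer.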
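
The end-to-end check can also be run from a single script. The following is a minimal sketch, not a definitive test harness: the helper names produce_all, consume_all and main are hypothetical, and it assumes a broker at localhost:9092 with the topic my_topic. It relies on auto_offset_reset='earliest' so the consumer sees messages sent before it started, and on consumer_timeout_ms so the loop ends instead of blocking forever.

```python
def produce_all(producer, topic, messages):
    """Send every message, then block until the client has delivered them."""
    for message in messages:
        producer.send(topic, message)
    producer.flush()
    return len(messages)


def consume_all(consumer):
    """Collect decoded message values until the consumer stops iterating."""
    return [record.value.decode() for record in consumer]


def main(topic="my_topic", bootstrap_servers="localhost:9092"):
    # Imported here so the helpers above can be exercised without a broker.
    from kafka import KafkaConsumer, KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    sent = produce_all(producer, topic, [b"Message 1", b"Message 2", b"Message 3"])
    producer.close()

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # read from the start of the topic
        consumer_timeout_ms=5000,      # stop iterating after 5 s without messages
    )
    received = consume_all(consumer)
    consumer.close()
    print(f"sent {sent}, received {len(received)}: {received}")
```

Call main() with a broker running. Because the consumer reads from the earliest offset, the received list also includes any messages left in the topic by earlier runs.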

Working with Kafka Consumer Groups:

a) Python program to create a Kafka consumer within a consumer group:

from kafka import KafkaConsumer

# Create a Kafka consumer within a consumer group
consumer = KafkaConsumer('my_topic', group_id='my_consumer_group', bootstrap_servers='localhost:9092')

# Consume messages from a Kafka topic
for message in consumer:
    print(message.value.decode())


b) Logic to handle messages consumed by different consumers within the same group:

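When multiple consumers share the same group_id, Kafka assigns each partition of the topic to exactly one consumer in the group, so every message is handled by only one member. The message-handling code is the same as in the consumer group example above (a); no extra logic is needed. Note that my_topic was created with a single partition, so only one consumer in the group receives messages at a time. Create the topic with more partitions to spread the load across consumers.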
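
How partitions end up shared among group members can be illustrated without a broker. The function below is a simplified simulation in the spirit of a round-robin assignment; it is not the assignor code that Kafka or kafka-python actually runs, and the name assign_round_robin and the consumer names are hypothetical.

```python
def assign_round_robin(partitions, consumers):
    """Deal partitions out to consumers one at a time, like cards."""
    if not consumers:
        return {}
    ordered = sorted(consumers)
    assignment = {consumer: [] for consumer in ordered}
    for index, partition in enumerate(sorted(partitions)):
        owner = ordered[index % len(ordered)]
        assignment[owner].append(partition)
    return assignment


# Three partitions, two consumers: the work is split.
print(assign_round_robin([0, 1, 2], ["consumer-a", "consumer-b"]))
# {'consumer-a': [0, 2], 'consumer-b': [1]}

# One partition, two consumers: the second consumer sits idle.
print(assign_round_robin([0], ["consumer-a", "consumer-b"]))
# {'consumer-a': [0], 'consumer-b': []}
```

The second call shows why a topic needs at least as many partitions as consumers for every group member to receive messages.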

c) Observing the behavior of consumer group rebalancing when adding or removing consumers:

Adding or removing a consumer in a group triggers a rebalance, in which the topic's partitions are redistributed among the current members. This is coordinated automatically by Kafka and needs no application code. To observe it, start or stop additional consumers with the same group_id and watch the partition assignments change, either in the consumer client logs or by running kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my_consumer_group.
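
Rebalances can also be observed from inside the consumer. The sketch below assumes kafka-python's ConsumerRebalanceListener callback interface and passes a listener to subscribe(); the names describe, run and LoggingListener are hypothetical, as are the broker address, topic and group.

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("rebalance-demo")


def describe(partitions):
    """Render TopicPartition-like objects as sorted 'topic-partition' strings."""
    pairs = sorted((tp.topic, tp.partition) for tp in partitions)
    return [f"{topic}-{partition}" for topic, partition in pairs]


def run(topic="my_topic", group_id="my_consumer_group",
        bootstrap_servers="localhost:9092"):
    # Imported here so describe() can be used without kafka-python installed.
    from kafka import ConsumerRebalanceListener, KafkaConsumer

    class LoggingListener(ConsumerRebalanceListener):
        def on_partitions_revoked(self, revoked):
            # Called before a rebalance takes partitions away from this consumer.
            log.info("revoked: %s", describe(revoked))

        def on_partitions_assigned(self, assigned):
            # Called after a rebalance hands this consumer its new partitions.
            log.info("assigned: %s", describe(assigned))

    consumer = KafkaConsumer(group_id=group_id, bootstrap_servers=bootstrap_servers)
    consumer.subscribe([topic], listener=LoggingListener())
    for message in consumer:
        print(message.partition, message.value.decode())
```

Run run() in two or more terminals, then stop one of them: the remaining consumers should log a revoked line followed by an assigned line as the group rebalances.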
