# 
1. Setting up a Kafka Producer:
   a) Write a Python program to create a Kafka producer.
   b) Configure the producer to connect to a Kafka cluster.
   c) Implement logic to send messages to a Kafka topic.



In [None]:
from confluent_kafka import Producer

def kafka_producer(topic):
    # Configure Kafka producer
    producer = Producer({
        'bootstrap.servers': 'localhost:9092',  # Kafka cluster address
        'client.id': 'python-producer'
    })

    # Send messages to Kafka topic
    for i in range(10):
        message = f'Message {i}'
        producer.produce(topic, value=message.encode('utf-8'))
        producer.flush()

    # Close the producer
    producer.close()

# Specify the Kafka topic to send messages to
topic = 'your_topic_name'

# Call the function to create a Kafka producer and send messages
kafka_producer(topic)


# 2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.
   b) Configure the consumer to connect to a Kafka cluster.
   c) Implement logic to consume messages from a Kafka topic.


In [None]:
from confluent_kafka import Consumer, KafkaException

def kafka_consumer(topic):
    # Configure Kafka consumer
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',  # Kafka cluster address
        'group.id': 'python-consumer',
        'auto.offset.reset': 'earliest'
    })

    # Subscribe to Kafka topic
    consumer.subscribe([topic])

    # Consume messages from Kafka topic
    try:
        while True:
            message = consumer.poll(1.0)

            if message is None:
                continue
            if message.error():
                raise KafkaException(message.error())

            value = message.value().decode('utf-8')
            print(f"Received message: {value}")

    except KeyboardInterrupt:
        pass

    # Close the consumer
    consumer.close()

# Specify the Kafka topic to consume messages from
topic = 'your_topic_name'

# Call the function to create a Kafka consumer and consume messages
kafka_consumer(topic)


# 3. Creating and Managing Kafka Topics:
   a) Write a Python program to create a new Kafka topic.
   b) Implement functionality to list existing topics.
   c) Develop logic to delete an existing Kafka topic.


In [None]:
from kafka import KafkaAdminClient, NewTopic

def create_kafka_topic(topic, num_partitions, replication_factor):
    # Configure Kafka AdminClient
    admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')

    # Create new Kafka topic
    new_topic = NewTopic(name=topic, num_partitions=num_partitions, replication_factor=replication_factor)
    admin_client.create_topics(new_topics=[new_topic])

    print(f"Topic '{topic}' created successfully.")

def list_kafka_topics():
    # Configure Kafka AdminClient
    admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')

    # List existing Kafka topics
    topic_metadata = admin_client.list_topics()
    topics = topic_metadata.topics

    print("Existing topics:")
    for topic in topics:
        print(topic)

def delete_kafka_topic(topic):
    # Configure Kafka AdminClient
    admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')

    # Delete existing Kafka topic
    admin_client.delete_topics(topics=[topic])

    print(f"Topic '{topic}' deleted successfully.")

# Call the functions to create, list, and delete Kafka topics
create_kafka_topic('your_topic_name', 3, 1)
list_kafka_topics()
delete_kafka_topic('your_topic_name')


# 4. Producing and Consuming Messages:
   a) Write a Python program to produce messages to a Kafka topic.
   b) Implement logic to consume messages from the same Kafka topic.
   c) Test the end-to-end flow of message production and consumption.



In [None]:
from kafka import KafkaProducer, KafkaConsumer

def produce_kafka_messages(topic, num_messages):
    # Configure Kafka producer
    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    # Produce messages to Kafka topic
    for i in range(num_messages):
        message = f'Message {i}'
        producer.send(topic, message.encode('utf-8'))

    producer.flush()
    producer.close()

def consume_kafka_messages(topic):
    # Configure Kafka consumer
    consumer = KafkaConsumer(topic, bootstrap_servers='localhost:9092', group_id='group-1')

    # Consume messages from Kafka topic
    for message in consumer:
        value = message.value.decode('utf-8')
        print(f"Received message: {value}")

    consumer.close()

# Specify the Kafka topic to produce/consume messages
topic = 'your_topic_name'
num_messages = 10

# Produce messages to Kafka topic
produce_kafka_messages(topic, num_messages)

# Consume messages from Kafka topic
consume_kafka_messages(topic)


# 5. Working with Kafka Consumer Groups:
   a) Write a Python program to create a Kafka consumer within a consumer group.
   b) Implement logic to handle messages consumed by different consumers within the same group.
   c) Observe the behavior of consumer group rebalancing when adding or removing consumers.


In [None]:
from kafka import KafkaConsumer

def kafka_consumer_group(topic, group_id):
    # Configure Kafka consumer
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        group_id=group_id,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        auto_commit_interval_ms=1000
    )

    # Consume messages from Kafka topic
    for message in consumer:
        value = message.value.decode('utf-8')
        print(f"Consumer: {group_id}, Message: {value}")

# Specify the Kafka topic and consumer group
topic = 'your_topic_name'
consumer_group = 'your_consumer_group'

# Create Kafka consumers within the consumer group
consumers = [KafkaConsumer(topic, bootstrap_servers='localhost:9092', group_id=consumer_group) for _ in range(3)]

# Start consuming messages
for consumer in consumers:
    consumer.subscribe([topic])

# Consume messages within the consumer group
try:
    while True:
        for consumer in consumers:
            for message in consumer:
                value = message.value.decode('utf-8')
                print(f"Consumer: {consumer_group}, Message: {value}")
except KeyboardInterrupt:
    pass

# Close Kafka consumers
for consumer in consumers:
    consumer.close()
