### Kafka Assignment

1. Setting up a Kafka Producer:
a) Write a Python program to create a Kafka producer.
b) Configure the producer to connect to a Kafka cluster.
c) Implement logic to send messages to a Kafka topic.

In [None]:
#a) Write a Python program to create a Kafka producer.

from confluent_kafka import Producer

def delivery_report(err, msg):
    """Delivery callback: print the outcome of a single produce request.

    Args:
        err: The delivery error, or None when the message was delivered.
        msg: The message the report refers to; its topic, partition and
            offset are printed on success.
    """
    if err is None:
        location = f"topic {msg.topic()} partition {msg.partition()} offset {msg.offset()}"
        print(f"Message delivered to {location}")
        return
    print(f"Message delivery failed: {err}")

def create_kafka_producer():
    """Create a Kafka producer and send a single test message.

    Builds a ``Producer`` from a hard-coded configuration, queues the
    message "Hello, Kafka!" for the placeholder topic and then blocks in
    ``flush()`` until the delivery report has been served.  Errors raised
    while queueing the message are printed, not re-raised.
    """
    # Configure Kafka producer properties
    producer_config = {
        'bootstrap.servers': 'your_kafka_broker_1, your_kafka_broker_2',  # Replace with your Kafka brokers
        'client.id': 'my_kafka_producer',
        'acks': 'all',  # Require acknowledgement from all in-sync replicas
        'compression.type': 'gzip',  # Optional: Use gzip compression for messages
        'batch.num.messages': 1000,  # Optional: Upper bound on messages per batch
    }

    # Create a Kafka producer instance
    producer = Producer(producer_config)

    # Define the Kafka topic to send messages to
    topic = 'your_kafka_topic'  # Replace with your Kafka topic name

    try:
        # produce() only enqueues the message; delivery_report is invoked
        # later, from flush() (or poll()).
        message_value = "Hello, Kafka!"
        producer.produce(topic, value=message_value.encode('utf-8'), callback=delivery_report)

    except Exception as e:
        print(f"Error while sending a message: {e}")

    finally:
        # NOTE: the documented Producer API offers flush(), not close();
        # calling close() raises AttributeError on releases without it.
        # flush() waits for queued messages and serves their callbacks.
        producer.flush()


if __name__ == "__main__":
    create_kafka_producer()
    
    
# b) Configure the producer to connect to a Kafka cluster.
from confluent_kafka import Producer

def delivery_report(err, msg):
    """Delivery callback: print the outcome of a single produce request.

    Args:
        err: The delivery error, or None when the message was delivered.
        msg: The message the report refers to; its topic, partition and
            offset are printed on success.
    """
    if err is None:
        location = f"topic {msg.topic()} partition {msg.partition()} offset {msg.offset()}"
        print(f"Message delivered to {location}")
        return
    print(f"Message delivery failed: {err}")

def create_kafka_producer():
    """Connect a producer to the configured Kafka cluster and send one message.

    The cluster is addressed through the ``bootstrap.servers`` entry of the
    hard-coded configuration (host:port pairs, comma separated).  One test
    message is queued for the placeholder topic, then ``flush()`` blocks
    until its delivery report has been served.  Errors raised while
    queueing the message are printed, not re-raised.
    """
    # Configure Kafka producer properties
    producer_config = {
        'bootstrap.servers': 'your_kafka_broker_1:9092, your_kafka_broker_2:9092',  # Replace with your Kafka brokers
        'client.id': 'my_kafka_producer',
        'acks': 'all',  # Require acknowledgement from all in-sync replicas
        'compression.type': 'gzip',  # Optional: Use gzip compression for messages
        'batch.num.messages': 1000,  # Optional: Upper bound on messages per batch
    }

    # Create a Kafka producer instance
    producer = Producer(producer_config)

    # Define the Kafka topic to send messages to
    topic = 'your_kafka_topic'  # Replace with your Kafka topic name

    try:
        # produce() only enqueues the message; delivery_report is invoked
        # later, from flush() (or poll()).
        message_value = "Hello, Kafka!"
        producer.produce(topic, value=message_value.encode('utf-8'), callback=delivery_report)

    except Exception as e:
        print(f"Error while sending a message: {e}")

    finally:
        # NOTE: the documented Producer API offers flush(), not close();
        # calling close() raises AttributeError on releases without it.
        # flush() waits for queued messages and serves their callbacks.
        producer.flush()


if __name__ == "__main__":
    create_kafka_producer()

# c) Implement logic to send messages to a Kafka topic.

from confluent_kafka import Producer

def delivery_report(err, msg):
    """Delivery callback: print the outcome of a single produce request.

    Args:
        err: The delivery error, or None when the message was delivered.
        msg: The message the report refers to; its topic, partition and
            offset are printed on success.
    """
    if err is None:
        location = f"topic {msg.topic()} partition {msg.partition()} offset {msg.offset()}"
        print(f"Message delivered to {location}")
        return
    print(f"Message delivery failed: {err}")

def send_messages_to_kafka_topic(num_messages):
    """Send ``num_messages`` numbered messages to the placeholder topic.

    Messages are named "Message 1" .. "Message N" and UTF-8 encoded.
    Errors raised while queueing are printed, not re-raised; in every case
    the function blocks in ``flush()`` until outstanding deliveries have
    been reported.

    Args:
        num_messages: How many messages to send; values <= 0 send nothing.
    """
    # Configure Kafka producer properties
    producer_config = {
        'bootstrap.servers': 'your_kafka_broker_1:9092, your_kafka_broker_2:9092',  # Replace with your Kafka brokers
        'client.id': 'my_kafka_producer',
        'acks': 'all',  # Require acknowledgement from all in-sync replicas
        'compression.type': 'gzip',  # Optional: Use gzip compression for messages
        'batch.num.messages': 1000,  # Optional: Upper bound on messages per batch
    }

    # Create a Kafka producer instance
    producer = Producer(producer_config)

    # Define the Kafka topic to send messages to
    topic = 'your_kafka_topic'  # Replace with your Kafka topic name

    try:
        for i in range(num_messages):
            message_value = f"Message {i+1}"
            producer.produce(topic, value=message_value.encode('utf-8'), callback=delivery_report)
            # Serve delivery callbacks as we go so reports are not held
            # back until the final flush and the local queue can drain.
            producer.poll(0)

    except Exception as e:
        print(f"Error while sending messages: {e}")

    finally:
        # NOTE: the documented Producer API offers flush(), not close();
        # calling close() raises AttributeError on releases without it.
        # flush() waits for queued messages and serves their callbacks.
        producer.flush()


if __name__ == "__main__":
    num_messages_to_send = 10  # Number of messages to send
    send_messages_to_kafka_topic(num_messages_to_send)


2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.
   b) Configure the consumer to connect to a Kafka cluster.
   c) Implement logic to consume messages from a Kafka topic.


In [None]:
# a) Write a Python program to create a Kafka consumer.

from confluent_kafka import Consumer, KafkaError

def consume_messages_from_kafka_topic():
    """Create a Kafka consumer and print every message from the topic.

    Polls the placeholder topic in an endless loop until interrupted with
    Ctrl+C.  The consumer is closed on every exit path, not only on
    KeyboardInterrupt.
    """
    # Configure Kafka consumer properties
    consumer_config = {
        'bootstrap.servers': 'your_kafka_broker_1:9092, your_kafka_broker_2:9092',  # Replace with your Kafka brokers
        'group.id': 'my_kafka_consumer_group',
        'auto.offset.reset': 'earliest',  # Start from the beginning when the group has no committed offset
    }

    # Create a Kafka consumer instance
    consumer = Consumer(consumer_config)

    topic = 'your_kafka_topic'  # Replace with your Kafka topic name

    try:
        # Subscribe to the Kafka topic
        consumer.subscribe([topic])

        # Poll for new messages
        while True:
            message = consumer.poll(timeout=1.0)

            if message is None:
                # Nothing arrived within the timeout; poll again.
                continue

            error = message.error()
            if error is None:
                payload = message.value()
                # value() is None for messages without a payload
                # (e.g. tombstones), so decode only when present.
                text = payload.decode('utf-8') if payload is not None else None
                print(f"Received message: {text}")
            elif error.code() == KafkaError._PARTITION_EOF:
                # Only delivered when 'enable.partition.eof' is enabled.
                print(f"Reached end of partition {message.partition()}")
            else:
                print(f"Error while consuming message: {error.str()}")

    except KeyboardInterrupt:
        print("Consumer interrupted. Closing consumer.")

    finally:
        # Always close so the consumer leaves its group cleanly, even when
        # the loop ends because of an unexpected exception.
        consumer.close()


if __name__ == "__main__":
    consume_messages_from_kafka_topic()
    
    
# b) Configure the consumer to connect to a Kafka cluster.

from confluent_kafka import Consumer, KafkaError

def consume_messages_from_kafka_topic():
    """Connect a consumer to the configured cluster and print its messages.

    The cluster is addressed through ``bootstrap.servers`` and the consumer
    joins the group named by ``group.id``.  Polls the placeholder topic in
    an endless loop until interrupted with Ctrl+C.  The consumer is closed
    on every exit path, not only on KeyboardInterrupt.
    """
    # Configure Kafka consumer properties
    consumer_config = {
        'bootstrap.servers': 'your_kafka_broker_1:9092, your_kafka_broker_2:9092',  # Replace with your Kafka brokers
        'group.id': 'my_kafka_consumer_group',
        'auto.offset.reset': 'earliest',  # Start from the beginning when the group has no committed offset
    }

    # Create a Kafka consumer instance
    consumer = Consumer(consumer_config)

    topic = 'your_kafka_topic'  # Replace with your Kafka topic name

    try:
        # Subscribe to the Kafka topic
        consumer.subscribe([topic])

        # Poll for new messages
        while True:
            message = consumer.poll(timeout=1.0)

            if message is None:
                # Nothing arrived within the timeout; poll again.
                continue

            error = message.error()
            if error is None:
                payload = message.value()
                # value() is None for messages without a payload
                # (e.g. tombstones), so decode only when present.
                text = payload.decode('utf-8') if payload is not None else None
                print(f"Received message: {text}")
            elif error.code() == KafkaError._PARTITION_EOF:
                # Only delivered when 'enable.partition.eof' is enabled.
                print(f"Reached end of partition {message.partition()}")
            else:
                print(f"Error while consuming message: {error.str()}")

    except KeyboardInterrupt:
        print("Consumer interrupted. Closing consumer.")

    finally:
        # Always close so the consumer leaves its group cleanly, even when
        # the loop ends because of an unexpected exception.
        consumer.close()


if __name__ == "__main__":
    consume_messages_from_kafka_topic()

    
# c) Implement logic to consume messages from a Kafka topic.


from confluent_kafka import Consumer, KafkaError

def consume_messages_from_kafka_topic():
    """Consume messages from the topic and hand each one to process_message.

    Polls the placeholder topic in an endless loop until interrupted with
    Ctrl+C.  Each payload is decoded as UTF-8 before being passed on; a
    message without a payload is passed on as None.  The consumer is closed
    on every exit path, not only on KeyboardInterrupt.
    """
    # Configure Kafka consumer properties
    consumer_config = {
        'bootstrap.servers': 'your_kafka_broker_1:9092, your_kafka_broker_2:9092',  # Replace with your Kafka brokers
        'group.id': 'my_kafka_consumer_group',
        'auto.offset.reset': 'earliest',  # Start from the beginning when the group has no committed offset
    }

    # Create a Kafka consumer instance
    consumer = Consumer(consumer_config)

    topic = 'your_kafka_topic'  # Replace with your Kafka topic name

    try:
        # Subscribe to the Kafka topic
        consumer.subscribe([topic])

        # Poll for new messages
        while True:
            message = consumer.poll(timeout=1.0)

            if message is None:
                # Nothing arrived within the timeout; poll again.
                continue

            error = message.error()
            if error is None:
                payload = message.value()
                # value() is None for messages without a payload
                # (e.g. tombstones), so decode only when present.
                text = payload.decode('utf-8') if payload is not None else None
                # Process the received message
                process_message(text)
            elif error.code() == KafkaError._PARTITION_EOF:
                # Only delivered when 'enable.partition.eof' is enabled.
                print(f"Reached end of partition {message.partition()}")
            else:
                print(f"Error while consuming message: {error.str()}")

    except KeyboardInterrupt:
        print("Consumer interrupted. Closing consumer.")

    finally:
        # Always close so the consumer leaves its group cleanly, even when
        # the loop ends because of an unexpected exception.
        consumer.close()

def process_message(message_value):
    """Handle one received message; this placeholder echoes it to stdout.

    Args:
        message_value: The message payload to process.
    """
    # Swap the echo below for your own processing logic.
    report = f"Received message: {message_value}"
    print(report)


if __name__ == "__main__":
    consume_messages_from_kafka_topic()


3. Creating and Managing Kafka Topics:
   a) Write a Python program to create a new Kafka topic.
   b) Implement functionality to list existing topics.
   c) Develop logic to delete an existing Kafka topic.


In [None]:
# a) Write a Python program to create a new Kafka topic.

from confluent_kafka.admin import AdminClient, NewTopic

def create_kafka_topic():
    """Create the placeholder topic and print whether creation succeeded.

    The topic is requested with 3 partitions and a replication factor of 1
    through an ``AdminClient``; the outcome of each returned future is
    printed.
    """
    # Admin client pointed at the cluster's bootstrap brokers
    admin_client = AdminClient({
        'bootstrap.servers': 'your_kafka_broker_1:9092, your_kafka_broker_2:9092',  # Replace with your Kafka brokers
    })

    # Specification of the topic to create
    new_topic = NewTopic(
        'your_new_topic',  # Replace with your desired topic name
        num_partitions=3,
        replication_factor=1,
    )

    # create_topics() returns one future per requested topic
    pending = admin_client.create_topics([new_topic])

    for topic_name, future in pending.items():
        try:
            # result() blocks until the request finishes and raises on failure
            future.result()
        except Exception as e:
            print(f"Failed to create topic '{topic_name}': {e}")
        else:
            print(f"Topic '{topic_name}' created successfully.")


if __name__ == "__main__":
    create_kafka_topic()

    
#  b) Implement functionality to list existing topics.
from confluent_kafka.admin import AdminClient

def list_kafka_topics(timeout=10.0):
    """Print the name of every topic reported by the cluster.

    Args:
        timeout: Maximum number of seconds to wait for the cluster metadata.
            Without a finite timeout the metadata request blocks
            indefinitely when no broker is reachable.
    """
    # Imported locally so this cell stays self-contained.
    from confluent_kafka import KafkaException

    # Configure Kafka broker address
    bootstrap_servers = 'your_kafka_broker_1:9092, your_kafka_broker_2:9092'  # Replace with your Kafka brokers

    # Create the admin client
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Fetch cluster metadata; .topics maps topic name -> topic metadata
    try:
        topics = admin_client.list_topics(timeout=timeout).topics
    except KafkaException as e:
        print(f"Failed to list topics: {e}")
        return

    # Print the list of topics
    print("Existing Kafka topics:")
    for topic_name in topics:
        print(topic_name)


if __name__ == "__main__":
    list_kafka_topics()

# c) Develop logic to delete an existing Kafka topic.

from confluent_kafka.admin import AdminClient, NewTopic

def delete_kafka_topic(topic_name):
    """Delete a Kafka topic and print whether the deletion succeeded.

    Args:
        topic_name: Name of the topic to delete.  An empty or None name is
            reported as a failure without contacting the cluster.
    """
    if not topic_name:
        # Reject obviously invalid names before opening a connection.
        print(f"Failed to delete topic '{topic_name or ''}': topic name must be a non-empty string")
        return

    # Configure Kafka broker address
    bootstrap_servers = 'your_kafka_broker_1:9092, your_kafka_broker_2:9092'  # Replace with your Kafka brokers

    # Create the admin client
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

    # Delete the topic; delete_topics() returns one future per topic
    futures = admin_client.delete_topics([topic_name])

    # Wait for the deletion to complete and report the outcome.  The loop
    # uses its own variable so the topic_name parameter is not rebound.
    for deleted_topic, future in futures.items():
        try:
            future.result()
            print(f"Topic '{deleted_topic}' deleted successfully.")
        except Exception as e:
            print(f"Failed to delete topic '{deleted_topic}': {e}")


if __name__ == "__main__":
    topic_name_to_delete = 'your_topic_to_delete'  # Replace with the name of the Kafka topic you want to delete
    delete_kafka_topic(topic_name_to_delete)

4. Producing and Consuming Messages:
   a) Write a Python program to produce messages to a Kafka topic.
   b) Implement logic to consume messages from the same Kafka topic.
   c) Test the end-to-end flow of message production and consumption.


In [None]:
#  a) Write a Python program to produce messages to a Kafka topic.

from confluent_kafka import Producer

def delivery_report(err, msg):
    """Delivery callback: print the outcome of a single produce request.

    Args:
        err: The delivery error, or None when the message was delivered.
        msg: The message the report refers to; its topic, partition and
            offset are printed on success.
    """
    if err is None:
        location = f"topic {msg.topic()} partition {msg.partition()} offset {msg.offset()}"
        print(f"Message delivered to {location}")
        return
    print(f"Message delivery failed: {err}")

def produce_messages_to_kafka_topic():
    """Send ten example messages ("Message 0" .. "Message 9") to the topic.

    Errors raised while queueing are printed, not re-raised; in every case
    the function blocks in ``flush()`` until outstanding deliveries have
    been reported.
    """
    # Configure Kafka producer properties
    producer_config = {
        'bootstrap.servers': 'your_kafka_broker_1:9092, your_kafka_broker_2:9092',  # Replace with your Kafka brokers
        'client.id': 'my_kafka_producer',
        'acks': 'all',  # Require acknowledgement from all in-sync replicas
        'compression.type': 'gzip',  # Optional: Use gzip compression for messages
        'batch.num.messages': 1000,  # Optional: Upper bound on messages per batch
    }

    # Create a Kafka producer instance
    producer = Producer(producer_config)

    # Define the Kafka topic to send messages to
    topic = 'your_kafka_topic'  # Replace with your Kafka topic name

    try:
        for i in range(10):  # Send 10 messages as an example
            message_value = f"Message {i}"
            producer.produce(topic, value=message_value.encode('utf-8'), callback=delivery_report)
            # Serve delivery callbacks as we go so reports are not held
            # back until the final flush and the local queue can drain.
            producer.poll(0)

    except Exception as e:
        print(f"Error while sending messages: {e}")

    finally:
        # NOTE: the documented Producer API offers flush(), not close();
        # calling close() raises AttributeError on releases without it.
        # flush() waits for queued messages and serves their callbacks.
        producer.flush()


if __name__ == "__main__":
    produce_messages_to_kafka_topic()

#   b) Implement logic to consume messages from the same Kafka topic.

from confluent_kafka import Consumer, KafkaError

def consume_messages_from_kafka_topic():
    """Consume and print messages from the topic the producer writes to.

    Polls the placeholder topic in an endless loop until interrupted with
    Ctrl+C.  The consumer is closed on every exit path, not only on
    KeyboardInterrupt.
    """
    # Configure Kafka consumer properties
    consumer_config = {
        'bootstrap.servers': 'your_kafka_broker_1:9092, your_kafka_broker_2:9092',  # Replace with your Kafka brokers
        'group.id': 'my_kafka_consumer_group',
        'auto.offset.reset': 'earliest',  # Start from the beginning when the group has no committed offset
    }

    # Create a Kafka consumer instance
    consumer = Consumer(consumer_config)

    topic = 'your_kafka_topic'  # Replace with your Kafka topic name

    try:
        # Subscribe to the Kafka topic
        consumer.subscribe([topic])

        # Poll for new messages
        while True:
            message = consumer.poll(timeout=1.0)

            if message is None:
                # Nothing arrived within the timeout; poll again.
                continue

            error = message.error()
            if error is None:
                payload = message.value()
                # value() is None for messages without a payload
                # (e.g. tombstones), so decode only when present.
                text = payload.decode('utf-8') if payload is not None else None
                print(f"Received message: {text}")
            elif error.code() == KafkaError._PARTITION_EOF:
                # Only delivered when 'enable.partition.eof' is enabled.
                print(f"Reached end of partition {message.partition()}")
            else:
                print(f"Error while consuming message: {error.str()}")

    except KeyboardInterrupt:
        print("Consumer interrupted. Closing consumer.")

    finally:
        # Always close so the consumer leaves its group cleanly, even when
        # the loop ends because of an unexpected exception.
        consumer.close()


if __name__ == "__main__":
    consume_messages_from_kafka_topic()

    
# c) Test the end-to-end flow of message production and consumption.
# Steps to check the end-to-end flow of message production and consumption.

# To test the end-to-end flow of message production and consumption, we need to run both the producer and consumer Python scripts simultaneously. The producer will send messages to the Kafka topic, and the consumer will receive and print those messages. Before running the scripts, make sure you have Kafka installed, running, and the topic is created.

# Run the Producer:
# Save the producer script from part (a) above to a Python file, for example, kafka_producer.py. Replace the bootstrap server address and the topic name with your Kafka cluster configuration.

# Run the Consumer:
# Save the consumer script from part (b) above to another Python file, for example, kafka_consumer.py. Replace the bootstrap server address and the topic name with your Kafka cluster configuration.

# Open two terminal windows or command prompt sessions. In one of them, run the producer script:

#     python kafka_producer.py
# This will start the producer and begin sending messages to the Kafka topic.

# In the second terminal or command prompt session, run the consumer script:
#     python kafka_consumer.py
# This will start the consumer, which will begin to consume and print the messages received from the Kafka topic.

# Observe the Output:
# You should now see messages being produced by the producer and consumed by the consumer. The producer will deliver the messages to the Kafka topic, and the consumer will print the received messages.

# Test Termination:
# The producer script exits on its own once its messages have been sent. To stop the consumer, press Ctrl + C in its terminal window; this closes the consumer gracefully.

# By running both the producer and consumer scripts together, you can verify the end-to-end flow of message production and consumption in Kafka. Messages sent by the producer will be received by the consumer, demonstrating successful communication between the two components. This testing approach allows you to ensure that the messages are correctly produced, delivered, and consumed within the Kafka cluster.


5. Working with Kafka Consumer Groups:
   a) Write a Python program to create a Kafka consumer within a consumer group.
   b) Implement logic to handle messages consumed by different consumers within the same group.
   c) Observe the behavior of consumer group rebalancing when adding or removing consumers.


In [None]:
#  a) Write a Python program to create a Kafka consumer within a consumer group.

from confluent_kafka import Consumer, KafkaError

def consume_messages_from_kafka_topic():
    """Run a consumer that belongs to a consumer group and print its messages.

    The group is selected by the ``group.id`` configuration entry.  Polls
    the placeholder topic in an endless loop until interrupted with Ctrl+C.
    The consumer is closed on every exit path, not only on
    KeyboardInterrupt.
    """
    # Configure Kafka consumer properties
    consumer_config = {
        'bootstrap.servers': 'your_kafka_broker_1:9092, your_kafka_broker_2:9092',  # Replace with your Kafka brokers
        'group.id': 'my_kafka_consumer_group',  # Consumer group ID
        'auto.offset.reset': 'earliest',  # Start from the beginning when the group has no committed offset
    }

    # Create a Kafka consumer instance
    consumer = Consumer(consumer_config)

    topic = 'your_kafka_topic'  # Replace with your Kafka topic name

    try:
        # Subscribe to the Kafka topic within the consumer group
        consumer.subscribe([topic])

        # Poll for new messages
        while True:
            message = consumer.poll(timeout=1.0)

            if message is None:
                # Nothing arrived within the timeout; poll again.
                continue

            error = message.error()
            if error is None:
                payload = message.value()
                # value() is None for messages without a payload
                # (e.g. tombstones), so decode only when present.
                text = payload.decode('utf-8') if payload is not None else None
                print(f"Received message: {text}")
            elif error.code() == KafkaError._PARTITION_EOF:
                # Only delivered when 'enable.partition.eof' is enabled.
                print(f"Reached end of partition {message.partition()}")
            else:
                print(f"Error while consuming message: {error.str()}")

    except KeyboardInterrupt:
        print("Consumer interrupted. Closing consumer.")

    finally:
        # Always close so the consumer leaves its group promptly, even when
        # the loop ends because of an unexpected exception.
        consumer.close()


if __name__ == "__main__":
    consume_messages_from_kafka_topic()
    
    
#  b) Implement logic to handle messages consumed by different consumers within the same group.

from confluent_kafka import Consumer, KafkaError

def consume_messages_from_kafka_topic():
    """Consume as a group member and print where each message came from.

    For every message the group id, partition, offset, key and decoded
    value are printed, which makes it visible which partitions this
    consumer is reading when several consumers share the group.  Polls in
    an endless loop until interrupted with Ctrl+C.  The consumer is closed
    on every exit path, not only on KeyboardInterrupt.
    """
    # Configure Kafka consumer properties
    consumer_config = {
        'bootstrap.servers': 'your_kafka_broker_1:9092, your_kafka_broker_2:9092',  # Replace with your Kafka brokers
        'group.id': 'my_kafka_consumer_group',  # Consumer group ID
        'auto.offset.reset': 'earliest',  # Start from the beginning when the group has no committed offset
    }

    # Create a Kafka consumer instance
    consumer = Consumer(consumer_config)

    topic = 'your_kafka_topic'  # Replace with your Kafka topic name

    try:
        # Subscribe to the Kafka topic within the consumer group
        consumer.subscribe([topic])

        # Poll for new messages
        while True:
            message = consumer.poll(timeout=1.0)

            if message is None:
                # Nothing arrived within the timeout; poll again.
                continue

            error = message.error()
            if error is None:
                partition = message.partition()
                offset = message.offset()
                key = message.key()
                payload = message.value()
                # value() is None for messages without a payload
                # (e.g. tombstones), so decode only when present.
                value = payload.decode('utf-8') if payload is not None else None
                print(f"Consumer in group '{consumer_config['group.id']}' received message: "
                      f"Partition: {partition}, Offset: {offset}, Key: {key}, Value: {value}")
            elif error.code() == KafkaError._PARTITION_EOF:
                # Only delivered when 'enable.partition.eof' is enabled.
                print(f"Reached end of partition {message.partition()}")
            else:
                print(f"Error while consuming message: {error.str()}")

    except KeyboardInterrupt:
        print("Consumer interrupted. Closing consumer.")

    finally:
        # Always close so the consumer leaves its group promptly, even when
        # the loop ends because of an unexpected exception.
        consumer.close()


if __name__ == "__main__":
    consume_messages_from_kafka_topic()
