### 1. Setting up a Kafka Producer:
   a) Write a Python program to create a Kafka producer.
   
   b) Configure the producer to connect to a Kafka cluster.
   
   c) Implement logic to send messages to a Kafka topic.


In [1]:
from kafka import KafkaProducer

def create_kafka_producer():
    """Build and return a KafkaProducer pointed at the example broker list.

    Every value below is a placeholder; replace the broker addresses,
    certificate path and credentials with the ones for your cluster.

    NOTE(review): with security_protocol set to 'PLAINTEXT' the ssl_* and
    sasl_* entries are presumably not used by the client -- confirm against
    the kafka-python documentation before relying on them.
    """
    # Comma-separated host:port pairs used for the initial cluster connection
    brokers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

    return KafkaProducer(
        bootstrap_servers=brokers,
        security_protocol='PLAINTEXT',  # cluster security configuration
        ssl_cafile='/path/to/ca.crt',  # certificate authority file path
        sasl_mechanism='PLAIN',  # authentication mechanism
        sasl_plain_username='my-username',  # authentication credentials
        sasl_plain_password='my-password',  # authentication credentials
    )

def send_messages(producer, topic):
    """Send ten UTF-8 encoded test messages to *topic*, then close the producer.

    Args:
        producer: Object exposing ``send(topic, value=...)``, ``flush()`` and
            ``close()`` (a ``KafkaProducer`` in this example).
        topic: Name of the topic the messages are sent to.

    The producer is closed even when sending or flushing raises, so a failed
    run does not leave it open. Any exception is propagated to the caller
    after the close.
    """
    try:
        for i in range(10):
            message = f'Message {i}'
            producer.send(topic, value=message.encode('utf-8'))

        # Flush the producer before it is closed
        producer.flush()
    finally:
        # Runs on both the success and the failure path
        producer.close()

if __name__ == '__main__':
    # Producer connected to the cluster configured in create_kafka_producer()
    producer = create_kafka_producer()

    # Destination topic for the test messages
    topic = 'my-topic'

    # Publish the messages; send_messages() also flushes and closes the producer
    send_messages(producer, topic)


### 2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.
   
   b) Configure the consumer to connect to a Kafka cluster.
   
   c) Implement logic to consume messages from a Kafka topic.


In [2]:
from kafka import KafkaConsumer

def create_kafka_consumer():
    """Build and return a KafkaConsumer pointed at the example broker list.

    Every value below is a placeholder; replace the broker addresses,
    certificate path, credentials, offset policy and group id with the ones
    for your cluster. No topic is subscribed here.

    NOTE(review): with security_protocol set to 'PLAINTEXT' the ssl_* and
    sasl_* entries are presumably not used by the client -- confirm against
    the kafka-python documentation before relying on them.
    """
    # Comma-separated host:port pairs used for the initial cluster connection
    brokers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

    return KafkaConsumer(
        bootstrap_servers=brokers,
        security_protocol='PLAINTEXT',  # cluster security configuration
        ssl_cafile='/path/to/ca.crt',  # certificate authority file path
        sasl_mechanism='PLAIN',  # authentication mechanism
        sasl_plain_username='my-username',  # authentication credentials
        sasl_plain_password='my-password',  # authentication credentials
        auto_offset_reset='latest',  # desired offset reset behavior
        group_id='my-consumer-group',  # consumer group ID
    )

def consume_messages(consumer, topic):
    """Subscribe *consumer* to *topic* and print every message it yields.

    Args:
        consumer: Object exposing ``subscribe(topics=...)``, iteration over
            messages with a bytes ``value`` attribute, and ``close()``
            (a ``KafkaConsumer`` in this example).
        topic: Name of the topic to read from.

    The consumer is closed in a ``finally`` clause so that it is released
    when the loop ends normally and also when it is left through an
    exception (for example a KeyboardInterrupt used to stop the script).
    Any exception is propagated to the caller after the close.
    """
    try:
        consumer.subscribe(topics=[topic])

        for message in consumer:
            print(f"Received message: {message.value.decode('utf-8')}")
    finally:
        consumer.close()

if __name__ == '__main__':
    # Consumer connected to the cluster configured in create_kafka_consumer()
    consumer = create_kafka_consumer()

    # Source topic for the messages
    topic = 'my-topic'

    # Subscribe and print every message read from the topic
    consume_messages(consumer, topic)


### 3. Creating and Managing Kafka Topics:
   a) Write a Python program to create a new Kafka topic.
   
   b) Implement functionality to list existing topics.
   
   c) Develop logic to delete an existing Kafka topic.


In [None]:
from kafka.admin import KafkaAdminClient, NewTopic

def create_kafka_topic(admin_client, topic_name, partitions=1, replication_factor=1):
    """Request creation of a topic and print a confirmation line.

    Args:
        admin_client: Admin client whose ``create_topics`` method is called.
        topic_name: Name of the topic to create.
        partitions: Number of partitions for the new topic (default 1).
        replication_factor: Replication factor for the new topic (default 1).
    """
    new_topics = [
        NewTopic(
            name=topic_name,
            num_partitions=partitions,
            replication_factor=replication_factor,
        )
    ]
    admin_client.create_topics(new_topics)
    print(f"Topic '{topic_name}' created successfully.")

def list_kafka_topics(admin_client):
    """Print the name of every topic reported by *admin_client*.

    Args:
        admin_client: Admin client whose ``list_topics()`` returns an
            iterable of topic names (kafka-python's ``KafkaAdminClient``
            returns a list of strings).
    """
    # list_topics() already yields the names themselves; the result has no
    # ``.topics`` attribute, so iterate it directly.
    topic_names = admin_client.list_topics()
    print("Existing Kafka topics:")
    for topic in topic_names:
        print(topic)

def delete_kafka_topic(admin_client, topic_name):
    """Request deletion of one topic and print a confirmation line.

    Args:
        admin_client: Admin client whose ``delete_topics`` method is called.
        topic_name: Name of the topic to delete.
    """
    # delete_topics() is given a list, so wrap the single name
    doomed = [topic_name]
    admin_client.delete_topics(doomed)

    confirmation = f"Topic '{topic_name}' deleted successfully."
    print(confirmation)

if __name__ == '__main__':
    # Configure Kafka broker address and port
    bootstrap_servers = 'kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092'

    # Create Kafka admin client
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

    # The admin operations run inside try/finally so the client is closed
    # even when one of them raises.
    try:
        # Create a new Kafka topic
        create_kafka_topic(admin_client, topic_name='my-topic', partitions=3, replication_factor=2)

        # List existing Kafka topics
        list_kafka_topics(admin_client)

        # Delete an existing Kafka topic
        delete_kafka_topic(admin_client, topic_name='my-topic')
    finally:
        # Close the admin client
        admin_client.close()


### 4. Producing and Consuming Messages:
   a) Write a Python program to produce messages to a Kafka topic.
   
   b) Implement logic to consume messages from the same Kafka topic.
   
   c) Test the end-to-end flow of message production and consumption.


In [None]:
from kafka import KafkaProducer, KafkaConsumer

def produce_messages(producer, topic):
    """Send ten UTF-8 encoded test messages to *topic*, logging each one.

    Args:
        producer: Object exposing ``send(topic, value=...)``, ``flush()`` and
            ``close()`` (a ``KafkaProducer`` in this example).
        topic: Name of the topic the messages are sent to.

    A "Produced message" line is printed after each send call returns. The
    producer is closed even when sending or flushing raises, so a failed run
    does not leave it open. Any exception is propagated to the caller after
    the close.
    """
    try:
        for i in range(10):
            message = f'Message {i}'
            producer.send(topic, value=message.encode('utf-8'))
            print(f"Produced message: {message}")

        # Flush the producer before it is closed
        producer.flush()
    finally:
        # Runs on both the success and the failure path
        producer.close()

def consume_messages(consumer, topic):
    """Subscribe *consumer* to *topic* and print every message it yields.

    Args:
        consumer: Object exposing ``subscribe(topics=...)``, iteration over
            messages with a bytes ``value`` attribute, and ``close()``
            (a ``KafkaConsumer`` in this example).
        topic: Name of the topic to read from.

    The consumer is closed in a ``finally`` clause so that it is released
    when the loop ends normally and also when it is left through an
    exception (for example a KeyboardInterrupt used to stop the script).
    Any exception is propagated to the caller after the close.
    """
    try:
        consumer.subscribe(topics=[topic])

        for message in consumer:
            print(f"Consumed message: {message.value.decode('utf-8')}")
    finally:
        consumer.close()

if __name__ == '__main__':
    # One topic is shared by the producing and the consuming half of the test
    topic = 'my-topic'

    # --- produce ---------------------------------------------------------
    # Connect a producer to the local broker and publish the test messages
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    produce_messages(producer, topic)

    # --- consume ---------------------------------------------------------
    # Connect a consumer to the same broker with the 'earliest' offset reset
    # policy, then print what it reads from the topic
    consumer = KafkaConsumer(
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
    )
    consume_messages(consumer, topic)


### 5. Working with Kafka Consumer Groups:
   a) Write a Python program to create a Kafka consumer within a consumer group.
   
   b) Implement logic to handle messages consumed by different consumers within the same group.
   
   c) Observe the behavior of consumer group rebalancing when adding or removing consumers.


In [None]:
from kafka import KafkaConsumer, TopicPartition

def create_kafka_consumer(group_id):
    """Return a KafkaConsumer for 'my-topic' that belongs to *group_id*.

    Args:
        group_id: Consumer group id passed to the KafkaConsumer.

    The consumer connects to the broker at localhost:9092 and uses the
    'earliest' offset reset policy.
    """
    settings = {
        'bootstrap_servers': 'localhost:9092',
        'group_id': group_id,
        'auto_offset_reset': 'earliest',
    }
    # The topic name is passed positionally, the settings as keyword arguments
    return KafkaConsumer('my-topic', **settings)

def consume_messages(consumer):
    """Print every message yielded by *consumer*, tagged with its group id.

    Args:
        consumer: Iterable of messages exposing ``partition``, ``offset`` and
            a bytes ``value``; it must also carry a ``config`` mapping with a
            ``'group_id'`` entry (a ``KafkaConsumer`` in this example).

    The consumer is not closed here; that is left to the caller.
    """
    # KafkaConsumer keeps its settings in the ``config`` mapping and has no
    # ``group_id`` attribute, so the id is read from there (once, up front).
    group_id = consumer.config['group_id']

    for message in consumer:
        print(f"Consumer {group_id} - Partition {message.partition} - Offset {message.offset}: {message.value.decode('utf-8')}")

if __name__ == '__main__':
    import threading

    def _run_group_member(group_id):
        """Create one consumer in the calling thread and consume until stopped."""
        # Each thread builds and owns its consumer so that no consumer
        # instance is shared between threads.
        member = create_kafka_consumer(group_id)
        try:
            consume_messages(member)
        finally:
            member.close()

    # consume_messages() blocks for as long as the consumer keeps iterating,
    # so calling it twice in sequence would never start the second consumer.
    # Running the two group members in separate threads lets both of them
    # take part in the same consumer group at once.
    members = [
        threading.Thread(
            target=_run_group_member,
            args=('my-consumer-group',),
            name=f'consumer-{n}',
            daemon=True,  # do not keep the process alive after the main thread exits
        )
        for n in (1, 2)
    ]

    # Start consuming messages within the group
    for member_thread in members:
        member_thread.start()

    try:
        for member_thread in members:
            member_thread.join()
    except KeyboardInterrupt:
        # Ctrl-C ends the demo; the daemon threads stop with the process.
        pass
