Day 5 (Apache Kafka)

1. Setting up a Kafka Producer:
   a) Write a Python program to create a Kafka producer.
   b) Configure the producer to connect to a Kafka cluster.
   c) Implement logic to send messages to a Kafka topic.


In [None]:
## 1a.
from confluent_kafka import Producer

# Single local broker to bootstrap from, and the topic messages are written to
bootstrap_servers, topic = 'localhost:9092', 'my_topic'

def delivery_report(err, msg):
    """Print the outcome of one produce request.

    Used as the ``callback`` argument to ``producer.produce``. When ``err`` is
    set, the failure is printed; otherwise the destination topic and partition
    of ``msg`` are printed.
    """
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
        return
    print(f'Message delivery failed: {err}')

def main():
    """Send ten numbered test messages ('Message 0' ... 'Message 9') to ``topic``."""
    producer = Producer({'bootstrap.servers': bootstrap_servers})

    # Encode lazily, one payload per produce call, in order 0..9
    payloads = (f'Message {n}'.encode('utf-8') for n in range(10))
    for payload in payloads:
        producer.produce(topic, payload, callback=delivery_report)

    # Block until every queued message is delivered or has failed; the
    # delivery_report callbacks are served while flushing.
    producer.flush()

if __name__ == '__main__':
    main()


In [None]:
## 1b.
from confluent_kafka import Producer

# Three-broker cluster used for bootstrapping (comma-separated host:port list)
bootstrap_servers = ','.join(['kafka1:9092', 'kafka2:9092', 'kafka3:9092'])

# Destination topic
topic = 'my_topic'

def delivery_report(err, msg):
    """Report whether a produced message reached the cluster.

    Passed as ``callback`` to ``producer.produce``. Prints the error when
    ``err`` is not None, otherwise the topic and partition of ``msg``.
    """
    succeeded = err is None
    if not succeeded:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def main():
    """Publish the ten test messages 'Message 0' ... 'Message 9' to ``topic``."""
    producer = Producer({'bootstrap.servers': bootstrap_servers})

    for idx in range(10):
        producer.produce(
            topic,
            f'Message {idx}'.encode('utf-8'),
            callback=delivery_report,
        )

    # Wait until every queued message is delivered or has failed; delivery
    # callbacks run during this call.
    producer.flush()

if __name__ == '__main__':
    main()


In [None]:
## 1c.
from confluent_kafka import Producer

# Three-broker cluster used for bootstrapping (comma-separated host:port list)
bootstrap_servers = ','.join(['kafka1:9092', 'kafka2:9092', 'kafka3:9092'])

# Destination topic
topic = 'my_topic'

def delivery_report(err, msg):
    """Print the delivery outcome for one message (``producer.produce`` callback)."""
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
        return
    print(f'Message delivery failed: {err}')

def send_message(producer, message):
    """UTF-8 encode ``message``, produce it to ``topic`` and flush synchronously.

    Flushing after every message makes each send block until that message is
    delivered or has failed, so its delivery report is printed immediately.
    """
    payload = message.encode('utf-8')
    producer.produce(topic, payload, callback=delivery_report)
    producer.flush()

def main():
    """Interactively read lines from stdin and publish each one to ``topic``.

    Typing ``q`` (any case) or sending end-of-input stops the loop. Each line
    is delivered synchronously through ``send_message``.
    """
    # Kafka producer configuration
    producer_config = {
        'bootstrap.servers': bootstrap_servers
    }

    # Create Kafka producer
    producer = Producer(producer_config)

    # Send messages to Kafka topic
    while True:
        try:
            message = input("Enter a message (or 'q' to quit): ")
        except EOFError:
            # Closed stdin / Ctrl+D: treat like 'q' instead of crashing.
            break
        if message.lower() == 'q':
            break
        send_message(producer, message)

    # confluent_kafka's Producer has no close() method (calling it raised
    # AttributeError). Flushing delivers anything still queued, which is all
    # the cleanup this producer needs here.
    producer.flush()

if __name__ == '__main__':
    main()


2. Setting up a Kafka Consumer:
   a) Write a Python program to create a Kafka consumer.
   b) Configure the consumer to connect to a Kafka cluster.
   c) Implement logic to consume messages from a Kafka topic.


In [None]:
## 2a.
from confluent_kafka import Consumer, KafkaException
import sys

# Kafka broker details
bootstrap_servers = 'localhost:9092'

# Kafka topic
topic = 'my_topic'

# Kafka consumer group
group_id = 'my_consumer_group'

def consume_messages():
    """Consume ``topic`` as a member of ``group_id`` and print each message value.

    Polls until interrupted with Ctrl+C. The consumer is closed on every exit
    path (interrupt or unexpected exception) so it always leaves the group.
    """
    # Error-code constants such as _PARTITION_EOF are defined on KafkaError;
    # KafkaException has no such attribute, so the old comparison raised
    # AttributeError as soon as any error event arrived.
    from confluent_kafka import KafkaError

    # Kafka consumer configuration
    consumer_config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        # With no committed offset for the group, start from the oldest available message.
        'auto.offset.reset': 'earliest'
    }

    # Create Kafka consumer and subscribe to the topic
    consumer = Consumer(consumer_config)
    consumer.subscribe([topic])

    try:
        while True:
            # Wait up to 1 second for the next message or event
            message = consumer.poll(1.0)
            if message is None:
                continue

            error = message.error()
            if error:
                # End-of-partition events are informational (and only emitted
                # when 'enable.partition.eof' is on); report anything else.
                if error.code() != KafkaError._PARTITION_EOF:
                    print(f'Error: {error}')
                continue

            # Tombstone messages carry a None value; undecodable bytes are
            # replaced instead of crashing the loop.
            raw = message.value()
            value = raw.decode('utf-8', errors='replace') if raw is not None else None
            print(f'Received message: {value}')

    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop consuming.
        pass
    finally:
        consumer.close()

if __name__ == '__main__':
    consume_messages()


In [None]:
##2b.
from confluent_kafka import Consumer, KafkaException
import sys

# Kafka cluster details
bootstrap_servers = 'kafka1:9092,kafka2:9092,kafka3:9092'

# Kafka topic
topic = 'my_topic'

# Kafka consumer group
group_id = 'my_consumer_group'

def consume_messages():
    """Consume ``topic`` from the cluster as a member of ``group_id`` and print values.

    Polls until interrupted with Ctrl+C. The consumer is closed on every exit
    path (interrupt or unexpected exception) so it always leaves the group.
    """
    # Error-code constants such as _PARTITION_EOF are defined on KafkaError;
    # KafkaException has no such attribute, so the old comparison raised
    # AttributeError as soon as any error event arrived.
    from confluent_kafka import KafkaError

    # Kafka consumer configuration
    consumer_config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        # With no committed offset for the group, start from the oldest available message.
        'auto.offset.reset': 'earliest'
    }

    # Create Kafka consumer and subscribe to the topic
    consumer = Consumer(consumer_config)
    consumer.subscribe([topic])

    try:
        while True:
            # Wait up to 1 second for the next message or event
            message = consumer.poll(1.0)
            if message is None:
                continue

            error = message.error()
            if error:
                # End-of-partition events are informational (and only emitted
                # when 'enable.partition.eof' is on); report anything else.
                if error.code() != KafkaError._PARTITION_EOF:
                    print(f'Error: {error}')
                continue

            # Tombstone messages carry a None value; undecodable bytes are
            # replaced instead of crashing the loop.
            raw = message.value()
            value = raw.decode('utf-8', errors='replace') if raw is not None else None
            print(f'Received message: {value}')

    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop consuming.
        pass
    finally:
        consumer.close()

if __name__ == '__main__':
    consume_messages()


In [None]:
##2c.
from confluent_kafka import Consumer, KafkaException
import sys

# Kafka cluster details
bootstrap_servers = 'kafka1:9092,kafka2:9092,kafka3:9092'

# Kafka topic
topic = 'my_topic'

# Kafka consumer group
group_id = 'my_consumer_group'

def consume_messages():
    """Consume ``topic``, print each value, and commit its offset after processing.

    Polls until interrupted with Ctrl+C. The consumer is closed on every exit
    path (interrupt or unexpected exception) so it always leaves the group.
    """
    # Error-code constants such as _PARTITION_EOF are defined on KafkaError;
    # KafkaException has no such attribute, so the old comparison raised
    # AttributeError as soon as any error event arrived.
    from confluent_kafka import KafkaError

    # Kafka consumer configuration
    consumer_config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        # With no committed offset for the group, start from the oldest available message.
        'auto.offset.reset': 'earliest',
        # Offsets are committed explicitly once each message is processed
        # (below); leaving auto-commit on would let the background committer
        # advance offsets independently of processing.
        'enable.auto.commit': False,
    }

    # Create Kafka consumer and subscribe to the topic
    consumer = Consumer(consumer_config)
    consumer.subscribe([topic])

    try:
        while True:
            # Wait up to 1 second for the next message or event
            message = consumer.poll(1.0)
            if message is None:
                continue

            error = message.error()
            if error:
                # End-of-partition events are informational (and only emitted
                # when 'enable.partition.eof' is on); report anything else.
                if error.code() != KafkaError._PARTITION_EOF:
                    print(f'Error: {error}')
                continue

            # Tombstone messages carry a None value; undecodable bytes are
            # replaced instead of crashing the loop.
            raw = message.value()
            value = raw.decode('utf-8', errors='replace') if raw is not None else None
            print(f'Received message: {value}')

            # Commit this message's offset now that it has been processed
            # (asynchronous by default).
            consumer.commit(message=message)

    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop consuming.
        pass
    finally:
        consumer.close()

if __name__ == '__main__':
    consume_messages()


3. Creating and Managing Kafka Topics:
   a) Write a Python program to create a new Kafka topic.
   b) Implement functionality to list existing topics.
   c) Develop logic to delete an existing Kafka topic.


In [None]:
##3a.
from confluent_kafka.admin import AdminClient, NewTopic

# Kafka broker details
bootstrap_servers = 'localhost:9092'

# New topic details
topic = 'my_new_topic'
num_partitions = 3
replication_factor = 1

def create_topic():
    """Create ``topic`` with the configured partitions/replication and report the result."""
    from confluent_kafka import KafkaException

    # Kafka admin client configuration
    admin_config = {
        'bootstrap.servers': bootstrap_servers
    }

    # Create Kafka admin client
    admin_client = AdminClient(admin_config)

    # Topic-level configuration is supplied through the constructor.
    new_topic = NewTopic(
        topic,
        num_partitions=num_partitions,
        replication_factor=replication_factor,
        config={'cleanup.policy': 'delete'},
    )

    # create_topics() is asynchronous and returns {topic_name: future}.
    # Waiting on each future makes the request complete before we return and
    # surfaces failures (e.g. the topic already exists) that were previously
    # silently dropped.
    futures = admin_client.create_topics([new_topic])
    for name, future in futures.items():
        try:
            future.result()  # None on success
            print(f'Topic {name} created')
        except KafkaException as exc:
            print(f'Failed to create topic {name}: {exc}')

    # AdminClient has no close() method (calling it raised AttributeError),
    # so there is nothing further to release explicitly.

if __name__ == '__main__':
    create_topic()


In [None]:
##3b.
from confluent_kafka.admin import AdminClient

# Kafka broker details
bootstrap_servers = 'localhost:9092'

def list_topics():
    """Print the names of all topics in the cluster, sorted, one per line.

    Cluster metadata is requested with a 10-second timeout.
    """
    # Kafka admin client configuration
    admin_config = {
        'bootstrap.servers': bootstrap_servers
    }

    # Create Kafka admin client
    admin_client = AdminClient(admin_config)

    # list_topics() returns cluster metadata whose .topics maps name -> metadata
    topics = admin_client.list_topics(timeout=10).topics

    print("Existing topics:")
    for name in sorted(topics):
        print(name)

    # AdminClient has no close() method (calling it raised AttributeError),
    # so there is nothing further to release explicitly.

if __name__ == '__main__':
    list_topics()


In [None]:
##3c.
from confluent_kafka.admin import AdminClient, NewTopic

# Kafka broker details
bootstrap_servers = 'localhost:9092'

# Topic to be deleted
topic = 'my_topic'

def delete_topic():
    """Delete ``topic`` from the cluster and report whether it succeeded."""
    from confluent_kafka import KafkaException

    # Kafka admin client configuration
    admin_config = {
        'bootstrap.servers': bootstrap_servers
    }

    # Create Kafka admin client
    admin_client = AdminClient(admin_config)

    # delete_topics() is asynchronous and returns {topic_name: future}.
    # Waiting on each future makes the request complete before we return and
    # surfaces broker-side failures that were previously silently dropped.
    futures = admin_client.delete_topics([topic])
    for name, future in futures.items():
        try:
            future.result()  # None on success
            print(f'Topic {name} deleted')
        except KafkaException as exc:
            print(f'Failed to delete topic {name}: {exc}')

    # AdminClient has no close() method (calling it raised AttributeError),
    # so there is nothing further to release explicitly.

if __name__ == '__main__':
    delete_topic()


4. Producing and Consuming Messages:
   a) Write a Python program to produce messages to a Kafka topic.
   b) Implement logic to consume messages from the same Kafka topic.
   c) Test the end-to-end flow of message production and consumption.


In [None]:
##4a.
from confluent_kafka import Producer

# Local broker to bootstrap from and the topic that typed messages go to
bootstrap_servers = 'localhost:9092'
topic = 'my_topic'

def delivery_report(err, msg):
    """Delivery callback for ``producer.produce``: print success or failure.

    On failure (``err`` not None) the error is printed; on success the
    message's topic and partition are printed.
    """
    outcome = (
        f'Message delivery failed: {err}'
        if err is not None
        else f'Message delivered to {msg.topic()} [{msg.partition()}]'
    )
    print(outcome)

def produce_messages():
    """Interactively publish lines typed on stdin to ``topic``.

    Entering ``q`` (any case), sending end-of-input, or pressing Ctrl+C stops
    the loop. Each message is flushed right away so its delivery report is
    printed before the next prompt.
    """
    # Kafka producer configuration
    producer_config = {
        'bootstrap.servers': bootstrap_servers
    }

    # Create Kafka producer
    producer = Producer(producer_config)

    try:
        while True:
            try:
                message = input("Enter a message (or 'q' to quit): ")
            except EOFError:
                # Closed stdin / Ctrl+D: treat like 'q' instead of crashing.
                break
            if message.lower() == 'q':
                break

            # Produce the message
            producer.produce(topic, message.encode('utf-8'), callback=delivery_report)

            # Wait for delivery so the report appears before the next prompt
            producer.flush()

    except KeyboardInterrupt:
        # Ctrl+C is an expected way to stop; fall through to the final flush.
        pass
    finally:
        # confluent_kafka's Producer has no close() method (the old call
        # raised AttributeError); flushing delivers anything still queued.
        producer.flush()

if __name__ == '__main__':
    produce_messages()


In [None]:
##4b.
from confluent_kafka import Consumer, KafkaException
import sys

# Kafka broker details
bootstrap_servers = 'localhost:9092'

# Kafka topic
topic = 'my_topic'

# Kafka consumer group
group_id = 'my_consumer_group'

def consume_messages():
    """Consume ``topic`` as a member of ``group_id`` and print each message value.

    Polls until interrupted with Ctrl+C. The consumer is closed on every exit
    path (interrupt or unexpected exception) so it always leaves the group.
    """
    # Error-code constants such as _PARTITION_EOF are defined on KafkaError;
    # KafkaException has no such attribute, so the old comparison raised
    # AttributeError as soon as any error event arrived.
    from confluent_kafka import KafkaError

    # Kafka consumer configuration
    consumer_config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        # With no committed offset for the group, start from the oldest available message.
        'auto.offset.reset': 'earliest'
    }

    # Create Kafka consumer and subscribe to the topic
    consumer = Consumer(consumer_config)
    consumer.subscribe([topic])

    try:
        while True:
            # Wait up to 1 second for the next message or event
            message = consumer.poll(1.0)
            if message is None:
                continue

            error = message.error()
            if error:
                # End-of-partition events are informational (and only emitted
                # when 'enable.partition.eof' is on); report anything else.
                if error.code() != KafkaError._PARTITION_EOF:
                    print(f'Error: {error}')
                continue

            # Tombstone messages carry a None value; undecodable bytes are
            # replaced instead of crashing the loop.
            raw = message.value()
            value = raw.decode('utf-8', errors='replace') if raw is not None else None
            print(f'Received message: {value}')

    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop consuming.
        pass
    finally:
        consumer.close()

if __name__ == '__main__':
    consume_messages()


5. Working with Kafka Consumer Groups:
   a) Write a Python program to create a Kafka consumer within a consumer group.
   b) Implement logic to handle messages consumed by different consumers within the same group.
   c) Observe the behavior of consumer group rebalancing when adding or removing consumers.


In [None]:
##5a.
from confluent_kafka import Consumer, KafkaException
import sys

# Kafka broker details
bootstrap_servers = 'localhost:9092'

# Kafka topic
topic = 'my_topic'

# Kafka consumer group
group_id = 'my_consumer_group'

def consume_messages():
    """Consume ``topic`` as one member of ``group_id``, committing after each message.

    Several copies of this program started with the same ``group_id`` share
    the topic's partitions. Polls until interrupted with Ctrl+C; the consumer
    is closed on every exit path so it always leaves the group.
    """
    # Error-code constants such as _PARTITION_EOF are defined on KafkaError;
    # KafkaException has no such attribute, so the old comparison raised
    # AttributeError as soon as any error event arrived.
    from confluent_kafka import KafkaError

    # Kafka consumer configuration
    consumer_config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        # With no committed offset for the group, start from the oldest available message.
        'auto.offset.reset': 'earliest',
        # Offsets are committed explicitly once each message is processed
        # (below); leaving auto-commit on would let the background committer
        # advance offsets independently of processing.
        'enable.auto.commit': False,
    }

    # Create Kafka consumer and subscribe to the topic
    consumer = Consumer(consumer_config)
    consumer.subscribe([topic])

    try:
        while True:
            # Wait up to 1 second for the next message or event
            message = consumer.poll(1.0)
            if message is None:
                continue

            error = message.error()
            if error:
                # End-of-partition events are informational (and only emitted
                # when 'enable.partition.eof' is on); report anything else.
                if error.code() != KafkaError._PARTITION_EOF:
                    print(f'Error: {error}')
                continue

            # Tombstone messages carry a None value; undecodable bytes are
            # replaced instead of crashing the loop.
            raw = message.value()
            value = raw.decode('utf-8', errors='replace') if raw is not None else None
            print(f'Received message: {value}')

            # Commit this message's offset now that it has been processed
            # (asynchronous by default).
            consumer.commit(message=message)

    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop consuming.
        pass
    finally:
        consumer.close()

if __name__ == '__main__':
    consume_messages()


In [None]:
##5b.
from confluent_kafka import Consumer, KafkaException
import sys

# Kafka broker details
bootstrap_servers = 'localhost:9092'

# Kafka topic
topic = 'my_topic'

# Kafka consumer group
group_id = 'my_consumer_group'

def consume_messages():
    """Consume ``topic`` in ``group_id``, printing each value with its partition and offset.

    Printing the partition shows which group member received which share of
    the topic. Each offset is committed after the message is printed. Polls
    until interrupted with Ctrl+C; the consumer is closed on every exit path.
    """
    # Error-code constants such as _PARTITION_EOF are defined on KafkaError;
    # KafkaException has no such attribute, so the old comparison raised
    # AttributeError as soon as any error event arrived.
    from confluent_kafka import KafkaError

    # Kafka consumer configuration
    consumer_config = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        # With no committed offset for the group, start from the oldest available message.
        'auto.offset.reset': 'earliest',
        # Offsets are committed explicitly once each message is processed
        # (below); leaving auto-commit on would let the background committer
        # advance offsets independently of processing.
        'enable.auto.commit': False,
    }

    # Create Kafka consumer and subscribe to the topic
    consumer = Consumer(consumer_config)
    consumer.subscribe([topic])

    try:
        while True:
            # Wait up to 1 second for the next message or event
            message = consumer.poll(1.0)
            if message is None:
                continue

            error = message.error()
            if error:
                # End-of-partition events are informational (and only emitted
                # when 'enable.partition.eof' is on); report anything else.
                if error.code() != KafkaError._PARTITION_EOF:
                    print(f'Error: {error}')
                continue

            # Tombstone messages carry a None value; undecodable bytes are
            # replaced instead of crashing the loop.
            raw = message.value()
            value = raw.decode('utf-8', errors='replace') if raw is not None else None
            partition = message.partition()
            offset = message.offset()
            print(f'Received message: {value} (Partition: {partition}, Offset: {offset})')

            # Commit this message's offset now that it has been processed
            # (asynchronous by default).
            consumer.commit(message=message)

    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop consuming.
        pass
    finally:
        consumer.close()

if __name__ == '__main__':
    consume_messages()
