**Install Kafka and the required libraries**

First, install Apache Kafka and make sure a broker is running; Apache Kafka's Quickstart covers the setup on a local machine. You also need the confluent_kafka Python library (published on PyPI as confluent-kafka):

In [None]:
pip install confluent_kafka

**Create Kafka Topics**

Create the topic that the producer and consumer will share. I used Kafka's command-line tools to do this; the command below creates `test-topic` with a single partition and a replication factor of 1.

In [None]:
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

**Producer Code**

The producer sends data to the Kafka topic.

In [None]:
from confluent_kafka import Producer
import time

# Configuration for Kafka Producer
conf = {
    'bootstrap.servers': 'localhost:9092',  # Kafka broker
    'client.id': 'python-producer'
}

# Create Producer instance
producer = Producer(**conf)

# Delivery callback: reports whether each message reached the broker
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Produce data to Kafka
for i in range(10):
    data = f'hello {i}'
    # Produce message
    producer.produce('test-topic', key=str(i), value=data, callback=delivery_report)
    # Wait up to 1 second for events. Callbacks will be invoked during
    # this method call if the message is acknowledged.
    producer.poll(1)
    time.sleep(1)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
producer.flush()
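
The loop above sends plain strings. Payloads are often structured, and `produce()` takes a `str` or `bytes` value, so one common approach is to serialize each record to JSON bytes first. The following is a minimal sketch using only the standard library; `encode_event`, `decode_event`, and the field names are hypothetical helpers for illustration, not part of confluent_kafka.

```python
import json

def encode_event(event: dict) -> bytes:
    """Serialize a dict to compact UTF-8 JSON bytes (hypothetical helper)."""
    # sort_keys makes the output deterministic for identical input
    return json.dumps(event, separators=(",", ":"), sort_keys=True).encode("utf-8")

def decode_event(payload: bytes) -> dict:
    """Inverse of encode_event, for use on the consumer side."""
    return json.loads(payload.decode("utf-8"))

# Example record (field names are assumptions)
payload = encode_event({"id": 3, "text": "hello 3"})
```

The resulting `payload` could then be passed as `value=payload` in the `producer.produce(...)` call, and `decode_event(msg.value())` would recover the dict on the consumer side.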


**Consumer Code**

The consumer subscribes to the Kafka topic and handles each message as it arrives. Here, handling a message just means printing its decoded value.

In [None]:
from confluent_kafka import Consumer, KafkaError, KafkaException

# Configuration for Kafka Consumer
conf = {
    'bootstrap.servers': 'localhost:9092',  # Kafka broker
    'group.id': 'python-consumer',          # Consumer group ID
    'auto.offset.reset': 'earliest'         # Start reading from the earliest message
}

# Create Consumer instance
consumer = Consumer(**conf)

# Subscribe to topic
consumer.subscribe(['test-topic'])

# Process messages from Kafka
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
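                # End of partition event (only emitted when
                # 'enable.partition.eof' is set to True in the config)
                print(f'{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}')
            else:
                # Any other error is treated as fatal
                raise KafkaException(msg.error())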
        else:
            # Proper message
            print(f'Received message: {msg.value().decode("utf-8")}')

except KeyboardInterrupt:
    pass
finally:
    # Close down consumer to commit final offsets.
    consumer.close()
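
The loop above only prints each value. If the messages need real processing, the decoding and handling can be moved into a small function that the `else` branch calls with `msg.value()`. The following is a minimal sketch, assuming the `hello {i}` payloads produced earlier; `handle_value` and `stats` are hypothetical names, and the values are simulated so the snippet runs without a broker.

```python
from collections import Counter

stats = Counter()  # running totals across all handled messages

def handle_value(raw: bytes) -> str:
    """Decode one message value, update counters, return a processed form."""
    text = raw.decode("utf-8")
    stats["messages"] += 1
    stats["bytes"] += len(raw)
    return text.upper()

# Simulate values as they would arrive from msg.value()
for i in range(3):
    result = handle_value(f"hello {i}".encode("utf-8"))
```

Keeping the handler free of Kafka objects makes it easy to test on its own; in the consumer loop it would be called as `handle_value(msg.value())`.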