# Hands-on with Kafka

![kafka](https://images.contentful.com/gt6dp23g0g38/53UO4964r0e7kRVm0mcUUZ/f6f6d7b1b90e8e88a5be0d1845bdf950/what_is_kafka_and_how_does_it_work.png)

## Installing Kafka Python Client

First, we'll need to install Kafka Python client `kafka-python` to consume messages from a Kafka cluster.

In [1]:
import json
from kafka import KafkaConsumer, TopicPartition

First, we'll instantiate the consumer.

Our data was serialized in JSON during publishing, hence we'll need to deserialize as JSON when consuming. In actual production, it is recommended to use the binary format that we've learnt, such as Avro.

In [2]:
consumer = KafkaConsumer(
  bootstrap_servers=['localhost:9092'],
  auto_offset_reset='earliest',
  value_deserializer = lambda v: json.loads(v.decode('ascii')),
  key_deserializer = lambda v: json.loads(v.decode('ascii')),
)

We set the offset for the consumer to be `earliest` to get the earliest order (message).

List the available topics:

In [3]:
consumer.topics()

{'pizza-orders'}

Let's assume we are the owners of a pizza delivery chain, and we'd like to push the orders to Apache Kafka as they come in.

We have a topic consisting of pizza orders, which contains the Order ID, Order Details (pizzas and toppings), client's Name, Address, Phone Number, the Shop Name, Total Cost and Timestamp.

There is only one partition for the topic. First, let's subscribe to the topic:

In [4]:
topic_name = "pizza-orders"

In [5]:
consumer.subscribe(topics=[topic_name])
consumer.subscription()

{'pizza-orders'}

We can then start reading the events, let's read and (pretty) print the first 5:

In [6]:
from pprint import pprint

In [19]:
for ix, message in enumerate(consumer, start=1):
    print("%d:%d: k=%s" % (message.partition,
                           message.offset,
                           message.key))
    pprint("-------------------")
    pprint(message.value)
    print()
    if ix == 5:
        break

0:0: k={'shop': 'Marios Pizza'}
'-------------------'
{'address': '21087 Calvin Plains\nJonesland, NY 76392',
 'cost': 35.65,
 'id': 1,
 'name': 'Jessica Smith',
 'phoneNumber': '001-701-915-3000',
 'pizzas': [{'additionalToppings': ['🥚 egg', '🍍 pineapple'],
             'pizzaName': 'Marinara'},
            {'additionalToppings': [], 'pizzaName': 'Margherita'}],
 'shop': 'Marios Pizza',
 'timestamp': 1742305285331}

0:1: k={'shop': 'Marios Pizza'}
'-------------------'
{'address': '19721 Drew Key\nNew Donaldport, NH 05690',
 'cost': 47.92,
 'id': 2,
 'name': 'Roger Brown',
 'phoneNumber': '475-943-3780x8105',
 'pizzas': [{'additionalToppings': ['🌶️ hot pepper'], 'pizzaName': 'Marinara'},
            {'additionalToppings': ['🐟 tuna', '🫑 green peppers'],
             'pizzaName': 'Salami'},
            {'additionalToppings': ['🌶️ hot pepper'], 'pizzaName': 'Salami'},
            {'additionalToppings': ['🐟 tuna', '🧀 blue cheese'],
             'pizzaName': 'Margherita'}],
 'shop': 'Mario

Now, continue printing the next 5 events, notice the offset number and order id:

In [None]:
for ix, message in enumerate(consumer, start=1):
    print("%d:%d: k=%s" % (message.partition,
                           message.offset,
                           message.key))
    pprint(message.value)
    print()
    if ix == 5:
        break

0:5: k={'shop': 'Mammamia Pizza'}
{'address': '4874 Wright Lock\nNorth Leonardton, AZ 12821',
 'cost': 42.26,
 'id': 6,
 'name': 'Joseph Lowe',
 'phoneNumber': '743.698.6156x6633',
 'pizzas': [{'additionalToppings': ['🫒 olives'], 'pizzaName': 'Margherita'},
            {'additionalToppings': ['🫑 green peppers'], 'pizzaName': 'Salami'},
            {'additionalToppings': ['🫑 green peppers', '🍍 pineapple'],
             'pizzaName': 'Peperoni'},
            {'additionalToppings': ['🍓 strawberry'], 'pizzaName': 'Salami'},
            {'additionalToppings': ['🫒 olives'], 'pizzaName': 'Marinara'}],
 'shop': 'Mammamia Pizza',
 'timestamp': 1696869300368}

0:6: k={'shop': 'Luigis Pizza'}
{'address': '9167 Thomas Mission Apt. 588\nPort Sarahhaven, WI 31469',
 'cost': 26.65,
 'id': 7,
 'name': 'Andrea Robbins',
 'phoneNumber': '242.719.1159',
 'pizzas': [{'additionalToppings': ['🧀 blue cheese'], 'pizzaName': 'Peperoni'},
            {'additionalToppings': ['🍌 banana', '🍍 pineapple', '🍍 pineappl

In [18]:
# go to beginning 
consumer.seek_to_beginning()

In [None]:
# close consumer 
consumer.close()

This is the full code to publish an example order:

```python
from kafka import KafkaProducer

producer = KafkaProducer(
  bootstrap_servers=['brave-fish-11463-us1-kafka.upstash.io:9092'],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username='YnJhdmUtZmlzaC0xMTQ2MyQSvwXBuLOQsV1W7YffuC8cDaZcA3fKQwakMhnQGgg',
  sasl_plain_password='MDUxNjc4YzEtYzYxNy00NTE1LWEwNWYtMDBhODRlZmE0OGJm',
  value_serializer=lambda v: json.dumps(v).encode('ascii'),
  key_serializer=lambda v: json.dumps(v).encode('ascii')
)

message = {'address': '8697 Anthony Valley\nPort Kellymouth, FL 64221',
 'id': 10,
 'name': 'Patricia Castaneda',
 'phoneNumber': '328-798-9970x51560',
 'pizzas': [{'additionalToppings': ['🍅 tomato'], 'pizzaName': 'Mari & Monti'}],
 'shop': 'Mammamia Pizza',
 'timestamp': 1696609343719}

key = {'shop': 'Mammamia Pizza'}

producer.send(topic_name, key=key, value=message)
```