# Using Python to send Avro data with Kafka

Notes on using the confluent-kafka Python client (which wraps [librdkafka](https://github.com/edenhill/librdkafka)) to send Avro data through Kafka.

Here is my GitHub repo for this code and notebook: https://github.com/mtpatter/python-kafka-avro. It includes a Dockerfile for my public Docker image, which is also available on Docker Hub at https://hub.docker.com/r/mtpatter/python-kafka-avro/.

This version uses the following:

1. Kafka version 0.10 from the Confluent Docker repo
2. ZooKeeper from wurstmeister's Docker repo
3. my own Docker image with Confluent's new Python client for Kafka (confluent-kafka) and avro-python3
4. simple producer and consumer scripts modified from [cuongbangoc's upstream repo](https://github.com/cuongbangoc/python-kafka-avro)

Not sure if this is the best way to do these things, but it works for me currently as a start.  I am also not sure why I get an assertion error in the consumer, so I've caught that exception and ignored it.

I used these very helpful resources:

https://activisiongamescience.github.io/2016/06/15/Kafka-Client-Benchmarking/  
https://github.com/cuongbangoc/python-kafka-avro  
https://gist.github.com/jakekdodd/e7ee38fd945818d86eb4  
https://sonnguyen.ws/simple-example-python-kafka-avro/

## To run this demo

Start a Kafka broker and ZooKeeper with docker-compose:

```shell
docker-compose up -d
```

```
Creating network "pythonkafkaavro_default" with the default driver
Creating pythonkafkaavro_zookeeper_1
Creating pythonkafkaavro_kafka_1
```

Get a producer going to produce some Avro data:

```shell
docker run -it -P --network=host --volume=${PWD}:/home mtpatter/python-kafka-avro python producer.py
```

Get a consumer going to listen:

```shell
docker run -it -P --network=host --volume=${PWD}:/home mtpatter/python-kafka-avro python -u consumer.py
```

```
% my-topic [0] at offset 0 with key None:
{'favorite_number': 6, 'favorite_color': '111', 'name': '123'}
% my-topic [0] at offset 1 with key None:
{'favorite_number': 2, 'favorite_color': '111', 'name': '123'}
% my-topic [0] at offset 2 with key None:
{'favorite_number': 7, 'favorite_color': '111', 'name': '123'}
% my-topic [0] at offset 3 with key None:
{'favorite_number': 3, 'favorite_color': '111', 'name': '123'}
% my-topic [0] at offset 4 with key None:
{'favorite_number': 8, 'favorite_color': '111', 'name': '123'}
% my-topic [0] at offset 5 with key None:
{'favorite_number': 2, 'favorite_color': '111', 'name': '123'}
% my-topic [0] at offset 6 with key None:
{'favorite_number': 10, 'favorite_color': '111', 'name': '123'}
% my-topic [0] at offset 7 with key None:
{'favorite_number': 4, 'favorite_color': '111', 'name': '123'}
% my-topic [0] at offset 8 with key None:
{'favorite_number': 2, 'favorite_color': '111', 'name': '123'}
% my-topic [0] at offset 9 with key None:
{'favorite_n
```

The output appears in the consumer. Remember to clean up afterwards: stop the producer and consumer, then run `docker-compose down` to shut down Kafka.

The producer script, `producer.py`:

```python
from confluent_kafka import Producer
import avro.schema
import avro.io
import io
import random

if __name__ == "__main__":

    conf = {'bootstrap.servers': 'localhost:9092'}
    producer = Producer(**conf)

    # Kafka topic
    topic = "my-topic"

    # Path to user.avsc avro schema
    schema_path = "user.avsc"
    with open(schema_path) as schema_file:
        schema = avro.schema.Parse(schema_file.read())

    # One DatumWriter can be reused for every record with this schema
    writer = avro.io.DatumWriter(schema)

    for i in range(10):
        # Serialize each record into a fresh in-memory buffer
        bytes_writer = io.BytesIO()
        encoder = avro.io.BinaryEncoder(bytes_writer)
        writer.write({"name": "123",
                      "favorite_color": "111",
                      "favorite_number": random.randint(0, 10)}, encoder)
        raw_bytes = bytes_writer.getvalue()
        # Send the raw Avro bytes as the message value (no key)
        producer.produce(topic, raw_bytes)

    # Wait until all queued messages have been delivered before exiting
    producer.flush()
```

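What ends up in `raw_bytes` is Avro's plain binary encoding: no schema and no field names, just the field values written in schema order. The stdlib-only sketch below reproduces that encoding (and its decoding) by hand, purely to illustrate the format; in practice `avro.io` does this for you. It assumes `user.avsc` is the `User` record from the Avro getting-started guide, with `name` as a `string`, `favorite_number` as an `["int", "null"]` union and `favorite_color` as a `["string", "null"]` union. The schema file isn't shown here, so adjust the field order and types if yours differs. All function names are hypothetical helpers, not part of the `avro` package.

```python
# Hand-rolled sketch of Avro's binary encoding, for illustration only.
# ASSUMPTION: user.avsc is the Avro getting-started "User" record:
#   name: "string", favorite_number: ["int", "null"], favorite_color: ["string", "null"]
# Avro writes record fields in schema order, with no names, tags or schema.


def zigzag_varint(n):
    """Encode an int/long: zig-zag map to unsigned, then 7 bits per byte."""
    z = (n << 1) ^ (n >> 63)  # 0->0, -1->1, 1->2, -2->3, ... (64-bit range)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # high bit set: more bytes follow
        z >>= 7
    out.append(z)
    return bytes(out)


def read_varint(buf, pos):
    """Decode a zig-zag varint starting at buf[pos]; return (value, new_pos)."""
    z = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if b < 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos


def encode_string(s):
    """A string is its UTF-8 byte length (as a long) followed by the bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def read_string(buf, pos):
    length, pos = read_varint(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def encode_union(value, encode_value):
    """Union: branch index first (0 = the non-null type, 1 = "null" here)."""
    if value is None:
        return zigzag_varint(1)
    return zigzag_varint(0) + encode_value(value)


def read_union(buf, pos, read_value):
    branch, pos = read_varint(buf, pos)
    if branch == 1:
        return None, pos
    return read_value(buf, pos)


def encode_user(record):
    return (encode_string(record["name"])
            + encode_union(record.get("favorite_number"), zigzag_varint)
            + encode_union(record.get("favorite_color"), encode_string))


def decode_user(buf):
    name, pos = read_string(buf, 0)
    number, pos = read_union(buf, pos, read_varint)
    color, pos = read_union(buf, pos, read_string)
    return {"name": name, "favorite_number": number, "favorite_color": color}


if __name__ == "__main__":
    record = {"name": "123", "favorite_number": 6, "favorite_color": "111"}
    raw = encode_user(record)
    print(raw)                          # b'\x06123\x00\x0c\x00\x06111'
    print(decode_user(raw) == record)   # True
```

Because the bytes carry no schema, the consumer can only make sense of them by reading with the same (or a compatible) schema, which is why both scripts load `user.avsc`.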

The consumer script, `consumer.py`:

from confluent_kafka import Consumer, KafkaException, KafkaError
import avro.schema
import avro.io
import io
import sys

if __name__ == "__main__":

    # To consume messages
    conf = {'bootstrap.servers': 'localhost:9092',
            'group.id': 'my_group',
            'default.topic.config': {'auto.offset.reset': 'smallest'}}
    consumer = Consumer(**conf)
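    # subscribe() returns None, so there is nothing useful to assign here
    consumer.subscribe(['my-topic'])

    # Load the same user.avsc schema the producer used to write the messages
    schema_path = "user.avsc"
    with open(schema_path) as schema_file:
        schema = avro.schema.Parse(schema_file.read())

    try:
        running = True
        while running:
            # poll() takes its timeout in seconds, not milliseconds
            msg = consumer.poll(timeout=1.0)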

            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(),
                                      msg.offset()))
                eli