In [1]:
!pip install kafka-python

[0m

# Create topic in Kafka

Run in a terminal (the command executes inside the `broker` Docker container):
```
docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
             --create \
             --topic ny_taxi_rides \
             --config delete.retention.ms=60000
```

Result:

```
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic ny_taxi_rides.
```

## Delete topic

```
docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
             --delete \
             --topic ny_taxi_rides
```

In [1]:
# Name of the Kafka topic that the producer and consumer cells below use.
topic = 'ny_taxi_rides'

In [2]:
# Show which Kafka-related packages (and versions) are installed.
!pip list | grep kafka

kafka-python             1.4.7
robinhood-aiokafka       1.1.6


In [3]:
# Kafka broker address(es) handed to both the producer and the consumer below.
BOOTSTRAP_SERVERS = ['localhost:9092']

In [63]:
import logging

# log_level = logging.DEBUG
# logging.basicConfig(level=log_level)
# log = logging.getLogger('kafka')
#log.setLevel(log_level)

# Write to topic

In [17]:
import json
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

In [18]:
def _serialize_key(key):
    """Turn a record key into bytes via its string representation."""
    return str(key).encode()


def _serialize_value(payload):
    """Dump a record value to JSON and encode it as UTF-8.

    Objects that JSON cannot represent natively are converted with ``str``.
    """
    return json.dumps(payload, default=str).encode('utf-8')


# Producer used by the cells below to write records to the topic.
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    key_serializer=_serialize_key,
    value_serializer=_serialize_value,
)

In [19]:
# Records to publish: each entry carries the record key and its JSON-serializable value.
messages = [{'key': 1, 'value': {'foo': 'bar'}}]

success_template = 'Record {} successfully produced at offset {}'

for entry in messages:
    record_key = entry['key']
    try:
        pending = producer.send(topic=topic, key=record_key, value=entry['value'])
        # Wait for the send to complete so the assigned offset can be reported.
        metadata = pending.get()
        print(success_template.format(record_key, metadata.offset))
    except KafkaTimeoutError as timeout_error:
        print(str(timeout_error))

Record 1 successfully produced at offset 0


# Read messages from topic

In [20]:
from kafka import KafkaConsumer

In [28]:
def _deserialize_key(raw_key):
    """Decode a record key to ``int``; a record without a key yields ``None``.

    Without the ``None`` guard a single key-less record on the topic would
    raise ``AttributeError`` while polling.
    """
    if raw_key is None:
        return None
    return int(raw_key.decode('utf-8'))


def _deserialize_value(raw_value):
    """Decode a UTF-8 JSON payload; a record without a value yields ``None``."""
    if raw_value is None:
        return None
    return json.loads(raw_value.decode('utf-8'))


# Consumer for `topic`, reading from the earliest available offset when the
# group has no committed position, with automatic offset commits enabled.
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=BOOTSTRAP_SERVERS,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    key_deserializer=_deserialize_key,
    value_deserializer=_deserialize_value,
    group_id='group-id-0',
)

In [29]:
# List the topic names visible to this consumer.
consumer.topics()

{'ny_taxi_rides'}

In [30]:
# Check whether the consumer is connected to the bootstrap server.
consumer.bootstrap_connected()

True

In [31]:
# Partition ids that exist for the topic.
consumer.partitions_for_topic(topic)

{0}

In [32]:
# Topics this consumer is currently subscribed to.
consumer.subscription()

{'ny_taxi_rides'}

In [37]:
def consume_from_kafka(consumer, topic: str, poll_timeout_ms: int = 1000):
    """Subscribe ``consumer`` to ``topic`` and print every record until interrupted.

    Args:
        consumer: A kafka-python style consumer exposing ``subscribe``,
            ``subscription``, ``poll`` and ``close``.
        topic: Topic name to subscribe to (a list/tuple of names is also accepted).
        poll_timeout_ms: Maximum time, in milliseconds, that a single ``poll``
            call may block. kafka-python's ``poll`` takes milliseconds, so the
            default of 1000 gives the intended one-second wait.

    The loop ends on ``KeyboardInterrupt`` (e.g. interrupting the kernel).
    The consumer is always closed on the way out, including when an
    unexpected exception propagates to the caller.
    """
    topics = [topic] if isinstance(topic, str) else list(topic)
    consumer.subscribe(topics)
    print('Available topics to consume: ', consumer.subscription())
    try:
        while True:
            # Keep each poll short so an interrupt is noticed promptly
            # between calls.
            batches = consumer.poll(timeout_ms=poll_timeout_ms)
            if not batches:
                continue
            # poll() returns a mapping of partition -> list of records.
            for records in batches.values():
                for record in records:
                    print(record.key, record.value)
    except KeyboardInterrupt:
        # Interrupt is the normal way to stop consuming; fall through to close.
        pass
    finally:
        consumer.close()

In [38]:
# Start consuming; this call keeps looping until the kernel is interrupted (KeyboardInterrupt).
consume_from_kafka(consumer, topic)

Consuming from Kafka started
Available topics to consume:  {'ny_taxi_rides'}
1 {'foo': 'bar'}
