In [1]:
from kafka import KafkaProducer, KafkaConsumer, TopicPartition

In [2]:
# Locations of the TLS material (CA certificate, client certificate, client
# key) shared by the producer and consumer, relative to the working directory,
# plus the topic this demo writes to and reads from.
files = "files/"  # directory prefix; keeps its trailing slash for `files + name`
ca_filename, cert_filename, key_filename = "ca.pem", "service.cert", "service.key"
topic_name = "demo-topic"

## Producer

In [3]:
# Producer speaking TLS to the broker: it trusts the CA file and presents the
# client certificate/key configured in the paths above.
producer_config = dict(
    bootstrap_servers="message-service-eero-6762.aivencloud.com:18806",
    security_protocol="SSL",
    ssl_cafile=files + ca_filename,
    ssl_certfile=files + cert_filename,
    ssl_keyfile=files + key_filename,
)
producer = KafkaProducer(**producer_config)

## Consumer

In [4]:
# Consumer subscribed to `topic_name` as part of the "demo-group" consumer
# group (client id "demo-client-1"), using the same TLS files as the producer.
consumer_config = dict(
    bootstrap_servers="message-service-eero-6762.aivencloud.com:18806",
    client_id="demo-client-1",
    group_id="demo-group",
    security_protocol="SSL",
    ssl_cafile=files + ca_filename,
    ssl_certfile=files + cert_filename,
    ssl_keyfile=files + key_filename,
)
consumer = KafkaConsumer(topic_name, **consumer_config)


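### Consumer partitions and offsets

List the topic's partitions and the current end offset of each, i.e. the offset at which the next produced message will be written.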

In [39]:
# partitions_for_topic() returns an unordered set of partition ids and may come
# back empty/None when the client has no metadata for the topic. Fail loudly in
# that case instead of with an obscure TypeError, and sort the ids so later
# cells that index parts[0] / offsets[0] always mean the lowest partition.
partition_ids = consumer.partitions_for_topic(topic_name)
assert partition_ids, "no partition metadata for topic {!r}".format(topic_name)
parts = [TopicPartition(topic_name, p) for p in sorted(partition_ids)]
parts

[TopicPartition(topic='demo-topic', partition=0)]

In [13]:
# end_offsets() queries the broker and already returns {TopicPartition: offset}
# for every partition passed in, so call it once rather than once per partition.
# The end offset is the offset the next produced record will be written at.
end_offset_by_part = consumer.end_offsets(parts)
offsets = [end_offset_by_part[p] for p in parts]
offsets

[0]

## Experimenting with the producer and consumer

### Add messages

In [15]:
## For topic with only 1 partition
def add_messages(amount, first_number):
    for i in range(first_number, first_number + amount):
        message = "message number {}".format(i)
        print("Sending: {}".format(message))
        m = producer.send(topic_name, message.encode("utf-8")).get()
        print("Offset: " + str(m.offset))

In [16]:
add_messages(4, offsets[0])

Sending: message number 0
Offset: 0
Sending: message number 1
Offset: 1
Sending: message number 2
Offset: 2
Sending: message number 3
Offset: 3


In [17]:
# Re-read the end offsets after sending. end_offsets() queries the broker and
# returns {TopicPartition: offset} for all requested partitions, so a single
# call is enough (instead of one call per partition).
end_offset_by_part = consumer.end_offsets(parts)
offsets = [end_offset_by_part[p] for p in parts]
offsets

[4]

In [18]:
add_messages(amount=4, first_number=offsets[0])

Sending: message number 4
Offset: 4
Sending: message number 5
Offset: 5
Sending: message number 6
Offset: 6
Sending: message number 7
Offset: 7


In [19]:
# Every send in add_messages() was already awaited with .get(), so this is
# only a safety net: block (no timeout) until any buffered records are sent.
producer.flush(timeout=None)

### Read all messages

In [34]:
# seek_to_beginning() only accepts partitions currently assigned to this
# consumer. This consumer subscribed to the topic through a consumer group, so
# partitions are assigned lazily during poll(). On a fresh kernel nothing has
# polled yet, so poll (bounded, ~10 s) until `parts` are assigned. Records
# fetched while waiting are discarded: the seek below rewinds the position.
for attempt in range(10):
    if set(parts) <= consumer.assignment():
        break
    consumer.poll(timeout_ms=1000)
assert set(parts) <= consumer.assignment(), "partitions not assigned to this consumer: {}".format(
    set(parts) - consumer.assignment()
)
consumer.seek_to_beginning(*parts)

In [35]:
# Wait up to one second for records and display the raw mapping of
# TopicPartition -> list of ConsumerRecord that poll() returns.
poll_timeout_ms = 1000
raw_msgs = consumer.poll(timeout_ms=poll_timeout_ms)
raw_msgs

{TopicPartition(topic='demo-topic', partition=0): [ConsumerRecord(topic='demo-topic', partition=0, offset=0, timestamp=1528974988355, timestamp_type=0, key=None, value=b'message number 0', checksum=None, serialized_key_size=-1, serialized_value_size=16),
  ConsumerRecord(topic='demo-topic', partition=0, offset=1, timestamp=1528974988385, timestamp_type=0, key=None, value=b'message number 1', checksum=None, serialized_key_size=-1, serialized_value_size=16),
  ConsumerRecord(topic='demo-topic', partition=0, offset=2, timestamp=1528974988391, timestamp_type=0, key=None, value=b'message number 2', checksum=None, serialized_key_size=-1, serialized_value_size=16),
  ConsumerRecord(topic='demo-topic', partition=0, offset=3, timestamp=1528974988397, timestamp_type=0, key=None, value=b'message number 3', checksum=None, serialized_key_size=-1, serialized_value_size=16),
  ConsumerRecord(topic='demo-topic', partition=0, offset=4, timestamp=1528975016309, timestamp_type=0, key=None, value=b'messag

In [36]:
# Print only the payload of each polled record, partition by partition.
for batch in raw_msgs.values():
    for record in batch:
        print("Received: {}".format(record.value))

Received: b'message number 0'
Received: b'message number 1'
Received: b'message number 2'
Received: b'message number 3'
Received: b'message number 4'
Received: b'message number 5'
Received: b'message number 6'
Received: b'message number 7'


### Read a message at a specific offset

In [37]:
def get_message(part: "TopicPartition", offset: int, timeout_ms: int = 1000):
    """Return the record stored at ``offset`` in partition ``part``.

    Moves the global ``consumer``'s position for ``part`` to ``offset`` and polls
    once. Only records belonging to ``part`` are considered. When several
    partitions are assigned, ``poll()`` may also return data from the others,
    and that data must not be mistaken for the requested record.

    Side effect: the consumer's position for ``part`` ends up past the records
    fetched by the poll.

    Args:
        part: partition to read from. Must already be assigned to ``consumer``.
        offset: offset to seek to before polling.
        timeout_ms: how long ``poll()`` may wait for data (default 1000).

    Returns:
        The first record returned for ``part`` after the seek.

    Raises:
        IndexError: if no record for ``part`` arrives within ``timeout_ms``.
    """
    consumer.seek(part, offset)
    records = consumer.poll(timeout_ms=timeout_ms).get(part, [])
    if not records:
        raise IndexError(
            "no record at offset {} of {} within {} ms".format(offset, part, timeout_ms)
        )
    return records[0]

In [38]:
get_message(part=parts[0], offset=3)

ConsumerRecord(topic='demo-topic', partition=0, offset=3, timestamp=1528974988397, timestamp_type=0, key=None, value=b'message number 3', checksum=None, serialized_key_size=-1, serialized_value_size=16)