# Creating a cloud messaging service using CloudKarafka and Python

This notebook builds the consumer service. It receives events published to a CloudKarafka topic so that each business can process them according to its own rules.

# Installing the confluent_kafka package

In [1]:
!pip install confluent_kafka

Collecting confluent_kafka
  Downloading confluent_kafka-1.7.0-cp37-cp37m-manylinux2010_x86_64.whl (2.7 MB)
     |████████████████████████████████| 2.7 MB 5.1 MB/s
Installing collected packages: confluent-kafka
Successfully installed confluent-kafka-1.7.0
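
The consumer in this notebook authenticates against the broker with SASL/SCRAM credentials. As a minimal sketch, assuming the brokers, username, and password are supplied through environment variables, the configuration could be assembled as follows instead of writing the secrets as literals. The variable names `CLOUDKARAFKA_BROKERS`, `CLOUDKARAFKA_USERNAME`, and `CLOUDKARAFKA_PASSWORD` and the helper `build_consumer_config` are assumptions made for illustration, not part of the original notebook.

```python
import os


def build_consumer_config(env=os.environ):
    """Assemble a consumer configuration dict from environment variables.

    The variable names are hypothetical; use whatever your deployment defines.
    """
    username = env["CLOUDKARAFKA_USERNAME"]
    return {
        # Comma-separated list of host:port pairs
        "bootstrap.servers": env["CLOUDKARAFKA_BROKERS"],
        # Same naming scheme as the notebook: "<username>-consumer"
        "group.id": "%s-consumer" % username,
        "session.timeout.ms": 6000,
        "auto.offset.reset": "earliest",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-256",
        "sasl.username": username,
        "sasl.password": env["CLOUDKARAFKA_PASSWORD"],
    }


# Usage with an explicit mapping, so the sketch runs without real credentials
example_env = {
    "CLOUDKARAFKA_BROKERS": "broker-1.example.com:9094",
    "CLOUDKARAFKA_USERNAME": "demo-user",
    "CLOUDKARAFKA_PASSWORD": "not-a-real-password",
}
conf = build_consumer_config(example_env)
print(conf["group.id"])  # demo-user-consumer
```

The resulting dict can be passed to `Consumer(conf)` in place of a hand-written one. A missing variable raises `KeyError` immediately, which is usually easier to diagnose than a failed authentication later on.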


The `createConsumer` function below runs an infinite loop that polls the Kafka broker for messages published by producers. As soon as a new message becomes available, the consumer retrieves its data, which can then be processed according to business rules. Interrupting the kernel stops the loop.

In [2]:
import sys

from confluent_kafka import Consumer, KafkaException, KafkaError

def createConsumer():

    topics = ['euaa7a9x-default']

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': 'dory-01.srvs.cloudkafka.com:9094,dory-02.srvs.cloudkafka.com:9094,dory-03.srvs.cloudkafka.com:9094',
        'group.id': "%s-consumer" % 'euaa7a9x',
        'session.timeout.ms': 6000,
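        # Start from the earliest available offset when the group has no committed offset.
        # ('default.topic.config' is deprecated; topic settings go at the top level.)
        'auto.offset.reset': 'earliest',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',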
        'sasl.username': 'euaa7a9x',
        'sasl.password': 'VKUGQ6Fj6r4JP6fewYRyPEwlNSDrKvf7'
    }

    c = Consumer(conf)
    c.subscribe(topics)
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event (only delivered when
                    # 'enable.partition.eof' is enabled in the configuration)
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Any other error stops the consumer
                    raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')
    finally:
        # Close down consumer to commit final offsets, even if an error was raised.
        c.close()

createConsumer()


% euaa7a9x-default [4] at offset 0 with key None:


b'my new message 1 - Rafael Luz New Test'


% euaa7a9x-default [3] at offset 0 with key None:
% euaa7a9x-default [1] at offset 0 with key None:


b'my new message 1 - Rafael Luz New Test'
b'my new message 2 - Rafael Luz New Test'


% euaa7a9x-default [3] at offset 1 with key None:
% euaa7a9x-default [3] at offset 2 with key None:
% euaa7a9x-default [4] at offset 1 with key None:


b'my new message 2 - Rafael Luz New Test'
b'my new message 3 - Rafael Luz New Test'
b'my new message 1 - Rafael Luz New Test'


%% Aborted by user
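
The output above shows each payload as raw bytes (`b'...'`), because `msg.value()` returns `bytes`. As a rough sketch of how the "business rules" step could be attached, the snippet below decodes the payload and hands it to a rule registered for the topic. The helpers `decode_value` and `handle_message`, the `rules` mapping, and the UTF-8 assumption are all hypothetical and only meant to illustrate the idea.

```python
def decode_value(raw, encoding="utf-8"):
    """Turn the bytes returned by msg.value() into text; None stays None."""
    if raw is None:
        return None
    return raw.decode(encoding)


def handle_message(topic, raw_value, rules):
    """Decode one payload and pass it to the rule registered for its topic.

    `rules` maps a topic name to a callable that takes the decoded text.
    Returns the rule's result, or None when no rule is registered.
    """
    rule = rules.get(topic)
    if rule is None:
        return None
    return rule(decode_value(raw_value))


# Hypothetical rule: normalize the text to upper case
rules = {"euaa7a9x-default": lambda text: text.upper()}

result = handle_message("euaa7a9x-default", b"my new message 1", rules)
print(result)  # MY NEW MESSAGE 1
```

Inside the polling loop, the call `print(msg.value())` could then be replaced with something like `handle_message(msg.topic(), msg.value(), rules)`, keeping the Kafka plumbing separate from the processing logic so that the rules can be tested without a broker.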
