# Kafka Consumer Example - Printing Messages

For an introduction to [Kafka](https://kafka.apache.org/), you may want to read some of the main [concepts](https://kafka.apache.org/documentation/#intro_concepts_and_terms).  An **event** records the fact that "something happened". An event has a key, value, timestamp, and optional metadata headers. **Producers** are those client applications that publish (write) events to Kafka, and **consumers** are those that subscribe to (read and process) these events.

This is an example of how to write a very simple Kafka [consumer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html) using the kafka-python library. This example connects to a Kafka broker configured with the `SASL_SSL` security protocol and the `PLAIN` SASL mechanism. It receives simple JSON formatted strings on a Kafka **topic** and prints them out to the console. This is a single consumer, but it could be part of a **consumer group**. A consumer group is a set of consumers that cooperate to consume data from some topics.

For a sample producer, refer to the notebook [1_kafka_producer.ipynb](./1_kafka_producer.ipynb) to send some sample events.

For further reading, visit the [documentation](https://kafka.apache.org/documentation/) for Kafka and for [kafka-python](https://kafka-python.readthedocs.io/).


## Dependencies

- [kafka-python](https://pypi.org/project/kafka-python/): Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the official Java client, with a sprinkling of Pythonic interfaces (e.g., consumer iterators).


In [None]:
!pip install -r requirements.txt

## Connection Information

Generally, much of your connection information (servers, username, password) will be injected as environment variables. This keeps credentials out of the notebook itself, so they are not accidentally committed to source control.

#### Expected Environment Variables
- `KAFKA_BOOTSTRAP_SERVER`: location of the Kafka Bootstrap Server, e.g. 'abc.xyz.kafka.rhcloud.com:443'
- `KAFKA_USERNAME`: SASL username or client ID, e.g. 'srvc-acct-1234-5678-abcd-efgj-12345678abcd'
- `KAFKA_PASSWORD`: SASL password or client secret, e.g. 'abcd1234-5678-abcd-efgj-12345678abcd'
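
Before connecting, it can help to confirm that these variables are actually set. The following is a minimal sketch, not part of the original notebook: the helper `missing_env_vars` and the `REQUIRED_VARS` tuple are hypothetical names, and treating all three variables as required is an assumption (a broker without authentication would not need the username and password).

```python
import os

# Names taken from the list above; treating all three as required is an assumption.
REQUIRED_VARS = ("KAFKA_BOOTSTRAP_SERVER", "KAFKA_USERNAME", "KAFKA_PASSWORD")


def missing_env_vars(names=REQUIRED_VARS, env=None):
    """Return the names that are unset or empty in the given mapping."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
if missing:
    # Report the names only; never print the values themselves.
    print("Missing environment variables: " + ", ".join(missing))
```

Passing a plain dictionary as `env` makes the check easy to try without touching the real environment.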

In [None]:
import os

# location of the Kafka Bootstrap Server loaded from the environment variable.
# e.g. 'abc.xyz.kafka.rhcloud.com:443'
KAFKA_BOOTSTRAP_SERVER = os.environ.get('KAFKA_BOOTSTRAP_SERVER')

# SASL settings.  Defaults to SASL_SSL/PLAIN.
# No auth would be PLAINTEXT/''
KAFKA_SECURITY_PROTOCOL = os.environ.get('KAFKA_SECURITY_PROTOCOL', 'SASL_SSL')
KAFKA_SASL_MECHANISM = os.environ.get('KAFKA_SASL_MECHANISM', 'PLAIN')

# SASL username or client ID,  e.g. 'srvc-acct-1234-5678-abcd-efgj-12345678abcd'
KAFKA_USERNAME = os.environ.get('KAFKA_USERNAME')

# SASL password or client secret, e.g. 'abcd1234-5678-abcd-efgj-12345678abcd'
KAFKA_PASSWORD = os.environ.get('KAFKA_PASSWORD')

# Name of the topic for the producer to send messages.
# Consumers will listen to this topic for events.
KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC') or 'notebook-test'

# Kafka consumer group to which this consumer belongs
KAFKA_CONSUMER_GROUP = 'notebook-consumer'


# Uncomment for debug purposes,
# but don't save the output anywhere
# print(f'KAFKA_BOOTSTRAP_SERVER="{KAFKA_BOOTSTRAP_SERVER}"')
# print(f'KAFKA_SECURITY_PROTOCOL="{KAFKA_SECURITY_PROTOCOL}"')
# print(f'KAFKA_SASL_MECHANISM="{KAFKA_SASL_MECHANISM}"')
# print(f'KAFKA_USERNAME="{KAFKA_USERNAME}"')
# print(f'KAFKA_PASSWORD="{KAFKA_PASSWORD}"')
# print(f'KAFKA_TOPIC="{KAFKA_TOPIC}"')
# print(f'KAFKA_CONSUMER_GROUP="{KAFKA_CONSUMER_GROUP}"')
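
The comment above notes that a broker without authentication would use `PLAINTEXT` and no SASL mechanism. One way to handle both cases is to build the consumer's keyword arguments conditionally. This is a sketch under that assumption; `build_consumer_config` is a hypothetical helper, and the keys it emits are the same `KafkaConsumer` parameters this notebook already uses.

```python
def build_consumer_config(bootstrap_server, security_protocol="SASL_SSL",
                          sasl_mechanism="PLAIN", username=None, password=None):
    """Build keyword arguments for KafkaConsumer, adding SASL keys only when needed."""
    config = {
        "bootstrap_servers": [bootstrap_server],
        "security_protocol": security_protocol,
    }
    # SASL_SSL and SASL_PLAINTEXT are the protocols that authenticate via SASL.
    if security_protocol.startswith("SASL_"):
        config["sasl_mechanism"] = sasl_mechanism
        config["sasl_plain_username"] = username
        config["sasl_plain_password"] = password
    return config


# Example with placeholder values: no SASL keys are emitted for PLAINTEXT.
print(sorted(build_consumer_config("localhost:9092", security_protocol="PLAINTEXT")))
```

The resulting dictionary could then be unpacked into the constructor, for example `KafkaConsumer(KAFKA_TOPIC, group_id=KAFKA_CONSUMER_GROUP, **config)`.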

## Creating the Consumer

This function creates a consumer that connects to the Kafka server set by the variable `KAFKA_BOOTSTRAP_SERVER` and listens to the topic set by the variable `KAFKA_TOPIC`. The consumer receives messages and prints them out. It runs until the kernel is stopped.

In [None]:
from kafka import KafkaConsumer


def create_consumer_print():
    """Connect to Kafka and print every message received on KAFKA_TOPIC."""
    consumer = KafkaConsumer(KAFKA_TOPIC,
                             group_id=KAFKA_CONSUMER_GROUP,
                             bootstrap_servers=[KAFKA_BOOTSTRAP_SERVER],
                             security_protocol=KAFKA_SECURITY_PROTOCOL,
                             sasl_mechanism=KAFKA_SASL_MECHANISM,
                             sasl_plain_username=KAFKA_USERNAME,
                             sasl_plain_password=KAFKA_PASSWORD,
                             # Allow up to 30 s for the broker API version check.
                             api_version_auto_timeout_ms=30000,
                             # Allow up to 450 s for a response to each client request.
                             request_timeout_ms=450000)
    print(f'Subscribed to "{KAFKA_BOOTSTRAP_SERVER}" consuming topic "{KAFKA_TOPIC}"...')

    try:
        for record in consumer:
            msg = record.value.decode('utf-8')
            topic = record.topic
            print(('Received the following message on the '
                   f'topic "{topic}": {msg}'))

    finally:
        print("Closing consumer...")
        consumer.close()
    print("Kafka consumer stopped.")
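
The loop above prints each value as decoded text. Since the messages are described as JSON formatted strings, a natural next step is to parse them into Python objects. The sketch below uses a hypothetical helper, `decode_event`, and returns `None` for payloads that are not valid UTF-8 JSON; how malformed messages should really be handled is an assumption left to the reader.

```python
import json


def decode_event(raw):
    """Decode a UTF-8 JSON payload; return None if it is not valid JSON."""
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


# Sample payloads are made up for illustration.
print(decode_event(b'{"id": 1, "status": "ok"}'))
print(decode_event(b"not json"))
```

Inside the loop this could replace the plain `decode('utf-8')` call. kafka-python's `KafkaConsumer` also accepts a `value_deserializer` callable, so a function like this could be supplied there instead, in which case `record.value` would already hold the parsed object.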

## Start Listening

Start the consumer. It listens for events and prints their values to the console. **Stop the kernel to quit.**

In [None]:
try:
    create_consumer_print()
except KeyboardInterrupt:
    print('Stopped')
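
Because the consumer runs until the kernel is stopped, a bounded variant can be handy for quick checks. The sketch below is an illustration rather than part of the original notebook: `take_messages` is a hypothetical helper that accepts any iterator of records with a `value` attribute (a `KafkaConsumer` is such an iterator) and stops after a fixed number of messages. The fake records exist only so the example runs without a broker.

```python
from itertools import islice
from types import SimpleNamespace


def take_messages(records, max_messages):
    """Return the decoded values of at most max_messages records."""
    return [record.value.decode("utf-8")
            for record in islice(records, max_messages)]


# Stand-in records for illustration; a real run would pass a KafkaConsumer.
fake_records = iter([SimpleNamespace(value=b'{"n": 1}'),
                     SimpleNamespace(value=b'{"n": 2}'),
                     SimpleNamespace(value=b'{"n": 3}')])
print(take_messages(fake_records, 2))
```

With a real consumer this call still blocks until enough messages arrive. kafka-python's `consumer_timeout_ms` setting, which ends iteration after a period with no messages, is one option for bounding the wait as well.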