# Kafka Consumer Example - Saving Messages to S3 Storage

For an introduction to [Kafka](https://kafka.apache.org/), you may want to read some of the main [concepts](https://kafka.apache.org/documentation/#intro_concepts_and_terms).  An **event** records the fact that "something happened". An event has a key, value, timestamp, and optional metadata headers. **Producers** are those client applications that publish (write) events to Kafka, and **consumers** are those that subscribe to (read and process) these events.

This is an example of how to write a very simple Kafka [consumer](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html) using the kafka-python library. It receives simple JSON-formatted strings on a Kafka **topic** and saves them to S3 storage. This is a single consumer, but it could be part of a **consumer group**, which is a set of consumers that cooperate to consume data from one or more topics.
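
To make the idea of a consumer group more concrete, here is a minimal sketch of how the partitions of a topic might be divided among group members so that each partition is read by exactly one consumer. The function name `assign_partitions` and the consumer names are hypothetical, and the round-robin scheme is a simplification for illustration, not Kafka's actual assignor code.

```python
def assign_partitions(partitions, members):
    """Assign each partition to exactly one group member, round-robin."""
    ordered = sorted(members)
    if not ordered:
        # No consumers in the group: nothing can be assigned.
        return {}
    assignment = {member: [] for member in ordered}
    for index, partition in enumerate(sorted(partitions)):
        assignment[ordered[index % len(ordered)]].append(partition)
    return assignment


# Six partitions shared by two (hypothetical) consumers in one group.
print(assign_partitions(range(6), ['consumer-a', 'consumer-b']))
```

With a single consumer in the group, as in this notebook, that consumer would simply receive every partition.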

For a sample producer, refer to the notebook [1_kafka_producer.ipynb](./1_kafka_producer.ipynb) to send some sample events.

For further reading, visit the [documentation](https://kafka.apache.org/documentation/) for Kafka and for [kafka-python](https://kafka-python.readthedocs.io/).

## Dependencies

- [kafka-python](https://pypi.org/project/kafka-python/) Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the official java client, with a sprinkling of pythonic interfaces (e.g., consumer iterators).
- [boto3](https://pypi.org/project/boto3/) Boto3 is the Amazon Web Services (AWS) Software Development Kit (SDK) for Python, which allows Python developers to write software that makes use of services like Amazon S3 and Amazon EC2. It can also be used to connect to other S3 API compatible storage.

*Note:  In general, you want to manage your dependencies in a `requirements.txt` file. For easy demonstration, we've installed the libraries inline here.*


In [None]:
!pip install kafka-python
!pip install boto3

## Connection Information

Generally, much of your connection information (servers, username, password) will be injected as environment variables. This keeps private information out of the notebook itself, so it is less likely to be committed to source control by accident.

#### Expected Environment Variables
- `KAFKA_BOOTSTRAP_SERVER` location of the Kafka Bootstrap Server.  e.g. 'my-kafka-bootstrap.namespace.svc.cluster.local:9092'
- `KAFKA_TOPIC` Name of Kafka topic to consume messages from.
- `AWS_ACCESS_KEY_ID` S3 Key.  e.g. 'BKIAI4VI8CXLOWIZ5BGR'
- `AWS_SECRET_ACCESS_KEY` S3 Secret.  e.g. 'mdIRYm4nCpu/G7Wf5QhCC+RI/s92+vHF68n3T9mH'
- `AWS_BUCKET` Bucket name.  e.g. 'my-bucket'
- `AWS_PREFIX` Path-like prefix for files.  e.g. '/dir/subdir/'
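
Because these values come from the environment, it can help to check for missing ones before connecting to anything. The sketch below is one possible way to do that. The helper name `missing_env_vars` is hypothetical, and treating exactly these four variables as required is an assumption (`KAFKA_TOPIC` is left out because the notebook falls back to a default topic).

```python
import os

# Assumption: these four variables are required for the notebook to work.
REQUIRED_VARS = (
    'KAFKA_BOOTSTRAP_SERVER',
    'AWS_ACCESS_KEY_ID',
    'AWS_SECRET_ACCESS_KEY',
    'AWS_BUCKET',
)


def missing_env_vars(required=REQUIRED_VARS, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


missing = missing_env_vars()
if missing:
    print(f'Missing environment variables: {", ".join(missing)}')
```

Reporting all missing names at once tends to be friendlier than failing on the first one.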


In [None]:
import os

# location of the Kafka Bootstrap Server loaded from the environment variable.
# e.g. 'my-kafka-bootstrap.namespace.svc.cluster.local:9092'
KAFKA_BOOTSTRAP_SERVER = os.environ.get('KAFKA_BOOTSTRAP_SERVER')

# Name of the topic to which the consumer will subscribe
KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC') or 'notebook-test'

# Kafka consumer group to which this consumer belongs
KAFKA_CONSUMER_GROUP = 'notebook-consumer-save-s3'

# AWS credentials
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

# AWS bucket and path like prefix
AWS_BUCKET = os.environ.get('AWS_BUCKET')
AWS_PREFIX = os.environ.get('AWS_PREFIX')


### Create the S3 Resource

To connect to Ceph or other S3 API compatible storage, you'll need to specify `endpoint_url`. For AWS, you can omit that parameter.

```python
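import boto3
import botocore

# The empty strings are placeholders for your own credentials.
session = boto3.session.Session(aws_access_key_id='',
                                aws_secret_access_key='')

s3_resource = session.resource(
    's3',
    config=botocore.client.Config(signature_version='s3v4'),
    endpoint_url='https://s3.amazonaws.com',  # replace with your storage endpoint
    region_name='us-east-1'
)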
```
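
If the same notebook should work against both AWS and other S3-compatible storage, one option is to pass `endpoint_url` only when it is configured. The sketch below assumes a hypothetical environment variable `S3_ENDPOINT_URL`, which this notebook does not define, and a hypothetical helper `s3_resource_kwargs`.

```python
import os


def s3_resource_kwargs(endpoint_url=None, region_name=None):
    """Build optional keyword arguments for session.resource('s3', ...).

    endpoint_url and region_name are included only when set, so the same
    call works for AWS (no endpoint) and for S3-compatible storage.
    """
    kwargs = {}
    if endpoint_url:
        kwargs['endpoint_url'] = endpoint_url
    if region_name:
        kwargs['region_name'] = region_name
    return kwargs


# S3_ENDPOINT_URL is a hypothetical variable name used only for illustration.
extra_kwargs = s3_resource_kwargs(endpoint_url=os.environ.get('S3_ENDPOINT_URL'))
print(extra_kwargs)

# With a boto3 session, the result could be applied like this:
# s3_resource = session.resource(
#     's3',
#     config=botocore.client.Config(signature_version='s3v4'),
#     **extra_kwargs)
```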

In [None]:
import os
import boto3
import botocore

key_id = os.environ.get('AWS_ACCESS_KEY_ID')
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')

session = boto3.session.Session(aws_access_key_id=key_id,
                                aws_secret_access_key=secret_key)

s3_resource = session.resource(
    's3',
    config=botocore.client.Config(signature_version='s3v4')
)


## Creating the Consumer

This function creates a consumer that connects to the Kafka server set by the variable `KAFKA_BOOTSTRAP_SERVER` and listens to the topic set by the variable `KAFKA_TOPIC`. The consumer receives messages and saves each one to S3 storage. It runs until the kernel is stopped.
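
The consumer stores whatever bytes arrive on the topic. If you would rather store only well-formed JSON, a small check like the following could be applied to each message value before uploading. This is a sketch, not part of the notebook's function, and the helper name `parse_json_message` is hypothetical.

```python
import json


def parse_json_message(raw):
    """Decode a Kafka message value (bytes) and parse it as JSON.

    Returns the parsed object, or None when the payload is not valid
    UTF-8 or not valid JSON.
    """
    try:
        return json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


print(parse_json_message(b'{"id": 1}'))   # parsed dictionary
print(parse_json_message(b'not json'))    # None
```

Inside the consumer loop, a message for which this returns `None` could be skipped or logged instead of written to the bucket.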

In [None]:
from kafka import KafkaConsumer
import json

def create_consumer_save_s3():
    consumer = KafkaConsumer(KAFKA_TOPIC,
                             group_id=KAFKA_CONSUMER_GROUP,
                             bootstrap_servers=[KAFKA_BOOTSTRAP_SERVER],
                             security_protocol='PLAINTEXT',
                             auto_offset_reset='earliest',
                             api_version_auto_timeout_ms=30000,
                             request_timeout_ms=450000)

    print(f'Subscribed to "{KAFKA_BOOTSTRAP_SERVER}" consuming topic "{KAFKA_TOPIC}"...')

    bucket = s3_resource.Bucket(AWS_BUCKET)

    try:
        for record in consumer:
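            timestamp = record.timestamp
            # Strip surrounding slashes so a prefix such as '/dir/subdir/'
            # (or an unset AWS_PREFIX) does not produce empty or 'None' key segments.
            prefix = (AWS_PREFIX or '').strip('/')
            filename = f'{prefix}/kafka-messages/{timestamp}.json'.lstrip('/')
            msg = record.value.decode('utf-8')
            print(f'Writing msg to s3:"{AWS_BUCKET}/{filename}"')
            json_obj = bucket.Object(filename)
            json_obj.put(Body=msg)
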
    finally:
        print("Closing consumer...")
        consumer.close()
    print("Kafka consumer stopped.")
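

The function above names each object after `record.timestamp`, so two messages that carry the same millisecond timestamp would be written to the same key and the later one would overwrite the earlier one. A possible alternative, sketched here, builds the key from the record's topic, partition, and offset, which together identify a record. The helper name `build_object_key` and the key layout are assumptions for illustration.

```python
def build_object_key(prefix, topic, partition, offset):
    """Return an S3 object key that is unique per Kafka record."""
    parts = [
        (prefix or '').strip('/'),      # tolerate None and extra slashes
        'kafka-messages',
        topic,
        f'{partition}-{offset}.json',
    ]
    # Drop empty segments so an empty prefix does not add a leading slash.
    return '/'.join(part for part in parts if part)


print(build_object_key('/dir/subdir/', 'notebook-test', 0, 42))
# dir/subdir/kafka-messages/notebook-test/0-42.json
```

In the consumer loop this could be called as `build_object_key(AWS_PREFIX, record.topic, record.partition, record.offset)`.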



## Start Listening

Start the consumer. It listens for events and saves their values to S3 storage. **Stop the kernel to quit.**

In [None]:

try:
    create_consumer_save_s3()
except KeyboardInterrupt:
    print('Stopped')
