# Advanced Certification Program in Computational Data Science
## A program by IISc and TalentSprint
### Apache Kafka Streaming (This particular notebook is part of Assignment 7)

### Install Confluent Kafka

In [1]:
!pip install confluent_kafka



Specify your own `BROKERS`, `USERNAME`, `PASSWORD` and `TOPIC` in the script files below.
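The consumer scripts below hard-code the credentials and then copy them into `CLOUDKARAFKA_*` environment variables. One alternative is to export those variables in the shell and only read them in Python, so that the password never appears in a notebook cell. The sketch below assumes that approach; the helper name `load_kafka_settings` is hypothetical, and it only validates and collects the values (it does not talk to Kafka).

```python
import os

# Hypothetical helper: collect the CloudKarafka settings from environment
# variables (same CLOUDKARAFKA_* names the consumer scripts use) instead of
# hard-coding them in the script.
REQUIRED_KEYS = ("BROKERS", "USERNAME", "PASSWORD", "TOPIC")


def load_kafka_settings(env=None):
    """Return a settings dict; raise RuntimeError if any variable is missing."""
    env = os.environ if env is None else env
    settings = {}
    missing = []
    for key in REQUIRED_KEYS:
        value = env.get("CLOUDKARAFKA_" + key, "").strip()
        if value:
            settings[key.lower()] = value
        else:
            missing.append("CLOUDKARAFKA_" + key)
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    # Several topics may be given comma-separated, as the consumers' split(",") allows.
    topic_value = settings.pop("topic")
    settings["topics"] = [t.strip() for t in topic_value.split(",") if t.strip()]
    return settings


if __name__ == "__main__":
    demo_env = {
        "CLOUDKARAFKA_BROKERS": "host-1:9094,host-2:9094",
        "CLOUDKARAFKA_USERNAME": "my-user",
        "CLOUDKARAFKA_PASSWORD": "my-secret",
        "CLOUDKARAFKA_TOPIC": "my-user-default",
    }
    print(load_kafka_settings(demo_env)["topics"])
```

The returned values can then be plugged into the `conf` dictionary in place of the `os.environ[...]` lookups.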

### Example 1: Send and receive messages

Here we create two files: `producer1.py` and `consumer1.py` (in the Consumer notebook). The producer sends messages to a topic, and the consumer reads them from that topic in real time. For a message of at most six words, the consumer prints the message and its word count; for a message with more than six words, it prints an `ALERT` message instead.
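The per-message logic can be tried without a broker. The sketch below assumes the message value arrives as UTF-8 encoded bytes (which is what `msg.value()` returns for string payloads) and reproduces the six-word rule; `describe_message` is a hypothetical name used only for illustration.

```python
# Kafka-free sketch of the consumer's per-message logic.
WORD_LIMIT = 6  # threshold taken from the text above


def describe_message(raw):
    """Return the lines the consumer would print for one message value (bytes)."""
    text = raw.decode("utf-8")
    n_words = len(text.split())  # split on any whitespace
    if n_words <= WORD_LIMIT:
        return [text, "Number of words: %d" % n_words]
    return ["ALERT: Word limit exceeded!"]


if __name__ == "__main__":
    for value in (b"hello kafka streaming world", b"one two three four five six seven"):
        print("\n".join(describe_message(value)))
        print()
```

Keeping the logic in a plain function like this makes it easy to test separately from the polling loop.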

#### Write Consumer file

Here we create a consumer file that will read messages from the specified `topic`.

In [2]:
%%writefile consumer1.py

import sys
import os

from confluent_kafka import Consumer, KafkaException, KafkaError

# Specify BROKERS, USERNAME, PASSWORD and TOPIC
# Use the same credentials from Assignment 7 (replace the placeholders below)
brokers = "dory-01.srvs.cloudkafka.com:9094,dory-02.srvs.cloudkafka.com:9094,dory-03.srvs.cloudkafka.com:9094"
username = "<your-cloudkarafka-username>"
password = "<your-cloudkarafka-password>"
topic = "<your-username>-default"

# Store the settings as environment variables; the code below reads them back from os.environ
os.environ['CLOUDKARAFKA_BROKERS'] = brokers
os.environ['CLOUDKARAFKA_USERNAME'] = username
os.environ['CLOUDKARAFKA_PASSWORD'] = password
os.environ['CLOUDKARAFKA_TOPIC'] = topic

if __name__ == '__main__':
    topics = os.environ['CLOUDKARAFKA_TOPIC'].split(",")

    # Consumer configuration
    conf = {
        'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'],                # Kafka broker addresses
        'group.id': "%s-consumer" % os.environ['CLOUDKARAFKA_USERNAME'],        # consumer group name
        'session.timeout.ms': 6000,                                             # consumer is removed from the group if no heartbeat arrives within 6 s
        'auto.offset.reset': 'earliest',                                        # with no committed offset, start from the oldest message
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'],
        'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD']
    }

    c = Consumer(**conf)
    c.subscribe(topics)
    try:
        while True:                       # read data continuously
            msg = c.poll(timeout=1.0)     # wait up to 1 s for a message
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                # sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' % (msg.topic(), msg.partition(), msg.offset(), str(msg.key())))
                text = msg.value().decode('utf-8')   # decode the bytes payload
                n = len(text.split())                # word count
                if n <= 6:
                    print(text)
                    print('Number of words: %d' % n)
                else:
                    print('ALERT: Word limit exceeded!')
                print("")

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets.
        c.close()

Writing consumer1.py


#### Run consumer file

The consumer keeps running and prints output whenever the producer sends data to the topic.
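Because the `while True` loop never ends on its own, the notebook cell stays busy until it is interrupted. A bounded variant is one option when experimenting: it stops after a given number of messages or after a period with no data. This is a sketch, not part of the assignment code; `consume_bounded`, `FakeMessage` and `FakeConsumer` are hypothetical names, and the fakes only mimic the `poll()`, `error()` and `value()` calls used above so the loop can run without a broker. It also simplifies error handling by treating every error as fatal and raising `RuntimeError` instead of `KafkaException`.

```python
import time


def consume_bounded(consumer, handle, max_messages=10, idle_seconds=5.0, poll_timeout=1.0):
    """Poll until max_messages are handled or no data arrives for idle_seconds."""
    received = 0
    last_data = time.monotonic()
    while received < max_messages:
        msg = consumer.poll(timeout=poll_timeout)
        if msg is None:
            if time.monotonic() - last_data >= idle_seconds:
                break  # nothing arrived for a while: stop
            continue
        if msg.error():
            raise RuntimeError(msg.error())  # simplification: every error is fatal
        handle(msg.value())
        received += 1
        last_data = time.monotonic()
    return received


class FakeMessage:
    """Hypothetical stand-in for a Kafka message carrying a bytes value."""

    def __init__(self, value):
        self._value = value

    def error(self):
        return None

    def value(self):
        return self._value


class FakeConsumer:
    """Hypothetical stand-in: returns queued messages, then None like a poll() timeout."""

    def __init__(self, values):
        self._messages = [FakeMessage(v) for v in values]

    def poll(self, timeout=None):
        return self._messages.pop(0) if self._messages else None


if __name__ == "__main__":
    fake = FakeConsumer([b"first message", b"second message"])
    n = consume_bounded(fake, lambda v: print(v.decode("utf-8")), idle_seconds=0.0)
    print("handled", n, "messages")
```

With the real consumer, the same call would look like `consume_bounded(c, lambda v: print(v.decode('utf-8')), max_messages=5)` after `c.subscribe(topics)`, followed by `c.close()`. The fake returns `None` immediately, so the demo uses `idle_seconds=0.0` to avoid spinning.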

<font color='blue'>Before executing the cell below, make sure you have created a CloudKarafka account and specified your credentials.</font>

In [3]:
!python consumer1.py

%6|1729146963.528|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://dory-01.srvs.cloudkafka.com:9094/bootstrap]: sasl_ssl://dory-01.srvs.cloudkafka.com:9094/0: Disconnected (after 3896ms in state UP)
%6|1729146963.773|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://dory-01.srvs.cloudkafka.com:9094/bootstrap]: sasl_ssl://dory-01.srvs.cloudkafka.com:9094/0: Disconnected (after 25ms in state UP, 1 identical error(s) suppressed)
%6|1729146963.917|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://dory-02.srvs.cloudkafka.com:9094/bootstrap]: sasl_ssl://dory-02.srvs.cloudkafka.com:9094/1: Disconnected (after 68ms in state UP)
%4|1729146963.941|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://dory-03.srvs.cloudkafka.com:9094/bootstrap]: sasl_ssl://dory-03.srvs.cloudkafka.com:9094/2: Disconnected (after 60ms in state UP)
%6|1729146964.156|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://dory-02.srvs.cloudkafka.com:9094/bootstrap]: sasl_ssl://dory-02.srvs.cloudkafka.com:9094/1: Disconnected (after 21ms in state UP, 1 identical err

For the next example, **create a new topic** on CloudKarafka and use its name. To create a topic, refer to step 11 in this [document](https://cdn.iisc.talentsprint.com/CDS/CloudKarafka.pdf).

### Example 2: Compute the rolling mean of last three insertions

Here we create two files: `producer2.py` and `consumer2.py` (in the Consumer notebook). The producer sends numbers to a topic, and the consumer reads them from that topic in real time and displays the rolling mean of the last three insertions. For the first two insertions, only the received numbers are displayed.
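The rolling-mean rule can be checked without Kafka. The sketch below assumes each message value is a number encoded as UTF-8 bytes (as the consumer assumes when it calls `float(...)`); `RollingMean` is a hypothetical class name. A `collections.deque` with `maxlen` discards the oldest value automatically, so only the last `window` numbers are kept.

```python
from collections import deque


class RollingMean:
    """Hypothetical helper: mean of the last `window` values added."""

    def __init__(self, window=3):
        self.window = window
        self.values = deque(maxlen=window)  # oldest value drops out automatically

    def add(self, value):
        """Store a value; return the mean once `window` values exist, else None."""
        self.values.append(value)
        if len(self.values) < self.window:
            return None
        return sum(self.values) / self.window


if __name__ == "__main__":
    rm = RollingMean(window=3)
    for raw in (b"4", b"8", b"6", b"10"):
        num = float(raw.decode("utf-8"))
        mean = rm.add(num)
        print(num)
        if mean is not None:
            print("mean of last three insertions: {}".format(mean))
```

For the inputs 4, 8, 6, 10 this yields no mean for the first two values, then 6.0 and 8.0.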

#### Write Consumer file

Here we create a consumer file that will read messages from the specified `topic`.

In [2]:
%%writefile consumer2.py

import sys
import os

from confluent_kafka import Consumer, KafkaException, KafkaError

# Specify BROKERS, USERNAME, PASSWORD and TOPIC
# Use the same credentials from Assignment 7 and the new topic created for this example
brokers = "dory-01.srvs.cloudkafka.com:9094,dory-02.srvs.cloudkafka.com:9094,dory-03.srvs.cloudkafka.com:9094"
username = "<your-cloudkarafka-username>"
password = "<your-cloudkarafka-password>"
topic = "<your-new-topic-name>"

# Store the settings as environment variables; the code below reads them back from os.environ
os.environ['CLOUDKARAFKA_BROKERS'] = brokers
os.environ['CLOUDKARAFKA_USERNAME'] = username
os.environ['CLOUDKARAFKA_PASSWORD'] = password
os.environ['CLOUDKARAFKA_TOPIC'] = topic

if __name__ == '__main__':
    topics = os.environ['CLOUDKARAFKA_TOPIC'].split(",")

    # Consumer configuration
    conf = {
        'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'],                # Kafka broker addresses
        'group.id': "%s-consumer" % os.environ['CLOUDKARAFKA_USERNAME'],        # consumer group name
        'session.timeout.ms': 6000,                                             # consumer is removed from the group if no heartbeat arrives within 6 s
        'auto.offset.reset': 'earliest',                                        # with no committed offset, start from the oldest message
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'],
        'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD']
    }

    c = Consumer(**conf)
    c.subscribe(topics)
    window = 3                                   # size of the rolling window
    numbers = []                                 # the most recent `window` values
    try:
        while True:                              # read data continuously
            msg = c.poll(timeout=1.0)            # wait up to 1 s for a message
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                # sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' % (msg.topic(), msg.partition(), msg.offset(), str(msg.key())))
                try:
                    num = float(msg.value().decode("utf-8"))   # decode from bytes and convert to float
                except ValueError:
                    sys.stderr.write('%% Skipping non-numeric value: %r\n' % msg.value())
                    continue
                numbers.append(num)                            # add current value
                del numbers[:-window]                          # keep only the last `window` values
                print(num)
                if len(numbers) == window:
                    mean = sum(numbers) / window               # rolling mean of the last `window` values
                    print("mean of last three insertions: {}".format(mean))
                print("")

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets.
        c.close()

Writing consumer2.py


#### Run Consumer file

The consumer keeps running and prints output whenever the producer sends data to the topic.

<font color='blue'>Before executing the cell below, make sure you have created a CloudKarafka account and specified your credentials.</font>

In [None]:
!python consumer2.py

%6|1729147033.305|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://dory-03.srvs.cloudkafka.com:9094/bootstrap]: sasl_ssl://dory-03.srvs.cloudkafka.com:9094/2: Disconnected (after 3985ms in state UP)
%6|1729147033.501|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://dory-03.srvs.cloudkafka.com:9094/bootstrap]: sasl_ssl://dory-03.srvs.cloudkafka.com:9094/2: Disconnected (after 20ms in state UP, 1 identical error(s) suppressed)
%4|1729147033.540|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://dory-01.srvs.cloudkafka.com:9094/bootstrap]: sasl_ssl://dory-01.srvs.cloudkafka.com:9094/0: Disconnected (after 61ms in state UP)
%6|1729147033.602|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://dory-02.srvs.cloudkafka.com:9094/bootstrap]: sasl_ssl://dory-02.srvs.cloudkafka.com:9094/1: Disconnected (after 62ms in state UP)
%6|1729147033.747|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://dory-01.srvs.cloudkafka.com:9094/bootstrap]: sasl_ssl://dory-01.srvs.cloudkafka.com:9094/0: Disconnected (after 41ms in state UP, 1 identical err