# Kafka MongoDB Real-time Streaming

## Kafka Producer
> Sending events to Kafka broker in real-time

In [None]:
!pip install confluent_kafka -q

In [None]:
import json
import sys
import os
import pandas as pd

from confluent_kafka import Producer
from confluent_kafka import Consumer, KafkaException, KafkaError

In [None]:
!wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip ml-latest-small.zip

In [None]:
df = pd.read_csv('./ml-latest-small/movies.csv')
df.to_json('df.json')
df_json = pd.read_json('df.json')
movie_list= df_json.to_dict(orient="records")
print(movie_list[0])

{'movieId': 1, 'title': 'Toy Story (1995)', 'genres': 'Adventure|Animation|Children|Comedy|Fantasy'}


### Producer Setup [notebook]

In [None]:
CLOUDKARAFKA_TOPIC = 'yx03wajr-demo'
CLOUDKARAFKA_BROKERS = 'dory-01.srvs.cloudkafka.com:9094, \
dory-02.srvs.cloudkafka.com:9094, \
dory-03.srvs.cloudkafka.com:9094'
CLOUDKARAFKA_USERNAME = 'yx03wajr'
CLOUDKARAFKA_PASSWORD = 'pHva0afDUXPya6JfKrbM1******'

In [None]:
topic = CLOUDKARAFKA_TOPIC.split(",")[0]

conf = {
    'bootstrap.servers': CLOUDKARAFKA_BROKERS,
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': CLOUDKARAFKA_USERNAME,
    'sasl.password': CLOUDKARAFKA_PASSWORD
    }

In [None]:
p = Producer(**conf)

In [None]:
def delivery_callback(err, msg):
  if err:
      sys.stderr.write('%% Message failed delivery: %s\n' % err)
  else:
      sys.stderr.write('%% Message delivered to %s [%d]\n' %
                        (msg.topic(), msg.partition()))

In [None]:
for movie in movie_list[0:5]:
    try:
        print("Message to be send : ", movie)
        p.produce(topic, str(movie), callback=delivery_callback)
    except BufferError as e:
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                          len(p))
    p.poll(0)

sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()

Message to be send :  {'movieId': 1, 'title': 'Toy Story (1995)', 'genres': 'Adventure|Animation|Children|Comedy|Fantasy'}
Message to be send :  {'movieId': 2, 'title': 'Jumanji (1995)', 'genres': 'Adventure|Children|Fantasy'}
Message to be send :  {'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}
Message to be send :  {'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}
Message to be send :  {'movieId': 5, 'title': 'Father of the Bride Part II (1995)', 'genres': 'Comedy'}


% Waiting for 5 deliveries
% Message delivered to yx03wajr-demo [1]
% Message delivered to yx03wajr-demo [2]
% Message delivered to yx03wajr-demo [0]
% Message delivered to yx03wajr-demo [0]
% Message delivered to yx03wajr-demo [0]


0

### Producer Setup [terminal]

In [None]:
%%writefile producer.py

import sys
import os

from confluent_kafka import Producer

CLOUDKARAFKA_TOPIC = 'yx03wajr-demo'
CLOUDKARAFKA_BROKERS = 'dory-01.srvs.cloudkafka.com:9094, \
dory-02.srvs.cloudkafka.com:9094, \
dory-03.srvs.cloudkafka.com:9094'
CLOUDKARAFKA_USERNAME = 'yx03wajr'
CLOUDKARAFKA_PASSWORD = 'pHva0afDUXPya6JfKrbM1******'

if __name__ == '__main__':
    topic = CLOUDKARAFKA_TOPIC.split(",")[0]

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
      'bootstrap.servers': CLOUDKARAFKA_BROKERS,
      'session.timeout.ms': 6000,
      'default.topic.config': {'auto.offset.reset': 'smallest'},
      'security.protocol': 'SASL_SSL',
      'sasl.mechanisms': 'SCRAM-SHA-256',
      'sasl.username': CLOUDKARAFKA_USERNAME,
      'sasl.password': CLOUDKARAFKA_PASSWORD
      }

    p = Producer(**conf)

    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d]\n' %
                             (msg.topic(), msg.partition()))

    for line in sys.stdin:
        try:
            p.produce(topic, line.rstrip(), callback=delivery_callback)
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))
        p.poll(0)

    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()

Overwriting producer.py


In [None]:
!python producer.py

%4|1623432412.565|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
%4|1623432412.565|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance
{'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}


## Kafka Consumer and MongoDB
> Listening from kafka topic in real-time and storing in mongodb

In [None]:
!pip install confluent_kafka -q

In [None]:
import json
import sys
import os
import pandas as pd

from confluent_kafka import Producer
from confluent_kafka import Consumer, KafkaException, KafkaError

### Consumer Setup [notebook]

In [None]:
CLOUDKARAFKA_TOPIC = 'yx03wajr-demo'
CLOUDKARAFKA_BROKERS = 'dory-01.srvs.cloudkafka.com:9094, \
dory-02.srvs.cloudkafka.com:9094, \
dory-03.srvs.cloudkafka.com:9094'
CLOUDKARAFKA_USERNAME = 'yx03wajr'
CLOUDKARAFKA_PASSWORD = 'pHva0afDUXPya6JfKrbM1j549G*****'

In [None]:
topics = CLOUDKARAFKA_TOPIC.split(",")

# Consumer configuration
conf = {
    'bootstrap.servers': CLOUDKARAFKA_BROKERS,
    'group.id': "%s-consumer" % CLOUDKARAFKA_USERNAME,
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': CLOUDKARAFKA_USERNAME,
    'sasl.password': CLOUDKARAFKA_PASSWORD
}

In [None]:
c = Consumer(**conf)
c.subscribe(topics)

In [None]:
# while True:
for i in range(10):
  i+=1
  print(i)
  msg = c.poll(timeout=1.0)
  if msg is None:
      continue
  if msg.error():
      # Error or event
      if msg.error().code() == KafkaError._PARTITION_EOF:
          # End of partition event
          sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                            (msg.topic(), msg.partition(), msg.offset()))
      elif msg.error():
          # Error
          raise KafkaException(msg.error())
  else:
      # Proper message
      sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                        (msg.topic(), msg.partition(), msg.offset(),
                        str(msg.key())))
      print(msg.value())

c.close()

1
2
3
4
5


% yx03wajr-demo [0] at offset 0 with key None:
% yx03wajr-demo [0] at offset 1 with key None:
% yx03wajr-demo [0] at offset 2 with key None:
% yx03wajr-demo [0] at offset 3 with key None:
% yx03wajr-demo [0] at offset 4 with key None:


b"{'movieId': 2, 'title': 'Jumanji (1995)', 'genres': 'Adventure|Children|Fantasy'}"
6
b"{'movieId': 5, 'title': 'Father of the Bride Part II (1995)', 'genres': 'Comedy'}"
7
b"{'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}"
8
b"{'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}"
9
b"{'movieId': 5, 'title': 'Father of the Bride Part II (1995)', 'genres': 'Comedy'}"
10
b"{'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}"


% yx03wajr-demo [1] at offset 0 with key None:


### Consumer Setup [terminal]

In [None]:
%%writefile consumer.py

import sys
import os

from confluent_kafka import Consumer, KafkaException, KafkaError


CLOUDKARAFKA_TOPIC = 'yx03wajr-demo'
CLOUDKARAFKA_BROKERS = 'dory-01.srvs.cloudkafka.com:9094, \
dory-02.srvs.cloudkafka.com:9094, \
dory-03.srvs.cloudkafka.com:9094'
CLOUDKARAFKA_USERNAME = 'yx03wajr'
CLOUDKARAFKA_PASSWORD = 'pHva0afDUXPya6JfKrbM1j549G*****'

if __name__ == '__main__':
    topics = CLOUDKARAFKA_TOPIC.split(",")

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': CLOUDKARAFKA_BROKERS,
        'group.id': "%s-consumer" % CLOUDKARAFKA_USERNAME,
        'session.timeout.ms': 6000,
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': CLOUDKARAFKA_USERNAME,
        'sasl.password': CLOUDKARAFKA_PASSWORD
    }

    c = Consumer(**conf)
    c.subscribe(topics)
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    # Close down consumer to commit final offsets.
    c.close()

Writing consumer.py


In [None]:
!python consumer.py

% 2nizjcn5-movielens [1] at offset 4 with key None:
b'Transporter | click'
%% Aborted by user


### MongoDB Setup

In [None]:
!pip uninstall pymongo
!pip install pymongo[srv]

In [None]:
MONGODB_USER = 'kafka-demo'
MONGODB_PASSWORD = '<your-pass>'
MONGODB_CLUSTER = 'cluster0.ca4wh.mongodb.net'
MONGODB_DATABASE = 'movielens'

In [None]:
import pymongo
import urllib 

mongo_uri = f"mongodb+srv://{MONGODB_USER}:{MONGODB_PASSWORD}@{MONGODB_CLUSTER}/{MONGODB_DATABASE}?retryWrites=true&w=majority"
client = pymongo.MongoClient(mongo_uri)

In [None]:
mydb = client["movielens"]
mydb.list_collection_names()

['movies']

In [None]:
client.list_database_names()

['movielens', 'admin', 'local']

In [None]:
movies = mydb.movies

In [None]:
result = movies.insert_one({'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'})
result

<pymongo.results.InsertOneResult at 0x7f6cf017b820>

In [None]:
print(f"One movie: {result.inserted_id}")

One movie: 60c39abe413cf49529925bf4


In [None]:
# single-line command to insert record
print(client.movielens.movies.insert_one({'movieId':5, 'title':'Bride', 'genres':'Comedy'}).inserted_id)

60c39ac1413cf49529925bf5


In [None]:
movie2 = {'movieId': 2, 'title': 'Jumanji (1995)', 'genres': 'Adventure|Children|Fantasy'}
movie3 = {'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}

new_result = movies.insert_many([movie2, movie3])
print(f"Multiple movies: {new_result.inserted_ids}")

Multiple movies: [ObjectId('60c39ac3413cf49529925bf6'), ObjectId('60c39ac3413cf49529925bf7')]


In [None]:
import pprint

for doc in movies.find():
  pprint.pprint(doc)

{'_id': ObjectId('60c39abe413cf49529925bf4'),
 'genres': 'Comedy|Romance',
 'movieId': 3,
 'title': 'Grumpier Old Men (1995)'}
{'_id': ObjectId('60c39ac1413cf49529925bf5'),
 'genres': 'Comedy',
 'movieId': 5,
 'title': 'Bride'}
{'_id': ObjectId('60c39ac3413cf49529925bf6'),
 'genres': 'Adventure|Children|Fantasy',
 'movieId': 2,
 'title': 'Jumanji (1995)'}
{'_id': ObjectId('60c39ac3413cf49529925bf7'),
 'genres': 'Comedy|Romance',
 'movieId': 3,
 'title': 'Grumpier Old Men (1995)'}


In [None]:
%%writefile consumer.py

import sys
import os

from confluent_kafka import Consumer, KafkaException, KafkaError
import pymongo

CLOUDKARAFKA_TOPIC = 'yx03wajr-demo'
CLOUDKARAFKA_BROKERS = 'dory-01.srvs.cloudkafka.com:9094, \
dory-02.srvs.cloudkafka.com:9094, \
dory-03.srvs.cloudkafka.com:9094'
CLOUDKARAFKA_USERNAME = 'yx03wajr'
CLOUDKARAFKA_PASSWORD = 'pHva0afDUXPya6JfKrbM1j549G*****'

MONGODB_USER = 'kafka-demo'
MONGODB_PASSWORD = '<your-pass>'
MONGODB_CLUSTER = 'cluster0.ca4wh.mongodb.net'
MONGODB_DATABASE = 'movielens'

mongo_uri = f"mongodb+srv://{MONGODB_USER}:{MONGODB_PASSWORD}@{MONGODB_CLUSTER}/{MONGODB_DATABASE}?retryWrites=true&w=majority"
client = pymongo.MongoClient(mongo_uri)
mydb = client[MONGODB_DATABASE]
movies = mydb.movies

if __name__ == '__main__':
    topics = CLOUDKARAFKA_TOPIC.split(",")

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': CLOUDKARAFKA_BROKERS,
        'group.id': "%s-consumer" % CLOUDKARAFKA_USERNAME,
        'session.timeout.ms': 6000,
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': CLOUDKARAFKA_USERNAME,
        'sasl.password': CLOUDKARAFKA_PASSWORD
    }

    c = Consumer(**conf)
    c.subscribe(topics)
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())
                try:
                  movies.insert_one(eval(msg.value().decode('utf-8')))
                except:
                  movies.insert_one({"err_flag":True, "msg":str(msg.value())})

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    # Close down consumer to commit final offsets.
    c.close()

Overwriting consumer.py


In [None]:
!python consumer.py

% yx03wajr-demo [2] at offset 4 with key None:
b"{'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}"


## MongoDB Listener
> Listening mongoDB data events in real-time.

In [None]:
!pip uninstall pymongo
!pip install pymongo[srv]

In [None]:
import os
import pymongo
from bson.json_util import dumps

MONGODB_USER = 'kafka-demo'
MONGODB_PASSWORD = '<your-pass>'
MONGODB_CLUSTER = 'cluster0.ca4wh.mongodb.net'
MONGODB_DATABASE = 'movielens'

mongo_uri = f"mongodb+srv://{MONGODB_USER}:{MONGODB_PASSWORD}@{MONGODB_CLUSTER}/{MONGODB_DATABASE}?retryWrites=true&w=majority"
client = pymongo.MongoClient(mongo_uri)

In [None]:
import pandas as pd
movies = pd.DataFrame(columns=['_id','movieId','title','genres'])

In [None]:
change_stream = client.movielens.movies.watch()
for change in change_stream:
  _temp = change['fullDocument']
  movies = movies.append(pd.DataFrame(_temp, columns=_temp.keys(), index=[0]), ignore_index=True)
  display(movies)

Unnamed: 0,_id,movieId,title,genres
0,60c39d0078640c489089261e,3,Grumpier Old Men (1995),Comedy|Romance
