# Kafka MongoDB Real-time Streaming: Kafka Consumer and MongoDB
> Listening to a Kafka topic in real time and storing the messages in MongoDB

- toc: true
- badges: true
- comments: true
- categories: [mongodb, kafka, real time]
- image: 

In [7]:
# Install the Kafka client. %pip (rather than !pip) installs into the
# environment of the running kernel.
%pip install -q confluent_kafka

In [8]:
import json
import sys
import os
import pandas as pd

from confluent_kafka import Producer
from confluent_kafka import Consumer, KafkaException, KafkaError

### Consumer Setup [notebook]

In [9]:
# Kafka (CloudKarafka) connection settings.
# Every value can be overridden through an environment variable of the same
# name, so real credentials do not have to be written into the notebook; the
# literals below are only the demo fallbacks.
CLOUDKARAFKA_TOPIC = os.environ.get('CLOUDKARAFKA_TOPIC', 'yx03wajr-demo')
# Comma-separated broker list, later passed to the client as 'bootstrap.servers'.
CLOUDKARAFKA_BROKERS = os.environ.get(
    'CLOUDKARAFKA_BROKERS',
    ', '.join([
        'dory-01.srvs.cloudkafka.com:9094',
        'dory-02.srvs.cloudkafka.com:9094',
        'dory-03.srvs.cloudkafka.com:9094',
    ]),
)
CLOUDKARAFKA_USERNAME = os.environ.get('CLOUDKARAFKA_USERNAME', 'yx03wajr')
CLOUDKARAFKA_PASSWORD = os.environ.get('CLOUDKARAFKA_PASSWORD', 'pHva0afDUXPya6JfKrbM1j549G*****')

In [10]:
# The topic setting may hold several comma-separated names; surrounding
# whitespace and empty entries are dropped.
topics = [name.strip() for name in CLOUDKARAFKA_TOPIC.split(",") if name.strip()]

# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {
    'bootstrap.servers': CLOUDKARAFKA_BROKERS,
    'group.id': "%s-consumer" % CLOUDKARAFKA_USERNAME,
    'session.timeout.ms': 6000,
    # Start from the oldest available message when the group has no committed
    # offset. Given at the top level because the nested 'default.topic.config'
    # form is deprecated; 'earliest' is the current spelling of 'smallest'.
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': CLOUDKARAFKA_USERNAME,
    'sasl.password': CLOUDKARAFKA_PASSWORD
}

In [11]:
# Build the consumer from the settings dict, then subscribe it to the topics.
c = Consumer(conf)
c.subscribe(topics)

In [12]:
# Poll the topic ten times (one-second timeout each) and print whatever
# arrives. The counter is printed before each poll so idle polls are visible.
try:
    for i in range(1, 11):
        print(i)
        msg = c.poll(timeout=1.0)
        if msg is None:
            # Nothing arrived within the timeout.
            continue
        err = msg.error()
        if not err:
            # Proper message
            sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                             (msg.topic(), msg.partition(), msg.offset(),
                              str(msg.key())))
            print(msg.value())
        elif err.code() == KafkaError._PARTITION_EOF:
            # End of partition event
            sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                             (msg.topic(), msg.partition(), msg.offset()))
        else:
            # Error
            raise KafkaException(err)
finally:
    # Close the consumer even when polling raised, so it is never left open.
    c.close()

1
2
3
4
5


% yx03wajr-demo [0] at offset 0 with key None:
% yx03wajr-demo [0] at offset 1 with key None:
% yx03wajr-demo [0] at offset 2 with key None:
% yx03wajr-demo [0] at offset 3 with key None:
% yx03wajr-demo [0] at offset 4 with key None:


b"{'movieId': 2, 'title': 'Jumanji (1995)', 'genres': 'Adventure|Children|Fantasy'}"
6
b"{'movieId': 5, 'title': 'Father of the Bride Part II (1995)', 'genres': 'Comedy'}"
7
b"{'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}"
8
b"{'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}"
9
b"{'movieId': 5, 'title': 'Father of the Bride Part II (1995)', 'genres': 'Comedy'}"
10
b"{'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}"


% yx03wajr-demo [1] at offset 0 with key None:


### Consumer Setup [terminal]

In [14]:
%%writefile consumer.py

import sys
import os

from confluent_kafka import Consumer, KafkaException, KafkaError


# Kafka (CloudKarafka) connection settings.
# Every value can be overridden through an environment variable of the same
# name, so real credentials do not have to be written into the script; the
# literals below are only the demo fallbacks.
CLOUDKARAFKA_TOPIC = os.environ.get('CLOUDKARAFKA_TOPIC', 'yx03wajr-demo')
# Comma-separated broker list, passed to the client as 'bootstrap.servers'.
CLOUDKARAFKA_BROKERS = os.environ.get(
    'CLOUDKARAFKA_BROKERS',
    ', '.join([
        'dory-01.srvs.cloudkafka.com:9094',
        'dory-02.srvs.cloudkafka.com:9094',
        'dory-03.srvs.cloudkafka.com:9094',
    ]),
)
CLOUDKARAFKA_USERNAME = os.environ.get('CLOUDKARAFKA_USERNAME', 'yx03wajr')
CLOUDKARAFKA_PASSWORD = os.environ.get('CLOUDKARAFKA_PASSWORD', 'pHva0afDUXPya6JfKrbM1j549G*****')

if __name__ == '__main__':
    # The topic setting may hold several comma-separated names; surrounding
    # whitespace and empty entries are dropped.
    topics = [name.strip() for name in CLOUDKARAFKA_TOPIC.split(",") if name.strip()]

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': CLOUDKARAFKA_BROKERS,
        'group.id': "%s-consumer" % CLOUDKARAFKA_USERNAME,
        'session.timeout.ms': 6000,
        # Start from the oldest available message when the group has no
        # committed offset. Given at the top level because the nested
        # 'default.topic.config' form is deprecated; 'earliest' is the
        # current spelling of 'smallest'.
        'auto.offset.reset': 'earliest',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': CLOUDKARAFKA_USERNAME,
        'sasl.password': CLOUDKARAFKA_PASSWORD
    }

    c = Consumer(**conf)
    try:
        c.subscribe(topics)
        # Poll forever; the loop ends on Ctrl-C or on a Kafka error.
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the timeout.
                continue
            err = msg.error()
            if not err:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())
            elif err.code() == KafkaError._PARTITION_EOF:
                # End of partition event
                sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                 (msg.topic(), msg.partition(), msg.offset()))
            else:
                # Error
                raise KafkaException(err)

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets. Done in `finally` so
        # it also happens when a KafkaException ends the loop.
        c.close()

Writing consumer.py


In [None]:
# Run the consumer.py written by the cell above. The script polls in an
# endless loop, so interrupt the cell to stop it.
!python consumer.py

% 2nizjcn5-movielens [1] at offset 4 with key None:
b'Transporter | click'
%% Aborted by user


### MongoDB Setup

In [15]:
# Reinstall pymongo with the `srv` extra (pulls in dnspython, needed for
# mongodb+srv:// URIs). -y answers pip's "Proceed (y/n)?" prompt so the cell
# does not block waiting for input; the quotes keep shells from globbing the
# square brackets.
%pip uninstall -y pymongo
%pip install "pymongo[srv]"

Uninstalling pymongo-3.11.4:
  Would remove:
    /usr/local/lib/python3.7/dist-packages/bson/*
    /usr/local/lib/python3.7/dist-packages/gridfs/*
    /usr/local/lib/python3.7/dist-packages/pymongo-3.11.4.dist-info/*
    /usr/local/lib/python3.7/dist-packages/pymongo/*
Proceed (y/n)? y
  Successfully uninstalled pymongo-3.11.4
Collecting pymongo[srv]
[?25l  Downloading https://files.pythonhosted.org/packages/b1/29/c0c8791ba972456f8aa3f027af33206499bc9f52a948e0d9c10909339b3c/pymongo-3.11.4-cp37-cp37m-manylinux2014_x86_64.whl (512kB)
[K     |████████████████████████████████| 522kB 7.8MB/s 
[?25hCollecting dnspython<2.0.0,>=1.16.0; extra == "srv"
[?25l  Downloading https://files.pythonhosted.org/packages/ec/d3/3aa0e7213ef72b8585747aa0e271a9523e713813b9a20177ebe1e939deb0/dnspython-1.16.0-py2.py3-none-any.whl (188kB)
[K     |████████████████████████████████| 194kB 13.9MB/s 
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-1.16.0 pymongo-3.11.4


In [28]:
# MongoDB connection settings.
# Every value can be overridden through an environment variable of the same
# name, so the real password does not have to be typed into the notebook;
# '<your-pass>' is only a placeholder.
MONGODB_USER = os.environ.get('MONGODB_USER', 'kafka-demo')
MONGODB_PASSWORD = os.environ.get('MONGODB_PASSWORD', '<your-pass>')
MONGODB_CLUSTER = os.environ.get('MONGODB_CLUSTER', 'cluster0.ca4wh.mongodb.net')
MONGODB_DATABASE = os.environ.get('MONGODB_DATABASE', 'movielens')

In [29]:
import pymongo
import urllib.parse

# The user name and password are percent-escaped before being placed in the
# URI: characters such as '@', ':', '/' or '%' in a password would otherwise
# make the connection string invalid.
mongo_uri = (
    f"mongodb+srv://{urllib.parse.quote_plus(MONGODB_USER)}:"
    f"{urllib.parse.quote_plus(MONGODB_PASSWORD)}"
    f"@{MONGODB_CLUSTER}/{MONGODB_DATABASE}?retryWrites=true&w=majority"
)
client = pymongo.MongoClient(mongo_uri)

In [31]:
# Select the database through the MONGODB_DATABASE setting rather than a
# repeated string literal, so changing the setting changes every cell.
mydb = client[MONGODB_DATABASE]
mydb.list_collection_names()

['movies']

In [30]:
# Show the names of the databases reported through this client.
client.list_database_names()

['movielens', 'admin', 'local']

In [32]:
# Handle to the `movies` collection; the insert and find cells below use it.
movies = mydb.movies

In [33]:
# Store one movie document and keep the result object to inspect it.
result = movies.insert_one({
    'movieId': 3,
    'title': 'Grumpier Old Men (1995)',
    'genres': 'Comedy|Romance',
})
result

<pymongo.results.InsertOneResult at 0x7f6cf017b820>

In [34]:
# inserted_id is read from the result of the insert_one call above.
print(f"One movie: {result.inserted_id}")

One movie: 60c39abe413cf49529925bf4


In [35]:
# single-line command to insert record: reaches the collection through the
# client (database `movielens`, collection `movies`) and prints the new id
print(client.movielens.movies.insert_one({'movieId':5, 'title':'Bride', 'genres':'Comedy'}).inserted_id)

60c39ac1413cf49529925bf5


In [36]:
# Two more documents, stored with a single insert_many call.
movie2 = dict(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy')
movie3 = dict(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance')

new_result = movies.insert_many([movie2, movie3])
print("Multiple movies: {}".format(new_result.inserted_ids))

Multiple movies: [ObjectId('60c39ac3413cf49529925bf6'), ObjectId('60c39ac3413cf49529925bf7')]


In [37]:
import pprint

# Pretty-print every document returned by an unfiltered find().
for movie_doc in movies.find():
    pprint.pprint(movie_doc)

{'_id': ObjectId('60c39abe413cf49529925bf4'),
 'genres': 'Comedy|Romance',
 'movieId': 3,
 'title': 'Grumpier Old Men (1995)'}
{'_id': ObjectId('60c39ac1413cf49529925bf5'),
 'genres': 'Comedy',
 'movieId': 5,
 'title': 'Bride'}
{'_id': ObjectId('60c39ac3413cf49529925bf6'),
 'genres': 'Adventure|Children|Fantasy',
 'movieId': 2,
 'title': 'Jumanji (1995)'}
{'_id': ObjectId('60c39ac3413cf49529925bf7'),
 'genres': 'Comedy|Romance',
 'movieId': 3,
 'title': 'Grumpier Old Men (1995)'}


In [38]:
%%writefile consumer.py

import sys
import os

from confluent_kafka import Consumer, KafkaException, KafkaError
import pymongo

from urllib.parse import quote_plus

# Kafka (CloudKarafka) connection settings.
# Every setting in this block can be overridden through an environment
# variable of the same name, so real credentials do not have to be written
# into the script; the literals are only the demo fallbacks.
CLOUDKARAFKA_TOPIC = os.environ.get('CLOUDKARAFKA_TOPIC', 'yx03wajr-demo')
# Comma-separated broker list, passed to the client as 'bootstrap.servers'.
CLOUDKARAFKA_BROKERS = os.environ.get(
    'CLOUDKARAFKA_BROKERS',
    ', '.join([
        'dory-01.srvs.cloudkafka.com:9094',
        'dory-02.srvs.cloudkafka.com:9094',
        'dory-03.srvs.cloudkafka.com:9094',
    ]),
)
CLOUDKARAFKA_USERNAME = os.environ.get('CLOUDKARAFKA_USERNAME', 'yx03wajr')
CLOUDKARAFKA_PASSWORD = os.environ.get('CLOUDKARAFKA_PASSWORD', 'pHva0afDUXPya6JfKrbM1j549G*****')

# MongoDB connection settings; '<your-pass>' is only a placeholder.
MONGODB_USER = os.environ.get('MONGODB_USER', 'kafka-demo')
MONGODB_PASSWORD = os.environ.get('MONGODB_PASSWORD', '<your-pass>')
MONGODB_CLUSTER = os.environ.get('MONGODB_CLUSTER', 'cluster0.ca4wh.mongodb.net')
MONGODB_DATABASE = os.environ.get('MONGODB_DATABASE', 'movielens')

# The user name and password are percent-escaped before being placed in the
# URI: characters such as '@', ':', '/' or '%' in a password would otherwise
# make the connection string invalid.
mongo_uri = (
    f"mongodb+srv://{quote_plus(MONGODB_USER)}:{quote_plus(MONGODB_PASSWORD)}"
    f"@{MONGODB_CLUSTER}/{MONGODB_DATABASE}?retryWrites=true&w=majority"
)
client = pymongo.MongoClient(mongo_uri)
mydb = client[MONGODB_DATABASE]
# Collection that receives one document per consumed Kafka message.
movies = mydb.movies

def decode_movie(raw):
    """Convert a raw Kafka payload into a document for MongoDB.

    The payloads shown in this notebook are Python-literal dicts (single-quoted
    keys), so they are parsed with ``ast.literal_eval``. Unlike ``eval`` it
    only accepts literals and cannot execute code carried in a message.

    Args:
        raw: the message value as bytes, or None.

    Returns:
        The parsed dict, or ``{"err_flag": True, "msg": str(raw)}`` when the
        payload is missing, cannot be decoded or parsed, or is not a dict.
    """
    import ast

    failure = {"err_flag": True, "msg": str(raw)}
    if raw is None:
        return failure
    try:
        parsed = ast.literal_eval(raw.decode('utf-8'))
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # UnicodeDecodeError is a ValueError, so undecodable bytes land here too.
        return failure
    # Only a dict can be stored as a document.
    return parsed if isinstance(parsed, dict) else failure


if __name__ == '__main__':
    # The topic setting may hold several comma-separated names; surrounding
    # whitespace and empty entries are dropped.
    topics = [name.strip() for name in CLOUDKARAFKA_TOPIC.split(",") if name.strip()]

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': CLOUDKARAFKA_BROKERS,
        'group.id': "%s-consumer" % CLOUDKARAFKA_USERNAME,
        'session.timeout.ms': 6000,
        # Start from the oldest available message when the group has no
        # committed offset. Given at the top level because the nested
        # 'default.topic.config' form is deprecated; 'earliest' is the
        # current spelling of 'smallest'.
        'auto.offset.reset': 'earliest',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': CLOUDKARAFKA_USERNAME,
        'sasl.password': CLOUDKARAFKA_PASSWORD
    }

    c = Consumer(**conf)
    try:
        c.subscribe(topics)
        # Poll forever; the loop ends on Ctrl-C or on a Kafka error.
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the timeout.
                continue
            err = msg.error()
            if not err:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                raw = msg.value()
                print(raw)
                try:
                    movies.insert_one(decode_movie(raw))
                except Exception as exc:
                    # Best effort: record the payload that could not be
                    # stored. `Exception` (not a bare except) lets Ctrl-C
                    # still reach the handler below.
                    sys.stderr.write('%% insert failed: %s\n' % exc)
                    movies.insert_one({"err_flag": True, "msg": str(raw)})
            elif err.code() == KafkaError._PARTITION_EOF:
                # End of partition event
                sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                 (msg.topic(), msg.partition(), msg.offset()))
            else:
                # Error
                raise KafkaException(err)

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets. Done in `finally` so
        # it also happens when an error ends the loop.
        c.close()

Overwriting consumer.py


In [None]:
# Run the consumer.py written by the cell above; it prints each message and
# stores it in MongoDB. The script polls in an endless loop, so interrupt the
# cell to stop it.
!python consumer.py

% yx03wajr-demo [2] at offset 4 with key None:
b"{'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}"
