# Kafka MongoDB Real-time Streaming: Kafka Producer
> Sending events to Kafka broker in real-time

- toc: true
- badges: true
- comments: true
- categories: [kafka, real time]
- image: 

In [1]:
!pip install confluent_kafka -q

[K     |████████████████████████████████| 2.7MB 5.1MB/s 
[?25h

In [2]:
import json
import sys
import os
import pandas as pd

from confluent_kafka import Producer
from confluent_kafka import Consumer, KafkaException, KafkaError

In [3]:
!wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip ml-latest-small.zip

--2021-06-11 16:23:29--  http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip’


2021-06-11 16:23:29 (4.56 MB/s) - ‘ml-latest-small.zip’ saved [978202/978202]

Archive:  ml-latest-small.zip
   creating: ml-latest-small/
  inflating: ml-latest-small/links.csv  
  inflating: ml-latest-small/tags.csv  
  inflating: ml-latest-small/ratings.csv  
  inflating: ml-latest-small/README.txt  
  inflating: ml-latest-small/movies.csv  


In [4]:
df = pd.read_csv('./ml-latest-small/movies.csv')
df.to_json('df.json')
df_json = pd.read_json('df.json')
movie_list= df_json.to_dict(orient="records")
print(movie_list[0])

{'movieId': 1, 'title': 'Toy Story (1995)', 'genres': 'Adventure|Animation|Children|Comedy|Fantasy'}


### Producer Setup [notebook]

In [11]:
CLOUDKARAFKA_TOPIC = 'yx03wajr-demo'
CLOUDKARAFKA_BROKERS = 'dory-01.srvs.cloudkafka.com:9094, \
dory-02.srvs.cloudkafka.com:9094, \
dory-03.srvs.cloudkafka.com:9094'
CLOUDKARAFKA_USERNAME = 'yx03wajr'
CLOUDKARAFKA_PASSWORD = 'pHva0afDUXPya6JfKrbM1******'

In [12]:
topic = CLOUDKARAFKA_TOPIC.split(",")[0]

conf = {
    'bootstrap.servers': CLOUDKARAFKA_BROKERS,
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': CLOUDKARAFKA_USERNAME,
    'sasl.password': CLOUDKARAFKA_PASSWORD
    }

In [7]:
p = Producer(**conf)

In [8]:
def delivery_callback(err, msg):
  if err:
      sys.stderr.write('%% Message failed delivery: %s\n' % err)
  else:
      sys.stderr.write('%% Message delivered to %s [%d]\n' %
                        (msg.topic(), msg.partition()))

In [9]:
for movie in movie_list[0:5]:
    try:
        print("Message to be send : ", movie)
        p.produce(topic, str(movie), callback=delivery_callback)
    except BufferError as e:
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                          len(p))
    p.poll(0)

sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()

Message to be send :  {'movieId': 1, 'title': 'Toy Story (1995)', 'genres': 'Adventure|Animation|Children|Comedy|Fantasy'}
Message to be send :  {'movieId': 2, 'title': 'Jumanji (1995)', 'genres': 'Adventure|Children|Fantasy'}
Message to be send :  {'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}
Message to be send :  {'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}
Message to be send :  {'movieId': 5, 'title': 'Father of the Bride Part II (1995)', 'genres': 'Comedy'}


% Waiting for 5 deliveries
% Message delivered to yx03wajr-demo [1]
% Message delivered to yx03wajr-demo [2]
% Message delivered to yx03wajr-demo [0]
% Message delivered to yx03wajr-demo [0]
% Message delivered to yx03wajr-demo [0]


0

### Producer Setup [terminal]

In [13]:
%%writefile producer.py

import sys
import os

from confluent_kafka import Producer

CLOUDKARAFKA_TOPIC = 'yx03wajr-demo'
CLOUDKARAFKA_BROKERS = 'dory-01.srvs.cloudkafka.com:9094, \
dory-02.srvs.cloudkafka.com:9094, \
dory-03.srvs.cloudkafka.com:9094'
CLOUDKARAFKA_USERNAME = 'yx03wajr'
CLOUDKARAFKA_PASSWORD = 'pHva0afDUXPya6JfKrbM1******'

if __name__ == '__main__':
    topic = CLOUDKARAFKA_TOPIC.split(",")[0]

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
      'bootstrap.servers': CLOUDKARAFKA_BROKERS,
      'session.timeout.ms': 6000,
      'default.topic.config': {'auto.offset.reset': 'smallest'},
      'security.protocol': 'SASL_SSL',
      'sasl.mechanisms': 'SCRAM-SHA-256',
      'sasl.username': CLOUDKARAFKA_USERNAME,
      'sasl.password': CLOUDKARAFKA_PASSWORD
      }

    p = Producer(**conf)

    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d]\n' %
                             (msg.topic(), msg.partition()))

    for line in sys.stdin:
        try:
            p.produce(topic, line.rstrip(), callback=delivery_callback)
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))
        p.poll(0)

    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()

Overwriting producer.py


In [None]:
!python producer.py

%4|1623432412.565|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
%4|1623432412.565|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance
{'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}
