In [1]:
!pip install confluent_kafka -q

[K     |████████████████████████████████| 2.7MB 4.0MB/s 
[?25h

In [2]:
import json
import sys
import os
import pandas as pd

from confluent_kafka import Producer
from confluent_kafka import Consumer, KafkaException, KafkaError

In [6]:
sys.path.append("/content/drive/MyDrive")
import mykeys

In [5]:
CLOUDKARAFKA_TOPIC = '2nizjcn5-movielens'

In [None]:
!wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
!unzip ml-latest-small.zip

In [11]:
df = pd.read_csv('./ml-latest-small/movies.csv')
df.to_json('df.json')
df_json = pd.read_json('df.json')
movie_list= df_json.to_dict(orient="records")
print(movie_list[0])

{'movieId': 1, 'title': 'Toy Story (1995)', 'genres': 'Adventure|Animation|Children|Comedy|Fantasy'}


### Producer Setup [notebook]

In [None]:
topic = CLOUDKARAFKA_TOPIC.split(",")[0]

conf = {
    'bootstrap.servers': mykeys.CLOUDKARAFKA_BROKERS,
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': mykeys.CLOUDKARAFKA_USERNAME,
    'sasl.password': mykeys.CLOUDKARAFKA_PASSWORD
    }

In [8]:
p = Producer(**conf)

In [9]:
def delivery_callback(err, msg):
  if err:
      sys.stderr.write('%% Message failed delivery: %s\n' % err)
  else:
      sys.stderr.write('%% Message delivered to %s [%d]\n' %
                        (msg.topic(), msg.partition()))

In [10]:
for movie in movie_list[0:5]:
    try:
        print("Message to be send : ", movie)
        p.produce(topic, str(movie), callback=delivery_callback)
    except BufferError as e:
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                          len(p))
    p.poll(0)

sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()

Message to be send :  {'movieId': 1, 'title': 'Toy Story (1995)', 'genres': 'Adventure|Animation|Children|Comedy|Fantasy'}
Message to be send :  {'movieId': 2, 'title': 'Jumanji (1995)', 'genres': 'Adventure|Children|Fantasy'}
Message to be send :  {'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}
Message to be send :  {'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}
Message to be send :  {'movieId': 5, 'title': 'Father of the Bride Part II (1995)', 'genres': 'Comedy'}


% Waiting for 5 deliveries
% Message delivered to 2nizjcn5-movielens [0]
% Message delivered to 2nizjcn5-movielens [4]
% Message delivered to 2nizjcn5-movielens [1]
% Message delivered to 2nizjcn5-movielens [1]
% Message delivered to 2nizjcn5-movielens [1]


0

### Producer Setup [terminal]

In [27]:
%%writefile producer.py

import sys
import os

from confluent_kafka import Producer

sys.path.append("/content/drive/MyDrive")
import mykeys

CLOUDKARAFKA_TOPIC = '2nizjcn5-movielens'

if __name__ == '__main__':
    topic = CLOUDKARAFKA_TOPIC.split(",")[0]

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
      'bootstrap.servers': mykeys.CLOUDKARAFKA_BROKERS,
      'session.timeout.ms': 6000,
      'default.topic.config': {'auto.offset.reset': 'smallest'},
      'security.protocol': 'SASL_SSL',
      'sasl.mechanisms': 'SCRAM-SHA-256',
      'sasl.username': mykeys.CLOUDKARAFKA_USERNAME,
      'sasl.password': mykeys.CLOUDKARAFKA_PASSWORD
      }

    p = Producer(**conf)

    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d]\n' %
                             (msg.topic(), msg.partition()))

    for line in sys.stdin:
        try:
            p.produce(topic, line.rstrip(), callback=delivery_callback)
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))
        p.poll(0)

    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()

Overwriting producer.py


In [29]:
!python producer.py

%4|1620914051.795|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
%4|1620914051.795|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance
{'movieId': 2, 'title': 'Jumanji (1995)', 'genres': 'Adventure|Children|Fantasy'}
{'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}
% Message delivered to 2nizjcn5-movielens [1]
{'movieId': 1, 'title': 'Toy Story (1995)', 'genres': 'Adventure|Animation|Children|Comedy|Fantasy'}
% Message delivered to 2nizjcn5-movielens [4]
{'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}
% Message delivered to 2nizjcn5-movielens [4]
{'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}
% Message delivered to 2nizjcn5-movielens [3]
Traceback (most recent call last):
  File "