In [1]:
!pip install confluent_kafka -q

[K     |████████████████████████████████| 2.7MB 5.6MB/s 
[?25h

In [23]:
import json
import sys
import os
import pandas as pd

from confluent_kafka import Producer
from confluent_kafka import Consumer, KafkaException, KafkaError

In [13]:
os.environ['CLOUDKARAFKA_BROKERS'] = "<secret>"
os.environ['CLOUDKARAFKA_USERNAME'] = "<secret>"
os.environ['CLOUDKARAFKA_PASSWORD'] = "<secret>"
os.environ['CLOUDKARAFKA_TOPIC'] = "<secret>"

In [17]:
# !wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
# !unzip ml-latest-small.zip

df = pd.read_csv('./ml-latest-small/movies.csv')
df.head().to_json('df.json')
df_json = pd.read_json('df.json')
movie_list= df_json.to_dict(orient="records")
print(movie_list[0])

{'movieId': 1, 'title': 'Toy Story (1995)', 'genres': 'Adventure|Animation|Children|Comedy|Fantasy'}


In [38]:
topic = os.environ['CLOUDKARAFKA_TOPIC'].split(",")[0]

conf = {
    'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'],
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'],
    'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD']
    }

p = Producer(**conf)

def delivery_callback(err, msg):
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    else:
        sys.stderr.write('%% Message delivered to %s [%d]\n' %
                          (msg.topic(), msg.partition()))

for movie in movie_list:
    try:
        print("Message to be send : ", movie)
        p.produce(topic, str(movie), callback=delivery_callback)
    except BufferError as e:
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                          len(p))
    p.poll(0)

sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()

Message to be send :  {'movieId': 1, 'title': 'Toy Story (1995)', 'genres': 'Adventure|Animation|Children|Comedy|Fantasy'}
Message to be send :  {'movieId': 2, 'title': 'Jumanji (1995)', 'genres': 'Adventure|Children|Fantasy'}
Message to be send :  {'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}
Message to be send :  {'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}
Message to be send :  {'movieId': 5, 'title': 'Father of the Bride Part II (1995)', 'genres': 'Comedy'}


% Waiting for 5 deliveries
% Message delivered to 2nizjcn5-entree [1]
% Message delivered to 2nizjcn5-entree [1]
% Message delivered to 2nizjcn5-entree [3]
% Message delivered to 2nizjcn5-entree [2]
% Message delivered to 2nizjcn5-entree [2]


0

In [25]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
# !tar xf spark-3.0.0-bin-hadoop3.2.tgz
# !pip install -q findspark
# !pip install -q pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

import findspark
findspark.init()

In [39]:
topics = os.environ['CLOUDKARAFKA_TOPIC'].split(",")

# Consumer configuration
conf = {
    'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'],
    'group.id': "%s-consumer" % os.environ['CLOUDKARAFKA_USERNAME'],
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'],
    'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD']
}

c = Consumer(**conf)
c.subscribe(topics)

# while True:
for i in range(10):
  i+=1
  print(i)
  msg = c.poll(timeout=1.0)
  if msg is None:
      continue
  if msg.error():
      # Error or event
      if msg.error().code() == KafkaError._PARTITION_EOF:
          # End of partition event
          sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                            (msg.topic(), msg.partition(), msg.offset()))
      elif msg.error():
          # Error
          raise KafkaException(msg.error())
  else:
      # Proper message
      sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                        (msg.topic(), msg.partition(), msg.offset(),
                        str(msg.key())))
      print(msg.value())

c.close()

1
2
3
4
5
6


% 2nizjcn5-entree [1] at offset 2 with key None:
% 2nizjcn5-entree [1] at offset 3 with key None:


b"{'movieId': 2, 'title': 'Jumanji (1995)', 'genres': 'Adventure|Children|Fantasy'}"
7
b"{'movieId': 3, 'title': 'Grumpier Old Men (1995)', 'genres': 'Comedy|Romance'}"
8


% 2nizjcn5-entree [3] at offset 6 with key None:
% 2nizjcn5-entree [2] at offset 4 with key None:
% 2nizjcn5-entree [2] at offset 5 with key None:


b"{'movieId': 1, 'title': 'Toy Story (1995)', 'genres': 'Adventure|Animation|Children|Comedy|Fantasy'}"
9
b"{'movieId': 4, 'title': 'Waiting to Exhale (1995)', 'genres': 'Comedy|Drama|Romance'}"
10
b"{'movieId': 5, 'title': 'Father of the Bride Part II (1995)', 'genres': 'Comedy'}"
