# Prerequisite to run this notebook: start ZooKeeper and the Kafka server.
For example, on Windows run zookeeper-server-start.bat and then kafka-server-start.bat.

We have used Kafka 2.12-2.1.1 (i.e. Kafka 2.1.1 built for Scala 2.12).


# This notebook does the following:
# 1) Create a Kafka Topic
# 2) Create a Kafka Producer
# 3) Read messages from user as console input
# 4) Send messages to Topic using Producer

# We have used the Confluent Kafka Python library (`confluent_kafka`) here.

reference: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/

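# To install the Python Kafka client libraries:
pip install kafka-python<br/>
pip install confluent_kafka

Only `confluent_kafka` is imported by the code in this notebook; the `kafka-python` import is commented out.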

In [1]:
#pip install kafka-python
#from kafka import KafkaProducer, TopicPartition
from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigSource
from confluent_kafka import Producer

import sys

# Kafka runs on localhost.


In [2]:
# Client settings shared by the admin client and the producer (broker on localhost).
conf = {"bootstrap.servers": "localhost"}
# Name of the topic that is created (if missing) and produced to below.
topic_name = "topic_events"

# Create Topic if it doesn't exist

In [3]:
# Create the Admin client from the connection settings in `conf`.
# is_topic() and create_topic() below both use this module-level instance.
admin_client = AdminClient(conf)
    
def is_topic(topicname_in, client=None, timeout=10.0):
    """Report whether a topic named `topicname_in` exists on the cluster.

    Args:
        topicname_in: Name of the topic to look for.
        client: Object exposing ``list_topics(timeout=...)`` whose result has a
            ``topics`` mapping keyed by topic name. Defaults to the
            module-level ``admin_client``.
        timeout: Maximum number of seconds to wait for the metadata response.
            Without a bound the request waits indefinitely, which hangs the
            cell when no broker is reachable.

    Returns:
        True if the topic appears in the cluster metadata, False otherwise.

    Raises:
        Whatever ``list_topics`` raises when the metadata request fails or
        times out.
    """
    if client is None:
        # Fall back to the notebook-wide admin client.
        client = admin_client
    metadata = client.list_topics(timeout=timeout)
    return metadata.topics.get(topicname_in) is not None

def create_topic(topicname_in, num_partitions=3, replication_factor=1):
    """Create the topic `topicname_in` unless it already exists.

    Progress and failures are reported with print(); nothing is returned and
    creation errors are not re-raised.

    Args:
        topicname_in: Name of the topic to create.
        num_partitions: Number of partitions for the new topic.
        replication_factor: Replication factor for the new topic.
    """
    if is_topic(topicname_in):
        print("Topic ",topicname_in," already exists")
        return

    new_topics = [
        NewTopic(
            topicname_in,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
    ]

    # Call create_topics to asynchronously create topics, a dict
    # of <topic,future> is returned.
    fs = admin_client.create_topics(new_topics)

    # Wait for operation to finish.
    # Timeouts are preferably controlled by passing request_timeout=15.0
    # to the create_topics() call.
    # All futures will finish at the same time.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} created".format(topic))
        except Exception as e:
            # Report the failure and keep going so the notebook stays usable.
            print("Failed to create topic {}: {}".format(topic, e))

# Make sure the topic named by `topic_name` exists; create_topic() only reports when it already does.
create_topic(topic_name)


Topic  topic_events  already exists


# Create Producer Instance 

In [4]:
# Create Producer instance; the entries of `conf` are unpacked as keyword arguments.
producer = Producer(**conf)

# Optional per-message delivery callback (triggered by poll() or flush())


In [5]:

# Invoked when a message has been successfully delivered or has permanently
# failed delivery (after retries).
def delivery_callback(err, msg):
    """Print the delivery outcome of a single produced message.

    Args:
        err: Truthy when delivery failed; printed as the failure reason.
        msg: The message; its topic(), partition() and offset() are printed
            on success.
    """
    if not err:
        report = 'Message delivered to topic={} partition={} offset={}'
        print(report.format(msg.topic(), msg.partition(), msg.offset()))
        return
    print('Message failed delivery: ', err)

# Read user input from the console and send it to the topic.
# Enter 'q' to quit.

In [None]:
# Read lines from stdin and produce/push each one to Kafka until the user enters 'q'.
while True:
    try:
        line = input()
    except (EOFError, KeyboardInterrupt):
        # stdin was closed or the kernel was interrupted: leave the loop
        # cleanly instead of ending the cell with a traceback.
        print("Quitting")
        break
    if line == "q":
        print("Quitting")
        break
    try:
        # Push line (trailing whitespace stripped); produce() only enqueues the message.
        producer.produce(topic_name, line.rstrip(), callback=delivery_callback)
    except BufferError:
        print("Local producer queue is full, {} messages awaiting delivery: try again".format(len(producer)))
    # Wait for outstanding messages to be delivered. flush() also triggers the
    # delivery callbacks, so each line's report is printed before the next prompt.
    producer.flush()

a1
Message delivered to topic=topic_events partition=1 offset=24
b1
Message delivered to topic=topic_events partition=2 offset=24
c1
Message delivered to topic=topic_events partition=1 offset=25
d1
Message delivered to topic=topic_events partition=0 offset=24
e1
Message delivered to topic=topic_events partition=0 offset=25
f1
Message delivered to topic=topic_events partition=1 offset=26
g1
Message delivered to topic=topic_events partition=2 offset=25
h1
Message delivered to topic=topic_events partition=2 offset=26
j1
Message delivered to topic=topic_events partition=2 offset=27
k1
Message delivered to topic=topic_events partition=1 offset=27
l1
Message delivered to topic=topic_events partition=0 offset=26
m1
Message delivered to topic=topic_events partition=1 offset=28
n1
Message delivered to topic=topic_events partition=2 offset=28
o1
Message delivered to topic=topic_events partition=1 offset=29
p1
Message delivered to topic=topic_events partition=2 offset=29
