# 1. Imports

In [1]:
from confluent_kafka.admin import AdminClient
import logging
import json
import time
from confluent_kafka import Producer
import typing
import logging

# Emit INFO-level records so the messages logged through `logger` show up in the notebook.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("producer")

# Load the notebook settings. The context manager closes the file handle as
# soon as parsing finishes instead of leaving it to the garbage collector.
with open("config.json") as config_file:
    config = json.load(config_file)
print(json.dumps(config, indent=2))

{
  "global": {
    "kafka_bootstrap_servers": "kafka:9092",
    "kafka_topic": "test-structured-streaming",
    "kafka_consumer_group": "ss_job",
    "max_records_per_batch": 20
  }
}


# 2. Data Generator

In [2]:
def data_generator(path: str = 'data.json') -> typing.Iterator[str]:
    """Lazily yield the records of a JSON file, one string at a time.

    Args:
        path: Location of the JSON document to read. Defaults to
            'data.json' (resolved against the current working directory).

    Yields:
        ``str(item)`` for every item obtained by iterating over the parsed
        document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Parse inside a context manager so the handle is closed before the first
    # record is handed out, rather than staying open for the generator's lifetime.
    with open(path) as source:
        records = json.load(source)

    # NOTE(review): str() on a dict/list produces the Python repr (single
    # quotes, True/None), not JSON text - confirm that consumers expect this.
    for record in records:
        yield str(record)

# Create the record generator. Nothing is read or sent on this line:
# generators are lazy and single-use, so each record is yielded only once.
data = data_generator()

# 3. Build Kafka Producer class

In [3]:
# Connection settings: the bootstrap servers come from the loaded config.
broker_conf = dict([
    ('bootstrap.servers', config["global"]["kafka_bootstrap_servers"]),
])

# Producer settings start out as a shallow copy of the broker settings.
producer_conf = dict(broker_conf)

class KafkaProducer(object):
    """Thin wrapper around ``Producer`` that pushes string records to a topic."""

    def __init__(self, broker_conf, debug=False):
        """Create the underlying producer.

        Args:
            broker_conf: Configuration dict passed unchanged to ``Producer``.
            debug: Stored on the instance as ``self.debug``.
        """
        self.broker_conf = broker_conf
        self.producer = Producer(self.broker_conf)
        # NOTE(review): this flag is stored but not consulted anywhere in the
        # class; successful deliveries are logged regardless of its value.
        self.debug = debug

    def delivery_report(self, err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            logger.error('Message delivery failed: {}'.format(err))
        else:
            logger.info('Message delivered to topic: {} [parition={}]'.format(msg.topic(), msg.partition()))

    def push(self, data: typing.Iterable[str], topic: str, delay_in_secs: float = 0):
        """Produce every record in ``data`` to ``topic`` and flush the producer.

        Args:
            data: Any iterable of strings (list, generator, ...). It is
                consumed one record at a time, so generators are never
                materialised in memory.
            topic: Name of the destination topic.
            delay_in_secs: Pause inserted between consecutive records.
                0 (the default) sends the records back to back.

        Returns:
            Tuple ``(num_records, completion_time)``: the number of records
            handed to the producer and the elapsed wall-clock seconds,
            including the final flush.
        """
        start_time = time.time()
        num_records = 0

        for record in data:
            # Honour the requested delay between records (not before the first
            # one), so the figure reported in the summary log is actually applied.
            if delay_in_secs > 0 and num_records > 0:
                time.sleep(delay_in_secs)
            # Give the client a chance to run pending delivery callbacks.
            self.producer.poll(0)
            self.producer.produce(topic, record.encode('utf-8'), callback=self.delivery_report)
            num_records += 1
        # Flush once at the end, after every record has been handed over.
        self.producer.flush()

        end_time = time.time()
        completion_time = end_time - start_time
        logger.info(
            f"Pushed {num_records} records with {delay_in_secs} secs delay. Task completed in {completion_time:.2f} secs")
        return num_records, completion_time

# 4. Push data to kafka

In [4]:
# Build the producer and stream a fresh generator to the configured topic.
# A generator can only be consumed once, so creating it here keeps the cell
# re-runnable: reusing an already exhausted generator would push 0 records.
kafka_produce = KafkaProducer(broker_conf=broker_conf, debug=True)
num_records, _ = kafka_produce.push(
    data=data_generator(),
    topic=config["global"]["kafka_topic"],
    delay_in_secs=0,
)

INFO:producer:Message delivered to topic: test-structured-streaming [parition=0]
INFO:producer:Message delivered to topic: test-structured-streaming [parition=0]
INFO:producer:Message delivered to topic: test-structured-streaming [parition=0]
INFO:producer:Message delivered to topic: test-structured-streaming [parition=0]
INFO:producer:Message delivered to topic: test-structured-streaming [parition=0]
INFO:producer:Message delivered to topic: test-structured-streaming [parition=0]
INFO:producer:Message delivered to topic: test-structured-streaming [parition=0]
INFO:producer:Message delivered to topic: test-structured-streaming [parition=0]
INFO:producer:Message delivered to topic: test-structured-streaming [parition=0]
INFO:producer:Message delivered to topic: test-structured-streaming [parition=0]
INFO:producer:Pushed 10 records with 0 secs delay. Task completed in 4.12 secs
