In [15]:

from confluent_kafka import Producer, Consumer, KafkaError
import json, socket


**Reading json file**

In [16]:

data = ''
with open('data.json', 'r') as file:
        data = json.load(file)


**Producers: Inventory Orders and Delivery Orders**

In [17]:
# setting configurations

conf = {'bootstrap.servers': 'localhost:8888', 'client.id': socket.gethostname()}

inventory_producer = Producer(conf)
delivery_producer = Producer(conf)

# setting topics
inventory_topic = 'inventory'
delivery_topic = 'delivery'

In [18]:
# call back function for message error handling 

def acked(err, msg):
    if err is not None:
        print('Message  delivery failed:', err)
    else:
        print("Message produced: %s" % (str(msg)))
        print('Message delivered to topic:', msg.topic(), 'partition:', msg.partition(), 'offset:', msg.offset())


In [19]:
# Function to produce a message for inventory orders
def produce_inventory_order(order):
    message = json.dumps(order)
    inventory_producer.produce(inventory_topic, message.encode('utf-8'), callback = acked)

# Function to produce a message for delivery orders
def produce_delivery_order(order):
    message = json.dumps(order)
    delivery_producer.produce(delivery_topic, message.encode('utf-8'), callback = acked)

**Message filtering logic**

In [20]:
# Produce messages based on type

for order in data:
    if order['type'] == 'inventory':
        produce_inventory_order(order)
    elif order['type'] == 'delivery':
        produce_delivery_order(order)
    

inventory_producer.poll(), delivery_producer.poll()

(2, 2)

**Consumers: Inventory Data and Delivery Data**

In [21]:
# setting configurations

conf2 = {'bootstrap.servers': 'host1:8888, host2:8888',
        'group.id': 'orders',
        'auto.offset.reset': 'smallest', 'client.id': socket.gethostname()}

inventory_consumer = Consumer(conf2)
delivery_consumer = Consumer(conf2)

In [22]:
def consume(inventory_consumer, delivery_consumer, inventory_topic, delivery_topic):
    
    msg_text = ''
    try:
        inventory_consumer.subscribe([inventory_topic])
        delivery_consumer.subscribe([delivery_topic])


        while True:
            msg = inventory_consumer.poll(timeout=1.0)
            if msg is None: 
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print('error')
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                temp = msg.value().decode('utf-8')
                print(f"Message: {temp}")
                msg_text += '\n' + temp
                
            msg = order_consumer.poll(timeout=1.0)

            if msg is None: 
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print('error')
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                print(f"Message: {msg.value().decode('utf-8')}")
                pass
                
    except KeyboardInterrupt:
        pass
    finally:
        inventory_consumer.close()
        delivery_consumer.close()

In [23]:
consume(inventory_consumer, delivery_consumer, inventory_topic, delivery_topic)

In [24]:
inventory_consumer.close()
delivery_consumer.close()