<a href="https://colab.research.google.com/github/vrishmi/BigData2/blob/master/kafka_BD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [9]:
!pip install kafka-python


Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/246.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━[0m [32m204.8/246.5 kB[0m [31m6.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [8]:
pip install confluent-kafka



### 1. Implement Kafka Producers
Producer 1: Inventory Orders Producer  This producer sends messages related to inventory checks.  It should filter and send only messages where type is inventory.

Producer 2: Delivery Orders Producer  This producer handles messages related to the delivery of orders.

In [14]:
from confluent_kafka import Producer as KP
import socket as S

# Settings shared by both producers: the broker to bootstrap from and
# this machine's hostname as the client id.
config = dict([
    ('bootstrap.servers', "localhost:9092"),
    ('client.id', S.gethostname()),
])

# Delivery callback: reports the outcome of a produce request
def acked(err, msg):
    """Print whether a message was delivered or failed.

    Args:
        err: Error reported for the delivery, or None when there was none.
        msg: The message the report is about; it is printed via ``str()``.
    """
    msg_text = str(msg)
    if err is None:
        print("Message sent successfully: %s" % msg_text)
        return
    print("Error delivering message: %s: %s" % (msg_text, str(err)))

# Inventory Orders Producer
producer_inv = KP(config)
def send_inv_msg(data):
    """Queue one inventory order on 'inventory_topic'.

    The order is serialized as JSON (not ``str(dict)``, whose single-quoted
    repr cannot be parsed by JSON consumers). Delivery is asynchronous: the
    final outcome is reported later through the ``acked`` callback, so the
    message printed here only means the order was queued.

    Args:
        data: Order mapping. Must contain 'order_id', which becomes the
            message key.

    Raises:
        KeyError: If 'order_id' is missing from ``data``.
        BufferError: If the local producer queue is still full after one retry.
    """
    import json  # local import: this cell runs before the notebook imports json

    key = str(data['order_id'])
    # default=str keeps non-JSON-native values (e.g. dates) from raising.
    payload = json.dumps(data, default=str)
    try:
        producer_inv.produce('inventory_topic', key=key, value=payload, callback=acked)
    except BufferError:
        # Local queue is full: serve pending delivery callbacks to free space, then retry once.
        producer_inv.poll(1.0)
        producer_inv.produce('inventory_topic', key=key, value=payload, callback=acked)
    # Serve any delivery callbacks that are already available without blocking.
    producer_inv.poll(0)
    print("Sent inventory message:", data)

# Delivery Orders Producer
producer_del = KP(config)
def send_del_msg(data):
    """Queue one delivery order on 'delivery_topic'.

    The order is serialized as JSON (not ``str(dict)``, whose single-quoted
    repr cannot be parsed by JSON consumers). Delivery is asynchronous: the
    final outcome is reported later through the ``acked`` callback, so the
    message printed here only means the order was queued.

    Args:
        data: Order mapping. Must contain 'order_id', which becomes the
            message key.

    Raises:
        KeyError: If 'order_id' is missing from ``data``.
        BufferError: If the local producer queue is still full after one retry.
    """
    import json  # local import: this cell runs before the notebook imports json

    key = str(data['order_id'])
    # default=str keeps non-JSON-native values (e.g. dates) from raising.
    payload = json.dumps(data, default=str)
    try:
        producer_del.produce('delivery_topic', key=key, value=payload, callback=acked)
    except BufferError:
        # Local queue is full: serve pending delivery callbacks to free space, then retry once.
        producer_del.poll(1.0)
        producer_del.produce('delivery_topic', key=key, value=payload, callback=acked)
    # Serve any delivery callbacks that are already available without blocking.
    producer_del.poll(0)
    print("Sent delivery message:", data)

# Ensure all messages are sent.
# NOTE(review): neither send_inv_msg nor send_del_msg is called before this
# point in the cell, so there appears to be nothing queued yet — presumably
# these flushes should also run after messages are actually produced; confirm.
# The last call is the cell's final expression, so its return value is displayed.
producer_inv.flush()
producer_del.flush()


0

### 2. Implement Kafka Consumers
Consumer 1: Inventory Data Consumer  This consumer is responsible for handling inventory data.  It should listen for messages where type is inventory and process them to update
inventory databases or systems accordingly.

In [15]:
from confluent_kafka import Consumer as KC, KafkaError as KE

# Settings shared by both consumers: broker address, consumer group and
# where to start reading when the group has no stored offset.
config_cons = dict([
    ('bootstrap.servers', "localhost:9092"),
    ('group.id', "group1"),
    ('auto.offset.reset', 'earliest'),
])

# Inventory Data Consumer: built from the shared config_cons settings and
# subscribed to 'inventory_topic' only. consume_inv() polls this object.
cons_inv = KC(config_cons)
cons_inv.subscribe(['inventory_topic'])

def consume_inv(consumer=None, stop_event=None):
    """Poll for inventory messages and print each one.

    Args:
        consumer: Object to poll. Defaults to the module-level ``cons_inv`` as
            bound when this function is called. It is resolved once, so a
            later cell that rebinds ``cons_inv`` cannot swap the consumer
            under a loop that is already running.
        stop_event: Optional object exposing ``is_set()`` (for example a
            ``threading.Event``). Once it reports set, the loop ends. When
            omitted, the loop only ends on a consumer error other than
            end-of-partition, as before.
    """
    if consumer is None:
        consumer = cons_inv
    while stop_event is None or not stop_event.is_set():
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the 1 second poll timeout.
            continue
        err = msg.error()
        if err:
            if err.code() == KE._PARTITION_EOF:
                # End of partition is not a failure; keep polling.
                continue
            print(err)
            break
        print('Received inventory message: {}'.format(msg.value()))

# Delivery Data Consumer: built from the shared config_cons settings and
# subscribed to 'delivery_topic' only. consume_del() polls this object.
cons_del = KC(config_cons)
cons_del.subscribe(['delivery_topic'])

def consume_del(consumer=None, stop_event=None):
    """Poll for delivery messages and print each one.

    Args:
        consumer: Object to poll. Defaults to the module-level ``cons_del`` as
            bound when this function is called. It is resolved once, so a
            later cell that rebinds ``cons_del`` cannot swap the consumer
            under a loop that is already running.
        stop_event: Optional object exposing ``is_set()`` (for example a
            ``threading.Event``). Once it reports set, the loop ends. When
            omitted, the loop only ends on a consumer error other than
            end-of-partition, as before.
    """
    if consumer is None:
        consumer = cons_del
    while stop_event is None or not stop_event.is_set():
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the 1 second poll timeout.
            continue
        err = msg.error()
        if err:
            if err.code() == KE._PARTITION_EOF:
                # End of partition is not a failure; keep polling.
                continue
            print(err)
            break
        print('Received delivery message: {}'.format(msg.value()))


Consumer 2: Delivery Data Consumer. This consumer manages delivery tasks. It should listen for messages where `type` is `delivery` and perform actions such as
scheduling deliveries, updating delivery status, and notifying customers.

3. Develop Message Filtering Logic: develop the logic for each producer to filter messages based on the `type` field.

In [16]:
from confluent_kafka import Consumer as KCons, KafkaError as KErr
import threading as T

# Settings shared by both consumers: broker address, consumer group and
# where to start reading when the group has no stored offset.
config_cons = dict([
    ('bootstrap.servers', "localhost:9092"),
    ('group.id', "group1"),
    ('auto.offset.reset', 'earliest'),
])

# Inventory Data Consumer: built from the shared config_cons settings and
# subscribed to 'inventory_topic' only. consume_inv() polls this object.
cons_inv = KCons(config_cons)
cons_inv.subscribe(['inventory_topic'])

def consume_inv(consumer=None, stop_event=None):
    """Poll for inventory messages and print each one.

    Args:
        consumer: Object to poll. Defaults to the module-level ``cons_inv`` as
            bound when this function is called. It is resolved once, so a
            later cell that rebinds ``cons_inv`` cannot swap the consumer
            under a loop that is already running (e.g. in a thread).
        stop_event: Optional object exposing ``is_set()`` (for example a
            ``threading.Event``). Once it reports set, the loop ends. When
            omitted, the loop only ends on a consumer error other than
            end-of-partition, as before.
    """
    if consumer is None:
        consumer = cons_inv
    while stop_event is None or not stop_event.is_set():
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the 1 second poll timeout.
            continue
        err = msg.error()
        if err:
            if err.code() == KErr._PARTITION_EOF:
                # End of partition is not a failure; keep polling.
                continue
            print(err)
            break
        print('Received inventory message: {}'.format(msg.value()))

# Delivery Data Consumer: built from the shared config_cons settings and
# subscribed to 'delivery_topic' only. consume_del() polls this object.
cons_del = KCons(config_cons)
cons_del.subscribe(['delivery_topic'])

def consume_del(consumer=None, stop_event=None):
    """Poll for delivery messages and print each one.

    Args:
        consumer: Object to poll. Defaults to the module-level ``cons_del`` as
            bound when this function is called. It is resolved once, so a
            later cell that rebinds ``cons_del`` cannot swap the consumer
            under a loop that is already running (e.g. in a thread).
        stop_event: Optional object exposing ``is_set()`` (for example a
            ``threading.Event``). Once it reports set, the loop ends. When
            omitted, the loop only ends on a consumer error other than
            end-of-partition, as before.
    """
    if consumer is None:
        consumer = cons_del
    while stop_event is None or not stop_event.is_set():
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the 1 second poll timeout.
            continue
        err = msg.error()
        if err:
            if err.code() == KErr._PARTITION_EOF:
                # End of partition is not a failure; keep polling.
                continue
            print(err)
            break
        print('Received delivery message: {}'.format(msg.value()))

# Start consuming messages in separate threads.
# daemon=True: both targets poll in an open-ended loop, so non-daemon threads
# would keep the interpreter (and the notebook kernel) from shutting down.
# The names make the threads identifiable in tracebacks and thread listings.
inv_thread = T.Thread(target=consume_inv, name="inventory-consumer", daemon=True)
del_thread = T.Thread(target=consume_del, name="delivery-consumer", daemon=True)

inv_thread.start()
del_thread.start()


In [17]:
# NOTE(review): this cell re-creates cons_inv / cons_del with the same
# config_cons settings and topics as the previous cell, where consumer threads
# were already started — confirm that a second pair of consumers is intended.

# Inventory Data Consumer
cons_inv = KCons(config_cons)
cons_inv.subscribe(['inventory_topic'])

# Delivery Data Consumer
cons_del = KCons(config_cons)
cons_del.subscribe(['delivery_topic'])


 ### 3. Develop Message Filtering Logic 

Develop the logic for each producer to filter messages based on the `type` field from the JSON data before sending them to the Kafka topic. Ensure consumers can correctly identify and process only the messages that are
relevant to them.

4. Submission Guideline: You are required to submit

In [19]:
import json as j
import zipfile as z

# Route each order to the matching producer by its 'type' field.
# Relies on send_inv_msg / send_del_msg and producer_inv / producer_del
# defined in the producers cell.

with open('/content/data.json', 'r') as f:
    orders = j.load(f)

for o in orders:
    order_type = o.get('type')
    if order_type == 'inventory':
        send_inv_msg(o)
        print(f"Sent inventory order: {o}")
    elif order_type == 'delivery':
        send_del_msg(o)
        print(f"Sent delivery order: {o}")
    else:
        # Make unroutable orders visible instead of dropping them silently.
        print(f"Skipped order with unsupported type: {o}")

# The send functions only queue messages; wait here until everything queued
# above has been delivered or has failed. The loop form avoids echoing
# flush()'s return value as cell output.
for producer in (producer_inv, producer_del):
    producer.flush()


Sent inventory message: {'order_id': '56', 'product_id': '700', 'quantity': 63, 'type': 'inventory', 'timestamp': '2/22/2024'}
Sent inventory order: {'order_id': '56', 'product_id': '700', 'quantity': 63, 'type': 'inventory', 'timestamp': '2/22/2024'}
Sent delivery message: {'order_id': '819', 'product_id': '50', 'quantity': 60, 'type': 'delivery', 'timestamp': '2/14/2024'}
Sent delivery order: {'order_id': '819', 'product_id': '50', 'quantity': 60, 'type': 'delivery', 'timestamp': '2/14/2024'}
Sent inventory message: {'order_id': '273', 'product_id': '7', 'quantity': 90, 'type': 'inventory', 'timestamp': '5/9/2023'}
Sent inventory order: {'order_id': '273', 'product_id': '7', 'quantity': 90, 'type': 'inventory', 'timestamp': '5/9/2023'}
Sent inventory message: {'order_id': '77591', 'product_id': '11', 'quantity': 18, 'type': 'inventory', 'timestamp': '5/30/2023'}
Sent inventory order: {'order_id': '77591', 'product_id': '11', 'quantity': 18, 'type': 'inventory', 'timestamp': '5/30/202