![Kafka Example](kafka_example.JPG)

# Order Process

# Lab: Kafka - Publisher / Subscriber Model

In [1]:
import kafka

import time

import numpy as np

import json

import pprint

from IPython.display import clear_output


## List existing topics (if any); Delete existing topics from prior runs

In [2]:
admin_client = kafka.KafkaAdminClient(bootstrap_servers=['kafka:29092'])

In [3]:
admin_client.list_topics()

['back_order_pub_sub',
 'fulfillment_queue',
 'fulfillment_pub_sub',
 'stock_check_stream',
 'back_order_stream',
 'orders_stream',
 'stock_check_queue',
 'stock_check_pub_sub',
 'orders_pub_sub',
 'back_order_queue',
 'fulfillment_stream']

In [4]:
admin_client.delete_topics(admin_client.list_topics(), timeout_ms=25000)

DeleteTopicsResponse_v3(throttle_time_ms=0, topic_error_codes=[(topic='back_order_pub_sub', error_code=0), (topic='fulfillment_queue', error_code=0), (topic='fulfillment_pub_sub', error_code=0), (topic='stock_check_stream', error_code=0), (topic='back_order_stream', error_code=0), (topic='orders_stream', error_code=0), (topic='stock_check_queue', error_code=0), (topic='stock_check_pub_sub', error_code=0), (topic='orders_pub_sub', error_code=0), (topic='back_order_queue', error_code=0), (topic='fulfillment_stream', error_code=0)])

In [5]:
admin_client.list_topics()

[]

## Create the topics needed for this example from the above diagram

In [6]:
topic_list = []

topic_list.append(kafka.admin.NewTopic(name="orders_pub_sub", num_partitions=1, replication_factor=1))
topic_list.append(kafka.admin.NewTopic(name="orders_stream", num_partitions=1, replication_factor=1))

topic_list.append(kafka.admin.NewTopic(name="stock_check_queue", num_partitions=1, replication_factor=1))
topic_list.append(kafka.admin.NewTopic(name="stock_check_pub_sub", num_partitions=1, replication_factor=1))
topic_list.append(kafka.admin.NewTopic(name="stock_check_stream", num_partitions=1, replication_factor=1))

topic_list.append(kafka.admin.NewTopic(name="back_order_queue", num_partitions=1, replication_factor=1))
topic_list.append(kafka.admin.NewTopic(name="back_order_pub_sub", num_partitions=1, replication_factor=1))
topic_list.append(kafka.admin.NewTopic(name="back_order_stream", num_partitions=1, replication_factor=1))

topic_list.append(kafka.admin.NewTopic(name="fulfillment_queue", num_partitions=1, replication_factor=1))
topic_list.append(kafka.admin.NewTopic(name="fulfillment_pub_sub", num_partitions=1, replication_factor=1))
topic_list.append(kafka.admin.NewTopic(name="fulfillment_stream", num_partitions=1, replication_factor=1))


In [7]:
admin_client.create_topics(topic_list)

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='orders_pub_sub', error_code=0, error_message=None), (topic='orders_stream', error_code=0, error_message=None), (topic='stock_check_queue', error_code=0, error_message=None), (topic='stock_check_pub_sub', error_code=0, error_message=None), (topic='stock_check_stream', error_code=0, error_message=None), (topic='back_order_queue', error_code=0, error_message=None), (topic='back_order_pub_sub', error_code=0, error_message=None), (topic='back_order_stream', error_code=0, error_message=None), (topic='fulfillment_queue', error_code=0, error_message=None), (topic='fulfillment_pub_sub', error_code=0, error_message=None), (topic='fulfillment_stream', error_code=0, error_message=None)])

In [8]:
admin_client.list_topics()

['back_order_pub_sub',
 'fulfillment_queue',
 'fulfillment_pub_sub',
 'stock_check_stream',
 'back_order_stream',
 'orders_stream',
 'stock_check_queue',
 'stock_check_pub_sub',
 'orders_pub_sub',
 'back_order_queue',
 'fulfillment_stream']

## Loop and generate orders; publish the orders to the topic orders_pub_sub

In [9]:
producer = kafka.KafkaProducer(bootstrap_servers=['kafka:29092'])


In [10]:
order_id = 13927

product_list = [
                {'product_id': 1, 'product_name': 'ARM Board', 'product_price': 99.95},
                {'product_id': 2, 'product_name': '8 GiB RAM DIMM', 'product_price': 29.95},
                {'product_id': 3, 'product_name': '16 GiB RAM DIMM', 'product_price': 69.95},
                {'product_id': 4, 'product_name': '32 GiB RAM DIMM', 'product_price': 149.95}
               ]

## Infinite loop - to stop: menu bar => Kernel => Interrupt

In [11]:
while (True):
    
    order_id += 1
    
    num_line_items = np.random.randint(1,5)
    
    random_products = np.random.choice(product_list, size=num_line_items, replace=False)
    
    sorted_random_products = sorted(random_products, key = lambda k: k['product_id'])
    
    line_items = []
    
    sub_total = 0
    
    for product in sorted_random_products:
        
        quantity = np.random.randint(1,13)
        line_item_total = round(quantity * product["product_price"], 2)
        sub_total += line_item_total
        
        line_item = {}
        line_item["product_id"] = product["product_id"]
        line_item["product_name"] = product["product_name"]
        line_item["product_price"] = product["product_price"]
        line_item["quantity"] = quantity
        line_item["total"] = line_item_total
        
        line_items.append(line_item)
    
    sub_total = round(sub_total, 2)
    
    tax = round(sub_total * 0.08, 2)
    
    total = round(sub_total + tax, 2)
    
    order = {"order_id": order_id,
             "sub_total": sub_total,
             "tax": tax,
             "total": total,
             "line_items": line_items}
    
    clear_output(wait=True)
    print("\n=================================")
    print("   Order Process")
    print("=================================\n")    
    pprint.pprint(order, sort_dicts=False)
    
    producer.send("orders_pub_sub", json.dumps(order).encode())
    
    time.sleep(2.0)
    


   Order Process

{'order_id': 14374,
 'sub_total': 739.55,
 'tax': 59.16,
 'total': 798.71,
 'line_items': [{'product_id': 1,
                 'product_name': 'ARM Board',
                 'product_price': 99.95,
                 'quantity': 1,
                 'total': 99.95},
                {'product_id': 2,
                 'product_name': '8 GiB RAM DIMM',
                 'product_price': 29.95,
                 'quantity': 2,
                 'total': 59.9},
                {'product_id': 3,
                 'product_name': '16 GiB RAM DIMM',
                 'product_price': 69.95,
                 'quantity': 4,
                 'total': 279.8},
                {'product_id': 4,
                 'product_name': '32 GiB RAM DIMM',
                 'product_price': 149.95,
                 'quantity': 2,
                 'total': 299.9}]}


KeyboardInterrupt: 

## You try it - demonstrate that 2 or more subscribers can subscribe to the same topic at the same time and both receive the same data;  make 1 or more copies of orders_lambda_batch and run multiple subscribers, both reading all and reading in batch mode

# Lab: Kafka - Producer / Consumer Model

## Modify the previous code to also write to the stock_check_queue

In [13]:
while (True):
    
    order_id += 1
    
    num_line_items = np.random.randint(1,5)
    
    random_products = np.random.choice(product_list, size=num_line_items, replace=False)
    
    sorted_random_products = sorted(random_products, key = lambda k: k['product_id'])
    
    line_items = []
    
    sub_total = 0
    
    for product in sorted_random_products:
        
        quantity = np.random.randint(1,13)
        line_item_total = round(quantity * product["product_price"], 2)
        sub_total += line_item_total
        
        line_item = {}
        line_item["product_id"] = product["product_id"]
        line_item["product_name"] = product["product_name"]
        line_item["product_price"] = product["product_price"]
        line_item["quantity"] = quantity
        line_item["total"] = line_item_total
        
        line_items.append(line_item)
    
    sub_total = round(sub_total, 2)
    
    tax = round(sub_total * 0.08, 2)
    
    total = round(sub_total + tax, 2)
    
    order = {"order_id": order_id,
             "sub_total": sub_total,
             "tax": tax,
             "total": total,
             "line_items": line_items}
    
    clear_output(wait=True)
    print("\n=================================")
    print("   Order Process")
    print("=================================\n")
    pprint.pprint(order, sort_dicts=False)
    
    producer.send("orders_pub_sub", json.dumps(order).encode())
    
    producer.send("stock_check_queue", json.dumps(order).encode())
    
    time.sleep(2.0)
    


   Order Process

{'order_id': 15240,
 'sub_total': 749.75,
 'tax': 59.98,
 'total': 809.73,
 'line_items': [{'product_id': 4,
                 'product_name': '32 GiB RAM DIMM',
                 'product_price': 149.95,
                 'quantity': 5,
                 'total': 749.75}]}


KeyboardInterrupt: 

## Run the following processes to complete the producer / consumer processes and queues: stock_check_process, back_order_process, and fulfillment_process

## You try it - 

## Modify stock_check_process to publish to the stock_check_pub_sub topic; create a new jupyter notebook stock_check_lambda_batch to subscribe to stock_check_pub_sub in batch mode; solutions can be found in stock_check_process_solutions and stock_check_lambda_batch_solutions

## Modify back_order_process to publish to the back_order_pub_sub topic;  create a new jupyter notebook back_order_lambda_batch to subscribe to back_order_pub_sub in batch mode; solutions can be found in back_order_process_solutions and back_order_lambda_batch_solutions

## Modify fulfillment_process to publish to the fulfillment_pub_sub topic; create a new jupyter notebook fulfillment_lambda_batch to subscribe to fulfillment_pub_sub in batch mode; solutions can be found in fulfillment_process_solutions and fulfillment_lambda_batch_solutions

# Lab: Kafka - Streaming Model

## Modify the previous code to also stream to orders_stream

In [14]:
while (True):
    
    order_id += 1
    
    num_line_items = np.random.randint(1,5)
    
    random_products = np.random.choice(product_list, size=num_line_items, replace=False)
    
    sorted_random_products = sorted(random_products, key = lambda k: k['product_id'])
    
    line_items = []
    
    sub_total = 0
    
    for product in sorted_random_products:
        
        quantity = np.random.randint(1,13)
        line_item_total = round(quantity * product["product_price"], 2)
        sub_total += line_item_total
        
        line_item = {}
        line_item["product_id"] = product["product_id"]
        line_item["product_name"] = product["product_name"]
        line_item["product_price"] = product["product_price"]
        line_item["quantity"] = quantity
        line_item["total"] = line_item_total
        
        line_items.append(line_item)
    
    sub_total = round(sub_total, 2)
    
    tax = round(sub_total * 0.08, 2)
    
    total = round(sub_total + tax, 2)
    
    order = {"order_id": order_id,
             "sub_total": sub_total,
             "tax": tax,
             "total": total,
             "line_items": line_items}
    
    clear_output(wait=True)
    print("\n=================================")
    print("   Order Process")
    print("=================================\n")
    pprint.pprint(order, sort_dicts=False)
    
    producer.send("orders_pub_sub", json.dumps(order).encode())
    
    producer.send("stock_check_queue", json.dumps(order).encode())
    
    producer.send("orders_stream", json.dumps(order).encode())
    
    time.sleep(2.0)
    


   Order Process

{'order_id': 15763,
 'sub_total': 829.3,
 'tax': 66.34,
 'total': 895.64,
 'line_items': [{'product_id': 1,
                 'product_name': 'ARM Board',
                 'product_price': 99.95,
                 'quantity': 3,
                 'total': 299.85},
                {'product_id': 2,
                 'product_name': '8 GiB RAM DIMM',
                 'product_price': 29.95,
                 'quantity': 6,
                 'total': 179.7},
                {'product_id': 3,
                 'product_name': '16 GiB RAM DIMM',
                 'product_price': 69.95,
                 'quantity': 5,
                 'total': 349.75}]}


KeyboardInterrupt: 

## You try it - 

## Modify stock_check_process to stream to the stock_check_stream topic; create a new jupyter notebook stock_check_lambda_speed to subscribe to stock_check_stream in streaming mode; solutions can be found in stock_check_process_solutions and stock_check_lambda_speed_solutions

## Modify back_order_process to publish to the back_order_stream topic;  create a new jupyter notebook back_order_lambda_speed to subscribe to back_order_stream in streaming mode; solutions can be found in back_order_process_solutions and back_order_lambda_speed_solutions

## Modify fulfillment_process to publish to the fulfillment_stream topic; create a new jupyter notebook fulfillment_lambda_speed to subscribe to fulfillment_stream in batch mode; solutions can be found in fulfillment_process_solutions and fulfillment_lambda_speed_solutions