# Orders Lambda Batch

In [1]:
import kafka

import pprint

from IPython.display import clear_output

import time

## Subscribe and read all the messages from the topic;  we can run this muliple times and always get all the messages; there can be an unlimited number of subscribers

In [2]:
topic = "orders_pub_sub"

subscriber_read_all = kafka.KafkaConsumer(topic, 
                                          bootstrap_servers=['kafka:29092'], 
                                          auto_offset_reset='earliest')


In [3]:
if subscriber_read_all.assignment():

    subscriber_read_all.seek_to_beginning()

message_list = []

while (True):
    
    poll_result = subscriber_read_all.poll(timeout_ms=500)
    
    if poll_result == {}:
        break

    items = poll_result.items()

    for (topic, messages) in items:
    
        for message in messages:
            
            message_list.append([message.offset, message.value])
            
i = 0

for message in message_list:
    
    if (i < 5) or (i > len(message_list) - 6):
        print("Offset:", message[0], "   Value:", message[1][:75])
   
    if i == 5:
        print("\n... only showing a max of first 5 and max of last 5 ... \n")
        
    i += 1
    
    

Offset: 0    Value: b'{"order_id": 13928, "sub_total": 859.5, "tax": 68.76, "total": 928.26, "lin'
Offset: 1    Value: b'{"order_id": 13929, "sub_total": 379.6, "tax": 30.37, "total": 409.97, "lin'
Offset: 2    Value: b'{"order_id": 13930, "sub_total": 269.55, "tax": 21.56, "total": 291.11, "li'
Offset: 3    Value: b'{"order_id": 13931, "sub_total": 2608.45, "tax": 208.68, "total": 2817.13, '
Offset: 4    Value: b'{"order_id": 13932, "sub_total": 2508.6, "tax": 200.69, "total": 2709.29, "'

... only showing a max of first 5 and max of last 5 ... 

Offset: 666    Value: b'{"order_id": 14594, "sub_total": 739.7, "tax": 59.18, "total": 798.88, "lin'
Offset: 667    Value: b'{"order_id": 14595, "sub_total": 149.75, "tax": 11.98, "total": 161.73, "li'
Offset: 668    Value: b'{"order_id": 14596, "sub_total": 2019.1, "tax": 161.53, "total": 2180.63, "'
Offset: 669    Value: b'{"order_id": 14597, "sub_total": 149.75, "tax": 11.98, "total": 161.73, "li'
Offset: 670    Value: b'{"order_id": 14598

## Subscribe and read in batch mode;  read all the messages from the topic the first time we read; read only new messages on subsequent reads; we have a defined batch interval, such as every day, every hour, every 10 minutes, every 1 minute; here we will use 5 seconds not to waste time waiting;  Zookeeper will keep track of the offsets for us

In [4]:
topic = "orders_pub_sub"

subscriber_batch = kafka.KafkaConsumer(topic, 
                                       bootstrap_servers=['kafka:29092'], 
                                       auto_offset_reset='earliest')


In [5]:
batch_time_interval = 5.0

if subscriber_batch.assignment():

    subscriber_batch.seek_to_beginning()

batch_number = 1

message_list = []

while (True):
    
    poll_result = subscriber_batch.poll(timeout_ms=500)
    
    if poll_result == {}:
        
        if len(message_list) > 0:
            
            clear_output(wait=True)
            
            print("\n=================================")
            print("   Orders Lambda Batch Process")
            print("=================================\n")
            print("\n------------------------")
            print("   Batch ", batch_number)
            print("------------------------\n\n")
            
            for message in message_list:
                
                print("Offset:", message[0], "   Value:", message[1][:75])
                
            message_list = []
            
            batch_number += 1
            
        time.sleep(batch_time_interval)
              
    else:

        items = poll_result.items()

        for (topic, messages) in items:
    
            for message in messages:
        
                message_list.append([message.offset, message.value])



   Orders Lambda Batch Process


------------------------
   Batch  34
------------------------


Offset: 761    Value: b'{"order_id": 14689, "sub_total": 89.85, "tax": 7.19, "total": 97.04, "line_'
Offset: 762    Value: b'{"order_id": 14690, "sub_total": 1399.45, "tax": 111.96, "total": 1511.41, '
Offset: 763    Value: b'{"order_id": 14691, "sub_total": 269.55, "tax": 21.56, "total": 291.11, "li'


KeyboardInterrupt: 

## You try it - demonstrate that 2 or more subscribers can subscribe to the same topic at the same time and both receive the same data;  make 1 or more copies of orders_lambda_batch and run multiple subscribers, both reading all and reading in batch mode