In [None]:
from kafka import KafkaConsumer
import pandas as pd
import json
import time
from IPython.display import display, clear_output
import threading

# --- Settings ---
topic_name = 'RandomNumber'
kafka_server = 'localhost:9092'
batch_size = 50  # a batch is processed as soon as this many messages are buffered
refresh_interval = 5  # seconds the background thread sleeps between batches

# --- Shared state ---
message_buffer = []  # deserialized message values waiting to be processed
processed_data = pd.DataFrame()  # accumulates the rows of every processed batch
running = True  # set to False to ask the background thread to exit


def _decode_json(raw):
    """Decode a UTF-8 encoded JSON payload into a Python object."""
    return json.loads(raw.decode('utf-8'))


# --- Kafka consumer ---
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=kafka_server,
    group_id='batch_processor',
    auto_offset_reset='earliest',
    value_deserializer=_decode_json,
)

# process_batch() is invoked from two threads: the periodic batch_processor
# thread and the consuming loop (once the buffer reaches batch_size).  The lock
# keeps two invocations from processing the same buffered messages twice and
# from interleaving their output.
_batch_lock = threading.Lock()


def process_batch():
    """Process the buffered messages and refresh the displayed report.

    Builds a DataFrame from the messages currently in ``message_buffer``,
    adds an ``is_even`` column derived from the ``number`` column, appends
    the rows to the global ``processed_data`` frame, removes the processed
    messages from the buffer, and re-renders the per-batch and aggregate
    statistics.

    Returns immediately when the buffer is empty.  The messages are expected
    to provide a ``number`` field; indexing the frame raises ``KeyError``
    when no message in the batch has one.
    """
    global processed_data

    with _batch_lock:
        # Work on a snapshot: the consuming loop may keep appending to
        # message_buffer while this batch is being processed.
        pending = message_buffer[:]
        if not pending:
            return

        batch_df = pd.DataFrame(pending)
        if batch_df.empty:
            return

        # Add is_even column (similar to the Spark example)
        batch_df['is_even'] = batch_df['number'] % 2 == 0

        # Per-batch statistics
        even_count = batch_df['is_even'].sum()
        stats = {
            'count': len(batch_df),
            'avg': batch_df['number'].mean(),
            'max': batch_df['number'].max(),
            'min': batch_df['number'].min(),
            'even_count': even_count,
            'odd_count': len(batch_df) - even_count,
        }

        # ignore_index gives the accumulated frame one running index instead
        # of repeating each batch's own 0..n-1 labels.
        processed_data = pd.concat([processed_data, batch_df], ignore_index=True)

        # Remove only the messages that were just processed.  Anything that
        # arrived after the snapshot sits behind them and stays buffered for
        # the next batch (a plain clear() would silently drop it).
        del message_buffer[:len(pending)]

        total_even = processed_data['is_even'].sum()
        even_ratio = processed_data['is_even'].mean()

        # Display results
        clear_output(wait=True)
        print(f"--- Batch Processing Results (Updated every {refresh_interval} seconds) ---")
        print(f"Total records processed: {len(processed_data)}")
        print("\nCurrent Batch Statistics:")
        for k, v in stats.items():
            print(f"  {k}: {v}")

        print("\nLast 10 records:")
        display(processed_data.tail(10))

        print("\nAggregate Statistics:")
        print(f"  Overall average: {processed_data['number'].mean():.2f}")
        print(f"  Even numbers: {total_even} ({even_ratio*100:.1f}%)")
        print(f"  Odd numbers: {len(processed_data) - total_even} ({(1-even_ratio)*100:.1f}%)")

def batch_processor():
    """Background loop: process a batch, sleep, repeat until ``running`` is cleared.

    The ``running`` flag is re-read before every iteration, so the loop exits
    once the flag is falsy and the current sleep has finished.
    """
    while True:
        if not running:
            break
        process_batch()
        time.sleep(refresh_interval)

# Start the background thread that processes whatever is buffered every
# refresh_interval seconds.
processor_thread = threading.Thread(target=batch_processor)
processor_thread.start()

try:
    print(f"Starting to consume messages from {topic_name}...")
    print(f"Will process in batches every {refresh_interval} seconds")
    print("Press Ctrl+C to stop")

    # Main loop to consume messages
    for message in consumer:
        message_buffer.append(message.value)

        # Process immediately if we've reached batch_size
        if len(message_buffer) >= batch_size:
            process_batch()

except KeyboardInterrupt:
    print("\nStopping batch processor...")
finally:
    # Runs on Ctrl+C and on any other exit from the loop (for example an
    # error raised while consuming or processing).  Without it the non-daemon
    # processor thread would keep looping and the consumer would stay open.
    running = False
    processor_thread.join()
    consumer.close()
    print("Batch processor stopped")


--- Batch Processing Results (Updated every 5 seconds) ---
Total records processed: 622

Current Batch Statistics:
  count: 1
  avg: 10.0
  max: 10
  min: 10
  even_count: 1
  odd_count: 0

Last 10 records:


Unnamed: 0,number,is_even
0,55,False
0,82,True
0,5,False
0,71,False
0,15,False
0,17,False
0,24,True
0,45,False
0,98,True
0,10,True
