# Step 5: Kafka Consumer & Real-Time Analysis App

**Objective:**  
Consume the streamed telemetry and run analytics or ML inference on each incoming message.

**Instructions:**
- Implement a Python consumer that subscribes to the `f1-speed-stream` topic.
- On each message: parse the JSON payload, run analytics or model inference, and record the processing latency.
- Validate that the consumer receives messages, the analysis completes, and the latency is logged.
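
The per-message steps listed above (parse JSON, analyze, record latency) can be sketched in a few lines. This is only an illustration, not the code inside `src.kafka_consumer`: `handle_message`, `flag_high_speed`, and the 350 km/h limit are hypothetical names and values chosen for the example. The result keys (`message`, `analysis`, `processing_time`) mirror the ones read later in this notebook.

```python
import json
import time


def handle_message(raw_value: bytes, analyze) -> dict:
    """Parse one JSON-encoded message, run `analyze` on it, and time the work."""
    start = time.perf_counter()
    message = json.loads(raw_value.decode("utf-8"))
    analysis = analyze(message)
    processing_time = time.perf_counter() - start  # elapsed seconds
    return {
        "message": message,
        "analysis": analysis,
        "processing_time": processing_time,
    }


def flag_high_speed(message: dict, limit_kmh: float = 350.0) -> dict:
    """Hypothetical analysis step: flag speeds above an illustrative limit."""
    speed = message.get("speed", 0.0)
    anomalies = ["speed_above_limit"] if speed > limit_kmh else []
    return {"anomalies": anomalies}


result = handle_message(b'{"driver": "SAR", "speed": 362.5}', flag_high_speed)
print(result["analysis"]["anomalies"])  # prints ['speed_above_limit']
```

Timing with `time.perf_counter()` measures only the processing time inside the consumer; end-to-end latency from the producer would need a timestamp carried in the message itself.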


In [None]:
# Import required libraries
import sys
import os
import time
from pathlib import Path

# Add project root to Python path
# In Jupyter, getcwd() typically returns the project root
# If not, navigate up from notebooks/ directory
current_dir = os.getcwd()
if os.path.basename(current_dir) == 'notebooks':
    # We're in notebooks/ directory, go up one level
    project_root = Path(os.path.dirname(current_dir))
else:
    # We're already at project root
    project_root = Path(current_dir)

# Add to path if not already there
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

from src.kafka_consumer import RealTimeConsumer


✅ Imports successful

AUTO-STARTING PRODUCER FOR CONTINUOUS STREAMING
✅ Producer is already running in background
   Continuous data streaming is active!


In [None]:
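# Initialize consumer
# Build the config path from project_root so it resolves from either working directory
consumer = RealTimeConsumer(config_path=str(project_root / "config" / "config.yaml"))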


✅ Consumer initialized


In [3]:
# ============================================================================
# CONSUME MESSAGES: Real-Time Processing
# ============================================================================
# The producer is now streaming data continuously in the background
# This consumer will process messages as they arrive in real-time

print("=" * 70)
print("STARTING REAL-TIME MESSAGE CONSUMPTION")
print("=" * 70)
print("\n📥 Consumer is ready to process messages from Kafka")
print("   Producer is streaming data continuously in the background")
print("   This will consume and analyze messages in real-time...\n")

# Consume messages (adjust the limits as needed)
# Set timeout_seconds=None for continuous consumption (until interrupted),
# or set a specific duration for testing
stats = consumer.consume_messages(timeout_seconds=60, max_messages=5000)  # stops at 60 seconds or 5000 messages, whichever comes first

print("\n📊 Consumption Statistics:")
print(f"  Messages processed: {stats.get('messages_processed', 0)}")
print(f"  Duration: {stats.get('duration_seconds', 0):.2f} seconds")
print(f"  Avg rate: {stats.get('avg_rate_messages_per_sec', 0):.2f} msg/s")
print(f"  Avg latency: {stats.get('avg_latency_ms', 0):.2f} ms")
print(f"  Max latency: {stats.get('max_latency_ms', 0):.2f} ms")
print(f"  Total anomalies: {stats.get('total_anomalies', 0)}")
print(f"  Success: {stats.get('success', False)}")


INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.15, node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.15, node_id=bootstrap-0 host=localhost:9092 <checking_api_versions_recv> [IPv6 ('::1', 9092, 0, 0)]>: Broker version identified as 2.6
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.15, node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('f1-speed-stream',)
INFO:src.kafka_consumer:Connected to Kafka at localhost:9092, subscribed to 'f1-speed-stream'
INFO:src.kafka_consumer:ML models loaded successfully
INFO:src.kafka_consumer:Starting message consumption... (timeout: 60s, max_messages: 5000)
INFO:src.kafka_consumer:Using poll timeout: 1000ms
INFO:kafka.consumer.subscription_state:Updated partit

STARTING REAL-TIME MESSAGE CONSUMPTION

📥 Consumer is ready to process messages from Kafka
   Producer is streaming data continuously in the background
   This will consume and analyze messages in real-time...



INFO:src.kafka_consumer:Processed 500 messages in 0.2s (2822.9 msg/s, avg latency: 0.00 ms)
INFO:src.kafka_consumer:Processed 600 messages in 0.2s (2841.9 msg/s, avg latency: 0.00 ms)
INFO:src.kafka_consumer:Processed 700 messages in 0.2s (3164.1 msg/s, avg latency: 0.00 ms)
INFO:src.kafka_consumer:Processed 800 messages in 0.2s (3512.9 msg/s, avg latency: 0.00 ms)
INFO:src.kafka_consumer:Processed 900 messages in 0.2s (3849.4 msg/s, avg latency: 0.00 ms)
INFO:src.kafka_consumer:Processed 1000 messages in 0.2s (4248.9 msg/s, avg latency: 0.00 ms)
INFO:src.kafka_consumer:Processed 1100 messages in 0.3s (3903.2 msg/s, avg latency: 0.00 ms)
INFO:src.kafka_consumer:Processed 1200 messages in 0.3s (4202.5 msg/s, avg latency: 0.00 ms)
INFO:src.kafka_consumer:Processed 1300 messages in 0.3s (4467.1 msg/s, avg latency: 0.00 ms)
INFO:src.kafka_consumer:Processed 1400 messages in 0.3s (4801.9 msg/s, avg latency: 0.00 ms)
INFO:src.kafka_consumer:Processed 1500 messages in 0.3s (5003.2 msg/s, avg 


📊 Consumption Statistics:
  Messages processed: 5000
  Duration: 0.64 seconds
  Avg rate: 7789.97 msg/s
  Avg latency: 0.00 ms
  Max latency: 0.29 ms
  Total anomalies: 590
  Success: True
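
The `consume_messages` call above is bounded by both `timeout_seconds=60` and `max_messages=5000`; in the logged run the message cap was reached well before the timeout (5000 messages in 0.64 seconds). A minimal sketch of such a dual-limit loop follows. It is an assumption about the general pattern, not the actual `RealTimeConsumer` implementation: `consume_bounded`, `poll`, and `handle` are hypothetical names, and `poll` stands in for a blocking Kafka poll that returns a (possibly empty) list of messages.

```python
import time


def consume_bounded(poll, handle, timeout_seconds=None, max_messages=None):
    """Call `poll()` repeatedly until the time or message budget is exhausted."""
    start = time.monotonic()
    processed = 0
    while True:
        if max_messages is not None and processed >= max_messages:
            break
        if timeout_seconds is not None and time.monotonic() - start >= timeout_seconds:
            break
        batch = poll()  # hypothetical: a real poll would block briefly when idle
        for msg in batch:
            handle(msg)
            processed += 1
            if max_messages is not None and processed >= max_messages:
                break  # stop mid-batch once the cap is reached
    duration = time.monotonic() - start
    rate = processed / duration if duration > 0 else 0.0
    return {
        "messages_processed": processed,
        "duration_seconds": duration,
        "avg_rate_messages_per_sec": rate,
    }


# Usage with an in-memory stand-in for the topic
batches = iter([[1, 2, 3], [4, 5, 6], [7, 8]])
seen = []
demo_stats = consume_bounded(lambda: next(batches, []), seen.append,
                             timeout_seconds=1, max_messages=5)
print(demo_stats["messages_processed"])  # prints 5
```

With both limits set to `None` the loop runs until interrupted, which matches the continuous mode described in the cell comments.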


In [4]:
# View analysis results
results = consumer.get_analysis_results()
print(f"\nTotal analysis results: {len(results)}")

if results:
    print("\nSample analysis result:")
    sample = results[0]
    print(f"  Driver: {sample['message'].get('driver', 'N/A')}")
    print(f"  Speed: {sample['message'].get('speed', 0):.1f} km/h")
    print(f"  Anomalies: {len(sample['analysis'].get('anomalies', []))}")
    print(f"  Predictions: {sample['analysis'].get('predictions', {})}")
    print(f"  Processing time: {sample['processing_time']*1000:.2f} ms")
    
    # Check latency stats
    latency_stats = consumer.get_latency_stats()
    if latency_stats:
        print("\nLatency Statistics:")
        print(f"  Avg: {latency_stats.get('avg', 0):.2f} ms")
        print(f"  Min: {latency_stats.get('min', 0):.2f} ms")
        print(f"  Max: {latency_stats.get('max', 0):.2f} ms")
        print(f"  Count: {latency_stats.get('count', 0)}")

# Close consumer
consumer.close()

print("\n" + "=" * 70)
print("✅ Step 5 Complete: Kafka consumer analysis successful!")
print("=" * 70)

# Note: Producer is still running in background
# To stop it, run: from src.producer_manager import stop_producer; stop_producer()
# Or use notebook 03_kafka_producer.ipynb (Cell 8)


INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.15, node_id=1 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. 
INFO:src.kafka_consumer:Kafka consumer closed



Total analysis results: 5000

Sample analysis result:
  Driver: SAR
  Speed: 0.0 km/h
  Anomalies: 0
  Predictions: {'crash_risk': 0.05, 'tire_failure': 0.03, 'pit_stop_probability': 0.15}
  Processing time: 0.01 ms

Latency Statistics:
  Avg: 0.00 ms
  Min: 0.00 ms
  Max: 0.29 ms
  Count: 1000

✅ Step 5 Complete: Kafka consumer analysis successful!
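
The latency summary above reports `avg`, `min`, `max`, and `count` in milliseconds. How `get_latency_stats` computes them is not shown in this notebook. The sketch below assumes a bounded window of recent samples; the reported count of 1000 against 5000 processed messages would be consistent with such a window, but the source does not confirm it. `LatencyTracker` and its `window` parameter are hypothetical.

```python
from collections import deque


class LatencyTracker:
    """Keep the most recent latency samples and summarize them on demand."""

    def __init__(self, window: int = 1000):
        # deque with maxlen silently drops the oldest sample when full
        self._samples = deque(maxlen=window)

    def record(self, latency_ms: float) -> None:
        self._samples.append(latency_ms)

    def stats(self) -> dict:
        if not self._samples:
            return {}
        return {
            "avg": sum(self._samples) / len(self._samples),
            "min": min(self._samples),
            "max": max(self._samples),
            "count": len(self._samples),
        }


tracker = LatencyTracker(window=3)
for sample_ms in (0.29, 0.01, 0.02, 0.03):
    tracker.record(sample_ms)
print(tracker.stats()["count"])  # prints 3
```

A bounded window keeps memory constant during continuous consumption, at the cost of the summary describing only recent messages: in the usage above, the first sample (0.29 ms) has already been dropped from the statistics.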
