# Real-time Genetic Data Processing with Kafka

This notebook demonstrates real-time genetic data processing using Kafka integration.

## Features
- Connect to Kafka cluster
- Send genetic sequences for VEP annotation
- Receive and process annotated results
- Trigger KEDA scaling events
- Monitor cost attribution in real time

In [None]:
# Import libraries
import os
import json
import time
import pandas as pd
import numpy as np
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import threading
import matplotlib.pyplot as plt
from datetime import datetime
import requests
from IPython.display import clear_output
import warnings
warnings.filterwarnings('ignore')

# Kafka connection settings. The bootstrap address may be overridden through
# the KAFKA_BOOTSTRAP_SERVERS environment variable; otherwise the in-cluster
# service address below is used.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get(
    'KAFKA_BOOTSTRAP_SERVERS',
    'genetic-data-cluster-kafka-bootstrap.healthcare-ml-demo.svc.cluster.local:9092',
)
# Topic the notebook publishes raw sequences to.
RAW_TOPIC = 'genetic-data-raw'
# Topic the notebook reads annotated results from.
ANNOTATED_TOPIC = 'genetic-data-annotated'

# Echo the effective configuration so the cell output shows what is in use.
print(f"🔗 Kafka Bootstrap Servers: {KAFKA_BOOTSTRAP_SERVERS}")
print(f"📤 Raw Data Topic: {RAW_TOPIC}")
print(f"📥 Annotated Data Topic: {ANNOTATED_TOPIC}")

## 1. Kafka Connection Setup

In [None]:
def create_kafka_producer():
    """Create a JSON-serializing Kafka producer, or return None on failure.

    The broker list is taken from the module-level ``KAFKA_BOOTSTRAP_SERVERS``
    string, which may hold one ``host:port`` or several separated by commas.
    Message values are JSON-encoded as UTF-8; keys are UTF-8 encoded when
    truthy and sent as ``None`` otherwise.

    Returns:
        The ``KafkaProducer`` on success, or ``None`` if construction raised.
        The error is printed rather than propagated so the notebook cell
        keeps running.
    """
    try:
        # Split a comma-separated address list into individual brokers.
        # Wrapping the raw string in a one-element list would hand the client
        # a single malformed "host:port,host:port" entry.
        servers = [
            address.strip()
            for address in KAFKA_BOOTSTRAP_SERVERS.split(',')
            if address.strip()
        ]
        producer = KafkaProducer(
            bootstrap_servers=servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',
            retries=3,
            # NOTE(review): presumably limited to 1 so retries cannot reorder
            # messages -- confirm against the delivery requirements.
            max_in_flight_requests_per_connection=1
        )
        print("✅ Kafka producer created successfully")
        return producer
    except Exception as e:
        # Notebook boundary: report the failure and let the caller check for None.
        print(f"❌ Failed to create Kafka producer: {e}")
        return None

def create_kafka_consumer(topic, group_id='notebook-consumer'):
    """Create a JSON-deserializing Kafka consumer, or return None on failure.

    The broker list is taken from the module-level ``KAFKA_BOOTSTRAP_SERVERS``
    string, which may hold one ``host:port`` or several separated by commas.

    Args:
        topic: Name of the topic to subscribe to.
        group_id: Consumer group id; defaults to ``'notebook-consumer'``.

    Returns:
        The ``KafkaConsumer`` on success, or ``None`` if construction raised.
        The error is printed rather than propagated.
    """
    try:
        # Split a comma-separated address list into individual brokers.
        # Wrapping the raw string in a one-element list would hand the client
        # a single malformed "host:port,host:port" entry.
        servers = [
            address.strip()
            for address in KAFKA_BOOTSTRAP_SERVERS.split(',')
            if address.strip()
        ]
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=servers,
            group_id=group_id,
            # Message payloads are expected to be UTF-8 encoded JSON.
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            # With no committed offset, start from new messages only.
            auto_offset_reset='latest',
            enable_auto_commit=True
        )
        print(f"✅ Kafka consumer created for topic: {topic}")
        return consumer
    except Exception as e:
        # Notebook boundary: report the failure and let the caller check for None.
        print(f"❌ Failed to create Kafka consumer: {e}")
        return None

# Test connections: build the shared producer and a consumer on the annotated
# topic. Either name is None if its factory failed (the factories print the
# error instead of raising).
# NOTE(review): the consumer uses auto_offset_reset='latest' and is first
# polled in the monitoring cell; results published before that first poll may
# be skipped -- confirm this is acceptable for the demo.
producer = create_kafka_producer()
consumer = create_kafka_consumer(ANNOTATED_TOPIC)

## 2. Generate and Send Genetic Data

In [None]:
def generate_genetic_sequence(length=100):
    """Return a random DNA string of ``length`` bases drawn from A, T, G, C.

    Bases are sampled with ``np.random.choice``, so the result depends on
    NumPy's global random state.
    """
    nucleotides = ['A', 'T', 'G', 'C']
    drawn = np.random.choice(nucleotides, length)
    return ''.join(drawn)

def create_genetic_message(sequence_id, processing_mode='normal'):
    """Build the dict payload for one randomly generated genetic sequence.

    Args:
        sequence_id: Identifier stored under ``'sequenceId'``.
        processing_mode: Value stored under ``'processingMode'``.

    Returns:
        A dict holding the sequence, fixed species/assembly/source fields, a
        local-time ISO timestamp and a ``'metadata'`` dict with the sequence
        length and GC content as a percentage.
    """
    # Sequence length is random in [80, 200); randint's upper bound is exclusive.
    sequence_length = np.random.randint(80, 200)
    sequence = generate_genetic_sequence(sequence_length)

    gc_bases = sequence.count('G') + sequence.count('C')
    metadata = {
        'length': len(sequence),
        'gc_content': gc_bases / len(sequence) * 100,
        'notebook_session': 'genetic-analysis-demo',
    }

    message = {
        'sequenceId': sequence_id,
        'sequence': sequence,
        'species': 'human',
        'assembly': 'GRCh38',
        'timestamp': datetime.now().isoformat(),
        'source': 'jupyter-notebook',
        'processingMode': processing_mode,
        'metadata': metadata,
    }
    return message

# Test message creation: build one sample payload and pretty-print it as JSON
# so the message layout is visible in the cell output. Nothing is sent to
# Kafka here.
test_message = create_genetic_message('TEST_001')
print("📝 Sample genetic data message:")
print(json.dumps(test_message, indent=2))

In [None]:
def send_genetic_data_batch(producer, num_sequences=10, processing_mode='normal'):
    """Publish ``num_sequences`` generated messages to the raw topic, one at a time.

    Each message is sent synchronously: the call waits up to 10 seconds for
    the send to complete before moving on, then pauses 0.1 s.

    Args:
        producer: Kafka producer to send with; a falsy value aborts early.
        num_sequences: Number of messages to generate and send.
        processing_mode: Passed into each generated message.

    Returns:
        A list with one summary dict per message that was sent successfully
        (sequence id, timestamp, length, GC content, processing mode). Sends
        that raise ``KafkaError`` are reported and left out. Returns ``[]``
        when no producer is available.
    """
    delivered = []

    if producer:
        for i in range(num_sequences):
            # Id combines a second-resolution timestamp with the batch index.
            sequence_id = f'SEQ_{datetime.now().strftime("%Y%m%d_%H%M%S")}_{i:03d}'
            message = create_genetic_message(sequence_id, processing_mode)

            try:
                # Wait for this send to finish before recording it as delivered.
                producer.send(RAW_TOPIC, key=sequence_id, value=message).get(timeout=10)
            except KafkaError as e:
                print(f"❌ Failed to send {sequence_id}: {e}")
            else:
                metadata = message['metadata']
                delivered.append({
                    'sequence_id': sequence_id,
                    'timestamp': message['timestamp'],
                    'length': metadata['length'],
                    'gc_content': metadata['gc_content'],
                    'processing_mode': processing_mode,
                })
                print(f"✅ Sent {sequence_id} (length: {message['metadata']['length']}, GC: {message['metadata']['gc_content']:.1f}%)")

            # Small delay between messages
            time.sleep(0.1)

        producer.flush()
    else:
        print("❌ No producer available")

    return delivered

# Send a small batch for testing. `sent_sequences` holds one summary dict per
# message that was sent without a KafkaError, so its length is the success
# count rather than the number attempted.
print("📤 Sending test batch of genetic sequences...")
sent_sequences = send_genetic_data_batch(producer, num_sequences=5)
print(f"\n✅ Successfully sent {len(sent_sequences)} sequences")

## 3. Monitor Annotated Results

In [None]:
def monitor_annotated_results(consumer, duration_seconds=30):
    """Poll the consumer for a fixed time and collect annotated results.

    Args:
        consumer: Object exposing ``poll(timeout_ms=...)`` that returns a
            mapping of partition -> list of messages whose ``.value`` is a
            dict. A falsy value aborts early.
        duration_seconds: How long to keep polling, in seconds.

    Returns:
        A list of dicts, one per message, holding ``sequence_id``, ``status``,
        ``variant_count``, ``processing_time``, ``timestamp``, ``source`` and
        ``message``; missing fields fall back to defaults. Returns ``[]`` when
        no consumer is available. A ``KeyboardInterrupt`` stops monitoring
        early and returns whatever was collected so far.
    """
    if not consumer:
        print("❌ No consumer available")
        return []

    print(f"👂 Monitoring annotated results for {duration_seconds} seconds...")

    progress_interval = 5  # seconds between progress lines
    results = []
    start_time = time.time()
    # Elapsed-time threshold for the next progress line. Tracking a threshold
    # (instead of testing `int(elapsed) % 5 == 0`) gives exactly one line per
    # interval, however often poll() returns.
    next_progress = progress_interval

    try:
        while True:
            remaining = duration_seconds - (time.time() - start_time)
            if remaining <= 0:
                break

            # Poll for at most 1 s, but never block past the deadline.
            poll_ms = min(1000, max(1, int(remaining * 1000)))
            message_batch = consumer.poll(timeout_ms=poll_ms)

            for messages in message_batch.values():
                for message in messages:
                    try:
                        data = message.value
                        results.append({
                            'sequence_id': data.get('sequenceId', 'unknown'),
                            'status': data.get('status', 'unknown'),
                            'variant_count': data.get('variantCount', 0),
                            'processing_time': data.get('processingTime', 0),
                            'timestamp': data.get('timestamp', ''),
                            'source': data.get('source', ''),
                            'message': data.get('message', '')
                        })

                        print(f"📥 Received: {data.get('sequenceId')} - {data.get('status')} - {data.get('variantCount', 0)} variants")

                    except Exception as e:
                        # Best effort: one malformed payload must not stop monitoring.
                        print(f"⚠️ Error processing message: {e}")

            # Show progress once per interval.
            elapsed = time.time() - start_time
            if elapsed >= next_progress:
                remaining = max(0.0, duration_seconds - elapsed)
                print(f"⏱️ Monitoring... {remaining:.0f}s remaining, {len(results)} results received")
                # Jump to the next interval boundary after `elapsed`, so a slow
                # iteration does not trigger a burst of catch-up lines.
                next_progress = (int(elapsed) // progress_interval + 1) * progress_interval

    except KeyboardInterrupt:
        print("\n⏹️ Monitoring stopped by user")

    print(f"\n✅ Monitoring completed. Received {len(results)} annotated results")
    return results

# Listen on the annotated topic for 20 seconds, then summarize what arrived.
annotated_results = monitor_annotated_results(consumer, duration_seconds=20)

if not annotated_results:
    print("\n📝 No annotated results received yet. VEP service may be processing or starting up.")
else:
    # Tabulate the collected result dicts and preview the first rows.
    df_results = pd.DataFrame(annotated_results)
    print("\n📊 Annotated Results Summary:")
    print(df_results.head())

## 4. Scaling Demonstration

In [None]:
def trigger_scaling_event(producer, num_sequences=50, processing_mode='big-data'):
    """Send a large batch to Kafka and report how long it took and at what rate.

    Args:
        producer: Kafka producer passed through to ``send_genetic_data_batch``.
        num_sequences: Number of sequences to send.
        processing_mode: Processing mode stamped on every message.

    Returns:
        The list of summary dicts returned by ``send_genetic_data_batch``
        (successfully sent messages only).
    """
    print(f"🚀 Triggering scaling event with {num_sequences} sequences in {processing_mode} mode...")

    start_time = time.time()
    sent_sequences = send_genetic_data_batch(producer, num_sequences, processing_mode)
    elapsed = time.time() - start_time

    # The batch can return almost instantly (no producer, or zero sequences),
    # and on a coarse clock the measured time may be exactly 0; report a rate
    # of 0 instead of dividing by zero.
    rate = len(sent_sequences) / elapsed if elapsed > 0 else 0.0

    print(f"\n⚡ Scaling trigger completed:")
    print(f"   📤 Sent: {len(sent_sequences)} sequences")
    print(f"   ⏱️ Time: {elapsed:.2f} seconds")
    print(f"   📊 Rate: {rate:.1f} sequences/second")
    print(f"   🎯 Mode: {processing_mode}")

    return sent_sequences

# Send a 20-sequence batch in 'big-data' mode, then list the effects the demo
# expects to see in the cluster.
scaling_sequences = trigger_scaling_event(producer, num_sequences=20, processing_mode='big-data')

print("\n🔍 This should trigger:")
for expected_effect in (
    "   📈 VEP service pod scaling (KEDA)",
    "   📊 Increased Kafka consumer lag",
    "   💰 Cost attribution changes",
    "   ⚡ Potential node scaling if needed",
):
    print(expected_effect)