# Example 29: Healthcare V2

Patient risk analysis.

**Architecture**:
*   **Source**: Kafka (Streaming)
*   **Enrichment/State**: Redis (Fast lookups)
*   **Sink**: MongoDB (Document storage)

## 1. Environment Setup
Install Java, Spark, Kafka, Redis, MongoDB, and Python drivers.

In [None]:
# Install Java (headless OpenJDK 8); apt output is discarded to keep the cell quiet
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Spark & Kafka: download the release tarballs from the Apache archive
# and unpack them into the current working directory
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!wget -q https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
!tar xf kafka_2.13-3.6.1.tgz

# Install Redis & MongoDB
# Redis comes from the default apt sources; MongoDB 6.0 needs its signing key
# and its own apt repository registered before `apt-get update` can see it.
!apt-get install redis-server -qq > /dev/null
!wget -qO - https://www.mongodb.org/static/pgp/server-6.0.asc | apt-key add -
!echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu jammy/mongodb-org/6.0 multiverse" | tee /etc/apt/sources.list.d/mongodb-org-6.0.list
!apt-get update -qq > /dev/null
!apt-get install -y mongodb-org -qq > /dev/null

# Install Python Libs: Spark bindings plus the Kafka, Redis and MongoDB clients
!pip install -q findspark pyspark kafka-python redis pymongo

In [None]:
import os

# Export the JDK and Spark locations before findspark looks for them.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.5.0-bin-hadoop3",
})

import findspark

# Make the Spark distribution under SPARK_HOME importable from this notebook.
findspark.init()

## 2. Start Services

In [None]:
%%bash
# Start Redis
service redis-server start

# Start MongoDB (Background)
mkdir -p /data/db
mongod --fork --logpath /var/log/mongodb.log --bind_ip 127.0.0.1

# Start Kafka (ZooKeeper first, then the broker); the sleeps give each daemon
# a few seconds to come up before the next command needs it.
cd kafka_2.13-3.6.1
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
sleep 5
bin/kafka-server-start.sh -daemon config/server.properties
sleep 5
# --if-not-exists keeps this cell re-runnable: without it a second run fails
# because the topic already exists, and the cell exits non-zero.
bin/kafka-topics.sh --create --if-not-exists --topic input-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

In [None]:
# Verify Connections
import redis
from pymongo import MongoClient

# Redis check: write a key and read it back to prove the server is reachable.
try:
    r = redis.Redis(host='localhost', port=6379, db=0)
    r.set('test', 'connected')
    redis_reply = r.get('test').decode('utf-8')
    print(f"Redis: {redis_reply}")
except Exception as e:
    print(f"Redis Error: {e}")

# MongoDB check: listing databases forces a real round-trip to the server.
try:
    m = MongoClient('localhost', 27017)
    database_names = m.list_database_names()
    print(f"MongoDB: {database_names}")
except Exception as e:
    print(f"Mongo Error: {e}")

## 3. Producer (Simulated Data)

In [None]:
from kafka import KafkaProducer
import json, time, random
# Each dict is serialized to UTF-8 JSON bytes before it is sent.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)
try:
    for _ in range(500):
        # Field names must match what kafka_consumer.py reads ('patient_id', 'bpm').
        # The 50-160 range exercises all three risk levels the consumer assigns
        # (LOW, ELEVATED above 100, CRITICAL above the per-patient limit, default 120).
        data = {
            'patient_id': f'p{random.randint(1, 100)}',
            'bpm': random.randint(50, 160),
        }
        producer.send('input-topic', value=data)
        time.sleep(0.05)
finally:
    # Close even if a send fails so the client's connections are released.
    producer.close()

## 4. Consumer (Spark + Redis + Mongo)

In [None]:
%%writefile kafka_consumer.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf, struct, to_json
from pyspark.sql.types import *
import redis
from pymongo import MongoClient
import json

# --- Init Spark ---
# Build (or reuse) the Spark session for this job.
spark = (
    SparkSession.builder
    .appName("AdvancedIntegration")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.output")
    .getOrCreate()
)

# Keep Spark's own logging at WARN so the per-batch prints stay readable.
spark.sparkContext.setLogLevel("WARN")

# --- Redis & Mongo Connection Helpers (called from foreachBatch on the driver) ---
def get_redis():
    """Return a new Redis client for localhost:6379, database 0."""
    client = redis.Redis(host='localhost', port=6379, db=0)
    return client

def write_to_mongo_redis(batch_df, batch_id):
    """Score one micro-batch of heart-rate readings and store it in MongoDB.

    Used as the ``foreachBatch`` sink. Each row's ``value`` column is parsed
    as JSON with ``patient_id`` and ``bpm`` fields. The patient's maximum bpm
    is read from Redis under ``limit:<patient_id>`` (120 when the key is
    absent), and the row is tagged with a ``risk_level``:

    * ``CRITICAL`` when ``bpm`` exceeds the patient's limit,
    * ``ELEVATED`` when ``bpm`` exceeds 100,
    * ``LOW`` otherwise.

    The record is then inserted into ``streaming_db.output_collection``. A
    record that cannot be scored (bad JSON, missing field, Redis failure) is
    still inserted, just without ``risk_level`` / ``parsed_data``.

    Args:
        batch_df: Micro-batch DataFrame with a string ``value`` column.
        batch_id: Batch identifier passed in by ``foreachBatch``; only used
            in the progress message.
    """
    # collect() pulls the whole micro-batch onto the driver. That is fine for
    # this small demo; at scale, use foreachPartition so workers do the writes.
    data = batch_df.collect()
    if not data:
        return

    r_client = get_redis()
    m_client = MongoClient('localhost', 27017)
    try:
        collection = m_client['streaming_db']['output_collection']

        print(f"Processing Batch {batch_id} with {len(data)} records")

        for row in data:
            record = row.asDict()

            # One malformed message must not abort the rest of the batch.
            try:
                val = json.loads(record['value'])
                pid = val['patient_id']
                bpm = val['bpm']

                # Redis: Get max limit for patient (default 120)
                max_bpm = r_client.get(f"limit:{pid}")
                max_bpm = int(max_bpm) if max_bpm else 120

                if bpm > max_bpm:
                    risk = "CRITICAL"
                elif bpm > 100:
                    risk = "ELEVATED"
                else:
                    risk = "LOW"

                record['risk_level'] = risk
                record['parsed_data'] = val
            except (ValueError, KeyError, TypeError, redis.RedisError) as e:
                print(f"Could not score record in batch {batch_id}: {e}")

            # Save to Mongo
            try:
                collection.insert_one(record)
            except Exception as e:
                print(e)
    finally:
        # Release both connections even if the batch fails part-way through.
        m_client.close()
        r_client.close()

# --- Stream Definition ---
# Simplified schema for template, can be overridden.
# All four fields are declared as strings.
schema = StructType(
    [
        StructField("id", StringType()),
        StructField("timestamp", StringType()),
        StructField("value", StringType()),
        StructField("meta", StringType()),
    ]
)

# Subscribe to the Kafka topic the producer writes to.
# startingOffsets=earliest is required here: the producer cell finishes
# before this script is submitted, and the Kafka source's default ("latest")
# would skip every message that was already written.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "input-topic")
    .option("startingOffsets", "earliest")
    .load()
)

# Generic parsing - Kafka delivers the payload as bytes, so cast it to a
# string; the JSON itself is decoded per record inside write_to_mongo_redis.
parsed = df.selectExpr("CAST(value AS STRING)")

# Apply Custom Processing: hand every micro-batch to the Redis/Mongo sink.
query = (
    parsed.writeStream
    .foreachBatch(write_to_mongo_redis)
    .outputMode("update")
    .start()
)

# Block until the streaming query is stopped or fails.
query.awaitTermination()

In [None]:
# Run the consumer script; --packages resolves the Spark-Kafka source and the
# MongoDB Spark connector (Scala 2.12 builds) at submit time.
# NOTE(review): kafka_consumer.py as written talks to MongoDB through pymongo,
# so the mongo-spark-connector package looks unused - confirm before removing.
!spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.mongodb.spark:mongo-spark-connector_2.12:10.2.1 kafka_consumer.py