In [None]:
import json
import os

import joblib
import pandas as pd
import requests
from kafka import KafkaConsumer

# Load the trained model
# The default below is the original development location. Set the
# STROKE_VOLUME_MODEL_PATH environment variable to run on another machine
# without editing this file.
model_path = os.environ.get(
    "STROKE_VOLUME_MODEL_PATH",
    "/home/kali/Desktop/stroke-volume-prediction-system/models/Stroke_Volume_model.pkl",
)
# NOTE: joblib.load unpickles the file, and unpickling can execute arbitrary
# code. Only point model_path at files from a trusted source.
model = joblib.load(model_path)

# Define expected features
# These names become the DataFrame columns handed to model.predict, in this
# order. Presumably they must match the columns used at training time —
# TODO confirm against the training code.
expected_features = [
    "Solar8000/RR_CO2",
    "Solar8000/NIBP_MBP",
    "Solar8000/HR",
    "Solar8000/PLETH_SPO2",
    "Solar8000/PLETH_HR",
    "EV1000/ART_MBP",
    "age",
    "body_surface_area",
]

# Function to emit prediction to WebSocket API
def emit_prediction(prediction, timeout=5):
    """POST a predicted stroke volume to the local UI endpoint.

    Sends ``{"prediction": <value rounded to 2 decimals>}`` to
    ``http://localhost:5050/emit`` and prints the outcome. This is a
    best-effort notification: every failure is printed and swallowed so the
    caller's processing loop keeps running.

    Args:
        prediction: Predicted value; anything ``float()`` accepts.
        timeout: Seconds to wait for the endpoint. Without a timeout,
            ``requests`` waits indefinitely and a stalled UI server would
            block the caller.
    """
    try:
        # Coerce to a built-in float first: if the model hands back a NumPy
        # scalar (e.g. float32), round() keeps the NumPy type and the JSON
        # encoder rejects it.
        value = round(float(prediction), 2)
        response = requests.post(
            "http://localhost:5050/emit",
            json={"prediction": value},
            timeout=timeout,
        )
        if response.status_code == 200:
            print(f"📡 Emitted to UI: {value:.2f} mL")
        else:
            print(f"❌ Emit failed: {response.status_code} - {response.text}")
    except Exception as e:
        # Deliberately broad: a UI outage must never stop the caller.
        print(f"❌ Error during emit: {e}")

# Function to emit waveform data to WebSocket API
def emit_waveform(data, timeout=5):
    """POST one waveform sample to the local UI endpoint.

    Sends ``data`` as the JSON body to
    ``http://localhost:5050/emit_waveform`` and prints the outcome. This is
    a best-effort notification: every failure is printed and swallowed so
    the caller's processing loop keeps running.

    Args:
        data: JSON-serializable payload to forward.
        timeout: Seconds to wait for the endpoint. Without a timeout,
            ``requests`` waits indefinitely and a stalled UI server would
            block the caller.
    """
    try:
        response = requests.post(
            "http://localhost:5050/emit_waveform",
            json=data,
            timeout=timeout,
        )
        if response.status_code == 200:
            print(f"📈 Emitted waveform data to UI: {data}")
        else:
            print(f"❌ Waveform emit failed: {response.status_code} - {response.text}")
    except Exception as e:
        # Deliberately broad: a UI outage must never stop the caller.
        print(f"❌ Failed to emit waveform: {e}")

# Start Kafka consumer
def _decode_json(raw_bytes):
    """Decode a raw message value from UTF-8 bytes and parse it as JSON."""
    # NOTE(review): a payload that is not valid UTF-8 JSON raises here;
    # confirm how the consumer surfaces that error to the reading loop.
    text = raw_bytes.decode('utf-8')
    return json.loads(text)


# Subscribes to the 'stroke_data' topic on the local broker as a member of
# the 'stroke-consumers' group; each message value is handed to _decode_json.
consumer = KafkaConsumer(
    'stroke_data',
    group_id='stroke-consumers',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=_decode_json,
)

print("✅ Listening for messages on Kafka topic 'stroke_data'...")

# Per-patient values that are not monitored signals; they are kept out of the
# waveform payload. The producer has been observed sending height/weight in
# place of age/body_surface_area, so both sets of names are listed.
static_fields = {"age", "body_surface_area", "height", "weight"}

# Combinations of missing features already reported, so a producer that
# always omits the same fields yields one warning rather than one per message.
reported_missing = set()

# Consume messages indefinitely: predict stroke volume for each one, then
# forward the prediction and the waveform sample to the UI endpoints.
for message in consumer:
    data = message.value

    # Building the DataFrame with columns=expected_features fills any key the
    # message lacks with NaN without complaint, so report the gap explicitly.
    if isinstance(data, dict):
        missing = tuple(name for name in expected_features if name not in data)
        if missing and missing not in reported_missing:
            reported_missing.add(missing)
            print(f"⚠️ Features missing from message (model receives NaN): {list(missing)}")

    try:
        # Prepare data for prediction
        input_df = pd.DataFrame([data], columns=expected_features)
        prediction = model.predict(input_df)[0]
        print(f"✅ Predicted Stroke Volume: {prediction:.2f} mL")

        # Emit stroke volume
        emit_prediction(prediction)

        # Emit waveform data (excluding static per-patient fields)
        waveform_data = {k: v for k, v in data.items() if k not in static_fields}
        # Timestamp is the local processing time, not the acquisition time
        waveform_data["time"] = pd.Timestamp.now().isoformat()
        emit_waveform(waveform_data)

    except Exception as e:
        # Loop boundary: one bad message must not stop the consumer.
        print(f"❌ Error during prediction or emission: {e}")


✅ Listening for messages on Kafka topic 'stroke_data'...
✅ Predicted Stroke Volume: 90.36 mL
📡 Emitted to UI: 90.36 mL
📈 Emitted waveform data to UI: {'Solar8000/RR_CO2': 14.0, 'Solar8000/NIBP_MBP': 90.0, 'Solar8000/HR': 68.0, 'Solar8000/PLETH_SPO2': 96.0, 'Solar8000/PLETH_HR': 69.0, 'EV1000/ART_MBP': 64.0, 'height': 176.7, 'weight': 97.0, 'time': '2025-07-03T07:20:46.231041'}
✅ Predicted Stroke Volume: 91.04 mL
📡 Emitted to UI: 91.04 mL
📈 Emitted waveform data to UI: {'Solar8000/RR_CO2': 14.0, 'Solar8000/NIBP_MBP': 90.0, 'Solar8000/HR': 70.0, 'Solar8000/PLETH_SPO2': 97.0, 'Solar8000/PLETH_HR': 71.0, 'EV1000/ART_MBP': 69.0, 'height': 176.7, 'weight': 97.0, 'time': '2025-07-03T07:20:46.718773'}
✅ Predicted Stroke Volume: 91.02 mL
📡 Emitted to UI: 91.02 mL
📈 Emitted waveform data to UI: {'Solar8000/RR_CO2': 14.0, 'Solar8000/NIBP_MBP': 90.0, 'Solar8000/HR': 72.0, 'Solar8000/PLETH_SPO2': 96.0, 'Solar8000/PLETH_HR': 72.0, 'EV1000/ART_MBP': 66.0, 'height': 176.7, 'weight': 97.0, 'time': '202