In [1]:
from kafka import KafkaConsumer
from kafka import KafkaProducer
import json
import matplotlib.pyplot as plt
import time
import joblib

In [2]:
# Load the pre-trained aeration classifier from disk.
# NOTE: joblib.load unpickles the file, which can execute arbitrary code;
# only point this at a model file from a trusted source.
model = joblib.load(filename="aeration_classifier.joblib")

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [3]:
# Buffers intended for plotting sensor readings over time. Each name is bound
# to its own, initially empty, list object.
# NOTE(review): none of the visible cells append to these yet.
(
    timestamps,
    water_temperatures,
    ph_levels,
    turbidities,
    dissolved_oxygen_levels,
) = ([] for _ in range(5))

In [4]:
# Kafka configuration
def initialize_consumer(topic="water_quality", bootstrap_servers=None,
                        group_id="water_quality_predictors"):
    """Create a KafkaConsumer that reads JSON-encoded sensor readings.

    Args:
        topic: Topic to subscribe to.
        bootstrap_servers: List of ``"host:port"`` broker addresses; defaults
            to ``["localhost:9092"]``.
        group_id: Consumer group under which offsets are auto-committed.

    Returns:
        A KafkaConsumer whose record values are decoded from UTF-8 JSON.
        Because ``auto_offset_reset='latest'``, a group with no committed
        offset starts from newly produced messages only.
    """
    # None sentinel avoids sharing a mutable default list between calls.
    if bootstrap_servers is None:
        bootstrap_servers = ["localhost:9092"]

    return KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='latest',
        enable_auto_commit=True,
        group_id=group_id,
    )


In [5]:
# Topic that prediction results are published to.
kafka_topic_pred = "water_quality_predictions"

def predict(consumer, producer, classifier=None, topic=None):
    """Consume one sensor reading, classify it, and publish the prediction.

    Blocks until ``consumer`` yields the next record, builds the feature row
    ``[water_temperature, ph_level, turbidity, dissolved_oxygen]``, runs it
    through the classifier, and sends a payload keyed by the original
    record key.

    Args:
        consumer: Iterable of records exposing ``.value`` (a dict with the
            keys read below) and ``.key``; must also provide ``close()``.
        producer: Object with ``send(topic, value, key=...)``, e.g. a
            KafkaProducer configured with a JSON value serializer.
        classifier: Model exposing ``predict`` / ``predict_proba``. Defaults
            to the notebook-level ``model``.
        topic: Destination topic. Defaults to ``kafka_topic_pred``.

    Returns:
        The payload dict that was sent, or None if the consumer yielded
        no record.

    Raises:
        KeyboardInterrupt: Re-raised after closing ``consumer`` so that the
            caller's polling loop actually stops.
        KeyError: If a record lacks one of the expected fields.
    """
    clf = classifier if classifier is not None else model
    dest_topic = topic if topic is not None else kafka_topic_pred

    try:
        for record in consumer:
            sensor_data = record.value
            sensor_key = record.key
            print(f"Received: {sensor_data} Key: {sensor_key}")

            timestamp = sensor_data['timestamp']
            # Single feature row; column order is assumed to match the one
            # the classifier was trained with — confirm against training code.
            features = [[
                sensor_data['water_temperature'],
                sensor_data['ph_level'],
                sensor_data['turbidity'],
                sensor_data['dissolved_oxygen'],
            ]]
            prediction = clf.predict(features)
            prediction_probs = clf.predict_proba(features)

            # .tolist() turns the returned arrays into JSON-serialisable lists.
            payload = {
                'prediction': prediction.tolist(),
                'prediction_probabilities': prediction_probs.tolist(),
                'timestamp': timestamp,
            }
            producer.send(dest_topic, payload, key=sensor_key)

            print(f"Mensaje: {payload}")
            return payload  # Process one message per call.
    except KeyboardInterrupt:
        print("Stopped consuming messages.")
        consumer.close()
        # Propagate: swallowing it would let the caller's `while True`
        # keep calling predict() on a closed consumer.
        raise
    return None
    

In [None]:
consumer = initialize_consumer()

# Kafka configuration — must point at the same cluster the consumer uses.
kafka_bootstrap_servers = ["localhost:9092"]

# Create Kafka producer; values are serialised as UTF-8 JSON.
producer = KafkaProducer(
    bootstrap_servers=kafka_bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

try:
    # predict() handles one message per call, so poll until interrupted.
    while True:
        predict(consumer, producer)
except KeyboardInterrupt:
    print("Stopped.")
finally:
    # producer.send() is asynchronous: flush so buffered predictions are
    # delivered, then release both clients even if an error occurred.
    try:
        producer.flush()
        producer.close()
    finally:
        consumer.close()

Received: {'timestamp': 1741192688, 'water_temperature': 29.047912734799443, 'ph_level': 7.645161503397677, 'turbidity': 5.79, 'dissolved_oxygen': 8.59} Key: b'0'
Mensaje: {'prediction': [0], 'prediction_probabilities': [[0.9, 0.1]], 'timestamp': 1741192688}
Received: {'timestamp': 1741192690, 'water_temperature': 28.453307150752053, 'ph_level': 8.02617302635067, 'turbidity': 44.41, 'dissolved_oxygen': 7.92} Key: b'2'
Mensaje: {'prediction': [0], 'prediction_probabilities': [[0.5, 0.5]], 'timestamp': 1741192690}
Received: {'timestamp': 1741192693, 'water_temperature': 28.767757629496217, 'ph_level': 8.019071952778631, 'turbidity': 31.86, 'dissolved_oxygen': 11.75} Key: b'5'
Mensaje: {'prediction': [0], 'prediction_probabilities': [[1.0, 0.0]], 'timestamp': 1741192693}
Received: {'timestamp': 1741192694, 'water_temperature': 29.57965417854885, 'ph_level': 7.8101650201926445, 'turbidity': 16.99, 'dissolved_oxygen': 11.15} Key: b'6'
Mensaje: {'prediction': [0], 'prediction_probabilities':