# Group No: 18

## Group Members Names:

 1. PEYALA SAMARASIMHA REDDY - 2023AA05072
 2. PEGALLAPATI SAI MAHARSHI - 2023AA05924
 3. ANIRUDDHA DILIP MADURWAR - 2023AA05982
 4. TUSHAR DEEP - 2023AA05885

## Assignment - 2

Real-Time Prediction with Apache Kafka and Neural Networks

- Here, we have setup the Apache Kafka locally, we started the Kafka and Zookeeper. Then we will run this producer and consumer via jupyterbooks. 
- This is consumer.ipynb, it will take the input data from the producer and processes it, loads the trained model and get the predictions.
- For every input message row, it will give predicted valuea and for every 10 rows, it calculates the MSE, MAE etc

## Consumer File

In [6]:
from kafka import KafkaConsumer, KafkaProducer
import json
import time
import numpy as np
import joblib
import tensorflow as tf
from tensorflow import keras
from sklearn.metrics import mean_squared_error, mean_absolute_error
import logging

# Reduce TensorFlow logging to avoid unnecessary messages
tf.get_logger().setLevel(logging.ERROR)

# Setup logging configuration
log_file = "consumer_logs.log"
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s - %(message)s",
    filemode="w"  # Overwrite previous logs on each run
)

# Load the trained deep learning model
custom_objects = {"mse": keras.losses.MeanSquaredError()}
print("\nLoading the trained neural network model california_housing_model.h5...\n")
model = keras.models.load_model("california_housing_model.h5", custom_objects=custom_objects)

# Load the StandardScaler object used for data preprocessing
scaler = joblib.load("scaler.pkl")

# Define the feature names in order
feature_names = ["MedInc", "HouseAge", "AveRooms", "AveBedrms", "Population", "AveOccup", "Latitude", "Longitude"]

# Kafka Consumer Setup - Reads only new messages from "input-data" topic
consumer = KafkaConsumer(
    "input-data",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="latest"
)

# Kafka Producer for sending predictions to the "predictions" topic
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

# Initialize tracking variables
message_count = 0
latencies = []
actual_values = []
predicted_values = []
all_received_data = []

print("\nConsumer is listening for messages...\n")

# Loop to consume messages, stopping after 100 messages
for message in consumer:
    start_time = time.time()  # Start time for latency calculation

    # Extract the received JSON data
    data = message.value
    message_id = data.get("id", "unknown")
    feature_values = data.get("features")
    actual_price = data.get("actual_price", None)
    timestamp = data.get("timestamp", "N/A")

    # Validate feature data
    if feature_values is None or not isinstance(feature_values, (list, np.ndarray)) or len(feature_values) != 8:
        warning_message = f"Skipping message ID {message_id} due to invalid feature data. Received: {feature_values}"
        print(warning_message)
        logging.warning(warning_message)
        continue

    # Store received input message
    all_received_data.append(data)

    # Preprocess the input data
    feature_array = np.array(feature_values).reshape(1, -1)
    feature_scaled = scaler.transform(feature_array)

    # Make prediction
    predicted_price = model.predict(feature_scaled)[0][0]

    # Calculate latency
    latency = time.time() - start_time
    latencies.append(latency)

    # Print received message details
    print(f"\nProcessed Message ID: {message_id}")
    print(f"   - Timestamp: {timestamp}")
    for feature_name, feature_value in zip(feature_names, feature_values):
        print(f"   - {feature_name}: {feature_value:.4f}")
    
    print(f"   - Actual Price: {actual_price if actual_price is not None else 'N/A'}")
    print(f"   - Predicted Price: {predicted_price:.2f}")
    print(f"   - Latency: {latency:.4f} sec")

    # Store actual and predicted values
    if actual_price is not None:
        actual_values.append(actual_price)
        predicted_values.append(predicted_price)

    message_count += 1  # Increment message count

    # Log the processed message details (including all input feature values)
    log_message = f"ID: {message_id}, Timestamp: {timestamp}, Features: {dict(zip(feature_names, feature_values))}, Actual Price: {actual_price}, Predicted Price: {predicted_price:.2f}, Latency: {latency:.4f} sec"
    logging.info(log_message)

    # Compute performance metrics every 10 messages
    if message_count % 10 == 0 and actual_values:
        mse = mean_squared_error(actual_values, predicted_values)
        mae = mean_absolute_error(actual_values, predicted_values)
        avg_latency = np.mean(latencies) if latencies else 0.0

        # Print performance metrics
        print("\nPerformance Metrics (Last 10 Messages):")
        print(f"   - Mean Squared Error (MSE): {mse:.4f}")
        print(f"   - Mean Absolute Error (MAE): {mae:.4f}")
        print(f"   - Average Latency: {avg_latency:.4f} sec\n")

        # Log performance metrics
        performance_log = f"MSE: {mse:.4f}, MAE: {mae:.4f}, Avg Latency: {avg_latency:.4f} sec"
        logging.info(performance_log)

        # Reset tracking lists
        actual_values = []
        predicted_values = []
        latencies = []

    # Send prediction to Kafka "predictions" topic
    output_message = {
        "id": message_id,
        "actual_price": actual_price,
        "predicted_price": float(predicted_price),
        "latency": latency,
        "timestamp": time.time()
    }
    producer.send("predictions", value=output_message)
    producer.flush()

    # Stop execution after processing 100 messages
    if message_count >= 100:
        break  # Exit loop after 100 messages

# Compute overall performance metrics
if len(predicted_values) > 0:
    overall_mse = mean_squared_error(actual_values, predicted_values)
    overall_mae = mean_absolute_error(actual_values, predicted_values)
    overall_avg_latency = np.mean(latencies) if latencies else 0.0

    # Print final metrics
    print("\nFinal Overall Performance Metrics (100 Messages):")
    print(f"   - Mean Squared Error (MSE): {overall_mse:.4f}")
    print(f"   - Mean Absolute Error (MAE): {overall_mae:.4f}")
    print(f"   - Average Latency: {overall_avg_latency:.4f} sec\n")

    # Log final metrics
    final_performance_log = f"Final Overall Metrics - MSE: {overall_mse:.4f}, MAE: {overall_mae:.4f}, Avg Latency: {overall_avg_latency:.4f} sec"
    logging.info(final_performance_log)

# Print all received input data
print("\nAll Received Input Data (100 Messages):")
for i, data in enumerate(all_received_data):
    print(f"\nMessage {i+1}:")
    print(json.dumps(data, indent=4))

# Log and print total messages processed
final_message = f"\nProcessing Completed: {message_count} messages processed."
print(final_message)
logging.info(final_message)

# Log confirmation message about saved logs
log_saved_message = f"Log file saved at: {log_file}"
print(log_saved_message)
logging.info(log_saved_message)

# Close Kafka connections
consumer.close()
producer.close()

print("\nKafka Consumer Stopped.\n")



Loading the trained neural network model california_housing_model.h5...


Consumer is listening for messages...



https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 139ms/step

Processed Message ID: 0
   - Timestamp: 2025-03-16 18:07:01
   - MedInc: 3.1908
   - HouseAge: 16.0000
   - AveRooms: 4.3868
   - AveBedrms: 0.9811
   - Population: 1386.0000
   - AveOccup: 2.1792
   - Latitude: 38.5400
   - Longitude: -121.7200
   - Actual Price: 1.943
   - Predicted Price: 1.39
   - Latency: 0.1707 sec
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step

Processed Message ID: 1
   - Timestamp: 2025-03-16 18:07:02
   - MedInc: 4.6225
   - HouseAge: 13.0000
   - AveRooms: 6.1157
   - AveBedrms: 1.0386
   - Population: 2828.0000
   - AveOccup: 2.5363
   - Latitude: 38.5400
   - Longitude: -121.7000
   - Actual Price: 2.265
   - Predicted Price: 1.80
   - Latency: 0.0409 sec
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step

Processed Message ID: 2
   - Timestamp: 2025-03-16 18:07:03
   - MedInc: 4.7308
   - HouseAge: 33.0000
   - AveRooms: 6.5756
   - AveB