## Enhanced Machine Learning Pipeline: Kafka-Powered Real-Time Predictions and Feedback

### Data Producer
The data producer reads the Iris dataset from 'iris.csv' and streams it, one row per message, to the 'iris_demo1_topic' Kafka topic. A fixed 0.1-second sleep between messages simulates real-time data streaming.

In [1]:
# Import required libraries
import json # For converting Python dictionaries to JSON formatted strings
import time # For adding delays in the loop to simulate streaming and for timing the run
from kafka import KafkaProducer # Kafka library to produce messages
import pandas as pd # For reading the Iris CSV file
from kafka.errors import KafkaError # For handling Kafka-specific errors

# Record the start time so the total run time can be reported at the end
start_time = time.time()

# Initialize Kafka producer
# Specifies the Kafka server to connect to and the method to serialize the message values to JSON formatted strings.
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

df = pd.read_csv('iris.csv')

for _, row in df.iterrows():
    # Convert the row to a dictionary, suitable for JSON serialization
    message = row.to_dict()
    try:
        # Attempt to send the message to the 'iris_demo1_topic' Kafka topic
        producer.send('iris_demo1_topic', value=message)
        print(f"Sent: {message}") # send() is asynchronous, so this confirms the message was queued for sending
    except KafkaError as e:
        # If sending fails, catch the KafkaError and print the error message
        print(f"Failed to send message: {e}")
        
    time.sleep(0.1)   # Wait for 0.1 seconds before sending the next message to simulate real-time data streaming

# After all messages have been sent, close the producer to free up resources
producer.close()

end_time = time.time()

# Print a confirmation that all messages have been sent
print("All messages sent to Kafka topic 'iris_demo1_topic'.")

print(f"Streaming all messages took {end_time - start_time} seconds")

Sent: {'Id': 1, 'SepalLengthCm': 5.1, 'SepalWidthCm': 3.5, 'PetalLengthCm': 1.4, 'PetalWidthCm': 0.2, 'Species': 'Iris-setosa'}
Sent: {'Id': 2, 'SepalLengthCm': 4.9, 'SepalWidthCm': 3.0, 'PetalLengthCm': 1.4, 'PetalWidthCm': 0.2, 'Species': 'Iris-setosa'}
Sent: {'Id': 3, 'SepalLengthCm': 4.7, 'SepalWidthCm': 3.2, 'PetalLengthCm': 1.3, 'PetalWidthCm': 0.2, 'Species': 'Iris-setosa'}
Sent: {'Id': 4, 'SepalLengthCm': 4.6, 'SepalWidthCm': 3.1, 'PetalLengthCm': 1.5, 'PetalWidthCm': 0.2, 'Species': 'Iris-setosa'}
Sent: {'Id': 5, 'SepalLengthCm': 5.0, 'SepalWidthCm': 3.6, 'PetalLengthCm': 1.4, 'PetalWidthCm': 0.2, 'Species': 'Iris-setosa'}
Sent: {'Id': 6, 'SepalLengthCm': 5.4, 'SepalWidthCm': 3.9, 'PetalLengthCm': 1.7, 'PetalWidthCm': 0.4, 'Species': 'Iris-setosa'}
Sent: {'Id': 7, 'SepalLengthCm': 4.6, 'SepalWidthCm': 3.4, 'PetalLengthCm': 1.4, 'PetalWidthCm': 0.3, 'Species': 'Iris-setosa'}
Sent: {'Id': 8, 'SepalLengthCm': 5.0, 'SepalWidthCm': 3.4, 'PetalLengthCm': 1.5, 'PetalWidthCm': 0.2, 'S

Sent: {'Id': 65, 'SepalLengthCm': 5.6, 'SepalWidthCm': 2.9, 'PetalLengthCm': 3.6, 'PetalWidthCm': 1.3, 'Species': 'Iris-versicolor'}
Sent: {'Id': 66, 'SepalLengthCm': 6.7, 'SepalWidthCm': 3.1, 'PetalLengthCm': 4.4, 'PetalWidthCm': 1.4, 'Species': 'Iris-versicolor'}
Sent: {'Id': 67, 'SepalLengthCm': 5.6, 'SepalWidthCm': 3.0, 'PetalLengthCm': 4.5, 'PetalWidthCm': 1.5, 'Species': 'Iris-versicolor'}
Sent: {'Id': 68, 'SepalLengthCm': 5.8, 'SepalWidthCm': 2.7, 'PetalLengthCm': 4.1, 'PetalWidthCm': 1.0, 'Species': 'Iris-versicolor'}
Sent: {'Id': 69, 'SepalLengthCm': 6.2, 'SepalWidthCm': 2.2, 'PetalLengthCm': 4.5, 'PetalWidthCm': 1.5, 'Species': 'Iris-versicolor'}
Sent: {'Id': 70, 'SepalLengthCm': 5.6, 'SepalWidthCm': 2.5, 'PetalLengthCm': 3.9, 'PetalWidthCm': 1.1, 'Species': 'Iris-versicolor'}
Sent: {'Id': 71, 'SepalLengthCm': 5.9, 'SepalWidthCm': 3.2, 'PetalLengthCm': 4.8, 'PetalWidthCm': 1.8, 'Species': 'Iris-versicolor'}
Sent: {'Id': 72, 'SepalLengthCm': 6.1, 'SepalWidthCm': 2.8, 'PetalLen

Sent: {'Id': 127, 'SepalLengthCm': 6.2, 'SepalWidthCm': 2.8, 'PetalLengthCm': 4.8, 'PetalWidthCm': 1.8, 'Species': 'Iris-virginica'}
Sent: {'Id': 128, 'SepalLengthCm': 6.1, 'SepalWidthCm': 3.0, 'PetalLengthCm': 4.9, 'PetalWidthCm': 1.8, 'Species': 'Iris-virginica'}
Sent: {'Id': 129, 'SepalLengthCm': 6.4, 'SepalWidthCm': 2.8, 'PetalLengthCm': 5.6, 'PetalWidthCm': 2.1, 'Species': 'Iris-virginica'}
Sent: {'Id': 130, 'SepalLengthCm': 7.2, 'SepalWidthCm': 3.0, 'PetalLengthCm': 5.8, 'PetalWidthCm': 1.6, 'Species': 'Iris-virginica'}
Sent: {'Id': 131, 'SepalLengthCm': 7.4, 'SepalWidthCm': 2.8, 'PetalLengthCm': 6.1, 'PetalWidthCm': 1.9, 'Species': 'Iris-virginica'}
Sent: {'Id': 132, 'SepalLengthCm': 7.9, 'SepalWidthCm': 3.8, 'PetalLengthCm': 6.4, 'PetalWidthCm': 2.0, 'Species': 'Iris-virginica'}
Sent: {'Id': 133, 'SepalLengthCm': 6.4, 'SepalWidthCm': 2.8, 'PetalLengthCm': 5.6, 'PetalWidthCm': 2.2, 'Species': 'Iris-virginica'}
Sent: {'Id': 134, 'SepalLengthCm': 6.3, 'SepalWidthCm': 2.8, 'PetalLe
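
Because `producer.send()` in kafka-python is asynchronous, the try/except in the loop above mostly catches errors raised while queuing a message, not delivery failures. One way to confirm delivery, assuming it is acceptable to block on each message, is to call `.get(timeout=...)` on the future that `send()` returns. The sketch below illustrates this; the helper name `send_and_confirm` and the 10-second timeout are illustrative and not part of the original notebook. The producer is passed in as an argument so the helper can be tried without a broker.

```python
def send_and_confirm(producer, topic, message, timeout=10):
    """Send one message and block until the broker acknowledges it.

    Returns (partition, offset) on success, or None if delivery failed.
    `producer` is expected to behave like kafka-python's KafkaProducer.
    """
    try:
        # send() returns a future; get() blocks until the broker responds
        # and raises an exception (a KafkaError in kafka-python) on failure.
        metadata = producer.send(topic, value=message).get(timeout=timeout)
    except Exception as e:  # with kafka-python, narrow this to kafka.errors.KafkaError
        print(f"Failed to deliver message: {e}")
        return None
    return metadata.partition, metadata.offset
```

In the loop above, this would replace the direct `producer.send(...)` call. Blocking on every message lowers throughput, which is acceptable for a 150-row demo but may not be for a high-volume stream.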

### Model Predictor
The model predictor is a Kafka consumer that makes real-time predictions on the streamed data and sends feedback on wrong predictions for potential model adjustments. It first trains a K-Nearest Neighbors (KNN) classifier on the Iris dataset, then consumes records from the 'iris_demo1_topic' topic, predicts the species of each one, and publishes every misclassified record to the 'feedback_demo1' topic.

In [2]:
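# Import necessary libraries
import json  # For serialization/deserialization of data
import time  # For timing the prediction loop
from kafka import KafkaConsumer, KafkaProducer  # For interacting with Kafka
from kafka.errors import KafkaError  # For handling Kafka-specific errors
from sklearn.neighbors import KNeighborsClassifier  # KNN classifier from scikit-learn
from sklearn.model_selection import train_test_split  # For splitting data into training and testing sets
from sklearn.metrics import classification_report  # For per-class precision, recall, and F1
import pandas as pd  # For reading the Iris CSV file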

# Load and prepare the dataset
df = pd.read_csv('iris.csv') 

X = df[['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']].values
y = df['Species'].values

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train the KNN model
knn = KNeighborsClassifier(n_neighbors=5)  # Instantiate the KNN model with 5 neighbors
knn.fit(X_train, y_train)

# Initialize Kafka consumer for consuming incoming data
consumer = KafkaConsumer(
    'iris_demo1_topic',  
    bootstrap_servers=['localhost:9092'],  # List of Kafka server addresses
    auto_offset_reset='earliest',  # Start reading at the earliest message
    consumer_timeout_ms=10000,  # Stop consuming if no message for 10 seconds
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserialize messages from JSON
)

start_time = time.time()

# Initialize Kafka producer for sending feedback
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # List of Kafka server addresses
    value_serializer=lambda x: json.dumps(x).encode('utf-8')  # Serialize messages as JSON
)

def send_feedback(message):
    try:
        producer.send('feedback_demo1', value=message)  # Send feedback message to Kafka topic
        producer.flush()  # Ensure message is sent
        print(f"Feedback: {message}")
    except KafkaError as e:
        print(f"Failed to send message: {e}")  # Handle any errors during sending

# Prepare lists to store true labels and predictions for evaluation
true_labels = []
predictions = []
    
for message in consumer:  # Consume messages from Kafka
    # Extract features from the message
    features = [message.value[f] for f in ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']]
    true_label = message.value['Species']  # Extract the true label

    predicted_label = knn.predict([features])[0]  # Predict the label based on the features
    
    # Store true labels and predictions for later evaluation
    true_labels.append(true_label)
    predictions.append(predicted_label)

    print(f"Predicted Species: {predicted_label}, True Species: {true_label}")

    # If prediction is incorrect, send feedback
    if predicted_label != true_label:
        feedback_message = message.value
        feedback_message['PredictedSpecies'] = predicted_label  # Add the predicted label to the feedback
        send_feedback(feedback_message)  # Send feedback

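# After processing all messages, print a report on model performance
# Note: the stream replays the full dataset, including the rows used to train the model,
# so this report is not a held-out estimate
print(classification_report(true_labels, predictions))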
            
# Close the consumer and producer to release resources
consumer.close()
producer.close()

end_time = time.time()
# The measured time includes the 10-second consumer timeout spent waiting for further messages
print(f"Prediction takes {end_time - start_time} seconds")

Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Species: Iris-setosa, True Species: Iris-setosa
Predicted Spec

                 precision    recall  f1-score   support

    Iris-setosa       1.00      1.00      1.00        50
Iris-versicolor       0.98      0.94      0.96        50
 Iris-virginica       0.94      0.98      0.96        50

       accuracy                           0.97       150
      macro avg       0.97      0.97      0.97       150
   weighted avg       0.97      0.97      0.97       150

Prediction takes 10.61453104019165 seconds
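
The per-message logic in the loop above can be factored into a small function, which makes it testable without a running broker. This is a sketch, not the notebook's code: `process_record` and `FEATURE_COLUMNS` are hypothetical names, and the model is only assumed to expose a scikit-learn style `predict` method. Unlike the loop above, it copies the record before adding 'PredictedSpecies', so the consumed message is left unchanged.

```python
FEATURE_COLUMNS = ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']

def process_record(model, record):
    """Predict the species for one record.

    Returns (predicted_label, feedback), where feedback is a new dict for
    misclassified records and None for correct predictions.
    """
    features = [record[name] for name in FEATURE_COLUMNS]
    # str() turns a NumPy string returned by scikit-learn into a plain str
    predicted_label = str(model.predict([features])[0])
    if predicted_label == record['Species']:
        return predicted_label, None
    feedback = dict(record)  # shallow copy, so the input record is not modified
    feedback['PredictedSpecies'] = predicted_label
    return predicted_label, feedback
```

Inside the consumer loop, the call would look like `predicted_label, feedback = process_record(knn, message.value)`, followed by `send_feedback(feedback)` whenever `feedback` is not None.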


### Feedback Consumer
The feedback consumer subscribes to the 'feedback_demo1' topic using the KafkaConsumer class from the kafka-python library and prints each message it receives. It stops once no new message arrives for 10 seconds.

In [3]:
import json
from kafka import KafkaConsumer

# Initialize Kafka consumer for consuming feedback messages
feedback_consumer = KafkaConsumer(
    'feedback_demo1',
    bootstrap_servers=['localhost:9092'],
    group_id='consumer_wrong',  # Set the consumer group to 'consumer_wrong'
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Consume messages from the Kafka topic
for message in feedback_consumer:
    print(f"Feedback received: {message.value}")

# Close the consumer to release system resources
feedback_consumer.close()

Feedback received: {'Id': 71, 'SepalLengthCm': 5.9, 'SepalWidthCm': 3.2, 'PetalLengthCm': 4.8, 'PetalWidthCm': 1.8, 'Species': 'Iris-versicolor', 'PredictedSpecies': 'Iris-virginica'}
Feedback received: {'Id': 73, 'SepalLengthCm': 6.3, 'SepalWidthCm': 2.5, 'PetalLengthCm': 4.9, 'PetalWidthCm': 1.5, 'Species': 'Iris-versicolor', 'PredictedSpecies': 'Iris-virginica'}
Feedback received: {'Id': 84, 'SepalLengthCm': 6.0, 'SepalWidthCm': 2.7, 'PetalLengthCm': 5.1, 'PetalWidthCm': 1.6, 'Species': 'Iris-versicolor', 'PredictedSpecies': 'Iris-virginica'}
Feedback received: {'Id': 107, 'SepalLengthCm': 4.9, 'SepalWidthCm': 2.5, 'PetalLengthCm': 4.5, 'PetalWidthCm': 1.7, 'Species': 'Iris-virginica', 'PredictedSpecies': 'Iris-versicolor'}


### Feedback Producer
The feedback producer sends manually supplied corrections to the same 'feedback_demo1' topic. In a real-world application, this could be part of an application interface where users confirm or correct predictions. Together with the feedback generated automatically by the model predictor, it allows predictions to be validated either automatically or by hand.

In [4]:
from kafka import KafkaProducer
import json

# Initialize Kafka producer for sending feedback messages
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda x: json.dumps(x).encode('utf-8'))

# Define a function to send feedback to a Kafka topic
def send_feedback(record_id, correct_label):
    feedback_message = {
        'record_id': record_id,  # Unique identifier for the record being corrected. 
        'correct_label': correct_label # The correct label for the record
    }
    # Send the feedback message to the 'feedback_demo1' topic
    producer.send('feedback_demo1', value=feedback_message)
    producer.flush()

# Send an example feedback message
# In a real scenario, 'record_id' and 'correct_label' would be dynamically determined
send_feedback(record_id="123", correct_label="Iris-versicolor")

# Print confirmation that feedback was sent
print("Feedback sent to 'feedback_demo1'.")

Feedback sent to 'feedback_demo1'.
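
The 'feedback_demo1' topic now carries two message shapes: automatic feedback with 'Id', 'Species', and 'PredictedSpecies', and manual feedback with 'record_id' and 'correct_label'. A consumer that wants to act on both could first map them to one shape. The sketch below is one hypothetical way to do that. The function name `normalize_feedback` and the output keys are assumptions, and it further assumes that 'record_id' refers to the dataset's 'Id' column, which the notebook does not state.

```python
def normalize_feedback(message):
    """Map either feedback shape to {'id': int, 'correct_label': str}.

    Returns None for messages that match neither shape.
    """
    if 'record_id' in message and 'correct_label' in message:
        # Manual feedback from the feedback producer (record_id is sent as a string)
        return {'id': int(message['record_id']),
                'correct_label': message['correct_label']}
    if 'Id' in message and 'Species' in message:
        # Automatic feedback: 'Species' holds the true label of the misclassified row
        return {'id': int(message['Id']),
                'correct_label': message['Species']}
    return None
```

Returning None for unknown shapes lets the consumer skip or log unexpected messages instead of failing mid-stream.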


### Model Retrainer
The model retrainer can be triggered on a schedule or by specific messages to retrain the model with all available data. The goal is to improve model accuracy by incorporating corrected labels or new data. In this demo it is triggered manually and reloads 'iris.csv', which is assumed to already contain the corrections.
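
A trigger based on a schedule or on incoming messages could be sketched as a small policy object that retrains once enough feedback has accumulated or enough time has passed. Everything here is an assumption for illustration: the class name `RetrainTrigger`, its thresholds, and the injectable clock are not part of the notebook.

```python
import time

class RetrainTrigger:
    """Decide when to retrain: after enough feedback or enough elapsed time."""

    def __init__(self, max_feedback=10, max_age_seconds=3600.0, clock=time.monotonic):
        self.max_feedback = max_feedback
        self.max_age_seconds = max_age_seconds
        self.clock = clock  # injectable so the policy can be tested without waiting
        self.pending = 0
        self.last_retrain = clock()

    def record_feedback(self):
        """Call once per feedback message received."""
        self.pending += 1

    def should_retrain(self):
        """True if the feedback count or the elapsed time crossed its threshold."""
        age = self.clock() - self.last_retrain
        return self.pending >= self.max_feedback or age >= self.max_age_seconds

    def mark_retrained(self):
        """Reset the counters after a retraining run."""
        self.pending = 0
        self.last_retrain = self.clock()
```

A feedback consumer could call `record_feedback()` for each message and run the retraining function whenever `should_retrain()` returns True, followed by `mark_retrained()`.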

In [5]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

# Define a function to retrain the model with updated feedback data
def retrain_model_with_feedback():
    # Load the dataset, which is now assumed to include feedback corrections
    df = pd.read_csv('iris.csv')  # Assumes 'iris.csv' is the updated dataset file path

    X = df[['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']].values
    y = df['Species'].values

    # Splitting dataset into training and testing
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Initialize and train the KNN model
    knn = KNeighborsClassifier(n_neighbors=5)
    knn.fit(X_train, y_train)

    # Evaluate model accuracy
    predictions = knn.predict(X_test)
    print(f"Accuracy after retraining: {accuracy_score(y_test, predictions)}")

# Trigger the retraining process manually
retrain_model_with_feedback()

Accuracy after retraining: 1.0
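
The retraining function above reloads 'iris.csv' and assumes corrections were already written into it. A minimal sketch of that missing step follows, assuming corrections have been collected as a mapping from record 'Id' to the corrected label. `apply_corrections` is a hypothetical helper, and it works on plain dictionaries rather than a DataFrame to stay dependency-free.

```python
def apply_corrections(rows, corrections):
    """Return a copy of rows with 'Species' replaced where a correction exists.

    rows: list of dicts with at least 'Id' and 'Species' keys.
    corrections: dict mapping a record Id to its corrected label.
    """
    updated = []
    for row in rows:
        new_row = dict(row)  # copy so the caller's data is left untouched
        if new_row['Id'] in corrections:
            new_row['Species'] = corrections[new_row['Id']]
        updated.append(new_row)
    return updated
```

With pandas, the rows could come from `df.to_dict('records')` and be turned back into a DataFrame with `pd.DataFrame(updated_rows)` before fitting the model.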
