# **Agentic AI for Image Classification (Stage 2: Expanded)**

This builds on Stage 1 improvements and adds new capabilities:

**Stage 1 improvements (carried forward):**
- Fixed ResNet augmentation bug, enhanced data augmentation, full test set evaluation
- Per-class metrics, confusion matrices, training history plots
- Improved Judge strategy, reproducibility seeds, model save/load
- Kafka decoupled, error handling in agents

**Stage 2 new capabilities:**
- **2.1** Third model agent (MobileNetV2 with transfer learning)
- **2.2** Soft voting ensemble across all three agents
- **2.10** CSV export of results and McNemar's statistical test

See [NEXT_STEPS.md](NEXT_STEPS.md) for full rationale behind each enhancement.

# Step 1: Setup Required Packages & Verify Kafka
Kafka and Zookeeper must be running as external services before starting this notebook.
See [RUN_ON_VPS.md](RUN_ON_VPS.md) for setup instructions.

In [None]:
# Stage 2: Additional packages for statistical analysis
# Install Python packages (Kafka must already be running as a system service)

!pip install tensorflow keras numpy matplotlib kafka-python pillow tqdm scikit-learn seaborn scipy pandas

# Verify Kafka is reachable
import time
from kafka import KafkaProducer

# Fail fast: open and immediately close a throwaway producer so that a missing
# broker is reported here, with setup instructions, before any agent is started.
try:
    p = KafkaProducer(bootstrap_servers=['localhost:9092'])
    p.close()
    print("Kafka is reachable")
except Exception as e:
    kafka_help = (
        "Kafka is not running. Start Kafka and Zookeeper before running this notebook.\n"
        "See RUN_ON_VPS.md for setup instructions."
    )
    # Chain the original error so the underlying connection failure stays visible.
    raise RuntimeError(kafka_help) from e

# Step 2: Load CIFAR-10 Data and setup functions

In [None]:
# Stage 2: Added MobileNet topic and model path
# Step 2: Load CIFAR-10 and create helper functions

import os
import json
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.datasets import cifar10
from PIL import Image
import csv
import pandas as pd

# Set seeds for reproducibility
# NumPy's global RNG drives get_random_test_image(); TensorFlow's seed is set alongside it.
np.random.seed(42)
tf.random.set_seed(42)

# CIFAR-10 class names
# The list position is the integer label: CIFAR10_CLASSES[label] gives the class name.
CIFAR10_CLASSES = [
    'airplane', 'automobile', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck'
]

# Topics for message broker (Stage 2: added MobileNet topic)
# The judge publishes to REQUEST_TOPIC; every classifier agent consumes it and
# answers on its own *_RESPONSE_TOPIC.
REQUEST_TOPIC = 'cifar_classification_requests'
CNN_RESPONSE_TOPIC = 'cnn_classifications'
RESNET_RESPONSE_TOPIC = 'resnet_classifications'
MOBILENET_RESPONSE_TOPIC = 'mobilenet_classifications'

# Model paths for save/load
# The directory is created up front so the later model.save() calls have somewhere to write.
MODEL_DIR = 'saved_models'
os.makedirs(MODEL_DIR, exist_ok=True)
CNN_PATH = os.path.join(MODEL_DIR, 'cnn_cifar_model.keras')
RESNET_PATH = os.path.join(MODEL_DIR, 'resnet_cifar_model.keras')
MOBILENET_PATH = os.path.join(MODEL_DIR, 'mobilenet_cifar_model.keras')

# Load CIFAR-10 dataset
print("Loading CIFAR-10 dataset...")
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Normalize pixel values (0-255 -> 0-1)
# Stored as float32; these scaled arrays are what the models train and predict on.
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0

# Flatten labels
# Collapsed to 1-D so a single label can be read as y_test[idx].
y_train = y_train.flatten()
y_test = y_test.flatten()

print(f"Training images: {x_train.shape}")
print(f"Test images: {x_test.shape}")


def get_random_test_image():
    """Draw one test-set image at random and package it as a request dict.

    Returns:
        dict: ``image_id`` (index into the test set), ``image_data`` (the
        pixels as nested lists), ``true_label`` (integer label) and
        ``true_class`` (the matching name from ``CIFAR10_CLASSES``).
    """
    idx = np.random.randint(0, len(x_test))
    label = y_test[idx]
    return dict(
        image_id=idx,
        image_data=x_test[idx].tolist(),  # nested lists, so the request can be JSON-encoded
        true_label=int(label),
        true_class=CIFAR10_CLASSES[label],
    )


def decode_prediction(predictions):
    """Turn one probability vector into its winning class, name and confidence.

    Args:
        predictions: Per-class scores for a single image, indexable by class id.

    Returns:
        dict: ``predicted_class`` (int index of the largest score),
        ``predicted_name`` (entry of ``CIFAR10_CLASSES`` at that index) and
        ``confidence`` (the largest score as a Python float).
    """
    winner = np.argmax(predictions)
    decoded = {'predicted_class': int(winner)}
    decoded['predicted_name'] = CIFAR10_CLASSES[winner]
    decoded['confidence'] = float(predictions[winner])
    return decoded

def numpy_to_json(arr):
    """Serialize an array or array-like value to a JSON string.

    Args:
        arr: A NumPy array or scalar, or anything ``np.asanyarray`` accepts
            (nested lists/tuples, plain numbers). Arrays are passed through
            unchanged, so existing callers get the same output as before.

    Returns:
        str: JSON text of the nested-list form of ``arr``.
    """
    # asanyarray lets plain Python sequences through instead of failing on the
    # missing .tolist() attribute, and keeps ndarray subclasses intact.
    return json.dumps(np.asanyarray(arr).tolist())

def json_to_numpy(json_str):
    """Parse JSON text and return the decoded value wrapped in a NumPy array.

    Args:
        json_str: JSON document, typically a (nested) list of numbers.

    Returns:
        numpy.ndarray: ``np.array`` applied to the decoded JSON value.
    """
    decoded = json.loads(json_str)
    return np.array(decoded)

# Display a sample image
import matplotlib.pyplot as plt

# Sanity check: draw one random test image and show it with its label and id.
sample = get_random_test_image()
sample_title = (
    f"Sample Image: {sample['true_class']} (Label: {sample['true_label']})\n"
    f"Image ID: {sample['image_id']}"
)

plt.figure(figsize=(6, 6))
# image_data is stored as nested lists, so convert back to an array for imshow.
plt.imshow(np.array(sample['image_data']))
plt.title(sample_title)
plt.axis('off')
plt.show()

print(f"CIFAR-10 classes: {CIFAR10_CLASSES}")

# Step 3: Build & train the CNN Model

In [None]:
# Stage 1 improvements: 1.2 (augmentation), 1.3 (full test set), 1.8 (save/load)
# Step 3: Build and train the CNN model
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping

def build_cnn_model():
    """Build and compile the baseline CNN classifier for 32x32 RGB images.

    The stack is: random augmentation layers, two convolutional feature
    blocks with batch normalization, pooling and dropout, then a dense
    classifier head ending in a 10-way softmax.

    Returns:
        A compiled ``Sequential`` model using the Adam optimizer, sparse
        categorical cross-entropy loss and an accuracy metric.
    """
    model = models.Sequential([
        # Declare the input explicitly. Passing ``input_shape`` to the first
        # layer of a Sequential model is deprecated in Keras 3.
        layers.Input(shape=(32, 32, 3)),

        # BLOCK 1: DATA AUGMENTATION (Stage 1 improvement 1.2)
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.05),
        layers.RandomZoom(0.1),
        layers.RandomTranslation(0.1, 0.1),

        # BLOCK 2: INITIAL FEATURE EXTRACTION
        layers.Conv2D(32, (3, 3), padding='same', activation='relu'),
        layers.BatchNormalization(),
        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Dropout(0.2),

        # BLOCK 3: SECONDARY FEATURE MAPPING
        layers.Conv2D(64, (3, 3), padding='same', activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Dropout(0.3),

        # BLOCK 4: CLASSIFIER HEAD
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax'),
    ])

    # Sparse loss: the labels are integer class ids, not one-hot vectors.
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model


# Stage 1 improvement 1.8: Load saved model if available, otherwise train
if not os.path.exists(CNN_PATH):
    # No saved model yet: build, train, evaluate, then persist for later runs.
    print("Building and training CNN model...")
    cnn_model = build_cnn_model()
    cnn_model.summary()

    # Stop after 3 epochs without a better val_loss and keep the best weights.
    early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

    history_cnn = cnn_model.fit(
        x_train, y_train,
        epochs=50, batch_size=64,
        validation_split=0.2,
        callbacks=[early_stop],
    )

    # Stage 1 improvement 1.3: score on the FULL test set
    test_loss, test_acc = cnn_model.evaluate(x_test, y_test, verbose=0)
    print(f"CNN Test Accuracy (full test set): {test_acc*100:.2f}%")

    cnn_model.save(CNN_PATH)
    print(f"CNN model saved to {CNN_PATH}")
else:
    # Stage 1 improvement 1.8: reuse the model saved by an earlier run.
    print(f"Loading pre-trained CNN model from {CNN_PATH}...")
    cnn_model = keras.models.load_model(CNN_PATH)
    # Nothing was trained this session; the plotting cell checks for None.
    history_cnn = None
    test_loss, test_acc = cnn_model.evaluate(x_test, y_test, verbose=0)
    print(f"CNN Test Accuracy: {test_acc*100:.2f}%")

# Step 3b: CNN Evaluation - Confusion Matrix & Training History
Stage 1 improvements 1.4 (per-class metrics) and 1.5 (training history plots)

In [None]:
# Stage 1 improvements 1.4 and 1.5: Confusion matrix and training curves for CNN
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

# Predicted class per test image: index of the largest softmax output.
y_pred_cnn = cnn_model.predict(x_test).argmax(axis=1)
cm_cnn = confusion_matrix(y_test, y_pred_cnn)

fig, axes = plt.subplots(1, 2, figsize=(18, 6))
cm_ax, history_ax = axes

# Left panel: confusion matrix (rows = true class, columns = predicted class)
sns.heatmap(cm_cnn, annot=True, fmt='d', cmap='Blues',
            xticklabels=CIFAR10_CLASSES, yticklabels=CIFAR10_CLASSES, ax=cm_ax)
cm_ax.set_title('CNN Confusion Matrix')
cm_ax.set_ylabel('True Label')
cm_ax.set_xlabel('Predicted Label')
cm_ax.tick_params(axis='x', rotation=45)

# Right panel: training curves, available only when the model was trained this session
if history_cnn is None:
    history_ax.text(0.5, 0.5, 'Model loaded from disk\n(no training history)',
                    ha='center', va='center', fontsize=14)
    history_ax.set_title('CNN Training History')
else:
    # Accuracy curves use the default line style; loss curves are dashed.
    for metric, curve_label, line_kwargs in (
        ('accuracy', 'Train Accuracy', {}),
        ('val_accuracy', 'Val Accuracy', {}),
        ('loss', 'Train Loss', {'linestyle': '--'}),
        ('val_loss', 'Val Loss', {'linestyle': '--'}),
    ):
        history_ax.plot(history_cnn.history[metric], label=curve_label, **line_kwargs)
    history_ax.set_title('CNN Training History')
    history_ax.legend()
    history_ax.set_xlabel('Epoch')
    history_ax.set_ylabel('Value')

plt.tight_layout()
plt.show()

# Per-class precision / recall / F1 (Stage 1 improvement 1.4)
print("\nCNN Classification Report:")
print(classification_report(y_test, y_pred_cnn, target_names=CIFAR10_CLASSES))

In [None]:
# Stage 1 improvements: 1.1 (bug fix), 1.2 (augmentation), 1.3 (full test set), 1.8 (save/load)
# Step 4: Build and train the ResNet model
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

def build_resnet_model():
    """Build and compile a small residual network for 32x32 RGB images.

    Layout: augmentation layers, a 64-filter stem convolution, three residual
    blocks (64, 128 and 256 filters, the last two downsampling), global
    average pooling and a dense head ending in a 10-way softmax.

    Returns:
        A compiled functional ``Model`` using the Adam optimizer, sparse
        categorical cross-entropy loss and an accuracy metric.
    """

    def residual_block(x, filters, downsample=False):
        """Two 3x3 convolutions with a skip connection added before the final ReLU."""
        stride = (2, 2) if downsample else (1, 1)

        # The skip path must match the main path's shape before the addition:
        # project it with a 1x1 convolution when the stride or channel count changes.
        shortcut = x
        needs_projection = downsample or x.shape[-1] != filters
        if needs_projection:
            shortcut = layers.Conv2D(filters, (1, 1), strides=stride, padding='same')(shortcut)
            shortcut = layers.BatchNormalization()(shortcut)

        # Main path, first convolution (carries the stride)
        main = layers.Conv2D(filters, (3, 3), strides=stride, padding='same')(x)
        main = layers.BatchNormalization()(main)
        main = layers.Activation('relu')(main)

        # Main path, second convolution; dropout is applied before the merge
        main = layers.Conv2D(filters, (3, 3), padding='same')(main)
        main = layers.BatchNormalization()(main)
        main = layers.Dropout(0.2)(main)

        merged = layers.add([main, shortcut])
        return layers.Activation('relu')(merged)

    inputs = layers.Input(shape=(32, 32, 3))

    # Stage 1 improvements 1.1 / 1.2: the augmentation chain feeds the stem
    # convolution (the stem consumes ``x``, not ``inputs``).
    x = layers.RandomFlip("horizontal")(inputs)
    x = layers.RandomRotation(0.05)(x)
    x = layers.RandomZoom(0.1)(x)
    x = layers.RandomTranslation(0.1, 0.1)(x)

    # Stem
    x = layers.Conv2D(64, (3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    # Backbone: 32x32 -> 16x16 -> 8x8 feature maps
    for block_filters, halve in ((64, False), (128, True), (256, True)):
        x = residual_block(x, block_filters, downsample=halve)

    # Classifier head
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(10, activation='softmax')(x)

    model = models.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model


# Stage 1 improvement 1.8: Load saved model if available, otherwise train
if not os.path.exists(RESNET_PATH):
    # No saved model yet: build, train, evaluate, then persist for later runs.
    print("Building and training ResNet model...")
    resnet_model = build_resnet_model()
    resnet_model.summary()

    # Halve the learning rate after 2 stalled epochs; give up after 3 and
    # keep the best weights seen.
    early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
    lr_callback = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, verbose=1)

    history_resnet = resnet_model.fit(
        x_train, y_train,
        epochs=50, batch_size=64,
        validation_split=0.2,
        callbacks=[early_stop, lr_callback],
    )

    # Stage 1 improvement 1.3: score on the FULL test set
    test_loss, test_acc = resnet_model.evaluate(x_test, y_test, verbose=0)
    print(f"ResNet Test Accuracy (full test set): {test_acc*100:.2f}%")

    resnet_model.save(RESNET_PATH)
    print(f"ResNet model saved to {RESNET_PATH}")
else:
    # Stage 1 improvement 1.8: reuse the model saved by an earlier run.
    print(f"Loading pre-trained ResNet model from {RESNET_PATH}...")
    resnet_model = keras.models.load_model(RESNET_PATH)
    # Nothing was trained this session; the plotting cell checks for None.
    history_resnet = None
    test_loss, test_acc = resnet_model.evaluate(x_test, y_test, verbose=0)
    print(f"ResNet Test Accuracy: {test_acc*100:.2f}%")

In [None]:
# Stage 1 improvements 1.4 and 1.5: Confusion matrix and training curves for ResNet

# Predicted class per test image: index of the largest softmax output.
y_pred_resnet = resnet_model.predict(x_test).argmax(axis=1)
cm_resnet = confusion_matrix(y_test, y_pred_resnet)

fig, axes = plt.subplots(1, 2, figsize=(18, 6))
cm_ax, history_ax = axes

# Left panel: confusion matrix (rows = true class, columns = predicted class)
sns.heatmap(cm_resnet, annot=True, fmt='d', cmap='Reds',
            xticklabels=CIFAR10_CLASSES, yticklabels=CIFAR10_CLASSES, ax=cm_ax)
cm_ax.set_title('ResNet Confusion Matrix')
cm_ax.set_ylabel('True Label')
cm_ax.set_xlabel('Predicted Label')
cm_ax.tick_params(axis='x', rotation=45)

# Right panel: training curves, available only when the model was trained this session
if history_resnet is None:
    history_ax.text(0.5, 0.5, 'Model loaded from disk\n(no training history)',
                    ha='center', va='center', fontsize=14)
    history_ax.set_title('ResNet Training History')
else:
    # Accuracy curves use the default line style; loss curves are dashed.
    for metric, curve_label, line_kwargs in (
        ('accuracy', 'Train Accuracy', {}),
        ('val_accuracy', 'Val Accuracy', {}),
        ('loss', 'Train Loss', {'linestyle': '--'}),
        ('val_loss', 'Val Loss', {'linestyle': '--'}),
    ):
        history_ax.plot(history_resnet.history[metric], label=curve_label, **line_kwargs)
    history_ax.set_title('ResNet Training History')
    history_ax.legend()
    history_ax.set_xlabel('Epoch')
    history_ax.set_ylabel('Value')

plt.tight_layout()
plt.show()

# Per-class precision / recall / F1 (Stage 1 improvement 1.4)
print("\nResNet Classification Report:")
print(classification_report(y_test, y_pred_resnet, target_names=CIFAR10_CLASSES))

# Step 4b: Build & Train the MobileNetV2 Model (Stage 2 Enhancement 2.1)
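This adds MobileNetV2, a lightweight architecture designed for mobile and edge devices, as a third classifier. Although MobileNetV2 is commonly used for transfer learning from ImageNet, the cell below instantiates it with `weights=None` (random initialization) and trains it from scratch on CIFAR-10, so no pre-trained weights are actually transferred.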

In [None]:
# Stage 2 enhancement 2.1: MobileNetV2 transfer learning agent
# Step 4b: Build and train the MobileNetV2 model
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

def build_mobilenet_model():
    """Build and compile a MobileNetV2-based classifier for 32x32 RGB images.

    The MobileNetV2 backbone is created with random weights (no pre-trained
    weights are loaded), so the whole network is trained from scratch. It is
    preceded by the same augmentation layers as the other models and followed
    by global average pooling and a dense head ending in a 10-way softmax.

    Returns:
        A compiled ``Sequential`` model using the Adam optimizer, sparse
        categorical cross-entropy loss and an accuracy metric.
    """
    # weights=None: random initialization, the backbone learns from CIFAR-10 only.
    # NOTE(review): the published ImageNet weights appear to target 96-224 px
    # inputs; confirm against the Keras docs before switching to
    # weights='imagenet' at 32x32.
    base_model = keras.applications.MobileNetV2(
        input_shape=(32, 32, 3),
        include_top=False,
        weights=None
    )

    model = models.Sequential([
        # Declare the input explicitly. Passing ``input_shape`` to the first
        # layer of a Sequential model is deprecated in Keras 3.
        layers.Input(shape=(32, 32, 3)),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.05),
        layers.RandomZoom(0.1),
        layers.RandomTranslation(0.1, 0.1),
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax'),
    ])

    # Sparse loss: the labels are integer class ids, not one-hot vectors.
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model


# Load saved model if available, otherwise train
if not os.path.exists(MOBILENET_PATH):
    # No saved model yet: build, train, evaluate, then persist for later runs.
    print("Building and training MobileNetV2 model...")
    mobilenet_model = build_mobilenet_model()
    mobilenet_model.summary()

    # Halve the learning rate after 2 stalled epochs; give up after 3 and
    # keep the best weights seen.
    early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
    lr_callback = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, verbose=1)

    history_mobilenet = mobilenet_model.fit(
        x_train, y_train,
        epochs=50, batch_size=64,
        validation_split=0.2,
        callbacks=[early_stop, lr_callback],
    )

    # Score on the full test set
    test_loss, test_acc = mobilenet_model.evaluate(x_test, y_test, verbose=0)
    print(f"MobileNet Test Accuracy (full test set): {test_acc*100:.2f}%")

    mobilenet_model.save(MOBILENET_PATH)
    print(f"MobileNet model saved to {MOBILENET_PATH}")
else:
    # Reuse the model saved by an earlier run.
    print(f"Loading pre-trained MobileNet model from {MOBILENET_PATH}...")
    mobilenet_model = keras.models.load_model(MOBILENET_PATH)
    # Nothing was trained this session; the plotting cell checks for None.
    history_mobilenet = None
    test_loss, test_acc = mobilenet_model.evaluate(x_test, y_test, verbose=0)
    print(f"MobileNet Test Accuracy: {test_acc*100:.2f}%")

In [None]:
# MobileNet confusion matrix and training curves

# Predicted class per test image: index of the largest softmax output.
y_pred_mobilenet = mobilenet_model.predict(x_test).argmax(axis=1)
cm_mobilenet = confusion_matrix(y_test, y_pred_mobilenet)

fig, axes = plt.subplots(1, 2, figsize=(18, 6))
cm_ax, history_ax = axes

# Left panel: confusion matrix (rows = true class, columns = predicted class)
sns.heatmap(cm_mobilenet, annot=True, fmt='d', cmap='Greens',
            xticklabels=CIFAR10_CLASSES, yticklabels=CIFAR10_CLASSES, ax=cm_ax)
cm_ax.set_title('MobileNetV2 Confusion Matrix')
cm_ax.set_ylabel('True Label')
cm_ax.set_xlabel('Predicted Label')
cm_ax.tick_params(axis='x', rotation=45)

# Right panel: training curves, available only when the model was trained this session
if history_mobilenet is None:
    history_ax.text(0.5, 0.5, 'Model loaded from disk\n(no training history)',
                    ha='center', va='center', fontsize=14)
    history_ax.set_title('MobileNetV2 Training History')
else:
    # Accuracy curves use the default line style; loss curves are dashed.
    for metric, curve_label, line_kwargs in (
        ('accuracy', 'Train Accuracy', {}),
        ('val_accuracy', 'Val Accuracy', {}),
        ('loss', 'Train Loss', {'linestyle': '--'}),
        ('val_loss', 'Val Loss', {'linestyle': '--'}),
    ):
        history_ax.plot(history_mobilenet.history[metric], label=curve_label, **line_kwargs)
    history_ax.set_title('MobileNetV2 Training History')
    history_ax.legend()
    history_ax.set_xlabel('Epoch')
    history_ax.set_ylabel('Value')

plt.tight_layout()
plt.show()

# Per-class precision / recall / F1
print("\nMobileNetV2 Classification Report:")
print(classification_report(y_test, y_pred_mobilenet, target_names=CIFAR10_CLASSES))

# Step 5: Build the CNN Agent (Publisher/Subscriber)

In [None]:
# Stage 1 improvement 1.10: Error handling added to agents
# Step 5: CNN Agent - Listens for requests and classifies images

import json
import time
from kafka import KafkaConsumer, KafkaProducer
import numpy as np
import logging

# Only let CRITICAL records from the 'kafka' logger hierarchy through.
logging.getLogger('kafka').setLevel(logging.CRITICAL)

def run_cnn_agent(max_messages=100):
    """Consume classification requests and publish the CNN's predictions.

    Listens on ``REQUEST_TOPIC`` as a member of ``cnn_classifier_group``, runs
    ``cnn_model`` on each received image and sends one response per request
    to ``CNN_RESPONSE_TOPIC``. The loop ends after ``max_messages`` requests
    have been answered successfully, or when no message arrives for 60 s.

    Args:
        max_messages: Number of successfully processed requests after which
            the agent stops. Requests that fail are logged and not counted.

    Returns:
        None. Connection failures are printed rather than raised.
    """
    consumer = None
    producer = None
    try:
        consumer = KafkaConsumer(
            REQUEST_TOPIC,
            bootstrap_servers=['localhost:9092'],
            group_id='cnn_classifier_group',
            auto_offset_reset='latest',
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            consumer_timeout_ms=60000
        )

        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
    except Exception as e:
        print(f"CNN AGENT ERROR: Failed to connect to Kafka: {e}")
        # The consumer may already be open if only the producer failed.
        if consumer is not None:
            consumer.close()
        return

    print("CNN-CIFAR AGENT is ready and listening for requests")

    try:
        message_count = 0
        for message in consumer:
            try:
                request = message.value
                image_id = request['image_id']

                # The model expects a batch, so add a leading batch axis of size 1.
                image_data = np.array(request['image_data'])
                image_batch = np.expand_dims(image_data, axis=0)

                start_time = time.time()
                predictions = cnn_model.predict(image_batch, verbose=0)[0]
                inference_time = time.time() - start_time

                result = decode_prediction(predictions)

                response = {
                    'agent': 'CNN-CIFAR',
                    'image_id': image_id,
                    'true_label': request['true_label'],
                    'true_class': request['true_class'],
                    'predicted_class': result['predicted_class'],
                    'predicted_name': result['predicted_name'],
                    'confidence': result['confidence'],
                    # Full probability vector, needed by the judge for soft voting.
                    'probabilities': predictions.tolist(),
                    'inference_time_ms': inference_time * 1000,
                    'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
                }

                producer.send(CNN_RESPONSE_TOPIC, value=response)

            except Exception as e:
                # One bad request must not take the agent down; skip it.
                print(f"CNN AGENT ERROR processing message: {e}")
                continue

            message_count += 1
            if message_count >= max_messages:
                break

        producer.flush()
    finally:
        # Release both clients even if iteration itself fails (for example a
        # malformed payload raising inside the deserializer).
        producer.close()
        consumer.close()

print("CNN Agent function created and ready!")

import threading

# Launch one CNN agent with its default max_messages on a daemon thread so the
# notebook stays interactive.
# NOTE(review): run_complete_system() later starts another consumer in the same
# 'cnn_classifier_group'; confirm that two agents sharing the group is intended.
cnn_agent_thread = threading.Thread(target=run_cnn_agent, daemon=True)
cnn_agent_thread.start()
print("CNN Agent is now running in the background...")
# Short pause, presumably to give the background consumer time to connect.
time.sleep(2)

# Step 6: Build the ResNet Agent

In [None]:
# Stage 1 improvement 1.10: Error handling added to agents
# Step 6: ResNet Agent - Listens for requests and classifies images

# Only let CRITICAL records from the 'kafka' logger hierarchy through.
logging.getLogger('kafka').setLevel(logging.CRITICAL)

def run_resnet_agent(max_messages=100):
    """Consume classification requests and publish the ResNet's predictions.

    Listens on ``REQUEST_TOPIC`` as a member of ``resnet_classifier_group``,
    runs ``resnet_model`` on each received image and sends one response per
    request to ``RESNET_RESPONSE_TOPIC``. The loop ends after ``max_messages``
    requests have been answered successfully, or when no message arrives for
    60 s.

    Args:
        max_messages: Number of successfully processed requests after which
            the agent stops. Requests that fail are logged and not counted.

    Returns:
        None. Connection failures are printed rather than raised.
    """
    consumer = None
    producer = None
    try:
        consumer = KafkaConsumer(
            REQUEST_TOPIC,
            bootstrap_servers=['localhost:9092'],
            group_id='resnet_classifier_group',
            auto_offset_reset='latest',
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            consumer_timeout_ms=60000
        )

        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
    except Exception as e:
        print(f"RESNET AGENT ERROR: Failed to connect to Kafka: {e}")
        # The consumer may already be open if only the producer failed.
        if consumer is not None:
            consumer.close()
        return

    print("RESNET-CIFAR AGENT is ready and listening for requests...")

    try:
        message_count = 0
        for message in consumer:
            try:
                request = message.value
                image_id = request['image_id']

                # The model expects a batch, so add a leading batch axis of size 1.
                image_data = np.array(request['image_data'])
                image_batch = np.expand_dims(image_data, axis=0)

                start_time = time.time()
                predictions = resnet_model.predict(image_batch, verbose=0)[0]
                inference_time = time.time() - start_time

                result = decode_prediction(predictions)

                response = {
                    'agent': 'RESNET-CIFAR',
                    'image_id': image_id,
                    'true_label': request['true_label'],
                    'true_class': request['true_class'],
                    'predicted_class': result['predicted_class'],
                    'predicted_name': result['predicted_name'],
                    'confidence': result['confidence'],
                    # Full probability vector, needed by the judge for soft voting.
                    'probabilities': predictions.tolist(),
                    'inference_time_ms': inference_time * 1000,
                    'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
                }

                producer.send(RESNET_RESPONSE_TOPIC, value=response)

            except Exception as e:
                # One bad request must not take the agent down; skip it.
                print(f"RESNET AGENT ERROR processing message: {e}")
                continue

            message_count += 1
            if message_count >= max_messages:
                break

        producer.flush()
    finally:
        # Release both clients even if iteration itself fails (for example a
        # malformed payload raising inside the deserializer).
        producer.close()
        consumer.close()

print("ResNet Agent function created and ready!")

import threading

# Launch one ResNet agent with its default max_messages on a daemon thread so
# the notebook stays interactive.
# NOTE(review): run_complete_system() later starts another consumer in the same
# 'resnet_classifier_group'; confirm that two agents sharing the group is intended.
resnet_agent_thread = threading.Thread(target=run_resnet_agent, daemon=True)
resnet_agent_thread.start()
print("ResNet Agent is now running in the background...")
# Short pause, presumably to give the background consumer time to connect.
time.sleep(2)

# Step 6b: Build the MobileNet Agent (Stage 2 Enhancement 2.1)

In [None]:
# Stage 2 enhancement 2.1: MobileNet Agent
# Step 6b: MobileNet Agent - Listens for requests and classifies images

def run_mobilenet_agent(max_messages=100):
    """Consume classification requests and publish MobileNetV2's predictions.

    Listens on ``REQUEST_TOPIC`` as a member of ``mobilenet_classifier_group``,
    runs ``mobilenet_model`` on each received image and sends one response per
    request to ``MOBILENET_RESPONSE_TOPIC``. The loop ends after
    ``max_messages`` requests have been answered successfully, or when no
    message arrives for 60 s.

    Args:
        max_messages: Number of successfully processed requests after which
            the agent stops. Requests that fail are logged and not counted.

    Returns:
        None. Connection failures are printed rather than raised.
    """
    consumer = None
    producer = None
    try:
        consumer = KafkaConsumer(
            REQUEST_TOPIC,
            bootstrap_servers=['localhost:9092'],
            group_id='mobilenet_classifier_group',
            auto_offset_reset='latest',
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            consumer_timeout_ms=60000
        )

        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
    except Exception as e:
        print(f"MOBILENET AGENT ERROR: Failed to connect to Kafka: {e}")
        # The consumer may already be open if only the producer failed.
        if consumer is not None:
            consumer.close()
        return

    print("MOBILENET-CIFAR AGENT is ready and listening for requests...")

    try:
        message_count = 0
        for message in consumer:
            try:
                request = message.value
                image_id = request['image_id']

                # The model expects a batch, so add a leading batch axis of size 1.
                image_data = np.array(request['image_data'])
                image_batch = np.expand_dims(image_data, axis=0)

                start_time = time.time()
                predictions = mobilenet_model.predict(image_batch, verbose=0)[0]
                inference_time = time.time() - start_time

                result = decode_prediction(predictions)

                response = {
                    'agent': 'MOBILENET-CIFAR',
                    'image_id': image_id,
                    'true_label': request['true_label'],
                    'true_class': request['true_class'],
                    'predicted_class': result['predicted_class'],
                    'predicted_name': result['predicted_name'],
                    'confidence': result['confidence'],
                    # Full probability vector, needed by the judge for soft voting.
                    'probabilities': predictions.tolist(),
                    'inference_time_ms': inference_time * 1000,
                    'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
                }

                producer.send(MOBILENET_RESPONSE_TOPIC, value=response)

            except Exception as e:
                # One bad request must not take the agent down; skip it.
                print(f"MOBILENET AGENT ERROR processing message: {e}")
                continue

            message_count += 1
            if message_count >= max_messages:
                break

        producer.flush()
    finally:
        # Release both clients even if iteration itself fails (for example a
        # malformed payload raising inside the deserializer).
        producer.close()
        consumer.close()

print("MobileNet Agent function created and ready!")

import threading

# Launch one MobileNet agent with its default max_messages on a daemon thread
# so the notebook stays interactive.
# NOTE(review): run_complete_system() later starts another consumer in the same
# 'mobilenet_classifier_group'; confirm that two agents sharing the group is intended.
mobilenet_agent_thread = threading.Thread(target=run_mobilenet_agent, daemon=True)
mobilenet_agent_thread.start()
print("MobileNet Agent is now running in the background...")
# Short pause, presumably to give the background consumer time to connect.
time.sleep(2)

# Step 7: Build the Judge Agent

In [None]:
# Stage 2 enhancement 2.2: Three-agent soft voting ensemble
# Step 7: Judge Agent - Sends requests to all 3 agents, compares with soft voting

import json
import time
from kafka import KafkaProducer, KafkaConsumer
from collections import defaultdict
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

def _open_judge_consumer(topic, label):
    """Open a consumer on *topic* under a fresh ``judge_<label>_<epoch>`` group."""
    return KafkaConsumer(
        topic,
        bootstrap_servers=['localhost:9092'],
        group_id=f'judge_{label}_{int(time.time())}',
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        consumer_timeout_ms=10000
    )


def _collect_responses(consumer, expected, desc, not_before_ms):
    """Read up to *expected* responses that were produced at or after *not_before_ms*."""
    collected = []
    if expected <= 0:
        return collected
    with tqdm(total=expected, desc=desc, leave=False) as pbar:
        for message in consumer:
            # The group id is new on every run and the offset policy is
            # 'earliest', so the consumer replays the topic from its start.
            # Drop records left over from earlier runs. Records that carry no
            # timestamp are kept.
            produced_at = message.timestamp
            if produced_at is not None and produced_at < not_before_ms:
                continue
            collected.append(message.value)
            pbar.update(1)
            if len(collected) >= expected:
                break
    return collected


def _score_responses(responses_by_agent):
    """Join the agents' responses on image_id and tally accuracy and agreement."""
    agent_keys = ('cnn', 'resnet', 'mobilenet')

    # image_id -> {agent key -> response}; a repeated image_id keeps the last response.
    comparison_map = defaultdict(dict)
    for agent_key in agent_keys:
        for response in responses_by_agent[agent_key]:
            comparison_map[response['image_id']][agent_key] = response

    correct = dict.fromkeys(agent_keys, 0)
    ensemble_correct = 0
    all_agree_count = 0
    images_with_all_responses = 0

    for results in comparison_map.values():
        # Only images answered by all three agents can be compared fairly.
        if not all(agent_key in results for agent_key in agent_keys):
            continue
        images_with_all_responses += 1

        true_label = results['cnn']['true_label']
        preds = [results[agent_key]['predicted_class'] for agent_key in agent_keys]

        # Individual accuracy
        for agent_key, pred in zip(agent_keys, preds):
            if pred == true_label:
                correct[agent_key] += 1

        # Unanimous agreement between the three agents
        if len(set(preds)) == 1:
            all_agree_count += 1

        # Stage 2 enhancement 2.2: soft voting = argmax of the averaged
        # probability vectors of all agents.
        probs = [np.array(results[agent_key]['probabilities']) for agent_key in agent_keys]
        ensemble_probs = sum(probs) / len(probs)
        if int(np.argmax(ensemble_probs)) == true_label:
            ensemble_correct += 1

    return {
        'total': images_with_all_responses,
        'cnn_correct': correct['cnn'],
        'resnet_correct': correct['resnet'],
        'mobilenet_correct': correct['mobilenet'],
        'ensemble_correct': ensemble_correct,
        'all_agree_count': all_agree_count,
        'comparison_map': dict(comparison_map)
    }


def run_judge_agent(num_images=500):
    """Publish test images, gather all three agents' answers and score them.

    Sends ``num_images`` randomly drawn test images to ``REQUEST_TOPIC``,
    then reads each agent's response topic in turn (CNN, ResNet, MobileNet)
    until ``num_images`` responses from this run have arrived or the
    consumer has been idle for 10 s. Responses are joined on ``image_id``.
    Images are drawn independently, so a repeated ``image_id`` collapses
    into one entry and ``total`` can be smaller than ``num_images``.

    Args:
        num_images: Number of requests to publish and of responses to wait
            for per agent.

    Returns:
        dict | None: ``total`` (images answered by all three agents), the
        per-agent and ensemble correct counts (``cnn_correct``,
        ``resnet_correct``, ``mobilenet_correct``, ``ensemble_correct``),
        ``all_agree_count`` and the raw ``comparison_map``. ``None`` if the
        Kafka clients could not be created.
    """
    agent_topics = (
        ('cnn', CNN_RESPONSE_TOPIC, "CNN Processing"),
        ('resnet', RESNET_RESPONSE_TOPIC, "ResNet Processing"),
        ('mobilenet', MOBILENET_RESPONSE_TOPIC, "MobileNet Processing"),
    )

    request_producer = None
    consumers = {}
    try:
        try:
            request_producer = KafkaProducer(
                bootstrap_servers=['localhost:9092'],
                value_serializer=lambda v: json.dumps(v).encode('utf-8')
            )
            for label, topic, _ in agent_topics:
                consumers[label] = _open_judge_consumer(topic, label)
        except Exception as e:
            print(f"JUDGE AGENT ERROR: Failed to connect to Kafka: {e}")
            return None

        print(f"JUDGE AGENT: Submitting {num_images} images to Broker (3 agents)...")

        # Anything produced before this instant belongs to an earlier run.
        # NOTE(review): this assumes record timestamps share this process's
        # clock; confirm if the broker is ever moved off localhost.
        run_started_ms = int(time.time() * 1000)

        # Publish requests
        for _ in tqdm(range(num_images), desc="Publishing"):
            image_request = get_random_test_image()
            request_producer.send(REQUEST_TOPIC, value=image_request)
            time.sleep(0.01)

        request_producer.flush()

        # Collect results from all 3 agents, one topic after the other
        responses_by_agent = {
            label: _collect_responses(consumers[label], num_images, desc, run_started_ms)
            for label, _, desc in agent_topics
        }
    finally:
        # Cleanup runs on every path, including a partial connection failure.
        for consumer in consumers.values():
            consumer.close()
        if request_producer is not None:
            request_producer.close()

    return _score_responses(responses_by_agent)

# Confirms that the cell ran; the judge itself is not started here.
print("Judge Agent function ready (3-agent soft voting)")
# NOTE(review): nothing is launched in this cell, so this pause looks like a
# carry-over from the agent cells -- confirm it is still needed.
time.sleep(2)

# Step 8: Run All Agents with Threading

In [None]:
# Step 8: Orchestration (3 agents + Judge)

import threading

def run_complete_system(num_test_images=500):
    """Start the three classifier agents and the judge, and return the judge's stats.

    Each agent runs on its own daemon thread with ``num_test_images`` as its
    message limit. After a 2 s pause the judge runs on a separate thread and
    is joined before this function returns.

    Args:
        num_test_images: Number of images the judge publishes; also passed
            to every agent as its ``max_messages``.

    Returns:
        Whatever ``run_judge_agent`` returned, or ``None`` if the judge
        thread recorded no result.
    """
    judge_results = []

    def judge_wrapper():
        # A thread target cannot return a value, so hand the result back through a list.
        judge_results.append(run_judge_agent(num_images=num_test_images))

    agent_threads = [
        threading.Thread(target=agent_fn, args=(num_test_images,), daemon=True)
        for agent_fn in (run_cnn_agent, run_resnet_agent, run_mobilenet_agent)
    ]
    judge_thread = threading.Thread(target=judge_wrapper)

    # Start all agent threads, then pause before the judge publishes anything
    for worker in agent_threads:
        worker.start()
    time.sleep(2)

    # Start and wait for judge
    judge_thread.start()
    judge_thread.join()
    time.sleep(2)

    if judge_results:
        return judge_results[0]
    return None

# Confirms that the cell ran; run_complete_system() is only defined here, not called.
print("System orchestration function ready (3 agents)!")

# Stage 1 improvement 1.9: Kafka restart removed
Kafka is managed as an external system service. No restart cell needed.
See [RUN_ON_VPS.md](RUN_ON_VPS.md) for Kafka management instructions.

# Step 10: Execute the System & Visualize Performance

In [None]:
# Step 10: Full Scale Test and Visualization (3 agents + ensemble)

import matplotlib.pyplot as plt
import logging

# Silence kafka-python's log output below ERROR so it does not flood the cell.
logging.getLogger('kafka').setLevel(logging.ERROR)


def accuracy_percentages(correct_counts, total):
    """Convert per-agent correct counts into accuracy percentages.

    Args:
        correct_counts: Iterable of correct-prediction counts, one per agent.
        total: Number of images the counts are measured against.

    Returns:
        A list of percentages in the same order as ``correct_counts``. When
        ``total`` is not positive every entry is ``0.0`` so an empty run
        cannot raise ZeroDivisionError.
    """
    if total <= 0:
        return [0.0 for _ in correct_counts]
    return [(count / total) * 100 for count in correct_counts]


if __name__ == "__main__":
    TOTAL_IMAGES = 500
    print(f"Executing Broker Test: Processing {TOTAL_IMAGES} images across 3 agents...")
    # `stats` is kept as a global so later cells can read the Judge's results.
    stats = run_complete_system(num_test_images=TOTAL_IMAGES)

    # The Judge reports `total` as the number of images with all responses;
    # it can be 0 if no image was answered by every agent.
    actual_total = stats.get('total', TOTAL_IMAGES) if stats else 0

    if not stats:
        print("Judge Agent returned no results; nothing to visualize.")
    elif actual_total <= 0:
        print("No image received predictions from all 3 agents; nothing to visualize.")
    else:
        all_agree = stats.get('all_agree_count', 0)

        labels = ['CNN', 'ResNet', 'MobileNet', 'Ensemble\n(Soft Vote)']
        counts = [
            stats['cnn_correct'],
            stats['resnet_correct'],
            stats['mobilenet_correct'],
            stats['ensemble_correct']
        ]
        accuracies = accuracy_percentages(counts, actual_total)

        fig, axes = plt.subplots(1, 2, figsize=(16, 6))

        # Accuracy bar chart
        colors = ['#3498db', '#e74c3c', '#f39c12', '#2ecc71']
        bars = axes[0].bar(labels, accuracies, color=colors, edgecolor='black', alpha=0.8)

        # Annotate each bar with its percentage and raw correct/total count.
        for bar, count in zip(bars, counts):
            height = bar.get_height()
            axes[0].text(bar.get_x() + bar.get_width()/2., height + 1,
                     f'{height:.1f}%\n({count}/{actual_total})',
                     ha='center', va='bottom', fontweight='bold')

        axes[0].set_title(f'3-Agent Classification Performance (n={actual_total})',
                          fontsize=14, fontweight='bold')
        axes[0].set_ylabel('Accuracy %')
        # Leave headroom above 100% for the bar annotations.
        axes[0].set_ylim(0, 115)
        axes[0].grid(axis='y', linestyle='--', alpha=0.7)

        # Agreement pie chart
        disagree = actual_total - all_agree
        axes[1].pie([all_agree, disagree],
                    labels=[f'All 3 Agreed\n({all_agree})', f'Disagreed\n({disagree})'],
                    colors=['#2ecc71', '#e67e22'], autopct='%1.1f%%',
                    startangle=90, textprops={'fontweight': 'bold'})
        axes[1].set_title(f'3-Agent Agreement Rate (n={actual_total})',
                          fontsize=14, fontweight='bold')

        plt.tight_layout()
        plt.show()

        print(f"\nSummary:")
        print(f"  CNN Accuracy:      {accuracies[0]:.1f}% ({counts[0]}/{actual_total})")
        print(f"  ResNet Accuracy:   {accuracies[1]:.1f}% ({counts[1]}/{actual_total})")
        print(f"  MobileNet Accuracy:{accuracies[2]:.1f}% ({counts[2]}/{actual_total})")
        print(f"  Ensemble Accuracy: {accuracies[3]:.1f}% ({counts[3]}/{actual_total})")
        print(f"  All 3 Agreed:      {all_agree}/{actual_total} ({all_agree/actual_total*100:.1f}%)")

# Step 11: Export Results & Statistical Analysis (Stage 2 Enhancement 2.10)

In [None]:
# Stage 2 enhancement 2.10: CSV export and statistical analysis
# Import scipy's stats module under an alias. Step 10 stores the Judge's results
# dict in the global `stats`; a plain `from scipy import stats` would overwrite
# it and make the membership check below fail on a module object.
from scipy import stats as scipy_stats


def mcnemar_test(first_only, second_only):
    """Run McNemar's chi-squared test with continuity correction.

    Args:
        first_only: Number of images only the first model classified correctly.
        second_only: Number of images only the second model classified correctly.

    Returns:
        A ``(statistic, p_value)`` tuple, or ``None`` when there are no
        discordant pairs (the statistic is undefined in that case).
    """
    discordant = first_only + second_only
    if discordant <= 0:
        return None
    statistic = (abs(first_only - second_only) - 1) ** 2 / discordant
    # sf() is the survival function (1 - cdf) without the subtraction round-off.
    p_value = float(scipy_stats.chi2.sf(statistic, df=1))
    return statistic, p_value


def report_mcnemar(results_df, first_key, second_key, first_name, second_name):
    """Print McNemar's test for two agents' ``<key>_correct`` columns.

    Args:
        results_df: DataFrame holding 0/1 ``<key>_correct`` columns.
        first_key: Column prefix of the first agent (e.g. ``'cnn'``).
        second_key: Column prefix of the second agent.
        first_name: Display name of the first agent.
        second_name: Display name of the second agent.
    """
    print(f"\n--- McNemar's Test: {first_name} vs {second_name} ---")
    first_col = results_df[f'{first_key}_correct']
    second_col = results_df[f'{second_key}_correct']
    first_only = int(((first_col == 1) & (second_col == 0)).sum())
    second_only = int(((first_col == 0) & (second_col == 1)).sum())

    outcome = mcnemar_test(first_only, second_only)
    if outcome is None:
        print("  Models made identical predictions on all images")
        return

    mcnemar_stat, mcnemar_pvalue = outcome
    print(f"  {first_name} correct, {second_name} wrong: {first_only}")
    print(f"  {second_name} correct, {first_name} wrong: {second_only}")
    print(f"  Chi-squared: {mcnemar_stat:.4f}, p-value: {mcnemar_pvalue:.4f}")
    if mcnemar_pvalue < 0.05:
        print("  Result: Models are SIGNIFICANTLY different (p < 0.05)")
    else:
        print("  Result: No significant difference (p >= 0.05)")


# Read the Step 10 results defensively: the name may be missing (Step 10 not
# run), None (Judge returned nothing), or not a dict at all.
run_stats = globals().get('stats')

if isinstance(run_stats, dict) and 'comparison_map' in run_stats:
    comparison_map = run_stats['comparison_map']
    agent_keys = ('cnn', 'resnet', 'mobilenet')

    # Build one row per image that has a response from every agent.
    all_results = []
    for img_id, results in comparison_map.items():
        if not all(agent in results for agent in agent_keys):
            continue
        row = {
            'image_id': img_id,
            'true_label': results['cnn']['true_label'],
            'true_class': results['cnn']['true_class'],
        }
        # Same column order as before: prediction, confidence, correct, time.
        for agent in agent_keys:
            reply = results[agent]
            row[f'{agent}_prediction'] = reply['predicted_name']
            row[f'{agent}_confidence'] = reply['confidence']
            row[f'{agent}_correct'] = int(reply['predicted_class'] == reply['true_label'])
            row[f'{agent}_time_ms'] = reply['inference_time_ms']
        all_results.append(row)

    df = pd.DataFrame(all_results)

    if df.empty:
        # Without rows the DataFrame has no columns, so the analysis below
        # would fail with KeyError; report and stop instead.
        print("No image has results from all 3 agents; nothing to export.")
    else:
        df.to_csv('classification_results.csv', index=False)
        print(f"Exported {len(df)} results to classification_results.csv")

        # McNemar's test: CNN vs ResNet, then CNN vs MobileNet
        report_mcnemar(df, 'cnn', 'resnet', 'CNN', 'ResNet')
        report_mcnemar(df, 'cnn', 'mobilenet', 'CNN', 'MobileNet')

        # Inference time comparison
        print("\n--- Average Inference Time (ms) ---")
        print(f"  CNN:       {df['cnn_time_ms'].mean():.2f} ms (std: {df['cnn_time_ms'].std():.2f})")
        print(f"  ResNet:    {df['resnet_time_ms'].mean():.2f} ms (std: {df['resnet_time_ms'].std():.2f})")
        print(f"  MobileNet: {df['mobilenet_time_ms'].mean():.2f} ms (std: {df['mobilenet_time_ms'].std():.2f})")

        # Inference time box plot
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.boxplot([df['cnn_time_ms'], df['resnet_time_ms'], df['mobilenet_time_ms']],
                   labels=['CNN', 'ResNet', 'MobileNet'],
                   patch_artist=True,
                   boxprops=dict(facecolor='lightblue'))
        ax.set_title('Inference Time Distribution by Agent', fontsize=14, fontweight='bold')
        ax.set_ylabel('Inference Time (ms)')
        ax.grid(axis='y', linestyle='--', alpha=0.7)
        plt.show()
else:
    print("No results to export. Run the system first.")