In [3]:
import cv2
import numpy as np
import tensorflow as tf
import seaborn as sns
import matplotlib.pyplot as plt 
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import roc_auc_score, confusion_matrix, classification_report, roc_curve, auc
from sklearn.utils import class_weight
from tensorflow.keras.preprocessing import image
import numpy as np
import os
import json
import random
# remove warnings
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Step 1: Load the JSON file and handle multiple JSON objects
def load_json_data(json_path, limit=10000):
    data = []
    with open(json_path, 'r') as f:
        for line in f:  # Read file line by line in case it's not an array
            try:
                data.append(json.loads(line))  # Load each JSON object line by line
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
    
    # Shuffle the data and take a subset of size 'limit'
    random.shuffle(data)
    data = data[:limit]  # Select only the first 'limit' samples
    return data

In [None]:
json_file_path = 'photos.json'  # Update with the correct path
json_data = load_json_data(json_file_path, limit=10000)  # Limit to 10,000 samples
# Step 2: Add Preprocessing Functions (Detailed Preprocessing)

def apply_threshold(image, threshold_value=127):
    if len(image.shape) == 3 and image.shape[2] == 3:
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        gray_image = image
    
    _, thresh_image = cv2.threshold(gray_image, threshold_value, 255, cv2.THRESH_BINARY)
    return thresh_image
def apply_histogram_equalization(image):
    if len(image.shape) == 3 and image.shape[2] == 3:
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        gray_image = image
    
    equalized_image = cv2.equalizeHist(gray_image)
    return equalized_image
def apply_gaussian_blur(image, kernel_size=(5, 5)):
    blurred_image = cv2.GaussianBlur(image, kernel_size, 0)
    return blurred_image
def detailed_preprocessing(image):
    thresholded_image = apply_threshold(image)
    equalized_image = apply_histogram_equalization(thresholded_image)
    blurred_image = apply_gaussian_blur(equalized_image)
    return blurred_image

In [None]:
# Step 3: Load Images from JSON metadata and apply preprocessing

def load_images_with_preprocessing(image_dir, json_data, img_size=(224, 224)):
    images = []
    labels = []
    
    for item in json_data:
        photo_id = item['photo_id']
        label = item['label']
        
        image_path = os.path.join(image_dir, f"{photo_id}.jpg")
        if os.path.exists(image_path):
            image = cv2.imread(image_path, cv2.IMREAD_COLOR)
            
            if image is None:
                print(f"Warning: Failed to load image {image_path}. Skipping.")
                continue

            try:
                image = cv2.resize(image, img_size)
            except Exception as e:
                print(f"Error resizing image {image_path}: {e}")
                continue
            
            image = detailed_preprocessing(image)
            image = image.astype('float32') / 255.0
            
            images.append(image)
            labels.append(label)
        else:
            print(f"Image {image_path} not found.")
    
    images = np.array(images)
    labels = np.array(labels)
    
    if images.ndim == 3:
        images = np.expand_dims(images, axis=-1)
    
    return images, labels

In [None]:
# Directory where the photos are stored
image_dir = './photos'

# Load images and labels from the JSON data
images, labels = load_images_with_preprocessing(image_dir, json_data)

In [None]:
labels

In [None]:
# Step 4: Convert Labels to Numerical Values using LabelEncoder
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(labels)
# Step 5: Split data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(images, y_encoded, test_size=0.2, random_state=42)

if X_train.shape[-1] == 1:
    X_train = np.repeat(X_train, 3, axis=-1)
    X_test = np.repeat(X_test, 3, axis=-1)
# Step 6: Data Augmentation
train_datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True
)
test_datagen = ImageDataGenerator()
train_generator = train_datagen.flow(X_train, y_train, batch_size=32)
test_generator = test_datagen.flow(X_test, y_test, batch_size=32)

In [None]:
# Step 7: Build CNN Model
def build_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        
        tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        
        tf.keras.layers.Conv2D(32, (3, 3), activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model
# Compute class weights
class_weights = class_weight.compute_class_weight('balanced',
                                                  classes=np.unique(y_train),
                                                  y=y_train)
class_weight_dict = dict(enumerate(class_weights))
print(f"Class Weights: {class_weight_dict}")

In [None]:
# Step 8: Train and Evaluate Final Model

final_model = build_cnn_model()

early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Train the model with class weights
final_model.fit(train_generator,
                validation_data=test_generator,
                epochs=3,
                callbacks=[early_stopping],
                class_weight=class_weight_dict)  # Pass class weights

In [None]:
# Step 9: Evaluate the Model

# Evaluate on test data
test_loss, test_acc = final_model.evaluate(test_generator)
print(f"Final Test Accuracy: {test_acc * 100:.2f}%")
old_acc = test_acc

In [None]:
# Step 1: Check the number of unique classes
num_classes = len(np.unique(y_test))
print(f"Number of classes in dataset: {num_classes}")

In [None]:
# Step 2: Ensure y_test is one-hot encoded for AUC calculation
y_test_one_hot = to_categorical(y_test, num_classes=num_classes)
# Step 3: Predict test set probabilities and labels
y_pred_prob = final_model.predict(X_test)  # Probabilities for each class
y_pred = np.argmax(y_pred_prob, axis=1)    # Get the predicted class labels

In [None]:
# Debug: Check shapes
print(f"Shape of y_test_one_hot: {y_test_one_hot.shape}")
print(f"Shape of y_pred_prob: {y_pred_prob.shape}")

In [None]:
# Step 4: AUC-ROC Curve for each class
fpr = {}
tpr = {}
roc_auc = {}

plt.figure(figsize=(10, 8))

for i in range(num_classes):
    fpr[i], tpr[i], _ = roc_curve(y_test_one_hot[:, i], y_pred_prob[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])
    plt.plot(fpr[i], tpr[i], label=f'Class {i} (area = {roc_auc[i]:.2f})')

# Plotting AUC-ROC Curve
plt.plot([0, 1], [0, 1], 'k--', lw=2)  # Dashed diagonal line
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('AUC-ROC Curve for Multi-Class')
plt.legend(loc="lower right")
plt.show()

In [None]:
# Step 5: Confusion Matrix
conf_matrix = confusion_matrix(y_test, y_pred)

# Plot Confusion Matrix using Seaborn heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=np.unique(y_test), yticklabels=np.unique(y_test))
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.show()

old_loss,old_acc = final_model.evaluate(X_test, y_test, verbose=1) 

In [None]:
# Step 6: Classification Report
print("Classification Report:")
print(classification_report(y_test, y_pred))

In [None]:
# Model Hyperperameter Tuning

reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=0.0001)

# Re-train the model with ReduceLROnPlateau
final_model.fit(train_generator,validation_data=test_generator,epochs=3,callbacks=[early_stopping, reduce_lr],class_weight=class_weight_dict)
# Increase the number of epochs and Added ReduceLROnPlateau

In [None]:
kf = KFold(n_splits=3, shuffle=True, random_state=42) 

fold_no = 1
for train_index, val_index in kf.split(X_train):    
    print(f'Training fold no.{fold_no}...')         
    X_train_fold, X_val_fold = X_train[train_index], X_train[val_index]    
    y_train_fold, y_val_fold = y_train[train_index], y_train[val_index]       
    # Train the model on the current fold
    final_model = build_cnn_model() 
    # Re-initialize model for each fold
    final_model.fit(X_train_fold, y_train_fold, epochs=3, validation_data=(X_val_fold, y_val_fold), class_weight=class_weight_dict)
    # Evaluate the model on the validation set of the current fold
    val_loss, val_acc = final_model.evaluate(X_val_fold, y_val_fold)
    print(f'Fold{fold_no}- Validation Accuracy:{val_acc * 100:.2f}%') 
    fold_no += 1

In [None]:
history = final_model.fit(train_generator,
                          validation_data=test_generator,
                          epochs=3,  # Set your desired number of epochs
                          callbacks=[early_stopping, reduce_lr],
                          class_weight=class_weight_dict)

In [None]:
# Visualize training vs validation accuracy and loss curves
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Training vs Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['loss'],label='Training Loss')
plt.plot(history.history['val_loss'],label='Validation Loss')
plt.title('Training vs Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
new_loss,new_acc = final_model.evaluate(X_test, y_test, verbose=1) 

print("Model Accuracy before Tuning",test_acc,"Model Tuning After Tuning",new_acc)

In [None]:
# Function to predict random images from the dataset and show results
def predict_random_images(generator, model, label_encoder, num_images=4):
    # Get a batch of images and true labels
    images, true_labels = next(generator)  # Get one batch of images from the generator
    
    # Predict using the trained model
    predictions = model.predict(images)

    # Convert predicted probabilities to class indices
    predicted_labels = np.argmax(predictions, axis=1)
    
    # Convert true labels to class indices if they are one-hot encoded
    if true_labels.ndim == 2:  # for one-hot encoding
        true_labels = np.argmax(true_labels, axis=1)
    
    # Get the class labels using the label_encoder
    class_labels = label_encoder.inverse_transform(range(len(label_encoder.classes_)))

    # Plot random images with true labels and predictions
    plt.figure(figsize=(8,8))
    for i in range(num_images):
        random_index = random.randint(0, len(images) - 1)  # Pick a random image from the batch
        plt.subplot(2, num_images // 2, i + 1)
        plt.imshow(images[random_index])
        plt.title(f"True: {class_labels[true_labels[random_index]]}\nPred: {class_labels[predicted_labels[random_index]]}")
        plt.axis('off')
    plt.show()

predict_random_images(train_generator, final_model, label_encoder, num_images=4)