In [3]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import Xception
from tensorflow.keras.layers import (
    GlobalAveragePooling2D, Dense, Dropout, Input, Conv2D, BatchNormalization,
    Activation, Add, Multiply, Flatten
)
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.callbacks import (
    EarlyStopping, ReduceLROnPlateau, ModelCheckpoint, TensorBoard
)
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

# Define constants and hyperparameters
CONFIG = {
    'INPUT_IMAGE_SIZE': (75, 75),
    'TRAIN_BATCH_SIZE': 128,
    'TEST_BATCH_SIZE': 128,
    'EPOCHS': 15,
    'LEARNING_RATE': 0.001,
    'MOMENTUM': 0.8,
    'NUM_CLASSES': 7,
}

# Define the transfer learning model using Xception with added attention mechanism
def create_transfer_model(input_shape, num_classes):
    base_model = Xception(weights='imagenet', include_top=False, input_shape=input_shape)
    base_model.trainable = True  # Fine-tune the entire model

    # Ensure the attention layer has the same number of filters as the base model output
    num_filters = base_model.output.shape[-1]

    # Add attention mechanism
    attention_layer = Conv2D(num_filters, kernel_size=(1, 1), padding='same')(base_model.output)
    attention_layer = BatchNormalization()(attention_layer)
    attention_layer = Activation('sigmoid')(attention_layer)
    attention_layer = Multiply()([base_model.output, attention_layer])

    x = GlobalAveragePooling2D()(attention_layer)
    x = Dense(1024, activation='relu')(x)
    x = Dropout(0.4)(x)
    outputs = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=base_model.input, outputs=outputs)

    # Use SGD optimizer with momentum
    optimizer = SGD(learning_rate=CONFIG['LEARNING_RATE'], momentum=CONFIG['MOMENTUM'])
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    return model


# Load and preprocess the data
def load_data(train_dir, test_dir, img_size, train_batch_size, test_batch_size):
    # Increased data augmentation for the training set
    train_datagen = ImageDataGenerator(
        rescale=1.0/255.0,
        rotation_range=20,
        width_shift_range=0.1,
        height_shift_range=0.1,
        shear_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True,
        fill_mode='nearest'
    )
    test_datagen = ImageDataGenerator(rescale=1.0/255.0)

    train_data = train_datagen.flow_from_directory(
        train_dir,
        target_size=img_size,
        color_mode='rgb',
        batch_size=train_batch_size,
        class_mode='categorical',
        shuffle=True
    )

    test_data = test_datagen.flow_from_directory(
        test_dir,
        target_size=img_size,
        color_mode='rgb',
        batch_size=test_batch_size,
        class_mode='categorical',
        shuffle=False
    )

    return train_data, test_data

# Define callbacks
def create_callbacks():
    log_dir = "logs/fit/xception_" + datetime.now().strftime("%Y%m%d-%H%M%S")
    callbacks = [
        EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, verbose=1),
        ModelCheckpoint('best_xception_model.keras', monitor='val_accuracy', save_best_only=True, verbose=1),
        TensorBoard(log_dir=log_dir, histogram_freq=1)
    ]
    return callbacks

# Function to train and evaluate the model
def train_and_evaluate_model(model, train_data, test_data, epochs, callbacks):
    history = model.fit(
        train_data,
        epochs=epochs,
        validation_data=test_data,
        callbacks=callbacks,
        verbose=1
    )
    return history

# Function to plot training and validation metrics
def plot_metrics(history):
    accuracy = history.history['accuracy']
    val_accuracy = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']

    epochs_range = range(len(accuracy))

    plt.figure(figsize=(14, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, accuracy, label='Training Accuracy')
    plt.plot(epochs_range, val_accuracy, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.title('Training and Validation Accuracy')

    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')
    plt.show()

# Function to evaluate the model and display classification metrics
def evaluate_model(model, test_data):
    test_loss, test_accuracy = model.evaluate(test_data, verbose=0)
    print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

    predictions = model.predict(test_data, verbose=0)
    predicted_classes = np.argmax(predictions, axis=1)
    true_classes = test_data.classes
    class_labels = list(test_data.class_indices.keys())

    print("Classification Report:")
    print(classification_report(true_classes, predicted_classes, target_names=class_labels))

    cm = confusion_matrix(true_classes, predicted_classes)
    plt.figure(figsize=(10, 10))
    sns.heatmap(cm, annot=True, fmt='g', xticklabels=class_labels, yticklabels=class_labels)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

# Main script
def main():
    train_dir = 'data/train'
    test_dir = 'data/test'

    train_data, test_data = load_data(
        train_dir,
        test_dir,
        img_size=CONFIG['INPUT_IMAGE_SIZE'],
        train_batch_size=CONFIG['TRAIN_BATCH_SIZE'],
        test_batch_size=CONFIG['TEST_BATCH_SIZE']
    )
    

    model = create_transfer_model(input_shape=(*CONFIG['INPUT_IMAGE_SIZE'], 3), num_classes=CONFIG['NUM_CLASSES'])
    callbacks = create_callbacks()
    history = train_and_evaluate_model(model, train_data, test_data, CONFIG['EPOCHS'], callbacks)
    plot_metrics(history)
    evaluate_model(model, test_data)
    model.save('xception_model.h5')

if __name__ == '__main__':
    main()

Found 28709 images belonging to 7 classes.
Found 7178 images belonging to 7 classes.
Epoch 1/15


Epoch 1: val_accuracy improved from -inf to 0.24575, saving model to best_xception_model.keras
Epoch 2/15
Epoch 2: val_accuracy improved from 0.24575 to 0.24909, saving model to best_xception_model.keras
Epoch 3/15
Epoch 3: val_accuracy improved from 0.24909 to 0.26191, saving model to best_xception_model.keras
Epoch 4/15
Epoch 4: val_accuracy improved from 0.26191 to 0.31443, saving model to best_xception_model.keras
Epoch 5/15
Epoch 5: val_accuracy improved from 0.31443 to 0.37197, saving model to best_xception_model.keras
Epoch 6/15
Epoch 6: val_accuracy improved from 0.37197 to 0.39496, saving model to best_xception_model.keras
Epoch 7/15
Epoch 7: val_accuracy improved from 0.39496 to 0.42268, saving model to best_xception_model.keras
Epoch 8/15
Epoch 8: val_accuracy improved from 0.42268 to 0.42839, saving model to best_xception_model.keras
Epoch 9/15
Epoch 9: val_accuracy improved fr

In [None]:
# Grad-CAM visualization
import cv2
from tensorflow.keras.models import Model
from tensorflow.keras.applications.xception import preprocess_input

def generate_grad_cam(model, image_path, layer_name='block14_sepconv2_act'):
    img = cv2.imread(image_path)
    img = cv2.resize(img, CONFIG['INPUT_IMAGE_SIZE'])
    img = np.expand_dims(img, axis=0)
    img = preprocess_input(img)

    grad_model = Model(inputs=model.inputs, outputs=[model.get_layer(layer_name).output, model.output])

    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img)
        loss = predictions[:, np.argmax(predictions[0])]

    grads = tape.gradient(loss, conv_outputs)[0]
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    heatmap = tf.reduce_mean(tf.multiply(pooled_grads, conv_outputs), axis=-1)
    heatmap = np.maximum(heatmap, 0) / np.max(heatmap)
    heatmap = cv2.resize(heatmap[0], CONFIG['INPUT_IMAGE_SIZE'])
    heatmap = cv2.applyColorMap(np.uint8(255 * heatmap), cv2.COLORMAP_JET)

    img = cv2.imread(image_path)
    img = cv2.resize(img, CONFIG['INPUT_IMAGE_SIZE'])
    superimposed_img = cv2.addWeighted(img, 0.6, heatmap, 0.4, 0)

    return superimposed_img

# Error analysis
def analyze_misclassifications(model, test_data):
    misclassified_samples = []
    true_labels = []
    predicted_labels = []

    for images, labels in test_data:
        predictions = model.predict(images)
        predicted_classes = np.argmax(predictions, axis=1)
        true_classes = np.argmax(labels, axis=1)

        misclassified_mask = predicted_classes != true_classes
        misclassified_samples.extend(images[misclassified_mask])
        true_labels.extend(true_classes[misclassified_mask])
        predicted_labels.extend(predicted_classes[misclassified_mask])

    misclassified_samples = np.array(misclassified_samples)
    true_labels = np.array(true_labels)
    predicted_labels = np.array(predicted_labels)

    return misclassified_samples, true_labels, predicted_labels

def display_misclassifications(misclassified_samples, true_labels, predicted_labels, class_labels):
    fig, axes = plt.subplots(3, 3, figsize=(12, 12))
    for i, ax in enumerate(axes.flat):
        ax.imshow(misclassified_samples[i])
        ax.set_title(f"True: {class_labels[true_labels[i]]}, Predicted: {class_labels[predicted_labels[i]]}")
        ax.axis('off')
    plt.show()

# Load the best model and visualize Grad-CAM
best_model = tf.keras.models.load_model('best_xception_model.keras')
image_path = 'data/test/angry/angry1.jpg'
grad_cam_img = generate_grad_cam(best_model, image_path)
plt.imshow(grad_cam_img)
plt.axis('off')
plt.show()


test_data = load_data(
    'data/test',
    img_size=CONFIG['INPUT_IMAGE_SIZE'],
    train_batch_size=CONFIG['TEST_BATCH_SIZE'],
    test_batch_size=CONFIG['TEST_BATCH_SIZE']
)[1]

# Analyze misclassifications
class_labels = list(test_data.class_indices.keys())
misclassified_samples, true_labels, predicted_labels = analyze_misclassifications(best_model, test_data)
display_misclassifications(misclassified_samples, true_labels, predicted_labels, class_labels)