In [None]:
# Core Libraries
import csv
import glob
import os
import random
import shutil
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import cv2

# Image Processing
from skimage.filters import sobel

# TensorFlow / Keras
import tensorflow as tf
from tensorflow.keras import layers, models, Model, regularizers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Common Layers
from tensorflow.keras.layers import (
    Dense, Dropout, Flatten, Input,
    Conv2D, SeparableConv2D, Conv2DTranspose,
    Concatenate, Add, Activation,
    GlobalAveragePooling2D, GlobalMaxPooling2D,
    MaxPooling2D, AveragePooling2D, BatchNormalization
)

# Pretrained Models
from tensorflow.keras.applications import (
    VGG16, VGG19, ResNet50, ResNet101,
    EfficientNetB0, EfficientNetB2, EfficientNetB3,
    EfficientNetB4, EfficientNetB5, EfficientNetB6,
    InceptionResNetV2, MobileNetV2
)

# Metrics
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import cohen_kappa_score, f1_score, precision_score, recall_score

# Visualization
import visualkeras


In [None]:
####  Transition Layer  ######
def transition_block(inputs):
    """Refine a feature map with three separable-conv stages, then resample it.

    The input goes through SeparableConv2D -> BatchNormalization -> ReLU three
    times (64 filters / 1x1, 128 filters / 3x3 'same', 256 filters / 1x1).
    The result is upsampled by a stride-2 transposed convolution (256 filters)
    and immediately reduced again by a 2x2 max-pool.

    Args:
        inputs: Feature-map tensor accepted by Keras 2-D convolution layers.

    Returns:
        The tensor produced by the final max-pooling layer (256 channels).
    """
    keras_layers = tf.keras.layers

    def separable_stage(tensor, n_filters, kernel, pad):
        # One SeparableConv2D -> BatchNorm -> ReLU unit.
        tensor = keras_layers.SeparableConv2D(
            filters=n_filters, kernel_size=kernel, padding=pad)(tensor)
        tensor = keras_layers.BatchNormalization()(tensor)
        return keras_layers.ReLU()(tensor)

    features = inputs
    # 'valid' is the Keras default padding, which the 1x1 stages rely on.
    for n_filters, kernel, pad in ((64, 1, 'valid'), (128, 3, 'same'), (256, 1, 'valid')):
        features = separable_stage(features, n_filters, kernel, pad)

    # Learned 2x upsampling followed by 2x2 max-pooling.
    upsampled = keras_layers.Conv2DTranspose(
        filters=256, kernel_size=(3, 3), strides=(2, 2), padding='same')(features)
    return keras_layers.MaxPooling2D(pool_size=(2, 2))(upsampled)

def additionalL(inputs):
    """Project a feature map to 256 channels with a 1x1 separable convolution.

    Args:
        inputs: Feature-map tensor accepted by Keras 2-D convolution layers.

    Returns:
        The batch-normalised output of the 1x1 SeparableConv2D (no activation
        is applied).
    """
    pointwise = tf.keras.layers.SeparableConv2D(filters=256, kernel_size=1)
    normalise = tf.keras.layers.BatchNormalization()
    return normalise(pointwise(inputs))

In [None]:

def transition_layer(inputs, filters=512, target_size=4):
    """Project a feature map to `filters` channels and shrink it spatially.

    A 1x1 ReLU convolution first sets the channel count. Stride-2 3x3 ReLU
    convolutions with 'same' padding are then stacked until the height
    (``shape[1]``) of the map is no larger than `target_size`. Only the height
    is inspected, so non-square inputs are reduced by the same number of steps
    on both axes.

    Args:
        inputs: Feature-map tensor whose ``shape[1]`` is statically known.
        filters: Number of output channels of every convolution (default 512,
            the value previously hard-coded).
        target_size: Largest height allowed on the returned map (default 4,
            the value previously hard-coded).

    Returns:
        The down-sampled feature-map tensor with `filters` channels.

    Raises:
        ValueError: If `target_size` is smaller than 1 (the loop could never
            terminate) or if the height of `inputs` is not statically known.
    """
    if target_size < 1:
        raise ValueError(f"target_size must be >= 1, got {target_size}")

    x = Conv2D(filters, (1, 1), padding='valid', activation='relu')(inputs)

    # A dynamic (None) height cannot be compared against the target; fail with
    # an explicit message instead of an opaque `None > int` TypeError.
    if x.shape[1] is None:
        raise ValueError(
            "transition_layer needs a statically known spatial size; "
            f"got input shape {tuple(inputs.shape)}")

    while x.shape[1] > target_size:
        # Each stride-2 'same' convolution halves the spatial size (rounded up).
        x = Conv2D(filters, (3, 3), strides=(2, 2), padding='same', activation='relu')(x)
    return x

In [None]:
def VM(input_size = (128,128,3), input = None):
    """Wrap a ViT-B/16 from `vit_keras` in a Keras Model.

    Args:
        input_size: Input shape tuple; only ``input_size[0]`` is forwarded to
            ``vit.vit_b16`` as the image size, and the whole tuple is used as
            the shape of the input tensor when one has to be created.
        input: Keras input tensor to attach the transformer to. When omitted
            (``None``) a fresh ``tf.keras.Input(shape=input_size)`` is created.
            The previous default was Python's builtin ``input`` function,
            which is not a tensor, so calling ``VM()`` without this argument
            could not work.

    Returns:
        A ``tf.keras.Model`` mapping `input` to the output of
        ``vit.vit_b16(input_size[0])``.
    """
    # The parameter name shadows the builtin but is kept so existing
    # positional and keyword callers continue to work.
    if input is None:
        input = tf.keras.Input(shape=input_size)

    # NOTE(review): vit_b16 is built with its library defaults here; which head
    # and weights that implies is not visible in this file - confirm.
    vision_model = vit.vit_b16(input_size[0])(input)
    M = tf.keras.Model(inputs=input, outputs=vision_model)
    return M

In [None]:
from tensorflow.keras.applications import VGG16, ResNet50, VGG19,ResNet101
from vit_keras import vit, utils

In [None]:
def make_model(input_size=(128, 128, 3), num_classes=5):
    """Build and compile the VGG19 + ResNet50 + ViT fusion classifier.

    Frozen ImageNet VGG19 and ResNet50 backbones share one input tensor.
    Four intermediate feature maps from each backbone are passed through
    `transition_layer`, fused pairwise, regrouped into a "left" and a "right"
    stream, and refined with `transition_block` plus an `additionalL` skip
    branch. The final VGG19 pooling map and the refined last ResNet50 map are
    concatenated with those streams, flattened and reduced to 256 units. The
    Vision Transformer output from `VM` is reduced to 256 units as well, and
    both vectors feed a small dense classification head.

    Args:
        input_size: Shape of the input image tensor (default (128, 128, 3)).
            The concatenations below require all fused maps to end up with the
            same spatial size, which depends on this value.
        num_classes: Number of softmax outputs (default 5, the value that was
            previously hard-coded).

    Returns:
        A compiled ``Model`` (Adamax optimizer, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    concat = tf.keras.layers.Concatenate

    def fuse_with_skip(features):
        # Transition block output concatenated with a 1x1 projection of the
        # same input.
        transformed = transition_block(features)
        skip = additionalL(features)
        return concat(axis=-1)([transformed, skip])

    input_tensor = Input(shape=input_size)

    # Load VGG19 without top layers and freeze its weights.
    base_model1 = VGG19(include_top=False, weights='imagenet', pooling='max',
                        input_shape=input_size, input_tensor=input_tensor)
    for layer in base_model1.layers:
        layer.trainable = False

    # Load ResNet50 without top layers and freeze its weights.
    base_model2 = ResNet50(include_top=False, weights='imagenet', pooling='max',
                           input_shape=input_size, input_tensor=input_tensor)
    for layer in base_model2.layers:
        layer.trainable = False

    # VM returns a Keras Model, not a tensor. Keras layers can only be called
    # on tensors, so take the model's output tensor before feeding Dense.
    vision_output = VM(input_size, input_tensor).output

    # Intermediate VGG19 feature maps used for fusion, plus its last pooling map.
    vgg_taps = [base_model1.get_layer(name).output
                for name in ('block3_conv2', 'block4_conv3', 'block5_conv1', 'block5_conv3')]
    last_layer15 = base_model1.get_layer('block5_pool').output

    # Intermediate ResNet50 feature maps used for fusion, plus its last block output.
    resnet_taps = [base_model2.get_layer(name).output
                   for name in ('conv2_block1_out', 'conv3_block4_out',
                                'conv4_block4_out', 'conv4_block6_out')]
    last_layer25 = base_model2.get_layer('conv5_block3_out').output

    # Bring every tapped map to a common size with transition_layer
    # (512 channels, height reduced to at most 4 with its defaults).
    vgg1, vgg2, vgg3, vgg4 = [transition_layer(tap) for tap in vgg_taps]
    resnet1, resnet2, resnet3, resnet4 = [transition_layer(tap) for tap in resnet_taps]

    # Bi-fusion of corresponding VGG19 and ResNet50 features.
    merge1 = concat(axis=-1)([vgg1, resnet1])
    merge2 = concat(axis=-1)([vgg2, resnet2])
    merge3 = concat(axis=-1)([vgg3, resnet3])
    merge4 = concat(axis=-1)([vgg4, resnet4])

    # Feature fusion: odd-numbered merges form the left stream, even the right.
    outputL = concat(axis=-1)([merge1, merge3])
    outputR = concat(axis=-1)([merge2, merge4])

    # Refine each stream and combine them.
    outputL = fuse_with_skip(outputL)
    outputR = fuse_with_skip(outputR)
    output2 = concat(axis=-1)([outputL, outputR])

    # Refine the final ResNet50 feature map and merge it with the VGG19 pooling map.
    resnetout = fuse_with_skip(last_layer25)
    output1 = concat(axis=-1)([last_layer15, resnetout])

    # Ensemble feature representation of both CNN branches.
    output_ensemble = concat(axis=-1)([output1, output2])

    # Flatten ensemble features before the dense layers.
    flatten = Flatten()(output_ensemble)
    dense1 = Dense(256, activation='relu')(flatten)

    # Dense projection of the Vision Transformer output.
    vision_dense = Dense(256, activation='relu')(vision_output)

    # Merge CNN dense features with Vision Transformer features.
    combined_dense = Concatenate()([dense1, vision_dense])

    # Fully connected head with dropout for regularization.
    combined_dense = Dropout(0.2)(combined_dense)
    combined_dense = Dense(128, activation='relu')(combined_dense)
    combined_dense = Dropout(0.3)(combined_dense)
    combined_dense = Dense(64, activation='relu')(combined_dense)

    # Final classification layer; dtype is pinned to float32.
    model_output = Dense(num_classes, activation='softmax', dtype='float32')(combined_dense)

    # Build and compile the model. Adamax is referenced through tf.keras
    # because only Adam is imported by name in this notebook.
    M = Model(inputs=input_tensor, outputs=model_output)
    M.compile(optimizer=tf.keras.optimizers.Adamax(),
              loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    return M


In [None]:
# Batched image pipeline (batch size 128) read from the processed dataset folder.
loader_options = dict(
    directory='/kaggle/input/cervical-preprocessed/Processed Dataset',
    color_mode='rgb',
    batch_size=128,
    image_size=(128, 128),
    shuffle=True,
)
data = tf.keras.utils.image_dataset_from_directory(**loader_options)

In [None]:
# Build one instance of the fusion network with the default input size.
# NOTE(review): the cross-validation cell further down builds a fresh model
# for every fold, so this instance looks like a construction check only -
# confirm whether it is still needed.
model = make_model()
# Uncomment the next line to print the layer-by-layer summary.
# #model.summary()

In [None]:
# `data` is a batched dataset, so every size below is counted in batches,
# not in individual images.
leng = len(data)
n_folds = 5  # number of folds here

# 70 % of the batches for training / cross-validation, 30 % held out for testing.
train_size = int(0.7 * leng)
test_size = int(0.3 * leng)

# NOTE(review): `data` was created with shuffle=True. If the loader reshuffles
# on every iteration, take()/skip() do not select a fixed set of images and
# samples can move between train, validation folds and test from one pass to
# the next - confirm against the image_dataset_from_directory documentation.
train = data.take(train_size)

remaining2 = data.skip(train_size)
test = remaining2.take(test_size)


# Number of batches per fold; leftover batches after the last fold are unused.
fold_size = train.cardinality().numpy() // n_folds

# `remainders[k]` is the part of `train` left after removing the first k folds.
folds = []
remainders = [train]

for i in range(n_folds):
    # The next fold is the head of the current remainder ...
    fold = remainders[-1].take(fold_size)
    folds.append(fold)

    # ... and the next remainder is everything after it.
    remainder = remainders[-1].skip(fold_size)
    remainders.append(remainder)


################## Test Itera ####################

# Materialise the test split once as NumPy arrays for the scikit-learn metrics.
# Plain iteration replaces a `while True` loop whose bare `except:` swallowed
# every error (not only end-of-data) and could silently truncate the test set.
test_iter = test.as_numpy_iterator()

# The empty float64 seed arrays keep the result dtype and the empty-split
# behaviour identical to the original batch-by-batch concatenation, while
# concatenating only once instead of once per batch.
image_batches = [np.empty((0,128,128,3))]
label_batches = [np.empty(0)]
for batch in test_iter:
    image_batches.append(batch[0])
    label_batches.append(batch[1])

test_set = {"images": np.concatenate(image_batches),
            "labels": np.concatenate(label_batches)}

y_true = test_set['labels']

In [None]:

# early_stop = EarlyStopping(
#     monitor="val_accuracy", 
#     patience=10,
#     verbose=1,
#     mode="max",
#     restore_best_weights=True, 
# )



def _macro_specificity(conf_matrix):
    """Macro-averaged one-vs-rest specificity of a square confusion matrix.

    For every class k, TN_k counts samples that neither belong to k nor were
    predicted as k, and FP_k counts samples wrongly predicted as k. The
    per-class value TN_k / (TN_k + FP_k) is averaged over all classes, which
    matches the 'macro' averaging used for the other metrics.

    Args:
        conf_matrix: Square array, rows = true classes, columns = predictions.

    Returns:
        The mean per-class specificity as a float. A class without any
        negative sample contributes 0 instead of dividing by zero.
    """
    conf_matrix = np.asarray(conf_matrix)
    true_pos = np.diag(conf_matrix)
    false_pos = conf_matrix.sum(axis=0) - true_pos
    false_neg = conf_matrix.sum(axis=1) - true_pos
    true_neg = conf_matrix.sum() - true_pos - false_pos - false_neg
    negatives = true_neg + false_pos
    per_class = np.divide(true_neg, negatives,
                          out=np.zeros(len(true_pos), dtype=float),
                          where=negatives > 0)
    return float(per_class.mean())


# Test-set metrics, one entry per fold. They are created once, before the
# loop, so they accumulate over all folds instead of being reset every time.
f1_scores = []
sn_scores = []
ppv_scores = []
test_accuracies = []
test_loss = []
kappa_scores = []
specificity_scores = []

# Iterate over the folds that actually exist rather than a hard-coded 5.
for i in range(len(folds)):

    # A fresh, untrained model for every fold.
    model = make_model()
    print("Fold: ",i)

    # Every fold except the i-th is used for training.
    train_folds = [f for j, f in enumerate(folds) if j != i]

    # Draw elements from the training folds to form a single training dataset.
    train_data = tf.data.experimental.sample_from_datasets(train_folds)

    # Train with the current fold as validation data.
    history = model.fit(train_data, epochs = 40, validation_data=folds[i])
    model.save(f"/kaggle/working/model_fold_{i+1}.h5")

    # Loss and accuracy on the held-out test split.
    test_results = model.evaluate(test)
    test_loss_value = test_results[0]
    test_accuracy = test_results[1]

    # Class predictions on the materialised test images for the sklearn metrics.
    y_pred = model.predict(test_set['images'])
    y_pred_classes = np.argmax(y_pred, axis=1)
    f1_score1 = f1_score(y_true, y_pred_classes, average='macro')
    sn = recall_score(y_true, y_pred_classes, average='macro')
    ppv = precision_score(y_true, y_pred_classes, average='macro')
    kappa = cohen_kappa_score(y_true, y_pred_classes)
    conf_matrix = confusion_matrix(y_true, y_pred_classes)

    # Reading TN/FP from cells [0, 0] and [0, 1] is only valid for a binary
    # matrix; with several classes those cells are "class 0 correct" and
    # "class 0 predicted as class 1". Use one-vs-rest specificity instead.
    specificity = _macro_specificity(conf_matrix)

    # Store the evaluation metrics for the current fold.
    f1_scores.append(f1_score1)
    sn_scores.append(sn)
    ppv_scores.append(ppv)
    test_accuracies.append(test_accuracy)
    test_loss.append(test_loss_value)
    kappa_scores.append(kappa)
    specificity_scores.append(specificity)

    # Per-epoch training curves.
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']

    # One CSV row per epoch. The fold-level test metrics are repeated on every
    # row as plain numbers (previously the one-element lists were written, so
    # the cells contained text such as "[0.93]").
    output_file_path = '/kaggle/working/accuracy_loss_recall_sensitivity.csv'

    with open(output_file_path, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Epoch', 'Train-Accuracy', 'Validation Accuracy', 'Train-Loss', 'Validation Loss', 'F1-Score', 'Sensitivity','Precition','Test-Acc','Test-Loss','Kappa','Specificity'])
        for epoch, (a, va, l, vl) in enumerate(zip(acc, val_acc, loss, val_loss), start=1):
            writer.writerow([epoch, a, va, l, vl,
                             f1_score1, sn, ppv, test_accuracy, test_loss_value,
                             kappa, specificity])

    # The shared file above is overwritten on every fold; keep a per-fold copy.
    destination_file_path = '/kaggle/working/Bifusion_(-vit)_model_Fold ' + str(i+1) + '.csv'
    shutil.copy(output_file_path, destination_file_path)

In [None]:
# # Calculate FLOPs
# flops = get_flops(model, batch_size=1)
# print(f"FLOPs: {flops / 10**9:.03} G")