In [1]:
import tensorflow as tf
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.layers import Input, Dense, Flatten, concatenate, GlobalAveragePooling2D, multiply
import keras
from keras import layers
import numpy as np
from tensorflow.keras.preprocessing import image

from sklearn.metrics import confusion_matrix


In [2]:
from tensorflow.keras.backend import clear_session

clear_session()  # Clears any previous model state


In [3]:
# Your SE Block function
def se_block(in_block, ch, ratio=16):
    # y = GlobalAveragePooling2D()(in_block)
    y = Dense(ch // ratio, activation='relu')
    y = Dense(ch, activation='sigmoid')(y)
    return multiply([in_block, y])



In [4]:
denseNet201 = load_model("new1_denseNet201.keras")
vgg16 = load_model("new1_vgg16.keras")

In [5]:
# Extract feature maps from both models
denseNet201_features = denseNet201.output
vgg_features = vgg16.output

In [6]:
def attention_fusion(input1, input2):
    # Concatenate features from both models
    fused = concatenate([input1, input2])
    
    # Fully connected layer to learn attention weights
    attention = Dense(fused.shape[-1] // 2, activation='relu', name="attention_fc1")(fused)
    attention = Dense(fused.shape[-1], activation='sigmoid', name="attention_fc2")(attention)
    
    # Apply the attention weights
    return multiply([fused, attention])

# Apply attention fusion
fused_features = attention_fusion(denseNet201_features, vgg_features)


In [7]:
# Add a final Dense layer for classification

output = Dense(4, activation='softmax', name="final_dense_layer")(fused_features) 


# Define the final model
fusion_model = Model(inputs=[denseNet201.input, vgg16.input], outputs=output)


# Print model summary

fusion_model.summary(show_trainable=True)

In [8]:
layer = fusion_model.layers[-6]
layer.trainable = False
layer = fusion_model.layers[-7]
layer.trainable = False

fusion_model.summary(show_trainable=True)

In [23]:
dataset = keras.utils.image_dataset_from_directory(
    'small_kaggle',
    labels="inferred",
    label_mode="int",
    class_names=None,
    color_mode="rgb",
    batch_size=16,
    image_size=(150, 150),
    shuffle=True
   
)
fine_tune_ds = keras.utils.image_dataset_from_directory(
    'my_images',
    labels="inferred",
    label_mode="int",
    class_names=None,
    color_mode="rgb",
    batch_size=16,
    image_size=(150, 150),
    shuffle=True
)

Found 160 files belonging to 4 classes.
Found 160 files belonging to 4 classes.


In [24]:
# dataset=dataset.unbatch()
# training_ds, validation_ds = tf.keras.utils.split_dataset(
#     dataset, left_size=0.8, shuffle=False, seed=None
# )
# print(f"length training data set: {len(training_ds)}")
# print(f"length validation data set: {len(validation_ds)}")
# training_ds = training_ds.batch(16)
# validation_ds = validation_ds.batch(16)
# print(f"# of batch for training data set: {len(training_ds)}")
# print(f"# of batch for validation data set: {len(validation_ds)}")

In [25]:
# #Augmentation (flips, zoom...)

# augmentation_layers = [
#     layers.RandomFlip("horizontal_and_vertical"),
#     layers.RandomRotation(0.2),
#     layers.RandomZoom(0.2),                           
#     layers.RandomContrast(0.2),              
#     layers.RandomBrightness(0.2),             
# ]

# def augmentation_layer(x):
#     for layer in augmentation_layers:
#         x = layer(x)
#     return x

# training_ds = training_ds.map(lambda x, y: (augmentation_layer(x), y))


In [26]:


# Define augmentation layers
augmentation_layers = keras.Sequential([
    layers.RandomFlip("horizontal_and_vertical"),
    layers.RandomRotation(0.1),
    layers.RandomZoom(0.1),
    layers.RandomContrast(0.1),
    layers.RandomBrightness(0.1),
])

# Function to apply augmentation multiple times per image
def augment_multiple_times(image, label, num_augmentations=3):
    """Applies augmentation multiple times to each image."""
    augmented_images = [augmentation_layers(image) for _ in range(num_augmentations)]
    augmented_labels = [label] * num_augmentations  # Repeat labels
    return tf.data.Dataset.from_tensor_slices((augmented_images, augmented_labels))

# Apply augmentation multiple times and flatten the dataset
augmented_datasets = fine_tune_ds.flat_map(lambda x, y: augment_multiple_times(x, y, num_augmentations=3))
augmented_datasets1 = dataset.flat_map(lambda x, y: augment_multiple_times(x, y, num_augmentations=3))
# Combine original dataset with augmented dataset
expanded_training_ds = fine_tune_ds.concatenate(augmented_datasets)
expanded_training_ds1 = fine_tune_ds.concatenate(augmented_datasets1)
# Check dataset size
print("Original dataset size:", len(list(fine_tune_ds)))
print("Augmented dataset size:", len(list(augmented_datasets)))
print("Final dataset size:", len(list(expanded_training_ds)))

print('\n')
print("Original dataset size:", len(list(dataset)))
print("Augmented dataset size:", len(list(augmented_datasets1)))
print("Final dataset size:", len(list(expanded_training_ds1)))


Original dataset size: 10
Augmented dataset size: 30
Final dataset size: 40


Original dataset size: 10
Augmented dataset size: 30
Final dataset size: 40


In [27]:
expanded_training_ds=expanded_training_ds.unbatch()
exft_training_ds, exft_validation_ds = tf.keras.utils.split_dataset(
    expanded_training_ds, left_size=0.8, shuffle=False, seed=None
)
print(f"length training data set: {len(exft_training_ds)}")
print(f"length validation data set: {len(exft_validation_ds)}")
exft_training_ds = exft_training_ds.batch(16)
exft_validation_ds = exft_validation_ds.batch(16)


length training data set: 512
length validation data set: 128


In [28]:
expanded_training_ds1=expanded_training_ds1.unbatch()
training_ds1, validation_ds1 = tf.keras.utils.split_dataset(
    expanded_training_ds1, left_size=0.8, shuffle=False, seed=None
)
print(f"length training data set: {len(training_ds1)}")
print(f"length validation data set: {len(validation_ds1)}")
training_ds1 = training_ds1.batch(16)
validation_ds1 = validation_ds1.batch(16)


length training data set: 512
length validation data set: 128


In [29]:
def format_two_stream_data(dataset1, dataset2):
    zipped_ds = tf.data.Dataset.zip((dataset1, dataset2))

    def process_data(sample1, sample2):
        img1, label1 = sample1  # Unpack image and label from dataset1
        img2, label2 = sample2  # Unpack image and label from dataset2
        
        return (img1, img2), label1  # Return ((image1, image2), label)

    return zipped_ds.map(process_data)

# Apply formatting
val_two_stream_dataset = format_two_stream_data(validation_ds1, exft_validation_ds)




In [30]:
def format_two_stream_data(dataset1, dataset2):
    zipped_ds = tf.data.Dataset.zip((dataset1, dataset2))

    def process_data(sample1, sample2):
        img1, label1 = sample1  # Unpack image and label from dataset1
        img2, label2 = sample2  # Unpack image and label from dataset2
        
        return (img1, img2), label1  # Return ((image1, image2), label)

    return zipped_ds.map(process_data)

# Apply formatting
valval_two_stream_dataset = format_two_stream_data(validation_ds1, validation_ds1)




In [31]:
def format_two_stream_data(dataset1, dataset2):
    zipped_ds = tf.data.Dataset.zip((dataset1, dataset2))

    def process_data(sample1, sample2):
        img1, label1 = sample1  # Unpack image and label from dataset1
        img2, label2 = sample2  # Unpack image and label from dataset2
        
        return (img1, img2), label1  # Return ((image1, image2), label)

    return zipped_ds.map(process_data)

# Apply formatting
two_stream_dataset = format_two_stream_data(training_ds1, exft_training_ds)




In [35]:
def format_two_stream_data(dataset1, dataset2):
    zipped_ds = tf.data.Dataset.zip((dataset1, dataset2))

    def process_data(sample1, sample2):
        img1, label1 = sample1  # Unpack image and label from dataset1
        img2, label2 = sample2  # Unpack image and label from dataset2
        
        return (img1, img2), label1  # Return ((image1, image2), label)

    return zipped_ds.map(process_data)

# Apply formatting
two_stream_dataset_val1 = format_two_stream_data(validation_ds1, exft_validation_ds)




In [33]:
for batch in two_stream_dataset.take(1):
    (img1, img2), labels = batch
    print(f"Image 1 batch shape: {img1.shape}")
    print(f"Image 2 batch shape: {img2.shape}")
    print(f"Labels batch shape: {labels.shape}")


Image 1 batch shape: (16, 150, 150, 3)
Image 2 batch shape: (16, 150, 150, 3)
Labels batch shape: (16,)


In [34]:
fusion_model.compile(
    optimizer=keras.optimizers.RMSprop(),  # Optimizer
    # Loss function to minimize
    loss=keras.losses.SparseCategoricalCrossentropy(),
    # List of metrics to monitor
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
print("Fit model on training data")
history = fusion_model.fit(
    two_stream_dataset,
    epochs=30,
)

Fit model on training data
Epoch 1/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 26ms/step - loss: 1.3911 - sparse_categorical_accuracy: 0.5481
Epoch 2/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 24ms/step - loss: 0.8913 - sparse_categorical_accuracy: 0.6815
Epoch 3/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 24ms/step - loss: 0.7033 - sparse_categorical_accuracy: 0.7548
Epoch 4/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 24ms/step - loss: 0.4610 - sparse_categorical_accuracy: 0.8288
Epoch 5/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 24ms/step - loss: 0.3814 - sparse_categorical_accuracy: 0.8645
Epoch 6/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 24ms/step - loss: 0.3282 - sparse_categorical_accuracy: 0.8873
Epoch 7/30
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 24ms/step - loss: 0.4794 - sparse_categorical_accuracy: 0.8284
Epoch 

In [36]:
loss, accuracy = fusion_model.evaluate(two_stream_dataset_val1)

print(f'Validation Accuracy: {accuracy}')
print(f'Validation Loss: {loss}')

[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 28ms/step - loss: 0.1755 - sparse_categorical_accuracy: 0.9224
Validation Accuracy: 0.8984375
Validation Loss: 0.26184138655662537


In [37]:
loss, accuracy = fusion_model.evaluate(two_stream_dataset_val1)

print(f'Validation Accuracy: {accuracy}')
print(f'Validation Loss: {loss}')

[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 28ms/step - loss: 0.0983 - sparse_categorical_accuracy: 0.9638
Validation Accuracy: 0.9453125
Validation Loss: 0.14443504810333252


In [37]:
fusion_model.save('fusion_model.keras')

In [20]:
y_true = []
y_pred = []

for images, labels in val_two_stream_dataset:
    preds = fusion_model.predict(images)
    y_pred.extend(np.argmax(preds, axis=1))  # Convert one-hot to class index
    y_true.extend(labels)  # Get true labels

# Create a confusion matrix
cm = confusion_matrix(y_true, y_pred)
print(cm)



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 9s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[[49  0  0  0]
 [ 0 26  1  0]
 [ 0  0 29  0]
 [ 0  0  0 23]]


In [39]:
loss, accuracy = fusion_model.evaluate(valval_two_stream_dataset)

print(f'Validation Accuracy: {accuracy}')
print(f'Validation Loss: {loss}')

[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step - loss: 0.7200 - sparse_categorical_accuracy: 0.7745
Validation Accuracy: 0.78125
Validation Loss: 0.7730916738510132
