In [None]:
import os
import random
import shutil
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.metrics.pairwise import cosine_similarity
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from keras.callbacks import ReduceLROnPlateau, EarlyStopping

import typing

In [None]:
def VGGNet(
    name: str,
    architecture: typing.List[ typing.Union[int, str] ],
    input_shape: typing.Tuple[int],
    classes: int = 1000
) -> tf.keras.Model:
    X_input = tf.keras.layers.Input(input_shape)

    X = make_conv_layer(X_input, architecture)

    X = tf.keras.layers.Flatten()(X)
    X = make_dense_layer(X, 4096)
    X = make_dense_layer(X, 4096)

    # classification layer
    X = tf.keras.layers.Dense(units = classes, activation = "softmax")(X)

    model = tf.keras.Model(inputs = X_input, outputs = X, name = name)
    return model

def make_conv_layer(
    X: tf.Tensor,
    architecture: typing.List[ typing.Union[int, str] ],
    activation: str = 'relu'
) -> tf.Tensor:

    for output in architecture:

        if type(output) == int:
            out_channels = output

            X = tf.keras.layers.Conv2D(
                filters = out_channels,
                kernel_size = (3, 3),
                strides = (1, 1),
                padding = "same"
            )(X)
            X = tf.keras.layers.BatchNormalization()(X)
            X = tf.keras.layers.Activation(activation)(X)
        else:
            X = tf.keras.layers.MaxPooling2D(
                pool_size = (2, 2),
                strides = (2, 2)
            )(X)

    return X

def make_dense_layer(X: tf.Tensor, output_units: int, dropout = 0.5, activation = 'relu') -> tf.Tensor:
    X = tf.keras.layers.Dense(units = output_units)(X)
    X = tf.keras.layers.BatchNormalization()(X)
    X = tf.keras.layers.Activation(activation)(X)
    X = tf.keras.layers.Dropout(dropout)(X)

    return X

In [None]:
VGG_types = {
    'VGG16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
}


vgg_model = VGGNet(name = "VGGNet16", architecture = VGG_types["VGG16"], input_shape=(224, 224, 3), classes = 8)

In [None]:
# Compile the model
vgg_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

vgg_model.summary()

In [None]:
data_folder = 'data/Flowers'
train_data_folder = 'data/train'
validation_data_folder = 'data/validation'
flower_types = ["Babi", "Calimerio", "Chrysanthemum", "Hydrangeas", "Lisianthus", "Pingpong", "Rosy", "Tana"]

# Create empty lists to store image paths and corresponding labels
train_image_paths = []
train_labels = []
val_image_paths = []
val_labels = []

# Loop over the flower types and add image paths and labels to the lists
for i, flower_type in enumerate(flower_types):
    folder_path = os.path.join(data_folder, flower_type)
    files = os.listdir(folder_path)
    random.shuffle(files)
    split_index = int(0.8 * len(files))
    train_files = files[:split_index]
    val_files = files[split_index:]
    
    for file_name in train_files:
        if file_name.endswith('.jpg'):
            image_path = os.path.join(folder_path, file_name)
            train_image_paths.append(image_path)
            train_labels.append(i)
            
            os.makedirs(os.path.join(train_data_folder, flower_type), exist_ok=True)
            shutil.copy(image_path, os.path.join(train_data_folder, flower_type, file_name))
            
    for file_name in val_files:
        if file_name.endswith('.jpg'):
            image_path = os.path.join(folder_path, file_name)
            val_image_paths.append(image_path)
            val_labels.append(i)
            
            os.makedirs(os.path.join(validation_data_folder, flower_type), exist_ok=True)
            shutil.copy(image_path, os.path.join(validation_data_folder, flower_type, file_name))
            
# Define the data generators for the training and validation sets
train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1./255,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True
)

val_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1./255
)

train_generator = train_datagen.flow_from_directory(
    train_data_folder,
    target_size=(224, 224),
    batch_size=64,
    class_mode='categorical',
    shuffle=True,
    seed=42
)

validation_generator = val_datagen.flow_from_directory(
    validation_data_folder,
    target_size=(224, 224),
    batch_size=64,
    class_mode='categorical',
    shuffle=True,
    seed=42
)

In [None]:
batch_size=64
epochs=70

callbacks = [EarlyStopping(patience=5, restore_best_weights=True), ReduceLROnPlateau(monitor='val_accuracy', patience = 2, verbose=1,factor=0.3, min_lr=0.000001)]

vgg_history = vgg_model.fit(
    train_generator,
    steps_per_epoch=train_generator.samples // batch_size,
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=validation_generator.samples // batch_size,
    callbacks=callbacks
)

In [None]:
plt.plot(vgg_history.history['accuracy'])
plt.plot(vgg_history.history['val_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

ResNet

In [None]:
def block(
    X: tf.Tensor,
    kernel_size: int,
    filters: typing.List[int],
    stage_no: int,
    block_name: str,
    is_conv_layer: bool = False,
    stride: int = 2
) -> tf.Tensor:

    # names
    conv_name_base = "res" + str(stage_no) + block_name + "_branch"
    bn_name_base = "bn" + str(stage_no) + block_name + "_branch"

    # filters
    F1, F2, F3 = filters

    # save the input value for shortcut.
    X_shortcut = X

    #  First component
    # NOTE: if conv_layer, you need to do downsampling
    X = tf.keras.layers.Conv2D(
        filters = F1,
        kernel_size = (1, 1),
        strides = (stride, stride) if is_conv_layer else (1, 1),
        padding = "valid",
        name = conv_name_base + "2a",
        kernel_initializer = "glorot_uniform",
    )(X)
    X = tf.keras.layers.BatchNormalization(axis = 3, name = bn_name_base + "2a")(X)
    X = tf.keras.layers.Activation("relu")(X)

    # Second component
    X = tf.keras.layers.Conv2D(
        filters = F2,
        kernel_size = (kernel_size, kernel_size),
        strides = (1, 1),
        padding = "same",
        name = conv_name_base + "2b",
        kernel_initializer = "glorot_uniform",
    )(X)
    X = tf.keras.layers.BatchNormalization(axis = 3, name = bn_name_base + "2b")(X)
    X = tf.keras.layers.Activation("relu")(X)

    # Third component
    X = tf.keras.layers.Conv2D(
        filters = F3,
        kernel_size = (1, 1),
        strides = (1, 1),
        padding = "valid",
        name = conv_name_base + "2c",
        kernel_initializer = "glorot_uniform",
    )(X)
    X = tf.keras.layers.BatchNormalization(axis = 3, name = bn_name_base + "2c")(X)

    if is_conv_layer:
        X_shortcut = tf.keras.layers.Conv2D(
            filters = F3,
            kernel_size = (1, 1),
            strides = (stride, stride),
            padding = "valid",
            name = conv_name_base + "1",
            kernel_initializer = "glorot_uniform",
        )(X_shortcut)
        X_shortcut = tf.keras.layers.BatchNormalization(axis = 3, name = bn_name_base + "1")(X_shortcut)

    # Shortcut value
    X = tf.keras.layers.Add()([X, X_shortcut])
    X = tf.keras.layers.Activation("relu")(X)

    return X

In [None]:
def ResNet(name: str, layers: typing.List[int], input_shape: typing.Tuple[int] = (64, 64, 3), classes: int = 6) -> tf.keras.Model:
    
    # get layers (layer1 is always the same so no need to provide)
    layer2, layer3, layer4, layer5 = layers

    # convert input shape into tensor
    X_input = tf.keras.layers.Input(input_shape)

    # zero-padding
    X = tf.keras.layers.ZeroPadding2D((3, 3))(X_input)

    # conv1
    X = tf.keras.layers.Conv2D(
        filters = 64,
        kernel_size = (7, 7),
        strides = (2, 2),
        name = "conv1",
        kernel_initializer = "glorot_uniform",
    )(X)
    X = tf.keras.layers.BatchNormalization(axis = 3, name = "bn_conv1")(X)
    X = tf.keras.layers.Activation("relu")(X)
    X = tf.keras.layers.MaxPooling2D((3, 3), strides = (2, 2))(X)

    # conv2_x
    X = make_layer(X, layers = layer2, kernel_size = 3, filters = [64, 64, 256], stride = 1, stage_no = 2)

    # conv3_x
    X = make_layer(X, layers = layer3, kernel_size = 3, filters = [128, 128, 512], stride = 2, stage_no = 3)

    # conv4_x
    X = make_layer(X, layers = layer4, kernel_size = 3, filters = [256, 256, 1024], stride = 2, stage_no = 4)

    # conv5_x
    X = make_layer(X, layers = layer5, kernel_size = 3, filters = [512, 512, 2048], stride = 1, stage_no = 5)

    # average pooling
    X = tf.keras.layers.AveragePooling2D((2, 2), name = "avg_pool")(X)

    # output layer
    X = tf.keras.layers.Flatten()(X)
    X = tf.keras.layers.Dense(
        classes,
        activation = "softmax",
        name="fc" + str(classes),
        kernel_initializer = "glorot_uniform"
    )(X)

    model = tf.keras.Model(inputs = X_input, outputs = X, name = name)
    return model

def make_layer(X: tf.Tensor, layers: int, kernel_size: int, filters: typing.List[int], stride: int, stage_no: int) -> tf.Tensor:

    # create convolution block
    X = block(
        X,
        kernel_size = kernel_size,
        filters = filters,
        stage_no = stage_no,
        block_name = "a",
        is_conv_layer = True,
        stride = stride
    )

    # create identity block
    block_name_ordinal = ord("b")
    for _ in range(layers - 1):
        X = block(
            X,
            kernel_size = kernel_size,
            filters =  filters,
            stage_no = stage_no,
            block_name = chr(block_name_ordinal)
        )
        block_name_ordinal += 1

    return X

In [None]:
from PIL import Image

def recommend_similar_images(vgg_model, image_path, data_folder, flower_types, top_n=10):
    # Load and preprocess the input image
    img = tf.keras.preprocessing.image.load_img(image_path, target_size=(224, 224))
    x = tf.keras.preprocessing.image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    x = tf.keras.applications.vgg16.preprocess_input(x)

    # Extract the feature vector of the input image
    features = vgg_model.predict(x)

    # Calculate cosine similarities between the input image and all the images in the dataset
    similarities = []
    for i, flower_type in enumerate(flower_types):
        folder_path = os.path.join(data_folder, flower_type)
        for file_name in os.listdir(folder_path):
            if file_name.endswith('.jpg'):
                image_path = os.path.join(folder_path, file_name)
                img = tf.keras.preprocessing.image.load_img(image_path, target_size=(224, 224))
                x = tf.keras.preprocessing.image.img_to_array(img)
                x = np.expand_dims(x, axis=0)
                x = tf.keras.applications.vgg16.preprocess_input(x)
                features_i = vgg_model.predict(x)
                similarity = cosine_similarity(features, features_i)
                similarities.append((image_path, flower_type, similarity[0][0]))

    # Sort the similarities in descending order and select the top_n images
    similarities = sorted(similarities, key=lambda x: x[2], reverse=True)[:top_n]

    # Display the input image, filename, and flower type
    display(Image(filename=image_path))
    print("Input Image: ", os.path.basename(image_path), flower_types[train_labels[image_paths.index(image_path)]])

    # Display the top_n similar images with their filenames, flower types, and cosine similarity scores
    for image_path, flower_type, similarity in similarities:
        display(Image(filename=image_path))
        print(os.path.basename(image_path), flower_type, similarity)

In [None]:
recommend_similar_images(vgg_model, 'data/Flowers/Babi/babi_1.jpg', 'data/Flowers', flower_types)