In [None]:
import tensorflow as tf
import keras
import keras.backend as K
tf.__version__

import os, sys
import random
import numpy as np
from tqdm import tqdm
from PIL import Image
import cv2
import math
import pathlib
import skimage
import copy

import matplotlib.pyplot as plt
%matplotlib inline

# Enable memory growth on every visible GPU so TensorFlow does not have to be
# configured per device by hand. With no GPU present the loop simply does nothing.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for device in gpus:
        tf.config.experimental.set_memory_growth(device, True)
except RuntimeError as err:
    # Report the failure and continue with TensorFlow's default behaviour.
    print(err)

### Hyperparameters

In [None]:
# Input image size as (height, width, channels); only the first two entries are
# used for resizing, the channel count is hard-coded to 3 in the model builders.
SHAPE = [476,476,3]

BATCH_SIZE = 16
EPOCHS = 100
# LEARNING_RATE=0.1
# DECAY_RATE = LEARNING_RATE/EPOCHS

RANDOM_SEED = 101
# Seed every random number generator the notebook touches, not just Python's:
# NumPy and TensorFlow have their own generators (weight initialisation, dropout,
# dataset shuffling), so seeding `random` alone does not make a run repeatable.
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)
AUTOTUNE = tf.data.AUTOTUNE
num_classes = 18  # number of output units of the classifier heads

clustering_folder = r"" # Folder where unsupervised clustering results are stored
weights_folder = r"" # Folder where model weights are saved
test_folder = r"" # Path to folder where test data is stored

### Data Generation

In [None]:
# Number of '.jpg' files found exactly one directory below clustering_folder.
jpg_candidates = pathlib.Path(clustering_folder).glob('*/*.jpg')
total_count = sum(1 for item in jpg_candidates if str(item).endswith('.jpg'))

# Map each class index (the integer after the last '_' in the folder name) to
# the natural log of the number of entries in that class folder.
# NOTE(review): log(count) grows with class size, so larger classes receive
# larger weights, and an empty class folder makes math.log raise ValueError.
# Confirm both are intended.
weight_dict = {}
for folder in os.listdir(clustering_folder):
    class_dir = os.path.join(clustering_folder, folder)
    classf = folder.split('_')[-1]
    countf = len(os.listdir(class_dir))
    weight_dict[int(classf)] = math.log(countf)

In [None]:
def normalize_images(image,label):
    """Standardize an image, then rescale it with its own min and max.

    The image goes through ``tf.image.per_image_standardization``; the result is
    shifted by ``|min|`` and divided by ``max + |min| + 1e-5`` (the small constant
    avoids division by zero for a constant image).

    Args:
        image: image tensor to normalize.
        label: passed through untouched so the function can be used with
            ``Dataset.map`` on ``(image, label)`` pairs.

    Returns:
        Tuple ``(normalized_image, label)``.
    """
    standardized = tf.image.per_image_standardization(image)
    floor = tf.math.abs(tf.math.reduce_min(standardized))
    ceiling = tf.math.reduce_max(standardized)
    span = tf.math.add(ceiling, floor) + 0.00001
    rescaled = tf.math.divide(tf.math.add(standardized, floor), span)
    return rescaled, label

def _load_split(subset):
    """Return the normalized, batched and prefetched dataset for one split.

    Args:
        subset: ``'training'`` or ``'validation'``; forwarded to
            ``image_dataset_from_directory`` together with the shared seed and
            the 5% validation fraction so both splits come from the same partition.
    """
    # Class folders are expected to be named class_0 ... class_{N-1}, where N is
    # the number of entries directly inside clustering_folder.
    class_dirs = [f'class_{i}' for i in range(len(os.listdir(clustering_folder)))]
    unbatched = tf.keras.utils.image_dataset_from_directory(
        clustering_folder,
        labels="inferred",
        label_mode='categorical',
        class_names=class_dirs,
        shuffle=True,
        seed=RANDOM_SEED,
        validation_split=0.05,
        subset=subset,
        image_size=(SHAPE[0], SHAPE[1]),
        batch_size=None)
    # Normalize image by image first, then batch.
    normalized = unbatched.map(normalize_images, num_parallel_calls=AUTOTUNE)
    return normalized.batch(BATCH_SIZE).prefetch(AUTOTUNE)


train_ds_series = _load_split('training')

val_ds_series = _load_split('validation')

In [None]:
# Preview grid: the first image of each of the first 25 training batches,
# titled with the index of its one-hot label.
plt.figure(figsize=(20, 20))
for position, (images, labels) in enumerate(train_ds_series.take(25), start=1):
    plt.subplot(5, 5, position)
    plt.imshow(images[0, :, :, :])
    plt.title(f"Class {tf.argmax(labels[0])}")
    plt.axis("off")
plt.subplots_adjust(hspace=0.1, wspace=0.01)

### Classifier

In [None]:
def create_resnet_classifier():
    """Build the residual CNN classifier.

    Architecture, in order:
      * stem: 5x5 convolution with 32 filters, followed by ReLU + batch norm;
      * six stages of residual blocks (2, 3, 3, 3, 3, 2 blocks). The first
        stage uses 32 filters and each later stage adds 32 more; every stage
        except the first starts with a stride-2 (downsampling) block;
      * head: stride-2 3x3 convolution, ReLU + batch norm, global average
        pooling, dropout (0.3) and a dense layer with ``num_classes`` units.

    The dense layer has no activation, so the model outputs raw logits.

    Returns:
        An uncompiled ``keras.models.Model`` named ``'Classifier'``.
    """
    inputs = keras.Input(shape=(SHAPE[0],SHAPE[1],3))
    num_filters = 32

    def relu_batchn(tensor: tf.Tensor) -> tf.Tensor:
        """Apply ReLU followed by batch normalization (in that order)."""
        relu = keras.layers.ReLU()(tensor)
        batchn = keras.layers.BatchNormalization()(relu)
        return batchn

    def res_block(x: tf.Tensor, downsample: bool, filters: int,
                  kernel_size: int = 3, second_kernel_size: int = 3) -> tf.Tensor:
        """Two-convolution residual block.

        Args:
            x: block input.
            downsample: when True the first convolution uses stride 2 and the
                skip path is projected with a stride-2 1x1 convolution so both
                branches have matching shapes.
            filters: number of filters of every convolution in the block.
            kernel_size: kernel size of the first convolution.
            second_kernel_size: kernel size of the second convolution. This used
                to be derived from the enclosing loop variable through the
                closure; it is now passed in explicitly.
        """
        y = keras.layers.Conv2D(kernel_size=kernel_size, strides=(1 if not downsample else 2), filters=filters,padding="same")(x)
        y = relu_batchn(y)
        y = keras.layers.Conv2D(kernel_size=second_kernel_size,strides=1,filters=filters,padding="same")(y)
        if downsample:
            x = keras.layers.Conv2D(kernel_size=1,strides=2,filters=filters,padding="same")(x)
        out = keras.layers.Add()([x, y])
        out = relu_batchn(out)
        return out

    x = keras.layers.Conv2D(num_filters, kernel_size = 5, strides = 1, padding="same")(inputs)
    x = relu_batchn(x)

    blocks_list = [2,3,3,3,3,2]
    for stage, num_blocks in enumerate(blocks_list):
        # Blocks of the first stage use a 5x5 second convolution, all others 3x3.
        second_kernel = 5 if stage == 0 else 3
        for block in range(num_blocks):
            x = res_block(x,
                          downsample=(block == 0 and stage != 0),
                          filters=num_filters,
                          second_kernel_size=second_kernel)
        num_filters += 32

    # num_filters was incremented once more after the last stage, so the head
    # convolution is 64 filters wider than that post-loop value.
    x = keras.layers.Conv2D(num_filters+32*2, kernel_size = 3, strides = 2, padding="same")(x)
    x = relu_batchn(x)
    x = keras.layers.GlobalAveragePooling2D()(x)
    x = keras.layers.Dropout(0.3)(x)
    output = keras.layers.Dense(num_classes,kernel_regularizer=tf.keras.regularizers.l2(0.0001))(x)

    model = keras.models.Model(inputs, output, name='Classifier')
    return model

def create_cnn_classifier():
    """Build the plain (non-residual) CNN classifier.

    Layout: a 5x5 stem convolution with 64 filters, six stages of two 5x5
    convolutions with 128 filters each (the first of each pair has stride 2),
    a stride-2 3x3 convolution with 256 filters, global max pooling and a dense
    layer with ``num_classes`` units. Every convolution except the stem is
    followed by batch normalization and ReLU. The dense layer has no
    activation, so the model outputs raw logits.

    Returns:
        An uncompiled ``keras.models.Model`` named ``'Classifier'``.
    """
    def conv_bn_relu(tensor, filters, kernel, strides=1):
        """Convolution ('same' padding) -> batch normalization -> ReLU."""
        tensor = keras.layers.Conv2D(filters, kernel, strides=strides, padding='same')(tensor)
        tensor = keras.layers.BatchNormalization()(tensor)
        return keras.layers.ReLU()(tensor)

    inputs = keras.Input(shape=(SHAPE[0],SHAPE[1],3))
    features = keras.layers.Conv2D(64, 5, padding='same',strides=(1,1))(inputs)

    for stage_filters in [128] * 6:
        features = conv_bn_relu(features, stage_filters, 5, strides=2)
        features = conv_bn_relu(features, stage_filters, 5)

    features = conv_bn_relu(features, 256, 3, strides=2)
    pooled = keras.layers.GlobalMaxPooling2D()(features)
    logits = keras.layers.Dense(num_classes,kernel_regularizer=tf.keras.regularizers.l2(0.0001))(pooled)

    return keras.models.Model(inputs, logits, name='Classifier')

In [None]:
# Instantiate the residual classifier (create_cnn_classifier is the alternative
# builder defined in the same cell) and print its layer-by-layer summary.
model = create_resnet_classifier()
model.summary()

In [None]:
# The model emits raw logits, hence from_logits=True on the loss.
sgd_optimizer = tf.keras.optimizers.SGD(learning_rate=0.005, momentum=0.9)
smoothed_ce_loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True, label_smoothing=0.1)
top3_accuracy = tf.keras.metrics.TopKCategoricalAccuracy(k=3)

model.compile(
    optimizer=sgd_optimizer,
    loss=smoothed_ce_loss,
    metrics=['accuracy', top3_accuracy])

# Stop once validation top-3 accuracy has not improved by at least 0.01 for 15
# epochs, and roll back to the best weights seen.
model_early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_top_k_categorical_accuracy',
    patience=15,
    min_delta = 0.01,
    mode='max',
    restore_best_weights=True)

history = model.fit(
    train_ds_series,
    validation_data=val_ds_series,
    shuffle=True,
    epochs=EPOCHS,
    class_weight=weight_dict,
    callbacks=[model_early_stopping])

In [None]:
# Spot-check: for the first image of 30 training batches, print the three
# highest-scoring classes (best first) next to the true class index.
# Iterating the dataset once with take(30) avoids rebuilding the shuffled input
# pipeline on every step, which is what next(iter(dataset)) inside a loop does.
for images, labels in train_ds_series.take(30):
    arr = model(images[:1], training=False).numpy().flatten()
    # argsort gives a fully ordered ranking; argpartition would leave the
    # top three in arbitrary order.
    top3 = np.argsort(arr)[-3:][::-1]
    print(top3, np.argmax(labels[0]))

In [None]:
# Save the trained model to <weights_folder>/supervised/supervised_weights.h5.
# exist_ok=True already makes directory creation a no-op when it exists.
weights_dir = os.path.join(weights_folder,'supervised')
os.makedirs(weights_dir, exist_ok=True)
saved_model_file = os.path.join(weights_dir,"supervised_weights.h5")
model.save(saved_model_file)

In [None]:
# model = tf.keras.models.load_model(os.path.join(weights_dir,"supervised_weights.h5"))

In [None]:
# Report the compiled loss and metrics on the training dataset.
# NOTE(review): this evaluates train_ds_series, not val_ds_series — confirm that is intended.
model.evaluate(train_ds_series)

In [None]:
# Collect every '.jpg' file below test_folder. The list is sorted because
# os.walk yields entries in arbitrary order, which would make test[0] (and the
# processing order further down) differ between runs and machines.
test = sorted(
    os.path.join(path, name)
    for path, _, files in os.walk(test_folder)
    for name in files
    if name.endswith('.jpg'))
if not test:
    # Fail with an actionable message instead of an IndexError on test[0].
    raise FileNotFoundError(f"No .jpg files found under test_folder={test_folder!r}")
unknown_path = test[0]
# img = np.asarray(Image.open(img))[:478,:478,:]
    
def normalize_image_only(image):
    """Resize an image to the model input size and normalize it.

    Applies the same standardize-then-rescale arithmetic as ``normalize_images``,
    preceded by a resize to ``SHAPE[0] x SHAPE[1]``.

    Args:
        image: array-like image; converted with ``tf.convert_to_tensor``.

    Returns:
        The resized, normalized image tensor.
    """
    tensor = tf.convert_to_tensor(image)
    resized = tf.keras.layers.Resizing(SHAPE[0],SHAPE[1])(tensor)
    standardized = tf.image.per_image_standardization(resized)
    floor = tf.math.abs(tf.math.reduce_min(standardized))
    # The small constant avoids division by zero for a constant image.
    span = tf.math.add(tf.math.reduce_max(standardized), floor) + 0.00001
    return tf.math.divide(tf.math.add(standardized, floor), span)

# Open the file in a context manager so the handle is released, and force RGB so
# greyscale or CMYK JPEGs get the three channels the model input expects (the
# training loader also always yields 3-channel images).
with Image.open(unknown_path) as unknown_file:
    img = normalize_image_only(np.asarray(unknown_file.convert('RGB')))

arr = model(np.expand_dims(img,0),training=False).numpy().flatten()
# Full ranking of class indices, best score first. argsort is used because
# argpartition does not order the elements on either side of the pivot.
class_matches = np.argsort(arr)[::-1]
# Keep only classes whose logit exceeds 2; the dict preserves the ranking.
filtered_matches = {key:arr[key] for key in class_matches if arr[key]>2}
print(filtered_matches)

In [None]:
# Count, over the whole validation set, how often the true class is among the
# 1, 2 and 3 highest-scoring predictions.
#
# val_ds_series yields *batches*, so each step handles a (batch, H, W, C) image
# tensor and a (batch, num_classes) one-hot label tensor, and `length` counts
# samples rather than batches.
top1 = 0
top2 = 0
top3 = 0
length = 0
for images, labels in tqdm(val_ds_series):
    logits = model(images, training=False).numpy()
    # Fully sorted ranking, best first. argpartition(..., -3) only guarantees
    # which indices are in the top three, not their order, so it cannot be used
    # to tell top-1 from top-2.
    ranked = np.argsort(logits, axis=1)[:, ::-1][:, :3]
    truth = np.argmax(labels.numpy(), axis=1)
    hits = ranked == truth[:, None]
    top1 += int(hits[:, :1].any(axis=1).sum())
    top2 += int(hits[:, :2].any(axis=1).sum())
    top3 += int(hits.any(axis=1).sum())
    length += int(truth.shape[0])
print(top1,top2,top3)

In [None]:
# Express the three top-k counters from the previous cell as fractions of `length`.
print(top1/length,top2/length,top3/length)

#### Process Test Images

In [None]:
# Cache of loaded models keyed by (absolute path, modification time), so calling
# process_unknown in a loop does not reload the same file for every image while
# a re-saved model file is still picked up.
_LOADED_MODELS = {}


def _get_cached_model(model_path):
    """Return the model stored at ``model_path``, loading it only when needed."""
    cache_key = (os.path.abspath(model_path), os.path.getmtime(model_path))
    if cache_key not in _LOADED_MODELS:
        _LOADED_MODELS.clear()  # keep at most one model resident
        _LOADED_MODELS[cache_key] = tf.keras.models.load_model(model_path)
    return _LOADED_MODELS[cache_key]


def process_unknown(unknown_path,model_path,class_name=None,save=True):
    """Classify one image and plot it next to samples of its top-3 classes.

    The figure is a 4x4 grid: the (normalized) query image sits in the top-left
    cell, and each of the three remaining rows shows up to four randomly chosen
    images from one predicted class folder (``clustering_folder/class_<k>``),
    with rows ordered from best to third-best score.

    Args:
        unknown_path: path of the image to classify.
        model_path: path of the saved Keras model to load.
        class_name: optional title for the query image; ``'Unknown Image'`` is
            used when it is None.
        save: when truthy, the figure is written as ``evaluated_<file name>``
            into the parent of the directory that contains ``unknown_path``.

    Returns:
        The matplotlib figure.
    """
    def normalize_image_only(image):
        """Resize to the model input size, standardize, then min/max rescale."""
        image = tf.convert_to_tensor(image)
        image = tf.keras.layers.Resizing(SHAPE[0],SHAPE[1])(image)
        image = tf.image.per_image_standardization(image)
        image = tf.math.divide(tf.math.add(image,tf.math.abs(tf.math.reduce_min(image))),tf.math.add(tf.math.reduce_max(image),tf.math.abs(tf.math.reduce_min(image)))+0.00001)
        return image

    def load_normalized(path):
        """Read an image as RGB, close the file, and return it normalized."""
        with Image.open(path) as handle:
            return normalize_image_only(np.asarray(handle.convert('RGB')))

    img = load_normalized(unknown_path)

    model = _get_cached_model(model_path)

    arr = model(np.expand_dims(img,0),training=False).numpy().flatten()
    # argsort yields a true ranking (best first); argpartition would return the
    # top three in arbitrary order.
    class_matches = np.argsort(arr)[::-1][:3]

    fig, ax = plt.subplots(4,4,figsize=(10,10))
    # Hide every frame up front so cells that end up without an image stay blank.
    for axis in ax.ravel():
        axis.axis('off')
    ax[0,0].imshow(img)
    ax[0,0].set_title(class_name if class_name is not None else 'Unknown Image')

    for r, class_idx in enumerate(class_matches, start=1):
        class_dir = os.path.join(clustering_folder,f'class_{class_idx}')
        classx = os.listdir(class_dir)
        # Sample without replacement and never more than the folder contains;
        # a fixed random index could exceed the folder size or repeat an image.
        samples = random.sample(classx, min(4, len(classx)))
        for c, sample_name in enumerate(samples):
            ax[r,c].set_title(f'Class {str(class_idx)}')
            ax[r,c].imshow(load_normalized(os.path.join(class_dir, sample_name)))
    plt.tight_layout()
    if save:
        out_split = os.path.split(unknown_path)
        # NOTE(review): the output name depends only on the file name, so test
        # images with the same name in sibling folders overwrite each other.
        out_path = os.path.join(os.path.split(out_split[0])[0],f'evaluated_{out_split[1]}')
        fig.savefig(out_path)
    return fig

In [None]:
# Run the comparison plot for every test image; the name of the folder that
# contains the image is used as its title.
saved_model_path = os.path.join(weights_dir,"supervised_weights.h5")
for file in test:
    print(file)
    containing_dir = os.path.split(file)[0]
    process_unknown(unknown_path = file,
                    model_path = saved_model_path,
                    class_name = os.path.split(containing_dir)[-1])