Побудувати CNN на основі ResNet-50 для класифікації зображень на основі
датасету fashion-mnist.
Зробити налаштування моделі для досягнення необхідної точності. На базі
Siamese networks побудувати систему для пошуку подібних зображень в
датасеті fashion-mnist. Візуалізувати отримані результати t-SNE.

In [41]:
import numpy as np
import os
import pandas as pd
import cv2
import torch
import torch.nn as nn
from keras.preprocessing.image import ImageDataGenerator
from keras.utils.np_utils import to_categorical
from IPython.display import display
from PIL import Image
import tensorflow as tf
from tensorflow.keras.datasets import fashion_mnist
#from keras.models import Model
#from keras.layers import Input, Conv2D, BatchNormalization, MaxPool2D, Activation, Flatten, Dense, Dropout
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, ZeroPadding1D, Flatten, BatchNormalization, AveragePooling1D, Dense, Activation, Add 
from tensorflow.keras.models import Model
from tensorflow.keras import activations
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.regularizers import l2

In [42]:
(X_train, Y_train), (X_test, Y_test) = tf.keras.datasets.fashion_mnist.load_data()

In [43]:
tf.config.run_functions_eagerly(True)

In [44]:
img_shape =  np.concatenate((X_test.shape[1:], [1]))
CLASSES = len(np.unique(Y_test))
EPOCH=17
BS = 128

train_images = X_train / 255.0
test_images = X_test / 255.0

In [46]:
import random

def create_pairs(x, digit_indices):
    '''
    Positive and negative pair creation.
    Alternates between positive and negative pairs.
    '''
    pairs = []
    labels = []
    n = min([len(digit_indices[d]) for d in range(CLASSES)]) - 1
    for d in range(CLASSES):
        for i in range(n):
            z1, z2 = digit_indices[d][i], digit_indices[d][i + 1]
            pairs += [[x[z1], x[z2]]]
            inc = random.randrange(1, CLASSES)
            dn = (d + inc) % CLASSES
            z1, z2 = digit_indices[d][i], digit_indices[dn][i]
            pairs += [[x[z1], x[z2]]]
            labels += [1, 0]
    return np.array(pairs), np.array(labels)

def make_pair_dataset(images, labels):
    digit_indices = [np.where(labels == i)[0] for i in range(CLASSES)]
    pairs, y = create_pairs(images, digit_indices)
    return pairs, y

tr_pairs, tr_y = make_pair_dataset(train_images, Y_train)
te_pairs, te_y = make_pair_dataset(test_images, Y_test)

In [1]:
def show_image(image, size):
    scale_percent = size
    width = int(image.shape[1] * scale_percent / 100)
    height = int(image.shape[0] * scale_percent / 100)
    dim = (width, height)
    resized = cv2.resize(image, dim, interpolation = cv2.INTER_AREA)
    display(Image.fromarray(resized))

#show_image(X_train[30], 700)

In [2]:
tr_y = tr_y.astype(np.float32)
te_y = te_y.astype(np.float32)

x_tra, x_val, y_tra, y_val = train_test_split(tr_pairs, tr_y, test_size=0.2, stratify=tr_y, random_state=21)

NameError: name 'tr_y' is not defined

In [27]:
### One hot encoding for labels 

train_lab_categorical = tf.keras.utils.to_categorical(
    Y_train, num_classes=10, dtype='uint8')

test_lab_categorical = tf.keras.utils.to_categorical(
    Y_test, num_classes=10, dtype='uint8')

In [28]:
### Train -test split 

from sklearn.model_selection import train_test_split 
X_train, X_test, Y_train, Y_test = train_test_split(X_train, train_lab_categorical, test_size=0.20, 
                                                            stratify=train_lab_categorical, 
                                                            random_state=40, shuffle = True)

print ("train data shape after the split: ", X_train.shape)
print ('new validation data shape: ', X_test.shape)
print ("validation labels shape: ", Y_test.shape)

train data shape after the split:  (48000, 28, 28)
new validation data shape:  (12000, 28, 28)
validation labels shape:  (12000, 10)


In [29]:
class_types = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

In [30]:
def res_identity(x, filters): 
    #renet block where dimension doesnot change.
    #The skip connection is just simple identity conncection
    #we will have 3 blocks and then input will be added

    x_skip = x # this will be used for addition with the residual block 
    f1, f2 = filters

    #first block 
    x = Conv1D(f1, kernel_size=1, strides=1, padding='valid', kernel_regularizer=l2(0.001))(x)
    x = BatchNormalization()(x)
    x = Activation(activations.relu)(x)

    #second block # bottleneck (but size kept same with padding)
    x = Conv1D(f1, kernel_size=3, strides=1, padding='same', kernel_regularizer=l2(0.001))(x)
    x = BatchNormalization()(x)
    x = Activation(activations.relu)(x)

    # third block activation used after adding the input
    x = Conv1D(f2, kernel_size=1, strides=1, padding='valid', kernel_regularizer=l2(0.001))(x)
    x = BatchNormalization()(x)
    # x = Activation(activations.relu)(x)

    # add the input 
    x = Add()([x, x_skip])
    x = Activation(activations.relu)(x)

    return x

In [31]:
def res_conv(x, s, filters):
    '''
    here the input size changes''' 
    x_skip = x
    f1, f2 = filters

    # first block
    x = Conv1D(f1, kernel_size=1, strides=s, padding='valid', kernel_regularizer=l2(0.001))(x)
    # when s = 2 then it is like downsizing the feature map
    x = BatchNormalization()(x)
    x = Activation(activations.relu)(x)

    # second block
    x = Conv1D(f1, kernel_size=3, strides=1, padding='same', kernel_regularizer=l2(0.001))(x)
    x = BatchNormalization()(x)
    x = Activation(activations.relu)(x)

    #third block
    x = Conv1D(f2, kernel_size=1, strides=1, padding='valid', kernel_regularizer=l2(0.001))(x)
    x = BatchNormalization()(x)

    # shortcut 
    x_skip = Conv1D(f2, kernel_size=1, strides=s, padding='valid', kernel_regularizer=l2(0.001))(x_skip)
    x_skip = BatchNormalization()(x_skip)

    # add 
    x = Add()([x, x_skip])
    x = Activation(activations.relu)(x)

    return x

In [32]:
def ResNet50():

    input_im = Input(shape=(X_train.shape[1], X_train.shape[2]))
    x = ZeroPadding1D(padding=2)(input_im)

    # 1st stage
    # here we perform maxpooling, see the figure above

    x = Conv1D(64, kernel_size=7, strides=2)(x)
    x = BatchNormalization()(x)
    x = Activation(activations.relu)(x)
    x = MaxPooling1D(3, strides=2)(x)

    #2nd stage 
    # frm here on only conv block and identity block, no pooling

    x = res_conv(x, s=1, filters=(64, 256))
    x = res_identity(x, filters=(64, 256))
    x = res_identity(x, filters=(64, 256))

    # 3rd stage

    x = res_conv(x, s=2, filters=(128, 512))
    x = res_identity(x, filters=(128, 512))
    x = res_identity(x, filters=(128, 512))
    x = res_identity(x, filters=(128, 512))

    # 4th stage

    x = res_conv(x, s=2, filters=(256, 1024))
    x = res_identity(x, filters=(256, 1024))
    x = res_identity(x, filters=(256, 1024))
    x = res_identity(x, filters=(256, 1024))
    x = res_identity(x, filters=(256, 1024))
    x = res_identity(x, filters=(256, 1024))

    # 5th stage

    x = res_conv(x, s=2, filters=(512, 2048))
    x = res_identity(x, filters=(512, 2048))
    x = res_identity(x, filters=(512, 2048))

    # ends with average pooling and dense connection

    x = AveragePooling1D(2, padding='same')(x)

    x = Flatten()(x)
    x = Dense(len(class_types), activation='softmax', kernel_initializer='he_normal')(x)

    # define the model 

    model = Model(inputs=input_im, outputs=x, name='Resnet50')

    return model

In [33]:
### Define some Callbacks
def lrdecay(epoch):
    lr = 1e-3
    if epoch > 180:
        lr *= 0.5e-3
    elif epoch > 160:
        lr *= 1e-3
    elif epoch > 120:
        lr *= 1e-2
    elif epoch > 80:
        lr *= 1e-1
    #print('Learning rate: ', lr)
    return lr
  # if epoch < 40:
  #   return 0.01
  # else:
  #   return 0.01 * np.math.exp(0.03 * (40 - epoch))
lrdecay = tf.keras.callbacks.LearningRateScheduler(lrdecay) # learning rate decay  


def earlystop(mode):
    if mode=='acc':
        estop = tf.keras.callbacks.EarlyStopping(monitor='val_acc', patience=15, mode='max')
    elif mode=='loss':
        estop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=15, mode='min')
    return estop    

In [34]:
feature_model = ResNet50()

In [35]:
#resnet50_model.summary()

In [36]:
from keras.layers import concatenate
img_a_in = Input(shape = X_train.shape[1:], name = 'ImageA_Input')
img_b_in = Input(shape = X_train.shape[1:], name = 'ImageB_Input')
img_a_feat = feature_model(img_a_in)
img_b_feat = feature_model(img_b_in)
combined_features = concatenate([img_a_feat, img_b_feat], name =
'merge_features')
combined_features = Dense(16, activation = 'linear')(combined_features)
combined_features = BatchNormalization()(combined_features)
combined_features = Activation('relu')(combined_features)
combined_features = Dense(4, activation = 'linear')(combined_features)
combined_features = BatchNormalization()(combined_features)
combined_features = Activation('relu')(combined_features)
combined_features = Dense(1, activation = 'sigmoid')(combined_features)
similarity_model = Model(inputs = [img_a_in, img_b_in], outputs =
[combined_features], name = 'Similarity_Model')
similarity_model.summary()

Model: "Similarity_Model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 ImageA_Input (InputLayer)      [(None, 28, 28)]     0           []                               
                                                                                                  
 ImageB_Input (InputLayer)      [(None, 28, 28)]     0           []                               
                                                                                                  
 Resnet50 (Functional)          (None, 10)           16066506    ['ImageA_Input[0][0]',           
                                                                  'ImageB_Input[0][0]']           
                                                                                                  
 merge_features (Concatenate)   (None, 20)           0           ['Resnet50[0][0]',

In [37]:
similarity_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
              loss='binary_crossentropy',
              metrics=['mae'])

In [38]:
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0,
    patience=3,
    verbose=0,
    mode='auto',
    baseline=None,
    restore_best_weights=True
)

save_model = tf.keras.callbacks.ModelCheckpoint(
    "/content/drive/MyDrive/Classroom/similarity_model",
    monitor='val_loss',
    verbose=0,
    save_best_only=True,
    save_weights_only=False,
)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                              patience=1, min_lr=0.0001)

In [40]:
EPOCH = 17
BS = 128
history = similarity_model.fit([X_train[:,0], X_train[:, 1]], Y_train, epochs=EPOCH, batch_size=BS, 
                    validation_data=([X_test[:, 0], X_test[:, 1]], Y_test), callbacks=[early_stopping, save_model, reduce_lr])

Epoch 1/17


ValueError: in user code:

    File "/home/sveta/anaconda3/lib/python3.9/site-packages/keras/engine/training.py", line 1021, in train_function  *
        return step_function(self, iterator)
    File "/home/sveta/anaconda3/lib/python3.9/site-packages/keras/engine/training.py", line 1010, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/home/sveta/anaconda3/lib/python3.9/site-packages/keras/engine/training.py", line 1000, in run_step  **
        outputs = model.train_step(data)
    File "/home/sveta/anaconda3/lib/python3.9/site-packages/keras/engine/training.py", line 859, in train_step
        y_pred = self(x, training=True)
    File "/home/sveta/anaconda3/lib/python3.9/site-packages/keras/utils/traceback_utils.py", line 67, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/home/sveta/anaconda3/lib/python3.9/site-packages/keras/engine/input_spec.py", line 214, in assert_input_compatibility
        raise ValueError(f'Input {input_index} of layer "{layer_name}" '

    ValueError: Exception encountered when calling layer "Resnet50" (type Functional).
    
    Input 0 of layer "zero_padding1d_2" is incompatible with the layer: expected ndim=3, found ndim=2. Full shape received: (128, 28)
    
    Call arguments received:
      • inputs=tf.Tensor(shape=(128, 28), dtype=float32)
      • training=True
      • mask=None
