In [103]:
# https://www.tensorflow.org/tutorials/images/classification
# https://github.com/Ekeany/Siamese-Network-with-Triplet-Loss/blob/master/MachinePart1.ipynb
# https://www.kaggle.com/ashishpatel26/triplet-loss-network-for-humpback-whale-prediction

import cv2
import os
import tensorflow as tf
from pickle import load
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
from keras.models import Model, Sequential
from keras.optimizers import Adam
from keras.utils import multi_gpu_model
from keras.layers import Input, Lambda
from keras.utils import np_utils
from keras import backend as K
import numpy as np
from keras.applications.vgg16 import VGG16
from keras.callbacks import ModelCheckpoint, EarlyStopping

In [53]:
def load_images_from_folder(folder):
    images = []
    for filename in os.listdir(folder):
        img = cv2.imread(os.path.join(folder,filename))
        if img is not None:
            images.append(img)
    return images

In [54]:
folder = r'C:\Users\Shelby\Desktop\UMKC\Academics\Fall20\ComputerVision\Project\NWPU-All'
images = load_images_from_folder(folder)
labels = load(open(r'C:\Users\Shelby\Desktop\UMKC\Academics\Fall20\ComputerVision\Project\ex_labels.pkl', 'rb'))
d = {'image':images, 'label':labels}
df = pd.DataFrame(d)
df.head()

In [65]:
X_train, X_test, y_train, y_test = train_test_split(df['image'].values, df['label'].values, random_state=17)
X_train = X_train[0]
X_test = X_test[0]

In [68]:
X_train = X_train.reshape(768, 16, 16, 1)
X_test = X_test.reshape(768, 16, 16, 1)
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')
X_train /= 255
X_test /= 255

In [69]:
def get_image(label, test=False):
    """Choose an image from our training or test data with the
    given label."""
    if test:
        y = y_test; X = X_test
    else:
        y = y_train; X = X_train
    idx = np.random.randint(len(y))
    while y[idx] != label:
        # keep searching randomly!
        idx = np.random.randint(len(y))
    return X[idx]
    
def get_triplet(test=False):
    """Choose a triplet (anchor, positive, negative) of images
    such that anchor and positive have the same label and
    anchor and negative have different labels."""
    n = a = np.random.randint(10)
    while n == a:
        # keep searching randomly!
        n = np.random.randint(10)
    a, p = get_image(a, test), get_image(a, test)
    n = get_image(n, test)
    return a, p, n

def generate_triplets(test=False):
    """Generate an un-ending stream (ie a generator) of triplets for
    training or test."""
    while True:
        list_a = []
        list_p = []
        list_n = []

        for i in range(batch_size):
            a, p, n = get_triplet(test)
            list_a.append(a)
            list_p.append(p)
            list_n.append(n)
            
        A = np.array(list_a, dtype='float32')
        P = np.array(list_p, dtype='float32')
        N = np.array(list_n, dtype='float32')
        # a "dummy" label which will come in to our identity loss
        # function below as y_true. We'll ignore it.
        label = np.ones(batch_size)
        yield [A, P, N], label

In [99]:
def getModel():
    base_model = VGG16(include_top=False, pooling='max')
    for layer in base_model.layers:
        layer.trainable = False
    x = base_model.output
    x = Dropout(0.25)(x)
    x = Dense(embedding_dim)(x)
    x = Lambda(lambda  x: K.l2_normalize(x,axis=1))(x)
    
    embedding_model = Model(base_model.input, x, name="embedding")
    
    input_shape = (image_size, image_size, 3)
    anchor_input = Input(input_shape, name='anchor_input')
    positive_input = Input(input_shape, name='positive_input')
    negative_input = Input(input_shape, name='negative_input')
    anchor_embedding = embedding_model(anchor_input)
    positive_embedding = embedding_model(positive_input)
    negative_embedding = embedding_model(negative_input)

    inputs = [anchor_input, positive_input, negative_input]
    outputs = [anchor_embedding, positive_embedding, negative_embedding]
       
    triplet_model = Model(inputs, outputs)
    triplet_model.add_loss(K.mean(triplet_loss(outputs)))

    return embedding_model, triplet_model

In [100]:
image_size = 256
embedding_dim = 128
embedding_model, triplet_model = getModel()

In [105]:
checkpoint = ModelCheckpoint('saved_model.hdf5', monitor='loss', verbose=1, save_best_only=True, mode='min')
early = EarlyStopping(monitor="val_loss", mode="min", patience=2)
callbacks_list = [checkpoint, early] 

In [106]:
for i, layer in enumerate(embedding_model.layers):
    print(i, layer.name, layer.trainable)

0 input_10 False
1 block1_conv1 False
2 block1_conv2 False
3 block1_pool False
4 block2_conv1 False
5 block2_conv2 False
6 block2_pool False
7 block3_conv1 False
8 block3_conv2 False
9 block3_conv3 False
10 block3_pool False
11 block4_conv1 False
12 block4_conv2 False
13 block4_conv3 False
14 block4_pool False
15 block5_conv1 False
16 block5_conv2 False
17 block5_conv3 False
18 block5_pool False
19 global_max_pooling2d_7 False
20 dropout_7 True
21 dense_8 True
22 lambda_2 True


In [107]:
for layer in embedding_model.layers[178:]:
    layer.trainable = True
for layer in embedding_model.layers[:178]:
    layer.trainable = False

In [108]:
triplet_model.compile(loss=None, optimizer=Adam(0.01))
history = triplet_model.fit(X_train, y_train, 
                              validation_split=0.2, 
                              epochs=4, 
                              verbose=1, 
                              workers=4,
                              steps_per_epoch=200, 
                              validation_steps=20,use_multiprocessing=True)

  'be expecting any data to be passed to {0}.'.format(name))


ValueError: Error when checking model input: the list of Numpy arrays that you are passing to your model is not the size the model expected. Expected to see 3 array(s), but instead got the following list of 1 arrays: [array([[[[0.57254905],
         [0.56078434],
         [0.54509807],
         ...,
         [0.5647059 ],
         [0.5411765 ],
         [0.5019608 ]],

        [[0.5019608 ],
         [0.47843137],...

In [None]:
# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Training and Validation Losses',size = 20)
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper right')
plt.show()