<a href="https://colab.research.google.com/github/pthinh14/triplet-loss/blob/master/triplet_loss_keras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
num_classes_per_batch = 4
num_images_per_class = 8
base_path= './drive/My Drive/IKEA/'
IMG_TYPES = ['.png', '.jpg']

In [0]:
# %pip install tensorflow==1.13.1
import tensorflow as tf
import tensorflow.keras.applications.inception_v3 as kai
import tensorflow.keras.layers as kl
import tensorflow.keras.models as km

from keras import backend as K

## required for semi-hard triplet loss:
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.framework import dtypes
import tensorflow as tf


def triplet_loss(y_true, y_pred):
    y_pred = K.l2_normalize(y_pred,axis=1)
    batch = 2
    ref1 = y_pred[0:1,:]
    pos1 = y_pred[1:num_images_per_class,:]
    neg1 = y_pred[num_images_per_class:,:]
    dis_pos = K.sum(K.square(ref1 - pos1), axis=1, keepdims=True)
    dis_neg = K.sum(K.square(ref1 - neg1), axis=1, keepdims=True)
    dis_pos = K.sqrt(dis_pos)
    dis_neg = K.sqrt(dis_neg)
    a1 = 0.6
    d1 = K.maximum(0.0,dis_pos-dis_neg+a1)
    return K.mean(d1)


def build_model(image_shape=(640, 640, 3), embedding_length=128, trainable=True):
    backbone = kai.InceptionV3(input_shape=image_shape, include_top=False)
    backbone.trainable = trainable
    x = kl.GlobalMaxPooling2D()(backbone.output)
    x = kl.Dense(embedding_length * 2)(x)
    x = kl.Dense(embedding_length)(x)
    embedding = kl.Dense(embedding_length, name='embedding')(x)
    model = km.Model(inputs=[backbone.input], outputs=[embedding])
    # opt = tf.train.AdamOptimizer(0.0001)
    model.compile(loss=triplet_loss, optimizer='adam')
    return model



In [0]:
import cv2
import itertools
import matplotlib.pyplot as plt
import numpy as np
import os
import skimage.transform
IMG_SIZE = 640
def load_image(filename):
    # print(filename)
    # # raw = cv2.imread(filename)
    # # label = os.path.dirname(filename).split("/")[-2] +"/"+ os.path.dirname(filename).split("/")[-1]
    
    # # print(label)
    # filename_queue = tf.train.string_input_producer(filenames)
    # reader = tf.WholeFileReader()
    # key, value = reader.read(filename_queue)

    # images = tf.image.decode_jpeg(value, channels=3)
    # return images
    # # if len(raw.shape) == 2:
    # #     raw = np.stack((raw,)*3, axis=-1)
    # # elif len(raw.shape) > 2 and raw.shape[2] == 4:
    # #     #convert the image from RGBA2RGB
    # #     raw = cv2.cvtColor(raw, cv2.COLOR_BGRA2BGR)
    # # curr_crop = skimage.transform.rescale(raw, size / max(len(raw), len(raw[0])),
    # #                                         mode='constant', multichannel=True)
    # # return np.pad(curr_crop, ((0, size - len(curr_crop)), (0, size - len(curr_crop[0])), (0, 0)),
    # #                 mode='constant'), label
    # # return image, label
    image_string = tf.read_file(filename)

    #Don't use tf.image.decode_image, or the output shape will be undefined
    image = tf.image.decode_jpeg(image_string, channels=3)

    #This will convert to float values in [0, 1]
    image = tf.image.convert_image_dtype(image, tf.float32)

    resized_image = tf.image.resize_images(image, [64, 64])
    return resized_image

In [0]:
# %pip install tf-nightly
import sys
import keras.utils as ku
import keras.callbacks as KC
import matplotlib.pyplot as plt
import numpy as np
import os

In [0]:
def tfdata_generator(images, labels, is_training, batch_size=128):
    '''Construct a data generator using tf.Dataset'''

    def preprocess_fn(image, label):
        '''A transformation function to preprocess raw data
        into trainable input. '''
        x = tf.reshape(tf.cast(image, tf.float32), (28, 28, 1))
        y = tf.one_hot(tf.cast(label, tf.uint8), _NUM_CLASSES)
        return x, y

    #  A Dataset of strings corresponding to file names.
    datasets = [tf.data.Dataset.list_files("{}/*.png".format(base_path + image_dir)) for image_dir in image_classes]

    def generator():
        while True:
            # Sample the labels that will compose the batch
            labels = np.random.choice(range(num_labels),
                                        num_classes_per_batch,
                                        replace=False)
            for label in labels:
                for _ in range(num_images_per_class):
                    # yield images[label][np.random.choice(range(num_images_per_class))]
                    yield label

    choice_dataset = tf.data.Dataset.from_generator(generator, tf.int64).repeat()
    dataset = tf.data.experimental.choose_from_datasets(datasets, choice_dataset)
    # def my_generator(batch_size):
    #     foo = np.zeros((batch_size,))

    #     for images, ids in data.generator(batch_size):
    #         yield images, foo
    dataset = dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(1)
    iterator = dataset.make_initializable_iterator()

    iterator_init_op = iterator.initializer
    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    if is_training:
        dataset = dataset.shuffle(1000)  # depends on sample size

    # Transform and batch data at the same time
    dataset = dataset.apply(tf.contrib.data.map_and_batch(
        preprocess_fn, batch_size,
        num_parallel_batches=4,  # cpu cores
        drop_remainder=True if is_training else False))
    dataset = dataset.repeat()
    dataset = dataset.prefetch(tf.contrib.data.AUTOTUNE)

    return dataset

In [0]:

# try:
#     device_name = os.environ["COLAB_TPU_ADDR"]
#     TPU_ADDRESS = "grpc://" + device_name
#     print("Found TPU at: {}".format(TPU_ADDRESS))
# except KeyError:
#     print("TPU not found")

print(len(sys.argv))
print("hello world")

# data = DataLoader()
model = build_model(trainable=True)


# total_num_images = data.get_total_num_images()
STEPS_PER_EPOCH = 10

batch_size = num_classes_per_batch*num_images_per_class

path = base_path
directories = os.listdir(path)
images = {}
image_classes = []
for directory in directories:
    sub_dirs = os.listdir(base_path + directory)
    for sub_dir in sub_dirs:
        cur_path = os.path.join(base_path + directory, sub_dir)
        _, _, filenames = next(os.walk(cur_path))
        for filename in filenames:
            if os.path.splitext(filename)[1] in IMG_TYPES:
                if directory +'/'+ sub_dir in images:
                    images[directory +'/'+ sub_dir].append(os.path.join(cur_path, filename))
                else:
                    images[directory +'/'+ sub_dir] = [os.path.join(cur_path, filename)]
                    image_classes.append(directory +'/'+ sub_dir)

# image_classes = (sub_sub_dirs for sub_sub_dirs in (os.listdir(base_path + sub_dir) for sub_dir in directories))
# images = {image_class: filename for filename in next(os.walk(os.path.join(base_path, image_class)) if os.path.splitext(filename)[1] in IMG_TYPES for image_class in image_classes}
datasets = [tf.data.Dataset.list_files("{}/*.png".format(base_path + image_dir)) for image_dir in image_classes]
# per_class_datasets = [tf.data.TFRecordDataset(tf.data.Dataset.list_files(d)) for d in directories]

# datasets, image_classes = load_data()
num_labels=len(image_classes)
print(num_labels)

def generator():
    while True:
        # Sample the labels that will compose the batch
        labels = np.random.choice(range(num_labels),
                                    num_classes_per_batch,
                                    replace=False)
        for label in labels:
            for _ in range(num_images_per_class):
                # yield images[label][np.random.choice(range(num_images_per_class))]
                yield label

choice_dataset = tf.data.Dataset.from_generator(generator, tf.int64).repeat()
dataset = tf.data.experimental.choose_from_datasets(datasets, choice_dataset)
# def my_generator(batch_size):
#     foo = np.zeros((batch_size,))

#     for images, ids in data.generator(batch_size):
#         yield images, foo
dataset = dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(1)
iterator = dataset.make_initializable_iterator()

iterator_init_op = iterator.initializer


checkpt = KC.ModelCheckpoint('./weights.{epoch:02d}-{loss:.2f}.hdf5',
                             monitor='loss',
                             verbose=0,
                             save_best_only=False, 
                             mode='auto',
                             period=1)
history = model.fit_generator(iterator.get_next(),
                    steps_per_epoch=STEPS_PER_EPOCH,
                    epochs=20,
                    validation_data=None,
                    validation_steps=None,
                    callbacks=[checkpt],
                    class_weight=None,
                    max_queue_size=10,
                    workers=1,
                    use_multiprocessing=False,
                    shuffle=False)

# Plot training & validation loss values
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()


3
hello world
219


TypeError: ignored

In [0]:
# iterator_helper = datasets[0].make_one_shot_iterator()
with tf.Session() as sess:
    filename_temp = iterator_helper.get_next()
    print(sess.run(filename_temp))