In [1]:
%matplotlib notebook

import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random
from tensorflow import keras
from tensorflow.keras import regularizers

import tensorflow_datasets as tfds 

In [None]:
# Load the Omniglot train and test splits (as a two-element list, in that
# order) together with the dataset metadata object.
ds, ds_info = tfds.load('omniglot', split=['train','test'],with_info=True)

In [None]:
# Display the first requested split ('train') to inspect its element spec.
ds[0]

In [None]:
# Peek at a single record of the training split: list its feature keys, then
# show the alphabet id, the character id within the alphabet, the image shape
# and the class label.
for example in ds[0].take(1):
    print(list(example.keys()))
    alphabet, alphabet_char_id = example["alphabet"], example["alphabet_char_id"]
    image, label = example["image"], example["label"]
    print(alphabet, alphabet_char_id, image.shape, label, sep="\n")

In [None]:
def plot_triplet(triplet):
    """Show the three images of a triplet side by side.

    Args:
        triplet: Indexable collection whose first three items are image
            arrays (or tensors) that ``plt.imshow`` can render.
    """
    plt.figure(figsize=(6,2))
    for i in range(0, 3):
        plt.subplot(1,3,i+1)
        # Draw the image itself; passing `.shape` here would hand imshow a
        # 3-element tuple instead of pixel data.
        plt.imshow(triplet[i])
        plt.xticks([])
        plt.yticks([])
    plt.show()

In [None]:
# A tf.data.Dataset cannot be indexed with [i]; pull the first three records
# with take(3) and plot their images.
plot_triplet([example["image"] for example in ds[0].take(3)])

In [None]:
# Sub-dataset holding only the first three records of the training split.
a = ds[0].take(3)

In [None]:
# Display the three-record dataset object.
a

In [None]:
# Gather the image tensor of every record in `a`, reporting each image's
# shape as it is collected.
examples = list()
for example in a:
    examples.append(example["image"])
    print(examples[-1].shape)

In [None]:
def plot_triplet(triplet):
    """Plot the first three images of ``triplet`` in a single row.

    Each image is printed as (height, width) and then reshaped to
    105x105x3 before being drawn without axis ticks.
    """
    plt.figure(figsize=(6, 2))
    for column in (1, 2, 3):
        picture = triplet[column - 1]
        plt.subplot(1, 3, column)
        print(picture.shape[0:2])
        pixels = np.reshape(picture, (105, 105, 3))
        plt.imshow(pixels, cmap='binary')
        plt.xticks([])
        plt.yticks([])
    plt.show()

In [None]:
# Plot the three images collected in `examples`.
plot_triplet(examples)

In [None]:
# Keep a handle on the first collected image for ad-hoc inspection.
b = examples[0]

In [None]:
# taken from https://keras.io/examples/vision/reptile/
def extraction(image, label):
    """Preprocess one Omniglot sample.

    Converts the image to float32, collapses RGB to a single grayscale
    channel and resizes it to 28x28. The label is passed through unchanged.

    Returns:
        Tuple ``(processed_image, label)``.
    """
    as_float = tf.image.convert_image_dtype(image, tf.float32)
    grayscale = tf.image.rgb_to_grayscale(as_float)
    resized = tf.image.resize(grayscale, [28, 28])
    return resized, label

In [2]:
class Dataset:
    '''
    This class will facilitate the creation of a few-shot dataset
    from the Omniglot dataset that can be sampled from quickly while also
    allowing to create new labels at the same time.

    Taken from https://keras.io/examples/vision/reptile/

    Attributes:
        data: dict mapping a label (as a string) to the list of preprocessed
            image arrays that carry this label.
        labels: list of every label key present in ``data``.
    '''

    def __init__(self, training):
        '''
        Load one Omniglot split and group its images by label.

        -args:
            training    True loads the "train" split, False the "test" split
        '''
        # Download the tfrecord files containing the omniglot data and convert to a dataset
        split = "train" if training else "test"
        ds = tfds.load('omniglot', split=split, as_supervised=True)
        # Iterate over the dataset to get each individual image and its class,
        # and put that data into a dictionary.
        self.data = {}

        def extraction(image, label):
            # This function will shrink the Omniglot images to the desired size,
            # scale pixel values and convert the RGB image to grayscale
            image = tf.image.convert_image_dtype(image, tf.float32)
            image = tf.image.rgb_to_grayscale(image)
            image = tf.image.resize(image, [28, 28])
            return image, label

        for image, label in ds.map(extraction):
            self.data.setdefault(str(label.numpy()), []).append(image.numpy())

        # Build the label list once, after all images are grouped. Doing it
        # inside the loop rebuilt the list for every image and left the
        # attribute undefined when the split yielded no samples.
        self.labels = list(self.data.keys())

In [None]:
# Load the train and test splits plus the dataset metadata.
# NOTE(review): same call as the first load cell near the top of the notebook;
# this rebinds `ds` and `ds_info`.
ds, ds_info = tfds.load('omniglot', split=['train','test'],with_info=True)

In [None]:
# Name the two splits in the order they were requested: train, then test.
train_data, test_data = ds[0], ds[1]

In [None]:
# Display the dataset metadata returned by tfds.load(..., with_info=True).
ds_info

In [3]:
# Load the whole training split as NumPy arrays. The cells further down
# index, rescale and batch `x_train` / `y_train` directly, so these names
# must exist after a fresh "Restart & Run All".
x_train, y_train = tfds.as_numpy(tfds.load('omniglot', split='train', batch_size=-1, as_supervised=True))
# x_test, y_test = tfds.as_numpy(tfds.load('omniglot', split='test', batch_size=-1, as_supervised=True))

# Label -> list-of-images view of the training split (28x28 grayscale).
train = Dataset(training=True)

In [6]:
# Print only the first label key stored in train.data, then stop.
for key, value in train.data.items():
    print(key)
    break

617


In [None]:
# Label -> list-of-images view of the test split.
test = Dataset(training = False)
# Load the test split as NumPy arrays: the following cells call `.shape` on
# `x_test` / `y_test` and divide `x_test` by 255, which the dict / list held
# in `test.data` / `test.labels` cannot support.
x_test, y_test = tfds.as_numpy(tfds.load('omniglot', split='test', batch_size=-1, as_supervised=True))

In [None]:
# Print the first element of x_train, then stop.
# NOTE(review): for an image array this dumps every pixel; a shape or a
# small slice is usually enough.
for i in x_train:
    print(i)
    break

In [None]:
# Report the array shapes of the training images and labels
# (both objects must expose `.shape`).
print(x_train.shape, y_train.shape)

In [None]:
# Report the array shapes of the test images and labels
# (both objects must expose `.shape`).
print(x_test.shape, y_test.shape)

In [None]:
# Scale pixel values to [0, 1]. Only integer-typed data is rescaled, so
# re-running this cell does not divide by 255 a second time; float32 is used
# to halve the memory of the default float64 result.
if np.issubdtype(x_train.dtype, np.integer):
    x_train = x_train.astype(np.float32) / 255.

In [None]:
# Scale pixel values to [0, 1]. Only integer-typed data is rescaled, so
# re-running this cell does not divide by 255 a second time; float32 is used
# to halve the memory of the default float64 result.
if np.issubdtype(x_test.dtype, np.integer):
    x_test = x_test.astype(np.float32) / 255.

In [None]:
def plot_triplet(triplet):
    """Draw the first three images of ``triplet`` next to each other.

    Every image is reshaped to 105x105x3 and rendered without axis ticks.
    """
    _, axes = plt.subplots(1, 3, figsize=(6, 2))
    for position, axis in enumerate(axes):
        pixels = np.reshape(triplet[position], (105, 105, 3))
        axis.imshow(pixels, cmap='binary')
        axis.set_xticks([])
        axis.set_yticks([])
    plt.show()

In [None]:
# Visual sanity check: plot the first three training images.
plot_triplet([x_train[0], x_train[1], x_train[2]])

In [None]:
def create_batch(batch_size, images=None, labels=None):
    """Sample a batch of (anchor, positive, negative) image triplets.

    For every triplet a random anchor is drawn, then a positive with the same
    label and a negative with a different label. The positive is a different
    sample from the anchor whenever the anchor's class has more than one
    member.

    Args:
        batch_size: Number of triplets to draw.
        images: Array of samples, first axis indexes samples. Defaults to the
            notebook-level ``x_train``.
        labels: 1-D array of labels aligned with ``images``. Defaults to the
            notebook-level ``y_train``.

    Returns:
        ``[anchors, positives, negatives]``; each is a float array of shape
        ``(batch_size,) + images.shape[1:]``.

    Raises:
        ValueError: If ``images`` is empty, if ``images`` and ``labels``
            differ in length, or if an anchor has no sample of another class.
    """
    if images is None:
        images = x_train
    if labels is None:
        labels = y_train
    images = np.asarray(images)
    labels = np.asarray(labels)

    # Use the real dataset size and image shape instead of hard-coded
    # 19280 / (105, 105, 3) so the function works for any split or resolution.
    num_samples = len(images)
    if num_samples == 0:
        raise ValueError("create_batch needs at least one image")
    if len(labels) != num_samples:
        raise ValueError("images and labels must have the same length")

    anchors = np.zeros((batch_size,) + images.shape[1:])
    positives = np.zeros_like(anchors)
    negatives = np.zeros_like(anchors)

    for i in range(batch_size):
        # index for anchor image
        index = random.randint(0, num_samples - 1)
        anchor_label = labels[index]

        # flatnonzero always yields a 1-D index array; np.squeeze(np.where())
        # collapses to a 0-d array when a class has a single sample, which
        # made len() fail.
        same_class = np.flatnonzero(labels == anchor_label)
        other_class = np.flatnonzero(labels != anchor_label)
        if other_class.size == 0:
            raise ValueError("create_batch needs samples from at least two classes")

        # Avoid degenerate triplets where the positive is the anchor itself,
        # unless the anchor is the only member of its class.
        candidates = same_class[same_class != index]
        if candidates.size == 0:
            candidates = same_class

        pos_index = candidates[random.randint(0, len(candidates) - 1)]
        neg_index = other_class[random.randint(0, len(other_class) - 1)]

        anchors[i] = images[index]
        positives[i] = images[pos_index]
        negatives[i] = images[neg_index]
    return [anchors, positives, negatives]

In [None]:
# Positions in y_train whose label equals 963 (ad-hoc inspection of one class).
np.where(y_train == 963)[0]

In [None]:
# Draw 10 triplets and plot the first one: anchor, positive, negative.
triplet = create_batch(10)
plot_triplet([triplet[0][0], triplet[1][0], triplet[2][0]])

In [None]:
def create_cnn_model(input_shape, num_classes):
    """Build the convolutional network shared by the three triplet branches.

    The Keras session is cleared first. The network stacks three
    Conv2D + MaxPooling2D stages (32, 64, 128 filters), flattens, applies a
    regularised 128-unit dense layer with dropout, and ends in a softmax
    layer with ``num_classes`` units.

    Args:
        input_shape: Shape of one input image, without the batch axis.
        num_classes: Number of units in the final softmax layer.

    Returns:
        An uncompiled ``keras.models.Sequential`` model.
    """
    tf.keras.backend.clear_session()
    layers = keras.layers

    stack = [
        layers.Conv2D(32, kernel_size=(3,3), activation="relu", input_shape=input_shape),
        layers.MaxPooling2D(pool_size=(2,2)),
        layers.Conv2D(64, (3,3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2,2)),
        layers.Conv2D(128, (3,3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2,2)),
        layers.Flatten(),
        layers.Dense(
            128,
            activation='relu',
            kernel_regularizer=regularizers.l2(0.01),
            bias_regularizer=regularizers.l1(0.01),
        ),
        layers.Dropout(0.25),
        layers.Dense(num_classes, activation='softmax'),
    ]

    cnn_model = keras.models.Sequential()
    for layer in stack:
        cnn_model.add(layer)
    return cnn_model

In [None]:
def euclidean_distance(vects):
    '''
    Compute the row-wise Euclidean (straight-line) distance between two batches of vectors.
    Taken from https://github.com/keras-team/keras/blob/master/examples/mnist_siamese.py
    -args:
        vects       A pair of tensors (x, y)
    -returns:
        The distance of each pair, reduced over axis 1 with that axis kept
    '''
    K = keras.backend
    left, right = vects

    squared_diff = K.square(left - right)
    sum_square = K.sum(squared_diff, axis=1, keepdims=True)

    # Clamp to epsilon before the square root so it is never taken at zero.
    clamped = K.maximum(sum_square, K.epsilon())
    return K.sqrt(clamped)

In [None]:
def eucl_dist_output_shape(shapes):
    '''
    Give the output shape of the Euclidean distance: the first entry of the
    first input shape, followed by 1.
    Taken from https://github.com/keras-team/keras/blob/master/examples/mnist_siamese.py
    -args:
        shapes      A pair of input shapes
    -returns:
        The tuple (shapes[0][0], 1)
    '''
    first_shape, _second_shape = shapes
    leading_dim = first_shape[0]
    return (leading_dim, 1)

In [None]:
def triplet_loss(margin, num_classes):
    """Create a triplet loss over concatenated embeddings.

    Args:
        margin: Value added to (positive distance - negative distance)
            before clamping at zero.
        num_classes: Width of a single embedding inside ``y_pred``.

    Returns:
        A function ``loss(y_true, y_pred)``. ``y_pred`` is sliced along
        axis 1 into anchor, positive and negative parts; ``y_true`` is not
        used.
    """
    def loss(y_true, y_pred):
        anchor_end = num_classes
        positive_end = 2 * num_classes

        anchor = y_pred[:, :anchor_end]
        positive = y_pred[:, anchor_end:positive_end]
        negative = y_pred[:, positive_end:]

        # Squared Euclidean distances, one value per row.
        positive_dist = tf.reduce_sum(tf.square(anchor - positive), axis=1)
        negative_dist = tf.reduce_sum(tf.square(anchor - negative), axis=1)

        hinge = tf.maximum(positive_dist - negative_dist + margin, 0.)
        return tf.reduce_mean(hinge)

    return loss

In [None]:
def data_generator(batch_size, num_classes):
    """Yield an endless stream of triplet batches for ``Model.fit``.

    Args:
        batch_size: Number of triplets per batch.
        num_classes: Unused; kept so existing calls keep working.

    Yields:
        ``(x, y)`` where ``x`` is the ``[anchors, positives, negatives]``
        list from ``create_batch`` and ``y`` is a float32 vector of zeros of
        length ``batch_size``.
    """
    while True:
        x = create_batch(batch_size)
        # Placeholder targets. The previous np.random.randint(0, 1, ...) could
        # only ever return 0 (the upper bound is exclusive), so write the
        # zeros explicitly instead of consuming random state.
        y = np.zeros(batch_size, dtype=np.float32)
        yield x, y

In [None]:
# Number of distinct labels in the training set.
num_classes = len(np.unique(y_train))

In [None]:
# Display the class count.
num_classes

In [None]:
# Per-image shape: axes 1..3 of x_train, i.e. everything after the sample axis.
input_size = x_train.shape[1:4]

In [None]:
# Build the shared CNN and print its layer summary.
cnn_model = create_cnn_model(input_size, num_classes)
cnn_model.summary()

In [None]:
# One input placeholder per triplet member: anchor, positive, negative.
in_anc, in_pos, in_neg = (
    tf.keras.layers.Input(shape=input_size),
    tf.keras.layers.Input(shape=input_size),
    tf.keras.layers.Input(shape=input_size),
)

In [None]:
# Run each input through the same CNN instance so all three branches share
# their weights.
em_anc, em_pos, em_neg = cnn_model(in_anc), cnn_model(in_pos), cnn_model(in_neg)

In [None]:
# Concatenate the embeddings first; the distances are computed later, inside
# the loss function.
# The earlier approach computed the distances up front and passed them to the
# loss function, so that loss function did not compute distances itself.
out = tf.keras.layers.concatenate([em_anc, em_pos, em_neg], axis=1)

In [None]:
# Three-input model whose single output is the concatenated embeddings.
net = tf.keras.models.Model(inputs=[in_anc, in_pos, in_neg], outputs=out)
net.summary()

In [None]:
# No 'accuracy' metric: the targets fed to fit() are placeholder zeros that
# the triplet loss ignores, so an accuracy computed against them would be a
# meaningless number in the training log.
net.compile(loss=triplet_loss(margin=0.2, num_classes=num_classes), optimizer='adam')

In [None]:
# Each step draws `num_classes` triplets, so this many steps make one epoch
# draw roughly as many anchors as there are training images. The size is
# derived from x_train instead of the hard-coded 19280.
steps_per_epoch = len(x_train) // num_classes
epochs = 3  # reduced for debugging (earlier note: epochs = 12)
# Keep the History object under a real name; assigning to `_` shadows
# IPython's last-output variable.
history = net.fit(
    data_generator(num_classes, num_classes),
    epochs=epochs, steps_per_epoch=steps_per_epoch,
    verbose=True
)

In [None]:
# Scratch: 1024x64 array of zeros.
# NOTE(review): rebinds `a`, which earlier held the three-record dataset.
a = np.zeros((1024, 64))

In [None]:
# Scratch: length of the first axis of `a`.
len(a)

In [None]:
# Scratch: length of the first row of `a`.
len(a[0])

In [None]:
# Scratch: uninitialised 10x10 array (contents are arbitrary).
np.empty((10,10))

In [None]:
# Scratch: 10x6 array of uniform random samples from [0, 1).
np.random.rand(10, 6)