In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras as ks
import os
from skimage.filters import gaussian

import matplotlib.pyplot as plt

from tensorflow.keras import backend as K

from tensorflow.python.data.experimental import AUTOTUNE

from tripletdataset import TripletDataset


In [None]:
# def load_image(image_path):
#     img = tf.io.read_file(image_path)
#     img = tf.image.decode_png(img, channels=3)
#     img = tf.image.convert_image_dtype(img, tf.float32)
#     img = tf.image.resize(img, (385, 275))
#     return img

def preprocess(img_path):
    img = tf.io.read_file(img_path)
    img = tf.image.decode_png(img, channels=3)
    img = tf.image.convert_image_dtype(img, tf.float32)
    img = tf.image.resize(img, (385, 275))
    img = tf.image.random_brightness(img, max_delta=0.6)
    img = tf.image.random_contrast(img, 0.2, 3.0)
    img = tf.image.random_jpeg_quality(img, 20, 80)
    #img = gaussian(img, sigma=np.random.choice(range(1,4)))
    img = tf.clip_by_value(img, 0.0, 1.0)
    img = tf.image.random_crop(img, [350, 250, 3])
    return img

def preprocess_triplet(anchor, positive, negative):
    return (
        preprocess(anchor),
        preprocess(positive),
        preprocess(negative)
    )

def get_card_paths(root_path, set_codes):
    card_paths = []
    print(root_path, set_codes)
    for s in set_codes:
        assert os.path.exists(os.path.join(root_path, s)), \
            'Error: path {} does not exist'.format(os.path.join(root_path, s))
        set_path = os.listdir(os.path.join(root_path, s))
        for card in set_path:
            card_paths.append(os.path.join(root_path, s, card))
    return card_paths

def visualize(anchor, positive, negative):
    """Visualize a few triplets from the supplied batches."""

    def show(ax, image):
        ax.imshow(image)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    fig = plt.figure(figsize=(9, 9))

    axs = fig.subplots(3, 3)
    for i in range(3):
        show(axs[i, 0], anchor[i])
        show(axs[i, 1], positive[i])
        show(axs[i, 2], negative[i])

root_path = '/mnt/d/cardimagescans'
set_codes = ['khm', 'isd', 'sta', 'ogw', 'soi', 'ori', 'znr', 'lgn', 'plc', 'inv']
#set_codes = ['khm']
img_paths = get_card_paths(root_path, set_codes)
image_counts = len(img_paths)

anchor_dataset = tf.data.Dataset.from_tensor_slices(img_paths)
positive_dataset = tf.data.Dataset.from_tensor_slices(img_paths)
np.random.shuffle(img_paths)
negative_dataset = tf.data.Dataset.from_tensor_slices(img_paths)
negative_dataset = negative_dataset.shuffle(buffer_size=1024)
dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.map(preprocess_triplet, num_parallel_calls=4)

train_dataset = dataset.take(round(image_counts * 0.8))
val_dataset = dataset.skip(round(image_counts * 0.8))

train_dataset = train_dataset.batch(8, drop_remainder=False)
train_dataset = train_dataset.prefetch(4)

val_dataset = val_dataset.batch(8, drop_remainder=False)
val_dataset = val_dataset.prefetch(4)

In [None]:
visualize(*list(train_dataset.take(1).as_numpy_iterator())[0])

In [None]:
# Implementing a triplet loss model the Keras way
input_shape=(350,250,3)

class DistanceLayer(ks.layers.Layer):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        distance_ap = tf.reduce_sum(tf.square(anchor-positive), -1)
        distance_an = tf.reduce_sum(tf.square(anchor-negative), -1)
        return (distance_ap, distance_an)

class SiameseModel(ks.Model):

    def __init__(self, siamese_network, margin=0.5):
        super(SiameseModel, self).__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = ks.metrics.Mean(name='loss')

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):

        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)
        
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)
        
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        self.loss_tracker.update_state(loss)
        return {'loss':self.loss_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)

        self.loss_tracker.update_state(loss)
        return {'loss': self.loss_tracker.result()}

    def _compute_loss(self, data):
        ap_distance, an_distance = self.siamese_network(data)

        loss = tf.maximum(ap_distance - an_distance + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        return [self.loss_tracker]


vgg = ks.applications.VGG19(
        include_top=False,
        weights='imagenet',
        input_shape=input_shape,
        pooling='max'
    )

flatten = ks.layers.Flatten()(vgg.output)
dense1 = ks.layers.Dense(512, activation='relu')(flatten)
dense1 = ks.layers.BatchNormalization()(dense1)
dense2 = ks.layers.Dense(256, activation='relu')(dense1)
dense2 = ks.layers.BatchNormalization()(dense2)
output = ks.layers.Dense(256, activation='relu')(dense2)

encoder = ks.Model(vgg.input, output, name='Encoder')

input_a = ks.layers.Input(name='anchor_input', shape=input_shape)
#input_a = tf.expand_dims(input_a, axis=-4)
input_p = ks.layers.Input(name='positive_input', shape=input_shape)
#input_p = tf.expand_dims(input_p, axis=-4)
input_n = ks.layers.Input(name='negative_input', shape=input_shape)
#input_n = tf.expand_dims(input_n, axis=-4)

distances = DistanceLayer()(
    encoder(input_a),
    encoder(input_p),
    encoder(input_n),
)

siamese_network = ks.Model(
    inputs=[input_a, input_p, input_n],
    outputs=distances
)



In [None]:
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=ks.optimizers.Adam(0.0001))
siamese_model.fit(train_dataset, epochs=20, validation_data=val_dataset)

In [None]:
ks.utils.plot_model(vgg, show_shapes=True)

In [None]:
dummy_data = dataset.get_triplet_batch()
print(np.shape(dummy_data))

In [None]:
#dummy_data = dataset.get_batch_list()
siamese_model.fit(dataset.get_triplet_batch(), epochs=20, validation_data=dataset.get_triplet_batch(train=False))

In [None]:
K.set_image_data_format('channels_last')

# triplet model from scratch
def tripletloss(model_anchor, model_positive, model_negative, margin=0.5):
    distance1 = tf.sqrt(tf.reduce_mean(tf.pow(model_anchor - model_positive, 2), 1, keepdims=True))
    distance2 = tf.sqrt(tf.reduce_mean(tf.pow(model_anchor - model_negative, 2), 1, keepdims=True))
    return tf.reduce_mean(tf.maximum(distance1 - distance2 + margin, 0))

def tripletloss_wrapper(loss_list, margin = 0.5):
    return (tripletloss(loss_list[0], loss_list[1], loss_list[2], margin))

def get_siamese_net(input_shape):
    input_a = ks.Input(shape=input_shape)
    input_p = ks.Input(shape=input_shape)
    input_n = ks.Input(shape=input_shape)

    vgg = ks.applications.VGG19(
        include_top=False,
        weights='imagenet',
        input_shape=input_shape,
        pooling='max'
    )

    flatten = ks.layers.Flatten()
    encoder = ks.layers.Dense(265, activation='relu')

    encoded_a = vgg(input_a)
    encoded_a = flatten(encoded_a)
    encoded_a = encoder(encoded_a)

    encoded_p = vgg(input_p)
    encoded_p = flatten(encoded_p)
    encoded_p = encoder(encoded_p)

    encoded_n = vgg(input_n)
    encoded_n = flatten(encoded_n)
    encoded_n = encoder(encoded_n)

    Distance_layer_ap = ks.layers.Lambda(lambda x: tf.sqrt(tf.reduce_mean(tf.pow(x[0] - x[1], 2), 1, keepdims=True)))
    Distance_layer_an = ks.layers.Lambda(lambda x: tf.sqrt(tf.reduce_mean(tf.pow(x[0] - x[1], 2), 1, keepdims=True)))
    distance_ap = Distance_layer_ap([encoded_a, encoded_p])
    distance_an = Distance_layer_an([encoded_a, encoded_n])

    siamese_net = ks.Model(inputs=(input_a, input_p, input_n), outputs=(distance_ap, distance_an))
    return siamese_net


In [None]:
model = get_siamese_net((350,250,3))

In [None]:
model.summary()

In [None]:
ks.utils.plot_model(model, to_file='model.png', show_shapes=True)

In [None]:
dummy_data = np.random.normal(size=(3, 350, 250,3))

In [None]:
model(dummy_data)

In [None]:
def get_siamese_net(input_size):
#input_size = [350, 250, 3]
    anker_input = ks.Input(input_size)
    positive_input = ks.Input(input_size)
    negative_input = ks.Input(input_size)

    vgg = ks.applications.VGG16(
        include_top=False,
        weights='imagenet',
        input_shape=input_size,
        pooling='max'
    )

    encoder_layer = ks.layers.Dense(256, activation='relu')(vgg)

    encoded_anker = model(anker_input)
    encoded_positive = model(positive_input)
    encoded_negative = model(negative_input)

    Distance_Layer = ks.layers.Lambda(lambda x: tripletloss_wrapper(x))
    triplet_loss = Distance_Layer([encoded_anker, encoded_positive, encoded_negative])

    similarity = ks.layers.Dense(1, activation='sigmoid')(triplet_loss)

    siamese_net = ks.Model(inputs=[anker_input, positive_input, negative_input], outputs=similarity)

    return siamese_net

#encoder_layer = ks.layers.Dense(265, activation='relu')


In [None]:
def tripletloss(model_anchor, model_positive, model_negative, margin=0.5):
    distance1 = tf.sqrt(tf.reduce_mean(tf.pow(model_anchor - model_positive, 2), 1, keepdims=True))
    distance2 = tf.sqrt(tf.reduce_mean(tf.pow(model_anchor - model_negative, 2), 1, keepdims=True))
    return tf.reduce_mean(tf.maximum(distance1 - distance2 + margin, 0))

def tripletloss_wrapper(loss_list, margin = 0.5):
    return (tripletloss(loss_list[0], loss_list[1], loss_list[2], margin))

def make_triplet_loss_network(input_size):
    
    input_anker = ks.Input(input_size)
    input_positive = ks.Input(input_size)
    input_negative = ks.Input(input_size)

    model = ks.Sequential()
    model.add(
        ks.applications.VGG16(
            include_top=False,
            weights='imagenet',
            input_shape=input_size,
            pooling='max'
    )
    )
    model.add(ks.layers.Flatten())
    model.add(ks.layers.Dense(256, activation='sigmoid'))

    encoded_anker = model(input_anker)
    encoded_positive = model(input_positive)
    encoded_negative = model(input_negative)

    triplet_model = ks.Model(inputs=[input_anker, input_positive, input_negative], outputs=[encoded_anker, encoded_positive, encoded_negative])

    #loss = tripletloss(encoded_anker, encoded_positive, encoded_negative)

    

    # similarity_layer = ks.layers.Lambda(lambda x: tripletloss(x[0], x[1], x[2]))
    # similarity_score = similarity_layer([encoded_anker, encoded_positive, encoded_negative])

    return triplet_model

In [None]:
triplet_model = make_triplet_loss_network([350,250,3])

triplet_model.summary()

In [None]:
triplet = dataset.get_batch_list()

len(triplet[0][0])

In [None]:
triplet_model.fit(x=dataset.get_batch_list())

In [None]:
model = make_triplet_loss_network([350,250,3])

model.summary()

In [None]:
anker, positive, negative = dataset.get_triplet()

model.evaluate()

In [None]:
# load tfrecords dataset
image_feature_description = {
    "image": tf.io.FixedLenFeature([], tf.string), 
    "class": tf.io.FixedLenFeature([], tf.int64), 
    }

def _parse_data(unparsed_example):
    return tf.io.parse_single_example(unparsed_example, image_feature_description)

def _bytestring_to_pixels(parsed_example):
    byte_string = parsed_example['image']
    image = tf.io.decode_image(byte_string)
    image = tf.reshape(image, input_shape)
    return image, parsed_example["class"]

def load_and_extract_images(filepath):
    dataset = tf.data.TFRecordDataset(filepath)
    dataset = dataset.map(_parse_data, num_parallel_calls=AUTOTUNE)
    dataset = dataset.map(_bytestring_to_pixels, num_parallel_calls=AUTOTUNE) # .cache()
    return dataset

def tripletloss(model_anchor, model_positive, model_negative, margin=2):
    distance1 = tf.sqrt(tf.reduce_mean(tf.pow(model_anchor - model_positive, 2), 1, keepdims=True))
    distance2 = tf.sqrt(tf.reduce_mean(tf.pow(model_anchor - model_negative, 2), 1, keepdims=True))
    return tf.reduce_mean(tf.maximum(distance1 - distance2 + margin, 0))

In [None]:
dataset = load_and_extract_images(data_path)

In [None]:
def triplet_input_fn(dataset):
    double_set = tf.data.Dataset(dataset.batch(2))
    return double_set

triplet_set = triplet_input_fn(dataset)

In [None]:
from matplotlib.pyplot import imshow
test_set = train_dataset.take(1)
for image, label in test_set:
    print(image.shape)
    imshow(image)

In [None]:
convnet = ks.Sequential()
convnet.add(ks.layers.Input(input_shape))
convnet.add(ks.layers.Conv2D(32, [7,7]))
convnet.add(ks.layers.MaxPool2D())
convnet.add(ks.layers.Conv2D(64, [5,5]))
convnet.add(ks.layers.MaxPool2D())
convnet.add(ks.layers.Conv2D(128, [3,3]))
convnet.add(ks.layers.MaxPool2D())
convnet.add(ks.layers.Flatten())
convnet.add(ks.layers.Dense(256, activation=None))
convnet.add(ks.layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1)))

In [None]:
convnet.summary()

In [None]:
convnet.compile(optimizer='Adam', loss=tripletloss, metrics=['accuracy'])

In [None]:
for epoch in range(10):
    convnet.fit(dataset, batch_size=8)