In [None]:
import pandas as pd
import numpy as np
import cv2
from sklearn.model_selection import train_test_split
import tensorflow as tf
from matplotlib import pyplot as plt

In [None]:
# Root folder of the Stanford Online Products dataset. The trailing slash matters:
# the notebook builds file paths by plain string concatenation.
DATASET_PATH = "../Data/Stanford_Online_Products/"

# Annotation files that live directly inside DATASET_PATH.
TRAIN_FILE, TEST_FILE = "Ebay_train.txt", "Ebay_test.txt"

In [None]:
# Load the *training* annotation table for exploration. The original cell read
# TEST_FILE into a variable called df_train, so every later "train" statistic
# and preview was actually computed on the test split.
df_train = pd.read_csv(f'{DATASET_PATH}{TRAIN_FILE}', sep=' ')
df_train.head()

In [None]:
# Print a summary of the annotation table (columns, dtypes, non-null counts, memory usage).
df_train.info()

In [None]:
from random import choice

# Number of random example images shown for every super-class (one grid row each).
n_examples = 4
super_classes = df_train.super_class_id.unique()

# One row per super-class actually present in the data instead of a hard-coded 12.
# The height formula keeps the original 15x50 inch figure for 12 super-classes.
# squeeze=False guarantees a 2-D axes array even for a single row.
fig, axes = plt.subplots(nrows=len(super_classes), ncols=n_examples,
                         figsize=(15, 50 * len(super_classes) / 12),
                         squeeze=False)
for row, super_class in enumerate(super_classes):
    # Select the rows of this super-class once per row, not once per image.
    class_rows = df_train.loc[df_train.super_class_id == super_class, ['path', 'class_id']]
    for col in range(n_examples):
        # Pick one random annotation row; it carries both the path and the class id,
        # so no second scan of the whole table is needed to look the class up.
        picked = class_rows.iloc[choice(range(len(class_rows)))]
        img_class_full_path = f'{DATASET_PATH}{picked["path"]}'
        img_bgr = cv2.imread(img_class_full_path)
        if img_bgr is None:
            # cv2.imread signals a missing/corrupt file by returning None.
            raise FileNotFoundError(f'Cannot read image: {img_class_full_path}')
        # OpenCV loads images as BGR, matplotlib expects RGB.
        img_class = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
        axes[row, col].set_title(f'Класс изображения {picked["class_id"]}')
        axes[row, col].imshow(img_class)

Data Handler для Стэнфордского датасета


In [None]:
from random import sample
from tqdm import tqdm


class DatasetHandler:
    """Build batched ``tf.data`` datasets of (anchor, positive, negative) image triplets.

    The annotation files are read as space-separated tables. The code uses the
    columns ``image_id``, ``class_id``, ``super_class_id`` and ``path``.
    A random fraction of each file is kept, one triplet is generated per kept
    row, and the image files are loaded lazily when the dataset is iterated.
    """

    def __init__(self, train_path: str, test_path: str,
                 train_part: float = 1., test_part: float = 1.,
                 batch_size=64,
                 target_shape=(400, 400),
                 images_root=None):
        """
        Args:
            train_path: path of the training annotation file.
            test_path: path of the test annotation file.
            train_part: fraction of training rows to keep, in (0, 1].
            test_part: fraction of test rows to keep, in (0, 1].
            batch_size: number of triplets per batch.
            target_shape: (height, width) every image is resized to.
            images_root: prefix prepended to each ``path`` value by plain string
                concatenation (so it must end with a path separator). Defaults
                to the notebook-level ``DATASET_PATH``.

        Raises:
            ValueError: if ``train_part`` or ``test_part`` is outside (0, 1].
        """
        for part_name, part in (('train_part', train_part), ('test_part', test_part)):
            if not 0 < part <= 1:
                raise ValueError(f'{part_name} must be in (0, 1], got {part}')

        # Stored as a tuple so that ``get_target_shape() + (3,)`` always works.
        self.__target_shape = tuple(target_shape)
        # Fall back to the global so existing calls without images_root keep working.
        self.__images_root = DATASET_PATH if images_root is None else images_root

        train_source = self.__read_part(train_path, train_part)
        test_source = self.__read_part(test_path, test_part)

        # Train/test triplets

        tqdm.write('Train generating')
        self.__train_dataset = self.__build_dataset(train_source, batch_size)
        tqdm.write('Test generating')
        self.__test_dataset = self.__build_dataset(test_source, batch_size)


    def __read_part(self, annotation_path: str, part: float) -> pd.DataFrame:
        """Read an annotation file and return a random ``part`` fraction of its rows."""
        full_data = pd.read_csv(annotation_path, sep=' ')
        indexes = list(full_data.index)
        return full_data.loc[sample(indexes, int(part * len(indexes)))]


    def __build_dataset(self, source: pd.DataFrame, batch_size):
        """Generate triplets for ``source`` and wrap them into a batched, prefetched dataset."""
        triplets = self.__generate_triplets(source)
        return self.__seal_dataset(triplets).batch(batch_size).prefetch(2)


    def __form_triplet(self, ind: int, data: pd.DataFrame):
        """Pick a random positive and negative row for the anchor at position ``ind``.

        Positive: another image of the same class; if there is none, another
        image of the same super-class; if there is none either, the anchor
        itself. Negative: any row whose class differs from the anchor's class.

        Returns:
            Tuple ``(anchor, positive, negative)`` of rows of ``data``.
        """
        anchor = data.iloc[ind]
        # Boolean masks are computed once and reused. This avoids building a
        # dropped copy of the whole frame for every anchor.
        same_class = (data.class_id == anchor.class_id).values
        not_anchor = (data.image_id != anchor.image_id).values

        similar_indexes = data.index[same_class & not_anchor]
        if len(similar_indexes) == 0:
            # Fallback to the super-class, still excluding the anchor so the
            # positive is not a copy of the anchor (zero distance by construction).
            same_super_class = (data.super_class_id == anchor.super_class_id).values
            similar_indexes = data.index[same_super_class & not_anchor]
        if len(similar_indexes) == 0:
            # The anchor is the only sampled image of its super-class.
            similar_indexes = data.index[[ind]]
        positive = data.loc[choice(similar_indexes)]

        negative = data.loc[choice(data.index[~same_class])]

        return anchor, positive, negative


    def __generate_triplets(self, data: pd.DataFrame):
        """Return a dict of three equally long lists of image file paths, one triplet per row."""
        triplets = {'anchors': [], 'positive': [], 'negative': []}
        for i in tqdm(range(data.shape[0])):
            anchor, positive, negative = self.__form_triplet(i, data)
            for key, row in (('anchors', anchor), ('positive', positive), ('negative', negative)):
                triplets[key].append(f'{self.__images_root}{row["path"]}')
        return triplets


    def __seal_dataset(self, data: dict):
        """Turn the path lists into a dataset that yields triplets of decoded images."""
        anchor_dataset = tf.data.Dataset.from_tensor_slices(data['anchors'])
        positive_dataset = tf.data.Dataset.from_tensor_slices(data['positive'])
        negative_dataset = tf.data.Dataset.from_tensor_slices(data['negative'])

        triplets_path_dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
        triplets_images_dataset = triplets_path_dataset.map(self.__preprocess_triplets)

        return triplets_images_dataset


    def __preprocess_image(self, filename: tf.Tensor):
        """
        Load the specified file as a JPEG image, convert it to float32 and
        resize it to the target shape passed to the constructor.
        """
        image_string = tf.io.read_file(filename)
        image = tf.image.decode_jpeg(image_string, channels=3)
        image = tf.image.convert_image_dtype(image, tf.float32)
        # Use the configured shape; it used to be hard-coded to (400, 400),
        # which silently ignored the ``target_shape`` argument.
        image = tf.image.resize(image, self.__target_shape)

        return image


    @tf.autograph.experimental.do_not_convert
    def __preprocess_triplets(self, anchor, positive, negative):
        """
        Given the filenames corresponding to the three images, load and
        preprocess them.
        """

        return (
            self.__preprocess_image(anchor),
            self.__preprocess_image(positive),
            self.__preprocess_image(negative),
        )


    def get_target_shape(self):
        """Return the (height, width) images are resized to."""
        return self.__target_shape

    def get_training_data(self):
        """Return the batched training triplet dataset."""
        return self.__train_dataset

    def get_validation_data(self):
        """Return the batched triplet dataset built from the test annotations."""
        return self.__test_dataset

In [None]:
# Build the triplet datasets from 20% of the train annotations and 5% of the
# test annotations.
data_handler = DatasetHandler(
    train_path=f'{DATASET_PATH}{TRAIN_FILE}',
    test_path=f'{DATASET_PATH}{TEST_FILE}',
    train_part=0.2,
    test_part=0.05,
)

In [None]:
# Batched triplet datasets: `train` is used for fitting, `test` for validation.
train, test = data_handler.get_training_data(), data_handler.get_validation_data()

In [None]:
from random import randint

# Take the first batch; a, p, n hold the anchor, positive and negative images.
a, p, n = next(iter(train))

# Draw indices from the real batch length. The previous hard-coded upper bound
# of 31 overran batches shorter than 32 and never showed the second half of a
# full default-sized (64) batch.
batch_len = int(a.shape[0])

fig, axes = plt.subplots(5, 3, figsize=(10, 18))
for i in range(5):
    img = randint(0, batch_len - 1)
    for col, (title, images) in enumerate((('Anchor', a), ('Positive', p), ('Negative', n))):
        axes[i, col].set_title(title)
        axes[i, col].imshow(images[img].numpy())

Построим Siamese Model

In [None]:
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import Model
from tensorflow.keras.applications import resnet

In [None]:
# Convolutional backbone: ResNet50 with ImageNet weights and no classification head,
# sized for the images produced by the data handler.
base_cnn = resnet.ResNet50(
    include_top=False,
    weights="imagenet",
    input_shape=data_handler.get_target_shape() + (3,),
)


def _dense_bn(units, inputs):
    """Apply a ReLU Dense layer with `units` outputs followed by batch normalization."""
    hidden = layers.Dense(units, activation="relu")(inputs)
    return layers.BatchNormalization()(hidden)


# Embedding head on top of the flattened backbone features; the final Dense
# layer has no activation and produces the 256-dimensional embedding.
flatten = layers.Flatten()(base_cnn.output)
dense1 = _dense_bn(512, flatten)
dense2 = _dense_bn(256, dense1)
output = layers.Dense(256)(dense2)

embedding = Model(base_cnn.input, output, name="Embedding")

# Freeze every backbone layer that comes before "conv5_block1_out"; that layer
# and all layers after it stay trainable.
trainable = False
for layer in base_cnn.layers:
    trainable = trainable or layer.name == "conv5_block1_out"
    layer.trainable = trainable

In [None]:

class DistanceLayer(layers.Layer):
    """
    Computes two squared Euclidean distances between embeddings:
    anchor-to-positive and anchor-to-negative.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        """Return the tuple ``(ap_distance, an_distance)``, each reduced over the last axis."""

        def squared_distance_to(other):
            # Sum of squared element-wise differences along the last axis.
            return tf.reduce_sum(tf.square(anchor - other), -1)

        return (squared_distance_to(positive), squared_distance_to(negative))


# All three branches take RGB images of the data handler's target size.
input_shape = data_handler.get_target_shape() + (3,)
anchor_input = layers.Input(name="anchor", shape=input_shape)
positive_input = layers.Input(name="positive", shape=input_shape)
negative_input = layers.Input(name="negative", shape=input_shape)


def _embed(images):
    """Apply ResNet preprocessing and the shared embedding model to one image input."""
    # The data pipeline converts images with tf.image.convert_image_dtype(..., tf.float32),
    # i.e. into the [0, 1] range, while resnet.preprocess_input is documented to expect
    # pixel values in [0, 255] (it subtracts the ImageNet channel means without scaling).
    # Rescale first so the pretrained backbone receives inputs in the range it was trained on.
    return embedding(resnet.preprocess_input(images * 255.0))


distances = DistanceLayer()(
    _embed(anchor_input),
    _embed(positive_input),
    _embed(negative_input),
)

# The network maps an (anchor, positive, negative) image triplet to the pair
# of distances (anchor-positive, anchor-negative).
siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

In [None]:

class SiameseModel(Model):
    """Keras model wrapping a Siamese network with custom train and test steps.

    The wrapped network returns the anchor-positive and anchor-negative
    distances; this class turns them into the triplet loss:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=0.5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        # Running mean of the loss, reported by both train_step and test_step.
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        """Forward the inputs to the wrapped Siamese network."""
        return self.siamese_network(inputs)

    def train_step(self, data):
        """Run one optimization step on a batch and return the tracked loss."""
        # Record the forward pass so the loss can be differentiated.
        with tf.GradientTape() as tape:
            triplet_loss = self._compute_loss(data)

        # Differentiate with respect to the trainable weights of the wrapped
        # network and let the optimizer given to `compile()` apply the update.
        network_weights = self.siamese_network.trainable_weights
        grads = tape.gradient(triplet_loss, network_weights)
        self.optimizer.apply_gradients(zip(grads, network_weights))

        return self._track_loss(triplet_loss)

    def test_step(self, data):
        """Evaluate the loss on a batch without updating any weights."""
        return self._track_loss(self._compute_loss(data))

    def _track_loss(self, loss):
        """Feed `loss` into the tracker and return the metrics dict for Keras."""
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        """Return the triplet loss for a batch of (anchor, positive, negative) inputs."""
        # The network outputs the anchor-positive and anchor-negative distances.
        ap_distance, an_distance = self.siamese_network(data)

        # Hinge on the distance gap: the loss is clipped at zero from below.
        return tf.maximum(ap_distance - an_distance + self.margin, 0.0)

    @property
    def metrics(self):
        # Listing the tracker here lets Keras reset it automatically.
        return [self.loss_tracker]


In [None]:

# Wrap the network into the custom training model, then train it for 10 epochs
# with Adam (learning rate 0.001), validating on the test triplets.
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(learning_rate=0.001))
siamese_model.fit(train, validation_data=test, epochs=10)
