!pip install numpy tqdm scikit-learn tensorflow keras

In [1]:
# Import packages needed
import os
import torch
import cv2
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from keras.layers import Lambda, Dense
from keras import Model
from keras.models import load_model, Sequential
from keras.ops import cast, maximum, square
from keras.metrics import binary_accuracy
from keras.ops import norm
from keras.optimizers import Adam

from tensorflow.keras.callbacks import EarlyStopping

from keras import Input

from tqdm import tqdm
from sklearn.model_selection import train_test_split

gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

2024-12-10 10:47:43.199224: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1733827663.218615  316865 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1733827663.224547  316865 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-12-10 10:47:43.245387: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
def load_image(image_path):
    image = cv2.imread(image_path)
    image = cv2.resize(image, (384, 384))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = image.astype(np.float32) / 255.0

    return image

In [3]:
# Function for reading all the image from the dataset folder
def get_data():
    # read all the folders
    data_path = "../../dataset/Musinsa_dataset"
    folders = os.listdir(data_path)

    # read all the images inside the folders
    style2index = []
    images = []
    labels = []
    for i in range(len(folders)):
        folder = folders[i]
        folder_path = f"{data_path}/{folder}"

        if not os.path.isdir(folder_path):
            continue

        files = os.listdir(folder_path)
        print(folder)

        count = 0

        for file in tqdm(files):
            try:
                # read the image
                image = load_image(f"{folder_path}/{file}")
                image = image.astype(np.float16)

                images.append(image)
                labels.append(i)
                count += 1
                if count >= 100:
                    break

            except Exception as e:
                print(f"Error reading image {file}: {e}")
        style2index.append({folder: i})

    return images, labels, style2index

In [4]:
image, labels, style2index = get_data()
print(len(image), len(labels))
print(style2index)

Classic


  6%|▌         | 99/1642 [00:00<00:05, 293.55it/s]


Chic


  6%|▌         | 99/1780 [00:00<00:05, 305.63it/s]


Cityboy


  4%|▍         | 99/2605 [00:00<00:09, 258.46it/s]


Casual


  3%|▎         | 99/2870 [00:00<00:10, 253.53it/s]


Minimal


  6%|▌         | 99/1770 [00:00<00:06, 252.42it/s]


Preppy


  7%|▋         | 99/1355 [00:00<00:04, 252.92it/s]


Workwear


  4%|▍         | 99/2556 [00:00<00:09, 252.05it/s]


Retro


  4%|▎         | 99/2670 [00:00<00:10, 244.59it/s]


Street


  8%|▊         | 99/1223 [00:00<00:04, 256.05it/s]


Gorpcore


  4%|▍         | 99/2631 [00:00<00:10, 246.83it/s]


Sporty


  4%|▍         | 99/2378 [00:00<00:08, 256.02it/s]


Romantic


  6%|▌         | 99/1795 [00:00<00:06, 256.17it/s]


Girlish


  6%|▌         | 99/1783 [00:00<00:06, 255.04it/s]

1300 1300
[{'Classic': 1}, {'Chic': 2}, {'Cityboy': 3}, {'Casual': 4}, {'Minimal': 5}, {'Preppy': 6}, {'Workwear': 7}, {'Retro': 8}, {'Street': 9}, {'Gorpcore': 10}, {'Sporty': 11}, {'Romantic': 12}, {'Girlish': 13}]





In [5]:
X_train, X_test, y_train, y_test = train_test_split(image, labels, test_size=0.2, random_state=42)

print(f"train set size: {len(X_train)}, {len(y_train)}")
print(f"test set size: {len(X_test)}, {len(y_test)}")

train set size: 1040, 1040
test set size: 260, 260


In [6]:
X_train = np.array(X_train)
X_test = np.array(X_test)
Y_train = np.array(y_train)
Y_test = np.array(y_test)

print(X_train.shape, X_test.shape, Y_train.shape, Y_test.shape)

(1040, 384, 384, 3) (260, 384, 384, 3) (1040,) (260,)


In [7]:
def generate_pair(X, y):
    """
        For contrastive learning, we need the dataset in pair.
        There should exist 
        Input: X(image), y(label)
        Output: X_pairs(image pair), y_pairs(label pair)
    """
    X = np.array(X)
    y = np.array(y)
    
    X_pairs = []
    y_pairs = []

    for i in range(len(X)):
        digit = y[i]

        positive_digit_index = np.random.choice(np.where(y == digit)[0])
        X_pairs.append([X[i], X[positive_digit_index]])
        y_pairs.append([0])

        negative_digit_index = np.random.choice(np.where(y!=digit)[0])
        X_pairs.append([X[i], X[negative_digit_index]])
        y_pairs.append([1])

    indices = np.arange(len(X_pairs))
    np.random.shuffle(indices)

    return np.array(X_pairs)[indices], np.array(y_pairs)[indices]

In [8]:
X_train_pairs, Y_train_pairs = generate_pair(X_train, y_train)
X_test_pairs, Y_test_pairs = generate_pair(X_test, y_test)

print("X_train_pairs shape: ", X_train_pairs.shape)
print("X_test_pairs shape: ", X_test_pairs.shape)

X_train_pairs shape:  (2080, 2, 384, 384, 3)
X_test_pairs shape:  (520, 2, 384, 384, 3)


# Model

### Check GPU

### ResNet based model

Using feature extraction part of pretrained ResNet model, one fc layer will be added on it.

In [9]:
# Use backbone of pretrained model
from keras.applications import EfficientNetV2S

input1 = Input(shape=(384,384,3,))
input2 = Input(shape=(384,384,3,))

base_model = EfficientNetV2S(weights="imagenet", include_top=True)

network = Sequential(
    [
        Input(shape=(384, 384, 3)),
        base_model,
        Dense(256, activation=None)
    ]
)


I0000 00:00:1733827672.850424  316865 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 22456 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3090, pci bus id: 0000:61:00.0, compute capability: 8.6


In [10]:
twin1 = network(input1)
twin2 = network(input2)

In [11]:
def cosine_distance(twins):
    twin1_output, twin2_output = twins
    twin1_norm = tf.linalg.l2_normalize(twin1_output, axis=1)
    twin2_norm = tf.linalg.l2_normalize(twin2_output, axis=1)

    cosine_similarity = twin1_norm * twin2_norm  # Element-wise multiplication
    cosine_similarity = tf.reduce_sum(cosine_similarity, axis=1, keepdims=True)

    return (1 - cosine_similarity)



In [12]:
from keras.ops import norm
def euclidean_distance(twins):
    """Compute the euclidean distance (norm) of the output of
    the twin networks.
    """
    twin1_output, twin2_output = twins
    return norm(twin1_output - twin2_output, axis=1, keepdims=True)


# distance = Lambda(euclidean_distance)([twin1, twin2])


In [13]:
distance = Lambda(cosine_distance)([twin1, twin2])
# distance = Lambda(euclidean_distance)([twin1, twin2])
model = Model(inputs=[input1, input2], outputs=distance)

In [14]:
def contrastive_loss(y, d):
    """
    Compute the contrastive loss introduced by Yann LeCun et al. in the paper
    "Dimensionality Reduction by Learning an Invariant Mapping."
    """
    margin = 1
    y = cast(y, d.dtype)

    loss = (1 - y) / 2 * square(d) + y / 2 * square(maximum(0.0, margin - d) + 1e-6)
    return loss


In [15]:
optimizer = Adam(0.005)
model.compile(loss=contrastive_loss, optimizer=optimizer, metrics=[binary_accuracy])


In [None]:
early_stopping = EarlyStopping(
    monitor='val_loss',  # Metric to monitor
    patience=100,         # Number of epochs with no improvement to stop training
    restore_best_weights=True  # Restore weights from the best epoch
)

with tf.device('/GPU:0'):
    history = model.fit(
        x=[X_train_pairs[:, 0], X_train_pairs[:, 1]],
        y=Y_train_pairs[:],
        validation_data=([X_test_pairs[:, 0], X_test_pairs[:, 1]], Y_test_pairs[:]),
        batch_size=8,
        epochs=500,
        callbacks=[early_stopping]
    )


Epoch 1/500


I0000 00:00:1733827749.778618  317376 service.cc:148] XLA service 0x7fd768003610 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1733827749.778753  317376 service.cc:156]   StreamExecutor device (0): NVIDIA GeForce RTX 3090, Compute Capability 8.6
2024-12-10 10:49:14.151146: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
I0000 00:00:1733827762.892507  317376 cuda_dnn.cc:529] Loaded cuDNN version 90600
I0000 00:00:1733827848.962485  317376 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m260/260[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m215s[0m 207ms/step - binary_accuracy: 0.5021 - loss: 0.1847 - val_binary_accuracy: 0.5000 - val_loss: 0.2500
Epoch 2/500
[1m260/260[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 145ms/step - binary_accuracy: 0.4835 - loss: 0.1813 - val_binary_accuracy: 0.4923 - val_loss: 0.2363
Epoch 3/500
[1m260/260[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 145ms/step - binary_accuracy: 0.4869 - loss: 0.1906 - val_binary_accuracy: 0.5000 - val_loss: 0.2500
Epoch 4/500
[1m260/260[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 139ms/step - binary_accuracy: 0.5061 - loss: 0.1978 - val_binary_accuracy: 0.5000 - val_loss: 0.1820
Epoch 5/500
[1m260/260[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 141ms/step - binary_accuracy: 0.5086 - loss: 0.1794 - val_binary_accuracy: 0.5077 - val_loss: 0.2011
Epoch 6/500
[1m260/260[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 140ms/step - binary_accuracy: 0.

In [None]:
plt.plot(history.history["loss"])
plt.plot(history.history["val_loss"])
plt.title("Training and Validation Loss")
plt.ylabel("loss")
plt.xlabel("epoch")
plt.legend(["train", "val"], loc="upper right")
plt.show()

In [None]:
predictions = model.predict([X_test_pairs[:, 0], X_test_pairs[:, 1]]) >= 0.5

In [None]:
print(model.layers)
print(model.layers[2].input)

In [None]:
embedding_model = model.layers[2]
print(embedding_model)

image_path = "../../dataset/Musinsa_dataset/Cityboy/snap_card_1277506810595237743.jpg"
image = load_image(image_path)
embedding = embedding_model.predict(image.reshape(1, 224, 224, 3))

print(embedding.shape)

In [None]:
# Save the model
embedding_model = Model(inputs=input1, outputs=twin1)


embedding_model.save("embedding_model.h5")

In [None]:
# Load the model
loaded_model = load_model("embedding_model.h5")

image_path = "../../dataset/Musinsa_dataset/Cityboy/snap_card_1277506810595237743.jpg"
image = load_image(image_path)

embedding = loaded_model.predict(image.reshape(1, 224, 224, 3))

print("Generated embedding: ", embedding)