In [25]:
import tensorflow as tf
from tensorflow.keras import layers, Model, Input

# Hyperparameters
embedding_dimension = 16
latent_dimension = 128
num_colors = 756
num_binary_attributes = 463
input_shape = (256, 256, 3)

# Cloth color embedding layer
color_embedding = layers.Embedding(num_colors, embedding_dimension, name="color_embedding")

# Encoder
def create_encoder(input_shape):
    inputs = Input(shape=input_shape, name="encoder_input")
    
    x = layers.Conv2D(32, kernel_size=3, strides=2, padding="same", activation="relu")(inputs)
    x = layers.Conv2D(64, kernel_size=3, strides=2, padding="same", activation="relu")(x)
    
    x = layers.Flatten()(x)
    x = layers.Dense(latent_dimension, activation="relu")(x)
    
    z_mean = layers.Dense(latent_dimension, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dimension, name="z_log_var")(x)
    
    return Model(inputs, [z_mean, z_log_var], name="encoder")

# Decoder
def create_decoder(latent_dimension, num_attributes):
    inputs = Input(shape=(latent_dimension + num_attributes,), name="decoder_input")
    
    x = layers.Dense(64 * 64 * 64, activation="relu")(inputs)
    x = layers.Reshape((64, 64, 64))(x)
    
    x = layers.Conv2DTranspose(64, kernel_size=3, strides=2, padding="same", activation="relu")(x)
    x = layers.Conv2DTranspose(32, kernel_size=3, strides=2, padding="same", activation="relu")(x)
    
    outputs = layers.Conv2DTranspose(3, kernel_size=3, padding="same", activation="sigmoid", name="decoder_output")(x)
    
    return Model(inputs, outputs, name="decoder")

# VAE Model
class VAE(Model):
    def __init__(self, encoder, decoder, color_embedding, num_binary_attributes, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.color_embedding = color_embedding
        self.num_binary_attributes = num_binary_attributes
    
    def call(self, inputs, training=False):
        images, color_ids, binary_attributes = inputs
        
        # Get color embeddings
        color_embeds = self.color_embedding(color_ids)
        
        # Concatenate color embeddings with binary attributes
        attribute_vector = layers.Concatenate(axis=-1)([color_embeds, binary_attributes])
        
        # Encode input images
        z_mean, z_log_var = self.encoder(images)
        
        # Sample from the latent space
        batch_size = tf.shape(images)[0]
        epsilon = tf.random.normal(shape=(batch_size, latent_dimension))
        z = z_mean + tf.exp(0.5 * z_log_var) * epsilon
        
        # Concatenate latent vector with the attribute vector
        z_cond = layers.Concatenate(axis=-1)([z, attribute_vector])
        
        # Decode the conditioned latent vector
        reconstructed_images = self.decoder(z_cond)
        
        return reconstructed_images, z_mean, z_log_var

# Instantiate the encoder, decoder, and VAE model
encoder = create_encoder(input_shape)
decoder = create_decoder(latent_dimension, embedding_dimension + num_binary_attributes)
vae = VAE(encoder, decoder, color_embedding, num_binary_attributes)

# Loss function and optimizer
reconstruction_loss = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

# Custom training step
@tf.function
def train_step(images, color_ids, binary_attributes):
    with tf.GradientTape() as tape:
        reconstructed_images, z_mean, z_log_var = vae([images, color_ids, binary_attributes], training=True)
        
        # Calculate reconstruction loss
        rec_loss = reconstruction_loss(images, reconstructed_images)
        
        # Calculate KL divergence loss
        kl_loss = -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=-1)
        kl_loss = tf.reduce_mean(kl_loss)
        
        # Combine losses
        total_loss = rec_loss + kl_loss
    
    # Calculate gradients and update model weights
    gradients = tape.gradient(total_loss, vae.trainable_variables)
    optimizer.apply_gradients(zip(gradients, vae.trainable_variables))
    
    return rec_loss, kl_loss

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    
    for batch_images, batch_color_ids, batch_binary_attributes in train_dataset:
        rec_loss, kl_loss = train_step(batch_images, batch_color_ids, batch_binary_attributes)
        print(f"Reconstruction loss: {rec_loss:.4f}, KL loss: {kl_loss:.4f}")


Epoch 1/100
Reconstruction loss: 0.1559, KL loss: 2.3307
Reconstruction loss: 0.1601, KL loss: 81.9637
Reconstruction loss: 0.1583, KL loss: 7.4746
Reconstruction loss: 0.1586, KL loss: 1.0536
Reconstruction loss: 0.1554, KL loss: 0.0004
Reconstruction loss: 0.1552, KL loss: 0.0000
Reconstruction loss: 0.1589, KL loss: 0.0000
Reconstruction loss: 0.1578, KL loss: 0.0000
Reconstruction loss: 0.1589, KL loss: 0.0000
Reconstruction loss: 0.1571, KL loss: 0.0000
Reconstruction loss: 0.1571, KL loss: 0.0000
Reconstruction loss: 0.1554, KL loss: 0.0000
Reconstruction loss: 0.1561, KL loss: 0.0000
Reconstruction loss: 0.1530, KL loss: 0.0000
Reconstruction loss: 0.1597, KL loss: 0.0000
Reconstruction loss: 0.1622, KL loss: 0.0000
Reconstruction loss: 0.1562, KL loss: 0.0000
Reconstruction loss: 0.1551, KL loss: 0.0000
Reconstruction loss: 0.1529, KL loss: 0.0000


KeyboardInterrupt: 

In [18]:
####################read dataset#########################
import os
import random
import pandas as pd
import tensorflow as tf
import numpy as np

image_folder = "data/img"
image_files = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.endswith(".jpg")]
#image_files

######################split dataset#######################
random.seed(0)
random.shuffle(image_files)
total_images = len(image_files)
train_ratio, val_ratio = 0.6, 0.2

train_files = image_files[:int(total_images * train_ratio)]
val_files = image_files[int(total_images * train_ratio):int(total_images * (train_ratio + val_ratio))]
test_files = image_files[int(total_images * (train_ratio + val_ratio)):]
#print(train_files)
#print(val_files)
#print(test_files)

In [19]:
#Create a vocabulary dictionary that maps all the unique color tokens from your train and test data as keys to a unique integer value.
item_attr_data = pd.read_csv("data/item_attr_data_cleaned.csv")
color_unique = item_attr_data['clothes_color'].unique()
color_vocab = sorted(set(color_unique))
color_vocabulary = {w:i for i, w in enumerate(color_vocab)}
#print(color_vocabulary)
for i,j in enumerate(item_attr_data['clothes_color']):
    color_value = color_vocabulary[j]
    item_attr_data.loc[i, "clothes_color"] = color_value

In [20]:
###################train dataset label##########################
train_ids=[]
for train_index in train_files:
    parts = train_index.split('/')
    train_ids.append(parts[2][:-4])
    
train_labels = pd.DataFrame(columns=item_attr_data.columns)
train_labels = pd.concat([item_attr_data[item_attr_data['img_idx']==i] for i in train_ids], axis=0, ignore_index=True)
train_labels = train_labels.drop(columns=['index','img_idx'])
train_color = train_labels.pop('clothes_color')

train_color_np = train_color.to_numpy(dtype=np.int32)
train_labels_np = train_labels.to_numpy(dtype=np.float32)
train_labels_np = (train_labels_np + 1) / 2

###################val dataset label##########################
val_ids=[]
for val_index in val_files:
    parts = val_index.split('/')
    val_ids.append(parts[2][:-4])
    
val_labels = pd.DataFrame(columns=item_attr_data.columns)
val_labels = pd.concat([item_attr_data[item_attr_data['img_idx']==i] for i in val_ids], axis=0, ignore_index=True)
val_labels = val_labels.drop(columns=['index','img_idx'])
val_color = val_labels.pop('clothes_color')

val_color_np = val_color.to_numpy(dtype=np.int32)
val_labels_np = val_labels.to_numpy(dtype=np.float32)
val_labels_np = (val_labels_np + 1) / 2
###################test dataset label##########################
test_ids=[]
for test_index in test_files:
    parts = test_index.split('/')
    test_ids.append(parts[2][:-4])
    
test_labels = pd.DataFrame(columns=item_attr_data.columns)
test_labels = pd.concat([item_attr_data[item_attr_data['img_idx']==i] for i in test_ids], axis=0, ignore_index=True)
test_labels = test_labels.drop(columns=['index','img_idx'])
test_color = test_labels.pop('clothes_color')

test_color_np = test_color.to_numpy(dtype=np.int32)
test_labels_np = test_labels.to_numpy(dtype=np.float32)
test_labels_np = (test_labels_np + 1) / 2

# print(train_labels_np[:5])
# print(train_color_np[:5])

In [21]:
def load_image(image_file):
    image = tf.io.read_file(image_file)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, (256, 256))
    image = tf.cast(image, tf.float32) / 255.0
    return image

In [22]:
batch_size = 100
train_dataset = tf.data.Dataset.from_tensor_slices(train_files)
train_dataset = train_dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

val_dataset = tf.data.Dataset.from_tensor_slices(val_files)
val_dataset = val_dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

test_dataset = tf.data.Dataset.from_tensor_slices(test_files)
test_dataset = test_dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)

In [23]:
train_color_ids_dataset = tf.data.Dataset.from_tensor_slices(train_color_np)
val_color_ids_dataset = tf.data.Dataset.from_tensor_slices(val_color_np)
test_color_ids_dataset = tf.data.Dataset.from_tensor_slices(test_color_np)

train_binary_attributes_dataset = tf.data.Dataset.from_tensor_slices(train_labels_np)
val_binary_attributes_dataset = tf.data.Dataset.from_tensor_slices(val_labels_np)
test_binary_attributes_dataset = tf.data.Dataset.from_tensor_slices(test_labels_np)

train_dataset = tf.data.Dataset.zip((train_dataset, train_color_ids_dataset, train_binary_attributes_dataset))
val_dataset = tf.data.Dataset.zip((val_dataset, val_color_ids_dataset, val_binary_attributes_dataset))
test_dataset = tf.data.Dataset.zip((test_dataset, test_color_ids_dataset, test_binary_attributes_dataset))

train_dataset = train_dataset.batch(batch_size).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
val_dataset = val_dataset.batch(batch_size).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
test_dataset = test_dataset.batch(batch_size).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [24]:
train_dataset

<PrefetchDataset element_spec=(TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int32, name=None), TensorSpec(shape=(None, 463), dtype=tf.float32, name=None))>