In [None]:
!pip install tqdm
!pip install sentence-transformers
!pip install annoy
!pip install torchsummary

In [46]:
import os
import torch
import torch.nn as nn
import torchvision.models as models
from PIL import Image
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import numpy as np
from tqdm import tqdm
from annoy import AnnoyIndex
from sklearn.model_selection import train_test_split
from torchsummary import summary

In [47]:
# Define a custom dataset
class ImageDataset(Dataset):
    """Dataset that lazily loads images from a list of file paths.

    Args:
        image_paths: Indexable collection of image file paths.
        transform: Callable applied to each loaded PIL image; its result is
            what ``__getitem__`` returns as the first tuple element.
    """

    def __init__(self, image_paths, transform):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        """Return the number of image paths held by the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load the image at position ``idx``.

        Returns:
            A ``(transformed_image, path)`` tuple.
        """
        img_path = self.image_paths[idx]
        # Open inside a context manager so the file handle is released promptly,
        # and force 3-channel RGB: grayscale, palette or RGBA files would
        # otherwise yield tensors with a different channel count than the rest
        # of the batch (and than a 3-channel Normalize expects).
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        return self.transform(image), img_path # return (image, path)

In [48]:
class Encoder(nn.Module):
    """Image encoder built from a pretrained ResNet-50 without its final layer.

    The last child module of the torchvision ResNet-50 (the classification
    ``fc`` layer) is dropped and the remaining children are chained in an
    ``nn.Sequential``. Parameters of the children named ``'0'`` to ``'3'`` are
    frozen so they keep their pretrained values.
    """

    # Names (positions inside the Sequential) of the children that are frozen.
    # NOTE(review): presumably these are the ResNet stem (conv1, bn1, relu,
    # maxpool) - confirm against torchvision's ResNet definition.
    _FROZEN_CHILDREN = ('0', '1', '2', '3')

    def __init__(self):
        super().__init__()
        backbone = models.resnet50(pretrained=True)
        # Keep every child except the last one (the fc classification head).
        kept_children = list(backbone.children())[:-1]
        self.encoder = nn.Sequential(*kept_children)

        for child_name, child in self.encoder.named_children():
            if child_name not in self._FROZEN_CHILDREN:
                continue
            for weight in child.parameters():
                weight.requires_grad = False

    def forward(self, x):
        """Encode a batch ``x`` and flatten each sample to a 1-D latent vector."""
        batch = x.size(0)
        features = self.encoder(x)
        return features.view(batch, -1)

class Decoder(nn.Module):
    """Decoder that expands a latent vector into a 3 x 224 x 224 image.

    A fully connected layer projects the latent vector to a 256 x 8 x 8 feature
    map, which five transposed convolutions upsample
    (8 -> 14 -> 28 -> 56 -> 112 -> 224). The final Sigmoid keeps every output
    value in [0, 1].

    Args:
        latent_dim: Size of the input latent vector.
    """

    def __init__(self, latent_dim=2048):
        super().__init__()
        seed_channels, seed_size = 256, 8
        flat_features = seed_size * seed_size * seed_channels

        # Project the latent vector and reshape it to (B, 256, 8, 8).
        stages = [
            nn.Linear(latent_dim, flat_features),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(flat_features),
            nn.Unflatten(1, (seed_channels, seed_size, seed_size)),
        ]

        # (in_channels, out_channels, padding) of each ReLU-activated upsampling step.
        # padding=2 on the first step gives 8x8 -> 14x14; the others double the size.
        upsampling_plan = (
            (256, 128, 2),  # 8x8 -> 14x14
            (128, 64, 1),   # 14x14 -> 28x28
            (64, 32, 1),    # 28x28 -> 56x56
            (32, 16, 1),    # 56x56 -> 112x112
        )
        for in_ch, out_ch, pad in upsampling_plan:
            stages.append(nn.ConvTranspose2d(in_ch, out_ch, kernel_size=4, stride=2, padding=pad))
            stages.append(nn.ReLU(inplace=True))

        # Final step to 3 channels: 112x112 -> 224x224, squashed to [0, 1].
        stages.append(nn.ConvTranspose2d(16, 3, kernel_size=4, stride=2, padding=1))
        stages.append(nn.Sigmoid())

        self.decoder = nn.Sequential(*stages)

    def forward(self, x):
        """Decode a batch of latent vectors ``x`` into images."""
        return self.decoder(x)

class Autoencoder(nn.Module):
    """Autoencoder that pairs ``Encoder`` with a ``Decoder`` of latent size 2048."""

    def __init__(self):
        super().__init__()
        self.encoder = Encoder()
        self.decoder = Decoder(latent_dim=2048)

    def forward(self, x):
        """Encode ``x`` and decode the result.

        Returns:
            A ``(latent, reconstructed)`` tuple: the encoder output and the
            decoder output computed from it.
        """
        code = self.encoder(x)
        return code, self.decoder(code)

In [49]:
# Inspect the decoder architecture on its own. Building a Decoder directly
# (same latent_dim as Autoencoder uses) avoids instantiating the ResNet-50
# encoder just to throw it away.
decoder = Decoder(latent_dim=2048)
# The decoder lives on the CPU here; torchsummary defaults to device="cuda" and
# would feed CUDA tensors to it on a GPU machine, so pin the device explicitly.
summary(decoder, (2048,), device="cpu")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                [-1, 16384]      33,570,816
              ReLU-2                [-1, 16384]               0
       BatchNorm1d-3                [-1, 16384]          32,768
         Unflatten-4            [-1, 256, 8, 8]               0
   ConvTranspose2d-5          [-1, 128, 14, 14]         524,416
              ReLU-6          [-1, 128, 14, 14]               0
   ConvTranspose2d-7           [-1, 64, 28, 28]         131,136
              ReLU-8           [-1, 64, 28, 28]               0
   ConvTranspose2d-9           [-1, 32, 56, 56]          32,800
             ReLU-10           [-1, 32, 56, 56]               0
  ConvTranspose2d-11         [-1, 16, 112, 112]           8,208
             ReLU-12         [-1, 16, 112, 112]               0
  ConvTranspose2d-13          [-1, 3, 224, 224]             771
          Sigmoid-14          [-1, 3, 2

In [50]:
# Define constants
# Preprocessing applied to every image: resize to 256, center-crop to 224x224,
# convert to a tensor, then normalize each channel with the mean/std below
# (the standard ImageNet statistics).
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# Number of images per batch for every DataLoader in this notebook.
batch_size=32
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [51]:
model = Autoencoder().to(device)
criterion = nn.MSELoss()
# Only hand the optimizer the parameters that are still trainable
# (the encoder freezes some of its layers).
optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-4)

# Train, val, test splits (60, 20, 20)
image_folder = "./fashion-dataset/images"
# os.listdir returns entries in arbitrary order; sort so that, together with the
# fixed seed below, the splits are identical on every run.
image_paths = sorted(os.path.join(image_folder, fname) for fname in os.listdir(image_folder))

split_seed = 42
# 20% test first, then 25% of the remaining 80% (= 20% overall) for validation.
train_val_paths, test_paths = train_test_split(image_paths, test_size=0.2, random_state=split_seed)
train_paths, val_paths = train_test_split(train_val_paths, test_size=0.25, random_state=split_seed)

train_dataset = ImageDataset(train_paths, transform)
val_dataset = ImageDataset(val_paths, transform)
test_dataset = ImageDataset(test_paths, transform)

# Only the training loader is shuffled; evaluation order stays fixed.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Training loop
def train_epoch(model, dataloader, criterion, optimizer, device,
                norm_mean=(0.485, 0.456, 0.406), norm_std=(0.229, 0.224, 0.225)):
    """Train ``model`` for one pass over ``dataloader`` and return the mean batch loss.

    The decoder ends in a Sigmoid, so reconstructions live in [0, 1], while the
    input batches are channel-normalized and therefore fall outside that range.
    To compare like with like, the reconstruction target is the *de-normalized*
    input (``images * norm_std + norm_mean``); the model itself still receives
    the normalized batch.

    Args:
        model: Module whose forward returns ``(latent, reconstructed)``.
        dataloader: Iterable of ``(images, paths)`` batches that supports ``len``.
        criterion: Loss called as ``criterion(reconstructed, target)``.
        optimizer: Optimizer stepping the model's trainable parameters.
        device: Device the image batches are moved to.
        norm_mean: Per-channel mean used when normalizing the inputs. Must match
            the input preprocessing. Pass ``None`` (for either stat) to use the
            inputs unchanged as targets.
        norm_std: Per-channel std used when normalizing the inputs.

    Returns:
        Sum of the per-batch losses divided by ``len(dataloader)``.
    """
    model.train()

    denormalize = norm_mean is not None and norm_std is not None
    if denormalize:
        # Shape (1, C, 1, 1) so the stats broadcast over batch and spatial dims.
        mean = torch.as_tensor(norm_mean, dtype=torch.float32, device=device).view(1, -1, 1, 1)
        std = torch.as_tensor(norm_std, dtype=torch.float32, device=device).view(1, -1, 1, 1)

    running_loss = 0.0
    for images, paths in tqdm(dataloader, desc="Training loop", unit='batch', leave=True):
        images = images.to(device)
        # Undo the input normalization so the target is back in pixel range [0, 1].
        targets = images * std + mean if denormalize else images
        optimizer.zero_grad()
        latent, reconstructed = model(images)
        loss = criterion(reconstructed, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(dataloader)

# Number of full passes over the training set.
epochs = 10
for epoch in range(epochs):
    train_loss = train_epoch(model, train_dataloader, criterion, optimizer, device)
    # Report the mean per-batch training loss for this epoch.
    print(f"Epoch {epoch+1}/{epochs}, Loss: {train_loss:.4f}")
# NOTE(review): val_dataloader is never evaluated in this loop, so there is no
# held-out signal for overfitting - consider adding a validation pass.

# Persist the trained weights once, after the final epoch.
torch.save(model.state_dict(), "resnet50_autoencoder.pth")

Training loop:   2%|▊                                                    | 13/834 [01:58<2:02:25,  8.95s/batch]

In [None]:
def embed_image_dataset(image_folder, save_to_file=False, filename="", encoder=None):
    """Compute a latent vector for every image in a folder.

    Args:
        image_folder: Directory whose files are embedded. A pre-built sequence
            of image paths is also accepted and used as-is.
        save_to_file: When True, the ``{path: vector}`` dict is written with
            ``np.save`` (load it back with ``allow_pickle=True``).
        filename: Destination for ``np.save``; required when ``save_to_file``
            is True.
        encoder: Module mapping an image batch to features. Defaults to the
            notebook-level ``model.encoder``.

    Returns:
        Dict mapping each image path to its 1-D numpy feature vector.

    Raises:
        ValueError: If ``save_to_file`` is True but ``filename`` is empty.
    """
    if save_to_file and not filename:
        raise ValueError("filename must be provided when save_to_file=True")

    if encoder is None:
        # NOTE(review): the original called an undefined `feature_extractor`;
        # the autoencoder's encoder is the only feature extractor defined in
        # this notebook - confirm this is the intended model.
        encoder = model.encoder

    if isinstance(image_folder, (str, os.PathLike)):
        # ImageDataset expects a list of file paths, not a directory name.
        # Sorting makes the embedding order deterministic.
        image_paths = sorted(
            os.path.join(image_folder, fname)
            for fname in os.listdir(image_folder)
            if os.path.isfile(os.path.join(image_folder, fname))
        )
    else:
        image_paths = list(image_folder)

    dataset = ImageDataset(image_paths, transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
    latent_representations = {}

    # Inference must run in eval mode (BatchNorm uses running statistics);
    # the previous mode is restored afterwards so later training is unaffected.
    was_training = encoder.training
    encoder.eval()
    try:
        with torch.no_grad():
            for images, paths in tqdm(dataloader, desc="Processing Images", unit='batch'):
                images = images.to(device) # Output: [batch_size, 3, 224, 224]
                # flatten(1) keeps the batch dimension even for a batch of one,
                # which a bare squeeze() would collapse.
                features = encoder(images).flatten(1) # Output: [batch_size, feature_dim]
                for path, feature in zip(paths, features.cpu()):
                    latent_representations[path] = feature.numpy()
    finally:
        encoder.train(was_training)

    if save_to_file:
        np.save(filename, latent_representations)

    return latent_representations

In [None]:
# load the fashion dataset and compute embeddings
fd_image_folder = "./fashion-dataset/images"
# Pass the folder variable defined on the line above (the original call
# misspelled it, which raised NameError) and save the embeddings to a file.
fd_lat_rep = embed_image_dataset(fd_image_folder, save_to_file=True, filename="lat_rep_fd_nft.npy")

In [None]:
# load the wardrobe dataset and compute embeddings
wardrobe_folder = "./sample-wardrobe"
# Embeddings are kept in memory only: save_to_file is left at its default (False).
wardrobe_lat_rep = embed_image_dataset(wardrobe_folder)

In [None]:
# create list of fashion dataset embeddings and paths
# The file stores a pickled {path: vector} dict, hence allow_pickle=True and
# .item() to unwrap the 0-d object array.
# NOTE(review): allow_pickle executes arbitrary code from the file - only load
# files this notebook produced itself.
latent_fd_images = np.load("lat_rep_fd_nft.npy", allow_pickle=True).item()
# keys() and values() of the same dict iterate in the same order, so row i of
# fd_features belongs to fd_img_paths[i].
fd_img_paths = list(latent_fd_images.keys())
fd_features = np.array(list(latent_fd_images.values())) # Output: (44441, 2048)

# create list of wardrobe embeddings and paths
wardrobe_paths = list(wardrobe_lat_rep.keys())
wardrobe_features = np.array(list(wardrobe_lat_rep.values()))

# get the mean embedding of all items in wardrobe
# (averaged over the item axis, leaving one vector of feature size)
mean_embedding = np.mean(wardrobe_features, axis=0)

In [None]:
# Perform Annoy
# Take the dimensionality from the data instead of hard-coding it, so the index
# stays correct if the encoder (and therefore the feature size) changes.
embedding_dim = fd_features.shape[1]
assert mean_embedding.shape == (embedding_dim,), (
    "wardrobe mean embedding and catalogue embeddings must have the same dimension"
)
annoy_index = AnnoyIndex(embedding_dim, metric='euclidean')
# Annoy builds its trees with random splits; fix the seed (before adding items)
# so the same neighbours come back on every run.
annoy_index.set_seed(42)

# Add all items to Annoy index (item id i == row i of fd_features)
for i, embedding in enumerate(fd_features):
    annoy_index.add_item(i, embedding)

# Build the index
n_trees = 50
annoy_index.build(n_trees)  # Number of trees

# Query the index
n_neighbors = 10
# With include_distances=True the result is a (ids, distances) pair.
indices = annoy_index.get_nns_by_vector(mean_embedding, n_neighbors, include_distances=True)

print("Recommended indices:", indices[0])
for idx in indices[0]:
    print(fd_img_paths[idx])

In [None]:
# Preview one catalogue image inline. Leaving the image as the cell's last
# expression lets Jupyter render it in the notebook; Image.show() would instead
# try to launch an external viewer, which does nothing useful on a remote or
# headless kernel.
im = Image.open("./fashion-dataset/images/52122.jpg")
im

In [None]:
# TODO: fine tune resnet50 model on fashion dataset

In [None]:
# Build the encoder for metadata

In [None]:
from sentence_transformers import SentenceTransformer
# Use a dedicated name for the text model: `model` already refers to the image
# autoencoder, and rebinding it here would break the training/embedding cells
# when they are re-run after this one.
text_encoder = SentenceTransformer("all-MiniLM-L6-v2")
metadata = ["Nike", "Nike Pegasus 40 White/Black"] # d metadata strings
embedding = text_encoder.encode(metadata) # shape = [d x 384]
print(embedding.shape)

In [None]:
# concat embeddings

In [None]:
# perform ANNOY on weighted embedding X' and inventory embeddings Y_1, ... Y_n

In [None]:
# output recommendations