<a href="https://colab.research.google.com/github/rajlm10/Shoppster/blob/main/Baseline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!nvidia-smi

Wed Nov 30 09:07:50 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.32.03    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   61C    P8    10W /  70W |      0MiB / 15109MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [2]:
!pip install -Uq sentence_transformers
!pip install transformers -qqq

[K     |████████████████████████████████| 85 kB 2.4 MB/s 
[K     |████████████████████████████████| 5.5 MB 28.4 MB/s 
[K     |████████████████████████████████| 1.3 MB 60.3 MB/s 
[K     |████████████████████████████████| 182 kB 69.7 MB/s 
[K     |████████████████████████████████| 7.6 MB 52.4 MB/s 
[?25h  Building wheel for sentence-transformers (setup.py) ... [?25l[?25hdone


In [3]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import cv2
import os
import torch
from torch import nn
from transformers import AutoFeatureExtractor, AutoModel
from torchvision import transforms
from tqdm.auto import tqdm
from sentence_transformers import SentenceTransformer, models
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, SequentialSampler
import math
from PIL import Image
import torch.nn.functional as F
tqdm.pandas()

In [4]:
class ImageEncoder(nn.Module):
    def __init__(self, model_name="google/vit-base-patch16-224-in21k", pretrained=True, trainable=True, device=None):
        super().__init__()
        self.device = device
        self.model = AutoModel.from_pretrained(model_name)

        self.model.to(self.device)
        
        if trainable:
          for p in self.model.parameters():
              p.requires_grad = trainable

    def forward(self, image_inputs):
        image_inputs = {k:v.to(self.device) for k,v in image_inputs.items()}
        image_outputs = self.model(**image_inputs)
        image_embedding = image_outputs.pooler_output 
        return image_embedding

class TextEncoder(nn.Module):

    def __init__(self, model_name="sentence-transformers/all-mpnet-base-v2", trainable=True, device=None):
        super().__init__()
        self.device=device
        word_embedding_model = models.Transformer(model_name)
        pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension(), 'mean')
        self.model = SentenceTransformer(modules=[word_embedding_model, pooling_model], device=device)
        self.model.to(self.device)

        if trainable:    
          for p in self.model.parameters():
              p.requires_grad = trainable

    def forward(self, text):
        attr_embedding = self.model.encode(text, convert_to_tensor=True, device=self.device)
        return attr_embedding

class FashionDataset(Dataset):

    def __init__(self, root_dir, df):
        self.df = df
        self.root_dir = root_dir
        self.processor = AutoFeatureExtractor.from_pretrained("google/vit-base-patch16-224-in21k")    
        

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):

        image_file = self.root_dir + "/" + self.df["image_id"][idx]
        positive = self.df['positive'][idx] 
        negative = self.df['negative'][idx] 
        image = Image.open(image_file).convert("RGB")

        encoding={}
        encoding["image_name"] = self.df["image_id"][idx]

        pixel_vals = self.processor(image, return_tensors="pt").pixel_values #The preprocessor will take care of resizing.
        encoding["anchor"] = {"pixel_values" : pixel_vals.squeeze(0)}

        encoding["pos"] = positive
        encoding["neg"] = negative
        

        return encoding

class Network(nn.Module):

    def __init__(self, hyperparms=None):

        super(Network, self).__init__()  
        self.vision_projection = nn.Linear(hyperparms["vision_dim"], hyperparms["proj_dim"])
        self.text_projection = nn.Linear(hyperparms["text_dim"], hyperparms["proj_dim"])     
        self.dropout = nn.Dropout(0.1)

        device = hyperparms["device"]
        model_name = hyperparms["model_name"]
        self.image_encoder = ImageEncoder(model_name, device=device)


        self.text_encoder = TextEncoder(device=device)
        EUCLIDEAN = lambda x, y: F.pairwise_distance(x, y, p=2)
        self.criterion = nn.TripletMarginWithDistanceLoss(margin=0.2, distance_function=EUCLIDEAN, reduction="mean")
        self.relu_f = nn.ReLU()

    def forward(self, batch):

        # Getting Image and Text Features
        image_emb = self.image_encoder(batch["anchor"])
        image_emb = torch.nn.functional.normalize(image_emb, p=2, dim=1)
        
        # Getting Image and Text Embeddings (with same dimension)
        image_emb = self.relu_f(self.vision_projection(image_emb))
        image_emb = self.dropout(image_emb)

        pos_emb = self.text_encoder(batch["pos"])
        pos_emb = torch.nn.functional.normalize(pos_emb, p=2, dim=1)
        pos_emb = self.relu_f(self.text_projection(pos_emb))
        pos_emb = self.dropout(pos_emb)

        neg_emb = self.text_encoder(batch["neg"])
        neg_emb = torch.nn.functional.normalize(neg_emb, p=2, dim=1)
        neg_emb = self.relu_f(self.text_projection(neg_emb))
        neg_emb = self.dropout(neg_emb)

        # Calculating the Loss
        loss = self.criterion(image_emb, pos_emb, neg_emb)       #Triplet Loss calculation
        return loss


In [5]:
import pandas as pd
test_dataframe = pd.read_csv('/content/drive/MyDrive/MLD/test_data.csv')


!mkdir ~/.kaggle/
!cp /content/kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d paramaggarwal/fashion-product-images-small
!unzip -qq /content/fashion-product-images-small.zip 


Downloading fashion-product-images-small.zip to /content
 97% 549M/565M [00:04<00:00, 148MB/s]
100% 565M/565M [00:04<00:00, 139MB/s]


In [9]:
test_dataset = FashionDataset(root_dir='./images',df=test_dataframe)


test_batch_size = 512
dataloader_test = DataLoader(test_dataset, 
                                   sampler=SequentialSampler(test_dataset), 
                                   batch_size=test_batch_size)

device = "cuda" if torch.cuda.is_available() else "cpu"
model = Network(hyperparms={"device": device,"model_name": "google/vit-base-patch16-224-in21k", "vision_dim": 768, "text_dim": 768, "proj_dim": 512})
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

def get_image_embeddings():
    model.eval()
    test_image_embeddings = []
    image_names = []
    with torch.no_grad():
        for batch in tqdm(dataloader_test):
            image_features = model.image_encoder(batch["anchor"])             
            image_names.extend(batch["image_name"])
            image_embeddings = model.vision_projection(image_features)
            test_image_embeddings.append(image_embeddings)
    return image_names, torch.cat(test_image_embeddings)

image_names, image_embeddings = get_image_embeddings()

from sentence_transformers import util

def search(query, image_embeddings, img_names, k=101):
    query_emb = model.text_encoder(query)
    query_emb = model.text_projection(query_emb)
    hits = util.semantic_search(query_emb, image_embeddings, top_k=k)[0]
    return [img_names[hit['corpus_id']] for hit in hits]
        
def get_results(df):
    from collections import defaultdict
    attr2Image = defaultdict(list)
    attr2Pred_Image = defaultdict(list)
    actuals = []
    predictions = []
    for index, row in tqdm(df.iterrows(), total=df.shape[0]):
        attr = row["positive"]    
        if attr not in attr2Image:
            attr2Image["attr"] = df[df.positive == attr]["image_id"].to_list()
        actuals.append(attr2Image["attr"])
        if attr not in attr2Pred_Image:
           attr2Pred_Image["attr"] = search(attr, image_embeddings, image_names)
           
        predictions.append(attr2Pred_Image["attr"])

    return actuals, predictions

actuals, predictions = get_results(test_dataframe)

def recall(actual, predicted, k):
    act_set = set(actual)
    pred_set = set(predicted[:k])
    result = round(len(act_set & pred_set) / float(len(act_set)), 2)
    return result

def recall_at_k(k=1):
    results = []
    for actual, predicted in zip(actuals, predictions):
         results.append(recall(actual, predicted, k))
    return results         

R1 = recall_at_k(1)
R5 = recall_at_k(5)
R20 = recall_at_k(20)
R100 = recall_at_k(100)
Mean_R1 = sum(R1) / len(R1)
Mean_R5 = sum(R5) / len(R5)
Mean_R20 = sum(R20) / len(R20)
Mean_R100 = sum(R100)/ len(R100)
print(Mean_R1)
print(Mean_R5)
print(Mean_R20)
print(Mean_R100)

Downloading:   0%|          | 0.00/346M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/571 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/438M [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/363 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/239 [00:00<?, ?B/s]

  0%|          | 0/6 [00:00<?, ?it/s]

  0%|          | 0/3030 [00:00<?, ?it/s]

0.0
0.001646864686468647
0.006597359735973597
0.029366336633663333
