In [None]:
# mount drive, install kaggle, download the data, and unzip
from google.colab import drive
drive.mount('/content/gdrive')
!pip install -q kaggle
%mkdir /root/.kaggle
%cp /content/gdrive/My\ Drive/CMU11785-HW2P2/kaggle.json  /root/.kaggle/
%cd /
!kaggle datasets download -d cmu11785/20fall-hw2p2
!unzip -q 20fall-hw2p2.zip -d data

In [None]:
# change directory to hw2_p2
%cd /content/gdrive/My\ Drive/CMU11785-HW2P2/

In [None]:
# importing the packages
import os
import numpy as np
from PIL import Image
import time
import datetime
import torch.optim as optim
import torch
from torchvision import transforms, datasets
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [None]:
#smallest ResNET Block
class baseBlock(torch.nn.Module):
  def __init__(self,in_channel,out_channel,stride=1,shortcut=None):
      super(baseBlock,self).__init__()
      self.conv1 = torch.nn.Conv2d(in_channel,out_channel,stride=stride,kernel_size=3,padding=1)
      self.bn1   = torch.nn.BatchNorm2d(out_channel)
      self.conv2 = torch.nn.Conv2d(out_channel,out_channel,stride=1,kernel_size=3,padding=1)
      self.bn2   = torch.nn.BatchNorm2d(out_channel)
      self.shortcut = shortcut

  def forward(self,x):
      output = F.relu(self.bn1(self.conv1(x)))
      #print(output.shape)
      output = self.bn2(self.conv2(output))
      #print(output.shape)
      if self.shortcut is not None:
        output += self.shortcut(x)
      output = F.relu(output)
      #print(output.shape)
      return output


class ResNet(torch.nn.Module):
  def __init__(self,num_layers,classes=10,feats=512):
      super(ResNet,self).__init__()
      self.conv1 = nn.Conv2d(3,64,kernel_size=3,stride=1,padding=1)
      self.bn1   = nn.BatchNorm2d(64)
      self.input_planes = 64
      self.layer1 = self._layer(64,  num_layers[0],stride=2)
      self.layer2 = self._layer(128,  num_layers[1],stride=2)
      self.layer3 = self._layer(256,  num_layers[2],stride=2)
      self.layer4 = self._layer(feats,num_layers[3],stride=2)
      self.avgPool = nn.AdaptiveAvgPool2d((2))
      self.fc  =  torch.nn.Linear(feats*2*2,classes)
  
  def _layer(self,planes,num_layers,stride=1):
      netLayers =[]
      shortcut = None
      if stride !=1 or self.input_planes != planes:
        shortcut = torch.nn.Sequential(torch.nn.Conv2d(self.input_planes,planes,kernel_size=1,stride=stride),
                        torch.nn.BatchNorm2d(planes))
      
      netLayers.append(baseBlock(self.input_planes,planes,stride=stride,shortcut=shortcut))
      self.input_planes = planes
      for i in range(1,num_layers):
          netLayers.append(baseBlock(self.input_planes,planes))
          self.input_planes = planes
      return torch.nn.Sequential(*netLayers)

  def forward(self,x):
      x = F.relu(self.bn1(self.conv1(x)))
      #print("Conv1  :",x.shape)
      x = self.layer1(x)
      #print("L_1:",x.shape)
      x = self.layer2(x)
      #print("L_2:",x.shape)
      x = self.layer3(x)
      #print("L_3:",x.shape)
      x = self.layer4(x)
      #print("L_4:",x.shape)
      x = self.avgPool(x)
      #print("avg:",x.shape)
      x = torch.flatten(x,1)
      #print("flattened:",x.shape)
      out =self.fc(x)
      #print("labels: ",x.shape)
      return x,out

In [None]:
class fetch_image_pairs(torch.utils.data.Dataset):
  def __init__(self,text_pair_path,dir_path,dev_pair=False):
   self.pair_1  = []
   self.pair_2  = []
   self.label   = []
   self.dev_pair = dev_pair
   self.dir_path = dir_path

   with open(text_pair_path) as f:
     for line in f:
       items = line.split()
       self.pair_1.append(items[0])
       self.pair_2.append(items[1])
       if self.dev_pair:
         self.label.append(items[2])
       else:
          self.label.append(-1)

  def __len__(self):
    return len(self.pair_1)
  
  def __getitem__(self,index):
    img1 = Image.open(self.dir_path+self.pair_1[index])
    img2 = Image.open(self.dir_path+self.pair_2[index])
    img1 = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.485,0.456,0.406),(0.229,0.224,0.225))])(img1)
    img2 = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.485,0.456,0.406),(0.229,0.224,0.225))])(img2)
    lbl = int(self.label[index])
    return img1,img2,lbl


def test_verify(model,vpv_loader):
  sim_score   = []
  exact_score = []
  tart_time = time.time()
  with torch.no_grad():
    model.eval()
    for batch_idx, (img1,img2,true_score) in enumerate(vpv_loader):  
      img1,img2,true_score = img1.to(device), img2.to(device),true_score.to(device)
      embedding_1 = model(img1.float())[0]
      embedding_2 = model(img2.float())[0]
      calc_score = F.cosine_similarity(embedding_1,embedding_2)
      sim_score.append(calc_score.view(-1))
      exact_score.append(true_score.view(-1))
      torch.cuda.empty_cache()
      del true_score
      del img1
      del img2
  end_time = time.time()
  print("Similarity score calculated:!")
  return sim_score,exact_score


In [None]:
# Hyper parameters
cuda = torch.cuda.is_available()
device = torch.device("cuda" if cuda else "cpu")
train_batch_size = 128                      # input batch size for training')
test_batch_size  = 64                      # input batch size for training')
num_workers       = 4
embedding_dim    = 512                      # embedding dimension for images
hidden_layers    = [1,1,1,1]                # ResNET hidden Layers

In [None]:
# Model 
num_classes      = 4000     # number of unique classes
model = ResNet(hidden_layers,num_classes,embedding_dim)
PATH  = os.getcwd()+f"/saved_model_with_centloss/state_dict_model_10.22_4.pt"
model.load_state_dict(torch.load(PATH, map_location=device))
model.to(device)
model.eval()

In [None]:
# Predicting the AUC for the test pairs
vpt_set    = fetch_image_pairs('/data/verification_pairs_test.txt','/data/',dev_pair=False)
vpt_loader = DataLoader(vpt_set,batch_size=test_batch_size,shuffle=False,num_workers=num_workers,drop_last=False)
sim_score,exact_score=test_verify(model,vpt_loader)
sim_score = torch.cat(sim_score,axis=0).cpu().numpy()

In [None]:
!touch /content/gdrive/My\ Drive/CMU11785-HW2P2/verification_pairs_test_score.npy
!touch /content/gdrive/My\ Drive/CMU11785-HW2P2/verification_pairs_test.npy
!touch /content/gdrive/My\ Drive/CMU11785-HW2P2/submission.csv

%cd /content/gdrive/My\ Drive/CMU11785-HW2P2/


with open("/data/verification_pairs_test.txt") as f:
  pairs = []
  for i,line in enumerate(f):
    items = line.split()
    pairs.append(" ".join(items))
pairs =np.array(pairs)



np.save('verification_pairs_test.npy',pairs)
np.save('verification_pairs_test_score.npy',sim_score)


In [None]:
import pandas as pd
df = pd.DataFrame({"id":pairs,"Category":sim_score})
df.to_csv('submission.csv',index=False)

In [None]:
!kaggle competitions submit -c 11785-hw2p2-slack-kaggle -f submission.csv -m "Message"