In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import torch
from torch.nn import Sequential, Linear, ReLU, Conv2d, MaxPool2d, Flatten
from torch.utils.data import DataLoader
from tqdm import trange
import numpy as np
import matplotlib.pyplot as plt
import os
from skimage import io

from src.data.ucmerced_dataset import UcMercedDataset
from src.data.orto_dataset import OrtoDataset
from src.settings import DATA_DIRECTORY
from src.settings import TRAIN_DATA_DIRECTORY, TEST_DATA_DIRECTORY
from src.evaluation import evaluate_anmrr, evaluate_loss

from sklearn.metrics.pairwise import euclidean_distances

def soft_pn(a, p, n):
    def dist(x, y):
        return torch.linalg.norm((x - y))
    
    dist_p = dist(a, p)
    dist_n1 = dist(a, n)
    dist_n2 = dist(p, n)

    min_n_dist = torch.minimum(dist_n1, dist_n2)

    pos_exp = dist_p.exp()
    min_n_exp = min_n_dist.exp()

    l = (pos_exp / (min_n_exp + pos_exp)).pow(2) + (min_n_exp / (min_n_exp + pos_exp) - 1).pow(2)
    return l.mean()
    

In [None]:
image_size = 224
train_dataset = UcMercedDataset(TRAIN_DATA_DIRECTORY, image_size, train=True)
test_dataset = UcMercedDataset(TEST_DATA_DIRECTORY, image_size, train=False)

train_dataloader = DataLoader(train_dataset, batch_size=80, shuffle=True, num_workers=10)
test_dataloader = DataLoader(test_dataset, shuffle=False, batch_size=100, num_workers=10)

In [None]:
def set_training_model_layers(model, training: bool, up_to_index: int):
    i = 0
    for child in model.children():
        if i > up_to_index:
            break
        for param in child.parameters():
            param.requires_grad = training
        i+=1

In [None]:
import torch
from tqdm import tqdm, trange
from torch.nn import Sequential, Linear, ReLU, Conv2d, MaxPool2d, Flatten, LocalResponseNorm, BatchNorm2d, LeakyReLU, Tanh
from torch.utils.tensorboard import SummaryWriter
from torch.optim import lr_scheduler

act = LeakyReLU()

model = torch.hub.load('pytorch/vision', 'resnet18', pretrained=True)

model.fc = torch.nn.Linear(512, 50, bias=True)
set_training_model_layers(model, False, 8)

model = model.cuda()

optim = torch.optim.Adam(model.parameters(), weight_decay=1e-5)
criterion = torch.nn.TripletMarginLoss()

def evaluate_model(model, train: DataLoader, test: DataLoader, criterion):
    model.train(False)
    with torch.no_grad():
        test_loss = evaluate_loss(model, test, criterion)

        train_anmrr = evaluate_anmrr(model, train_dataloader, euclidean_distances)
        test_anmrr = evaluate_anmrr(model, test_dataloader, euclidean_distances)

    model.train(True)

    return test_loss, train_anmrr, test_anmrr


writer = SummaryWriter()
t = trange(25)
for epoch in t:
    loss_sum = 0.0
    for i_batch, sample_batched in enumerate(train_dataloader):
        optim.zero_grad()
        
        anchors = sample_batched['a'].cuda()
        positives = sample_batched['p'].cuda()
        negatives = sample_batched['n'].cuda()
        a = model(anchors)
        p = model(positives)
        n = model(negatives)
        loss = criterion(a, p, n)
        loss.backward()
        loss_sum += float(loss)
        optim.step()
        t.set_description(f"Batch: {i_batch}")
    del a
    del p
    del n
    del loss
    
    train_loss = loss_sum / len(train_dataloader)

    test_loss, train_anmmr, test_anmrr = evaluate_model(model, train_dataloader, test_dataloader, criterion)
    writer.add_scalar('Loss/train', train_loss, epoch)
    writer.add_scalar('Loss/test', test_loss, epoch)
    writer.add_scalar('ANMRR/train', train_anmmr, epoch)
    writer.add_scalar('ANMRR/test', test_anmrr, epoch)
    t.set_description(f"Epoch: {epoch}, Train loss: {train_loss}")


In [None]:
paths = []
embeddings = []
classes = []
with torch.no_grad():
    for i_batch, sample_batched in enumerate(test_dataloader):
        anchors = sample_batched['a'].cuda()
        y = sample_batched['a_y']
        classes.append(y.cpu().numpy())
        anchor_paths = sample_batched['path']
        paths.extend(anchor_paths)
        a = model(anchors).cpu().numpy()
        embeddings.append(a)

    embeddings = np.concatenate(embeddings)
    classes = np.concatenate(classes)

import pickle as pkl
with open("merced_embeddings.pkl.gz", "wb") as f:
    pkl.dump((paths, embeddings, classes), f)

In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
tsne_embeddings = TSNE(n_components=2).fit_transform(embeddings)


In [None]:

from PIL import Image
def visualize_tsne_embeddings(embeddings: np.array, image_paths):

    def get_image(path):
        img = Image.open(path)
        # img.resize((10,10))
        a = np.asarray(img)
        return OffsetImage(a, zoom=0.15)


    fig, ax = plt.subplots(figsize=(15,15))
    ax.scatter(embeddings[:, 0], embeddings[:, 1]) 
    for image_path, (x, y) in zip(image_paths, tsne_embeddings):
        ab = AnnotationBbox(get_image(image_path), (x, y), frameon=False)
        ax.add_artist(ab)
    fig.savefig("merced_embeddings.png", dpi=300, bbox_inches='tight', pad_inches=0)
    plt.show()

visualize_tsne_embeddings(tsne_embeddings, paths)

In [None]:
from sklearn.metrics.pairwise import euclidean_distances

from IPython.display import Image
from IPython.display import display
distances = euclidean_distances(embeddings)
if(len(classes.shape) < 2):
    classes = classes[:, None]

paths = np.array(paths).squeeze()
rankings = np.argsort(distances, axis=1)
selected_images = paths[rankings]

cols = 6
rows = 6

for label, name in test_dataset.label_name_mapping.items():
    
    indices_with_class = np.argwhere(classes == label)[:, 0].squeeze()
    example_query_index = np.random.choice(indices_with_class)

    query_image_path = paths[example_query_index]
    example_query = selected_images[example_query_index, :].squeeze()

    query_image = io.imread(query_image_path)
    
    fig=plt.figure(figsize=(3, 3))
    plt.imshow(query_image)
    query_image_name = os.path.split(query_image_path)[1]
    plt.title(f"Query: {query_image_name}")
    plt.axis("off")
    plt.show()
    

    fig=plt.figure(figsize=(8, 8))
    for i in range (cols * rows):
        path = example_query[i]
        image = io.imread(path)
        fig.add_subplot(rows, cols, i+1)
        plt.title(os.path.split(path)[1])
        plt.axis("off")
        
        plt.imshow(image)
    fig.suptitle(f"Response to query: {query_image_name}")
    plt.show()

In [None]:
model.train(False)
with torch.no_grad():
    test_anmrr, class_anmrr = evaluate_anmrr(model, test_dataloader, euclidean_distances, class_mean=True)
model.train(True)
anmrr_with_labels = [(train_dataset.label_name_mapping[c], value) for c, value in class_anmrr]
anmrr_with_labels

In [None]:
import plotly.express as px
names, values = list(zip(*anmrr_with_labels))
fig = px.bar(x=names, y=values)
fig.show(renderer='browser')