In [9]:
import clip
import torch
from PIL import Image

In [121]:
device = "cuda" if torch.cuda.is_available() else "cpu"
clip_model, clip_preprocess = clip.load("ViT-B/32", device=device)

In [122]:
image_id = "0400016650890"
image_name = "0400016650890_A1_0.jpg"
image = "/home/rajkinra23/git/drip_vision/data/embeddings_dataset/train/{}/{}".format(image_id, image_name)

In [123]:
image_input = clip_preprocess(Image.open(image)).unsqueeze(0).to(device)
image_features = clip_model.encode_image(image_input)
print(image_features.size())

torch.Size([1, 512])


In [124]:
def get_embedding(image):
    image_input = clip_preprocess(Image.open(image)).unsqueeze(0).to(device)
    image_features = clip_model.encode_image(image_input)
    return image_features

In [125]:
# Parent directory import
import sys
sys.path.append('/home/rajkinra23/git/drip_vision/')

import torchvision
from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader,Dataset
import matplotlib.pyplot as plt
import torchvision.utils
import numpy as np
import random
from PIL import Image
import torch
from torch.autograd import Variable
import PIL.ImageOps    
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import os

def preprocess(image):
    image = Image.open(image)
    image = image.convert("L")
    transform = transforms.Compose([transforms.Resize((100,100)), transforms.ToTensor()])
    image = transform(image)
    return image

# Use siamese model to compare?
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        # First CNN
        self.cnn1 = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(1, 4, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(4),          
            nn.ReflectionPad2d(1),
            nn.Conv2d(4, 8, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(8),
            nn.ReflectionPad2d(1),
            nn.Conv2d(8, 8, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(8),
        )

        # Fully connected layers
        self.fc1 = nn.Sequential(
            nn.Linear(8*100*100, 500),
            nn.ReLU(inplace=True),
            nn.Linear(500, 500),
            nn.ReLU(inplace=True),
            nn.Linear(500, 64))

    def forward_once(self, x):
        output = self.cnn1(x)
        output = output.view(output.size()[0], -1)
        output = self.fc1(output)
        return output

    def forward(self, input1, input2):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2
    
    def predict(self, image):
        image = preprocess(image)
        image = image.unsqueeze(0)
        image = image.to(self.device)
        return self.forward_once(image)
    
# Contrastive loss
class ContrastiveLoss(torch.nn.Module):
    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2, keepdim = True)
        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                      (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))


        return loss_contrastive
    
model = SiameseNetwork().to(device)
model.load_state_dict(torch.load("/home/rajkinra23/git/drip_vision/models/embedding_model.pt"))

<All keys matched successfully>

In [126]:
def get_embedding_custom(image):
    return model.predict(image)

In [127]:
# Compute distance between two images
def similarity(a, b):
    a_embedding = get_embedding(a)
    b_embedding = get_embedding(b)
    # print(a_embedding, b_embedding)
    # print(a_embedding - b_embedding)
    dist = (a_embedding - b_embedding).pow(2).sum(1).sqrt()
    return dist

In [128]:
def embedding_similarity(a, b):
    dist = (a - b).pow(2).sum(1).sqrt()
    return float(dist)

In [129]:
image_template = "/home/rajkinra23/git/drip_vision/data/embeddings_dataset/train/{}/{}"
a = image_template.format("0400016650890", "0400016650890_A1_0.jpg")
b = image_template.format("0400016650890", "0400016650890_A2_0.jpg")
c = image_template.format("0400017081918", "0400017081918_A0_0.jpg")
d = image_template.format("0400017081918", "0400017081918_A0_0.jpg")
e = image_template.format("0400015606460", "0400015606460_A3_0.jpg")
# print(similarity(a, b))
# print(similarity(a, c))
# print(similarity(d, e))
# print(similarity(a, d))

In [131]:
import random
import os

# Take a random sample of images and see how the similarity really works
train_dir = "/home/rajkinra23/git/drip_vision/data/embeddings_dataset/train/"
image_ids = set()
for _ in range(20):
    image_id = random.choice(os.listdir(train_dir))
    image_ids.add(image_id)

    
# Populate some random images
m = {}
images = []
for image_id in image_ids:
    for _ in range(2):
        root = os.path.join(train_dir, image_id)
        image = random.choice(os.listdir(root))
        images.append(os.path.join(root, image))
        m[os.path.join(root, image)] = image_id
        
# Compute embeddings
embeddings = {}
for image in images:
    # embeddings[image] = get_embedding_custom(image)
    embeddings[image] = get_embedding(image)

In [132]:
# Compute similarity
similarity = []
for a in embeddings:
    for b in embeddings:
        if a != b and m[a] != m[b]:
            similarity.append((embedding_similarity(embeddings[a], embeddings[b]), a, b))

In [134]:
from ipyplot import plot_images

similarity.sort()
print(len(similarity))
for each in similarity[:100]:
    plot_images((each[1], each[2]))

1030
