In [1]:
import pandas as pd
import numpy as np
from GPyOpt.methods.bayesian_optimization import BayesianOptimization
from model import VAE

In [2]:
# Pair-list files for the train and test parts of the development split.
# Both are relative to the working directory and are parsed by
# extract_pairs_for_development.
pairs_dev_train_path = 'pairsDevTrain.txt'
pairs_dev_test_path = 'pairsDevTest.txt'

def extract_pairs_for_development(pairs_dev_filepath: str):
    """Parse a pairs file into a list of ``((name, index), (name, index))`` tuples.

    Layout consumed by this function: the first line holds an integer ``N``;
    the next ``N`` lines each hold ``name idx1 idx2`` (two images filed under
    the same name); the following ``N`` lines each hold
    ``name1 idx1 name2 idx2``.

    Args:
        pairs_dev_filepath: Path of the text file to read.

    Returns:
        A list of ``2 * N`` pairs: the single-name pairs first, then the
        two-name pairs, each group in file order. Indices are ``int``.

    Raises:
        StopIteration: If the file ends before ``1 + 2 * N`` lines were read.
        ValueError: If the count or an index is not an integer, or a line has
            an unexpected number of whitespace-separated fields.
    """
    same_name_pairs = []
    two_name_pairs = []
    with open(pairs_dev_filepath, 'r') as handle:
        count = int(next(handle))
        # First section: one name followed by two image indices.
        for _ in range(count):
            person, first, second = next(handle).split()
            same_name_pairs.append(((person, int(first)), (person, int(second))))
        # Second section: a (name, index) couple for each side of the pair.
        for _ in range(count):
            person_a, idx_a, person_b, idx_b = next(handle).split()
            two_name_pairs.append(((person_a, int(idx_a)), (person_b, int(idx_b))))
    return same_name_pairs + two_name_pairs

pairs_dev_train = extract_pairs_for_development(pairs_dev_train_path)
pairs_dev_test = extract_pairs_for_development(pairs_dev_test_path)

In [3]:
import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image

# Root directory of the image files. PairDataGenerator reads
# '<dataset_path>/<name>/<name>_<NNNN>.jpg' beneath it.
dataset_path = './lfw/'

class PairDataGenerator(Dataset):
    """Dataset yielding side-by-side image pairs with a same-name label.

    Each item is built from one ``((name1, num1), (name2, num2))`` entry: both
    images are read from beneath the module-level ``dataset_path``, run
    through the preprocessing pipeline and concatenated along dim 2.
    """

    def __init__(self, pairs, transform=None):
        """Store the pair list and assemble the preprocessing pipeline.

        Args:
            pairs: Sequence of ``((name1, num1), (name2, num2))`` entries.
            transform: Optional callable applied to each PIL image before
                ``transforms.ToTensor()``; left out when falsy.
        """
        self.pairs = pairs
        to_tensor = transforms.ToTensor()
        steps = [transform, to_tensor] if transform else [to_tensor]
        self.transform = transforms.Compose(steps)

    def __len__(self):
        """Return the number of pairs."""
        return len(self.pairs)

    def _load_image(self, name, num):
        """Open one image as RGB and run it through ``self.transform``.

        ``num`` is left-padded with zeros to at least four characters when
        the file name is built.
        """
        image_path = f'{dataset_path}/{name}/{name}_{num:0>4}.jpg'
        return self.transform(Image.open(image_path).convert("RGB"))

    def __getitem__(self, index):
        """Return ``(pair, y_true)`` for the entry at ``index``.

        ``pair`` is the two transformed images concatenated along dim 2 (the
        original comment describes this as horizontal concatenation);
        ``y_true`` is 1 when both entries carry the same name, otherwise 0.
        """
        (name1, num1), (name2, num2) = self.pairs[index]
        left = self._load_image(name1, num1)
        right = self._load_image(name2, num2)
        return torch.cat((left, right), dim=2), int(name1 == name2)

In [4]:
# Both splits share the same preprocessing: each image is resized to 256x256
# before PairDataGenerator converts it to a tensor.
resize_to_256 = transforms.Resize((256, 256))
train_pair_gen = PairDataGenerator(pairs_dev_train, transform=resize_to_256)
test_pair_gen = PairDataGenerator(pairs_dev_test, transform=resize_to_256)

# No shuffling is requested, so batches follow the order of the pair lists.
loader_options = dict(batch_size=32)
train_pair_loader = DataLoader(train_pair_gen, **loader_options)
test_pair_loader = DataLoader(test_pair_gen, **loader_options)

In [5]:
# Build the VAE on the GPU and restore its pretrained weights.
# NOTE(review): the constructor arguments are presumably (input shape,
# latent size) — confirm against model.VAE.
vae = VAE((3, 256, 256), 256).cuda()
# NOTE(security): torch.load unpickles the file, which can execute arbitrary
# code; only load checkpoints that come from a trusted source.
vae.load_state_dict(torch.load('trained/vae-256.dat')['state_dict'])
# Switch to evaluation mode. This is the cell's last expression, so the
# module repr is displayed as the cell output.
vae.eval()

VAE(
  (encoder_conv): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=0.01)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): LeakyReLU(negative_slope=0.01)
    (6): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (7): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): LeakyReLU(negative_slope=0.01)
    (9): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (10): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): LeakyReLU(negative_slope=0.01)
  )
  (mu): Sequential(
    (0): Linear(in_features=16384, out_features=256, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0

In [6]:
def to_numpy(pair_loader, model=None, device='cuda'):
    """Encode both halves of every image pair and gather the results as NumPy arrays.

    Args:
        pair_loader: Iterable of ``(pair_batch, value_batch)``. ``pair_batch``
            is a tensor whose last axis holds two equally wide images side by
            side; ``value_batch`` is a tensor of labels.
        model: Object exposing ``forward_encoder(images)``; element 0 of its
            return value is taken as the latent code. Defaults to the
            module-level ``vae``.
        device: Device the image halves are moved to before encoding.

    Returns:
        ``(left_latent, right_latent, values)``: the per-batch encoder outputs
        and labels, each concatenated along axis 0 in loader order.

    Raises:
        ValueError: If ``pair_loader`` yields no batches.
    """
    encoder = vae if model is None else model

    left_latent_list = []
    right_latent_list = []
    value_list = []
    # Pure inference: disabling autograd avoids building a graph per batch.
    with torch.no_grad():
        for pair_batch, value_batch in pair_loader:
            # Split at the midpoint instead of a fixed column so the function
            # keeps working when the images are resized to another width.
            total_width = pair_batch.shape[-1]
            half_width = total_width // 2
            left_images = pair_batch[..., :half_width].to(device)
            right_images = pair_batch[..., total_width - half_width:].to(device)

            left_latent_list.append(
                encoder.forward_encoder(left_images)[0].detach().cpu().numpy()
            )
            right_latent_list.append(
                encoder.forward_encoder(right_images)[0].detach().cpu().numpy()
            )
            value_list.append(value_batch.detach().cpu().numpy())

    if not value_list:
        raise ValueError("pair_loader yielded no batches")

    left_latent = np.concatenate(left_latent_list, axis=0)
    right_latent = np.concatenate(right_latent_list, axis=0)
    values = np.concatenate(value_list, axis=0)
    return left_latent, right_latent, values
# Encode each split once; every call returns (left latents, right latents, labels).
train_left_latent, train_right_latent, train_values = to_numpy(train_pair_loader)
test_left_latent, test_right_latent, test_values = to_numpy(test_pair_loader)

In [8]:
# Euclidean distance between the two latent codes of every training pair.
dif = np.subtract(train_left_latent, train_right_latent)
sq_dif = np.square(dif)
sq_dif_sum = sq_dif.sum(axis=1)
train_dist_array = np.sqrt(sq_dif_sum)

# Smallest and largest training distance, followed by the number of pairs.
train_min, train_max = train_dist_array.min(), train_dist_array.max()
print(train_min, train_max, len(train_dist_array), sep='\n')
train_dist_array

10.978727
63.927536
2200


array([21.073095, 29.629827, 41.133606, ..., 29.790003, 29.138306,
       22.96704 ], dtype=float32)

In [9]:
# Euclidean distance between the two latent codes of every test pair.
dif = np.subtract(test_left_latent, test_right_latent)
sq_dif = np.square(dif)
sq_dif_sum = sq_dif.sum(axis=1)
test_dist_array = np.sqrt(sq_dif_sum)

# Smallest and largest test distance, followed by the number of pairs.
test_min, test_max = test_dist_array.min(), test_dist_array.max()
print(test_min, test_max, len(test_dist_array), sep='\n')
test_dist_array

16.225035
48.390682
1000


array([34.405396, 36.395824, 38.41734 , 33.569515, 32.523342, 34.93727 ,
       31.577353, 37.38541 , 35.144993, 26.177683, 27.377682, 29.202696,
       31.34058 , 26.520834, 32.214672, 33.217304, 31.464514, 36.667507,
       21.663109, 23.10997 , 32.359337, 30.59164 , 30.023516, 37.98361 ,
       28.300045, 40.80275 , 37.03526 , 33.593292, 24.01202 , 31.880583,
       31.9852  , 24.439774, 21.020737, 32.227314, 25.989838, 29.9112  ,
       38.849075, 32.99214 , 31.32262 , 33.431683, 27.405657, 34.08843 ,
       37.022564, 29.015408, 32.445538, 32.14316 , 28.949202, 30.458809,
       31.710327, 34.85473 , 29.41375 , 32.357872, 33.84021 , 34.984066,
       28.905663, 31.37485 , 33.959248, 30.05769 , 31.715906, 33.66422 ,
       37.310833, 36.806576, 33.190083, 27.73489 , 22.082052, 33.442963,
       30.363699, 30.571651, 29.05333 , 21.525108, 23.30971 , 22.713463,
       31.53365 , 33.376057, 31.366108, 35.40055 , 26.828016, 26.268702,
       33.960655, 30.09609 , 32.20524 , 31.791227, 

In [10]:
def obj_func(threshold: "float | np.ndarray") -> float:
    """Objective to minimise: the negative accuracy on the training pairs.

    A pair is predicted as label 1 when its latent distance is below
    ``threshold``. The computation is delegated to ``evaluate`` so that the
    training objective and the test-time metric cannot drift apart.

    Args:
        threshold: Distance cut-off. A scalar works; an array is broadcast
            against the training distances.
            NOTE(review): GPyOpt is expected to pass a 2-D array of shape
            (1, 1) here rather than a float — confirm against its docs.

    Returns:
        Minus the fraction of training pairs classified correctly.
    """
    return -evaluate(train_dist_array, train_values, threshold)
def evaluate(dist_array: np.ndarray, true: np.ndarray, threshold: float) -> float:
    """Accuracy of the rule "label 1 when distance < threshold".

    Args:
        dist_array: Per-pair distances (any array-like).
        true: Ground-truth 0/1 labels, same shape as ``dist_array``.
        threshold: Distance cut-off; pairs strictly below it are predicted 1.

    Returns:
        The fraction of pairs whose prediction equals the label.

    Raises:
        ValueError: If the inputs are empty or their shapes differ. Without
            the check, mismatched shapes would broadcast or compare to a
            meaningless number.
    """
    dist_array = np.asarray(dist_array)
    true = np.asarray(true)
    if dist_array.shape != true.shape:
        raise ValueError(
            f"dist_array has shape {dist_array.shape} but true has shape {true.shape}"
        )
    if dist_array.size == 0:
        raise ValueError("cannot compute the accuracy of an empty set of pairs")

    decision = (dist_array < threshold).astype(int)
    return float(np.mean(decision == true))

In [11]:
# The optimiser is stochastic; seed NumPy's global RNG before it is built so
# that Restart & Run All reproduces the same threshold.
# NOTE(review): this relies on GPyOpt drawing its random numbers from
# np.random — confirm against the GPyOpt source. Outputs recorded from an
# unseeded run will differ from a seeded re-run.
np.random.seed(0)

# One continuous variable: the distance threshold, searched between the
# smallest and the largest training distance.
domain = [{'name': 'threshold', 'type': 'continuous', 'domain': (train_min, train_max)}]
max_iter = 30
BO = BayesianOptimization(f=obj_func, domain=domain)
BO.run_optimization(max_iter=max_iter)

In [12]:
# Best input reported by the optimiser; with a single search variable this is
# a one-element array (see the printed value).
opt_values = BO.x_opt
print(opt_values)

[31.35039376]


In [13]:
# Objective at the selected threshold, i.e. minus the training accuracy.
# Unpacking works because opt_values holds exactly one element.
obj_func(*opt_values)

-0.5954545454545455

In [14]:
# Accuracy on the test pairs using the threshold tuned on the training pairs.
evaluate(test_dist_array, test_values, threshold=opt_values[0])

0.573