In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random
import cv2

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision

from torchvision import transforms
from torch.utils.data import DataLoader, Dataset, SubsetRandomSampler, random_split
from torchvision.transforms import Compose, Resize, ToTensor, Normalize
from torch.nn import SyncBatchNorm
from torchvision.models import resnet50, resnet152
from torchvision.io import read_image
from torchvision.transforms import ToPILImage
from PIL import Image

import warnings
warnings.filterwarnings("ignore")

# Spatial size handed to transforms.Resize inside preprocess_image.
target_shape = (200, 200)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def getImagePaths(path):
    """Return the path of every file found under ``path``, searched recursively.

    Files are listed in the order ``os.walk`` yields them and no filtering by
    extension is applied, so non-image files are included as well.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(path)
        for name in names
    ]

# Directories holding the two halves of the training images.
left_dir_path = "ImgData/train/left"
right_dir_path = "ImgData/train/right"

# Recursively collect every file path below each directory.
left_images_path = getImagePaths(left_dir_path)
right_images_path = getImagePaths(right_dir_path)

# Quick sanity check on how many files were found on each side.
print(f"Number of left images: {len(left_images_path)}\n")
print(f"Number of right images: {len(right_images_path)}\n")

Number of left images: 2000

Number of right images: 2000



In [3]:
def getShape(images_paths):
    """Report whether every image in ``images_paths`` has the same shape.

    Parameters
    ----------
    images_paths : sequence of str
        Paths readable by ``cv2.imread``. Must contain at least one path.

    Returns
    -------
    str
        ``"Same image shape <shape>"`` when all images match the first one,
        otherwise ``"Different image shape"``.

    Raises
    ------
    IndexError
        If ``images_paths`` is empty.
    ValueError
        If ``cv2.imread`` cannot decode one of the files (it returns ``None``
        in that case instead of raising).
    """
    def _read_shape(image_path):
        # cv2.imread signals failure with None rather than an exception.
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Could not read image: {image_path}")
        return image.shape

    shape = _read_shape(images_paths[0])
    for image_path in images_paths[1:]:
        if _read_shape(image_path) != shape:
            return "Different image shape"
    # Only report "same" once every image has been compared; returning from
    # inside the loop would stop after the very first image.
    return "Same image shape " + str(shape)

In [4]:
# Left images are used as anchors, right images as their positives.
anchor_images, positive_images = left_images_path, right_images_path

# Single pool of every training image path: anchors first, then positives.
train_imgs = [*anchor_images, *positive_images]

In [5]:
def preprocess_image(image_tensor):
    """Resize ``image_tensor`` to the module-level ``target_shape``.

    The resize is antialiased; no other transformation is applied.
    """
    resize_pipeline = transforms.Compose(
        [transforms.Resize(target_shape, antialias=True)]
    )
    return resize_pipeline(image_tensor)


def preprocess_doublets(anchor, test):
    """Run ``preprocess_image`` on both inputs and return ``(anchor, test)``."""
    processed_anchor = preprocess_image(anchor)
    processed_test = preprocess_image(test)
    return processed_anchor, processed_test


In [6]:
from torch import sigmoid

class TLLDataset(Dataset):
    """In-memory image dataset yielding resized images divided by 255."""

    def __init__(self, train_imgs_paths):
        """Read every path with ``read_image`` and stack the results.

        ``torch.stack`` requires all decoded images to share one shape.
        """
        self.train_imgs = torch.stack(
            [read_image(image_path) for image_path in train_imgs_paths]
        )

    def __len__(self):
        """Number of stacked images."""
        return len(self.train_imgs)

    def __getitem__(self, idx):
        """Return image ``idx`` after ``preprocess_image`` and division by 255."""
        resized = preprocess_image(self.train_imgs[idx])
        return resized / 255

# Create an instance of TLLDataset
tll_dataset = TLLDataset(train_imgs)

# Determine the indices for training and validation.
# train_imgs is the left images followed by the right images, so a sequential
# 80/20 cut would put only right images in the validation subset. Shuffle the
# indices first (with a fixed seed so the split is reproducible).
image_count = len(tll_dataset)
split_generator = torch.Generator().manual_seed(0)
indices = torch.randperm(image_count, generator=split_generator).tolist()
split_point = round(image_count * 0.8)
train_indices = indices[:split_point]
val_indices = indices[split_point:]

# Create SubsetRandomSamplers
train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)

# Create DataLoaders for training and validation
train_loader = DataLoader(tll_dataset, batch_size=4, sampler=train_sampler)
val_loader = DataLoader(tll_dataset, batch_size=4, sampler=val_sampler)

In [7]:
class Reshape(nn.Module):
    """Module wrapper around ``Tensor.view`` with a fixed target shape."""

    def __init__(self, *args):
        super(Reshape, self).__init__()
        # Target shape, kept as the tuple of positional arguments.
        self.shape = args

    def forward(self, x):
        """Return ``x`` viewed with the stored shape."""
        target_shape_ = self.shape
        return x.view(target_shape_)
    
class VAE(nn.Module):
    """Fully-convolutional variational autoencoder.

    The encoder halves the spatial size three times (stride-2 convolutions)
    and the decoder doubles it three times, so the reconstruction has the
    same size as the input whenever height and width are divisible by 8.
    The latent code is a feature map with ``latent_dim`` channels.

    Parameters
    ----------
    input_channels : int
        Number of channels of the input images.
    hidden_channels : int
        Channels of the first encoder layer (the second uses twice as many).
    latent_dim : int
        Number of channels of the latent feature map.
    device : str
        Stored on ``self.device`` for callers; the module itself is moved
        with ``.to(...)`` as usual.
    """

    def __init__(self, input_channels = 3, hidden_channels =32, latent_dim=64, device='cuda'):
        super(VAE, self).__init__()
        self.device = device

        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, hidden_channels, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(hidden_channels, hidden_channels*2, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(hidden_channels*2, latent_dim, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(0.2),
        )

        # Latent mean and log-variance heads. 1x1 convolutions keep the
        # spatial layout so the decoder can consume the latent map directly.
        self.mean_layer = nn.Conv2d(latent_dim, latent_dim, kernel_size=1)
        self.logvar_layer = nn.Conv2d(latent_dim, latent_dim, kernel_size=1)

        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(latent_dim, hidden_channels*2, kernel_size=3, stride=2, padding=1, output_padding= 1),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(hidden_channels*2, hidden_channels, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(hidden_channels, input_channels, kernel_size=3, stride=2, padding=1, output_padding=1),
        )

    def encode(self, x):
        """Return ``(mean, logvar)`` of the approximate posterior for ``x``."""
        features = self.encoder(x)
        return self.mean_layer(features), self.logvar_layer(features)

    def reparameterize(self, mean, log_var):
        """Draw a latent sample ``mean + std * eps`` with ``eps ~ N(0, I)``.

        Sampling only happens in training mode; in eval mode the mean is
        returned so that embeddings and reconstructions are deterministic.
        """
        if not self.training:
            return mean
        std = torch.exp(0.5 * log_var)
        # randn_like creates the noise on the same device/dtype as ``std``.
        return mean + std * torch.randn_like(std)

    def decode(self, x):
        """Map a latent feature map back to image space."""
        return self.decoder(x)

    def forward(self, x):
        """Return ``(x_hat, mean, log_var)`` for a batch of images ``x``."""
        mean, log_var = self.encode(x)
        z = self.reparameterize(mean, log_var)
        x_hat = self.decode(z)
        return x_hat, mean, log_var

    def get_latent_space(self, x):
        """Return the latent code of ``x`` (see ``reparameterize``)."""
        mean, log_var = self.encode(x)
        z = self.reparameterize(mean, log_var)
        return z

In [8]:
from torch.optim import Adam
from torch.nn.functional import normalize
# Build the VAE with its default hyper-parameters and move it to the GPU.
model = VAE().to('cuda')
# Adam over all model parameters with a learning rate of 1e-3.
optimizer = Adam(model.parameters(), lr=1e-3)

def loss_function(x, x_hat, mean, log_var):
    """Weighted sum of reconstruction error and KL divergence.

    Returns ``0.9 * MSE(x_hat, x) + 0.1 * KLD`` where ``KLD`` is the KL
    divergence between ``N(mean, exp(log_var))`` and a standard normal.

    NOTE(review): ``mse_loss`` uses its default mean reduction while the KL
    term is summed over every element, so the two terms live on different
    scales -- confirm the 0.9 / 0.1 weights are what is intended.
    """
    reproduction_loss = F.mse_loss(x_hat, x)

    kl_terms = 1 + log_var - mean.pow(2) - log_var.exp()
    KLD = -0.5 * kl_terms.sum()

    return 0.9*reproduction_loss + 0.1*KLD

In [9]:
import math 
def train(model, optimizer, epochs, device, train_loader):
    """Train ``model`` for ``epochs`` passes over ``train_loader``.

    Parameters
    ----------
    model : callable returning ``(x_hat, mean, log_var)`` for a batch.
    optimizer : optimizer updating ``model``'s parameters.
    epochs : int
        Number of passes over the loader.
    device : device the batches are moved to before the forward pass.
    train_loader : iterable of image batches (tensors).

    Returns
    -------
    float
        Sum of the per-batch losses of the last epoch (``0`` when
        ``epochs`` is 0).
    """
    model.train()
    # Defined up-front so that epochs == 0 returns 0 instead of raising.
    overall_loss = 0
    for epoch in range(epochs):
        overall_loss = 0
        sample_count = 0
        for x in train_loader:
            x = x.to(device, dtype=torch.float)
            optimizer.zero_grad()

            x_hat, mean, log_var = model(x)
            loss = loss_function(x, x_hat, mean, log_var)

            loss.backward()
            optimizer.step()

            overall_loss += loss.item()
            # Count the samples actually seen; the last batch may be smaller
            # and a one-batch loader must not lead to a division by zero.
            sample_count += x.shape[0]

        average_loss = overall_loss / sample_count if sample_count else float("nan")
        print("\tEpoch", epoch + 1, "\tAverage Loss: ", average_loss)

    return overall_loss

# Run 5 epochs on the GPU over the training loader.
train(model, optimizer, 5, 'cuda', train_loader)

	Epoch 1 	Average Loss:  nan


KeyboardInterrupt: 

In [None]:
# Reconstruct one training batch and show the first channel of its first image.
# encode() returns (mean, logvar); only the mean is decoded. The batch has to
# live on the model's device, and the result must be moved back to the CPU
# (without gradient tracking) before matplotlib can draw it.
with torch.no_grad():
    preview_batch = next(iter(train_loader)).to('cuda', dtype=torch.float)
    preview_mean, _ = model.encode(preview_batch)
    preview_reconstruction = model.decode(preview_mean)
plt.imshow(preview_reconstruction[0][0].cpu().numpy(), cmap='gray')

In [None]:
# Inspect the shape of a single training batch.
next(iter(train_loader)).shape

In [None]:
# Show a reconstruction next to its source image. Both panels use the same
# image and the same channel so that they are directly comparable.
batch = next(iter(train_loader))
with torch.no_grad():
    reconstruction = model.decode(model.get_latent_space(batch.to('cuda'))).to('cpu').numpy()

channel = 0
fig, (ax_reconstruction, ax_original) = plt.subplots(1, 2)
ax_reconstruction.imshow(reconstruction[0][channel], cmap='gray')
ax_reconstruction.set_title("Reconstruction")
ax_original.imshow(batch[0][channel], cmap='gray')
ax_original.set_title("Original")

In [None]:
# Persist only the learned parameters (state dict), not the whole module.
torch.save(model.state_dict(),'vae.pt') 

In [None]:
## classifier
class PairwiseDistanceLayer(nn.Module):
    """Cosine similarity between two tensors along dimension 2.

    Despite the name, the returned value is a similarity (larger means more
    alike), not a distance.
    """

    def __init__(self):
        super().__init__()

    def forward(self, anchor, test):
        """Return ``cosine_similarity(anchor, test)`` computed over dim 2."""
        return F.cosine_similarity(anchor, test, dim=2)


class outputlayer(nn.Module):
    """Score every candidate image against an anchor using VAE embeddings.

    Parameters
    ----------
    vae_model : module exposing ``get_latent_space(images)``.
    """

    def __init__(self, vae_model):
        super(outputlayer, self).__init__()
        self.vae_model = vae_model
        self.distance_layer = PairwiseDistanceLayer()

    def _embed(self, images):
        # Flatten whatever the VAE returns to one vector per image so the
        # similarity is computed over the whole latent code.
        return self.vae_model.get_latent_space(images).flatten(start_dim=1)

    def forward(self, anchor, tests):
        """Return softmax-normalised similarities.

        ``anchor`` is a batch of images and ``tests`` holds the candidates of
        each anchor along dimension 1. The result has one row per anchor and
        one column per candidate, each row summing to 1.
        """
        # Take the number of candidates from the input instead of assuming 20.
        num_candidates = tests.shape[1]

        anchor_embedding = self._embed(anchor).unsqueeze(1).expand(-1, num_candidates, -1)
        tests_embedding = torch.stack(
            [self._embed(tests[:, i]) for i in range(num_candidates)],
            dim=1,
        )

        distances = self.distance_layer(anchor_embedding, tests_embedding)
        return torch.softmax(distances, dim=1)

In [None]:
class testDataset(Dataset):
    """Anchor / candidate images described by a candidates CSV.

    The CSV has a ``left`` column with the anchor file name (without the
    ``.jpg`` extension) and one further column per candidate file name.
    All images are read into memory when the dataset is created.

    Parameters
    ----------
    test_anchor_dir : str
        Directory containing the anchor images.
    test_dir_paths : str
        Directory containing the candidate images.
    test_df_filename : str
        Path of the candidates CSV.
    """

    def __init__(self,test_anchor_dir, test_dir_paths, test_df_filename):
        self.test_candidates_df = pd.read_csv(test_df_filename)

        anchor_names = self.test_candidates_df['left'].values

        # Row-major flattening keeps the candidates of one anchor contiguous.
        flatten_test_names = self.test_candidates_df.drop('left', axis=1).to_numpy().flatten()

        anchor_paths = [os.path.join(test_anchor_dir, filename+'.jpg') for filename in anchor_names]
        test_paths = [os.path.join(test_dir_paths, filename+'.jpg') for filename in flatten_test_names]

        # torch.stack requires every decoded image to have the same shape.
        self.test_anchor_images = torch.stack([read_image(path) for path in anchor_paths])
        self.test_images = torch.stack([read_image(path) for path in test_paths])

    @property
    def candidates_per_anchor(self):
        """Number of candidate columns in the CSV (every column but ``left``)."""
        return self.test_candidates_df.shape[1] - 1

    def __len__(self):
        return len(self.test_candidates_df.index)

    def __getitem__(self, idx):
        """Return ``(anchor, candidates)`` for row ``idx``, both divided by 255."""
        anchor = self.test_anchor_images[idx]

        # Slice this anchor's candidates out of the flattened stack, using the
        # CSV's actual column count rather than a fixed number.
        start = idx * self.candidates_per_anchor
        test = self.test_images[start: start + self.candidates_per_anchor]

        anchor, test = preprocess_doublets(anchor, test)

        anchor = anchor/255
        test = test/255

        return anchor, test


In [None]:
# Directories holding the test anchors (left) and test candidates (right).
test_left_dir_path = "ImgData/test/left"
test_right_dir_path = "ImgData/test/right"

# test_left_images_path = getImagePaths(test_left_dir_path)
# test_right_images_path = getImagePaths(test_right_dir_path)

In [None]:
# Build the test dataset from the anchor/candidate directories and the candidates CSV.
testset = testDataset(test_left_dir_path, test_right_dir_path,"test_candidates.csv")

In [None]:
# Keep the CSV row order (no shuffling) so predictions can be written back by row index.
test_loader = DataLoader(testset, batch_size=4,shuffle=False)

In [None]:
# Debug inspection: first channel of the first image in a training batch.
next(iter(train_loader))[0][0]

In [None]:
# decode() needs a latent tensor; calling it without one raises a TypeError.
# Encode a real batch and check that decoding restores the input's shape.
with torch.no_grad():
    check_batch = next(iter(train_loader)).to('cuda', dtype=torch.float)
    check_mean, _ = model.encode(check_batch)
    check_decoded = model.decode(check_mean)
check_decoded.shape

In [None]:
result = pd.read_csv('test_candidates.csv')

testNet = outputlayer(vae_model=model)
# Inference runs in eval mode so train-only behaviour of submodules is off.
testNet.eval()

# Take the batch size from the loader instead of repeating the literal, so the
# row offsets below stay correct if the loader's configuration changes.
constant_batch_size = test_loader.batch_size
candidate_columns = result.columns != 'left'

with torch.no_grad():
    for i, (anchor, tests) in enumerate(test_loader):

        anchor = anchor.to('cuda', dtype=torch.float)
        tests = tests.to('cuda', dtype=torch.float)

        sims = testNet(anchor, tests).tolist()

        # The loader is not shuffled, so batch i covers CSV rows starting at
        # i * batch_size; the last batch may hold fewer rows.
        first_row = constant_batch_size * i
        for j, row_sims in enumerate(sims):
            result.loc[first_row + j, candidate_columns] = row_sims

        

In [None]:
# Write the similarity scores to disk without the pandas index column.
result.to_csv("solutionVae.csv",index=False)