# Import Stuff

In [1]:
import os
import time
import torch 
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import cv2
import seaborn as sns
import matplotlib.pyplot as plt
import onnx
import onnxruntime
import quanto
from tqdm import tqdm

from sklearn.metrics.pairwise import cosine_similarity
from PIL import Image
from torchvision import transforms
from matplotlib import pyplot as plt

from torch.utils.data import Dataset, DataLoader
import torch.utils.data as data
from datasets import dataset_utils
from matching import matching
from evaluation.metrics import createPR, recallAt100precision, recallAtK
from datasets.load_dataset import GardensPointDataset, SFUDataset, StLuciaDataset



# Constants

In [2]:
WEIGHTS_FILE = "calc.caffemodel.pt"
ITERATIONS = 100 # for testing average duration

# Preprocess Images

In [3]:
class ConvertToYUVandEqualizeHist:
    def __call__(self, img):
        img_yuv = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2YUV)
        img_yuv[:, :, 0] = cv2.equalizeHist(img_yuv[:, :, 0])
        img_rgb = cv2.cvtColor(img_yuv, cv2.COLOR_YUV2RGB)
        return Image.fromarray(img_rgb)

preprocess = transforms.Compose(
    [
        ConvertToYUVandEqualizeHist(),
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((120, 160), interpolation=Image.BICUBIC),
        transforms.ToTensor(),
    ]
)

In [6]:
class CustomImageDataset(Dataset):
    def __init__(self, name, folder, transform=None):
        
        self.name = os.path.basename(name)
        self.folder = os.path.join(name, folder)
        self.image_paths = dataset_utils.read_images_paths(self.folder, get_abs_path=True)
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, index) :
        image_path = self.image_paths[index]
        img = Image.open(image_path)
        if self.transform:
            img = self.transform(img)
        return(img)

In [7]:
dataset_db = CustomImageDataset("images\SFU", "dry", preprocess)
dataset_q = CustomImageDataset("images\SFU", "jan", preprocess)

print("Dataset Length:", len(dataset_db))
dataset_db[0]

Dataset Length: 385


  dataset_db = CustomImageDataset("images\SFU", "dry", preprocess)
  dataset_q = CustomImageDataset("images\SFU", "jan", preprocess)


tensor([[[0.3098, 0.4431, 0.6235,  ..., 0.0314, 0.0196, 0.0196],
         [0.1882, 0.4471, 0.7020,  ..., 0.0471, 0.0353, 0.0353],
         [0.1412, 0.4392, 0.6510,  ..., 0.0431, 0.0314, 0.0353],
         ...,
         [0.7882, 0.8157, 0.8314,  ..., 0.1529, 0.1176, 0.0824],
         [0.7961, 0.8118, 0.8353,  ..., 0.1333, 0.0902, 0.0627],
         [0.7843, 0.8000, 0.8196,  ..., 0.0980, 0.0588, 0.0431]]])

In [8]:
batch_size = 32
num_workers = 0
db_dataloader = DataLoader(dataset_db, batch_size=batch_size, shuffle=False, num_workers=num_workers)
q_dataloader = DataLoader(dataset_q, batch_size=batch_size, shuffle=False, num_workers=num_workers)

# Model Definition

In [9]:
class CalcModel(nn.Module):
    def __init__(self):
        super().__init__()

        self.input_dim = (1, 120, 160)
        self.conv1 = nn.Conv2d(1, 64, kernel_size=(5, 5), stride=2, padding=4)
        self.relu1 = nn.ReLU(inplace=False)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=(4, 4), stride=1, padding=2)
        self.relu2 = nn.ReLU(inplace=False)
        self.conv3 = nn.Conv2d(128, 4, kernel_size=(3, 3), stride=1, padding=0)
        self.relu3 = nn.ReLU(inplace=False)
        self.pool = nn.MaxPool2d(kernel_size=(3, 3), stride=2)
        self.lrn1 = nn.LocalResponseNorm(5, alpha=0.0001, beta=0.75)
        self.lrn2 = nn.LocalResponseNorm(5, alpha=0.0001, beta=0.75)


    def forward(self, x):
        x = self.relu1(self.conv1(x))
        x = self.pool(x)
        x = self.lrn1(x)

        x = self.relu2(self.conv2(x))
        x = self.pool(x)
        x = self.lrn2(x)

        x = self.relu3(self.conv3(x))
        x = torch.flatten(x, 1)
        return x

### Normal Model

In [10]:
calc = CalcModel()

# Load the model weights
state_dict = torch.load(WEIGHTS_FILE)
my_new_state_dict = {}
my_layers = list(calc.state_dict().keys())
for layer in my_layers:
    my_new_state_dict[layer] = state_dict[layer]
calc.load_state_dict(my_new_state_dict)

print(calc)

CalcModel(
  (conv1): Conv2d(1, 64, kernel_size=(5, 5), stride=(2, 2), padding=(4, 4))
  (relu1): ReLU()
  (conv2): Conv2d(64, 128, kernel_size=(4, 4), stride=(1, 1), padding=(2, 2))
  (relu2): ReLU()
  (conv3): Conv2d(128, 4, kernel_size=(3, 3), stride=(1, 1))
  (relu3): ReLU()
  (pool): MaxPool2d(kernel_size=(3, 3), stride=2, padding=0, dilation=1, ceil_mode=False)
  (lrn1): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=1.0)
  (lrn2): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=1.0)
)


# Run Models

### Normal Model

In [11]:
calc.eval()

# Pass database tensor through the model

db_features = []

with torch.no_grad():

    for batch in db_dataloader:
        output = calc(batch)
        db_features.append(output)

db_features = torch.cat(db_features, axis=0)

print(db_features.shape)

# Pass query tensor through the model

q_features = []

with torch.no_grad():

    for batch in q_dataloader:
        output = calc(batch)
        q_features.append(output)

q_features = torch.cat(q_features, axis=0)

print(q_features.shape)

torch.Size([385, 936])
torch.Size([385, 936])


# Average Time

### Normal Model

In [12]:
times = [] # Initialize a list to store the time for each pass

for _ in tqdm(range(ITERATIONS), desc="Processing database dataset"):
    start_time = time.time()

    # Pass the dataset through the model
    with torch.no_grad():
        db_features = []
        for batch in db_dataloader:
            output = calc(batch)
            db_features.append(output)

    end_time = time.time()
    times.append(end_time - start_time)

average_time = sum(times) / len(times) # Calculate the average time

print(f'Average time: {average_time} seconds')

Processing dataset: 100%|██████████| 100/100 [04:08<00:00,  2.48s/it]

Average time: 2.4831705689430237 seconds





In [14]:
times = [] # Initialize a list to store the time for each pass

for _ in tqdm(range(ITERATIONS), desc="Processing query dataset"):
    start_time = time.time()

    with torch.no_grad():
        for batch in q_dataloader:
            output = calc(batch)

    end_time = time.time()
    times.append(end_time - start_time)

average_time = sum(times) / len(times) # Calculate the average time

print(f'Average time: {average_time} seconds')

Processing query dataset: 100%|██████████| 100/100 [04:31<00:00,  2.72s/it]

Average time: 2.7157634329795837 seconds



