In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F  # Importing PyTorch libraries for neural networks
from torchsummary import summary  # For summarizing the model architecture
from torchvision import transforms  # For performing transformations on images
from torch import utils  # Utilities for PyTorch like DataLoader and Tensor manipulation

import cv2  # OpenCV for image processing tasks
import numpy as np  # NumPy for numerical operations
import os  # For file and directory operations
import random  # For generating random values

from tqdm.auto import trange  # TQDM for progress bars in loops

In [None]:
# Training configuration shared by the data-preparation, training and inference cells
EPOCHS = 10              # Number of passes over the training triplets
DATA_SIZE = 5000         # Number of triplets taken from the start of the dataset for training
BATCH_SIZE = 100         # Triplets per training batch
VALID_DATA = 500         # Number of triplets taken right after the training slice for validation
VALID_BATCH = 100        # Triplets per validation batch
IMG_SHAPE = 64           # Side length in pixels; every image is resized to IMG_SHAPE x IMG_SHAPE


In [None]:
class ConvBlock(nn.Module):
    """Stack of ``n_layers`` Conv2d -> BatchNorm2d -> LeakyReLU stages that halves H and W.

    The first convolution maps ``int(filters // growth_factor)`` input channels to
    ``filters`` channels; the remaining ``n_layers - 1`` convolutions keep ``filters``
    channels. Downsampling is done either by giving that first convolution a stride
    of 2 (``stride=True``) or by a 2x2 max pool applied to the block input before any
    convolution (``stride=False``).

    Args:
        n_layers: Number of conv/norm/activation stages in the block (at least 1).
        filters: Number of output channels of every convolution in the block.
        kernel: Square kernel size; the padding used is ``(kernel - 1) // 2``.
        growth_factor: Ratio between the block's output and input channel counts.
        moment: Momentum passed to every batch-normalisation layer.
        stride: True to downsample with a stride-2 convolution, False to use max pooling.
        alpha: Negative slope of the LeakyReLU activation.

    Raises:
        ValueError: If ``n_layers`` is smaller than 1.
    """

    def __init__(self, n_layers: int, filters: int, kernel: int = 3, growth_factor: float = 2.0,
                 moment: float = 0.7, stride: bool = True, alpha: float = 0.03):
        super().__init__()
        if n_layers < 1:
            raise ValueError("n_layers must be at least 1")

        padding = (kernel - 1) // 2
        in_channels = int(filters // growth_factor)
        self.stride = stride

        # The normalised tensors are 4-D (N, C, H, W) convolution outputs, which is
        # what BatchNorm2d expects; BatchNorm1d rejects 4-D input.
        self.norm = nn.ModuleList(
            [nn.BatchNorm2d(num_features=filters, momentum=moment) for _ in range(n_layers)]
        )

        # Build the convolutions directly in forward order: the channel-expanding
        # (and optionally strided) convolution comes first. nn.ModuleList has no
        # reverse() method, so the list cannot be assembled backwards and flipped.
        entry_conv = nn.Conv2d(in_channels, filters, kernel,
                               stride=2 if stride else 1, padding=padding)
        inner_convs = [nn.Conv2d(filters, filters, kernel, padding=padding)
                       for _ in range(n_layers - 1)]
        self.conv = nn.ModuleList([entry_conv] + inner_convs)

        self.nlin = nn.LeakyReLU(alpha)

        # Without a strided convolution the block downsamples with max pooling instead.
        if not stride:
            self.pool = nn.MaxPool2d(2, 2)

    def forward(self, x):
        """Apply the block to ``x`` of shape (N, filters // growth_factor, H, W)."""
        if not self.stride:
            x = self.pool(x)

        for conv, norm in zip(self.conv, self.norm):
            x = self.nlin(norm(conv(x)))
        return x

In [None]:
class OneShot(nn.Module):
    """Convolutional embedding network for 3-channel images.

    Pipeline: a stride-2 stem convolution, ``n_blocks`` ``ConvBlock`` stages (each
    multiplying the channel count by ``growth_factor``), global average pooling,
    then ``Linear(dense) -> LeakyReLU -> Dropout -> Linear(final)``.

    Args:
        n_blocks: Number of ``ConvBlock`` stages after the stem convolution.
        n_high_refine: Blocks with index ``<= n_high_refine`` use
            ``n_conv_high_refine`` layers; later blocks use ``n_conv_end``.
        n_conv_high_refine: Layers per block for the early blocks.
        n_conv_end: Layers per block for the remaining blocks.
        filters: Output channels of the stem convolution.
        start_kernel: Kernel size of the stem convolution.
        kernel: Kernel size used inside every ``ConvBlock``.
        growth_factor: Channel multiplier applied at each block.
        alpha: Negative slope of the LeakyReLU activations.
        moment: Batch-normalisation momentum forwarded to the blocks.
        dense: Width of the hidden fully connected layer.
        final: Size of the returned embedding vector.
        drop: Dropout probability applied after the hidden layer.
        stride: Forwarded to ``ConvBlock`` (strided convolution vs. max pooling).
    """

    def __init__(self, n_blocks: int = 7, n_high_refine: int = 3, n_conv_high_refine: int = 3,
                 n_conv_end: int = 2, filters: int = 64, start_kernel: int = 5, kernel: int = 3,
                 growth_factor: float = 2.0, alpha: float = 0.07, moment: float = 0.7, dense: int = 512,
                 final: int = 100, drop: float = 0.2, stride: bool = True):
        super().__init__()

        start_padding = (start_kernel - 1) // 2

        # Stem: 3 input channels -> `filters`, halving the spatial size
        self.conv = nn.ModuleList([nn.Conv2d(3, filters, start_kernel, stride=2, padding=start_padding)])

        for c in range(n_blocks):
            filters = int(filters * growth_factor)
            # NOTE(review): `<=` gives n_high_refine + 1 deep blocks (indices
            # 0..n_high_refine); confirm whether `<` was intended.
            n_layers = n_conv_high_refine if c <= n_high_refine else n_conv_end
            self.conv.append(ConvBlock(n_layers, filters, kernel, growth_factor, moment, stride, alpha))

        # Collapse every feature map to a single value, then flatten to (N, filters)
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.flat = nn.Flatten()

        # After the loop `filters` already equals the channel count of the last
        # block, so it is the input width of the first fully connected layer
        # (multiplying by growth_factor again would not match the pooled features).
        self.lden = nn.Linear(filters, dense)
        self.lvec = nn.Linear(dense, final)

        self.nlin = nn.LeakyReLU(alpha)
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        """Map a batch of images (N, 3, H, W) to embeddings of shape (N, final)."""
        for conv in self.conv:
            x = conv(x)

        x = self.flat(self.pool(x))
        # The hidden layer has to be applied to x; passing the module itself to the
        # activation raises a TypeError.
        x = self.drop(self.nlin(self.lden(x)))
        return self.lvec(x)

In [None]:
# Build the embedding network (created on the CPU) and print a per-layer summary.
model = OneShot()
# torchsummary defaults to device="cuda" and then feeds CUDA tensors to the model
# whenever a GPU is visible, which fails for a CPU-resident model. Ask for a CPU
# dry run explicitly so the cell behaves the same on every machine.
summary(model, (3, IMG_SHAPE, IMG_SHAPE), device="cpu")


In [None]:
FOLDER = r"PATH"  # Path to the dataset folder (one sub-folder per class)

# Keep only directories (stray files such as .DS_Store would break the per-class
# listing below) and sort them so the triplet order does not depend on the
# filesystem's listing order.
folders = sorted(
    entry for entry in os.listdir(FOLDER) if os.path.isdir(FOLDER + "/" + entry)
)
train = []  # List of [anchor, positive, negative] image triplets


def _load_image(path):
    """Read an image, resize it to IMG_SHAPE x IMG_SHAPE and scale it to [0, 1].

    Args:
        path: Location of the image file.

    Returns:
        float32 array of shape (IMG_SHAPE, IMG_SHAPE, 3), channels in OpenCV's BGR order.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file (``cv2.imread`` returns
            ``None`` instead of raising).
    """
    image = cv2.imread(path)
    if image is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    # The third positional parameter of cv2.resize is `dst`, not the interpolation
    # flag, so the flag must be passed by keyword to take effect.
    resized = cv2.resize(image, (IMG_SHAPE, IMG_SHAPE), interpolation=cv2.INTER_LINEAR)
    return (resized / 255.0).astype(np.float32)


for folder in folders:
    folder_path = FOLDER + "/" + folder
    files = sorted(os.listdir(folder_path))  # All files of the current class

    # One triplet per pair of consecutive files of the class
    for i in range(len(files) - 1):
        nfolder = random.choice(folders)  # Candidate class for the negative sample

        if nfolder != folder:
            path = r"PATH"  # Path for negative sample from another folder
        else:
            # NOTE(review): the draw landed on the anchor's own class; make sure this
            # path points at an image of a different class.
            path = r"PATH"  # Path for negative sample (you may want to change this logic)
        negative = _load_image(path)

        anchor = _load_image(folder_path + "/" + files[i])
        positive = _load_image(folder_path + "/" + files[i + 1])

        train.append([anchor, positive, negative])

    # For folders with 4 or more files, create two additional triplets that pair
    # the last two files (anchors) with the first two files (positives)
    if len(files) >= 4:
        for i in range(2):
            # Draw the negative class here instead of reusing whatever value the
            # loop above happened to leave in `nfolder`.
            nfolder = random.choice(folders)

            if nfolder != folder:
                fk = sorted(os.listdir(FOLDER + "/" + nfolder))[0]  # First file of the negative folder
                negative = _load_image(FOLDER + "/" + nfolder + "/" + fk)
            else:
                path = r"PATH"  # Path for the negative sample
                negative = _load_image(path)

            anchor = _load_image(folder_path + "/" + files[-2 + i])
            positive = _load_image(folder_path + "/" + files[0 + i])

            train.append([anchor, positive, negative])

# Stack everything into one array of shape (n_triplets, 3, IMG_SHAPE, IMG_SHAPE, 3):
# axis 1 selects anchor / positive / negative, the last axis holds the colour channels.
x = np.array(train, dtype=np.float32)


In [None]:
def _to_nchw(images):
    """Turn a (N, H, W, C) NumPy image batch into a contiguous (N, C, H, W) tensor.

    The channel axis has to be *moved* with ``permute``; a plain ``reshape`` to
    (N, C, H, W) keeps the memory order and therefore scrambles pixel and channel
    values instead of transposing them.
    """
    return torch.from_numpy(np.ascontiguousarray(images)).permute(0, 3, 1, 2).contiguous()


# Fail early with a clear message when fewer triplets were loaded than configured.
if len(x) < DATA_SIZE + VALID_DATA:
    raise ValueError(
        f"Need {DATA_SIZE + VALID_DATA} triplets (DATA_SIZE + VALID_DATA) but only {len(x)} were loaded"
    )

# Training tensors: the first DATA_SIZE triplets, split by role (axis 1 of x).
# Images fed to the model later (e.g. at inference) must use the same
# HWC -> CHW permutation.
xtrain1 = _to_nchw(x[:DATA_SIZE, 0])  # Anchor images
xtrain2 = _to_nchw(x[:DATA_SIZE, 1])  # Positive images
xtrain3 = _to_nchw(x[:DATA_SIZE, 2])  # Negative images
# ytrain = torch.from_numpy(y[:DATA_SIZE])  # Uncomment if you have labels (optional)

# Validation tensors: the next VALID_DATA triplets
xvalid1 = _to_nchw(x[DATA_SIZE:DATA_SIZE + VALID_DATA, 0])  # Anchor images
xvalid2 = _to_nchw(x[DATA_SIZE:DATA_SIZE + VALID_DATA, 1])  # Positive images
xvalid3 = _to_nchw(x[DATA_SIZE:DATA_SIZE + VALID_DATA, 2])  # Negative images
# yvalid = torch.from_numpy(y[DATA_SIZE:DATA_SIZE + VALID_DATA])  # Uncomment if you have labels (optional)


In [None]:
# Wrap the anchor / positive / negative training tensors in DataLoaders.
# The three loaders share one batch size and are not shuffled, so iterating them
# in lockstep yields matching triplets. The names are rebound from tensors to loaders.
xtrain1, xtrain2, xtrain3 = (
    utils.data.DataLoader(tensor, batch_size=BATCH_SIZE)
    for tensor in (xtrain1, xtrain2, xtrain3)
)
# ytrain = utils.data.DataLoader(ytrain, batch_size=BATCH_SIZE)  # Uncomment if you have labels

# Same wrapping for the validation tensors, using the validation batch size.
xvalid1, xvalid2, xvalid3 = (
    utils.data.DataLoader(tensor, batch_size=VALID_BATCH)
    for tensor in (xvalid1, xvalid2, xvalid3)
)
# yvalid = utils.data.DataLoader(yvalid, batch_size=VALID_BATCH)  # Uncomment if you have labels


In [None]:
# Number of batches per epoch, used to average the accumulated losses
steps = len(xtrain1)  # Training batches
vstep = len(xvalid1)  # Validation batches

# Triplet loss on Euclidean (pairwise) distance between embeddings
criterion = nn.TripletMarginWithDistanceLoss(distance_function=nn.PairwiseDistance(), margin=10)

# Initial learning rate and per-epoch decay factor
learning_rate = 1e-4
decay = 0.9

# Create the optimizer once: rebuilding it every epoch would discard Adam's
# running moment estimates at each epoch boundary.
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

for epoch in trange(EPOCHS):
    lss = 0.0  # Summed training loss of this epoch
    vls = 0.0  # Summed validation loss of this epoch

    # Inverse-time decay computed from the *initial* rate. Dividing the already
    # decayed rate again would compound the factors and shrink the learning rate
    # far faster than the schedule suggests.
    current_lr = learning_rate / (1 + epoch * decay)
    for group in optimizer.param_groups:
        group["lr"] = current_lr

    # ---- training ----
    model.train()  # Enable dropout and batch-statistics BatchNorm
    for xtr1, xtr2, xtr3 in zip(xtrain1, xtrain2, xtrain3):
        a = model(xtr1)  # Anchor embeddings
        p = model(xtr2)  # Positive embeddings
        n = model(xtr3)  # Negative embeddings

        loss = criterion(a, p, n)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # .item() stores a plain float; accumulating the tensor itself would keep
        # every batch's autograd graph alive until the end of the epoch.
        lss += loss.item()

    # ---- validation ----
    model.eval()  # Disable dropout, use BatchNorm running statistics
    with torch.no_grad():  # No gradients needed, saves memory and time
        for xval1, xval2, xval3 in zip(xvalid1, xvalid2, xvalid3):
            a = model(xval1)
            p = model(xval2)
            n = model(xval3)

            valloss = criterion(a, p, n)
            vls += valloss.item()

    # Average training and validation loss for the current epoch
    print(f'Epoch: {epoch+1}/{EPOCHS} || Loss: {lss/steps:.4f} || Validation Loss: {vls/vstep:.4f}')


In [None]:
# Compare two images by the distance between their embeddings
img1 = r"PATH"  # Path to the first image
img2 = r"PATH"  # Path to the second image


def _load_image_tensor(path):
    """Read an image and return it as a float32 tensor of shape (IMG_SHAPE, IMG_SHAPE, 3) in [0, 1].

    Raises:
        FileNotFoundError: If OpenCV cannot read the file (``cv2.imread`` returns
            ``None`` instead of raising).
    """
    image = cv2.imread(path)
    if image is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    # interpolation must be a keyword argument: the third positional parameter
    # of cv2.resize is `dst`.
    resized = cv2.resize(image, (IMG_SHAPE, IMG_SHAPE), interpolation=cv2.INTER_LINEAR)
    return torch.from_numpy(np.array(resized / 255.0, dtype=np.float32))


img1 = _load_image_tensor(img1)
img2 = _load_image_tensor(img2)

# Inference mode: dropout off and BatchNorm using its running statistics.
# In training mode a single-image batch makes the result random (dropout) and can
# make BatchNorm fail outright when a feature map holds one value per channel.
model.eval()
with torch.no_grad():
    # HWC -> CHW with permute, then add the batch axis: (1, 3, IMG_SHAPE, IMG_SHAPE).
    # A plain reshape would scramble pixel and channel values; the training tensors
    # must be prepared with the same layout.
    pred1 = model(img1.permute(2, 0, 1).unsqueeze(0))  # Embedding of image 1
    pred2 = model(img2.permute(2, 0, 1).unsqueeze(0))  # Embedding of image 2

# Euclidean distance between the two embeddings (smaller = more similar)
distance = torch.pairwise_distance(pred1, pred2)

print(distance)
