In [2]:
import numpy as np
import random
import os
import torchvision
import torchvision.transforms as transforms
import torchvision.transforms as TF
from torch.utils.data import DataLoader, Dataset
import torchvision.utils
import torch
import torch.nn as nn
import pandas as pd
import pickle
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
from PIL import Image, UnidentifiedImageError
import cv2
import torch.nn.functional as F
from torch.nn.functional import cosine_similarity
import matplotlib.pyplot as plt
from PIL import Image, ImageOps


In [3]:
triplet_loss_df= pd.read_csv('triplet_loss.csv')

In [4]:
triplet_loss_df=triplet_loss_df[['image','collection_id']]

In [5]:
triplet_loss_df.head()

Unnamed: 0,image,collection_id
0,image0.jpg,2
1,image1.jpg,6
2,image2.jpg,11
3,image3.jpg,16
4,image4.jpg,16


In [6]:
triplet_loss_df.shape

(100, 2)

In [7]:
def resize_and_pad_image(img, desired_height, desired_width, pad_color=(255, 255, 255)):
    # Convert grayscale images to RGB
    if img.mode == 'L':
        img = img.convert('RGB')

    original_width, original_height = img.size
    aspect_ratio = original_width / original_height

    # Determine new dimensions based on aspect ratio
    if original_width / original_height > desired_width / desired_height:
        new_width = desired_width
        new_height = int(new_width / aspect_ratio)
    else:
        new_height = desired_height
        new_width = int(new_height * aspect_ratio)

    # Resize the image to new dimensions
    image_resized = img.resize((new_width, new_height), Image.Resampling.LANCZOS)

    # Calculate padding to achieve desired dimensions
    padding_left = (desired_width - new_width) // 2
    padding_right = desired_width - new_width - padding_left
    padding_top = (desired_height - new_height) // 2
    padding_bottom = desired_height - new_height - padding_top

    # Pad the resized image to achieve desired dimensions
    image_padded = ImageOps.expand(image_resized, border=(padding_left, padding_top, padding_right, padding_bottom), fill=pad_color)

    return image_padded



# Path to the directory containing images
source_dir = 'images_full'
# Path to the directory where the processed images will be saved
target_dir = 'processed_images'

# Create the target directory if it doesn't exist
if not os.path.exists(target_dir):
    os.makedirs(target_dir)

# Desired dimensions after padding
desired_height = 512
desired_width = 512

# Process each image in the directory
for filename in os.listdir(source_dir):
    if filename.endswith(('.png', '.jpg', '.jpeg', '.bmp')):  # Check for image files
        image_path = os.path.join(source_dir, filename)
        img = Image.open(image_path).convert("RGB")  # Ensure the image is in RGB

        # Resize and pad the image
        padded_image = resize_and_pad_image(img, desired_height, desired_width)

        # Save the processed image to the new directory
        save_path = os.path.join(target_dir, filename)
        padded_image.save(save_path)

print("All images have been processed and saved to", target_dir)


All images have been processed and saved to processed_images


In [8]:
# Define the base directory where the images are stored
base_directory = 'processed_images'

# Prepend the base directory to each image filename in the dataframe
triplet_loss_df['image'] = triplet_loss_df['image'].apply(lambda x: os.path.join(base_directory, x))

In [9]:
triplet_loss_df.head()

Unnamed: 0,image,collection_id
0,processed_images\image0.jpg,2
1,processed_images\image1.jpg,6
2,processed_images\image2.jpg,11
3,processed_images\image3.jpg,16
4,processed_images\image4.jpg,16


In [10]:
triplet_loss_df.shape

(100, 2)

In [11]:
def check_uniform_image_sizes(base_directory):
    # Initialize a variable to store the size of the first image
    first_image_size = None
    uniform_size = True

    # Loop through all the images in the directory
    for filename in os.listdir(base_directory):
        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            img_path = os.path.join(base_directory, filename)
            img = Image.open(img_path)

            # Set the size of the first image
            if first_image_size is None:
                first_image_size = img.size
                print(f"Expected size based on the first image: {first_image_size}")

            # Check the size of each image against the first image's size
            if img.size != first_image_size:
                print(f"{filename}: Size {img.size} does NOT match {first_image_size}")
                uniform_size = False

    if uniform_size:
        print("All images have the same size.")
    else:
        print("Not all images have the same size.")

In [12]:
check_uniform_image_sizes(base_directory='processed_images')

Expected size based on the first image: (512, 512)
All images have the same size.


In [13]:
from torchvision import transforms
from PIL import Image

def augmentation(image):
    original_size = (512, 512)  #  all images are 512x512

    # Define transformations with white fill where applicable
    crop = transforms.Compose([
        transforms.CenterCrop((int(original_size[0] * 0.8), int(original_size[1] * 0.8))),
        transforms.Pad((int(original_size[1] * 0.1), int(original_size[0] * 0.1)), fill=(255, 255, 255)),
        transforms.Resize(original_size)
    ])

    random_horizontal_flip = transforms.Compose([
        transforms.RandomHorizontalFlip(p=1.0)
    ])

    random_rotation = transforms.Compose([
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.RandomRotation(50, fill=(255, 255, 255)),
        transforms.Resize(original_size)
    ])

    random_vertical_flip = transforms.Compose([
        transforms.RandomVerticalFlip(p=1.0)
    ])

    random_affine = transforms.Compose([
        transforms.RandomAffine(degrees=30, translate=(0.1, 0.1), scale=(0.8, 1.2), shear=10, fill=(255, 255, 255)),
        transforms.Resize(original_size)
    ])

    random_perspective = transforms.Compose([
        transforms.RandomPerspective(distortion_scale=0.5, p=1.0, fill=(255, 255, 255)),
        transforms.Resize(original_size)
    ])

    gaussian_blur = transforms.Compose([
        transforms.GaussianBlur(kernel_size=5, sigma=(0.1, 2.0))
    ])

    random_grayscale = transforms.Compose([
        transforms.RandomGrayscale(p=1.0)
    ])

    random_invert = transforms.Compose([
        transforms.RandomInvert(p=1.0)
    ])

    # Apply transformations to the image
    image_crop = crop(image)
    image_random_horizontal_flip = random_horizontal_flip(image)
    image_random_rotation = random_rotation(image)
    image_random_vertical_flip = random_vertical_flip(image)
    image_random_affine = random_affine(image)
    image_random_perspective = random_perspective(image)
    image_gaussian_blur = gaussian_blur(image)
    image_random_grayscale = random_grayscale(image)
    image_random_invert = random_invert(image)

    # Return the augmented images
    return (image_crop, image_random_horizontal_flip, image_random_rotation,
            image_random_vertical_flip, image_random_affine,
            image_random_perspective, image_gaussian_blur, image_random_grayscale,
            image_random_invert)


In [14]:
# Directory to save augmented images
augmented_images_dir = 'augmented_images'
os.makedirs(augmented_images_dir, exist_ok=True)

# List to store the new rows for the DataFrame
new_rows_r = []

# Loop through each image in the DataFrame and augment the images
for index, row in tqdm(triplet_loss_df.iterrows(), desc="Augmenting images", total=triplet_loss_df.shape[0]):
    image_path = row['image']
    class_id = row['collection_id']

    try:
        image = Image.open(image_path).convert('RGB')
        augmented_images = augmentation(image)

        for i, augmented_image in enumerate(augmented_images):
            # Save the augmented image
            augmented_image_path = os.path.join(augmented_images_dir, f'augmented_{class_id}_{index}_{i}.jpg')
            augmented_image.save(augmented_image_path)

            # Add new row for augmented data
            new_row = {'collection_id': class_id, 'image': augmented_image_path}
            new_rows_r.append(new_row)

    except (IOError, UnidentifiedImageError) as e:
        print(f"Cannot augment image {image_path}: {e}")

Augmenting images:   2%|▏         | 2/100 [00:00<00:10,  9.74it/s]

Cannot augment image processed_images\image0.jpg: [Errno 2] No such file or directory: 'processed_images\\image0.jpg'


Augmenting images: 100%|██████████| 100/100 [00:12<00:00,  8.20it/s]


In [15]:
check_uniform_image_sizes(base_directory='augmented_images')

Expected size based on the first image: (512, 512)
All images have the same size.


In [16]:
augmented_df = pd.DataFrame(new_rows_r)
triplet_loss_df_aug = pd.concat([triplet_loss_df, augmented_df], ignore_index=True)

In [17]:
# saving the dataframe
triplet_loss_df_aug.to_csv('augmented_data.csv')

In [18]:
data=pd.read_csv('augmented_data.csv')
data.head()

Unnamed: 0.1,Unnamed: 0,image,collection_id
0,0,processed_images\image0.jpg,2
1,1,processed_images\image1.jpg,6
2,2,processed_images\image2.jpg,11
3,3,processed_images\image3.jpg,16
4,4,processed_images\image4.jpg,16


In [19]:
triplet_loss_df_aug=data.copy()

In [20]:
triplet_loss_df_aug.head()

Unnamed: 0.1,Unnamed: 0,image,collection_id
0,0,processed_images\image0.jpg,2
1,1,processed_images\image1.jpg,6
2,2,processed_images\image2.jpg,11
3,3,processed_images\image3.jpg,16
4,4,processed_images\image4.jpg,16


In [21]:
def generate_random_triplet(index, df, unique_collections):


  # Get anchor id based on index
  anchor_id = df.iloc[index]['collection_id']
  same_collection_df = df[df['collection_id'] == anchor_id]

  # Get anchor image
  anchor_idx = index
  anchor_image_path = same_collection_df.loc[anchor_idx, 'image']
  anchor_image = Image.open(anchor_image_path).convert('RGB')


  positive_indices = same_collection_df.index.difference([anchor_idx])
  positive_image_path = same_collection_df.loc[random.choice(positive_indices), 'image']
  positive_image = Image.open(positive_image_path).convert('RGB')

  # Select a negative image from a different collection
  negative_id = np.random.choice(unique_collections[unique_collections != anchor_id])
  negative_df = df[df['collection_id'] == negative_id]
  i = np.random.choice(negative_df.shape[0])
  negative_image_path = negative_df.iloc[i]['image']
  negative_image = Image.open(negative_image_path).convert('RGB')


  return anchor_image, positive_image, negative_image, anchor_id, index

In [22]:
def generate_hard_triplet(index, df, cache_emb, margin=0.2):
    anchor_embedding = cache_emb.get_index_emb(index).view(1, -1)
    all_embeddings = cache_emb.all_embeddings

    # Calculate Euclidean distances
    dists = torch.cdist(anchor_embedding, all_embeddings, p=2).squeeze()

    # Get the sorted distances and sorted indices (excluding the anchor itself)
    sorted_dists, sorted_indices = torch.sort(dists)
    anchor_idx = index
    anchor_id = df.iloc[index]['collection_id']
    same_collection_df = df[df['collection_id'] == anchor_id]
    positive_indices = same_collection_df.index.difference([anchor_idx])

    # Find the hard positive example (same collection, closest Euclidean distance)
    positive_dists = dists[positive_indices]
    hard_positive = torch.argmin(positive_dists)
    hard_positive_id = positive_indices[hard_positive]
    positive_image_path = df.loc[hard_positive_id, 'image']
    hard_positive_image = Image.open(positive_image_path).convert('RGB')

    # Find the semi-hard negative example (different collection)
    different_collection_df = df[df['collection_id'] != anchor_id]
    negative_indices = different_collection_df.index

    positive_distance = positive_dists[hard_positive].item()

    # Select semi-hard negative (distance greater than positive but within the margin)
    semi_hard_negative_indices = [idx for idx in negative_indices if positive_distance < dists[idx].item() < positive_distance + margin]
    if semi_hard_negative_indices:
        semi_hard_negative_id = semi_hard_negative_indices[0]
    else:
        # If no semi-hard negative found within the margin, fall back to the closest negative outside the margin
        negative_dists = dists[negative_indices]
        semi_hard_negative_id = negative_indices[torch.argmin(negative_dists)]

    negative_image_path = df.loc[semi_hard_negative_id, 'image']
    semi_hard_negative_image = Image.open(negative_image_path).convert('RGB')

    return hard_positive_image, semi_hard_negative_image, anchor_id, index



In [23]:
class embedding_cache():
  def __init__(self):
    self.emb={}
    self.all_embeddings={}

  def get_index_emb(self, index):
    return self.all_embeddings[index]

  def store_embedding(self, model, train_loader ):

    for batch in train_loader:
      model.eval()
      with torch.no_grad():

        emb_anchor=model(batch[0])

      embedding_batch={index.item():embedding for index, embedding in zip(batch[-1],emb_anchor)}
      self.all_embeddings={**embedding_batch, **self.all_embeddings}
    sorted_embeddings = dict(sorted(self.all_embeddings.items()))
    sorted_embeddings_values=sorted_embeddings.values()

In [24]:
cache_emb=embedding_cache()

In [25]:
class TripletNetworkDataset(Dataset):
    def __init__(self, df, cache_emb, transform=None, use_hard_triplets=False):
        self.df = df
        self.cache_emb = cache_emb
        self.transform = transform
        self.use_hard_triplets = use_hard_triplets
        self.unique_collections = df['collection_id'].unique()

    def __getitem__(self, index):
        if self.use_hard_triplets:
            anchor_image, positive_image, negative_image, anchor_id, index = generate_hard_triplet(index, self.df, self.cache_emb)

        else:
            anchor_image, positive_image, negative_image, anchor_id, index = generate_random_triplet(index, self.df, self.unique_collections)

        # Apply transformations if specified
        if self.transform:
            anchor_image = self.transform(anchor_image)
            positive_image = self.transform(positive_image)
            negative_image = self.transform(negative_image)

        return anchor_image, positive_image, negative_image, anchor_id, index

    def __len__(self):
        return len(self.df)

In [26]:
transform = transforms.Compose([
    #transforms.Resize((512, 512)),  # Resize all images to 928 × 384
    transforms.ToTensor(),          # Convert image data to tensors
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize images
])

In [27]:
class LearnablePreprocessLayer(nn.Module):
    def __init__(self, input_channels):
        super(LearnablePreprocessLayer, self).__init__()
        self.layers1 = nn.Sequential(
            nn.Conv2d(input_channels, 16, kernel_size=1, padding=0, bias=True),  # input_channels 3, 16 output channels, 1x1 kernel
            nn.LeakyReLU(negative_slope=0.1, inplace=True),
            nn.BatchNorm2d(16),  # BatchNorm on the output channels
        )
        self.layers2 = nn.Sequential(
            nn.Conv2d(input_channels, input_channels, kernel_size=3, padding=1, bias=True),
            nn.LeakyReLU(negative_slope=0.1, inplace=True),
            nn.BatchNorm2d(input_channels),
        )
        self.layers3 = nn.Sequential(
            nn.Conv2d(16 + input_channels, input_channels, kernel_size=1, padding=0, bias=True),  # Adjusted input channels
            nn.LeakyReLU(negative_slope=0.1, inplace=True),
            nn.BatchNorm2d(input_channels),
        )

    def forward(self, x):
        x1 = self.layers1(x)
        x2 = self.layers2(x)
        x_concat = torch.cat((x1, x2), 1)
        x3 = self.layers3(x_concat)
        return x3

class AdaptiveConcatPool2d(nn.Module):
    def __init__(self, size=1):
        super().__init__()
        self.ap = nn.AdaptiveAvgPool2d(size)
        self.mp = nn.AdaptiveMaxPool2d(size)

    def forward(self, x):
        x_ap = self.ap(x)
        x_mp = self.mp(x)
        return torch.cat([x_mp, x_ap], 1)

class Flatten(nn.Module):
    def forward(self, x):
        return x.view(x.size(0), -1)

class L2Norm(nn.Module):
    def __init__(self, dim=1):
        super(L2Norm, self).__init__()
        self.dim = dim

    def forward(self, x):
        return F.normalize(x, p=2, dim=self.dim)




def get_model(model_name: str, freeze_backbone: bool = True, input_channels=3):
    model = getattr(torchvision.models, model_name)(pretrained=True)

    if freeze_backbone:
        for param in model.parameters():
            param.requires_grad = False

    # Add the LearnablePreprocessLayer
    preprocessing = LearnablePreprocessLayer(input_channels=input_channels)

    # Replace the first layer of the model with the LearnablePreprocessLayer
    if hasattr(model, 'conv1'):
        original_first_conv = model.conv1
        model.conv1 = nn.Sequential(
            preprocessing,
            original_first_conv
        )
    else:
        raise NotImplementedError(f"{model_name} architecture does not have a 'conv1' layer")

    # Modify the fully connected layer (fc) assuming the model uses a classifier named 'fc'
    if hasattr(model, 'fc'):
        num_features = model.fc.in_features
        model.avgpool = AdaptiveConcatPool2d()
        model.fc = nn.Sequential(
            Flatten(),
            nn.BatchNorm1d(num_features * 2),
            nn.Dropout(0.25),
            nn.Linear(num_features * 2, 512, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(512),
            nn.Dropout(0.5),
            nn.Linear(512, 256, bias=True),
            L2Norm(dim=1)

        )
    else:
        raise NotImplementedError("This model architecture is not supported yet.")

    return model


In [28]:
model = get_model('resnet50', freeze_backbone=True, input_channels=3)
print(model)



ResNet(
  (conv1): Sequential(
    (0): LearnablePreprocessLayer(
      (layers1): Sequential(
        (0): Conv2d(3, 16, kernel_size=(1, 1), stride=(1, 1))
        (1): LeakyReLU(negative_slope=0.1, inplace=True)
        (2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (layers2): Sequential(
        (0): Conv2d(3, 3, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): LeakyReLU(negative_slope=0.1, inplace=True)
        (2): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (layers3): Sequential(
        (0): Conv2d(19, 3, kernel_size=(1, 1), stride=(1, 1))
        (1): LeakyReLU(negative_slope=0.1, inplace=True)
        (2): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  )
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_s

In [29]:
# Get unique collection ids
unique_collection_ids = data['collection_id'].unique()

train_df_list = []
val_df_list = []



for unique_collection_id in unique_collection_ids:
    collection_images = data[data['collection_id'] == unique_collection_id]
    train_images, val_images = train_test_split(
        collection_images, test_size=2/len(collection_images), random_state=42
    )
    train_df_list.append(train_images)
    val_df_list.append(val_images)

train_df = pd.concat(train_df_list).reset_index(drop=True)
val_df = pd.concat(val_df_list).reset_index(drop=True)

In [30]:
train_dataset = TripletNetworkDataset(train_df, transform=transform, cache_emb=cache_emb)
val_dataset = TripletNetworkDataset(val_df, transform=transform,cache_emb=cache_emb)

In [33]:
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=0)

In [32]:
# Define the triplet loss
triplet_loss = nn.TripletMarginLoss(margin=0.2, p=2)

In [None]:
# Check if CUDA is available and set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Move the model to the selected device
model = model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)
num_epochs = 1  # Reduce the number of epochs for quick testing
save_dir = 'saved_models'
os.makedirs(save_dir, exist_ok=True)

best_loss = float('inf')  # To keep track of the best loss
loss_history = []  # Initialize loss history list

for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0  # Initialize running loss
    num_batches = 0  # Initialize the number of batches processed
    train_loader_progress = tqdm(train_loader, desc=f"Training Epoch {epoch + 1}/{num_epochs}")

    for batch in train_loader_progress:
        # Move data to the same device as the model
        batch = [item.to(device) for item in batch]
        anchor_imgs, positive_imgs, negative_imgs, labels, index = batch

        optimizer.zero_grad()
        anchor_embeddings = model(anchor_imgs)
        positive_embeddings = model(positive_imgs)
        negative_embeddings = model(negative_imgs)

        # Compute the loss
        loss = triplet_loss(anchor_embeddings, positive_embeddings, negative_embeddings)
        # Accumulate the loss
        running_loss += loss.item()
        num_batches += 1

        # Backpropagation
        loss.backward()
        optimizer.step()

        # Update the progress bar with the current loss
        train_loader_progress.set_postfix(loss=loss.item())

    # Save embeddings
    cache_emb.store_embedding(model, train_loader, device)

    # Calculate the average loss over all of the batches
    if num_batches > 0:  # Ensure division by zero doesn't occur
        average_loss = running_loss / num_batches
    else:
        average_loss = 0
    loss_history.append(average_loss)
    print(f"Average Loss for Epoch {epoch + 1}: {average_loss:.4f}")

    # Save the model if the average loss is the best
    if average_loss < best_loss:
        best_loss = average_loss
        model_save_path = os.path.join(save_dir, f"model_epoch_{epoch + 1}_loss_{average_loss:.4f}.pth")
        torch.save(model.state_dict(), model_save_path)
        print(f"Model saved to {model_save_path}")

# Check embeddings
print("Embeddings stored: ", len(cache_emb.embeddings))


