In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [5]:
import os
import pandas as pd
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision.datasets import ImageFolder
from torchvision import transforms
from sklearn.model_selection import train_test_split


class MuseumDataset(Dataset):
    """Image dataset driven by a semicolon-delimited annotation CSV.

    Every CSV row describes one sample. Columns are read by position:
    ``IMAGE_COLUMN`` holds the image file name and ``GROUP_COLUMN`` holds the
    group label. Image files are located by base name among the files that
    ``ImageFolder`` discovers under ``root_dir``.
    """

    # Positional indices of the CSV columns that __getitem__ reads.
    IMAGE_COLUMN = 4
    GROUP_COLUMN = 3

    def __init__(self, csv_file, root_dir, transform=None):
        """
        Args:
            csv_file (string): Path to the CSV file listing the images.
            root_dir (string): Directory containing the images.
            transform (callable, optional): Transform applied to each image.
        """
        self.annotations = pd.read_csv(csv_file, delimiter=';')
        self.root_dir = root_dir
        self.transform = transform

        # ImageFolder is used here only to enumerate the image files on disk.
        self.images = ImageFolder(root=root_dir, transform=transform)
        # Lookup from base file name to full path. Files that share a base
        # name in different sub-folders collide: the last one listed wins.
        self.image_paths = {os.path.basename(path): path for path, _ in self.images.imgs}

    def __len__(self):
        """Return the number of annotation rows."""
        return len(self.annotations)

    def __getitem__(self, idx):
        """Load the sample described by annotation row ``idx``.

        Args:
            idx (int or Tensor): Row position in the annotation table.

        Returns:
            dict: ``{'image': <RGB image, transformed if a transform was
            given>, 'group': <value of the group column>}``.

        Raises:
            FileNotFoundError: If the file name in the row does not match any
                image discovered under ``root_dir``.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        image_name = self.annotations.iloc[idx, self.IMAGE_COLUMN]
        image_path = self.image_paths.get(image_name)
        if image_path is None:
            # Fail with the offending name instead of letting Image.open(None)
            # raise an unrelated, hard-to-trace error.
            raise FileNotFoundError(
                f"Image {image_name!r} (annotation row {idx}) was not found under {self.root_dir!r}"
            )

        # convert() returns a fully loaded copy, so the file handle can be
        # released as soon as the conversion is done.
        with Image.open(image_path) as image_file:
            image = image_file.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        return {
            'image': image,
            'group': self.annotations.iloc[idx, self.GROUP_COLUMN],
        }


# Preprocessing applied to every image: tensor conversion, resize to 256x256,
# float dtype, then per-channel (x - 0.5) / 0.5 normalisation.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Resize((256, 256), antialias=True),
        transforms.ConvertImageDtype(torch.float),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]
)

dataset = MuseumDataset(
    csv_file='/kaggle/input/mincult-museum-dataset/train_dataset_mincult-train/train.csv',
    root_dir='/kaggle/input/mincult-museum-dataset/train_dataset_mincult-train/train',
    transform=transform,
)

# Split the row positions in half with a fixed seed so the split is repeatable.
indices = [*range(len(dataset))]
train_indices, test_indices = train_test_split(
    indices,
    test_size=0.5,
    random_state=42,
)

# Both loaders read from the same dataset object; the samplers restrict each
# loader to its own subset of indices.
train_sampler = torch.utils.data.SubsetRandomSampler(train_indices)
test_sampler = torch.utils.data.SubsetRandomSampler(test_indices)

train_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=32,
    sampler=train_sampler,
)
test_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=32,
    sampler=test_sampler,
)

In [6]:
import torch.nn as nn
import torch
import torch.nn.functional as F
from torch.nn.functional import cosine_similarity

class ImageEncoder(nn.Module):
    """Two-convolution encoder that maps an RGB image batch to 128-d embeddings.

    Pipeline: conv1 -> ReLU -> adaptive average pool to 4x4 -> conv2 -> ReLU
    -> flatten (128 * 4 * 4 = 2048 features) -> linear projection to 128.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        # relu1 / relu2 are registered on the module, but forward() applies
        # the functional F.relu instead of calling them.
        self.relu1 = nn.ReLU(inplace=True)
        self.pulling = nn.AdaptiveAvgPool2d(4)

        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU(inplace=True)
        self.linear = nn.Linear(2048, 128)

    def forward(self, x):
        """Encode a batch ``x`` of shape (N, 3, H, W) into a (N, 128) tensor."""
        pooled = self.pulling(F.relu(self.conv1(x)))
        features = F.relu(self.conv2(pooled))
        return self.linear(features.flatten(start_dim=1))

class ContrastiveLoss(nn.Module):
    """Pairwise loss computed on the cosine similarity of two embedding batches.

    For each pair with cosine similarity ``s`` the contribution is
    ``(1 - target) * s**2 + target * max(margin - s, 0)**2``; the result is
    the mean over all pairs. With a binary target, ``target == 1`` penalises
    similarity below ``margin`` and ``target == 0`` penalises any non-zero
    similarity.
    """

    def __init__(self, margin=2.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, target):
        """Return the mean pairwise loss as a scalar tensor.

        Args:
            output1: First batch of embeddings.
            output2: Second batch of embeddings, paired row-by-row with ``output1``.
            target: Per-pair weights; must broadcast against the per-pair similarities.
        """
        # Despite the loss name, this is a similarity (higher = closer), not a distance.
        similarity = cosine_similarity(output1, output2)

        zero_target_term = (1 - target) * similarity.pow(2)
        one_target_term = target * torch.clamp(self.margin - similarity, min=0.0).pow(2)
        return torch.mean(zero_target_term + one_target_term)

In [7]:
import torch.optim as optim
# Use the GPU when one is available; otherwise fall back to the CPU so the
# notebook still runs on a machine without CUDA instead of failing on .to().
device = 'cuda' if torch.cuda.is_available() else 'cpu'
encoder = ImageEncoder().to(device)
criterion = ContrastiveLoss().to(device)
# NOTE(review): lr=0.05 is well above Adam's default of 1e-3 — confirm this is intended.
optimizer = optim.Adam(encoder.parameters(), lr=0.05)

In [None]:
from tqdm.notebook import tqdm
import math

# Train the encoder with the contrastive loss. Each batch is cut into two
# halves; sample i of the first half is paired with sample i of the second.
losses = []
num_epochs = 5

criterion = ContrastiveLoss().to(device)

for epoch in range(num_epochs):
    encoder.train()
    running_loss = 0.0
    num_steps = 0

    for batch in tqdm(train_loader, desc=f'Epoch {epoch+1}', leave=False):
        inputs = batch['image'].to(device)
        labels = list(batch['group'])

        # Slice explicitly rather than torch.split(inputs, batch_size // 2):
        # split yields three chunks for an odd batch and raises for a batch of
        # one. An odd leftover sample is dropped; a batch that cannot form a
        # single pair is skipped.
        half = inputs.size(0) // 2
        if half == 0:
            continue
        images1 = inputs[:half]
        images2 = inputs[half:2 * half]

        # One target per pair: 1.0 when both images share a group, else 0.0.
        target = torch.tensor(
            [1.0 if labels[i] == labels[i + half] else 0.0 for i in range(half)],
            device=device,
        )

        optimizer.zero_grad()

        outputs1 = encoder(images1)
        outputs2 = encoder(images2)

        loss = criterion(outputs1, outputs2, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_steps += 1

    # Average over the optimisation steps actually taken (skipped batches excluded).
    average_loss = running_loss / max(num_steps, 1)
    print(f"Epoch {epoch+1}, Loss: {average_loss}")
    losses.append(average_loss)

Epoch 1:   0%|          | 0/509 [00:00<?, ?it/s]