<a href="https://colab.research.google.com/github/silverCore97/Bagua/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab only: mount Google Drive so the image folder and triplet files under
# /content/drive/MyDrive (used further down) are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import json
import os
import random
from typing import List, Tuple

import torch
import torchvision
import torchvision.utils
import torch.nn as nn
from torch import optim
from torch.utils.data import TensorDataset, Dataset
from torch.utils.data import DataLoader
import torchvision.transforms as T

from PIL import Image
from matplotlib import pyplot as plt

In [None]:
# Number of triplets per mini-batch; shared by the DataLoaders and the validation helpers.
BATCH_SIZE = 64
# Prefer the GPU, but fall back to the CPU so the notebook still runs on machines without CUDA.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
class Config:
    """
    Bundle of experiment settings, used to automate test runs.

    All arguments are keyword-only and stored unchanged as attributes of the same name.
    Creating an instance prints its ``repr``; building the ``repr`` calls
    ``architecture_fn(1)`` once in order to display the resulting architecture.
    """

    def __init__(
            self,
            *,
            n=None,
            epochs: int = 20,
            batch_size: int = 64,
            lr: float = 5e-4,
            train_margin: float = 1.,
            val_margin: float = 1.,
            image_folder: str = "./foods/food_270_224_in",
            pretrained_network: str = "resnet18",
            n_val_batches=3,
            n_features_out: int = 256,
            architecture_fn: callable,
            device: str,
            network: nn.Module,
            losser: nn.Module,
            binary_losser: nn.Module,
            get_parameter_fn: callable,
    ):
        """
        Store the settings and print them.

        :param n: number of triplets to use
        :param epochs: number of epochs to train
        :param batch_size: size of your batches (= #pairs in a batch)
        :param lr: learning rate for optimizer
        :param train_margin: see https://datahacker.rs/siamese-network-triplet-loss/ for explanations on margin
        :param val_margin: see https://datahacker.rs/siamese-network-triplet-loss/ for explanations on margin
        :param image_folder: folder holding the images
        :param pretrained_network: name of pretrained network
        :param n_val_batches: how many batches to evaluate during validation
        :param n_features_out: stored as-is (presumably the embedding width; confirm with the consumer)
        :param architecture_fn: callable taking a feature count (it is called with 1 by ``__repr__``)
        :param device: stored as-is
        :param network: stored as-is
        :param losser: stored as-is
        :param binary_losser: stored as-is
        :param get_parameter_fn: stored as-is
        """
        # data selection
        self.n = n
        self.image_folder = image_folder
        # optimisation schedule
        self.epochs = epochs
        self.batch_size = batch_size
        self.lr = lr
        self.train_margin = train_margin
        self.val_margin = val_margin
        self.n_val_batches = n_val_batches
        # model construction
        self.pretrained_network = pretrained_network
        self.n_features_out = n_features_out
        self.architecture_fn = architecture_fn
        self.device = device
        self.network = network
        self.losser = losser
        self.binary_losser = binary_losser
        self.get_parameter_fn = get_parameter_fn
        print(self)

    def __repr__(self):
        # (label, value) pairs in the exact order in which they are rendered.
        rows = (
            ("get_parameter_fn", self.get_parameter_fn),
            ("binary_losser", self.binary_losser),
            ("losser", self.losser),
            ("network", self.network),
            ("device", self.device),
            ("architecture_fn", self.architecture_fn),
            ("arch (n_ftr=1)", self.architecture_fn(1)),
            ("pretrained_network", self.pretrained_network),
            ("n_features_out", self.n_features_out),
            ("val_margin", self.val_margin),
            ("train_margin", self.train_margin),
            ("learning_rate (lr)", self.lr),
            ("batch_size", self.batch_size),
            ("n_val_batches", self.n_val_batches),
            ("epochs", self.epochs),
            ("n", self.n),
            ("image_folder", self.image_folder),
        )
        body = "".join(f"        {label} = {value},\n" for label, value in rows)
        return "\nConfig [\n" + body + "]\n"


def read_file(filename: str, n=None):
    """
    Read a triplet file such as test_triplets.txt or train_triplets.txt.

    Every non-blank line must consist of whitespace-separated integers (three per line in the
    triplet files). Blank lines are skipped instead of raising a ValueError.
    :param filename: path of the text file to read
    :param n: how many triplets to load; all of them if n is None or 0
    :return: list of tuples (a, b, c) of ints (the zero padding used in the file is dropped by int())
    """
    from itertools import islice

    with open(filename) as fobj:
        rows = (tuple(map(int, line.split())) for line in fobj if line.strip())
        if n and n > 0:
            # stop reading as soon as the requested number of triplets is available
            return list(islice(rows, n))
        rows = list(rows)
        # a negative n keeps the historical slice semantics (drop the last |n| entries)
        return rows[:n] if n else rows


def generate_rescaled_images(n: int, path: str = "./food") -> None:
    """
    Resize every image found in ``path`` to an n x n square and save it under ./food<n>.

    The target folder is created if necessary; re-running the function overwrites the
    files of a previous run instead of failing because the folder already exists.
    :param n: width and height of the resized images
    :param path: folder holding the images to resize
    :return: None
    """
    target = f"./food{n}"
    print(f"[ * ] Resizing images to {n}x{n} pixels, saving to ./food{n}")
    os.makedirs(target, exist_ok=True)
    for image_name in os.listdir(path):
        img = load_image(os.path.join(path, image_name))
        img.resize((n, n)).save(os.path.join(target, image_name))
    print("[ * ] Finished resizing")


def preprocess_images(transformations: T.Compose, name: str, path: str = "./food") -> None:
    """
    Apply ``transformations`` to every image in ``path`` and save the results for later use.

    The results are written to ./foods/food_<name>. The folder, including the ./foods parent,
    is created if it does not exist yet, and an existing folder is reused.
    :param transformations: a T.Compose object with transformations applied to each image.
            Should start with a PIL image object and end with a PIL image object (``.save`` is
            called on its result).
    :param name: identifier of the output folder
    :param path: folder holding the images to transform
    :return: None
    """
    p = f"./foods/food_{name}"
    print(f"[ * ] Preprocess images, saving to {p}")
    # makedirs (not mkdir): the ./foods parent may be missing and the cell may be re-run
    os.makedirs(p, exist_ok=True)
    for image_name in os.listdir(path):
        img = load_image(os.path.join(path, image_name))
        transformations(img).save(os.path.join(p, image_name))
    print("[ + ] Finished preprocessing")


def load_image(path: str) -> Image.Image:
    """
    Open the file at ``path`` and return its content as an RGB image.

    :param path: path of the image file
    :return: the image converted to RGB mode
    """
    with open(path, "rb") as handle:
        picture = Image.open(handle)
        # convert() is called while the file is still open
        return picture.convert("RGB")


def show_example_pairs(loader: DataLoader) -> None:
    """
    Plot the images of the first batch of ``loader`` and print that batch's labels.

    The first two entries of the batch are concatenated along dimension 0 and drawn as one
    grid; the third entry is printed as a flat array.
    :param loader: DataLoader yielding batches with at least three entries
    :return: None
    """
    first_batch = next(iter(loader))
    stacked = torch.cat((first_batch[0], first_batch[1]), 0)
    # make_grid returns the grid channel-first; imshow wants the channel dimension last
    plt.imshow(torchvision.utils.make_grid(stacked).permute(1, 2, 0))
    flat_labels = first_batch[2].numpy().reshape(-1)
    print(flat_labels)
    plt.show()


def show_plot(iteration, loss):
    """
    Plot ``loss`` against ``iteration`` and display the figure.

    :param iteration: x values (the training loop passes its list of iteration counters)
    :param loss: y values (the training loop passes its list of recorded losses)
    """
    plt.plot(iteration, loss)
    plt.show()


# Preprocessing pipeline for preprocess_images(): it takes a PIL image and ends with a PIL
# image again, so that the result can be saved to disk.
# NOTE(review): T.Normalize runs before T.ToPILImage, so normalized values that fall outside
# [0, 1] may not survive the conversion back to an 8-bit image; confirm this is intended.
transforms = T.Compose(
    [
        T.Resize(270),  # with an int argument the smaller edge is scaled to 270 px,
        # the other edge follows so that the aspect ratio of the input image is kept.
        T.CenterCrop(224),
        # may want to use randomRotation
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # mean and std of ImageNet dataset
        T.ToPILImage(),
    ]
)


def validate(
        model: nn.Module, losser: nn.Module, binary_losser: nn.Module, n_batches: int, val_iter: iter,
        batch_size: int, device: str
):
    """
    Evaluate ``model`` on the next ``n_batches`` batches taken from ``val_iter``.

    Each batch is expected to be a sequence whose last entry holds the labels; all other
    entries are passed to the model.
    :param model: network to evaluate; its outputs are unpacked into the loss functions
    :param losser: called as ``losser(*outputs)``
    :param binary_losser: called as ``binary_losser(*outputs, labels)``; expected to return the
            number of wrongly classified samples of the batch
    :param n_batches: number of batches to draw (0 raises ZeroDivisionError)
    :param val_iter: iterator yielding batches
    :param batch_size: number of samples per batch, used to normalise the wrong count
    :param device: device the batch is moved to
    :return: (mean loss per batch, fraction of wrongly classified samples)
    """
    loss_cnt = 0
    cnt_wrong_predicted = 0
    for _ in range(n_batches):
        *batches, labels = move_to_device(*next(val_iter), device=device)
        # Materialise the outputs: they are unpacked twice below, and a model that returns a
        # one-shot generator would already be exhausted at the second unpacking.
        outputs = tuple(model(*batches))
        loss_cnt += losser(*outputs)
        cnt_wrong_predicted += binary_losser(*outputs, labels)
    n = n_batches * batch_size
    return loss_cnt / n_batches, cnt_wrong_predicted / n


def validate2(
        model: nn.Module, losser: nn.Module, binary_losser: nn.Module, n_batches: int, val_iter: iter,
        batch_size: int, device: str
):
    """
    Evaluate a three-input / two-output model on the next ``n_batches`` batches of ``val_iter``.

    :param model: called as ``model(batch0, batch1, batch2)`` and expected to return two outputs
    :param losser: called as ``losser(res0, res1, labels)``
    :param binary_losser: called as ``binary_losser(res0, res1, labels)``; its results are summed
            up as the count of wrongly classified samples
    :param n_batches: number of batches to draw
    :param val_iter: iterator yielding (batch0, batch1, batch2, labels)
    :param batch_size: number of samples per batch
    :param device: device the batch is moved to
    :return: (mean loss per batch, wrong count divided by n_batches * batch_size)
    """
    running_loss, misclassified = 0, 0
    for _ in range(n_batches):
        first, second, third, labels = move_to_device(*next(val_iter), device=device)
        out_first, out_second = model(first, second, third)
        running_loss += losser(out_first, out_second, labels)
        misclassified += binary_losser(out_first, out_second, labels)
    samples_seen = n_batches * batch_size
    mean_loss = running_loss / n_batches
    error_rate = misclassified / samples_seen
    return mean_loss, error_rate


def validate_classification(
        model: nn.Module, losser: nn.Module, n_batches: int, val_iter: iter, batch_size: int,
        device: str
):
    """
    Evaluate a classification model on the next ``n_batches`` batches of ``val_iter``.

    :param model: called with every batch entry except the last one (the labels)
    :param losser: called as ``losser(outputs, labels)``
    :param n_batches: number of batches to draw
    :param val_iter: iterator yielding batches whose last entry holds the labels
    :param batch_size: number of samples per batch
    :param device: device the batch is moved to
    :return: (mean loss per batch, accumulated count divided by n_batches * batch_size)
    """
    running_loss, counted = 0, 0
    for _ in range(n_batches):
        *inputs, labels = move_to_device(*next(val_iter), device=device)
        outputs = model(*inputs)
        # index of the largest output per sample
        _, predicted_inv = torch.max(outputs, dim=1)
        running_loss += losser(outputs, labels)
        # NOTE(review): this counts samples where the arg-max index EQUALS the label and reports
        # them as "wrong"; that is only correct if the class indices are inverted w.r.t. the
        # labels (as the original variable name "predicted_labels_inv" suggests) - confirm.
        counted += torch.sum(predicted_inv == labels)
    samples_seen = n_batches * batch_size
    return running_loss / n_batches, counted / samples_seen


def move_to_device(*batches, device):
    """
    Lazily move every given batch to ``device``.

    :param batches: objects offering a ``.to(device)`` method (e.g. tensors)
    :param device: target device (keyword-only)
    :return: a generator yielding the moved batches in the given order; it can be consumed once
    """
    for item in batches:
        yield item.to(device)


def generate_train_val_triplets(n: int = 1000) -> None:
    """
    Split ./train_triplets.txt into train_triplets_custom.txt and val_triplets_custom.txt.

    ``n`` random image numbers in [0, 4999] are drawn (duplicates collapse, so the validation
    set holds at most n distinct images). A triplet goes to the train file if none of its
    images was drawn, to the validation file if all three were drawn, and is dropped otherwise,
    so that the images used for training and validating are distinct.
    :param n: how many images approx in the val set
    :return: None
    """
    print("[ * ] Generating triplets")
    all_triplets = read_file("./train_triplets.txt")

    held_out = set()
    for _ in range(n):
        held_out.add(random.randint(0, 4999))

    n_train = 0
    n_val = 0
    with open("train_triplets_custom.txt", "w") as train_out, open("val_triplets_custom.txt", "w") as val_out:
        for a, b, c in all_triplets:
            drawn = sum(1 for member in (a, b, c) if member in held_out)
            if drawn == 0:
                train_out.write(f"{a} {b} {c}\n")
                n_train += 1
            elif drawn == 3:
                val_out.write(f"{a} {b} {c}\n")
                n_val += 1

    print(n_train, n_val)
    print("[ * ] Finished")


# preprocess_images(transforms, "270_224_in")


In [None]:
class ImageSetWithLoading(Dataset):
    """Dataset that reads and transforms all images of a folder up front and keeps them in memory."""

    def __init__(self, path: str, transform, n=None):
        """
        Load n (= all if not specified) images from the folder ``path`` and apply ``transform``
        to each of them.
        :param path: folder to read images from
        :param transform: transformation to perform on each image. If preprocessed, only use T.ToTensor()
        :param n: number of images to read. While you only train, only load n=5000 to save time!
        """
        print("[ * ] Loading images")
        self.path = path
        self.transform = transform
        self.n = n
        self.image_paths = self.get_image_paths()
        self.images = self.load()
        self.size = len(self.images)
        print("[ + ] Loaded all images successfully")

    def get_image_paths(self):
        """Return the sorted file names of the folder, cut to the first n if n is set."""
        names = sorted(os.listdir(self.path))
        if self.n:
            return names[:self.n]
        return names

    def load(self):
        """Read every selected file with load_image and return the transformed results."""
        return [
            self.transform(load_image(os.path.join(self.path, name)))
            for name in self.image_paths
        ]

    def __len__(self) -> int:
        return self.size

    def __getitem__(self, index: int) -> Image.Image:
        # returns whatever ``transform`` produced for the image at this position
        return self.images[index]


class ImageSetWithoutLoading(Dataset):
    """Dataset that only remembers file names and reads/transforms an image when it is requested."""

    def __init__(self, path: str, transform, n=None):
        """
        Collect the image file names of ``path``; no image is read here, to save memory.
        ``transform`` is applied each time an image is fetched.
        :param path: folder to read images from
        :param transform: transformation to perform on each image. If preprocessed, only use T.ToTensor()
        :param n: number of images to read. While you only train, only load n=5000 to save time!
        """
        print("[ * ] Loading images")
        self.path = path
        self.transform = transform
        self.n = n
        self.image_paths = self.get_image_paths()
        self.size = len(self.image_paths)
        print("[ + ] Loaded all images successfully")

    def get_image_paths(self):
        """Return the sorted file names of the folder, cut to the first n if n is set."""
        names = sorted(os.listdir(self.path))
        if self.n:
            return names[:self.n]
        return names

    def __len__(self) -> int:
        return self.size

    def __getitem__(self, index: int) -> Image.Image:
        file_name = self.image_paths[index]
        # sanity check: the file at position ``index`` must start with the index padded to 5 digits
        expected_prefix = str(index).rjust(5, '0')
        assert expected_prefix == file_name[:5]
        image = load_image(os.path.join(self.path, self.image_paths[index]))
        return self.transform(image)


class ImageCollection(TensorDataset):
    def __init__(self, image_set: Dataset, path: str, n=None, triplets = None):
        """
        Abstract class. Basis for ImagePairSet and ImageTripletSet
        :param image_set: instance of an image set class, indexable by image number
        :param path: triplet file to read, e.g. "./test_triplets.txt" or "./train_triplets.txt";
                only used if ``triplets`` is None
        :param n: how many triplets (=lines) to read from ``path``
        :param triplets: ready-made list of triplets; if given (even if empty) no file is read
        """
        self.image_set = image_set
        self.path = path
        self.n = n
        # "is None" instead of truthiness: an explicitly passed empty list (e.g. one half of a
        # split) must not trigger reading a file, whose path may well be None.
        self.triplets = read_file(path, n=n) if triplets is None else triplets
        self.size = None  # subclasses override size with the actual size

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        raise NotImplementedError


class ImagePairSet(ImageCollection):
    """Exposes every triplet (a, b, c) as the two labelled pairs (a, b, 1) and (a, c, 0)."""

    def __init__(self, image_set: Dataset, path: str, n=None):
        """
        creates pairs of images with the appropriate label 1 (if more similar) or 0 (if less similar)
        :param image_set: image set, indexable by image number
        :param path: either "./test_triplets.txt" or "./train_triplets.txt"
        :param n: how many triplets (=lines) to use
        """
        super().__init__(image_set, path, n)
        # two pairs per triplet
        self.size = 2 * len(self.triplets)

    def __len__(self):
        return self.size

    def __getitem__(self, index: int):
        # even positions hold the (a, b) pair of a triplet, odd positions its (a, c) pair
        triplet_position, is_second_pair = divmod(index, 2)
        a, b, c = self.triplets[triplet_position]
        if is_second_pair:
            return self.image_set[a], self.image_set[c], 0
        return self.image_set[a], self.image_set[b], 1


class ImageTripletSet(ImageCollection):
    """Dataset of image triplets, optionally with the last two images swapped at random."""

    def __init__(self, image_set: Dataset, path: str = None, n=None, triplets = None,
                 shuffle: bool = True):
        """
        creates triplets as specified by the file given by path (or by ``triplets``)
        :param image_set: image set, indexable by image number
        :param path: either "./test_triplets.txt" or "./train_triplets.txt"
        :param n: how many triplets (=lines) to use
        :param triplets: ready-made triplets, passed on to the base class
        :param shuffle: whether to (randomly) rearrange a, b, c as a, c, b or not
        """
        super().__init__(image_set, path=path, n=n, triplets=triplets)
        self.shuffle = shuffle
        self.size = len(self.triplets)

    def __getitem__(self, index: int):
        a, b, c, = self.triplets[index]
        # the coin is tossed on every access, even when shuffling is switched off
        coin = random.randint(0, 1)
        swap = not (coin or not self.shuffle)
        if swap:
            # label 0: a and c are more similar than a and b
            return self.image_set[a], self.image_set[c], self.image_set[b], 0
        # label 1: a and b are more similar than a and c
        return self.image_set[a], self.image_set[b], self.image_set[c], 1

    def split(self, ratio=0.2):
        """
        Cut this data set into two consecutive parts sharing the image set and shuffle flag.
        The first part holds the first (1 - ratio) of the triplets, the second part the rest.
        Used to split the train_val_set into a train set and a val set.
        :param ratio: the ratio of the second set
        :return: 2 ImageTripletSet's
        """
        assert 0 <= ratio <= 1
        cut = int((1 - ratio) * len(self))
        head = ImageTripletSet(self.image_set, triplets=self.triplets[:cut], shuffle=self.shuffle)
        tail = ImageTripletSet(self.image_set, triplets=self.triplets[cut:], shuffle=self.shuffle)
        return head, tail


class DataLoaderForever:
    """
    Endless iterator over a dataloader: when the wrapped dataloader is exhausted, a fresh
    iterator is created from it and iteration continues with its first batch.
    """

    def __init__(self, dataloader):
        """
        :param dataloader: any re-iterable object (``iter(dataloader)`` is called on every restart)
        """
        self.dataloader = dataloader
        self.iter = iter(dataloader)

    def __iter__(self):
        return self

    def __next__(self):
        """
        Return the next batch, restarting the dataloader when it has run out.
        :raises StopIteration: only if the dataloader yields nothing at all; this replaces the
                unbounded recursion (RecursionError) an empty dataloader used to cause
        """
        try:
            return next(self.iter)
        except StopIteration:
            self.iter = iter(self.dataloader)
        # A single retry on the fresh iterator; an empty dataloader raises StopIteration here.
        return next(self.iter)


In [None]:
# Load the first 5000 images (sorted by file name) into memory, converted to tensors.
images = ImageSetWithLoading(
    "/content/drive/MyDrive/food_270_224_in",
    transform=T.Compose([T.ToTensor(), ]),
    n=5000
)

# for i, j in enumerate(images.image_paths):
#   print(i, j)

# Read the custom training triplets and split them (default ratio 0.2) into the triplets used
# for training and a held-out part built from the same images (= tv-triplets).
# NOTE(review): the sets are created with shuffle=True, while the "without shuffle" training
# cell asserts train_triplets.shuffle == False - confirm which setting is intended per cell.
train_tv_triplets = ImageTripletSet(images, "/content/drive/MyDrive/train_triplets_custom.txt", shuffle=True)
train_triplets, tv_triplets = train_tv_triplets.split()

# Validation triplets come from their own file (other images than in train_triplets!).
val_triplets = ImageTripletSet(images, "/content/drive/MyDrive/val_triplets_custom.txt", shuffle=True)

# Create the DataLoaders to iterate over. The two evaluation loaders are wrapped in
# DataLoaderForever so that the validation helpers can keep calling next() on them.
train_loader = DataLoader(train_triplets, batch_size=BATCH_SIZE, shuffle=False, drop_last=True)
tv_loader = DataLoaderForever(DataLoader(tv_triplets, batch_size=BATCH_SIZE, shuffle=False, drop_last=True))
val_loader = DataLoaderForever(DataLoader(val_triplets, batch_size=BATCH_SIZE, shuffle=False, drop_last=True))


[ * ] Loading images
[ + ] Loaded all images successfully


In [None]:
class ContrastiveLoss(nn.Module):
    """
    Contrastive Loss Function
    Credits to https://datahacker.rs/019-siamese-network-in-pytorch-with-application-to-face-similarity/

    Samples with label 0 are penalised by their squared distance, samples with label 1 by the
    squared amount by which their distance falls short of the margin.
    """

    def __init__(self, margin=2.0):
        """
        :param margin: distance beyond which label-1 pairs no longer contribute to the loss
        """
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label) -> torch.Tensor:
        """
        :param output1: first batch of embeddings
        :param output2: second batch of embeddings
        :param label: per-pair labels; assumed to broadcast against the (batch, 1) distances - TODO confirm
        :return: mean loss over the batch
        """
        # row-wise euclidean distance, kept as a column
        dist = nn.functional.pairwise_distance(output1, output2, keepdim=True)
        pull_term = (1 - label) * dist.pow(2)
        shortfall = (self.margin - dist).clamp(min=0.0)
        push_term = label * shortfall.pow(2)
        return (pull_term + push_term).mean()


class TripletLoss(nn.Module):
    """
    Triplet Loss
    Reference: https://datahacker.rs/siamese-network-triplet-loss/
    """

    def __init__(self, margin=1e-2):
        """
        :param margin: amount added to the signed distance difference before clamping at zero
        """
        super().__init__()
        self.margin = margin

    def forward(self, outputA, outputB, outputC, labels) -> torch.Tensor:
        """
        :param outputA: embeddings of the anchor images
        :param outputB: embeddings of the second images
        :param outputC: embeddings of the third images
        :param labels: one entry per sample; non-zero means "A is closer to B", zero means the
                second and third image were swapped ("A is closer to C")
        :return: sum over the batch of max(0, +-(d(A,B)^2 - d(A,C)^2) + margin)
        """
        n = len(outputA)

        distance0 = torch.pow(nn.functional.pairwise_distance(outputA, outputB, keepdim=True), 2)
        distance1 = torch.pow(nn.functional.pairwise_distance(outputA, outputC, keepdim=True), 2)

        # +1 where the label is set, -1 where it is not. Built with tensor ops on the device of
        # the outputs, so it also works when the model runs on the GPU (a CPU-only factor tensor
        # cannot be multiplied with CUDA tensors) and avoids a Python loop over the labels.
        is_set = torch.as_tensor(labels, device=outputA.device).reshape(n) != 0
        factors = is_set.to(outputA.dtype) * 2 - 1

        # diff of the distances with one dimension eliminated, sign flipped if b and c were swapped
        distances = torch.reshape(distance0 - distance1, [n]) * factors
        return torch.sum(torch.clamp(distances + self.margin, min=0))


class TripletBinaryLoss(nn.Module):
    """
    returns the number of samples that are wrongly classified per batch
    """

    def __init__(self, margin=0.001):
        """

        :param margin: if difference less than margin => wrong classified
        """
        super().__init__()
        self.margin = margin

    def forward(self, outputA, outputB, outputC, labels) -> torch.Tensor:
        """
        :param outputA: embeddings of the anchor images
        :param outputB: embeddings of the second images
        :param outputC: embeddings of the third images
        :param labels: one entry per sample; non-zero means "A is closer to B", zero the opposite
        :return: number of samples whose signed squared-distance difference plus margin is positive
        """
        n = len(outputA)
        distance0 = torch.pow(nn.functional.pairwise_distance(outputA, outputB, keepdim=True), 2)
        distance1 = torch.pow(nn.functional.pairwise_distance(outputA, outputC, keepdim=True), 2)
        # +1 / -1 per sample, created on the device of the outputs (a CPU-only factor tensor
        # cannot be combined with CUDA outputs)
        is_set = torch.as_tensor(labels, device=outputA.device).reshape(n) != 0
        factors = is_set.to(outputA.dtype) * 2 - 1
        distances = torch.reshape(distance0 - distance1, [n]) * factors
        return torch.sum(distances + self.margin > 0)


In [None]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        """
        Siamese network: one convolutional stack plus fully connected head, shared by all inputs.
        Credits to https://datahacker.rs/019-siamese-network-in-pytorch-with-application-to-face-similarity/

        The head expects 384 flattened features, i.e. the convolutional stack has to reduce the
        image to a 1x1 map. That is the case for e.g. 3x100x100 inputs; other input sizes
        (such as 224x224) need a different in_features of the first linear layer.
        """
        super().__init__()
        self.cnn1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=96, kernel_size=11, stride=4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2),

            nn.Conv2d(in_channels=96, out_channels=256, kernel_size=5, stride=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2),

            nn.Conv2d(in_channels=256, out_channels=384, kernel_size=3, stride=1),
            nn.ReLU(inplace=True),
        )

        self.fc1 = nn.Sequential(
            nn.Linear(in_features=384, out_features=1024),
            nn.ReLU(inplace=True),

            nn.Linear(in_features=1024, out_features=256),
            nn.ReLU(inplace=True),

            nn.Linear(in_features=256, out_features=2),
        )

    def forward_one(self, img):
        """
        Embed one batch of images.
        :param img: batch of images with 3 channels
        :return: tensor with 2 features per image
        """
        output = self.cnn1(img)
        # flatten everything but the batch dimension
        o = output.view(output.size()[0], -1)
        return self.fc1(o)

    def forward(self, *images):
        """
        Embed every given batch with the shared weights.
        :param images: any number of image batches
        :return: list with one embedding tensor per input, in input order. A list (as in
                PretrainedNetwork) rather than a lazy generator, so that the forward pass runs
                immediately and the result can be unpacked more than once.
        """
        return [self.forward_one(img) for img in images]


class PretrainedNetwork(nn.Module):
    """
    Base class for networks built on a pretrained torchvision model.
    Subclasses are expected to assign ``self.model``, which is None after construction.
    """

    def __init__(self, device: str, pretrained_network: str):
        """
        :param device: device the pretrained model is moved to
        :param pretrained_network: name of the pretrained model, "resnet18" or "resnet50"
        """
        super().__init__()
        self.device = device
        self.pretrained_network = pretrained_network
        self.model = None

    def get_pretrained_network(self):
        """
        Instantiate the configured torchvision model with pretrained weights on ``self.device``.
        :raises KeyError: if ``pretrained_network`` is not one of the known names
        """
        constructors = {
            "resnet18": torchvision.models.resnet18,
            "resnet50": torchvision.models.resnet50,
            # ToDo: add other pretrained models here
        }
        build = constructors[self.pretrained_network]
        return build(pretrained=True).to(self.device)

    def get_feature_extractor(self):
        """Return the pretrained model with all of its parameters frozen."""
        frozen = self.get_pretrained_network()

        # don't change weights of pretrained model
        for weight in frozen.parameters():
            weight.requires_grad = False

        return frozen

    def forward_one(self, img):
        """Run ``self.model`` on a single batch."""
        return self.model(img)

    def forward(self, *images):
        """Run ``forward_one`` on every given batch and return the results as a list."""
        return list(map(self.forward_one, images))


class PretrainedFeatureExtractorNetwork(PretrainedNetwork):
    def __init__(self, device: str, architecture_fn: nn.Sequential, pretrained_network: str):
        """
        Frozen pretrained model whose last layer (``fc``) is replaced by a new head.
        Only the new head is trainable; the pretrained part merely extracts features
        from the images.
        :param device: cuda or cpu
        :param architecture_fn: called with the number of input features of the original ``fc``
                layer; its result becomes the new ``fc``
                (NOTE(review): the result is not moved to ``device`` here - confirm callers do that)
        :param pretrained_network: name of nn to use
        """
        super().__init__(device=device, pretrained_network=pretrained_network)
        backbone = self.get_feature_extractor()
        n_features = backbone.fc.in_features
        backbone.fc = architecture_fn(n_features)
        self.model = backbone




class FineTuningNetwork(PretrainedNetwork):
    def __init__(self, device: str, architecture_fn: nn.Sequential, pretrained_network: str):
        """
        Pretrained model with a new ``fc`` head; unlike the feature-extractor variant,
        the pretrained weights are not frozen here.
        :param device: device the pretrained model is moved to
        :param architecture_fn: called with the number of input features of the original ``fc``
                layer; its result becomes the new ``fc``
        :param pretrained_network: name of the pretrained model
        """
        super().__init__(device=device, pretrained_network=pretrained_network)
        backbone = self.get_pretrained_network()
        n_features = backbone.fc.in_features
        backbone.fc = architecture_fn(n_features)
        self.model = backbone


class Classificator(PretrainedNetwork):
    def __init__(self, device: str, architecture_fn: nn.Sequential, pretrained_network: str):
        """
        Classifies a triplet from the concatenated features of its three images.
        :param device: device the pretrained model is moved to
        :param architecture_fn: called with the feature count of ONE image and expected to return
                the head. NOTE(review): the head receives the features of three images
                concatenated, so it presumably has to account for the factor 3 itself - confirm.
        :param pretrained_network: name of the pretrained model
        """
        super().__init__(device, pretrained_network)

        backbone = self.get_feature_extractor()
        head = architecture_fn(backbone.fc.in_features)
        # the frozen model only extracts features, so its own last layer is bypassed
        backbone.fc = nn.Identity()
        self.feature_extractor = backbone
        self.fc = head

    def forward(self, a, b, c):
        """
        :return: raw head output for the concatenated features (no arg-max applied)
        """
        features = [self.feature_extractor(img) for img in (a, b, c)]
        joined = torch.cat(features, dim=1)
        return self.fc(joined)


In [None]:
#### My custom model, trained without shuffling the triplets


class CustomPretrainedFeatureExtractorNetwork(PretrainedNetwork):
    """
    Scores the pairs (a, b) and (a, c) of a triplet: a frozen pretrained model extracts
    features, the element-wise product of two feature vectors is fed to a small trainable head.
    """

    def __init__(self, device: str, pretrained_network: str):
        """
        :param device: device the pretrained model and the head are moved to
        :param pretrained_network: name of the pretrained model
        """
        super().__init__(device, pretrained_network)
        self.feature_extractor = self.get_feature_extractor()
        width = self.feature_extractor.fc.in_features
        # bypass the last layer of the pretrained model so that it returns plain features
        self.feature_extractor.fc = nn.Identity()
        # Trainable head mapping a feature vector to one score
        # (dropout layers and a final sigmoid were tried and are switched off).
        head = nn.Sequential(
            nn.Linear(width, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),

            nn.Linear(512, 64),
            nn.BatchNorm1d(64),
            nn.Sigmoid(),

            nn.Linear(64, 1),
        )
        self.model = head.to(self.device)

    def forward(self, a, b, c):
        """
        :return: tuple (score of pair a/b, score of pair a/c)
        """
        feat_a = self.feature_extractor(a)
        feat_b = self.feature_extractor(b)
        feat_c = self.feature_extractor(c)

        # pairs are combined by element-wise multiplication
        # (concatenation and squared difference are the alternatives that were considered)
        score_ab = self.model(feat_a * feat_b)
        score_ac = self.model(feat_a * feat_c)
        return score_ab, score_ac


class CustomLossWithoutShuffle(nn.Module):
    """
    Hinge loss on the score difference for triplets that are never swapped:
    mean over the batch of max(0, AB - AC + margin).
    """

    def __init__(self, device, margin=1):
        """
        :param device: stored for reference; the computation runs on the device of the inputs
        :param margin: amount added to the score difference before clamping at zero
        """
        super().__init__()
        self.device = device
        self.margin = margin

    def forward(self, AB, AC, labels=None):
        """
        :param AB: scores of the pairs (a, b), one per sample
        :param AC: scores of the pairs (a, c), one per sample
        :param labels: accepted so that the loss can be called like the label-aware losses
                (``losser(res0, res1, labels)``); ignored, since no triplet is swapped
        :return: mean hinge loss of the batch
        """
        # batch size taken from the input itself instead of a global ``labels`` variable
        n = len(AB)
        diff = torch.reshape(AB - AC, [n])
        # clamp works on whatever device the scores live on (no global DEVICE needed)
        return torch.mean(torch.clamp(diff + self.margin, min=0))
        # return torch.mean(torch.clamp(diff + self.margin, min=0))



class CustomBinaryLossWithoutShuffle(nn.Module):
    """
    Counts the samples of a batch with AB - AC + margin > 0, for triplets that are never swapped.
    """

    def __init__(self, device, margin=1):
        """
        :param device: stored for reference; the computation runs on the device of the inputs
        :param margin: amount added to the score difference before comparing with zero
        """
        super().__init__()
        self.device = device
        self.margin = margin

    def forward(self, AB, AC, labels=None):
        """
        :param AB: scores of the pairs (a, b), one per sample
        :param AC: scores of the pairs (a, c), one per sample
        :param labels: accepted so that the loss can be called like the label-aware losses
                (``binary_losser(res0, res1, labels)``); ignored, since no triplet is swapped
        :return: number of samples whose score difference plus margin is positive
        """
        # batch size taken from the input itself instead of a global ``labels`` variable;
        # the former debug prints of the intermediate tensors are gone
        n = len(AB)
        diff = torch.reshape(AB - AC, [n])
        return torch.sum(diff + self.margin > 0)



def validate_model2(
        model: nn.Module, losser: nn.Module, binary_losser: nn.Module, n_batches: int, val_iter: iter,
        batch_size: int, device: str
):
    """
    Evaluate a three-input / two-output model on the next ``n_batches`` batches of ``val_iter``.

    :param model: called as ``model(batch0, batch1, batch2)`` and expected to return two outputs
    :param losser: called as ``losser(res0, res1, labels)``
    :param binary_losser: called as ``binary_losser(res0, res1, labels)``; its results are summed
            up as the count of wrongly classified samples
    :param n_batches: number of batches to draw (0 raises ZeroDivisionError)
    :param val_iter: iterator yielding (batch0, batch1, batch2, labels)
    :param batch_size: number of samples per batch
    :param device: device the batch is moved to
    :return: (mean loss per batch, wrong count divided by n_batches * batch_size)
    """
    loss_cnt = 0
    cnt_wrong_predicted = 0
    for _ in range(n_batches):
        # honour the ``device`` argument instead of silently using the global DEVICE
        batch0, batch1, batch2, labels = move_to_device(*next(val_iter), device=device)
        res0, res1 = model(batch0, batch1, batch2)
        loss_cnt += losser(res0, res1, labels)
        cnt_wrong_predicted += binary_losser(res0, res1, labels)
    n = n_batches * batch_size
    return loss_cnt / n_batches, cnt_wrong_predicted / n




# create our neuronal network
model = CustomPretrainedFeatureExtractorNetwork(device=DEVICE, pretrained_network="resnet18").to(DEVICE)

# create our loss function for training
losser = CustomLossWithoutShuffle(margin=0.01, device=DEVICE)

# create our loss function for validation, use a binary loss here: either correctly or wrongly classified
binary_losser = CustomBinaryLossWithoutShuffle(margin=0.00001, device=DEVICE)

# create optimizer
# IMPORTANT: only the parameters of the trainable head (model.model) are handed to the optimizer;
# the pretrained feature extractor is frozen and therefore left out.
optimizer = optim.Adam(model.model.parameters(), lr=0.01)

# for stats
counter = []
loss_history = []
iteration_number = 0

# This cell expects triplets that are never swapped.
# NOTE(review): fails if the triplet sets were created with shuffle=True - confirm the data cell.
assert train_triplets.shuffle == False

print("[ * ] Start training")
for epoch in range(20):
    for i, data in enumerate(train_loader):

        batch0, batch1, batch2, labels = move_to_device(*data, device=DEVICE)

        optimizer.zero_grad()
        model.train()  # switch back to training mode: the evaluation below puts the model into eval mode
        res0, res1 = model(batch0, batch1, batch2)

        loss = losser(res0, res1, labels)
        loss.backward()
        optimizer.step()

        print("train loss: {:.3f}".format(loss.item()))

        # evaluate every 50th batch of an epoch (including its very first batch)
        if i % 50 == 0:
            print("[ * ] eval the model")

            # statistics: the counter advances by 50 per evaluation
            iteration_number += 50
            counter.append(iteration_number)
            loss_history.append(loss.item())

            # test on 4 batches each of the validation set and of the held-out training triplets
            model.eval()
            with torch.no_grad():
                val_loss0, val_loss1 = validate_model2(
                    model, losser, binary_losser, 4, val_loader, BATCH_SIZE, device=DEVICE
                )
                tv_loss0, tv_loss1 = validate_model2(
                    model, losser, binary_losser, 4, tv_loader, BATCH_SIZE, device=DEVICE
                )
                print(f"val loss  : {val_loss0:.3f}")
                print(f"val       : {100 * val_loss1:.3f}% wrong classified")
                print(f"tv loss   : {tv_loss0:.3f}")
                print(f"tv        : {100 * tv_loss1:.3f}% wrong classified")
            print("[ + ] evaluation completed")
            print("[ * ] train again")

    print(f"[ + ] Finished epoch {epoch}")
print("[ + ] Finished training")

# plot the training loss recorded at every evaluation step
show_plot(counter, loss_history)

In [None]:
#### My custom model: deeper head, trained with shuffled triplets



class CustomPretrainedFeatureExtractorNetwork(PretrainedNetwork):
    """
    Scores the pairs (a, b) and (a, c) of a triplet: a frozen pretrained model extracts
    features, the element-wise squared difference of two feature vectors is fed to a
    trainable head with three hidden layers.
    """

    def __init__(self, device: str, pretrained_network: str):
        """
        :param device: device the pretrained model and the head are moved to
        :param pretrained_network: name of the pretrained model
        """
        super().__init__(device, pretrained_network)
        self.feature_extractor = self.get_feature_extractor()
        width = self.feature_extractor.fc.in_features
        # bypass the last layer of the pretrained model so that it returns plain features
        self.feature_extractor.fc = nn.Identity()
        # Trainable head mapping a feature vector to one score
        # (dropout layers and a final sigmoid were tried and are switched off).
        head = nn.Sequential(
            nn.Linear(width, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),

            nn.Linear(1024, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),

            nn.Linear(256, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),

            nn.Linear(64, 1),
        )
        self.model = head.to(self.device)

    def forward(self, a, b, c):
        """
        :return: tuple (score of pair a/b, score of pair a/c)
        """
        feat_a = self.feature_extractor(a)
        feat_b = self.feature_extractor(b)
        feat_c = self.feature_extractor(c)

        # pairs are combined by their element-wise squared difference
        # (concatenation and element-wise product are the alternatives that were considered)
        sq_diff_ab = torch.pow(feat_a - feat_b, 2)
        sq_diff_ac = torch.pow(feat_a - feat_c, 2)

        score_ab = self.model(sq_diff_ab)
        score_ac = self.model(sq_diff_ac)
        return score_ab, score_ac


class CustomLoss(nn.Module):
    """
    Hinge loss on the score difference of a triplet, with the sign of the difference flipped
    for samples whose label is 0: mean of max(0, +-(AB - AC) + margin).
    """

    def __init__(self, device, margin=1):
        """
        :param device: stored for reference; the computation runs on the device of the inputs
        :param margin: amount added to the signed score difference before clamping at zero
        """
        super().__init__()
        self.device = device
        self.margin = margin

    def forward(self, AB, AC, labels):
        """
        :param AB: scores of the pairs (a, b), one per sample
        :param AC: scores of the pairs (a, c), one per sample
        :param labels: one entry per sample; non-zero keeps the sign of AB - AC, zero flips it
        :return: mean hinge loss of the batch
        """
        n = len(labels)  # assert n == len(AB) == len(AC) holds
        # +1 / -1 per sample, built with tensor ops on the device of the scores: no Python loop
        # over the labels and no dependency on the global DEVICE
        is_set = torch.as_tensor(labels, device=AB.device).reshape(n) != 0
        factors = is_set.to(AB.dtype) * 2 - 1
        diff = torch.reshape(AB - AC, [n]) * factors
        return torch.mean(torch.clamp(diff + self.margin, min=0))
        # return torch.mean(torch.clamp(diff + self.margin, min=0))


class CustomBinaryLoss(nn.Module):
    """
    Counts the samples of a batch whose signed score difference plus margin is positive;
    the sign of AB - AC is flipped for samples whose label is 0.
    """

    def __init__(self, device, margin=1):
        """
        :param device: device the per-sample sign factors are moved to
        :param margin: amount added to the signed score difference before comparing with zero
        """
        super().__init__()
        self.device = device
        self.margin = margin

    def forward(self, AB, AC, labels):
        """
        :param AB: scores of the pairs (a, b), one per sample
        :param AC: scores of the pairs (a, c), one per sample
        :param labels: one entry per sample; truthy keeps the sign of AB - AC, falsy flips it
        :return: number of samples counted
        """
        batch_len = len(labels)  # assert batch_len == len(AB) == len(AC) holds
        # +1 where the label is set, -1 where it is not
        signs = torch.Tensor([1 if flag else -1 for flag in labels]).to(self.device)
        signed_diff = torch.reshape(AB - AC, [batch_len]) * signs
        return torch.sum(signed_diff + self.margin > 0)


def validate_model2(
        model: nn.Module, losser: nn.Module, binary_losser: nn.Module, n_batches: int, val_iter: iter,
        batch_size: int, device: str
):
    """Evaluate ``model`` on ``n_batches`` batches pulled from ``val_iter``.

    :param model: network called as ``model(batch0, batch1, batch2)``; must return two outputs
    :param losser: loss called as ``losser(res0, res1, labels)``
    :param binary_losser: metric called like ``losser``; its results are summed up as a count
    :param n_batches: number of batches to pull from ``val_iter`` via ``next``
    :param val_iter: iterator yielding ``(batch0, batch1, batch2, labels)``
    :param batch_size: samples per batch, used to normalise the summed count
    :param device: device the batches are moved to
    :return: ``(summed loss / n_batches, summed count / (n_batches * batch_size))``
    """
    loss_cnt = 0
    cnt_wrong_predicted = 0
    for _ in range(n_batches):
        # Honour the ``device`` argument instead of the module-level DEVICE constant.
        batch0, batch1, batch2, labels = move_to_device(*next(val_iter), device=device)
        res0, res1 = model(batch0, batch1, batch2)
        loss_cnt += losser(res0, res1, labels)
        cnt_wrong_predicted += binary_losser(res0, res1, labels)
    n = n_batches * batch_size
    return loss_cnt / n_batches, cnt_wrong_predicted / n




# Create the neural network.
model = CustomPretrainedFeatureExtractorNetwork(device=DEVICE, pretrained_network="resnet18").to(DEVICE)

# Create the loss function used for training.
losser = CustomLoss(margin=0.01, device=DEVICE)

# Create the validation metric: a binary loss that only counts whether each
# sample is classified correctly or wrongly.
binary_losser = CustomBinaryLoss(margin=0.00001, device=DEVICE)

# Create the optimizer.
# IMPORTANT: when the pretrained network is used purely as a feature extractor, pass only the
# parameters of the trainable head here (as done below); pass model.parameters() to optimize all weights.
optimizer = optim.Adam(model.model.parameters(), lr=0.01)

# Bookkeeping for the loss plot drawn after training.
counter = []
loss_history = []
iteration_number = 0

# Sanity check on the dataset's shuffle flag.
# NOTE(review): presumably shuffling swaps the two candidates and flips the label — confirm.
assert train_triplets.shuffle == True

print("[ * ] Start training")
for epoch in range(20):
    for i, data in enumerate(train_loader):

        batch0, batch1, batch2, labels = move_to_device(*data, device=DEVICE)

        optimizer.zero_grad()
        model.train()  # back to training mode; model.eval() is set during the evaluation below
        res0, res1 = model(batch0, batch1, batch2)

        loss = losser(res0, res1, labels)
        loss.backward()
        optimizer.step()

        print("train loss: {:.3f}".format(loss.item()))

        # Evaluate on every 50th batch of an epoch, including batch 0.
        if i % 50 == 0:
            print("[ * ] eval the model")

            # Statistics. NOTE: the counter is advanced before it is recorded, so the
            # first recorded point has x = 50 although it belongs to batch 0.
            iteration_number += 50
            counter.append(iteration_number)
            loss_history.append(loss.item())

            # Evaluate on 4 batches from val_loader and 4 from tv_loader, without gradients.
            model.eval()
            with torch.no_grad():
                val_loss0, val_loss1 = validate_model2(
                    model, losser, binary_losser, 4, val_loader, BATCH_SIZE, device=DEVICE
                )
                tv_loss0, tv_loss1 = validate_model2(
                    model, losser, binary_losser, 4, tv_loader, BATCH_SIZE, device=DEVICE
                )
                print(f"val loss  : {val_loss0:.3f}")
                print(f"val       : {100 * val_loss1:.4f}% wrong classified")
                print(f"tv loss   : {tv_loss0:.3f}")
                print(f"tv        : {100 * tv_loss1:.4f}% wrong classified")
            print("[ + ] evaluation completed")
            print("[ * ] train again")

    print(f"[ + ] Finished epoch {epoch}")
print("[ + ] Finished training")

# Plot the recorded training losses against the iteration counter.
show_plot(counter, loss_history)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
train loss: 0.021
train loss: 0.034
train loss: 0.024
train loss: 0.041
train loss: 0.030
train loss: 0.022
train loss: 0.017
train loss: 0.020
train loss: 0.019
train loss: 0.016
train loss: 0.022
train loss: 0.016
train loss: 0.020
train loss: 0.016
train loss: 0.020
train loss: 0.022
train loss: 0.013
train loss: 0.018
train loss: 0.018
train loss: 0.023
train loss: 0.012
train loss: 0.013
train loss: 0.016
train loss: 0.016
train loss: 0.022
train loss: 0.020
train loss: 0.015
train loss: 0.019
[ * ] eval the model
val loss  : 0.012
val       : 45.3125% wrong classified
tv loss   : 0.013
tv        : 45.3125% wrong classified
[ + ] evaluation completed
[ * ] train again
train loss: 0.012
train loss: 0.010
train loss: 0.021
train loss: 0.011
train loss: 0.014
train loss: 0.014
train loss: 0.015
train loss: 0.013
train loss: 0.023
train loss: 0.016
train loss: 0.014
train loss: 0.015
train loss: 0.022
train loss: 0.013
t

KeyboardInterrupt: ignored

In [None]:
#### My custom model: simple model

# reaches without relu: 47/45%
# reaches with relu: 100/100%




class CustomPretrainedFeatureExtractorNetwork(PretrainedNetwork):
    """Pretrained feature extractor followed by a small head that scores image pairs.

    The extractor's final ``fc`` layer is replaced by an identity, so the
    extractor emits its penultimate features. The head is
    ``Linear -> BatchNorm1d -> Sigmoid``.
    """

    def __init__(self, device: str, pretrained_network: str):
        """
        :param device: forwarded to the ``PretrainedNetwork`` base class
        :param pretrained_network: name of the pretrained network, forwarded to the base class
        """
        super().__init__(device, pretrained_network)
        self.feature_extractor = self.get_feature_extractor()
        n_feature_extractor_out = self.feature_extractor.fc.in_features
        self.feature_extractor.fc = nn.Identity()
        n_head_out = 2048
        # BatchNorm1d must be sized to the features it receives, i.e. the
        # Linear layer's output width, not the extractor's output width.
        # NOTE(review): the head emits ``n_head_out`` values per sample, while
        # the loss classes in this notebook reshape ``AB - AC`` to
        # ``[len(labels)]`` — confirm the intended output width.
        self.model = nn.Sequential(
            nn.Linear(n_feature_extractor_out, n_head_out),
            nn.BatchNorm1d(n_head_out),
            nn.Sigmoid(),
        ).to(self.device)

    def forward(self, a, b, c):
        """Score the pairs (a, b) and (a, c).

        :param a: batch that is compared against both other batches
        :param b: first batch compared against ``a``
        :param c: second batch compared against ``a``
        :return: tuple ``(outputAB, outputAC)`` with the head output per pair
        """
        A = self.feature_extractor(a)
        B = self.feature_extractor(b)
        C = self.feature_extractor(c)

        # Pair representation: element-wise squared difference of the features.
        AB = torch.pow(A - B, 2)
        AC = torch.pow(A - C, 2)

        outputAB = self.model(AB)
        outputAC = self.model(AC)

        return outputAB, outputAC


class CustomLoss(nn.Module):
    """Margin-based hinge loss on the difference of two pair scores.

    Per sample the signed difference ``(AB - AC) * factor`` is computed, where
    ``factor`` is ``+1`` for a truthy label and ``-1`` otherwise. The loss is
    the batch mean of ``max(signed_difference + margin, 0)``.
    """

    def __init__(self, device, margin=1):
        """
        :param device: device on which the label factors are created
        :param margin: value added to the signed difference before clipping at zero
        """
        super().__init__()
        self.device = device
        self.margin = margin

    def forward(self, AB, AC, labels):
        """
        :param AB: scores of the first pair; must hold ``len(labels)`` elements
        :param AC: scores of the second pair; same number of elements as ``AB``
        :param labels: one truthy/falsy label per sample
        :return: scalar tensor holding the mean hinge loss
        """
        n = len(labels)  # AB and AC must each hold exactly n elements
        # Map the labels to +1 / -1 with tensor operations instead of a
        # per-element Python loop.
        is_positive = torch.as_tensor(labels, device=self.device).bool()
        factors = is_positive.float() * 2 - 1
        diff = torch.reshape(AB - AC, [n]) * factors
        # Clip at zero directly on ``diff``; this removes the dependency on the
        # module-level DEVICE constant that the zero tensor used to need.
        return torch.mean(torch.clamp(diff + self.margin, min=0))


class CustomBinaryLoss(nn.Module):
    """Count the samples whose signed score difference plus margin is positive."""

    def __init__(self, device, margin=1):
        """
        :param device: device the label factors are moved to
        :param margin: value added to the signed difference before the ``> 0`` test
        """
        super().__init__()
        self.margin = margin
        self.device = device

    def forward(self, AB, AC, labels):
        """
        :param AB: scores of the first pair; must hold ``len(labels)`` elements
        :param AC: scores of the second pair; same number of elements as ``AB``
        :param labels: one truthy/falsy label per sample
        :return: tensor with the number of samples where
            ``(AB - AC) * factor + margin > 0`` (factor is +1 for a truthy label, else -1)
        """
        batch_len = len(labels)
        signs = torch.Tensor([1 if flag else -1 for flag in labels]).to(self.device)
        signed_gap = (AB - AC).reshape([batch_len]) * signs
        violations = signed_gap + self.margin > 0
        return torch.sum(violations)


def validate_model2(
        model: nn.Module, losser: nn.Module, binary_losser: nn.Module, n_batches: int, val_iter: iter,
        batch_size: int, device: str
):
    """Evaluate ``model`` on ``n_batches`` batches pulled from ``val_iter``.

    :param model: network called as ``model(batch0, batch1, batch2)``; must return two outputs
    :param losser: loss called as ``losser(res0, res1, labels)``
    :param binary_losser: metric called like ``losser``; its results are summed up as a count
    :param n_batches: number of batches to pull from ``val_iter`` via ``next``
    :param val_iter: iterator yielding ``(batch0, batch1, batch2, labels)``
    :param batch_size: samples per batch, used to normalise the summed count
    :param device: device the batches are moved to
    :return: ``(summed loss / n_batches, summed count / (n_batches * batch_size))``
    """
    loss_cnt = 0
    cnt_wrong_predicted = 0
    for _ in range(n_batches):
        # Honour the ``device`` argument instead of the module-level DEVICE constant.
        batch0, batch1, batch2, labels = move_to_device(*next(val_iter), device=device)
        res0, res1 = model(batch0, batch1, batch2)
        loss_cnt += losser(res0, res1, labels)
        cnt_wrong_predicted += binary_losser(res0, res1, labels)
    n = n_batches * batch_size
    return loss_cnt / n_batches, cnt_wrong_predicted / n




# Create the neural network.
model = CustomPretrainedFeatureExtractorNetwork(device=DEVICE, pretrained_network="resnet18").to(DEVICE)

# Create the loss function used for training.
losser = CustomLoss(margin=0.001, device=DEVICE)

# Create the validation metric: a binary loss that only counts whether each
# sample is classified correctly or wrongly.
binary_losser = CustomBinaryLoss(margin=0.00001, device=DEVICE)

# Create the optimizer.
# IMPORTANT: when the pretrained network is used purely as a feature extractor, pass only the
# parameters of the trainable head here (as done below); pass model.parameters() to optimize all weights.
optimizer = optim.Adam(model.model.parameters(), lr=0.01)

# Bookkeeping for the loss plot drawn after training.
counter = []
loss_history = []
iteration_number = 0

# Sanity check on the dataset's shuffle flag.
# NOTE(review): presumably shuffling swaps the two candidates and flips the label — confirm.
assert train_triplets.shuffle == True

print("[ * ] Start training")
for epoch in range(20):
    for i, data in enumerate(train_loader):

        batch0, batch1, batch2, labels = move_to_device(*data, device=DEVICE)

        optimizer.zero_grad()
        model.train()  # back to training mode; model.eval() is set during the evaluation below
        res0, res1 = model(batch0, batch1, batch2)

        loss = losser(res0, res1, labels)
        loss.backward()
        optimizer.step()

        print("train loss: {:.6f}".format(loss.item()))

        # Evaluate on every 50th batch of an epoch, including batch 0.
        if i % 50 == 0:
            print("[ * ] eval the model")

            # Statistics. NOTE: the counter is advanced before it is recorded, so the
            # first recorded point has x = 50 although it belongs to batch 0.
            iteration_number += 50
            counter.append(iteration_number)
            loss_history.append(loss.item())

            # Evaluate on 4 batches from val_loader and 4 from tv_loader, without gradients.
            model.eval()
            with torch.no_grad():
                val_loss0, val_loss1 = validate_model2(
                    model, losser, binary_losser, 4, val_loader, BATCH_SIZE, device=DEVICE
                )
                tv_loss0, tv_loss1 = validate_model2(
                    model, losser, binary_losser, 4, tv_loader, BATCH_SIZE, device=DEVICE
                )
                print(f"val loss  : {val_loss0:.3f}")
                print(f"val       : {100 * val_loss1:.6f}% wrong classified")
                print(f"tv loss   : {tv_loss0:.3f}")
                print(f"tv        : {100 * tv_loss1:.6f}% wrong classified")
            print("[ + ] evaluation completed")
            print("[ * ] train again")

    print(f"[ + ] Finished epoch {epoch}")
print("[ + ] Finished training")

# Plot the recorded training losses against the iteration counter.
show_plot(counter, loss_history)

[ * ] Start training
train loss: 0.055055
[ * ] eval the model
val loss  : 0.043
val       : 43.750000% wrong classified
tv loss   : 0.048
tv        : 45.703125% wrong classified
[ + ] evaluation completed
[ * ] train again
train loss: 0.048739
train loss: 0.012984
train loss: 0.002430
train loss: 0.001170
train loss: 0.001044
train loss: 0.001041
train loss: 0.000987
train loss: 0.001013
train loss: 0.001037
train loss: 0.000999
train loss: 0.000984
train loss: 0.001000
train loss: 0.000999
train loss: 0.000994
train loss: 0.001000
train loss: 0.001004
train loss: 0.000999
train loss: 0.000999
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.000999
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000
train loss: 0.001000

KeyboardInterrupt: ignored

In [None]:
#### ARCHIVE: my custom model
# 47/20%
## with more complicated model: same performances
## with more complicated model and pow(A-B): 50/15



class CustomPretrainedFeatureExtractorNetwork(PretrainedNetwork):
    """Pretrained feature extractor plus a three-layer head that scores image pairs.

    The extractor's final ``fc`` layer is replaced by an identity; pairs are
    represented by the element-wise product of their features.
    """

    def __init__(self, device: str, pretrained_network: str):
        """
        :param device: forwarded to the ``PretrainedNetwork`` base class
        :param pretrained_network: name of the pretrained network, forwarded to the base class
        """
        super().__init__(device, pretrained_network)
        backbone = self.get_feature_extractor()
        self.feature_extractor = backbone
        n_backbone_out = backbone.fc.in_features
        backbone.fc = nn.Identity()

        # Head: features -> 512 -> 64 -> 1 (dropout is deliberately not used here).
        head_layers = [
            nn.Linear(n_backbone_out, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=False),
            nn.Linear(512, 64),
            nn.BatchNorm1d(64),
            nn.Sigmoid(),
            nn.Linear(64, 1),
        ]
        self.model = nn.Sequential(*head_layers).to(self.device)

    def forward(self, a, b, c):
        """Score the pairs (a, b) and (a, c).

        :param a: batch that is compared against both other batches
        :param b: first batch compared against ``a``
        :param c: second batch compared against ``a``
        :return: tuple ``(outputAB, outputAC)`` with the head output per pair
        """
        # Embed the three batches in the order a, b, c.
        feat_a, feat_b, feat_c = (self.feature_extractor(x) for x in (a, b, c))

        # Pair representation: element-wise product of the features.
        prod_ab = feat_a * feat_b
        prod_ac = feat_a * feat_c

        return self.model(prod_ab), self.model(prod_ac)


class CustomLoss(nn.Module):
    """Margin-based hinge loss on the difference of two pair scores.

    Per sample the signed difference ``(AB - AC) * factor`` is computed, where
    ``factor`` is ``+1`` for a truthy label and ``-1`` otherwise. The loss is
    the batch mean of ``max(signed_difference + margin, 0)``.
    """

    def __init__(self, device, margin=1):
        """
        :param device: device on which the label factors are created
        :param margin: value added to the signed difference before clipping at zero
        """
        super().__init__()
        self.device = device
        self.margin = margin

    def forward(self, AB, AC, labels):
        """
        :param AB: scores of the first pair; must hold ``len(labels)`` elements
        :param AC: scores of the second pair; same number of elements as ``AB``
        :param labels: one truthy/falsy label per sample
        :return: scalar tensor holding the mean hinge loss
        """
        n = len(labels)  # AB and AC must each hold exactly n elements
        # Map the labels to +1 / -1 with tensor operations instead of a
        # per-element Python loop.
        is_positive = torch.as_tensor(labels, device=self.device).bool()
        factors = is_positive.float() * 2 - 1
        diff = torch.reshape(AB - AC, [n]) * factors
        # Clip at zero directly on ``diff``; this removes the dependency on the
        # module-level DEVICE constant that the zero tensor used to need.
        return torch.mean(torch.clamp(diff + self.margin, min=0))


class CustomBinaryLoss(nn.Module):
    """Count the samples whose signed score difference plus margin is positive.

    The intermediate tensors can be printed for debugging by passing
    ``verbose=True``; by default nothing is printed.
    """

    def __init__(self, device, margin=1, verbose=False):
        """
        :param device: device the label factors are moved to
        :param margin: value added to the signed difference before the ``> 0`` test
        :param verbose: when true, print the intermediate tensors on every call
        """
        super().__init__()
        self.device = device
        self.margin = margin
        self.verbose = verbose

    def forward(self, AB, AC, labels):
        """
        :param AB: scores of the first pair; must hold ``len(labels)`` elements
        :param AC: scores of the second pair; same number of elements as ``AB``
        :param labels: one truthy/falsy label per sample
        :return: tensor with the number of samples where
            ``(AB - AC) * factor + margin > 0`` (factor is +1 for a truthy label, else -1)
        """
        n = len(labels)  # AB and AC must each hold exactly n elements
        factors = torch.Tensor([1 if label else -1 for label in labels]).to(self.device)
        raw_diff = AB - AC
        diff = torch.reshape(raw_diff, [n]) * factors
        shifted = diff + self.margin
        if self.verbose:
            # Debug output, formerly printed unconditionally on every call.
            print(raw_diff)
            print(labels)
            print(factors)
            print(diff)
            print(shifted)
            print([shifted > 0])
        return torch.sum(shifted > 0)


def validate_model2(
        model: nn.Module, losser: nn.Module, binary_losser: nn.Module, n_batches: int, val_iter: iter,
        batch_size: int, device: str
):
    """Evaluate ``model`` on ``n_batches`` batches pulled from ``val_iter``.

    :param model: network called as ``model(batch0, batch1, batch2)``; must return two outputs
    :param losser: loss called as ``losser(res0, res1, labels)``
    :param binary_losser: metric called like ``losser``; its results are summed up as a count
    :param n_batches: number of batches to pull from ``val_iter`` via ``next``
    :param val_iter: iterator yielding ``(batch0, batch1, batch2, labels)``
    :param batch_size: samples per batch, used to normalise the summed count
    :param device: device the batches are moved to
    :return: ``(summed loss / n_batches, summed count / (n_batches * batch_size))``
    """
    loss_cnt = 0
    cnt_wrong_predicted = 0
    for _ in range(n_batches):
        # Honour the ``device`` argument instead of the module-level DEVICE constant.
        batch0, batch1, batch2, labels = move_to_device(*next(val_iter), device=device)
        res0, res1 = model(batch0, batch1, batch2)
        loss_cnt += losser(res0, res1, labels)
        cnt_wrong_predicted += binary_losser(res0, res1, labels)
    n = n_batches * batch_size
    return loss_cnt / n_batches, cnt_wrong_predicted / n




# Create the neural network.
model = CustomPretrainedFeatureExtractorNetwork(device=DEVICE, pretrained_network="resnet18").to(DEVICE)

# Create the loss function used for training.
losser = CustomLoss(margin=0.01, device=DEVICE)

# Create the validation metric: a binary loss that only counts whether each
# sample is classified correctly or wrongly.
binary_losser = CustomBinaryLoss(margin=0.00001, device=DEVICE)

# Create the optimizer.
# IMPORTANT: when the pretrained network is used purely as a feature extractor, pass only the
# parameters of the trainable head here (as done below); pass model.parameters() to optimize all weights.
optimizer = optim.Adam(model.model.parameters(), lr=0.01)

# Bookkeeping for the loss plot drawn after training.
counter = []
loss_history = []
iteration_number = 0

# Sanity check on the dataset's shuffle flag.
# NOTE(review): presumably shuffling swaps the two candidates and flips the label — confirm.
assert train_triplets.shuffle == True

print("[ * ] Start training")
for epoch in range(20):
    for i, data in enumerate(train_loader):

        batch0, batch1, batch2, labels = move_to_device(*data, device=DEVICE)

        optimizer.zero_grad()
        model.train()  # back to training mode; model.eval() is set during the evaluation below
        res0, res1 = model(batch0, batch1, batch2)

        loss = losser(res0, res1, labels)
        loss.backward()
        optimizer.step()

        print("train loss: {:.3f}".format(loss.item()))

        # Evaluate on every 50th batch of an epoch, including batch 0.
        if i % 50 == 0:
            print("[ * ] eval the model")

            # Statistics. NOTE: the counter is advanced before it is recorded, so the
            # first recorded point has x = 50 although it belongs to batch 0.
            iteration_number += 50
            counter.append(iteration_number)
            loss_history.append(loss.item())

            # Evaluate on 4 batches from val_loader and 4 from tv_loader, without gradients.
            model.eval()
            with torch.no_grad():
                val_loss0, val_loss1 = validate_model2(
                    model, losser, binary_losser, 4, val_loader, BATCH_SIZE, device=DEVICE
                )
                tv_loss0, tv_loss1 = validate_model2(
                    model, losser, binary_losser, 4, tv_loader, BATCH_SIZE, device=DEVICE
                )
                print(f"val loss  : {val_loss0:.3f}")
                print(f"val       : {100 * val_loss1:.3f}% wrong classified")
                print(f"tv loss   : {tv_loss0:.3f}")
                print(f"tv        : {100 * tv_loss1:.3f}% wrong classified")
            print("[ + ] evaluation completed")
            print("[ * ] train again")

    print(f"[ + ] Finished epoch {epoch}")
print("[ + ] Finished training")

# Plot the recorded training losses against the iteration counter.
show_plot(counter, loss_history)

[ * ] Start training
train loss: 0.053
[ * ] eval the model
val loss  : 0.053
val       : 46.875% wrong classified
tv loss   : 0.054
tv        : 51.562% wrong classified
[ + ] evaluation completed
[ * ] train again
train loss: 0.115
train loss: 0.166
train loss: 0.182
train loss: 0.144
train loss: 0.124
train loss: 0.079
train loss: 0.038
train loss: 0.023
train loss: 0.038
train loss: 0.059
train loss: 0.054
train loss: 0.044
train loss: 0.057
train loss: 0.032
train loss: 0.023
train loss: 0.018
train loss: 0.015
train loss: 0.026
train loss: 0.025
train loss: 0.038
train loss: 0.033
train loss: 0.035
train loss: 0.021
train loss: 0.018
train loss: 0.011
train loss: 0.017
train loss: 0.023
train loss: 0.019
train loss: 0.017
train loss: 0.015
train loss: 0.014
train loss: 0.018
train loss: 0.013
train loss: 0.013
train loss: 0.014
train loss: 0.015
train loss: 0.014
train loss: 0.008
train loss: 0.010
train loss: 0.016
train loss: 0.011
train loss: 0.013
train loss: 0.012
train loss:

KeyboardInterrupt: ignored

In [None]:
#### Classic Triplet Loss 
# gives 47/39
#  https://www.kaggle.com/code/hirotaka0122/triplet-loss-with-pytorch/notebook


class ClassicPretrainedFeatureExtractorNetwork(PretrainedNetwork):
    """Pretrained network whose final ``fc`` layer is swapped for an embedding head.

    The head is ``Linear(in, 2048) -> ReLU -> Linear(2048, 512)``, so every
    input is mapped to a 512-dimensional embedding.
    """

    def __init__(self, device: str, pretrained_network: str):
        """
        :param device: forwarded to the ``PretrainedNetwork`` base class
        :param pretrained_network: name of the pretrained network, forwarded to the base class
        """
        super().__init__(device, pretrained_network)
        self.model = self.get_feature_extractor()
        n_backbone_features = self.model.fc.in_features
        head_layers = [
            nn.Linear(n_backbone_features, 2048),
            nn.ReLU(),
            nn.Linear(2048, 512),
        ]
        self.fc = nn.Sequential(*head_layers)
        # The same head object replaces the backbone's original classifier.
        self.model.fc = self.fc

    def forward(self, x):
        """Return the embedding of ``x`` produced by backbone plus head."""
        embedding = self.model(x)
        return embedding



class ClassicTripletLoss(nn.Module):
    """Triplet margin loss based on squared Euclidean distances."""

    def __init__(self, margin=1.0):
        """
        :param margin: value added to ``d(anchor, positive) - d(anchor, negative)``
            before the result is clipped at zero
        """
        super().__init__()
        self.margin = margin

    def calc_euclidean(self, x1, x2):
        """Return the squared differences of ``x1`` and ``x2`` summed over dim 1.

        Note that no square root is taken, i.e. this is the *squared* Euclidean distance.
        """
        delta = x1 - x2
        return torch.sum(torch.pow(delta, 2), dim=1)

    def forward(self, anchor: torch.Tensor, positive: torch.Tensor, negative: torch.Tensor) -> torch.Tensor:
        """
        :param anchor: embeddings of the anchor samples
        :param positive: embeddings that should be close to the anchors
        :param negative: embeddings that should be far from the anchors
        :return: mean over the batch of ``relu(d_pos - d_neg + margin)``
        """
        d_pos = self.calc_euclidean(anchor, positive)
        d_neg = self.calc_euclidean(anchor, negative)
        hinge = torch.relu(d_pos - d_neg + self.margin)
        return torch.mean(hinge)


class ClassicTripletBinaryLoss(nn.Module):
    """Count the triplets that violate the margin on squared Euclidean distances."""

    def __init__(self, margin=1.0):
        """
        :param margin: value added to ``d(anchor, positive) - d(anchor, negative)``
            before the ``> 0`` test
        """
        super().__init__()
        self.margin = margin

    def calc_euclidean(self, x1, x2):
        """Return the squared differences of ``x1`` and ``x2`` summed over dim 1.

        Note that no square root is taken, i.e. this is the *squared* Euclidean distance.
        """
        delta = x1 - x2
        return torch.sum(torch.pow(delta, 2), dim=1)

    def forward(self, anchor: torch.Tensor, positive: torch.Tensor, negative: torch.Tensor) -> torch.Tensor:
        """
        :param anchor: embeddings of the anchor samples
        :param positive: embeddings that should be close to the anchors
        :param negative: embeddings that should be far from the anchors
        :return: number of triplets with ``d_pos - d_neg + margin > 0``
        """
        d_pos = self.calc_euclidean(anchor, positive)
        d_neg = self.calc_euclidean(anchor, negative)
        violated = d_pos - d_neg + self.margin > 0
        return torch.sum(violated)


def validate_classic_triplet(
        model: nn.Module, losser: nn.Module, binary_losser: nn.Module, n_batches: int, val_iter: iter,
        batch_size: int, device: str
):
    """Evaluate an embedding ``model`` on ``n_batches`` triplet batches from ``val_iter``.

    :param model: network called on one batch at a time, returning embeddings
    :param losser: loss called as ``losser(res0, res1, res2)``
    :param binary_losser: metric called like ``losser``; its results are summed up as a count
    :param n_batches: number of batches to pull from ``val_iter`` via ``next``
    :param val_iter: iterator yielding ``(batch0, batch1, batch2, labels)``
    :param batch_size: samples per batch; used for the label check and to normalise the count
    :param device: device the batches are moved to
    :return: ``(summed loss / n_batches, summed count / (n_batches * batch_size))``
    """
    loss_cnt = 0
    cnt_wrong_predicted = 0
    for _ in range(n_batches):
        # Honour the ``device`` / ``batch_size`` arguments instead of the
        # module-level DEVICE / BATCH_SIZE constants.
        batch0, batch1, batch2, labels = move_to_device(*next(val_iter), device=device)
        # The labels must sum to the batch size (for 0/1 labels: every label is 1).
        assert labels.sum() == batch_size
        res0 = model(batch0)
        res1 = model(batch1)
        res2 = model(batch2)
        loss_cnt += losser(res0, res1, res2)
        cnt_wrong_predicted += binary_losser(res0, res1, res2)
    n = n_batches * batch_size
    return loss_cnt / n_batches, cnt_wrong_predicted / n




# Create the neural network.
model = ClassicPretrainedFeatureExtractorNetwork(device=DEVICE, pretrained_network="resnet18").to(DEVICE)

# Create the loss function used for training.
losser = ClassicTripletLoss(margin=0.1)

# Create the validation metric: a binary loss that only counts whether each
# triplet is classified correctly or wrongly.
binary_losser = ClassicTripletBinaryLoss(margin=0.00001)

# Create the optimizer.
# IMPORTANT: when the pretrained network is used purely as a feature extractor, pass only the
# parameters of the replaced fc head here (as done below); pass model.parameters() to optimize all weights.
optimizer = optim.Adam(model.model.fc.parameters(), lr=0.01)

# Bookkeeping for the loss plot drawn after training.
counter = []
loss_history = []
iteration_number = 0

# Sanity check on the dataset's shuffle flag: it must be off for this cell.
# NOTE(review): presumably the loss relies on the fixed (anchor, positive, negative) order — confirm.
assert train_triplets.shuffle == False

print("[ * ] Start training")
for epoch in range(20):
    for i, data in enumerate(train_loader):

        batch0, batch1, batch2, labels = move_to_device(*data, device=DEVICE)
        # The labels must sum to the batch size (for 0/1 labels: every label is 1).
        assert torch.sum(labels) == BATCH_SIZE

        optimizer.zero_grad()
        model.train()  # back to training mode; model.eval() is set during the evaluation below
        res0 = model(batch0)
        res1 = model(batch1)
        res2 = model(batch2)

        loss = losser(res0, res1, res2)
        loss.backward()
        optimizer.step()

        print("train loss: {:.3f}".format(loss.item()))

        # Evaluate on every 50th batch of an epoch, including batch 0.
        if i % 50 == 0:
            print("[ * ] eval the model")

            # Statistics. NOTE: the counter is advanced before it is recorded, so the
            # first recorded point has x = 50 although it belongs to batch 0.
            iteration_number += 50
            counter.append(iteration_number)
            loss_history.append(loss.item())

            # Evaluate on 4 batches from val_loader and 4 from tv_loader, without gradients.
            model.eval()
            with torch.no_grad():
                val_loss0, val_loss1 = validate_classic_triplet(
                    model, losser, binary_losser, 4, val_loader, BATCH_SIZE, device=DEVICE
                )
                tv_loss0, tv_loss1 = validate_classic_triplet(
                    model, losser, binary_losser, 4, tv_loader, BATCH_SIZE, device=DEVICE
                )
                print(f"val loss  : {val_loss0:.3f}")
                print(f"val       : {100 * val_loss1:.3f}% wrong classified")
                print(f"tv loss   : {tv_loss0:.3f}")
                print(f"tv        : {100 * tv_loss1:.3f}% wrong classified")
            print("[ + ] evaluation completed")
            print("[ * ] train again")

    print(f"[ + ] Finished epoch {epoch}")
print("[ + ] Finished training")

# Plot the recorded training losses against the iteration counter.
show_plot(counter, loss_history)

[ * ] Start training
train loss: 1.972
[ * ] eval the model
val loss  : 36.731
val       : 41.016% wrong classified
tv loss   : 35.131
tv        : 51.172% wrong classified
[ + ] evaluation completed
[ * ] train again
train loss: 44.589
train loss: 749.869
train loss: 44.377
train loss: 67.595
train loss: 99.923
train loss: 87.616
train loss: 86.867
train loss: 52.830
train loss: 76.749
train loss: 53.076
train loss: 27.598
train loss: 29.259
train loss: 51.540
train loss: 46.312
train loss: 51.085
train loss: 32.042
train loss: 34.784
train loss: 36.954
train loss: 26.605
train loss: 39.670
train loss: 33.641
train loss: 9.898
train loss: 11.744
train loss: 21.235
train loss: 24.855
train loss: 32.807
train loss: 20.776
train loss: 17.742
train loss: 16.288
train loss: 12.792
train loss: 8.874
train loss: 12.006
train loss: 13.880
train loss: 11.147
train loss: 11.098
train loss: 6.999
train loss: 8.334
train loss: 5.105
train loss: 10.580
train loss: 6.717
train loss: 6.942
train loss

KeyboardInterrupt: ignored

In [None]:
#### ARCHIVE: classic triplet loss

# Gives approx 45/43%

#  https://www.kaggle.com/code/hirotaka0122/triplet-loss-with-pytorch/notebook


class ClassicPretrainedFeatureExtractorNetwork(PretrainedNetwork):
    """Pretrained network whose final ``fc`` layer is swapped for a regularised embedding head.

    The head maps the backbone features to 512, then 256, then 128 values,
    using dropout and batch normalisation, and ends in a sigmoid.
    """

    def __init__(self, device: str, pretrained_network: str):
        """
        :param device: forwarded to the ``PretrainedNetwork`` base class
        :param pretrained_network: name of the pretrained network, forwarded to the base class
        """
        super().__init__(device, pretrained_network)
        self.model = self.get_feature_extractor()
        n_backbone_features = self.model.fc.in_features
        head_layers = [
            # features -> 512
            nn.Dropout(p=0.5),
            nn.Linear(n_backbone_features, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=False),
            # 512 -> 256
            nn.Dropout(p=0.5),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.Sigmoid(),
            nn.Dropout(p=0.5),
            # 256 -> 128
            nn.Linear(256, 128),
            nn.Sigmoid(),
        ]
        self.fc = nn.Sequential(*head_layers)
        # The same head object replaces the backbone's original classifier.
        self.model.fc = self.fc

    def forward(self, x):
        """Return the embedding of ``x`` produced by backbone plus head."""
        embedding = self.model(x)
        return embedding



class ClassicTripletLoss(nn.Module):
    """Triplet margin loss based on squared Euclidean distances."""

    def __init__(self, margin=1.0):
        """
        :param margin: value added to ``d(anchor, positive) - d(anchor, negative)``
            before the result is clipped at zero
        """
        super().__init__()
        self.margin = margin

    def calc_euclidean(self, x1, x2):
        """Return the squared differences of ``x1`` and ``x2`` summed over dim 1.

        Note that no square root is taken, i.e. this is the *squared* Euclidean distance.
        """
        delta = x1 - x2
        return torch.sum(torch.pow(delta, 2), dim=1)

    def forward(self, anchor: torch.Tensor, positive: torch.Tensor, negative: torch.Tensor) -> torch.Tensor:
        """
        :param anchor: embeddings of the anchor samples
        :param positive: embeddings that should be close to the anchors
        :param negative: embeddings that should be far from the anchors
        :return: mean over the batch of ``relu(d_pos - d_neg + margin)``
        """
        d_pos = self.calc_euclidean(anchor, positive)
        d_neg = self.calc_euclidean(anchor, negative)
        hinge = torch.relu(d_pos - d_neg + self.margin)
        return torch.mean(hinge)


class ClassicTripletBinaryLoss(nn.Module):
    """Count the triplets that violate the margin on squared Euclidean distances."""

    def __init__(self, margin=1.0):
        """
        :param margin: value added to ``d(anchor, positive) - d(anchor, negative)``
            before the ``> 0`` test
        """
        super().__init__()
        self.margin = margin

    def calc_euclidean(self, x1, x2):
        """Return the squared differences of ``x1`` and ``x2`` summed over dim 1.

        Note that no square root is taken, i.e. this is the *squared* Euclidean distance.
        """
        delta = x1 - x2
        return torch.sum(torch.pow(delta, 2), dim=1)

    def forward(self, anchor: torch.Tensor, positive: torch.Tensor, negative: torch.Tensor) -> torch.Tensor:
        """
        :param anchor: embeddings of the anchor samples
        :param positive: embeddings that should be close to the anchors
        :param negative: embeddings that should be far from the anchors
        :return: number of triplets with ``d_pos - d_neg + margin > 0``
        """
        d_pos = self.calc_euclidean(anchor, positive)
        d_neg = self.calc_euclidean(anchor, negative)
        violated = d_pos - d_neg + self.margin > 0
        return torch.sum(violated)


def validate_classic_triplet(
        model: nn.Module, losser: nn.Module, binary_losser: nn.Module, n_batches: int, val_iter: iter,
        batch_size: int, device: str
):
    """Evaluate an embedding ``model`` on ``n_batches`` triplet batches from ``val_iter``.

    :param model: network called on one batch at a time, returning embeddings
    :param losser: loss called as ``losser(res0, res1, res2)``
    :param binary_losser: metric called like ``losser``; its results are summed up as a count
    :param n_batches: number of batches to pull from ``val_iter`` via ``next``
    :param val_iter: iterator yielding ``(batch0, batch1, batch2, labels)``
    :param batch_size: samples per batch; used for the label check and to normalise the count
    :param device: device the batches are moved to
    :return: ``(summed loss / n_batches, summed count / (n_batches * batch_size))``
    """
    loss_cnt = 0
    cnt_wrong_predicted = 0
    for _ in range(n_batches):
        # Honour the ``device`` / ``batch_size`` arguments instead of the
        # module-level DEVICE / BATCH_SIZE constants.
        batch0, batch1, batch2, labels = move_to_device(*next(val_iter), device=device)
        # The labels must sum to the batch size (for 0/1 labels: every label is 1).
        assert labels.sum() == batch_size
        res0 = model(batch0)
        res1 = model(batch1)
        res2 = model(batch2)
        loss_cnt += losser(res0, res1, res2)
        cnt_wrong_predicted += binary_losser(res0, res1, res2)
    n = n_batches * batch_size
    return loss_cnt / n_batches, cnt_wrong_predicted / n




# Create the neural network.
model = ClassicPretrainedFeatureExtractorNetwork(device=DEVICE, pretrained_network="resnet18").to(DEVICE)

# Create the loss function used for training.
losser = ClassicTripletLoss(margin=0.1)

# Create the validation metric: a binary loss that only counts whether each
# triplet is classified correctly or wrongly.
binary_losser = ClassicTripletBinaryLoss(margin=0.00001)

# Create the optimizer.
# IMPORTANT: when the pretrained network is used purely as a feature extractor, pass only the
# parameters of the replaced fc head here (as done below); pass model.parameters() to optimize all weights.
optimizer = optim.Adam(model.model.fc.parameters(), lr=0.01)

# Bookkeeping for the loss plot drawn after training.
counter = []
loss_history = []
iteration_number = 0

# Sanity check on the dataset's shuffle flag: it must be off for this cell.
# NOTE(review): presumably the loss relies on the fixed (anchor, positive, negative) order — confirm.
assert train_triplets.shuffle == False

print("[ * ] Start training")
for epoch in range(20):
    for i, data in enumerate(train_loader):

        batch0, batch1, batch2, labels = move_to_device(*data, device=DEVICE)
        # The labels must sum to the batch size (for 0/1 labels: every label is 1).
        assert torch.sum(labels) == BATCH_SIZE

        optimizer.zero_grad()
        model.train()  # back to training mode; model.eval() is set during the evaluation below
        res0 = model(batch0)
        res1 = model(batch1)
        res2 = model(batch2)

        loss = losser(res0, res1, res2)
        loss.backward()
        optimizer.step()

        print("train loss: {:.3f}".format(loss.item()))

        # Evaluate on every 50th batch of an epoch, including batch 0.
        if i % 50 == 0:
            print("[ * ] eval the model")

            # Statistics. NOTE: the counter is advanced before it is recorded, so the
            # first recorded point has x = 50 although it belongs to batch 0.
            iteration_number += 50
            counter.append(iteration_number)
            loss_history.append(loss.item())

            # Evaluate on 4 batches from val_loader and 4 from tv_loader, without gradients.
            model.eval()
            with torch.no_grad():
                val_loss0, val_loss1 = validate_classic_triplet(
                    model, losser, binary_losser, 4, val_loader, BATCH_SIZE, device=DEVICE
                )
                tv_loss0, tv_loss1 = validate_classic_triplet(
                    model, losser, binary_losser, 4, tv_loader, BATCH_SIZE, device=DEVICE
                )
                print(f"val loss  : {val_loss0:.3f}")
                print(f"val       : {100 * val_loss1:.3f}% wrong classified")
                print(f"tv loss   : {tv_loss0:.3f}")
                print(f"tv        : {100 * tv_loss1:.3f}% wrong classified")
            print("[ + ] evaluation completed")
            print("[ * ] train again")

    print(f"[ + ] Finished epoch {epoch}")
print("[ + ] Finished training")

# Plot the recorded training losses against the iteration counter.
show_plot(counter, loss_history)

[ * ] Start training
train loss: 0.180
[ * ] eval the model
val loss  : 0.099
val       : 43.750% wrong classified
tv loss   : 0.099
tv        : 42.969% wrong classified
[ + ] evaluation completed
[ * ] train again
train loss: 0.176
train loss: 0.202
train loss: 0.255
train loss: 0.211
train loss: 0.292
train loss: 0.254
train loss: 0.276
train loss: 0.251
train loss: 0.297
train loss: 0.211
train loss: 0.237
train loss: 0.362
train loss: 0.371
train loss: 0.246
train loss: 0.314
train loss: 0.292
train loss: 0.277
train loss: 0.307
train loss: 0.283
train loss: 0.318
train loss: 0.307
train loss: 0.253
train loss: 0.185
train loss: 0.278
train loss: 0.204
train loss: 0.261
train loss: 0.204
train loss: 0.195
train loss: 0.177
train loss: 0.224
train loss: 0.162
train loss: 0.246
train loss: 0.236
train loss: 0.216
train loss: 0.189
train loss: 0.126
train loss: 0.135
train loss: 0.203
train loss: 0.132
train loss: 0.157
train loss: 0.136
train loss: 0.116
train loss: 0.143
train loss:

KeyboardInterrupt: ignored

In [None]:
# Experiment configuration consumed by the cells below (network, losses, optimizer, training loop).
config = Config(
    epochs=20,
    batch_size=64,
    lr=0.001,  # learning rate handed to the optimizer
    train_margin=0.01,
    val_margin=0.0001,
    image_folder="/content/drive/MyDrive/food_270_224_in",
    pretrained_network="resnet18",
    n_val_batches=5,  # number of batches evaluated per validation run
    # Builds the head placed on top of the feature extractor:
    # out_features -> 512 -> 64 -> 1, with dropout and batch normalisation.
    architecture_fn=lambda out_features: nn.Sequential(
        nn.Dropout(p=0.5),
        nn.Linear(out_features, 512),
        nn.BatchNorm1d(512),
        nn.ReLU(inplace=False),

        nn.Dropout(p=0.5),
        nn.Linear(512, 64),
        nn.BatchNorm1d(64),
        nn.Sigmoid(),
        nn.Dropout(p=0.5),

        nn.Linear(64, 1),
        # nn.Sigmoid(),
    ),
    device=torch.device("cuda"),
    # Classes (not instances) for the network and the two losses.
    network=PretrainedFeatureExtractorNetwork2,
    losser=TripletLoss2,
    binary_losser=TripletBinaryLoss2,
    # Selects the parameters given to the optimizer: only those of `model.model`.
    get_parameter_fn=lambda model: model.model.parameters()
)

In [None]:
# Create the neural network.
model = get_network(config).to(config.device)

# Create the loss function used for training.
losser = get_losser(config).to(config.device)

# Create the validation metric: a binary loss that only counts whether each
# sample is classified correctly or wrongly.
binary_losser = get_binary_losser(config).to(config.device)

# Create the optimizer.
# IMPORTANT: config.get_parameter_fn decides which weights are optimized: return only the
# head's parameters to train just the head, or model.parameters() to optimize all weights.
optimizer = optim.Adam(config.get_parameter_fn(model), lr=config.lr)

In [None]:


# Bookkeeping for the loss plot drawn after training.
counter = []
loss_history = []
iteration_number = 0

print("[ * ] Start training")
for epoch in range(config.epochs):
    for i, data in enumerate(train_loader):

        batch0, batch1, batch2, labels = move_to_device(*data, device=config.device)

        optimizer.zero_grad()
        model.train()  # back to training mode; model.eval() is set during the evaluation below
        res0, res1 = model(batch0, batch1, batch2)

        loss = losser(res0, res1, labels)
        loss.backward()
        optimizer.step()

        print("train loss: {:.3f}".format(loss.item()))

        # Evaluate on every 20th batch of an epoch, including batch 0.
        if i % 20 == 0:
            print("[ * ] eval the model")

            # Statistics. NOTE: the counter is advanced before it is recorded, so the
            # first recorded point has x = 20 although it belongs to batch 0.
            iteration_number += 20
            counter.append(iteration_number)
            loss_history.append(loss.item())

            # Evaluate on config.n_val_batches batches from val_loader and tv_loader, without gradients.
            model.eval()
            with torch.no_grad():
                val_loss0, val_loss1 = validate2(
                    model, losser, binary_losser, config.n_val_batches,
                    val_loader, config.batch_size, device=config.device
                )
                tv_loss0, tv_loss1 = validate2(
                    model, losser, binary_losser, config.n_val_batches,
                    tv_loader, config.batch_size, device=config.device
                )
                # NOTE: unlike the train loss above, the losses here are printed multiplied by 100.
                print(f"val loss  : {100 * val_loss0:.3f}")
                print(f"val       : {100 * val_loss1:.3f}% wrong classified")
                print(f"tv loss   : {100 * tv_loss0:.3f}")
                print(f"tv        : {100 * tv_loss1:.3f}% wrong classified")
            print("[ + ] evaluation completed")
            print("[ * ] train again")

    print(f"[ + ] Finished epoch {epoch}")
print("[ + ] Finished training")

# Plot the recorded training losses against the iteration counter.
show_plot(counter, loss_history)