In [1]:
from torch import nn


class SiameseNetwork(nn.Module):
    """Two-input network that embeds both inputs with the same weights.

    Each input passes through two Conv -> BatchNorm -> ReLU -> MaxPool stages
    (1 -> 8 -> 16 channels) and a single linear layer producing a
    10-dimensional embedding.  The linear layer expects 16 * 7 * 7 features,
    which is what a 28x28 input yields after the two stride-2 poolings
    (28 -> 14 -> 7).
    """

    def __init__(self):
        super().__init__()

        # Build the two identical-shaped convolutional stages in order so the
        # resulting nn.Sequential keeps the layer indices 0..7.
        feature_layers = []
        for in_channels, out_channels in ((1, 8), (8, 16)):
            feature_layers += [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=2, stride=2),
            ]
        self.cnn1 = nn.Sequential(*feature_layers)

        self.fc1 = nn.Sequential(nn.Linear(16 * 7 * 7, 10))

    def forward_once(self, x):
        """Embed a single batch: convolutional features, flatten, linear layer."""
        features = self.cnn1(x)
        # Keep the batch dimension and collapse everything else.
        flattened = features.view(features.size(0), -1)
        return self.fc1(flattened)

    def forward(self, input1, input2):
        """Return the embeddings of ``input1`` and ``input2`` as a tuple."""
        return self.forward_once(input1), self.forward_once(input2)

In [2]:
import torch
from torch import nn
import torch.nn.functional as F


class ContrastiveLoss(nn.Module):
    """Contrastive loss over a pair of embeddings.

    With ``d`` the pairwise Euclidean distance, each pair contributes
    ``(1 - label) * d**2 + label * clamp(margin - d, min=0)**2`` and the
    result is averaged.  A label of 0 therefore pulls the pair together and
    a label of 1 pushes it apart until the distance reaches ``margin``.
    """

    def __init__(self, margin=2.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        """Return the mean contrastive loss for the given embeddings and labels."""
        distance = F.pairwise_distance(output1, output2)

        # Term for label == 0: squared distance.
        pull_term = (1 - label) * torch.pow(distance, 2)
        # Term for label == 1: squared shortfall from the margin.
        margin_gap = torch.clamp(self.margin - distance, min=0.0)
        push_term = label * torch.pow(margin_gap, 2)

        return torch.mean(pull_term + push_term)

In [3]:
import torch
import torch.nn.functional as F
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader


def test_pipeline(test_dataset, computing_device, model_path="fold0epoch29.pt", max_pairs=10):
    """Show a few test pairs together with the embedding distance the model assigns.

    Args:
        test_dataset: dataset yielding ``(img1, img2, label)`` items, where a
            label of 0 is reported as "Same numbers" and anything else as
            "Different numbers".
        computing_device: device the TorchScript model and its inputs run on.
        model_path: path of the TorchScript file to load.  The default is the
            name that was previously hard-coded here.
        max_pairs: number of pairs to display before stopping.
    """
    test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=True)
    model = torch.jit.load(model_path).to(computing_device)

    for count, (img1, img2, label) in enumerate(test_dataloader, start=1):
        if count > max_pairs:
            break

        # The inputs must live on the same device as the model; gradients are
        # not needed for inspection.
        with torch.no_grad():
            output1, output2 = model(img1.to(computing_device), img2.to(computing_device))

        figure, axes = plt.subplots(1, 2, figsize=(8, 8))
        figure.suptitle(f'Image no.{count}', fontsize=16)
        # Plot from the original (CPU) batch tensors, not the device copies.
        for ax, title, image in zip(axes, ("Img1", "Img2"), (img1, img2)):
            ax.set_title(title)
            ax.axis("off")
            ax.imshow(image.squeeze(), cmap="gray")

        plt.show()
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(figure)

        print(f"Image no.{count}")
        # batch_size is 1, so the label tensor holds exactly one value.
        if label.item() == 0:
            label_text = "Same numbers"
        else:
            label_text = "Different numbers"

        print(f"Correct label: '{label_text}'")
        print(F.pairwise_distance(output1, output2).item())
        print()

In [4]:
import time

import numpy as np
import torch
from torch.utils.data import DataLoader

from contrastive_loss import ContrastiveLoss
from model import SiameseNetwork

# Module-level compute device read by train() and validate().
# train_pipeline() overwrites this placeholder with its computing_device argument.
device = ""


def train(model, optimizer, criterion, dataloader):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: network called as ``model(img1, img2)`` returning two outputs.
        optimizer: optimizer stepped once per batch.
        criterion: callable ``criterion(output1, output2, label)`` returning a
            scalar loss tensor.
        dataloader: iterable of ``(img1, img2, label)`` batches.

    Returns:
        The average of the per-batch loss values, or NaN when the dataloader
        yields no batches.
    """
    model.train()

    batch_losses = []

    for img1, img2, label in dataloader:
        optimizer.zero_grad()

        # The criterion combines the label with the model outputs, so the
        # label has to be on the same device as the images.
        img1, img2, label = img1.to(device), img2.to(device), label.to(device)
        output1, output2 = model(img1, img2)

        loss_contrastive = criterion(output1, output2, label)
        loss_contrastive.backward()
        optimizer.step()
        batch_losses.append(loss_contrastive.item())

    if not batch_losses:
        return float("nan")
    # np.mean already averages over the batches; dividing by the number of
    # batches again would shrink the reported loss.
    return np.mean(batch_losses)


def save_model(model, name):
    """Trace ``model`` on the CPU and write the TorchScript module to ``name``.

    The model is switched to eval mode and moved to the CPU (``model.cpu()``),
    then traced with two random inputs of shape (1, 1, 28, 28).
    """
    model.eval()
    # Example inputs used only to record the traced graph.
    example_inputs = (torch.randn(1, 1, 28, 28), torch.randn(1, 1, 28, 28))
    cpu_model = model.cpu()
    traced = torch.jit.trace(cpu_model, example_inputs)
    torch.jit.save(traced, name)


def validate(model, criterion, dataloader):
    """Evaluate the model without gradient tracking and return the mean per-batch loss.

    Args:
        model: network called as ``model(img1, img2)`` returning two outputs.
        criterion: callable ``criterion(output1, output2, label)`` returning a
            scalar loss tensor.
        dataloader: iterable of ``(img1, img2, label)`` batches.

    Returns:
        The average of the per-batch loss values, or NaN when the dataloader
        yields no batches.
    """
    model.eval()
    batch_losses = []

    with torch.no_grad():
        for img1, img2, label in dataloader:
            # The criterion combines the label with the model outputs, so the
            # label has to be on the same device as the images.
            img1, img2, label = img1.to(device), img2.to(device), label.to(device)
            output1, output2 = model(img1, img2)

            loss_contrastive = criterion(output1, output2, label)
            batch_losses.append(loss_contrastive.item())

    if not batch_losses:
        return float("nan")
    # np.mean already averages over the batches; dividing by the number of
    # batches again would shrink the reported loss.
    return np.mean(batch_losses)


def train_pipeline(epochs, k_fold, batch_size, train_dataset, lr, computing_device):
    """Train one fresh SiameseNetwork per cross-validation fold and save it.

    For every ``(train_idx, val_idx)`` split produced by ``k_fold.split`` a new
    network is trained with Adam and the contrastive loss.  Training of a fold
    stops after more than three consecutive epochs without a new best
    validation loss, or after the last epoch.  The weights that achieved the
    best validation loss are restored before the model is saved.

    Args:
        epochs: maximum number of epochs per fold.
        k_fold: object whose ``split(train_dataset)`` yields
            ``(train_idx, val_idx)`` index collections.
        batch_size: batch size of both the training and validation loaders.
        train_dataset: dataset to sample from; its ``transforms`` attribute is
            used to tag the saved file name.
        lr: learning rate passed to Adam.
        computing_device: device for the model and the batches; it is also
            stored in the module-level ``device`` read by train()/validate().
    """
    import copy  # local import: only needed for snapshotting the best weights

    global device
    device = computing_device

    # Kept as a module-level name, exactly as before this function was revised.
    global contrastive_loss

    # Tag the file with the transform class names.  Formatting the list itself
    # would embed object reprs ("<... object at 0x...>"), which differ between
    # runs and contain characters that are invalid in some file systems.
    transform_tag = "-".join(type(transform).__name__ for transform in train_dataset.transforms)

    for fold, (train_idx, val_idx) in enumerate(k_fold.split(train_dataset)):

        train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
        val_subsampler = torch.utils.data.SubsetRandomSampler(val_idx)

        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_subsampler)
        val_dataloader = DataLoader(train_dataset, batch_size=batch_size, sampler=val_subsampler)

        net = SiameseNetwork().to(computing_device)
        contrastive_loss = ContrastiveLoss()
        adam = torch.optim.Adam(net.parameters(), lr=lr)

        rounds_without_improvement = 0
        best_loss = float('inf')
        best_state = None

        print(f"--FOLD {fold + 1}--\n")
        for epoch in range(epochs):
            print(f"--EPOCH {epoch + 1}--")

            train_loss = train(model=net, optimizer=adam, criterion=contrastive_loss, dataloader=train_dataloader)
            print(f"Train loss {train_loss}")

            val_loss = validate(model=net, criterion=contrastive_loss, dataloader=val_dataloader)
            print(f"Val loss {val_loss}")

            if val_loss < best_loss:
                best_loss = val_loss
                # Snapshot the weights: keeping a reference to ``net`` would
                # just alias the model that continues to be trained.
                best_state = copy.deepcopy(net.state_dict())
                rounds_without_improvement = 0
            else:
                rounds_without_improvement += 1

            if rounds_without_improvement > 3 or epoch == epochs - 1:
                if best_state is not None:
                    net.load_state_dict(best_state)
                # ``epoch`` in the name is the epoch at which training stopped.
                save_model(model=net, name=f"fold{fold}-epoch{epoch}-transforms{transform_tag}.pt")
                break

In [5]:
import cv2 as cv
import numpy as np
import torch
import torchvision.transforms as T



class ResizeGrayscale:
    """Resize a sample to 28x28, convert it to grayscale and return ``1 - result``."""

    def __call__(self, sample):
        resized = T.Resize((28, 28))(sample)
        # Subtracting from 1 flips the intensities of the grayscale image.
        return 1 - T.Grayscale()(resized)


class EqualizeHist:
    """Histogram-equalize a sample and return it as a float tensor scaled by 1/255.

    The sample is min-max normalized to 0..255 as uint8, its first slice
    (``sample[0]``) is equalized with OpenCV, and the image is inverted when
    pixels above 200 outnumber pixels below 200.
    """

    def __call__(self, sample):
        # assumes sample is a (1, H, W) tensor — TODO confirm against the dataset
        sample = sample.numpy()
        sample = cv.normalize(sample, None, alpha=0, beta=255, norm_type=cv.NORM_MINMAX, dtype=cv.CV_8UC1)
        sample = sample.astype(np.uint8)
        sample = cv.equalizeHist(sample[0])
        if np.count_nonzero(sample > 200) > np.count_nonzero(sample < 200):
            sample = 255 - sample
        # Divide after the conversion to a tensor, as AdaptiveThreshold does:
        # dividing the uint8 NumPy array first would produce float64 and thus
        # a double tensor that does not match float32 model weights.
        return torch.from_numpy(sample).unsqueeze(0) / 255


class CLAHE:
    """Apply OpenCV CLAHE (clip limit 3.0) to a sample and return a float tensor scaled by 1/255.

    The sample is min-max normalized to 0..255 as uint8, CLAHE is applied to
    its first slice (``sample[0]``), and the image is inverted when pixels
    above 200 outnumber pixels below 200.
    """

    def __call__(self, sample):
        # assumes sample is a (1, H, W) tensor — TODO confirm against the dataset
        sample = sample.numpy()
        sample = cv.normalize(sample, None, alpha=0, beta=255, norm_type=cv.NORM_MINMAX, dtype=cv.CV_8UC1)
        sample = sample.astype(np.uint8)
        clahe = cv.createCLAHE(clipLimit=3., )
        # CLAHE works on a single-channel 2-D image, so pass the first slice
        # like EqualizeHist and AdaptiveThreshold do; the leading axis is
        # restored by unsqueeze(0) below.
        sample = clahe.apply(sample[0])
        if np.count_nonzero(sample > 200) > np.count_nonzero(sample < 200):
            sample = 255 - sample
        # Divide after the conversion to a tensor, as AdaptiveThreshold does:
        # dividing the uint8 NumPy array first would produce float64 and thus
        # a double tensor that does not match float32 model weights.
        return torch.from_numpy(sample).unsqueeze(0) / 255


class AdaptiveThreshold:
    """Binarize a sample with OpenCV mean adaptive thresholding and scale it by 1/255.

    The sample is min-max normalized to 0..255 as uint8, its first slice is
    thresholded (block size 11, constant 0), and the image is inverted when
    pixels above 200 outnumber pixels below 200.
    """

    def __call__(self, sample):
        as_uint8 = cv.normalize(
            sample.numpy(), None, alpha=0, beta=255, norm_type=cv.NORM_MINMAX, dtype=cv.CV_8UC1
        ).astype(np.uint8)
        binary = cv.adaptiveThreshold(as_uint8[0], 255, cv.ADAPTIVE_THRESH_MEAN_C, cv.THRESH_BINARY, 11, 0)

        flat = binary.flatten()
        bright_count = len(np.where(flat > 200)[0])
        dark_count = len(np.where(flat < 200)[0])
        if bright_count > dark_count:
            binary = 255 - binary

        # Convert first, then divide on the tensor.
        as_tensor = torch.from_numpy(binary).unsqueeze(0)
        return as_tensor / 255

In [6]:
import numpy as np
import torchvision
from torch.utils.data import Dataset

from transforms import ResizeGrayscale


class SiameseDataset(Dataset):
    """Dataset of image pairs drawn from MNIST and/or SVHN.

    Each item is ``(img1, img2, matching)`` where the two images are looked up
    through the pair list built by ``make_pairs`` and ``matching`` is the flag
    stored with that pair.
    """

    def __init__(self, train: bool, transforms, mnist=False, svhn=False, mix=False, ):
        """Load the requested datasets and build the pair list.

        ``transforms`` is a list appended after ``ToTensor`` (and, for SVHN,
        after ``ResizeGrayscale``) in each dataset's transform chain.
        """
        self.mnist_dataset = None
        self.svhn_dataset = None
        self.transforms = transforms

        if mnist:
            mnist_steps = [torchvision.transforms.ToTensor()] + self.transforms
            self.mnist_dataset = torchvision.datasets.MNIST(
                "files", train=train, download=True,
                transform=torchvision.transforms.Compose(mnist_steps))

        if svhn:
            split = "train" if train else "test"
            svhn_steps = [torchvision.transforms.ToTensor(), ResizeGrayscale()] + self.transforms
            self.svhn_dataset = torchvision.datasets.SVHN(
                root="data", split=split, download=True,
                transform=torchvision.transforms.Compose(svhn_steps))

        # Each pair is [(dataset_pos, index), (dataset_pos, index), matching].
        self.pairs = make_pairs(mix, self.mnist_dataset, self.svhn_dataset)

        # Position 0 is MNIST and position 1 is SVHN; either entry may be None.
        self.dataset = [self.mnist_dataset, self.svhn_dataset]

    def __getitem__(self, index):
        first, second, matching = self.pairs[index]
        first_dataset, first_index = first
        second_dataset, second_index = second

        # Only element 0 of each underlying item is returned.
        first_image = self.dataset[first_dataset][first_index][0]
        second_image = self.dataset[second_dataset][second_index][0]
        return first_image, second_image, matching

    def __len__(self):
        return len(self.pairs)


def make_pairs(mix, mnist=None, svhn=None):
    """Build the list of ``[(dataset_pos, idx), (dataset_pos, idx), matching]`` pairs.

    When ``mix`` is set and both datasets are given, the work is delegated to
    ``mix_pairs``.  Otherwise every sample of each given dataset becomes the
    anchor of two pairs within its own dataset: one with a random sample of
    the same class (matching 0) and one with a random sample of a different
    class (matching 1).  SVHN pairs (dataset position 1) come before MNIST
    pairs (dataset position 0).
    """
    num_classes = 10

    def pairs_within(labels, dataset_pos):
        # Sample indices grouped by class, used to draw the pair partners.
        indices_by_class = [np.where(labels == cls)[0] for cls in range(0, num_classes)]
        collected = []
        for anchor_idx in range(len(labels)):
            label = labels[anchor_idx]
            anchor = (dataset_pos, anchor_idx)

            pos_idx = np.random.choice(indices_by_class[label])
            collected.append([anchor, (dataset_pos, pos_idx), 0])

            # Redraw until the class differs from the anchor's class.
            negative_label = np.random.randint(0, num_classes)
            while negative_label == label:
                negative_label = np.random.randint(0, num_classes)

            neg_idx = np.random.choice(indices_by_class[negative_label])
            collected.append([anchor, (dataset_pos, neg_idx), 1])
        return collected

    if mix and mnist and svhn:
        return mix_pairs(mnist, num_classes, svhn)

    pairs = []
    if svhn:
        pairs.extend(pairs_within(svhn.labels, 1))
    if mnist:
        pairs.extend(pairs_within(mnist.targets, 0))
    return pairs


def add_pairs_mix(dataset_labels, dataset_pos, mnist_idx, svhn_idx, num_classes):
    """Build a matching and a non-matching pair for every anchor, drawing partners from either dataset.

    Args:
        dataset_labels: labels of the anchor dataset, indexable by position.
        dataset_pos: dataset position stored with each anchor.
        mnist_idx: per-class sample indices for dataset position 0.
        svhn_idx: per-class sample indices for dataset position 1.
        num_classes: number of classes to draw the differing label from.

    Returns:
        A list with two entries per anchor, in anchor order:
        ``[(dataset_pos, anchor_idx), (partner_dataset, partner_idx), 0]``
        followed by the same layout with flag 1.
    """
    # Indexed by the randomly drawn dataset choice: 0 = MNIST, 1 = SVHN.
    indices_by_dataset = (mnist_idx, svhn_idx)

    result = []
    for anchor_idx, anchor_label in enumerate(dataset_labels):
        anchor = (dataset_pos, anchor_idx)

        # Same-class partner from a randomly chosen dataset.
        partner_dataset = np.random.randint(0, 2)
        pos_idx = np.random.choice(indices_by_dataset[partner_dataset][anchor_label])
        result.append([anchor, (partner_dataset, pos_idx), 0])

        # Redraw until the class differs from the anchor's class.
        negative_label = np.random.randint(0, num_classes)
        while negative_label == anchor_label:
            negative_label = np.random.randint(0, num_classes)

        # Different-class partner, again from a randomly chosen dataset.
        partner_dataset = np.random.randint(0, 2)
        neg_idx = np.random.choice(indices_by_dataset[partner_dataset][negative_label])
        result.append([anchor, (partner_dataset, neg_idx), 1])
    return result


def mix_pairs(mnist, num_classes, svhn):
    """Build cross-dataset pairs with anchors from MNIST first, then from SVHN.

    Sample indices are grouped per class for both datasets (from
    ``mnist.targets`` and ``svhn.labels``) and handed to ``add_pairs_mix``
    once per anchor dataset; the two resulting lists are concatenated.
    """
    mnist_labels, svhn_labels = mnist.targets, svhn.labels

    classes = range(0, num_classes)
    mnist_idx = [np.where(mnist_labels == cls)[0] for cls in classes]
    svhn_idx = [np.where(svhn_labels == cls)[0] for cls in classes]

    # Dataset positions: 0 = MNIST, 1 = SVHN.
    mnist_anchored = add_pairs_mix(mnist_labels, 0, mnist_idx, svhn_idx, num_classes)
    svhn_anchored = add_pairs_mix(svhn_labels, 1, mnist_idx, svhn_idx, num_classes)

    return mnist_anchored + svhn_anchored


In [7]:
import torch
from sklearn.model_selection import KFold

import test
import train
from dataset import SiameseDataset
from transforms import AdaptiveThreshold, EqualizeHist


class Pipelines:
    """Runs the train-then-test experiment for several dataset combinations.

    Every pipeline builds a training SiameseDataset, runs
    ``train.train_pipeline`` with K-fold cross-validation, then builds the
    matching test dataset and runs ``test.test_pipeline``.  All pipelines use
    the hyper-parameters and the device given to the constructor.
    """

    def __init__(self, k_fold_splits, batch_size, lr, epochs, transforms, device):
        self.k_fold_splits = k_fold_splits
        self.batch_size = batch_size
        self.lr = lr
        self.epochs = epochs
        self.transforms = transforms
        self.device = device

    def _run_pipeline(self, mnist, svhn, mix):
        """Train and test on the dataset combination selected by the three flags."""
        k_fold = KFold(n_splits=self.k_fold_splits)
        train_dataset = SiameseDataset(train=True, mnist=mnist, svhn=svhn, mix=mix, transforms=self.transforms)
        # Always use the device configured on this instance, never a module
        # global that merely happens to have the same name.
        train.train_pipeline(epochs=self.epochs, k_fold=k_fold, batch_size=self.batch_size,
                             train_dataset=train_dataset,
                             lr=self.lr,
                             computing_device=self.device)

        # Drop the training data before the test data is loaded.
        del train_dataset

        test_dataset = SiameseDataset(train=False, mnist=mnist, svhn=svhn, mix=mix, transforms=self.transforms)
        test.test_pipeline(test_dataset=test_dataset, computing_device=self.device)

    def mnist_svhn_mix_pipeline(self):
        """MNIST and SVHN with ``mix=True``."""
        self._run_pipeline(mnist=True, svhn=True, mix=True)

    def mnist_svhn_pipeline(self):
        """MNIST and SVHN with ``mix=False``."""
        self._run_pipeline(mnist=True, svhn=True, mix=False)

    def mnist_pipeline(self):
        """MNIST only."""
        self._run_pipeline(mnist=True, svhn=False, mix=False)

    def svhn_pipeline(self):
        """SVHN only."""
        self._run_pipeline(mnist=False, svhn=True, mix=False)

    def all_pipelines(self):
        """Run the four pipelines one after another."""
        self.mnist_svhn_mix_pipeline()
        self.mnist_svhn_pipeline()
        self.mnist_pipeline()
        self.svhn_pipeline()


# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

# First configuration: adaptive-threshold preprocessing.
first_config = Pipelines(
    device=device,
    transforms=[AdaptiveThreshold()],
    epochs=20,
    lr=0.001,
    batch_size=4084,
    k_fold_splits=5,
)

first_config.all_pipelines()

# Second configuration: histogram-equalization preprocessing.
# It is only constructed here; none of its pipelines are started.
second_config = Pipelines(
    device=device,
    transforms=[EqualizeHist()],
    epochs=20,
    lr=0.001,
    batch_size=128,
    k_fold_splits=5,
)


cpu
Using downloaded and verified file: data/train_32x32.mat
--FOLD 1--

--EPOCH 1--


KeyboardInterrupt: 