In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from torchvision.datasets import MNIST
from torchvision import transforms

mean, std = 0.1307, 0.3081

mnist_train_dataset = MNIST('../data/MNIST', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((mean,), (std,))
                             ]))
mnist_test_dataset = MNIST('../data/MNIST', train=False, download=True,
                            transform=transforms.Compose([
                                transforms.ToTensor(),
                                transforms.Normalize((mean,), (std,))
                            ]))
n_classes = 10

# mnist_train_dataset.train_labels.item()
# [boat, boat, ... , nature]

In [3]:
from __future__ import print_function, division
import torch
import torchvision
from torch.utils import data
import os
import pandas as pd
import numpy as np
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils

from PIL import Image

test_data_path = "/notebooks/data/datasets/pipistrel/Hackathon/SingleFrame_ObjectProposalClassification/test"
train_data_path = "/notebooks/data/datasets/pipistrel/Hackathon/SingleFrame_ObjectProposalClassification/train"

train_labels = os.path.join(train_data_path, "/notebooks/userdata/teamE/TripletLoss/train_data_float.csv")
test_labels = os.path.join(test_data_path, "/notebooks/userdata/teamE/TripletLoss/test_data_float.csv")

n_classes = 2

def boats(root_path):
    return os.path.join(root_path, "boat")

def nature(root_path):
    return os.path.join(root_path, "nature")

def do_transform(image):
    img_size_scaled = 64
    transform = torchvision.transforms.Compose([
                            torchvision.transforms.RandomHorizontalFlip(),
                            torchvision.transforms.Resize(img_size_scaled),
                            torchvision.transforms.CenterCrop(img_size_scaled),
                            torchvision.transforms.ToTensor()
                            ])
    return transform(image)
        
    
    
class TripletPipi(data.Dataset):
    """
    Train: For each sample (anchor) randomly chooses a positive and negative samples
    Test: Creates fixed triplets for testing
    """

    def __init__(self, train=True):
        self.train = train
        self.extensions = ['png']

        if self.train:
            self.train_labels = torch.Tensor([1 for _ in range(1011)] + [0 for _ in range(10379)])
            self.train_data = [os.path.join(boats(train_data_path), file) \
                               for file in os.listdir(boats(train_data_path)) \
                               if file[-3:] in self.extensions] \
            + [os.path.join(nature(train_data_path), file) \
                                for file in os.listdir(nature(train_data_path)) \
                                if file[-3:] in self.extensions]
            self.labels_set = set(np.array(self.train_labels))
            self.label_to_indices = {label: np.where(np.array(self.train_labels) == label)[0]
                                     for label in self.labels_set}

        else:
            self.test_labels = torch.Tensor([1 for _ in range(482)] + [0 for _ in range(4132)])
            self.test_data = [os.path.join(boats(test_data_path), file) \
                                           for file in os.listdir(boats(test_data_path)) \
                                           if file[-3:] in self.extensions] \
                + [os.path.join(nature(test_data_path), file) \
                                for file in os.listdir(nature(test_data_path)) \
                                if file[-3:] in self.extensions]
            # generate fixed triplets for testing
            self.labels_set = set(np.array(self.test_labels))
            self.label_to_indices = {label: np.where(np.array(self.test_labels) == label)[0]
                                     for label in self.labels_set}

            random_state = np.random.RandomState(29)

            triplets = [[i,
                         random_state.choice(self.label_to_indices[self.test_labels[i].item()]),
                         random_state.choice(self.label_to_indices[
                                                 np.random.choice(
                                                     list(self.labels_set - set([self.test_labels[i].item()]))
                                                 )
                                             ])
                         ]
                        for i in range(len(self.test_data))]
            self.test_triplets = triplets
            
    def load_image(self, path):
        image = Image.open(path).convert('L')
        return do_transform(image)

    def __getitem__(self, index):
        if self.train:
            img1, label1 = self.train_data[index], self.train_labels[index].item()
            positive_index = index
            while positive_index == index:
                positive_index = np.random.choice(self.label_to_indices[label1])
            negative_label = np.random.choice(list(self.labels_set - set([label1])))
            negative_index = np.random.choice(self.label_to_indices[negative_label])
            img2 = self.train_data[positive_index]
            img3 = self.train_data[negative_index]
        else:
            img1 = self.test_data[self.test_triplets[index][0]]
            img2 = self.test_data[self.test_triplets[index][1]]
            img3 = self.test_data[self.test_triplets[index][2]]

        img1 = self.load_image(img1)
        img2 = self.load_image(img2)
        img3 = self.load_image(img3)
        return (img1, img2, img3), []

    def __len__(self):
        return len(self.train_labels) if self.train else len(self.test_labels)
    
train_dataset = TripletPipi(train=True)
test_dataset = TripletPipi(train=False)


In [4]:
import torch
from torch.optim import lr_scheduler
import torch.optim as optim
from torch.autograd import Variable

from trainer import fit
import numpy as np
cuda = torch.cuda.is_available()

print("using cuda: {}".format(cuda))

%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt

mnist_classes = ['0', '1']
colors = ['#ff77b4', '#00ff0e']

def plot_embeddings(embeddings, targets, xlim=None, ylim=None):
    plt.figure(figsize=(10,10))
    for i in range(2):
        inds = np.where(targets==i)[0]
        plt.scatter(embeddings[inds,0], embeddings[inds,1], alpha=0.5, color=colors[i])
    if xlim:
        plt.xlim(xlim[0], xlim[1])
    if ylim:
        plt.ylim(ylim[0], ylim[1])
    plt.legend(mnist_classes)

def extract_embeddings(dataloader, model):
    with torch.no_grad():
        model.eval()
        embeddings = np.zeros((len(dataloader.dataset), 128))
        labels = np.zeros(len(dataloader.dataset))
        k = 0
        for images, target in dataloader:
            if cuda:
                images = images.cuda()
            embeddings[k:k+len(images)] = model.get_embedding(images).data.cpu().numpy()
            labels[k:k+len(images)] = target
            k += len(images)
    return embeddings, labels

using cuda: True


# Triplet network
We'll train a triplet network, that takes an anchor, positive (same class as anchor) and negative (different class than anchor) examples. The objective is to learn embeddings such that the anchor is closer to the positive example than it is to the negative example by some margin value.

![alt text](images/anchor_negative_positive.png "Source: FaceNet")
Source: [2] *Schroff, Florian, Dmitry Kalenichenko, and James Philbin. [Facenet: A unified embedding for face recognition and clustering.](https://arxiv.org/abs/1503.03832) CVPR 2015.*

**Triplet loss**:   $L_{triplet}(x_a, x_p, x_n) = max(0, m +  \lVert f(x_a)-f(x_p)\rVert_2^2 - \lVert f(x_a)-f(x_n)\rVert_2^2$\)

In [None]:
import torch.nn as nn


class EmbeddingNet(nn.Module):
    def __init__(self):
        super(EmbeddingNet, self).__init__()
        self.convnet = nn.Sequential(nn.Conv2d(1, 32, 5), nn.PReLU(),
                                     nn.MaxPool2d(2, stride=2),
                                     nn.Conv2d(32, 64, 5), nn.PReLU(),
                                     nn.Conv2d(64, 96, 5), nn.PReLU(),
                                     nn.MaxPool2d(2, stride=2),
                                     nn.Conv2d(96, 96, 3), nn.PReLU(),
                                     nn.MaxPool2d(2, stride=2))

        self.fc = nn.Sequential(nn.Linear(1536, 256),
                                nn.PReLU(),
                                nn.Linear(256, 256),
                                nn.PReLU(),
                                nn.Linear(256, 128)
                                )

    def forward(self, x):
        output = self.convnet(x)
        output = output.view(output.size()[0], -1)
        output = self.fc(output)
        return output

    def get_embedding(self, x):
        return self.forward(x)


In [None]:
# Set up data loaders
# from datasets import TripletMNIST

triplet_train_dataset = TripletPipi(train_dataset) # Returns triplets of images
triplet_test_dataset = TripletPipi(test_dataset)
batch_size = 256
kwargs = {'num_workers': 1, 'pin_memory': True} if cuda else {}
triplet_train_loader = torch.utils.data.DataLoader(triplet_train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
triplet_test_loader = torch.utils.data.DataLoader(triplet_test_dataset, batch_size=batch_size, shuffle=False, **kwargs)

# Set up the network and training parameters
# from networks import EmbeddingNet, TripletNet
from networks import TripletNet
from losses import TripletLoss

margin = 1.
embedding_net = EmbeddingNet()
model = TripletNet(embedding_net)
if cuda:
    model.cuda()
loss_fn = TripletLoss(margin)
lr = 1e-3
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = lr_scheduler.StepLR(optimizer, 8, gamma=0.1, last_epoch=-1)
n_epochs = 20
log_interval = 100

In [None]:
fit(triplet_train_loader, triplet_test_loader, model, loss_fn, optimizer, scheduler, n_epochs, cuda, log_interval)

Epoch: 1/20. Train set: Average loss: 0.1006
Epoch: 1/20. Validation set: Average loss: 0.0400
Epoch: 2/20. Train set: Average loss: 0.0343
Epoch: 2/20. Validation set: Average loss: 0.0230
Epoch: 3/20. Train set: Average loss: 0.1188
Epoch: 3/20. Validation set: Average loss: 0.0374
Epoch: 4/20. Train set: Average loss: 0.0845
Epoch: 4/20. Validation set: Average loss: 0.0350
Epoch: 5/20. Train set: Average loss: 0.0315
Epoch: 5/20. Validation set: Average loss: 0.0232
Epoch: 6/20. Train set: Average loss: 0.0225
Epoch: 6/20. Validation set: Average loss: 0.0183
Epoch: 7/20. Train set: Average loss: 0.0297
Epoch: 7/20. Validation set: Average loss: 0.0197
Epoch: 8/20. Train set: Average loss: 0.0172
Epoch: 8/20. Validation set: Average loss: 0.0156
Epoch: 9/20. Train set: Average loss: 0.0159
Epoch: 9/20. Validation set: Average loss: 0.0162
Epoch: 10/20. Train set: Average loss: 0.0139
Epoch: 10/20. Validation set: Average loss: 0.0142
Epoch: 11/20. Train set: Average loss: 0.0148
Ep

In [None]:
class FlatPipistrelDataset(data.Dataset):

    def __init__(self, train=True):        
        self.extensions = ['png']
        data_path = train_data_path if train else test_data_path
        self.data = [os.path.join(boats(data_path), file) for file in os.listdir(boats(data_path)) \
                                                          if file[-3:] in self.extensions] \
                  + [os.path.join(nature(data_path), file) for file in os.listdir(nature(data_path)) \
                                                           if file[-3:] in self.extensions]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = self.data.__getitem__(idx)
        image = Image.open(img_name).convert('L')
        return do_transform(image), 1 if "boat" in img_name else 0
    
    @property
    def dataset(self):
        return list(self)

test_loader = torch.utils.data.DataLoader(FlatPipistrelDataset(False), batch_size=batch_size, shuffle=False, **kwargs)
val_embeddings_tl, val_labels_tl = extract_embeddings(test_loader, model)
plot_embeddings(val_embeddings_tl, val_labels_tl)

In [None]:
import pickle

model_dir = "/notebooks/userdata/teamE/TripletLoss/models"
model_index = "04"

out_model = os.path.join(model_dir, model_index + "-triplet-pipi.model")    
torch.save(model, out_model)

test_loader = torch.utils.data.DataLoader(FlatPipistrelDataset(False), batch_size=batch_size, shuffle=False, **kwargs)
train_loader = torch.utils.data.DataLoader(FlatPipistrelDataset(True), batch_size=batch_size, shuffle=False, **kwargs)

test_embeddings, _ = extract_embeddings(test_loader, model)
train_embeddings, _ = extract_embeddings(train_loader, model)

out_embeddings = os.path.join(model_dir, model_index + "-test-embeddings.pickle")
with open(out_embeddings, "wb") as f:
    pickle.dump(list(zip(test_embeddings, test_dataset.test_data)), f)

out_embeddings = os.path.join(model_dir, model_index + "-train-embeddings.pickle")
with open(out_embeddings, "wb") as f:
    pickle.dump(list(zip(train_embeddings, train_dataset.train_data)), f)