* Custom Dataset & DataLoader
* Torchvision ImageFolder Dataset
* Residual Block
* CNN model with Residual Block
* Loss Fucntions (Center Loss and Triplet Loss)

## Imports

In [1]:
import os
import numpy as np
from PIL import Image

import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# os.environ["CUDA_VISIBLE_DEVICES"] = "2"

## Custom DataSet with DataLoader

In [2]:
class ImageDataset(Dataset):
    def __init__(self, file_list, target_list):
        self.file_list = file_list
        self.target_list = target_list
        self.n_class = len(list(set(target_list)))

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, index):
        img = Image.open(self.file_list[index])
        img = torchvision.transforms.ToTensor()(img)
        label = self.target_list[index]
        return img, label

#### Парсим папку, чтобы собрать пути до всех изображений

In [3]:
def parse_data(datadir):
    img_list = []
    ID_list = []
    for root, directories, filenames in os.walk(datadir):
        for filename in filenames:
            if filename.endswith('.jpg') and not filename.startswith('._'):
                filei = os.path.join(root, filename)
                img_list.append(filei)
                ID_list.append(root.split('/')[-1])

    # construct a dictionary, where key and value correspond to ID and target
    uniqueID_list = list(set(ID_list))
    class_n = len(uniqueID_list)
    target_dict = dict(zip(uniqueID_list, range(class_n)))
    label_list = [target_dict[ID_key] for ID_key in ID_list]

    print('{}\t\t{}\n{}\t\t{}'.format('#Images', '#Labels', len(img_list), len(set(label_list))))
    return img_list, label_list, class_n

In [4]:
img_list, label_list, class_n = parse_data('medium_dev')

#Images		#Labels
10		5


In [5]:
trainset = ImageDataset(img_list, label_list)

In [6]:
train_data_item, train_data_label = trainset.__getitem__(0)

In [7]:
import matplotlib.pyplot as plt

In [8]:
print('data item shape: {}\t data item label: {}'.format(train_data_item.shape, train_data_label))

data item shape: torch.Size([3, 32, 32])	 data item label: 0


In [9]:
dataloader = DataLoader(trainset, batch_size=10, shuffle=True, num_workers=1, drop_last=False)

## Torchvision DataSet and DataLoader

In [10]:
imageFolder_dataset = torchvision.datasets.ImageFolder(root='medium_dev/', 
                                                       transform=torchvision.transforms.ToTensor())

In [11]:
imageFolder_dataloader = DataLoader(imageFolder_dataset, batch_size=10, shuffle=True, num_workers=1, drop_last=True)

In [12]:
imageFolder_dataset.__len__(), len(imageFolder_dataset.classes)

(10, 5)

## Residual Block

In [13]:
class ResBlock(nn.Module):
    def __init__(self, channel_size, stride=1):
        super(ResBlock, self).__init__()
        self.block = nn.Sequential(nn.Conv2d(in_channels=channel_size, out_channels=channel_size, 
                                             kernel_size=3, stride=stride, padding=1, bias=False), # batchnorm его уберет
                                   nn.BatchNorm2d(num_features=channel_size),
                                   nn.ReLU(inplace=True),
                                   nn.Conv2d(in_channels=channel_size, out_channels=channel_size, 
                                             kernel_size=3, stride=stride, padding=1, bias=False),
                                   nn.BatchNorm2d(num_features=channel_size))
        self.logit_non_linear = nn.ReLU(inplace=True)

    def forward(self, x):
        output = x
        output = self.block(output)
        output = self.logit_non_linear(output + x) # skip connection
        return output

## CNN Model with Residual Block 

In [14]:
class Network(nn.Module):
    def __init__(self, num_feats, hidden_sizes, num_classes, feat_dim=10):
        super(Network, self).__init__()
        
        self.hidden_sizes = [num_feats] + hidden_sizes + [num_classes]
        
        self.layers = []
        for idx, channel_size in enumerate(hidden_sizes):
            self.layers.append(nn.Conv2d(in_channels=self.hidden_sizes[idx], 
                                         out_channels=self.hidden_sizes[idx+1], 
                                         kernel_size=3, stride=2, bias=False))
            self.layers.append(nn.ReLU(inplace=True))
            self.layers.append(ResBlock(channel_size=channel_size))
            
        self.layers = nn.Sequential(*self.layers)
        self.linear_label = nn.Linear(self.hidden_sizes[-2], self.hidden_sizes[-1], bias=False)
        
        # For creating the embedding to be passed into the Center Loss criterion
        self.linear_closs = nn.Linear(self.hidden_sizes[-2], feat_dim, bias=False)
        self.relu_closs = nn.ReLU(inplace=True)
    
    def forward(self, x):
        output = x
        output = self.layers(output)
        
        # output = output.reshape(b, c,h*w)  # тогда линейный слой упадет


        # это более элегентное решение
        output = F.avg_pool2d(output, [output.size(2), output.size(3)], stride=1)
        output = output.reshape(output.shape[0], output.shape[1])
        
        label_output = self.linear_label(output) # b, 5
        label_output = label_output/torch.norm(self.linear_label.weight, dim=1)
        
        # Create the feature embedding for the Center Loss
        closs_output = self.linear_closs(output) #b, 10
        closs_output = self.relu_closs(closs_output)

        return closs_output, label_output

def init_weights(m):
    if type(m) == nn.Conv2d or type(m) == nn.Linear:
        torch.nn.init.xavier_normal_(m.weight.data)

### Training & Testing Model

In [15]:
def train(model, data_loader, test_loader):
    model.train()

    for epoch in range(numEpochs):
        avg_loss = 0.0
        for batch_num, (feats, labels) in enumerate(data_loader):
            feats, labels = feats.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(feats)[1]

            loss = criterion(outputs, labels.long())
            loss.backward()
            optimizer.step()
            
            avg_loss += loss.item()

            if batch_num % 50 == 49:
                print('Epoch: {}\tBatch: {}\tAvg-Loss: {:.4f}'.format(epoch+1, batch_num+1, avg_loss/50))
                avg_loss = 0.0    
            
            torch.cuda.empty_cache()
            del feats
            del labels
            del loss
        
        val_loss, val_acc = test_classify(model, test_loader)
        train_loss, train_acc = test_classify(model, data_loader)
        print('Train Loss: {:.4f}\tTrain Accuracy: {:.4f}\tVal Loss: {:.4f}\tVal Accuracy: {:.4f}'.
              format(train_loss, train_acc, val_loss, val_acc))


def test_classify(model, test_loader):
    model.eval()
    test_loss = []
    accuracy = 0
    total = 0

    for batch_num, (feats, labels) in enumerate(test_loader):
        feats, labels = feats.to(device), labels.to(device)
        outputs = model(feats)[1]
        
        _, pred_labels = torch.max(F.softmax(outputs, dim=1), 1)
        pred_labels = pred_labels.view(-1)
        
        loss = criterion(outputs, labels.long())
        
        accuracy += torch.sum(torch.eq(pred_labels, labels)).item()
        total += len(labels)
        test_loss.extend([loss.item()]*feats.size()[0])
        del feats
        del labels

    model.train()
    return np.mean(test_loss), accuracy/total

#### Dataset, DataLoader and Constant Declarations

In [16]:
train_dataset = torchvision.datasets.ImageFolder(root='medium/', 
                                                 transform=torchvision.transforms.ToTensor())
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=10, 
                                               shuffle=True, num_workers=8)

dev_dataset = torchvision.datasets.ImageFolder(root='medium_dev/', 
                                               transform=torchvision.transforms.ToTensor())
dev_dataloader = torch.utils.data.DataLoader(dev_dataset, batch_size=10, 
                                             shuffle=True, num_workers=8)

In [17]:
numEpochs = 4
num_feats = 3

learningRate = 1e-2
weightDecay = 5e-5

hidden_sizes = [32, 64]
num_classes = len(train_dataset.classes)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [18]:
network = Network(num_feats, hidden_sizes, num_classes)
network.apply(init_weights)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(network.parameters(), lr=learningRate, weight_decay=weightDecay, momentum=0.9)

In [19]:
network

Network(
  (layers): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), bias=False)
    (1): ReLU(inplace=True)
    (2): ResBlock(
      (block): Sequential(
        (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (4): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (logit_non_linear): ReLU(inplace=True)
    )
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), bias=False)
    (4): ReLU(inplace=True)
    (5): ResBlock(
      (block): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
   

In [20]:
img = torch.rand(1, 3, 128, 128)
y = network(img)


In [21]:
y

(tensor([[0.8010, 0.9066, 0.0000, 0.0000, 0.0000, 0.0161, 0.4866, 0.1145, 0.0000,
          0.3056]], grad_fn=<ReluBackward1>),
 tensor([[0.4706, 0.0357]], grad_fn=<DivBackward0>))

In [22]:
network.train()
network.to(device)
train(network, train_dataloader, dev_dataloader)

Epoch: 1	Batch: 50	Avg-Loss: 0.6238


IndexError: Target 4 is out of bounds.

## Center Loss
___
Код на основе https://github.com/KaiyangZhou/pytorch-center-loss

In [None]:
# ?torch.addmm

In [34]:
class CenterLoss(nn.Module):
    """
    Args:
        num_classes (int): number of classes.
        feat_dim (int): feature dimension.
    """
    def __init__(self, num_classes, feat_dim, device=torch.device('cpu')):
        super(CenterLoss, self).__init__()
        self.num_classes = num_classes
        self.feat_dim = feat_dim
        self.device = device
        
        self.centers = nn.Parameter(torch.randn(self.num_classes, self.feat_dim).to(self.device))

    def forward(self, x, labels):
        """
        Args:
            x: feature matrix with shape (batch_size, feat_dim).
            labels: ground truth labels with shape (batch_size).
        """
        # todo implement
        # x (batch, featdim)
        # self.centers: (nmclasses, feat_dim)
        # dist: (batch_size, mum classes)
        # loss = sum(x_i - centersyi)**2

        distx = (x ** 2).sum(dim=1, keepdim=True) # (b, 1)
        cit_ci = (self.centers ** 2).sum(dim=1)[:, None] # (num_classes, 1)
        xi_xj = x @ self.centers.t()
        print(xi_xj.shape, distx.shape, cit_ci.shape)
        dist = distx - 2 * xi_xj + cit_ci # (b, n)
        loss = 0

        center_labels = torch.arange(self.num_classes, dtype=torch.long, device=dist.device) # (n)

        mask = labels[:, None] == center_labels[:None, :]

        loss = (dist * mask).sum() / x.shape[0]

        
        return loss

In [35]:
center_loss = CenterLoss(3, 10)
x = torch.rand(2, 10)
labels = torch.arange(2, dtype=torch.long)

In [36]:
labels

tensor([0, 1])

In [37]:
center_loss(x, labels)

torch.Size([2, 3]) torch.Size([2, 1]) torch.Size([3, 1])


RuntimeError: The size of tensor a (2) must match the size of tensor b (3) at non-singleton dimension 0

In [None]:
def train_closs(model, data_loader, test_loader):
    model.train()

    for epoch in range(numEpochs):
        avg_loss = 0.0
        for batch_num, (feats, labels) in enumerate(data_loader):
            feats, labels = feats.to(device), labels.to(device)
            
            optimizer_label.zero_grad()
            optimizer_closs.zero_grad()
            
            feature, outputs = model(feats)

            l_loss = criterion_label(outputs, labels.long())
            c_loss = criterion_closs(feature, labels.long())
            loss = l_loss + closs_weight * c_loss
            
            loss.backward()
            
            optimizer_label.step()
            optimizer_closs.step()
            
            avg_loss += loss.item()

            if batch_num % 50 == 49:
                print('Epoch: {}\tBatch: {}\tAvg-Loss: {:.4f}'.format(epoch+1, batch_num+1, avg_loss/50))
                avg_loss = 0.0    
            
            torch.cuda.empty_cache()
            del feats
            del labels
            del loss
        
        val_loss, val_acc = test_classify_closs(model, test_loader)
        train_loss, train_acc = test_classify_closs(model, data_loader)
        print('Train Loss: {:.4f}\tTrain Accuracy: {:.4f}\tVal Loss: {:.4f}\tVal Accuracy: {:.4f}'.
              format(train_loss, train_acc, val_loss, val_acc))


def test_classify_closs(model, test_loader):
    model.eval()
    test_loss = []
    accuracy = 0
    total = 0

    for batch_num, (feats, labels) in enumerate(test_loader):
        feats, labels = feats.to(device), labels.to(device)
        feature, outputs = model(feats)
        
        _, pred_labels = torch.max(F.softmax(outputs, dim=1), 1)
        pred_labels = pred_labels.view(-1)
        
        l_loss = criterion_label(outputs, labels.long())
        c_loss = criterion_closs(feature, labels.long())
        loss = l_loss + closs_weight * c_loss
        
        accuracy += torch.sum(torch.eq(pred_labels, labels)).item()
        total += len(labels)
        test_loss.extend([loss.item()]*feats.size()[0])
        del feats
        del labels

    model.train()
    return np.mean(test_loss), accuracy/total

In [None]:
closs_weight = 0.2
lr_cent = 0.5
feat_dim = 10

network = Network(num_feats, hidden_sizes, num_classes, feat_dim)
network.apply(init_weights)

criterion_label = nn.CrossEntropyLoss()
criterion_closs = CenterLoss(num_classes, feat_dim, device)
optimizer_label = torch.optim.SGD(network.parameters(), lr=learningRate, weight_decay=weightDecay, momentum=0.9)
optimizer_closs = torch.optim.SGD(criterion_closs.parameters(), lr=lr_cent)

In [None]:
network.train()
network.to(device)
train_closs(network, train_dataloader, dev_dataloader)

## Triplet Loss
___
Необходимо сделать dataloader который возвращать триплет - tuple из трех картинок. Две из одного класса и одна из другого класса. Далее используем triplet loss.

Подробнее: https://github.com/adambielski/siamese-triplet/blob/master/losses.py

In [None]:
triplet_loss = nn.TripletMarginLoss(margin=1.0, p=2)
face_img1, label_img1 = trainset.__getitem__(0)
face_img2, label_img2 = trainset.__getitem__(1)
face_img3, label_img3 = trainset.__getitem__(-1)

print(label_img1, label_img2, label_img3)
## face_img1 and face_img2 are from the same class and face_img3 is from a different class.
loss = triplet_loss(face_img1, face_img2, face_img3)
print ("Loss={:0.2f}".format(loss))