In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Running code @ {device}')

!pip3 install facenet_pytorch
from facenet_pytorch import MTCNN, InceptionResnetV1

from sklearn.datasets import fetch_lfw_people
from sklearn.model_selection import train_test_split

Running code @ cuda


In [2]:
def get_dataset():
    return fetch_lfw_people(color=True)


def get_small_dataset(og_ds, num_classes=5):
    uniq, elem = np.unique(og_ds['target'], return_counts=True)
    most_common = dict(sorted(zip(uniq, elem), key=lambda p: p[1], reverse=True)[:num_classes]).keys()

    datas = []
    images = []
    targets = []
    for i in og_ds['target']:
        if i in most_common:
            datas.append(og_ds['data'][i])
            images.append(og_ds['images'][i])
            targets.append(og_ds['target'][i])

    return {
        'data': np.array(datas),
        'images': np.array(images),
        'target': np.array(targets),
        'target_names': np.array([og_ds['target_names'][tgt] for tgt in set(most_common)])
    }

# _ = get_og_dataset()

In [3]:
class FacesDataSet(Dataset):
    def __init__(self, X, y, transform=None):
        self.X = X
        self.y = y
        self.trans = transforms.Compose([transforms.ToTensor()])


    def __len__(self):
        return len(self.y)


    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
  
        return self.trans(self.X[idx]), self.y[idx]

In [4]:
BATCH_SIZE_TRAIN = 100
BATCH_SIZE_TEST = 100
NO_WORKERS = 8
SHUFFLE_DATA = True


DS = get_dataset()
SMALL_DS = get_small_dataset(DS)

X_train, X_test, y_train, y_test = train_test_split(SMALL_DS['images'],
    SMALL_DS['target'], test_size=0.2, random_state=42)

train_set = FacesDataSet(X_train, y_train)
test_set = FacesDataSet(X_test, y_test)

# print(f'Size of training images {X_train.shape}')
# print(f'Size of training labels {y_train.shape}')
# print(f'Size of test images {X_test.shape}')
# print(f'Size of test labels {y_test.shape}')

test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE_TRAIN, 
    shuffle=SHUFFLE_DATA, num_workers=NO_WORKERS)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=BATCH_SIZE_TRAIN, 
    shuffle=SHUFFLE_DATA, num_workers=NO_WORKERS)

batch_train_images, batch_train_labels = next(iter(train_loader))
# print(batch_train_images)
# print(batch_train_labels)


def loopy_test_loader(dl):
    data_iter = iter(dl)
    
    while True:
        try:
            yield next(data_iter)
        except StopIteration:
            data_iter = iter(dl)
            yield next(data_iter)

  cpuset_checked))


In [15]:
class MLP(nn.Module):
    def __init__(self, width, height, n_chan, n_cls):
        super(MLP, self).__init__()

        self.fc1 = nn.Linear(n_chan * width * height, 128)
        self.relu1 = nn.ReLU()

        self.fc2 = nn.Linear(128, 64)
        self.relu2 = nn.ReLU()

        self.fc3 = nn.Linear(64, n_cls)
        self.sigmoid = nn.Sigmoid()


    def forward(self, x):
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = self.relu1(x)

        x = self.fc2(x)
        x = self.relu2(x)

        x = self.fc3(x)
        x = self.sigmoid(x)

        return x


_, chan, w, h = batch_train_images.size()
conv_net = ConvNet(w, h, chan, 5)
conv_net = conv_net.to(device)

print(list(conv_net.modules())[0])


select = 2
inputs = batch_train_images.to(device)[:select]
target = batch_train_labels[:select]

output = conv_net(inputs)
_, predicted = torch.max(output, 1)

print(output)
print(predicted)
print(target)

ConvNet(
  (conv1): Conv2d(3, 3, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(3, 3, kernel_size=(5, 5), stride=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout1): Dropout(p=0.25, inplace=False)
  (conv3): Conv2d(3, 3, kernel_size=(3, 3), stride=(1, 1))
  (conv4): Conv2d(3, 3, kernel_size=(3, 3), stride=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout2): Dropout(p=0.25, inplace=False)
  (dense1): Linear(in_features=231, out_features=231, bias=True)
  (dropout3): Dropout(p=0.4, inplace=False)
  (dense2): Linear(in_features=231, out_features=5, bias=True)
  (dropout4): Dropout(p=0.4, inplace=False)
)
tensor([[ 18.3619,   0.0000,  -8.7412,   0.0000,  -0.9017],
        [  1.8342,   7.3835, -16.2854,   0.0000,   3.5989]], device='cuda:0',
       grad_fn=<FusedDropoutBackward>)
tensor([0, 1], device='cuda:0')
tensor([1871, 1871])


In [9]:
class ConvNet(nn.Module):
    def __init__(self, width, height, n_chan, n_cls):
        super(ConvNet, self).__init__()

        self.conv1 = nn.Conv2d(n_chan, n_chan, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(n_chan, n_chan, kernel_size=5, stride=1)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout1 = nn.Dropout(p=0.25)

        self.conv3 = nn.Conv2d(n_chan, n_chan, kernel_size=3, stride=1)
        self.conv4 = nn.Conv2d(n_chan, n_chan, kernel_size=3, stride=1)
        self.pool2 =  nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout2 = nn.Dropout(p=0.25)

        w_conv1 = (width - 5) + 1
        h_conv1 = (height - 5) + 1
        w_conv2 = (w_conv1 - 5) + 1
        h_conv2 = (h_conv1 - 5) + 1
        w_pool1 = (w_conv2 - 2) // 2 + 1
        h_pool1 = (h_conv2 - 2) // 2 + 1

        w_conv3 = (w_pool1 - 3) + 1
        h_conv3 = (h_pool1 - 3) + 1
        w_conv4 = (w_conv3 - 3) + 1
        h_conv4 = (h_conv3 - 3) + 1
        w_pool2 = (w_conv4 - 2) // 2 + 1
        h_pool2 = (h_conv4 - 2) // 2 + 1

        in_dense = n_chan * w_pool2 * h_pool2

        self.dense1 = nn.Linear(in_dense, in_dense)
        self.dropout3 = nn.Dropout(p=0.4)
        self.dense2 = nn.Linear(in_dense, out_features=n_cls)
        self.dropout4 = nn.Dropout(p=0.4)


    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.pool1(x)
        x = self.dropout1(x)

        x = self.conv3(x)
        x = self.conv4(x)
        x = self.pool2(x)
        x = self.dropout2(x)

        x = torch.flatten(x, 1)
        x = self.dense1(x)
        x = self.dropout3(x)

        x = torch.flatten(x, 1)
        x = self.dense2(x)
        x = self.dropout4(x)

        return x

_, chan, w, h = batch_train_images.size()
conv_net = ConvNet(w, h, chan, 5)
conv_net = conv_net.to(device)

print(list(conv_net.modules())[0])


select = 2
inputs = batch_train_images.to(device)[:select]
target = batch_train_labels[:select]

output = conv_net(inputs)
_, predicted = torch.max(output, 1)

print(output)
print(predicted)
print(target)

ConvNet(
  (conv1): Conv2d(3, 3, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(3, 3, kernel_size=(5, 5), stride=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout1): Dropout(p=0.25, inplace=False)
  (conv3): Conv2d(3, 3, kernel_size=(3, 3), stride=(1, 1))
  (conv4): Conv2d(3, 3, kernel_size=(3, 3), stride=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout2): Dropout(p=0.25, inplace=False)
  (dense1): Linear(in_features=231, out_features=231, bias=True)
  (dropout3): Dropout(p=0.4, inplace=False)
  (dense2): Linear(in_features=231, out_features=5, bias=True)
  (dropout4): Dropout(p=0.4, inplace=False)
)
tensor([[ -6.7763,   0.0000, -32.9813, -11.6701,   0.0000],
        [  0.0000,   3.7120,   0.0000,  -5.9164,   0.0000]], device='cuda:0',
       grad_fn=<FusedDropoutBackward>)
tensor([1, 1], device='cuda:0')
tensor([1871, 1871])


In [7]:

def get_ds_facenet_vectors(ds, net, mtcnn):
    return [net(mtcnn(img).unsqueeze(0)) for img in ds['images']]

# vectors = get_ds_facenet_vectors(small_ds,
#     InceptionResnetV1(pretrained='vggface2').eval(), MTCNN(margin=32))

In [8]:
EPOCHS = 50
CUDA_LAUNCH_BLOCKING=1

lr_init = 0.01
lr_factor = 0.1
weight_decay_factor = 1e-4
lr_schedule_milestones = [90e3, 100e3, 110e3]


def test_alg(net):
    train_iter = 0
    losses = []
    steps = []

    criterion = nn.CrossEntropyLoss()

    #optimizer = torch.optim.SGD(net.parameters(), lr=lr_init, momentum=0.9, weight_decay=weight_decay_factor)
    optimizer = torch.optim.Adam(net.parameters(), lr=0.01)

    #optimizer = torch.optim.SGD(net.parameters(), lr=lr_init, momentum=0.9, weight_decay=weight_decay_factor)
    optimizer = torch.optim.Adam(net.parameters(), lr=0.01, weight_decay=weight_decay_factor)
            
    lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=lr_schedule_milestones, gamma=lr_factor)

    # simulate an inifinte test data provider by looping over the test data
    test_data_provider = loopy_test_loader(test_loader)

    # set model in train mode
    net.train()

    running_loss = 0.0
    running_acc = 0.0
    ct = 0

    for epoch in range(int(EPOCHS)):  # loop over the dataset multiple times
        
        for i, data in enumerate(train_loader, 0):
            # set the learning rate and decay according to iteration schedule
            lr_scheduler.step()
            
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data[0].to(device), data[1].to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            running_acc += top_k_accuracy(1, labels, outputs)
            
            if train_iter % REPORT_TRAIN_EVERY == REPORT_TRAIN_EVERY - 1:    # print every REPORT_TRAIN_EVERY mini-batch iterations
                train_loss = running_loss / REPORT_TRAIN_EVERY
                train_acc = running_acc / REPORT_TRAIN_EVERY
                
                print('[%d, %5d, %6d] LR: %.5f' % (epoch + 1, i + 1, train_iter, lr_scheduler.get_lr()[-1]))
                print('[%d, %5d] loss: %.5f, acc: %.5f' %
                    (epoch + 1, i + 1, train_loss, train_acc))
                
                losses.append(train_loss)
                steps.append(train_iter)
                
                running_loss = 0
                train_loss = 0
                running_acc = 0
                train_acc = 0
                
                
            if train_iter % PLOT_EVERY == 0:
                plot_losses(losses, steps, train_iter)
                
            train_iter += 1
        
            if train_iter % REPORT_TEST_EVERY == 0:
                # set model in test mode
                net.eval()
                
                with torch.no_grad():
                    # evaluate over at most TEST_ITER sub samples from the test_loader
                    test_iter = 0
                    test_loss = 0
                    correct = 0
                    
                    while test_iter < TEST_ITERS:
                    #for j, test_data in enumerate(test_loader, start=test_ct):
                        test_data = next(test_data_provider)
                            
                        # get the test inputs; data is a list of [inputs, labels]
                        test_inputs, test_labels = test_data[0].to(device), test_data[1].to(device)
                        
                        out = net(test_inputs)
                        test_loss += criterion(out, test_labels)
                        
                        correct += top_k_accuracy(1, test_labels, out)
                        
                        test_iter += 1
                            
                    avg_test_loss = test_loss / TEST_ITERS
                    avg_acc = correct / TEST_ITERS
                    
                    print('[%d, %5d] avg_test_loss: %.5f, avg_test_acc: %.2f' 
                        % (epoch + 1, i + 1, avg_test_loss, avg_acc))
                    
                # set model back in train mode
                net.train()
        
    print('Finished Training')


# test_alg(conv_net)