In [4]:
import cv2
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from PIL import Image

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

try:
    from torchinfo import summary
except ModuleNotFoundError:
    !pip install torchinfo
    from torchinfo import summary

import os
import pathlib
import shutil
import sys

Collecting torchinfo
  Downloading torchinfo-1.7.1-py3-none-any.whl (22 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.7.1
[0m

In [5]:
"""
Same dir structure as on Kaggle
input/
    lfw-dataset/
        csv files
        lfw-deepfunneled/
working/
    notebook
    data/
        train/
        val/
        test/
"""

'\nSame dir structure as on Kaggle\ninput/\n    lfw-dataset/\n        csv files\n        lfw-deepfunneled/\nworking/\n    notebook\n    data/\n        train/\n        val/\n        test/\n'

In [6]:
# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# References for the GPU-related changes used throughout the notebook:
#   https://towardsdatascience.com/pytorch-switching-to-the-gpu-a7c0b21e8a99
#   https://github.com/pytorch/examples/blob/main/imagenet/main.py

# Seed both the NumPy and the PyTorch global generators so that sampling,
# splitting and weight initialisation are repeatable when cells run in order.
np.random.seed(0)
torch.random.manual_seed(0)

cuda


<torch._C.Generator at 0x7faee6482090>

In [7]:
# Location of the LFW dataset (csv files + lfw-deepfunneled/), relative to the
# notebook's working directory; the trailing slash is relied on where the csv
# path is built by plain string concatenation.
data_folder = '../input/lfw-dataset/'

In [8]:
# One row per person with the number of images available for them.
lfw_allnames = pd.read_csv(data_folder+"lfw_allnames.csv")

# Expand to one row per image: repeat each person's row `images` times, number
# the repeats 1..k within each person, and build "<name>/<name>_<0001>.jpg".
repeated_rows = lfw_allnames.loc[lfw_allnames.index.repeat(lfw_allnames['images'])]
image_number = (repeated_rows.groupby('name').cumcount() + 1).astype(str).str.zfill(4)
image_paths = (
    repeated_rows
    .drop(columns="images")
    .assign(image_path=repeated_rows['name'] + "/" + repeated_rows['name'] + "_" + image_number + ".jpg")
)

In [9]:
# Number of identities (classes) to keep: the `num_ppl` people with most images.
num_ppl = 10

# value_counts() is sorted by descending count, so the head holds the top people.
top_counts = image_paths['name'].value_counts()[:num_ppl]
print(top_counts)

list_people = list(top_counts.keys())   # class names
list_num_images = list(top_counts)      # matching image counts
print(list_people, list_num_images)

George_W_Bush        530
Colin_Powell         236
Tony_Blair           144
Donald_Rumsfeld      121
Gerhard_Schroeder    109
Ariel_Sharon          77
Hugo_Chavez           71
Junichiro_Koizumi     60
Jean_Chretien         55
John_Ashcroft         53
Name: name, dtype: int64
['George_W_Bush', 'Colin_Powell', 'Tony_Blair', 'Donald_Rumsfeld', 'Gerhard_Schroeder', 'Ariel_Sharon', 'Hugo_Chavez', 'Junichiro_Koizumi', 'Jean_Chretien', 'John_Ashcroft'] [530, 236, 144, 121, 109, 77, 71, 60, 55, 53]


In [10]:
"""
num_for_each = image_paths['name'].value_counts()[num_ppl-1]
tmp_l = []
for name in list(image_paths['name'].value_counts()[:num_ppl].keys()):
    tmp_l.append(image_paths[image_paths.name==name].sample(num_for_each))
data = pd.concat(tmp_l)
print(data)
"""

"\nnum_for_each = image_paths['name'].value_counts()[num_ppl-1]\ntmp_l = []\nfor name in list(image_paths['name'].value_counts()[:num_ppl].keys()):\n    tmp_l.append(image_paths[image_paths.name==name].sample(num_for_each))\ndata = pd.concat(tmp_l)\nprint(data)\n"

In [11]:
# Balance the classes: every selected person contributes as many images as the
# least-represented of the top `num_ppl` people, then each person's sample is
# split 80/20 into (train+val)/test and the first part 80/20 into train/val.
#
# The counts are looked up positionally with .iloc: the Series is indexed by
# person name, and an integer key in plain [] only worked through pandas'
# deprecated positional fallback.
name_counts = image_paths['name'].value_counts()
num_for_each = name_counts.iloc[num_ppl-1]

tmp_train = []
tmp_val = []
tmp_test = []
for name in list(name_counts.iloc[:num_ppl].keys()):
    # Sampling and both splits draw from NumPy's global RNG (no random_state),
    # in the same order as before, so results still follow np.random.seed.
    data_all = image_paths[image_paths.name==name].sample(num_for_each)
    person_rest, person_test = train_test_split(data_all, test_size=0.2)
    person_train, person_val = train_test_split(person_rest, test_size=0.2)
    tmp_train.append(person_train)
    tmp_val.append(person_val)
    tmp_test.append(person_test)

data_train = pd.concat(tmp_train)
data_val = pd.concat(tmp_val)
data_test = pd.concat(tmp_test)
print(data_train.shape, data_val.shape, data_test.shape)


(330, 2) (90, 2) (110, 2)


In [12]:
# Materialise the train/val/test splits on disk in the layout ImageFolder
# expects: <data_root>/<split>/<person>/<image>.jpg, plus one horizontally
# flipped copy of every image saved as <image>_transformed.jpg.
data_root = './data/'

data_list = [data_train, data_val, data_test]
dirs = ['train', 'val', 'test']

# """             # (un)comment this line (only) and run, to copy

# Remove any previous copy so stale files from an earlier split cannot leak in.
if os.path.exists(data_root) and os.path.isdir(data_root):
    shutil.rmtree(data_root)

# p=1 makes the flip unconditional; the transform is kept (rather than a plain
# PIL flip) so the torch RNG is consumed exactly as before.
transform_augment = transforms.Compose([
    transforms.RandomHorizontalFlip(p=1)
])

source_root = os.path.join(data_folder, 'lfw-deepfunneled', 'lfw-deepfunneled')

for i in range(len(dirs)):
    pathlib.Path(os.path.join(data_root, dirs[i])).mkdir(parents=True, exist_ok=True)

    data = data_list[i]

    # The class folders of every split are keyed on the *training* data.
    # NOTE(review): presumably so all splits expose the same class set (and
    # hence the same ImageFolder label indices) — confirm this is intended.
    for person in list_people:
        if len(data_train[data_train['name']==person])>0:
            pathlib.Path(os.path.join(data_root, dirs[i], person)).mkdir(parents=True, exist_ok=True)

    # Walk name and path together instead of re-filtering the frame per image.
    for name, im_path in zip(data['name'], data['image_path']):
        path_from = os.path.join(source_root, im_path)
        base_name = os.path.basename(im_path)
        filename, file_extension = os.path.splitext(base_name)
        path_to = os.path.join(data_root, dirs[i], name)

        # im_path already starts with "<person>/", so the existence check must
        # use the bare file name; joining the full im_path never matched.
        if not os.path.isfile(os.path.join(path_to, base_name)):
            shutil.copy(path_from, path_to)         # plain copy of the original

            # if dirs[i]!='test':                   # test-time augmentation too?
            # The flipped result is a new image, so the source file handle can
            # be released before saving.
            with Image.open(path_from) as img:
                flipped = transform_augment(img)
            flipped.save(os.path.join(path_to, filename+'_transformed'+file_extension))

# """

In [61]:
# Folders written by the copy step, one per split.
train_path, val_path, test_path = (os.path.join(data_root, split) for split in dirs[:3])

# Images are only converted to float tensors in [0, 1]; no further
# normalisation or on-the-fly augmentation is applied.
train_transform = transforms.Compose(transforms=[
    # transforms.RandomHorizontalFlip(),
    # transforms.Grayscale(num_output_channels=1),         # convert to grayscale
    transforms.ToTensor(),
    # transforms.Normalize(mean=0, std=255),      # output = (input-mean)/std
])
test_transform = transforms.Compose(transforms=[
    # transforms.Grayscale(num_output_channels=1),         # convert to grayscale
    transforms.ToTensor(),
    # transforms.Normalize(mean=0, std=255)
])

# Single-image batches for the classification pass and for evaluation.
dataloader_kwargs = dict(pin_memory=True, num_workers=1, batch_size=1, shuffle=True)
# Same settings but 4 images per batch, so a batch can hold positives and negatives.
dataloader_kwargs_triplet = {**dataloader_kwargs, 'batch_size': 4}

# Asynchronous host-to-device copies only make sense with pinned memory.
non_blocking = dataloader_kwargs['pin_memory']  # https://stackoverflow.com/questions/55563376/

train_dataset = torchvision.datasets.ImageFolder(train_path, train_transform)
val_dataset = torchvision.datasets.ImageFolder(val_path, test_transform)
test_dataset = torchvision.datasets.ImageFolder(test_path, test_transform)

train_loader = DataLoader(train_dataset, **dataloader_kwargs)
train_loader_triplet = DataLoader(train_dataset, **dataloader_kwargs_triplet)
val_loader = DataLoader(val_dataset, **dataloader_kwargs)
test_loader = DataLoader(test_dataset, **dataloader_kwargs)

In [62]:
# Sanity check: pull a single batch and show the image/label tensor shapes and
# the mean pixel value.
# NOTE(review): `data` stays bound at module level after this loop ends.
for data in train_loader:
    print(data[0].shape, data[1].shape)
    # print(data[0], data[1])
    print(torch.mean(data[0]))
    break
# NOTE(review): the earlier "(128, 3, 250, 250)" figure looks stale — the split
# printed above has 330 training rows and the copy step writes one flipped twin
# per image, which suggests 660 images of shape (3, 250, 250). TODO confirm.

torch.Size([1, 3, 250, 250]) torch.Size([1])
tensor(0.4668)


In [63]:
class FaceCNN_initial(nn.Module):
    """Small two-convolution baseline classifier.

    conv(3x3, 50) -> ReLU -> 2x2 max-pool -> conv(3x3, 20) -> ReLU -> flatten
    -> linear to `num_classes` logits.

    The linear layer expects 20*125*125 features, i.e. a 125x125 map after the
    single pooling step (250x250 input with the default stride/padding).

    Args:
        num_input_channels: channels of the input image.
        num_classes: number of output logits.
        stride: stride shared by both convolutions.
        padding: padding shared by both convolutions.
    """

    def __init__(self, num_input_channels, num_classes, stride=1, padding=1):
        super().__init__()

        conv_kwargs = dict(kernel_size=3, stride=stride, padding=padding)
        layers = [
            nn.Conv2d(num_input_channels, 50, **conv_kwargs),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(50, 20, **conv_kwargs),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(20 * 125 * 125, num_classes),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, input):
        """Return the class logits for a batch of images."""
        return self.network(input)

In [64]:
class FaceCNN(nn.Module):
    """Face classifier made of a 64-d embedding encoder and a linear decoder.

    The encoder stacks four conv -> batch-norm -> ReLU (-> 2x2 max-pool) ->
    dropout stages, flattens, and maps through two linear layers to a
    64-dimensional, ReLU-activated embedding. The decoder applies dropout and a
    linear layer producing `num_classes` logits.

    The first convolution always uses kernel 7 / stride 2 and the second always
    stride 1; `stride` only affects the third and fourth convolutions, while
    `padding` is shared by all four. The flattened size is fixed at 14400
    (64 channels x 15 x 15 for a 250x250 input with the defaults).

    Args:
        num_input_channels: channels of the input image.
        num_classes: number of output logits.
        stride: stride of the third and fourth convolutions.
        padding: padding of every convolution.
    """

    def __init__(self, num_input_channels, num_classes, stride=1, padding=1):
        super().__init__()

        def conv_stage(c_in, c_out, kernel, conv_stride, pool=True):
            # One convolutional stage; the layer order fixes the Sequential
            # indices and therefore the checkpoint keys.
            stage = [
                nn.Conv2d(in_channels=c_in, out_channels=c_out, kernel_size=kernel,
                          stride=conv_stride, padding=padding),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
            ]
            if pool:
                stage.append(nn.MaxPool2d(kernel_size=2))
            stage.append(nn.Dropout(p=0.2))
            return stage

        self.encoder = nn.Sequential(
            # input: (250, 250, 3)
            *conv_stage(num_input_channels, 64, 7, 2),
            *conv_stage(64, 128, 3, 1),
            *conv_stage(128, 256, 3, stride),
            *conv_stage(256, 64, 3, stride, pool=False),   # last stage is not pooled

            nn.Flatten(),
            nn.Linear(in_features=14400, out_features=1024),
            nn.ReLU(),
            nn.Dropout(p=0.5),    # https://stats.stackexchange.com/questions/240305/
            nn.Linear(in_features=1024, out_features=64),
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(in_features=64, out_features=num_classes),
        )

    def forward(self, input):
        """Return `(logits, embedding)` for a batch of images."""
        encoded = self.encoder(input)
        return self.decoder(encoded), encoded

In [95]:
def triplet_loss_fn(f, Y, num_classes, alpha=1, lam=0.01):
    """Batch-all triplet hinge loss on squared Euclidean distances.

    For every class c, every (anchor, positive) pair taken from the rows of `f`
    labelled c (including anchor == positive) is combined with every row not
    labelled c, and `relu(d(a, p) - d(a, n) + alpha)` is summed over all such
    triplets and over all classes.

    Args:
        f: (num_samples, embedding_dim) tensor of embeddings.
        Y: (num_samples,) tensor of integer class labels.
        num_classes: labels 0..num_classes-1 are visited.
        alpha: margin added inside the hinge.
        lam: factor applied to the summed loss.

    Returns:
        The summed hinge loss multiplied by `lam`.
    """
    per_class_terms = []
    for cls in range(num_classes):
        in_class = Y == cls
        anchors = f[in_class]
        negatives = f[~in_class]

        # Pairwise squared distances: anchor-positive (P x P), anchor-negative (P x N).
        dist_pos = (anchors.unsqueeze(1) - anchors.unsqueeze(0)).pow(2).sum(dim=2)
        dist_neg = (anchors.unsqueeze(1) - negatives.unsqueeze(0)).pow(2).sum(dim=2)

        # Broadcast to (P, P, N): one hinge term per (anchor, positive, negative).
        hinge = torch.relu(dist_pos.unsqueeze(2) - dist_neg.unsqueeze(1) + alpha)
        per_class_terms.append(torch.sum(hinge))

    return sum(per_class_terms) * lam

In [108]:
# RGB input; one output logit per selected person.
num_input_channels = 3
model = FaceCNN(num_input_channels=num_input_channels, num_classes=len(list_people))
model = model.to(device)

# Layer-by-layer summary for one batch of 250x250 images.
summary_input_size = (dataloader_kwargs['batch_size'], num_input_channels, 250, 250)
print(summary(model, input_size=summary_input_size))

# Plain SGD with weight decay on all classifier parameters; cross-entropy on the logits.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, weight_decay=1e-3)
num_epochs = 30

Layer (type:depth-idx)                   Output Shape              Param #
FaceCNN                                  [1, 10]                   --
├─Sequential: 1-1                        [1, 64]                   --
│    └─Conv2d: 2-1                       [1, 64, 123, 123]         9,472
│    └─BatchNorm2d: 2-2                  [1, 64, 123, 123]         128
│    └─ReLU: 2-3                         [1, 64, 123, 123]         --
│    └─MaxPool2d: 2-4                    [1, 64, 61, 61]           --
│    └─Dropout: 2-5                      [1, 64, 61, 61]           --
│    └─Conv2d: 2-6                       [1, 128, 61, 61]          73,856
│    └─BatchNorm2d: 2-7                  [1, 128, 61, 61]          256
│    └─ReLU: 2-8                         [1, 128, 61, 61]          --
│    └─MaxPool2d: 2-9                    [1, 128, 30, 30]          --
│    └─Dropout: 2-10                     [1, 128, 30, 30]          --
│    └─Conv2d: 2-11                      [1, 256, 30, 30]          295,168
│

In [109]:
def evaluate(loader, model, device=None, non_blocking=True):
    """Return the classification accuracy of `model` over `loader`.

    The model is switched to eval mode (and left in it) and gradients are
    disabled. `model(inputs)` must return `(logits, embedding)`; the
    prediction is the arg-max logit.

    Args:
        loader: iterable of `(inputs, labels)` batches.
        model: the classifier to score.
        device: where to move each batch; defaults to the device of the
            model's first parameter (CPU if the model has no parameters).
        non_blocking: forwarded to `Tensor.to` for the batch transfer.

    Returns:
        Fraction of correctly classified samples as a float, or 0.0 when the
        loader yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()

    score = 0
    cnt = 0

    with torch.no_grad():       # not training, so no need to calculate gradients
        for inputs, labels in loader:
            inputs = inputs.to(device, non_blocking=non_blocking)
            labels = labels.to(device, non_blocking=non_blocking)
            output, _ = model(inputs)
            pred = output.argmax(dim=1)
            score += float(torch.sum(pred == labels))
            # Count the samples of *this* batch; the previous code read the
            # size of an unrelated module-level `data` variable.
            cnt += inputs.shape[0]

    return score / cnt if cnt else 0.0

In [110]:
def train():
    """Train the module-level `model` for `num_epochs` epochs.

    Each epoch makes two passes over the training data: a cross-entropy pass on
    `train_loader`, then a triplet-loss pass on `train_loader_triplet` that
    uses the encoder output. After both passes the model is scored on
    `val_loader`; whenever validation accuracy improves on the best so far, the
    weights are written to 'best.model'.

    Relies on module-level `model`, `optimizer`, `loss_fn`, the data loaders,
    `device`, `non_blocking`, `list_people` and `num_epochs`.
    """
    best_acc = 0.0
    num_classes = len(list_people)

    def to_device(batch_inputs, batch_labels):
        # Move one batch to the training device.
        return (batch_inputs.to(device, non_blocking=non_blocking),
                batch_labels.to(device, non_blocking=non_blocking))

    for epoch in range(num_epochs):
        correct = 0
        seen = 0
        CE_loss = 0
        triplet_loss = 0

        model.train()

        # Pass 1: classification (cross-entropy on the logits).
        for inputs, labels in train_loader:
            inputs, labels = to_device(inputs, labels)

            optimizer.zero_grad()
            logits, _ = model(inputs)
            ce = loss_fn(logits, labels)
            ce.backward()
            optimizer.step()

            CE_loss += ce.item()
            _, preds = torch.max(logits.data, 1)
            correct += float(torch.sum(preds==labels.data))
            seen += inputs.shape[0]

        # Pass 2: metric learning (triplet loss on the embeddings).
        for inputs, labels in train_loader_triplet:
            inputs, labels = to_device(inputs, labels)

            optimizer.zero_grad()
            _, embeddings = model(inputs)
            trip = triplet_loss_fn(embeddings, labels, num_classes)
            trip.backward()
            optimizer.step()

            triplet_loss += trip.item()

        train_acc = correct/seen
        val_acc = evaluate(val_loader, model)

        print("Epoch:", epoch, "\tLoss:", CE_loss, triplet_loss, "\tTraining Acc:", train_acc, "\tVal Acc:", val_acc)

        # Keep only the checkpoint with the best validation accuracy.
        if val_acc > best_acc:
            torch.save(model.state_dict(),'best.model')
            best_acc = val_acc

In [111]:
# Run the full training schedule; per-epoch losses and accuracies are printed
# and the best-validation weights end up in 'best.model'.
train()

Epoch: 0 	Loss: 1523.5315465927124 3.576771288542659 	Training Acc: 0.09696969696969697 	Val Acc: 0.13333333333333333
Epoch: 1 	Loss: 1480.5981893539429 7.036555926315486 	Training Acc: 0.16363636363636364 	Val Acc: 0.2111111111111111
Epoch: 2 	Loss: 1388.423449397087 8.345587398856878 	Training Acc: 0.20454545454545456 	Val Acc: 0.3055555555555556
Epoch: 3 	Loss: 1273.9857878684998 7.455293735489249 	Training Acc: 0.28484848484848485 	Val Acc: 0.29444444444444445
Epoch: 4 	Loss: 1154.534344483167 10.415365964407101 	Training Acc: 0.34393939393939393 	Val Acc: 0.25555555555555554
Epoch: 5 	Loss: 1047.3524520546198 13.149000030942261 	Training Acc: 0.4090909090909091 	Val Acc: 0.48333333333333334
Epoch: 6 	Loss: 957.3268567174673 10.303598511964083 	Training Acc: 0.47575757575757577 	Val Acc: 0.4388888888888889
Epoch: 7 	Loss: 870.0268783867359 10.868931866018102 	Training Acc: 0.503030303030303 	Val Acc: 0.4722222222222222
Epoch: 8 	Loss: 784.0191351852845 9.920111494488083 	Training A

In [112]:
# Score the best-validation checkpoint on the held-out test split.
# map_location lets the checkpoint load regardless of the device it was saved from.
model.load_state_dict(torch.load('best.model', map_location=device))
model.eval()
score = 0
cnt = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device, non_blocking=non_blocking), labels.to(device, non_blocking=non_blocking)
        output, encoded = model(inputs)
        pred = output.argmax(dim=1)
        score += float(torch.sum(pred == labels))
        # Count the current batch; the earlier code read the size of a
        # leftover module-level `data` batch instead.
        cnt += inputs.shape[0]

print(score/cnt)

0.8909090909090909


In [19]:
# Collect the 64-d encoder output and the label of every test image (on the CPU)
# for the class-separation analysis.
model.load_state_dict(torch.load('best.model', map_location=device))
model.eval()

# Size the buffers from the dataset itself rather than assuming exactly two
# files per test row, and fill them by batch slice so any batch size works.
num_test_images = len(test_loader.dataset)
arr_encoder = torch.zeros((num_test_images, 64))
arr_labels = torch.zeros((num_test_images))
idx = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device, non_blocking=non_blocking), labels.to(device, non_blocking=non_blocking)
        output, encoded = model(inputs)
        batch_size = encoded.shape[0]
        arr_encoder[idx:idx + batch_size] = encoded.cpu()
        arr_labels[idx:idx + batch_size] = labels.cpu()
        idx += batch_size

In [None]:
# For every class, compare the mean squared distance between its own embeddings
# (all ordered pairs, self-pairs included) with the mean squared distance to the
# embeddings of all other classes; a smaller ratio means a tighter class.
for c in range(num_ppl):
    in_class = arr_labels == c
    same = arr_encoder[in_class]
    other = arr_encoder[~in_class]

    intra = (same.unsqueeze(1) - same.unsqueeze(0)).pow(2).sum(dim=2).mean()
    inter = (same.unsqueeze(1) - other.unsqueeze(0)).pow(2).sum(dim=2).mean()
    print(f"class : {c} closeness ratio : {intra/inter}")

## Adversarial attack

In [15]:
import math
import matplotlib.pyplot as plt

In [16]:
class Generator(nn.Module):
    """Adversarial perturbation shared by every image.

    Holds a single learnable tensor `noise` of shape `input_shape`, initialised
    from a standard normal, and adds it to whatever input it receives (the
    usual broadcasting rules apply).
    """

    def __init__(self, input_shape):
        super().__init__()
        # A simple strategy: one constant noise pattern added to each image.
        self.noise = nn.Parameter(torch.randn(input_shape))

    def forward(self, input):
        """Return `input` with the learned noise added."""
        return input + self.noise

In [None]:
# Restore the classifier weights saved by the training loop ('best.model') so
# the attack below targets that checkpoint.
model.load_state_dict(torch.load('best.model'))

In [43]:
# One noise tensor shaped like a single RGB 250x250 image, broadcast over the batch.
# It has to live on the same device as the batches it is added to; it is moved
# before the optimizer is built so the optimizer tracks the on-device parameter.
generator = Generator([1,3,250,250]).to(device)
loss_fn = nn.CrossEntropyLoss()
# Only the generator's noise is optimised; the classifier is not in this optimizer.
optimizer = torch.optim.SGD(generator.parameters(), lr=1e-3, weight_decay=1e-3)

### l2 regularized

In [19]:
def regularizer(image1, image2, lam = 1):
    """l2 penalty: mean squared element-wise difference of the images, times `lam`."""
    difference = image1 - image2
    return (difference ** 2).mean() * lam

In [None]:
# Number of epochs for the attack loop below.
# NOTE(review): this rebinds the same module-level `num_epochs` that train()
# reads, so calling train() after this cell would run with this value.
num_epochs = 6

In [None]:
# l2-regularised attack: learn one additive noise pattern that maximises the
# classifier's cross-entropy (hence the minus sign) while the regularizer keeps
# the perturbed image close to the original.

# The noise must sit on the same device as the batches it is added to.
# Module.to() moves parameters in place, so the existing optimizer stays valid.
generator.to(device)
# The classifier is the attack target, not being trained: keep it in eval mode
# so dropout is off and batch-norm statistics are not modified.
model.eval()

for epoch in range(num_epochs):
    train_score = 0
    cnt = 0
    train_loss = 0
    generator.train()

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device, non_blocking=non_blocking), labels.to(device, non_blocking=non_blocking)
        optimizer.zero_grad()
        gen = generator(inputs)
        outputs,_ = model(gen)

        # print(outputs, labels)
        loss = -loss_fn(outputs, labels) + regularizer(inputs, gen, lam=2*1e4)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        preds = outputs.argmax(dim=1)
        train_score += float(torch.sum(preds == labels))
        cnt += inputs.shape[0]

    train_acc = train_score/cnt

    # Accuracy of the classifier on perturbed validation images.
    score = 0
    cnt = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device, non_blocking=non_blocking), labels.to(device, non_blocking=non_blocking)
            output, encoded = model(generator(inputs))
            pred = output.argmax(dim=1)
            score += float(torch.sum(pred == labels))
            # Count this batch (previously read from a leftover global `data`).
            cnt += inputs.shape[0]

    val_acc = (score/cnt)
    print("Epoch:", epoch, "\tLoss:", train_loss, "\tTraining Acc:", train_acc, "\tValidation Acc:", val_acc)

    # Show one validation image next to its perturbed version.
    with torch.no_grad():
        for inputs, labels in val_loader:
            perturbed = generator(inputs.to(device))
            # plt.subplots already creates the figure; no extra plt.figure() needed.
            f, axarr = plt.subplots(1,2)
            axarr[0].imshow(inputs[0].permute(1, 2, 0))
            # Bring the result back to the CPU and into imshow's valid [0, 1] range.
            axarr[1].imshow(perturbed[0].permute(1, 2, 0).clamp(0, 1).cpu().numpy())
            plt.show(block=True)
            break


In [210]:
# Persist the noise learned in the l2-regularised section to the file 'generator2'.
torch.save(generator.state_dict(),'generator2')

In [None]:
# Accuracy of the classifier on perturbed test images, plus one visual example.

# Make sure the noise is on the same device as the batches, and that the
# classifier is in eval mode while it is being scored.
generator.to(device)
model.eval()

score = 0
cnt = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device, non_blocking=non_blocking), labels.to(device, non_blocking=non_blocking)
        output, encoded = model(generator(inputs))
        pred = output.argmax(dim=1)
        score += float(torch.sum(pred == labels))
        # Count this batch (previously read from a leftover global `data`).
        cnt += inputs.shape[0]

print("test acc:", score/cnt)

with torch.no_grad():
    for inputs, labels in test_loader:
        perturbed = generator(inputs.to(device))
        # plt.subplots already creates the figure; no extra plt.figure() needed.
        f, axarr = plt.subplots(1,2)
        axarr[0].imshow(inputs[0].permute(1, 2, 0))
        # Bring the result back to the CPU and into imshow's valid [0, 1] range.
        axarr[1].imshow(perturbed[0].permute(1, 2, 0).clamp(0, 1).cpu().numpy())
        break

### l1 regularized

In [44]:
def regularizer(image1, image2, lam = 1):
    """l1 penalty: mean absolute element-wise difference of the images, times `lam`."""
    difference = image1 - image2
    return difference.abs().mean() * lam

In [46]:
# Number of epochs for the attack loop below (module-level `num_epochs`).
num_epochs = 6

In [None]:
# l1-regularised attack: same procedure as the l2 run, with whichever
# `regularizer` is currently defined and a weight of 1e2.
# NOTE(review): this cell uses the module-level `generator` and `optimizer`
# as they are. If the l2 run was executed first and the set-up cell was not
# re-run, training continues from that noise — confirm this is intended.

# The noise must sit on the same device as the batches it is added to.
# Module.to() moves parameters in place, so the existing optimizer stays valid.
generator.to(device)
# The classifier is the attack target, not being trained: keep it in eval mode
# so dropout is off and batch-norm statistics are not modified.
model.eval()

for epoch in range(num_epochs):
    train_score = 0
    cnt = 0
    train_loss = 0
    generator.train()

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device, non_blocking=non_blocking), labels.to(device, non_blocking=non_blocking)
        optimizer.zero_grad()
        gen = generator(inputs)
        outputs,_ = model(gen)

        # print(outputs, labels)
        loss = -loss_fn(outputs, labels) + regularizer(inputs, gen, lam=1e2)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        preds = outputs.argmax(dim=1)
        train_score += float(torch.sum(preds == labels))
        cnt += inputs.shape[0]

    train_acc = train_score/cnt

    # Accuracy of the classifier on perturbed validation images.
    score = 0
    cnt = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device, non_blocking=non_blocking), labels.to(device, non_blocking=non_blocking)
            output, encoded = model(generator(inputs))
            pred = output.argmax(dim=1)
            score += float(torch.sum(pred == labels))
            # Count this batch (previously read from a leftover global `data`).
            cnt += inputs.shape[0]

    val_acc = (score/cnt)
    print("Epoch:", epoch, "\tLoss:", train_loss, "\tTraining Acc:", train_acc, "\tValidation Acc:", val_acc)

    # Show one validation image next to its perturbed version.
    with torch.no_grad():
        for inputs, labels in val_loader:
            perturbed = generator(inputs.to(device))
            # plt.subplots already creates the figure; no extra plt.figure() needed.
            f, axarr = plt.subplots(1,2)
            axarr[0].imshow(inputs[0].permute(1, 2, 0))
            # Bring the result back to the CPU and into imshow's valid [0, 1] range.
            axarr[1].imshow(perturbed[0].permute(1, 2, 0).clamp(0, 1).cpu().numpy())
            plt.show(block=True)
            break

In [48]:
# Persist the noise learned in the l1-regularised section to the file 'generator1'.
torch.save(generator.state_dict(),'generator1')

In [None]:
# Accuracy of the classifier on perturbed test images, plus one visual example.

# Make sure the noise is on the same device as the batches, and that the
# classifier is in eval mode while it is being scored.
generator.to(device)
model.eval()

score = 0
cnt = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device, non_blocking=non_blocking), labels.to(device, non_blocking=non_blocking)
        output, encoded = model(generator(inputs))
        pred = output.argmax(dim=1)
        score += float(torch.sum(pred == labels))
        # Count this batch (previously read from a leftover global `data`).
        cnt += inputs.shape[0]

print("test acc:", score/cnt)

with torch.no_grad():
    for inputs, labels in test_loader:
        perturbed = generator(inputs.to(device))
        # plt.subplots already creates the figure; no extra plt.figure() needed.
        f, axarr = plt.subplots(1,2)
        axarr[0].imshow(inputs[0].permute(1, 2, 0))
        # Bring the result back to the CPU and into imshow's valid [0, 1] range.
        axarr[1].imshow(perturbed[0].permute(1, 2, 0).clamp(0, 1).cpu().numpy())
        break

### Observations
l2-regularized attacks seem to fare better than l1-regularized ones at visual similarity, i.e. they need to add "less" noise for a similar reduction in model accuracy.