In [1]:
!pip install gdown 
!gdown --id 1m9z_m05-tWU_IceHYps2B9royx3GLrKo

Collecting gdown
  Downloading gdown-4.2.0.tar.gz (13 kB)
  Installing build dependencies ... [?25l- \ | / - done
[?25h  Getting requirements to build wheel ... [?25l- done
[?25h  Preparing metadata (pyproject.toml) ... [?25l- done
Building wheels for collected packages: gdown
  Building wheel for gdown (pyproject.toml) ... [?25l- done
[?25h  Created wheel for gdown: filename=gdown-4.2.0-py3-none-any.whl size=14262 sha256=14ef3878e688d899a1b66d5975b9e2af9d70c334ee9a70aeb34e8d907be19e1e
  Stored in directory: /root/.cache/pip/wheels/8c/17/ff/58721d1fabdb87c21a0529948cf39e2be9af90ddbe4ad65944
Successfully built gdown
Installing collected packages: gdown
Successfully installed gdown-4.2.0
Downloading...
From: https://drive.google.com/uc?id=1m9z_m05-tWU_IceHYps2B9royx3GLrKo
To: /kaggle/working/Face_Images.zip
100%|███████████████████████████████████████| 7.06M/7.06M [00:00<00:00, 128MB/s]


In [2]:
# import zipfile

# z= zipfile.ZipFile('/Face_Images.zip')
# z.extractall()
!unzip Face_Images.zip

Archive:  Face_Images.zip
  inflating: annotations_test.csv    
  inflating: annotations_train.csv   
  inflating: __MACOSX/._annotations_train.csv  
   creating: test/
  inflating: test/closed_eye_0160.jpg_face_1.jpg  
  inflating: __MACOSX/test/._closed_eye_0160.jpg_face_1.jpg  
  inflating: test/Boris_Becker_0001.jpg  
  inflating: __MACOSX/test/._Boris_Becker_0001.jpg  
  inflating: test/closed_eye_0231.jpg_face_3.jpg  
  inflating: __MACOSX/test/._closed_eye_0231.jpg_face_3.jpg  
  inflating: test/Ali_Ahmeti_0001.jpg  
  inflating: __MACOSX/test/._Ali_Ahmeti_0001.jpg  
  inflating: test/Bill_Fennelly_0001.jpg  
  inflating: __MACOSX/test/._Bill_Fennelly_0001.jpg  
  inflating: test/Charlotte_Church_0001.jpg  
  inflating: __MACOSX/test/._Charlotte_Church_0001.jpg  
  inflating: test/Benjamin_Bratt_0001.jpg  
  inflating: __MACOSX/test/._Benjamin_Bratt_0001.jpg  
  inflating: test/closed_eye_0573.jpg_face_2.jpg  
  inflating: __MACOSX/test/._closed_eye_0573.jpg_

In [3]:
import os
import pandas as pd
from torchvision.io import read_image
from torch.utils.data import Dataset

class CustomImageDataset(Dataset):
    """Dataset that pairs image files with labels listed in a CSV file.

    The CSV is read with ``pandas.read_csv``. For every row, column 0 is
    joined onto ``img_dir`` to obtain the image path and column 1 is
    returned as the label.

    Args:
        annotations_file: path of the CSV file holding file names and labels.
        img_dir: directory that the file names are relative to.
        transform: optional callable applied to the decoded image.
        target_transform: optional callable applied to the label.
    """

    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of annotation rows."""
        return len(self.img_labels.index)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at row ``idx``."""
        file_name = self.img_labels.iloc[idx, 0]
        sample = read_image(os.path.join(self.img_dir, file_name))
        target = self.img_labels.iloc[idx, 1]

        # Both hooks are optional; a falsy value means "leave untouched".
        if self.transform:
            sample = self.transform(sample)
        if self.target_transform:
            target = self.target_transform(target)
        return sample, target

In [4]:
# import os
# # train_set = CustomImageDataset('/kaggle/working/annotations_train.csv','/kaggle/working/train', transform=transform_train, target_transform=None)
# from torchvision.io import read_image
# for i in os.listdir('/kaggle/working/train'):
#     print(i)
#     img = read_image(os.path.join('/kaggle/working/train',i))


In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.utils as utils
from sklearn.metrics import accuracy_score
import cv2
import math


def train_epoch(model, criterion, optimizer, dataloader, device, epoch, log_interval):
    """Train ``model`` for one pass over ``dataloader`` and report loss/accuracy.

    Args:
        model: network to train. Its forward pass may return either the logits
            or a list/tuple whose first element is the logits.
        criterion: loss called as ``criterion(logits, labels)``.
        optimizer: optimizer stepped once per batch.
        dataloader: iterable yielding ``(inputs, labels)`` batches.
        device: device the batches are moved to.
        epoch: zero-based epoch index, used only in the printed messages.
        log_interval: print the running batch loss/accuracy every this many batches.

    Returns:
        ``(training_loss, training_acc)``: the mean of the per-batch losses and
        the fraction of correctly classified samples over the whole epoch.

    Raises:
        ValueError: if ``dataloader`` yields no batches.
    """
    model.train()
    losses = []
    all_label = []
    all_pred = []

    for batch_idx, (inputs, labels) in enumerate(dataloader):
        # get the inputs and labels
        inputs, labels = inputs.to(device), labels.to(device)
        # Flatten to shape (batch,). Unlike squeeze(), this keeps the batch
        # dimension when the batch holds a single sample (e.g. the last batch).
        labels = labels.reshape(-1)

        optimizer.zero_grad()
        # forward
        outputs = model(inputs)
        if isinstance(outputs, (list, tuple)):
            outputs = outputs[0]

        # compute the loss
        loss = criterion(outputs, labels)
        losses.append(loss.item())

        # compute the accuracy
        prediction = torch.max(outputs, 1)[1]
        all_label.append(labels.detach().cpu())
        all_pred.append(prediction.detach().cpu())
        score = (prediction == labels).float().mean().item()

        # backward & optimize
        loss.backward()
        optimizer.step()

        if (batch_idx + 1) % log_interval == 0:
            print("epoch {:3d} | iteration {:5d} | Loss {:.6f} | Acc {:.2f}%".format(epoch+1, batch_idx+1, loss.item(), score*100))

    if not losses:
        raise ValueError("train_epoch: dataloader yielded no batches")

    # Compute the average loss & accuracy
    training_loss = sum(losses)/len(losses)
    all_label = torch.cat(all_label, dim=0)
    all_pred = torch.cat(all_pred, dim=0)
    training_acc = (all_pred == all_label).float().mean().item()
    # Log
    print("Average Training Loss of Epoch {}: {:.6f} | Acc: {:.2f}%".format(epoch+1, training_loss, training_acc*100))
    return training_loss, training_acc


def val_epoch(model, criterion, dataloader, device, epoch):
    """Evaluate ``model`` on ``dataloader`` without updating any weights.

    Args:
        model: network to evaluate. Its forward pass may return either the
            logits or a list/tuple whose first element is the logits.
        criterion: loss called as ``criterion(logits, labels)``.
        dataloader: iterable yielding ``(inputs, labels)`` batches.
        device: device the batches are moved to.
        epoch: zero-based epoch index; currently unused (the logging calls that
            consumed it are disabled) and kept for interface compatibility.

    Returns:
        ``(val_loss, val_acc)``: the mean of the per-batch losses and the
        fraction of correctly classified samples.

    Raises:
        ValueError: if ``dataloader`` yields no batches.
    """
    model.eval()
    losses = []
    all_label = []
    all_pred = []

    with torch.no_grad():
        for batch_idx, (inputs, labels) in enumerate(dataloader):
            # get the inputs and labels
            inputs, labels = inputs.to(device), labels.to(device)
            # Flatten to shape (batch,). Unlike squeeze(), this keeps the batch
            # dimension when the batch holds a single sample.
            labels = labels.reshape(-1)
            # forward
            outputs = model(inputs)
            if isinstance(outputs, (list, tuple)):
                outputs = outputs[0]
            # compute the loss
            loss = criterion(outputs, labels)
            losses.append(loss.item())
            # collect labels & prediction
            prediction = torch.max(outputs, 1)[1]
            all_label.append(labels.cpu())
            all_pred.append(prediction.cpu())

    if not losses:
        raise ValueError("val_epoch: dataloader yielded no batches")

    # Compute the average loss & accuracy
    val_loss = sum(losses)/len(losses)
    all_label = torch.cat(all_label, dim=0)
    all_pred = torch.cat(all_pred, dim=0)
    val_acc = (all_pred == all_label).float().mean().item()
    # Log
    print("Average Validation Loss: {:.6f} | Acc: {:.2f}%".format(val_loss, val_acc*100))
    return val_loss, val_acc


def visualize_attn(I, c):
    """Blend a colour-mapped attention heatmap onto an image.

    Args:
        I: image tensor laid out as (channels, height, width); it is permuted
            to (height, width, channels) for blending. Callers in this
            notebook build it with ``make_grid(..., normalize=True)``, i.e.
            with values in [0, 1].
        c: attention scores of shape (N, C, H, W). A softmax over the spatial
            positions of each map is applied before colouring.

    Returns:
        A float numpy array (height, width, 3) on a 0..255 scale:
        ``0.6 * image + 0.4 * heatmap``. Cast to ``uint8`` before saving.
    """
    # Image
    img = I.permute((1,2,0)).cpu().numpy()
    # The heatmap below is on a 0..255 scale. Bring a [0, 1] image onto the
    # same scale, otherwise it contributes less than one intensity level and
    # the "overlay" is effectively the heatmap alone.
    if img.max() <= 1.0:
        img = img * 255.0
    # Heatmap
    N, C, H, W = c.size()
    a = F.softmax(c.view(N,C,-1), dim=2).view(N,C,H,W)
    if N == 1:
        # A single attention map: resize it to the image's own resolution so
        # that coarser maps can be overlaid too.
        # NOTE(review): assumes I holds exactly one (unpadded) image here.
        target_size = (I.size(1), I.size(2))
        if (H, W) != target_size:
            a = F.interpolate(a, size=target_size, mode='bilinear', align_corners=False)
    else:
        # Several maps tiled into a grid: keep the original behaviour, which
        # upsamples every map towards a 32-pixel image side.
        up_factor = 32/H
        if up_factor > 1:
            a = F.interpolate(a, scale_factor=up_factor, mode='bilinear', align_corners=False)
    attn = utils.make_grid(a, nrow=4, normalize=True, scale_each=True)
    attn = attn.permute((1,2,0)).mul(255).byte().cpu().numpy()
    attn = cv2.applyColorMap(attn, cv2.COLORMAP_JET)
    # OpenCV colour maps are BGR; convert so the result is RGB like the image.
    attn = cv2.cvtColor(attn, cv2.COLOR_BGR2RGB)
    # Add the heatmap to the image
    vis = 0.6 * img + 0.4 * attn
    return vis

In [6]:
"""
Attention blocks
Reference: Learn To Pay Attention
"""
class ProjectorBlock(nn.Module):
    """Bias-free 1x1 convolution that changes the channel count of a feature map.

    Args:
        in_features: number of input channels.
        out_features: number of output channels.
    """

    def __init__(self, in_features, out_features):
        super().__init__()
        # kernel_size=1 with no padding leaves the spatial size untouched.
        self.op = nn.Conv2d(in_features, out_features, kernel_size=1, padding=0, bias=False)

    def forward(self, x):
        """Apply the 1x1 convolution to ``x``."""
        projected = self.op(x)
        return projected


class SpatialAttn(nn.Module):
    """Spatial attention over a local feature map, conditioned on a global feature.

    A bias-free 1x1 convolution turns ``l + g`` into one score per spatial
    position. The scores are converted to weights (softmax over all positions
    when ``normalize_attn`` is true, element-wise sigmoid otherwise) and used
    to pool ``l`` into one vector per sample.

    Args:
        in_features: number of channels of the local feature map.
        normalize_attn: selects softmax weighting with a weighted sum (True) or
            sigmoid weighting with average pooling (False).
    """

    def __init__(self, in_features, normalize_attn=True):
        super().__init__()
        self.normalize_attn = normalize_attn
        self.op = nn.Conv2d(in_features, 1, kernel_size=1, padding=0, bias=False)

    def forward(self, l, g):
        """Return ``(scores, pooled)``.

        ``scores`` are the raw (pre-softmax / pre-sigmoid) attention scores
        with shape (batch, 1, H, W); ``pooled`` has shape (batch, channels).
        """
        batch, channels, height, width = l.size()
        scores = self.op(l + g)  # (batch, 1, H, W)

        if self.normalize_attn:
            # Softmax across all spatial positions, then a weighted sum.
            weights = F.softmax(scores.view(batch, 1, -1), dim=2).view(batch, 1, height, width)
            weighted = weights.expand_as(l) * l
            pooled = weighted.view(batch, channels, -1).sum(dim=2)
        else:
            # Independent sigmoid gate per position, then a spatial average.
            weights = torch.sigmoid(scores)
            weighted = weights.expand_as(l) * l
            pooled = F.adaptive_avg_pool2d(weighted, (1, 1)).view(batch, channels)

        return scores.view(batch, 1, height, width), pooled

In [7]:
"""
VGG-16 with attention
"""
class AttnVGG(nn.Module):
    """VGG-style convolutional classifier with optional spatial attention.

    Six convolutional stages feed a "dense" convolution whose kernel covers the
    whole remaining feature map, producing one global 512-channel vector per
    sample. With ``attention=True`` that vector conditions three
    ``SpatialAttn`` blocks on the outputs of stages 3, 4 and 5, and the three
    pooled vectors are concatenated for classification; otherwise the global
    vector is classified directly.

    Args:
        sample_size: side length of the (square) input images. The input is
            halved five times, so the dense kernel is ``sample_size // 32``.
        num_classes: number of output classes.
        attention: enable the attention branches.
        normalize_attn: forwarded to every ``SpatialAttn`` block.
        init_weights: accepted for interface compatibility. The call to
            ``_initialize_weights`` is currently disabled, so PyTorch's default
            initialisation is used regardless of this flag.

    Raises:
        ValueError: if ``sample_size`` is smaller than 32, which would leave no
            feature map for the dense layer.
    """

    def __init__(self, sample_size, num_classes, attention=True, normalize_attn=True, init_weights=True):
        super(AttnVGG, self).__init__()
        dense_kernel = int(sample_size/32)
        if dense_kernel < 1:
            raise ValueError("sample_size must be at least 32, got {}".format(sample_size))
        # conv blocks
        self.conv1 = self._make_layer(3, 64, 2)
        self.conv2 = self._make_layer(64, 128, 2)
        self.conv3 = self._make_layer(128, 256, 3)
        self.conv4 = self._make_layer(256, 512, 3)
        self.conv5 = self._make_layer(512, 512, 3)
        self.conv6 = self._make_layer(512, 512, 2, pool=True)
        self.dense = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=dense_kernel, padding=0, bias=True)
        # attention blocks
        self.attention = attention
        if self.attention:
            # Stage 3 outputs 256 channels; project to 512 to match the global vector.
            self.projector = ProjectorBlock(256, 512)
            self.attn1 = SpatialAttn(in_features=512, normalize_attn=normalize_attn)
            self.attn2 = SpatialAttn(in_features=512, normalize_attn=normalize_attn)
            self.attn3 = SpatialAttn(in_features=512, normalize_attn=normalize_attn)
        # final classification layer
        if self.attention:
            self.classify = nn.Linear(in_features=512*3, out_features=num_classes, bias=True)
        else:
            self.classify = nn.Linear(in_features=512, out_features=num_classes, bias=True)
        # Custom initialisation is intentionally left disabled:
        # if init_weights:
        #     self._initialize_weights()

    def forward(self, x):
        """Return ``[logits, c1, c2, c3]``.

        ``logits`` has shape (batch, num_classes). ``c1``..``c3`` are the raw
        attention score maps of the three attention blocks, or ``None`` when
        attention is disabled.
        """
        x = self.conv1(x)
        x = self.conv2(x)
        l1 = self.conv3(x)
        x = F.max_pool2d(l1, kernel_size=2, stride=2, padding=0)
        l2 = self.conv4(x)
        x = F.max_pool2d(l2, kernel_size=2, stride=2, padding=0)
        l3 = self.conv5(x)
        x = F.max_pool2d(l3, kernel_size=2, stride=2, padding=0)
        x = self.conv6(x)
        g = self.dense(x) # batch_sizex512x1x1
        # attention
        if self.attention:
            c1, g1 = self.attn1(self.projector(l1), g)
            c2, g2 = self.attn2(l2, g)
            c3, g3 = self.attn3(l3, g)
            g = torch.cat((g1,g2,g3), dim=1) # batch_sizex3C
            # classification layer
            x = self.classify(g) # batch_sizexnum_classes
        else:
            c1, c2, c3 = None, None, None
            # Flatten per sample. A bare squeeze() would also drop the batch
            # dimension when the batch holds a single sample.
            x = self.classify(g.view(g.size(0), -1))
        return [x, c1, c2, c3]

    def _make_layer(self, in_features, out_features, blocks, pool=False):
        """Stack ``blocks`` conv3x3 -> BatchNorm -> ReLU units.

        With ``pool=True`` a 2x2 max-pool follows every unit (not just the
        last one), so the spatial size is halved ``blocks`` times.
        """
        layers = []
        for i in range(blocks):
            conv2d = nn.Conv2d(in_channels=in_features, out_channels=out_features, kernel_size=3, padding=1, bias=False)
            layers += [conv2d, nn.BatchNorm2d(out_features), nn.ReLU(inplace=True)]
            in_features = out_features
            if pool:
                layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
        return nn.Sequential(*layers)

    def _initialize_weights(self):
        """Re-initialise conv, batch-norm and linear layers (not called by default)."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)


In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms
import torchvision.utils as utils
# from tensorboardX import SummaryWriter
import os
# import argparse
import numpy as np
from datetime import datetime




# Run configuration (stands in for the command-line arguments of a script).
Train = True                     # run the training loop in the main cell
epoches = 12                     # number of training epochs
batch_size = 32                  # batch size for both data loaders
save_path = '/kaggle/working/'   # directory the checkpoint is written to
no_save = False                  # set to True to skip writing the checkpoint

# Restrict CUDA to GPU 0; this is set before CUDA availability is queried below.
os.environ["CUDA_VISIBLE_DEVICES"] = '0'
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


if __name__ == '__main__':
    # Load data
    # Train and test images share the same preprocessing: the uint8 tensor
    # from the dataset is converted to a PIL image, back to a float tensor,
    # and then normalised per channel.
    # NOTE(review): the mean/std constants are hard-coded; confirm they suit
    # this dataset.
    normalize = transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
    transform_train = transforms.Compose([
        transforms.ToPILImage(),
        transforms.ToTensor(),
        normalize,
        ])
    transform_test = transforms.Compose([
        transforms.ToPILImage(),
        transforms.ToTensor(),
        normalize,
        ])
    train_set = CustomImageDataset('/kaggle/working/annotations_train.csv','/kaggle/working/train', transform=transform_train, target_transform=None)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=2)
    test_set = CustomImageDataset('/kaggle/working/annotations_test.csv','/kaggle/working/test', transform=transform_test, target_transform=None)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=2)
    # Create model
    model = AttnVGG(sample_size=100, num_classes=2).to(device)
    # Run the model parallelly
    if torch.cuda.device_count() > 1:
        print("Using {} GPUs".format(torch.cuda.device_count()))
        model = nn.DataParallel(model)
    # Train
    if Train:
        # Create loss criterion & optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=0)

        for epoch in range(epoches):
            train_epoch(model, criterion, optimizer, train_loader, device, epoch, 100)
            val_epoch(model, criterion, test_loader, device, epoch)
        # Save once, after the final epoch. The file name is derived from
        # `epoches` instead of the leaked loop variable, and nothing is written
        # when no epoch ran (previously a NameError for epoches == 0).
        if not no_save and epoches > 0:
            torch.save(model.state_dict(), os.path.join(save_path, "cnn_epoch{:03d}.pth".format(epoches)))
            print("Saving Model of Epoch {}".format(epoches))

Average Training Loss of Epoch 1: 0.597062 | Acc: 67.58%
Average Validation Loss: 0.514380 | Acc: 75.99%
Average Training Loss of Epoch 2: 0.528591 | Acc: 73.85%
Average Validation Loss: 0.497753 | Acc: 79.42%
Average Training Loss of Epoch 3: 0.437666 | Acc: 80.56%
Average Validation Loss: 0.492392 | Acc: 79.16%
Average Training Loss of Epoch 4: 0.350109 | Acc: 85.06%
Average Validation Loss: 0.336386 | Acc: 86.28%
Average Training Loss of Epoch 5: 0.265195 | Acc: 89.47%
Average Validation Loss: 0.414182 | Acc: 83.11%
Average Training Loss of Epoch 6: 0.202394 | Acc: 91.58%
Average Validation Loss: 0.346801 | Acc: 87.34%
Average Training Loss of Epoch 7: 0.147748 | Acc: 94.47%
Average Validation Loss: 0.193986 | Acc: 92.61%
Average Training Loss of Epoch 8: 0.136283 | Acc: 95.45%
Average Validation Loss: 0.145161 | Acc: 94.46%
Average Training Loss of Epoch 9: 0.104392 | Acc: 96.23%
Average Validation Loss: 0.130152 | Acc: 96.04%
Average Training Loss of Epoch 10: 0.085287 | Acc: 97.1

In [9]:
from PIL import Image
import cv2

# Restore the weights written by the training cell. The name uses the same
# "{:03d}" pattern and directory as the save call, so it also matches when the
# epoch count is not a two-digit number. map_location lets a checkpoint saved
# on GPU be loaded on a CPU-only kernel.
checkpoint_path = os.path.join(save_path, "cnn_epoch{:03d}.pth".format(epoches))
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
model.eval()

MAX_IMAGES = 16  # upper bound on the number of test images to visualise

with torch.no_grad():
    # Only the first test batch is visualised.
    first_batch = next(iter(test_loader), None)
    if first_batch is not None:
        inputs, labels = first_batch
        inputs = inputs.to(device)
        # The batch may hold fewer than MAX_IMAGES samples.
        for k in range(min(MAX_IMAGES, inputs.size(0))):
            # Keep the batch dimension: the model expects 4-D input.
            images = inputs[k:k+1,:,:,:]
            I = utils.make_grid(images, nrow=4, normalize=True, scale_each=True)
            _, c1, c2, c3 = model(images)
            # Overlay the first attention map and write it out as an 8-bit image.
            attn1 = visualize_attn(I, c1).astype(np.uint8)
            Image.fromarray(attn1).save("IMAGE_attn{}.jpeg".format(k))