# Compare the attention map between non activation vs. activation function version

Observe the attention map between two version to analyze

- Is the non-act attention contains negative values?

In [34]:
import random
random.seed(12)

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import copy

from tqdm import tqdm

import albumentations as A
from albumentations.pytorch import ToTensorV2

import torch
from torch.utils.data import Dataset
from torchvision import transforms
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.nn.functional as F
from torch.nn.parameter import Parameter
import cv2

import math
import pickle

from pytorch_metric_learning import losses, miners, distances, reducers, testers
from pytorch_metric_learning.utils.accuracy_calculator import AccuracyCalculator

## Non act func

In [39]:
class ExpLoss(nn.Module):
    def __init__(self, depth, residual=True):
        super(ExpLoss, self).__init__()
        
        self.depth = depth
        self.residual = residual
        
        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1))
        self.flatten = nn.Flatten()
        self.relu = nn.ReLU()
        
        self.fc1 = nn.Linear(depth, depth)
        self.fc2 = nn.Linear(depth, depth)
        
#         self.main_fc = nn.Linear(depth, depth)
        
        self.sim_act = nn.Sigmoid()
        self.att_act = nn.Sigmoid()
        
#         self.out_fc = nn.Sequential(nn.Linear(depth, depth),
#                                     nn.BatchNorm1d(depth),
#                                     nn.ReLU())

        self.out_fc = nn.Sequential(nn.Conv2d(depth, depth, kernel_size=1, padding=0, stride=1),
                                    nn.BatchNorm2d(depth),
                                    nn.ReLU())

        
    def forward(self, x, labels):
        batch_size, d, h, w = x.size()
        
        if self.residual:
            x_res = x
        
        x = self.avgpool(x)
        x = self.flatten(x)
        
        x1 = self.fc1(x)
        x2 = self.fc2(x)
        
        # cal sims (batchsize, depth)
        sims = torch.mm(x1, x2.permute(1,0))
        sims = sims / batch_size

        mask = self.filter_mask(labels)
        att = sims * (1 - mask.to(sims.device)) # only consider sample of same labels
        
        x = torch.mm(att, x_res.reshape(batch_size, d*h*w))
        x = x.reshape(batch_size, d, h, w)
        x = self.out_fc(x)
    
        if self.residual:
            x = x + x_res
        
        return x, 0

    def filter_mask(self, labels):
        """
        zero
        """
        classes = torch.unique(labels)
        mask = torch.ones((len(labels), len(labels)))
        
        indices = [(labels == k).nonzero().flatten() for k in classes]
        lindices = [torch.combinations(k, r=2, with_replacement=True) for k in indices]
        rindices = [torch.combinations(k.flip(0), r=2, with_replacement=True) for k in indices]
        indices = [torch.cat([lindices[i], rindices[i]]) for i in range(len(lindices))]

        for k in indices:
            mask[k[:,0], k[:,1]] = 0.
            
        return mask

## Sigmoid act

In [69]:
class ExpLoss(nn.Module):
    def __init__(self, depth, residual=True):
        super(ExpLoss, self).__init__()
        
        self.depth = depth
        self.residual = residual
        
        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1))
        self.flatten = nn.Flatten()
        self.relu = nn.ReLU()
        
        self.fc1 = nn.Linear(depth, depth)
        self.fc2 = nn.Linear(depth, depth)
        
#         self.main_fc = nn.Linear(depth, depth)
        
        self.sim_act = nn.Sigmoid()
        self.att_act = nn.Sigmoid()
        
#         self.out_fc = nn.Sequential(nn.Linear(depth, depth),
#                                     nn.BatchNorm1d(depth),
#                                     nn.ReLU())

        self.out_fc = nn.Sequential(nn.Conv2d(depth, depth, kernel_size=1, padding=0, stride=1),
                                    nn.BatchNorm2d(depth),
                                    nn.ReLU())

        
    def forward(self, x, labels):
        batch_size, d, h, w = x.size()
        
        if self.residual:
            x_res = x
        
        x = self.avgpool(x)
        x = self.flatten(x)
        
        x1 = self.fc1(x)
        x2 = self.fc2(x)
        
        # cal sims (batchsize, depth)
        sims = torch.mm(x1, x2.permute(1,0))
        sims = sims / batch_size
        sims = self.sim_act(sims)
        mask = self.filter_mask(labels)
        att = sims * (1 - mask.to(sims.device)) # only consider sample of same labels
        
        x = torch.mm(att, x_res.reshape(batch_size, d*h*w))
        x = x.reshape(batch_size, d, h, w)
        x = self.out_fc(x)
    
        if self.residual:
            x = x + x_res
        
        return x, 0

    def filter_mask(self, labels):
        """
        zero
        """
        classes = torch.unique(labels)
        mask = torch.ones((len(labels), len(labels)))
        
        indices = [(labels == k).nonzero().flatten() for k in classes]
        lindices = [torch.combinations(k, r=2, with_replacement=True) for k in indices]
        rindices = [torch.combinations(k.flip(0), r=2, with_replacement=True) for k in indices]
        indices = [torch.cat([lindices[i], rindices[i]]) for i in range(len(lindices))]

        for k in indices:
            mask[k[:,0], k[:,1]] = 0.
            
        return mask

## Data loader

In [None]:
class FERDataset(Dataset):
    'Characterizes a dataset for PyTorch'
    def __init__(self, dff, transforms):
        'Initialization'
        self.transforms = transforms
        self.dff= pd.read_csv(dff) if type(dff) is str else dff
        
        self.dff['pixels'] = [[int(y) for y in x.split()] for x in self.dff['pixels']]

    def __len__(self):
        'Denotes the total number of samples'
        return len(self.dff)

    def __getitem__(self, index):
        'Generates one sample of data'
        # Select sample
        #ID = self.list_IDs[index]

        # Load data and get label
        X = self.dff.iloc[index]['pixels']
#         X = X.split()
        X = np.array(X, dtype=np.uint8)
        X = X.reshape(48,48)
        
        y = int(self.dff.iloc[index]['emotion'])

        if self.transforms:
            X = self.transforms(image=X)['image']

#             X = torch.cat((X,X,X),0)

        return X, y

batch_size= 64

df = pd.read_csv('/tf/data/Quan/fer2013/data/csv_file/fer2013.csv')

df_train = df[df['Usage'] == 'Training']
df_val = df[df['Usage'] == 'PublicTest']
df_test = df[df['Usage'] == 'PrivateTest']


train_transforms = A.Compose([
#     A.CLAHE(),
    A.Resize(48,48),
    A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2, rotate_limit=10, p=0.5, border_mode=0, value=0),
#     A.RandomCrop(height=40, width=40),
    A.Normalize(mean=(0.485,), std=(0.229,)),
    ToTensorV2()
])


test_transforms = A.Compose([
#     A.CLAHE(),
    A.Resize(48,48),
    A.Normalize(mean=(0.485,), std=(0.229,)),
    ToTensorV2()
])

# train_set = FERDataset(df_train, train_transforms)
train_set = FERDataset(df_train, test_transforms) # no augmentation!
val_set = FERDataset(df_val, test_transforms)
test_set = FERDataset(df_test, test_transforms)


train_loader = torch.utils.data.DataLoader(train_set,
                                             batch_size=batch_size, shuffle=True,
                                             num_workers=8)
val_loader = torch.utils.data.DataLoader(val_set,
                                             batch_size=batch_size, shuffle=False,
                                             num_workers=8)
test_loader = torch.utils.data.DataLoader(test_set,
                                             batch_size=batch_size, shuffle=False,
                                             num_workers=8)

## Model

In [None]:
class sVGG_exp(nn.Module):
    def __init__(self, features, in_features_classifier, n_classes):
        super(sVGG_exp, self).__init__()
        self.features_0 = features[:7]
        self.features_1 = features[7:14]
        self.features_2 = features[14:24]
        self.features_3 = features[24:34]
        
        self.features_0[0] = nn.Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        
#         self.exploss_2 = ExpLoss(256)
        self.exploss_3 = ExpLoss(512)
        
        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
        self.classifier = nn.Sequential(nn.Flatten(),
                                        nn.Linear(in_features_classifier, in_features_classifier),
                                        nn.ReLU(inplace=True),
                                        nn.Dropout(0.5, inplace=False),
                                        nn.Linear(in_features_classifier, in_features_classifier // 2),
                                        nn.ReLU(inplace=True),
                                        nn.Dropout(0.5, inplace=False),
                                        nn.Linear(in_features_classifier // 2, n_classes))
        
    def forward(self, x, labels, return_att=True):
        x = self.features_0(x)
        
        x = self.features_1(x)
        
        x = self.features_2(x)
#         x, _ = self.exploss_2(x, labels)
        
        x = self.features_3(x)
        x, _ = self.exploss_3(x, labels)
        
        att = self.avgpool(x)
        x = self.classifier(att)
        
        if return_att:
            return x, 0, att
        return x, 0

## Non act

In [40]:
model = torch.load('exploss_more/sVGG_opt_residualexploss_lastconv_originalimgsize_noaugmentation_model.pt')
criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
# scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=5, verbose=True)


In [32]:
# activation = {}
# def hook(module, input, output):
#     activation['exploss_3'] = output
#     return hook

# model.exploss_3.register_forward_hook(hook)

In [51]:
device = torch.device('cuda')
model.to(device)
model.eval()

running_valloss = 0.0
running_valacc = 0.0
exp_features = []
exp_labels = []
exp_atts = []
for i,data in enumerate(test_loader):
    # get the inputs; data is a list of [inputs, labels]
    inputs, labels = data
    inputs = inputs.to(device)
    labels = labels.to(device)

#     outputs,_, features = model(inputs, labels)
    x = model.features_0(inputs)
    x = model.features_1(x)
    x = model.features_2(x)
    x = model.features_3(x)
    
    x = model.exploss_3.avgpool(x)
    x = model.exploss_3.flatten(x)
    
    x1 = model.exploss_3.fc1(x)
    x2 = model.exploss_3.fc2(x)

    mask = model.exploss_3.filter_mask(labels)

    sims = torch.mm(x1, x2.permute(1,0))
    sims = sims / len(test_loader)
    att = sims * (1 - mask.to(sims.device))
    
    with torch.no_grad():
#         exp_features.append(features.detach().cpu())
#         exp_labels.append(labels.detach().cpu())
        exp_atts.append(att.detach().cpu())

#     loss = criterion(outputs, labels)
#     acc = (torch.argmax(outputs, dim=1) == labels).float().sum()

#     running_valloss += (loss.item() * inputs.size(0))
#     running_valacc += acc.item()

# print('- Avg. val_loss: %.4f | Avg. val_acc: %.4f' % (running_valloss / len(test_loader.dataset), running_valacc / len(test_loader.dataset)))

### Check negative value of non act

> No sign of negative value, this prove that even without activation fuction such as sigmoid. The mechanism still learns effectively

In [68]:
exp_atts = torch.cat([k.flatten() for k in exp_atts])
exp_atts.min(), exp_atts.max(), exp_atts.mean(), exp_atts.var(), exp_atts.std()


(tensor(0.), tensor(23.0621), tensor(2.5275), tensor(29.2517), tensor(5.4085))

## Act sigmoid

In [71]:
model = torch.load('exploss_more/sVGG_opt_residualexploss_sigmoid_lastconv_originalimgsize_noaugmentation_model.pt')
criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
# scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=5, verbose=True)


In [73]:
device = torch.device('cuda')
model.to(device)
model.eval()

running_valloss = 0.0
running_valacc = 0.0
exp_features = []
exp_labels = []
exp_atts = []
for i,data in enumerate(test_loader):
    # get the inputs; data is a list of [inputs, labels]
    inputs, labels = data
    inputs = inputs.to(device)
    labels = labels.to(device)

#     outputs,_, features = model(inputs, labels)
    x = model.features_0(inputs)
    x = model.features_1(x)
    x = model.features_2(x)
    x = model.features_3(x)
    
    x = model.exploss_3.avgpool(x)
    x = model.exploss_3.flatten(x)
    
    x1 = model.exploss_3.fc1(x)
    x2 = model.exploss_3.fc2(x)

    mask = model.exploss_3.filter_mask(labels)

    sims = torch.mm(x1, x2.permute(1,0))
    sims = sims / len(test_loader)
    sims = model.exploss_3.sim_act(sims)
    att = sims * (1 - mask.to(sims.device))
    
    with torch.no_grad():
#         exp_features.append(features.detach().cpu())
#         exp_labels.append(labels.detach().cpu())
        exp_atts.append(att.detach().cpu())

#     loss = criterion(outputs, labels)
#     acc = (torch.argmax(outputs, dim=1) == labels).float().sum()

#     running_valloss += (loss.item() * inputs.size(0))
#     running_valacc += acc.item()

# print('- Avg. val_loss: %.4f | Avg. val_acc: %.4f' % (running_valloss / len(test_loader.dataset), running_valacc / len(test_loader.dataset)))

### Check negative value sigmoid act

In [74]:
exp_atts = torch.cat([k.flatten() for k in exp_atts])
exp_atts.min(), exp_atts.max(), exp_atts.mean(), exp_atts.var(), exp_atts.std()


(tensor(0.), tensor(1.), tensor(0.1773), tensor(0.1399), tensor(0.3741))