In [1]:
import os
import cv2
import csv
import math
import random
import numpy as np
import pandas as pd
import argparse
import pickle

import torch
import torch.nn as nn
from torchvision import transforms
import torchvision.models as models
import torch.utils.data as data
import torch.nn.functional as F



import torch
import torch.nn as nn
import math


import torch
import cv2
import numpy as np
import random

import pickle
from torch.autograd import Variable

import os
import cv2
import torch.utils.data as data
import pandas as pd
import random
from torchvision import transforms

os.environ["CUDA_VISIBLE_DEVICES"] = "1"
import clip
device = torch.device('cuda:0')
clip_model, preprocess = clip.load("ViT-B/32", device=device)


class RafDataset(data.Dataset):
    def __init__(self, phase, transform=None):
        self.phase = phase
        self.transform = transform
        dataset = pd.read_csv(os.path.join('../../data/FERPlus', 'test_new_fer.txt'), sep=' ', header=None)
        
        name_c = 0
        label_c = 1

            
        self.label = dataset.iloc[:, label_c].values
        images_names = dataset.iloc[:, name_c].values
        self.aug_func = [flip_image, add_g]
        self.file_paths = []
        self.clean = True
        
        for f in images_names:
            path = os.path.join('../../data/FERPlus', f)
            self.file_paths.append(path)


    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        label = self.label[idx]
        image = cv2.imread(self.file_paths[idx])
        image = image[:, :, ::-1]        
        image = self.transform(image)
        image1 = transforms.RandomHorizontalFlip(p=1)(image)
        return image, label, idx, image1
    
    

def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=1, bias=False)

class BasicBlock(nn.Module):
    
    expansion = 1
    
    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()
                
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3, 
                               stride = stride, padding = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, 
                               stride = 1, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.relu = nn.ReLU(inplace = True)
        
        if downsample:
            conv = nn.Conv2d(in_channels, out_channels, kernel_size = 1, 
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None
        
        self.downsample = downsample
        
    def forward(self, x):
        
        i = x
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        
        if self.downsample is not None:
            i = self.downsample(i)
                        
        x += i
        x = self.relu(x)
        
        return x
    

    
class ResNet(nn.Module):
    def __init__(self, block, n_blocks, channels, output_dim):
        super().__init__()
                
        
        self.in_channels = channels[0]
            
        assert len(n_blocks) == len(channels) == 4
        
        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size = 7, stride = 2, padding = 3, bias = False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace = True)
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        
        self.layer1 = self.get_resnet_layer(block, n_blocks[0], channels[0])
        self.layer2 = self.get_resnet_layer(block, n_blocks[1], channels[1], stride = 2)
        self.layer3 = self.get_resnet_layer(block, n_blocks[2], channels[2], stride = 2)
        self.layer4 = self.get_resnet_layer(block, n_blocks[3], channels[3], stride = 2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(self.in_channels, output_dim)
        
    def get_resnet_layer(self, block=BasicBlock, n_blocks=[2,2,2,2], channels=[64, 128, 256, 512], stride = 1):
    
        layers = []
        
        if self.in_channels != block.expansion * channels:
            downsample = True
        else:
            downsample = False
        
        layers.append(block(self.in_channels, channels, stride, downsample))
        
        for i in range(1, n_blocks):
            layers.append(block(block.expansion * channels, channels))

        self.in_channels = block.expansion * channels
            
        return nn.Sequential(*layers)
        
    def forward(self, x):
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        h = x.view(x.shape[0], -1)
        x = self.fc(h)
        
        return x, h
    

class Flatten(nn.Module):
    def forward(self, input):
        return input.view(input.size(0), -1)


class Model(nn.Module):
    def __init__(self, pretrained=True, num_classes=7, drop_rate=0):
        super(Model, self).__init__()
        
        res18 = ResNet(block = BasicBlock, n_blocks = [2,2,2,2], channels = [64, 128, 256, 512], output_dim=1000)
        msceleb_model = torch.load('../../checkpoint/resnet18_msceleb.pth')
        state_dict = msceleb_model['state_dict']
        res18.load_state_dict(state_dict, strict=False)
        
        self.drop_rate = drop_rate
        self.features = nn.Sequential(*list(res18.children())[:-2])
        self.features2 = nn.Sequential(*list(res18.children())[-2:-1])
        
        fc_in_dim = list(res18.children())[-1].in_features  # original fc layer's in dimention 512
        self.fc = nn.Linear(fc_in_dim, num_classes)  # new fc layer 512x7
        
        self.parm={}
        for name,parameters in self.fc.named_parameters():
            print(name,':',parameters.size())
            self.parm[name]=parameters
        
    def forward(self, x, clip_model):
        
        x = self.features(x)
        feat = x
        ##N 512 7 7
        x = self.features2(x)
        x = x.view(x.size(0), -1)
        
        
        fc_weights = self.parm['weight'].cuda()
        fc_weights = fc_weights.view(1, 7, 512, 1, 1)
        fc_weights = Variable(fc_weights, requires_grad = False)
        feat = feat.unsqueeze(1) # N * 1 * C * H * W
        hm = feat * fc_weights
        hm = hm.sum(2) # N * self.num_labels * H * W
        
        out = self.fc(x)
        return out, hm
    
    
    

def add_g(image_array, mean=0.0, var=30):
    std = var ** 0.5
    image_add = image_array + np.random.normal(mean, std, image_array.shape)
    image_add = np.clip(image_add, 0, 255).astype(np.uint8)
    return image_add

def flip_image(image_array):
    return cv2.flip(image_array, 1)

def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    
    

import torch.nn.functional as F
from torch.autograd import Variable

    

parser = argparse.ArgumentParser()
parser.add_argument('--raf_path', type=str, default='../../data/raf-basic', help='raf_dataset_path')
parser.add_argument('--resnet50_path', type=str, default='../../data/resnet50_ft_weight.pkl', help='pretrained_backbone_path')
parser.add_argument('--label_path', type=str, default='list_patition_label.txt', help='label_path')
parser.add_argument('--workers', type=int, default=4, help='number of workers')
parser.add_argument('--batch_size', type=int, default=32, help='batch_size')
parser.add_argument('--w', type=int, default=7, help='width of the attention map')
parser.add_argument('--h', type=int, default=7, help='height of the attention map')
parser.add_argument('--gpu', type=int, default=0, help='the number of the device')
parser.add_argument('--lam', type=float, default=5, help='kl_lambda')
parser.add_argument('--epochs', type=int, default=60, help='number of epochs')
args = parser.parse_args(args=[])





def train(args, model, train_loader, optimizer, scheduler, device):
    running_loss = 0.0
    iter_cnt = 0
    correct_sum = 0
    
    model.to(device)
    model.train()

    
    total_loss = []
    for batch_i, (imgs1, labels, indexes, imgs2) in enumerate(train_loader):
        imgs1 = imgs1.to(device)
        labels = labels.to(device)
        
    
        criterion = nn.CrossEntropyLoss(reduction='none')

        

        output, hm1 = model(imgs1, clip_model)
        
        loss1 = nn.CrossEntropyLoss()(output, labels)



        loss = loss1 


        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


        iter_cnt += 1
        _, predicts = torch.max(output, 1)
        correct_num = torch.eq(predicts, labels).sum()
        correct_sum += correct_num
        running_loss += loss

    scheduler.step()
    running_loss = running_loss / iter_cnt
    acc = correct_sum.float() / float(train_loader.dataset.__len__())
    return acc, running_loss



        

setup_seed(3407)

train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
    transforms.RandomHorizontalFlip(),
    transforms.RandomErasing(scale=(0.02, 0.25)) ])

eval_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])])



train_dataset = RafDataset(phase='train', transform=train_transforms)
test_dataset = RafDataset(phase='test', transform=eval_transforms)



train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=args.batch_size,
                                           shuffle=True,
                                           num_workers=args.workers,
                                           pin_memory=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.batch_size,
                                          shuffle=False,
                                          num_workers=args.workers,
                                          pin_memory=True)




model = Model()
device = torch.device('cuda:{}'.format(args.gpu))
model.load_state_dict(torch.load("eac_FERPlus.pth")['model_state_dict'])
model.to(device)

with torch.no_grad():
    model.eval()

    running_loss = 0.0
    iter_cnt = 0
    correct_sum = 0
    data_num = 0


    for batch_i, (imgs1, labels, indexes, imgs2) in enumerate(test_loader):
        imgs1 = imgs1.to(device)
        labels = labels.to(device)


        outputs, x = model(imgs1, clip_model)

        loss = nn.CrossEntropyLoss()(outputs, labels)

        iter_cnt += 1
        _, predicts = torch.max(outputs, 1)

        correct_num = torch.eq(predicts, labels).sum()
        correct_sum += correct_num

        running_loss += loss
        data_num += outputs.size(0)

    running_loss = running_loss / iter_cnt
    test_acc = correct_sum.float() / float(data_num)
    print('test acc: ', test_acc)

weight : torch.Size([7, 512])
bias : torch.Size([7])
test acc:  tensor(0.8903, device='cuda:0')


### test on AffectNet

In [2]:
import os
import cv2
import csv
import math
import random
import numpy as np
import pandas as pd
import argparse
import pickle

import torch
import torch.nn as nn
from torchvision import transforms
import torchvision.models as models
import torch.utils.data as data
import torch.nn.functional as F



import torch
import torch.nn as nn
import math


import torch
import cv2
import numpy as np
import random

import pickle
from torch.autograd import Variable

import os
import cv2
import torch.utils.data as data
import pandas as pd
import random
from torchvision import transforms


import clip
device = torch.device('cuda:0')
clip_model, preprocess = clip.load("ViT-B/32", device=device)


class RafDataset(data.Dataset):
    def __init__(self, phase, transform=None):
        self.phase = phase
        self.transform = transform
        NAME_COLUMN = 0
        LABEL_COLUMN = 1

        dataset = pd.read_csv('../../datasets/AffectNet/sota_test7.txt', sep=' ', header=None)

        file_names = dataset.iloc[:, NAME_COLUMN].values
        self.label = dataset.iloc[:, LABEL_COLUMN].values
        self.file_paths = []
        for f in file_names:
            path = os.path.join('../../AffectNet_sota_Manually_Annotated_Images', f)
            self.file_paths.append(path)


    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        path = self.file_paths[idx]
        image = cv2.imread(path)
        img = image[:, :, ::-1]  # BGR to RGB

        label = self.label[idx]
        if label == 0:
            label = 6
        elif label == 1:
            label = 3
        elif label == 2:
            label = 4
        elif label == 3:
            label = 0
        elif label == 4:
            label = 1
        elif label == 5:
            label = 2
        elif label == 6:
            label = 5
            
        image = self.transform(image)
        img1 = transforms.RandomHorizontalFlip(p=1.0)(image)
        return image, label, idx, img1 

    def get_labels(self):
        label = self.label
        return label


    

def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=1, bias=False)

class BasicBlock(nn.Module):
    
    expansion = 1
    
    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()
                
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3, 
                               stride = stride, padding = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, 
                               stride = 1, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.relu = nn.ReLU(inplace = True)
        
        if downsample:
            conv = nn.Conv2d(in_channels, out_channels, kernel_size = 1, 
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None
        
        self.downsample = downsample
        
    def forward(self, x):
        
        i = x
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        
        if self.downsample is not None:
            i = self.downsample(i)
                        
        x += i
        x = self.relu(x)
        
        return x
    

    
class ResNet(nn.Module):
    def __init__(self, block, n_blocks, channels, output_dim):
        super().__init__()
                
        
        self.in_channels = channels[0]
            
        assert len(n_blocks) == len(channels) == 4
        
        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size = 7, stride = 2, padding = 3, bias = False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace = True)
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        
        self.layer1 = self.get_resnet_layer(block, n_blocks[0], channels[0])
        self.layer2 = self.get_resnet_layer(block, n_blocks[1], channels[1], stride = 2)
        self.layer3 = self.get_resnet_layer(block, n_blocks[2], channels[2], stride = 2)
        self.layer4 = self.get_resnet_layer(block, n_blocks[3], channels[3], stride = 2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(self.in_channels, output_dim)
        
    def get_resnet_layer(self, block=BasicBlock, n_blocks=[2,2,2,2], channels=[64, 128, 256, 512], stride = 1):
    
        layers = []
        
        if self.in_channels != block.expansion * channels:
            downsample = True
        else:
            downsample = False
        
        layers.append(block(self.in_channels, channels, stride, downsample))
        
        for i in range(1, n_blocks):
            layers.append(block(block.expansion * channels, channels))

        self.in_channels = block.expansion * channels
            
        return nn.Sequential(*layers)
        
    def forward(self, x):
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        h = x.view(x.shape[0], -1)
        x = self.fc(h)
        
        return x, h
    

class Flatten(nn.Module):
    def forward(self, input):
        return input.view(input.size(0), -1)


class Model(nn.Module):
    def __init__(self, pretrained=True, num_classes=7, drop_rate=0):
        super(Model, self).__init__()
        
        res18 = ResNet(block = BasicBlock, n_blocks = [2,2,2,2], channels = [64, 128, 256, 512], output_dim=1000)
        msceleb_model = torch.load('../../checkpoint/resnet18_msceleb.pth')
        state_dict = msceleb_model['state_dict']
        res18.load_state_dict(state_dict, strict=False)
        
        self.drop_rate = drop_rate
        self.features = nn.Sequential(*list(res18.children())[:-2])
        self.features2 = nn.Sequential(*list(res18.children())[-2:-1])
        
        fc_in_dim = list(res18.children())[-1].in_features  # original fc layer's in dimention 512
        self.fc = nn.Linear(fc_in_dim, num_classes)  # new fc layer 512x7
        
        self.parm={}
        for name,parameters in self.fc.named_parameters():
            print(name,':',parameters.size())
            self.parm[name]=parameters
        
    def forward(self, x, clip_model):
        
        x = self.features(x)
        feat = x
        ##N 512 7 7
        x = self.features2(x)
        x = x.view(x.size(0), -1)
        
        
        fc_weights = self.parm['weight'].cuda()
        fc_weights = fc_weights.view(1, 7, 512, 1, 1)
        fc_weights = Variable(fc_weights, requires_grad = False)
        feat = feat.unsqueeze(1) # N * 1 * C * H * W
        hm = feat * fc_weights
        hm = hm.sum(2) # N * self.num_labels * H * W
        
        out = self.fc(x)
        return out, hm
    
    
    

def add_g(image_array, mean=0.0, var=30):
    std = var ** 0.5
    image_add = image_array + np.random.normal(mean, std, image_array.shape)
    image_add = np.clip(image_add, 0, 255).astype(np.uint8)
    return image_add

def flip_image(image_array):
    return cv2.flip(image_array, 1)

def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    

    

import torch.nn.functional as F
from torch.autograd import Variable


    

parser = argparse.ArgumentParser()
parser.add_argument('--raf_path', type=str, default='../../data/raf-basic', help='raf_dataset_path')
parser.add_argument('--resnet50_path', type=str, default='../../data/resnet50_ft_weight.pkl', help='pretrained_backbone_path')
parser.add_argument('--label_path', type=str, default='list_patition_label.txt', help='label_path')
parser.add_argument('--workers', type=int, default=4, help='number of workers')
parser.add_argument('--batch_size', type=int, default=32, help='batch_size')
parser.add_argument('--w', type=int, default=7, help='width of the attention map')
parser.add_argument('--h', type=int, default=7, help='height of the attention map')
parser.add_argument('--gpu', type=int, default=0, help='the number of the device')
parser.add_argument('--lam', type=float, default=5, help='kl_lambda')
parser.add_argument('--epochs', type=int, default=60, help='number of epochs')
args = parser.parse_args(args=[])





def train(args, model, train_loader, optimizer, scheduler, device):
    running_loss = 0.0
    iter_cnt = 0
    correct_sum = 0
    
    model.to(device)
    model.train()

    
    total_loss = []
    for batch_i, (imgs1, labels, indexes, imgs2) in enumerate(train_loader):
        imgs1 = imgs1.to(device)
        labels = labels.to(device)
        
    
        criterion = nn.CrossEntropyLoss(reduction='none')

        

        output, hm1 = model(imgs1, clip_model)
        
        loss1 = nn.CrossEntropyLoss()(output, labels)



        loss = loss1 


        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


        iter_cnt += 1
        _, predicts = torch.max(output, 1)
        correct_num = torch.eq(predicts, labels).sum()
        correct_sum += correct_num
        running_loss += loss

    scheduler.step()
    running_loss = running_loss / iter_cnt
    acc = correct_sum.float() / float(train_loader.dataset.__len__())
    return acc, running_loss



        

setup_seed(3407)

train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
    transforms.RandomHorizontalFlip(),
    transforms.RandomErasing(scale=(0.02, 0.25)) ])

eval_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])])



train_dataset = RafDataset(phase='train', transform=train_transforms)
test_dataset = RafDataset(phase='test', transform=eval_transforms)



train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=args.batch_size,
                                           shuffle=True,
                                           num_workers=args.workers,
                                           pin_memory=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.batch_size,
                                          shuffle=False,
                                          num_workers=args.workers,
                                          pin_memory=True)




model = Model()
device = torch.device('cuda:{}'.format(args.gpu))
model.load_state_dict(torch.load("eac_FERPlus.pth")['model_state_dict'])
model.to(device)

with torch.no_grad():
    model.eval()

    running_loss = 0.0
    iter_cnt = 0
    correct_sum = 0
    data_num = 0


    for batch_i, (imgs1, labels, indexes, imgs2) in enumerate(test_loader):
        imgs1 = imgs1.to(device)
        labels = labels.to(device)


        outputs, x = model(imgs1, clip_model)

        loss = nn.CrossEntropyLoss()(outputs, labels)

        iter_cnt += 1
        _, predicts = torch.max(outputs, 1)

        correct_num = torch.eq(predicts, labels).sum()
        correct_sum += correct_num

        running_loss += loss
        data_num += outputs.size(0)

    running_loss = running_loss / iter_cnt
    test_acc = correct_sum.float() / float(data_num)
    print('test acc: ', test_acc)

weight : torch.Size([7, 512])
bias : torch.Size([7])
test acc:  tensor(0.3649, device='cuda:0')


## SFEW2.0

In [4]:
import os
import cv2
import csv
import math
import random
import numpy as np
import pandas as pd
import argparse
import pickle

import torch
import torch.nn as nn
from torchvision import transforms
import torchvision.models as models
import torch.utils.data as data
import torch.nn.functional as F



import torch
import torch.nn as nn
import math


import torch
import cv2
import numpy as np
import random

import pickle
from torch.autograd import Variable

import os
import cv2
import torch.utils.data as data
import pandas as pd
import random
from torchvision import transforms


import clip
device = torch.device('cuda:0')
clip_model, preprocess = clip.load("ViT-B/32", device=device)


class RafDataset(data.Dataset):
    def __init__(self, phase, transform=None):
        self.phase = phase
        self.transform = transform
        NAME_COLUMN = 0
        LABEL_COLUMN = 1

        dataset = pd.read_csv('../SFEW2_label.txt', sep=' ', header=None)

        self.file_paths = dataset.iloc[:, NAME_COLUMN].values
        self.label = dataset.iloc[:, LABEL_COLUMN].values
        


    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        path = '../'+self.file_paths[idx]
        image = cv2.imread(path)
        img = image[:, :, ::-1]  # BGR to RGB

        label = self.label[idx]
        if label == 0:
            label = 5
        elif label == 1:
            label = 2
        elif label == 2:
            label = 1
        elif label == 3:
            label = 3
        elif label == 4:
            label = 6
        elif label == 5:
            label = 4
        elif label == 6:
            label = 0

            
        image = self.transform(image)
        img1 = transforms.RandomHorizontalFlip(p=1.0)(image)
        return image, label, idx, img1 

    def get_labels(self):
        label = self.label
        return label


    

def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=1, bias=False)

class BasicBlock(nn.Module):
    
    expansion = 1
    
    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()
                
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3, 
                               stride = stride, padding = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, 
                               stride = 1, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.relu = nn.ReLU(inplace = True)
        
        if downsample:
            conv = nn.Conv2d(in_channels, out_channels, kernel_size = 1, 
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None
        
        self.downsample = downsample
        
    def forward(self, x):
        
        i = x
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        
        if self.downsample is not None:
            i = self.downsample(i)
                        
        x += i
        x = self.relu(x)
        
        return x
    

    
class ResNet(nn.Module):
    def __init__(self, block, n_blocks, channels, output_dim):
        super().__init__()
                
        
        self.in_channels = channels[0]
            
        assert len(n_blocks) == len(channels) == 4
        
        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size = 7, stride = 2, padding = 3, bias = False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace = True)
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        
        self.layer1 = self.get_resnet_layer(block, n_blocks[0], channels[0])
        self.layer2 = self.get_resnet_layer(block, n_blocks[1], channels[1], stride = 2)
        self.layer3 = self.get_resnet_layer(block, n_blocks[2], channels[2], stride = 2)
        self.layer4 = self.get_resnet_layer(block, n_blocks[3], channels[3], stride = 2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(self.in_channels, output_dim)
        
    def get_resnet_layer(self, block=BasicBlock, n_blocks=[2,2,2,2], channels=[64, 128, 256, 512], stride = 1):
    
        layers = []
        
        if self.in_channels != block.expansion * channels:
            downsample = True
        else:
            downsample = False
        
        layers.append(block(self.in_channels, channels, stride, downsample))
        
        for i in range(1, n_blocks):
            layers.append(block(block.expansion * channels, channels))

        self.in_channels = block.expansion * channels
            
        return nn.Sequential(*layers)
        
    def forward(self, x):
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        h = x.view(x.shape[0], -1)
        x = self.fc(h)
        
        return x, h
    

class Flatten(nn.Module):
    def forward(self, input):
        return input.view(input.size(0), -1)


class Model(nn.Module):
    def __init__(self, pretrained=True, num_classes=7, drop_rate=0):
        super(Model, self).__init__()
        
        res18 = ResNet(block = BasicBlock, n_blocks = [2,2,2,2], channels = [64, 128, 256, 512], output_dim=1000)
        msceleb_model = torch.load('../../checkpoint/resnet18_msceleb.pth')
        state_dict = msceleb_model['state_dict']
        res18.load_state_dict(state_dict, strict=False)
        
        self.drop_rate = drop_rate
        self.features = nn.Sequential(*list(res18.children())[:-2])
        self.features2 = nn.Sequential(*list(res18.children())[-2:-1])
        
        fc_in_dim = list(res18.children())[-1].in_features  # original fc layer's in dimention 512
        self.fc = nn.Linear(fc_in_dim, num_classes)  # new fc layer 512x7
        
        self.parm={}
        for name,parameters in self.fc.named_parameters():
            print(name,':',parameters.size())
            self.parm[name]=parameters
        
    def forward(self, x, clip_model):
        
        x = self.features(x)
        feat = x
        ##N 512 7 7
        x = self.features2(x)
        x = x.view(x.size(0), -1)
        
        
        fc_weights = self.parm['weight'].cuda()
        fc_weights = fc_weights.view(1, 7, 512, 1, 1)
        fc_weights = Variable(fc_weights, requires_grad = False)
        feat = feat.unsqueeze(1) # N * 1 * C * H * W
        hm = feat * fc_weights
        hm = hm.sum(2) # N * self.num_labels * H * W
        
        out = self.fc(x)
        return out, hm
    
    
    

def add_g(image_array, mean=0.0, var=30):
    std = var ** 0.5
    image_add = image_array + np.random.normal(mean, std, image_array.shape)
    image_add = np.clip(image_add, 0, 255).astype(np.uint8)
    return image_add

def flip_image(image_array):
    return cv2.flip(image_array, 1)

def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    

    

import torch.nn.functional as F
from torch.autograd import Variable


    

parser = argparse.ArgumentParser()
parser.add_argument('--raf_path', type=str, default='../../data/raf-basic', help='raf_dataset_path')
parser.add_argument('--resnet50_path', type=str, default='../../data/resnet50_ft_weight.pkl', help='pretrained_backbone_path')
parser.add_argument('--label_path', type=str, default='list_patition_label.txt', help='label_path')
parser.add_argument('--workers', type=int, default=4, help='number of workers')
parser.add_argument('--batch_size', type=int, default=32, help='batch_size')
parser.add_argument('--w', type=int, default=7, help='width of the attention map')
parser.add_argument('--h', type=int, default=7, help='height of the attention map')
parser.add_argument('--gpu', type=int, default=0, help='the number of the device')
parser.add_argument('--lam', type=float, default=5, help='kl_lambda')
parser.add_argument('--epochs', type=int, default=60, help='number of epochs')
args = parser.parse_args(args=[])





def train(args, model, train_loader, optimizer, scheduler, device):
    running_loss = 0.0
    iter_cnt = 0
    correct_sum = 0
    
    model.to(device)
    model.train()

    
    total_loss = []
    for batch_i, (imgs1, labels, indexes, imgs2) in enumerate(train_loader):
        imgs1 = imgs1.to(device)
        labels = labels.to(device)
        
    
        criterion = nn.CrossEntropyLoss(reduction='none')

        

        output, hm1 = model(imgs1, clip_model)
        
        loss1 = nn.CrossEntropyLoss()(output, labels)



        loss = loss1 


        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


        iter_cnt += 1
        _, predicts = torch.max(output, 1)
        correct_num = torch.eq(predicts, labels).sum()
        correct_sum += correct_num
        running_loss += loss

    scheduler.step()
    running_loss = running_loss / iter_cnt
    acc = correct_sum.float() / float(train_loader.dataset.__len__())
    return acc, running_loss



        

setup_seed(3407)

train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
    transforms.RandomHorizontalFlip(),
    transforms.RandomErasing(scale=(0.02, 0.25)) ])

eval_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])])



train_dataset = RafDataset(phase='train', transform=train_transforms)
test_dataset = RafDataset(phase='test', transform=eval_transforms)



train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=args.batch_size,
                                           shuffle=True,
                                           num_workers=args.workers,
                                           pin_memory=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.batch_size,
                                          shuffle=False,
                                          num_workers=args.workers,
                                          pin_memory=True)




model = Model()
device = torch.device('cuda:{}'.format(args.gpu))

model.load_state_dict(torch.load("eac_FERPlus.pth")['model_state_dict']) # mask baseline
# model.load_state_dict(torch.load("result_logs/baseline_resnet18.pth")['model_state_dict'])
model.to(device)

with torch.no_grad():
    model.eval()
    correct_sum = 0
    data_num = 0


    for batch_i, (imgs1, labels, indexes, imgs2) in enumerate(test_loader):
        imgs1 = imgs1.to(device)
        labels = labels.to(device)


        outputs, x = model(imgs1, clip_model)

        _, predicts = torch.max(outputs, 1)
        correct_num = torch.eq(predicts, labels).sum()
        correct_sum += correct_num
        data_num += outputs.size(0)

    test_acc = correct_sum.float() / float(data_num)
    print('test acc: ', test_acc)

weight : torch.Size([7, 512])
bias : torch.Size([7])
test acc:  tensor(0.4579, device='cuda:0')


### MMA

In [5]:
import os
import cv2
import csv
import math
import random
import numpy as np
import pandas as pd
import argparse
import pickle

import torch
import torch.nn as nn
from torchvision import transforms
import torchvision.models as models
import torch.utils.data as data
import torch.nn.functional as F



import torch
import torch.nn as nn
import math


import torch
import cv2
import numpy as np
import random

import pickle
from torch.autograd import Variable

import os
import cv2
import torch.utils.data as data
import pandas as pd
import random
from torchvision import transforms


import clip
device = torch.device('cuda:0')
clip_model, preprocess = clip.load("ViT-B/32", device=device)


class RafDataset(data.Dataset):
    def __init__(self, phase, transform=None):
        self.phase = phase
        self.transform = transform
        NAME_COLUMN = 0
        LABEL_COLUMN = 1

        dataset = pd.read_csv('../MMA_label.txt', sep=' ', header=None)

        self.file_paths = dataset.iloc[:, NAME_COLUMN].values
        self.label = dataset.iloc[:, LABEL_COLUMN].values
        


    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        path = '../'+self.file_paths[idx]
        image = cv2.imread(path)
        img = image[:, :, ::-1]  # BGR to RGB

        label = self.label[idx]
        if label == 0:
            label = 5
        elif label == 1:
            label = 2
        elif label == 2:
            label = 1
        elif label == 3:
            label = 3
        elif label == 4:
            label = 6
        elif label == 5:
            label = 4
        elif label == 6:
            label = 0

            
        image = self.transform(image)
        img1 = transforms.RandomHorizontalFlip(p=1.0)(image)
        return image, label, idx, img1 

    def get_labels(self):
        label = self.label
        return label


    

def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=1, bias=False)

class BasicBlock(nn.Module):
    
    expansion = 1
    
    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()
                
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3, 
                               stride = stride, padding = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, 
                               stride = 1, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.relu = nn.ReLU(inplace = True)
        
        if downsample:
            conv = nn.Conv2d(in_channels, out_channels, kernel_size = 1, 
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None
        
        self.downsample = downsample
        
    def forward(self, x):
        
        i = x
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        
        if self.downsample is not None:
            i = self.downsample(i)
                        
        x += i
        x = self.relu(x)
        
        return x
    

    
class ResNet(nn.Module):
    def __init__(self, block, n_blocks, channels, output_dim):
        super().__init__()
                
        
        self.in_channels = channels[0]
            
        assert len(n_blocks) == len(channels) == 4
        
        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size = 7, stride = 2, padding = 3, bias = False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace = True)
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        
        self.layer1 = self.get_resnet_layer(block, n_blocks[0], channels[0])
        self.layer2 = self.get_resnet_layer(block, n_blocks[1], channels[1], stride = 2)
        self.layer3 = self.get_resnet_layer(block, n_blocks[2], channels[2], stride = 2)
        self.layer4 = self.get_resnet_layer(block, n_blocks[3], channels[3], stride = 2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(self.in_channels, output_dim)
        
    def get_resnet_layer(self, block=BasicBlock, n_blocks=[2,2,2,2], channels=[64, 128, 256, 512], stride = 1):
    
        layers = []
        
        if self.in_channels != block.expansion * channels:
            downsample = True
        else:
            downsample = False
        
        layers.append(block(self.in_channels, channels, stride, downsample))
        
        for i in range(1, n_blocks):
            layers.append(block(block.expansion * channels, channels))

        self.in_channels = block.expansion * channels
            
        return nn.Sequential(*layers)
        
    def forward(self, x):
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        h = x.view(x.shape[0], -1)
        x = self.fc(h)
        
        return x, h
    

class Flatten(nn.Module):
    def forward(self, input):
        return input.view(input.size(0), -1)



class Model(nn.Module):
    def __init__(self, pretrained=True, num_classes=7, drop_rate=0):
        super(Model, self).__init__()
        
        res18 = ResNet(block = BasicBlock, n_blocks = [2,2,2,2], channels = [64, 128, 256, 512], output_dim=1000)
        msceleb_model = torch.load('../../checkpoint/resnet18_msceleb.pth')
        state_dict = msceleb_model['state_dict']
        res18.load_state_dict(state_dict, strict=False)
        
        self.drop_rate = drop_rate
        self.features = nn.Sequential(*list(res18.children())[:-2])
        self.features2 = nn.Sequential(*list(res18.children())[-2:-1])
        
        fc_in_dim = list(res18.children())[-1].in_features  # original fc layer's in dimention 512
        self.fc = nn.Linear(fc_in_dim, num_classes)  # new fc layer 512x7
        
        self.parm={}
        for name,parameters in self.fc.named_parameters():
            print(name,':',parameters.size())
            self.parm[name]=parameters
        
    def forward(self, x, clip_model):
        
        x = self.features(x)
        feat = x
        ##N 512 7 7
        x = self.features2(x)
        x = x.view(x.size(0), -1)
        
        
        fc_weights = self.parm['weight'].cuda()
        fc_weights = fc_weights.view(1, 7, 512, 1, 1)
        fc_weights = Variable(fc_weights, requires_grad = False)
        feat = feat.unsqueeze(1) # N * 1 * C * H * W
        hm = feat * fc_weights
        hm = hm.sum(2) # N * self.num_labels * H * W
        
        out = self.fc(x)
        return out, hm
    
    
    
    

def add_g(image_array, mean=0.0, var=30):
    std = var ** 0.5
    image_add = image_array + np.random.normal(mean, std, image_array.shape)
    image_add = np.clip(image_add, 0, 255).astype(np.uint8)
    return image_add

def flip_image(image_array):
    return cv2.flip(image_array, 1)

def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    

    

import torch.nn.functional as F
from torch.autograd import Variable


    

parser = argparse.ArgumentParser()
parser.add_argument('--raf_path', type=str, default='../../data/raf-basic', help='raf_dataset_path')
parser.add_argument('--resnet50_path', type=str, default='../../data/resnet50_ft_weight.pkl', help='pretrained_backbone_path')
parser.add_argument('--label_path', type=str, default='list_patition_label.txt', help='label_path')
parser.add_argument('--workers', type=int, default=4, help='number of workers')
parser.add_argument('--batch_size', type=int, default=32, help='batch_size')
parser.add_argument('--w', type=int, default=7, help='width of the attention map')
parser.add_argument('--h', type=int, default=7, help='height of the attention map')
parser.add_argument('--gpu', type=int, default=0, help='the number of the device')
parser.add_argument('--lam', type=float, default=5, help='kl_lambda')
parser.add_argument('--epochs', type=int, default=60, help='number of epochs')
args = parser.parse_args(args=[])





def train(args, model, train_loader, optimizer, scheduler, device):
    running_loss = 0.0
    iter_cnt = 0
    correct_sum = 0
    
    model.to(device)
    model.train()

    
    total_loss = []
    for batch_i, (imgs1, labels, indexes, imgs2) in enumerate(train_loader):
        imgs1 = imgs1.to(device)
        labels = labels.to(device)
        
    
        criterion = nn.CrossEntropyLoss(reduction='none')

        

        output, hm1 = model(imgs1, clip_model)
        
        loss1 = nn.CrossEntropyLoss()(output, labels)



        loss = loss1 


        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


        iter_cnt += 1
        _, predicts = torch.max(output, 1)
        correct_num = torch.eq(predicts, labels).sum()
        correct_sum += correct_num
        running_loss += loss

    scheduler.step()
    running_loss = running_loss / iter_cnt
    acc = correct_sum.float() / float(train_loader.dataset.__len__())
    return acc, running_loss



        

setup_seed(3407)

train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
    transforms.RandomHorizontalFlip(),
    transforms.RandomErasing(scale=(0.02, 0.25)) ])

eval_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])])



train_dataset = RafDataset(phase='train', transform=train_transforms)
test_dataset = RafDataset(phase='test', transform=eval_transforms)



train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=args.batch_size,
                                           shuffle=True,
                                           num_workers=args.workers,
                                           pin_memory=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.batch_size,
                                          shuffle=False,
                                          num_workers=args.workers,
                                          pin_memory=True)




model = Model()
device = torch.device('cuda:{}'.format(args.gpu))
# model.load_state_dict(torch.load("tmp_resnet18_mask_best.pth")['model_state_dict'])
# model.load_state_dict(torch.load("checkpoint/tmp_resnet18.pth")['model_state_dict']) # mask baseline
model.load_state_dict(torch.load("eac_FERPlus.pth")['model_state_dict'])
model.to(device)

with torch.no_grad():
    model.eval()
    correct_sum = 0
    data_num = 0


    for batch_i, (imgs1, labels, indexes, imgs2) in enumerate(test_loader):
        imgs1 = imgs1.to(device)
        labels = labels.to(device)


        outputs, x = model(imgs1, clip_model)

        _, predicts = torch.max(outputs, 1)
        correct_num = torch.eq(predicts, labels).sum()
        correct_sum += correct_num
        data_num += outputs.size(0)

    test_acc = correct_sum.float() / float(data_num)
    print('test acc: ', test_acc)

weight : torch.Size([7, 512])
bias : torch.Size([7])
test acc:  tensor(0.5989, device='cuda:0')
