# 映像メディア課題
今回は「Context Based Emotion Recognition using EMOTIC Dataset」というTPAMI論文を自己流で再現します。<br>
https://github.com/Tandon-A/emotic の実装を参考にしました。<br>
ファイルの場所は環境によって異なるため、最初にファイルパスを設定します。<br>
他の研究にも時間が必要なため、例外処理は一切行っていません。すべての設定が正しいと仮定します。

In [1]:
import os
# Resolve every dataset / model location relative to the user's home
# directory so the notebook only has to be configured in one place.
# expanduser('~') is used instead of os.environ['HOME'] because HOME is not
# guaranteed to be set; on POSIX it still honours HOME when it is defined.
home_dir = os.path.expanduser('~')
annotations_file = os.path.join(home_dir, 'data', 'Annotations', 'Annotations.mat')
emotic_dir = os.path.join(home_dir, 'data', 'emotic')
preprocessed_data_save_dir = os.path.join(home_dir, 'data', 'emotic_preprocessed')
model_dir = os.path.join(home_dir, 'nothing', 'emotic', 'models')
pretrained_model_dir = os.path.join(home_dir, 'pretrained_models')

# Create the output directories up front. exist_ok=True replaces the
# check-then-create pattern and is a no-op when the directory already exists.
os.makedirs(preprocessed_data_save_dir, exist_ok=True)
os.makedirs(model_dir, exist_ok=True)

In [2]:
import scipy.io as scio
import cv2
import numpy as np
from cmath import nan

## データ前処理
参考コードを読むとAnnotations.matのデータ構造の処理方法がすぐに分かるので、そちらも参考にさせていただきました。

In [None]:
# Load the MATLAB annotation file into a dict keyed by variable name.
annotations:dict = scio.loadmat(annotations_file)

# Following the paper, emotion recognition uses the whole image (Image), the
# person's face and body crop (Body), the emotion labels (Emotion Categories)
# and the VAD scores (Continuous Dimensions). The first two are the inputs;
# the rest are the outputs.
# The .mat data is converted to .npy (fastest to load). The annotations are
# already split into train / val / test, so those splits are used as-is.

# Discrete emotion categories. The position in this list is the label index
# used everywhere else (one-hot columns, model outputs).
emo_cats = ['Affection', 'Anger', 'Annoyance', 'Anticipation', 'Aversion', 'Confidence', 'Disapproval', 'Disconnection',
       'Disquietment', 'Doubt/Confusion', 'Embarrassment', 'Engagement', 'Esteem', 'Excitement', 'Fatigue', 'Fear',
       'Happiness', 'Pain', 'Peace', 'Pleasure', 'Sadness', 'Sensitivity', 'Suffering', 'Surprise', 'Sympathy', 'Yearning']
# Lookup tables in both directions: category name <-> label index.
cat2ind = {emotion: idx for idx, emotion in enumerate(emo_cats)}
ind2cat = dict(enumerate(emo_cats))


def emo_cat_one_hot(emo_cat):
    """Encode category names as a multi-hot vector.

    Args:
        emo_cat: iterable of category names, each of which must be a key of
            ``cat2ind``. May be empty.

    Returns:
        A float64 array of length ``len(emo_cats)`` holding 1 at the index of
        every listed category and 0 elsewhere.

    Raises:
        KeyError: if a name is not a known category.
    """
    # Size the vector from the category list instead of repeating the literal
    # 26, so the two can never drift apart.
    one_hot_cat = np.zeros(len(emo_cats))
    for em in emo_cat:
        one_hot_cat[cat2ind[em]] = 1
    return one_hot_cat

# Convert every split of the .mat annotations into four aligned .npy arrays:
# resized context images, resized body crops, multi-hot category labels and
# continuous labels. One entry is produced per annotated person.
data_types = ['train', 'val', 'test']
for data_type in data_types:
    cur_annotations = annotations[data_type]
    records = cur_annotations[0]
    total = len(records)
    image_arr, body_arr, emo_cat_arr, cont_dim_arr = [], [], [], []
    for i, cur_annotation in enumerate(records):
        print(data_type, ': {:.0%}'.format(i/total), end='\r')
        image_file = os.path.join(emotic_dir, cur_annotation[1][0], cur_annotation[0][0])
        # The image is decoded lazily (only once a person is actually kept)
        # and at most once per annotation, instead of once per person.
        image = None
        context = None
        for person_details in cur_annotation[4][0]:
            emo_cat, cont_dim = [], []
            if data_type == 'train':
                # NOTE: the nested np.array(...).flatten().tolist() calls peel
                # the layers of the loadmat output one at a time; they are
                # deliberately kept exactly as they were.
                emo_cat = np.array(person_details[1]).flatten().tolist()
                emo_cat = np.array(emo_cat).flatten().tolist()
                emo_cat = [np.array(c).flatten().tolist()[0] for c in emo_cat]

                cont_dim = np.array(person_details[2]).flatten().tolist()[0]
                cont_dim = [np.array(c).flatten().tolist()[0] for c in cont_dim]
                # Skip training persons whose continuous labels contain NaN.
                if any(np.isnan(c) for c in cont_dim):
                    continue

            else:
                # val / test records use a different field layout: fields 1
                # and 3 are only tested for emptiness, the labels are read
                # from fields 2 and 4 (presumably per-annotator vs. combined
                # annotations -- confirm against the dataset description).
                if len(person_details[1][0]) != 0:
                    emo_cat = [np.array(c).flatten().tolist()[0] for c in person_details[2][0]]

                if len(person_details[3][0]) != 0:
                    cont_dim = [np.array(c).flatten().tolist()[0] for c in person_details[4][0]]
                    cont_dim = [np.array(c).flatten().tolist()[0] for c in cont_dim[0]]

            emo_cat = emo_cat_one_hot(emo_cat)
            emo_cat_arr.append(emo_cat)
            cont_dim_arr.append(np.array(cont_dim))

            if image is None:
                # OpenCV decodes to BGR; convert to RGB.
                image = cv2.cvtColor(cv2.imread(image_file), cv2.COLOR_BGR2RGB)
                context = cv2.resize(image, (224, 224))

            body_box = np.array(person_details[0]).flatten().tolist()
            x1, y1, x2, y2 = map(int, body_box)
            # Clamp negative coordinates to 0; a negative slice bound would
            # otherwise be interpreted as counting from the end of the axis.
            x1, y1, x2, y2 = (max(v, 0) for v in (x1, y1, x2, y2))
            body = image[y1:y2, x1:x2].copy()
            body = cv2.resize(body, (128, 128))
            body_arr.append(body)

            image_arr.append(context)

    np.save(os.path.join(preprocessed_data_save_dir,'%s_context_arr.npy' %(data_type)), np.array(image_arr))
    np.save(os.path.join(preprocessed_data_save_dir,'%s_body_arr.npy' %(data_type)), np.array(body_arr))
    np.save(os.path.join(preprocessed_data_save_dir,'%s_cat_arr.npy' %(data_type)), np.array(emo_cat_arr))
    np.save(os.path.join(preprocessed_data_save_dir,'%s_cont_arr.npy' %(data_type)), np.array(cont_dim_arr))

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torchvision.models as models
from torch.optim.lr_scheduler import StepLR
from torchvision.models import shufflenet_v2_x2_0, ShuffleNet_V2_X2_0_Weights, resnet18, ResNet18_Weights

DataLoader を作成します。

In [4]:
batch_size = 32


def _load_npy(file_name):
    """Read one array from the preprocessed-data directory."""
    return np.load(os.path.join(preprocessed_data_save_dir, file_name))


# Training split.
train_context = _load_npy('train_context_arr.npy')
train_body = _load_npy('train_body_arr.npy')
train_cat = _load_npy('train_cat_arr.npy')
train_cont = _load_npy('train_cont_arr.npy')

# Validation split.
val_context = _load_npy('val_context_arr.npy')
val_body = _load_npy('val_body_arr.npy')
val_cat = _load_npy('val_cat_arr.npy')
val_cont = _load_npy('val_cont_arr.npy')

# Test split.
test_context = _load_npy('test_context_arr.npy')
test_body = _load_npy('test_body_arr.npy')
test_cat = _load_npy('test_cat_arr.npy')
test_cont = _load_npy('test_cont_arr.npy')

# Per-channel mean / std values; the dataset class passes each
# [mean, std] pair to transforms.Normalize.
context_mean, context_std = (
    [0.4690646, 0.4407227, 0.40508908],
    [0.2514227, 0.24312855, 0.24266963],
)
body_mean, body_std = (
    [0.43832874, 0.3964344, 0.3706214],
    [0.24784276, 0.23621225, 0.2323653],
)
context_norm = [context_mean, context_std]
body_norm = [body_mean, body_std]


# Training pipeline: array -> PIL image -> random flip / colour jitter -> tensor.
_train_steps = [
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
    transforms.ToTensor(),
]
train_transform = transforms.Compose(_train_steps)

# Evaluation pipeline: same conversion without any augmentation.
_eval_steps = [transforms.ToPILImage(), transforms.ToTensor()]
test_transform = transforms.Compose(_eval_steps)

class Emotic_PreDataset(Dataset):
    """Dataset over the preprocessed context / body / label arrays.

    Args:
        x_image: indexable collection of context images.
        x_body: indexable collection of body crops.
        y_cat: indexable collection of category label vectors.
        y_cont: indexable collection of continuous label vectors.
        transform: callable applied to the context image and to the body
            crop before normalisation.
        context_norm: ``[mean, std]`` pair for the context image.
        body_norm: ``[mean, std]`` pair for the body crop.
    """

    def __init__(self, x_image, x_body, y_cat, y_cont, transform, context_norm, body_norm):
        super().__init__()
        self.x_image, self.x_body = x_image, x_body
        self.y_cat, self.y_cont = y_cat, y_cont
        self.transform = transform
        context_mean, context_std = context_norm[0], context_norm[1]
        body_mean, body_std = body_norm[0], body_norm[1]
        self.context_norm = transforms.Normalize(context_mean, context_std)
        self.body_norm = transforms.Normalize(body_mean, body_std)

    def __len__(self):
        """Return the number of samples (length of the category labels)."""
        return len(self.y_cat)

    def __getitem__(self, index):
        """Return ``(context, body, category_labels, continuous_labels)``.

        The transform is called once for the context image and once for the
        body crop, so any random augmentation in it is drawn separately for
        each. Continuous labels are divided by 10.
        """
        context = self.context_norm(self.transform(self.x_image[index]))
        body = self.body_norm(self.transform(self.x_body[index]))
        categories = torch.tensor(self.y_cat[index], dtype=torch.float32)
        continuous = torch.tensor(self.y_cont[index], dtype=torch.float32) / 10.0
        return context, body, categories, continuous

# One dataset per split. Only the training split uses the augmenting
# transform; validation and test share the plain one.
_norms = (context_norm, body_norm)
train_dataset = Emotic_PreDataset(
    train_context, train_body, train_cat, train_cont, train_transform, *_norms)
val_dataset = Emotic_PreDataset(
    val_context, val_body, val_cat, val_cont, test_transform, *_norms)
test_dataset = Emotic_PreDataset(
    test_context, test_body, test_cat, test_cont, test_transform, *_norms)

# Training batches are shuffled and the last incomplete batch is dropped;
# evaluation loaders keep the original order and every sample.
train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

## Model構築

モデルを構築します。今回は論文で最も性能が良いBIモデルを構築します。論文のモデル構造図では16個のconvolutionが使われていますが、実際にはさまざまなpretrained modelの構造で実験しました。当初はパラメータ数が少ないShuffleNetを使うつもりでしたが、実験したところoverfittingの問題が深刻だったため、ResNet18を使います。その上に、性能が最も良いBIモデルを構築します。

In [None]:
# weights = ShuffleNet_V2_X2_0_Weights.DEFAULT
# model_body, model_image = shufflenet_v2_x2_0(weights=weights), shufflenet_v2_x2_0(weights=weights)

class Fusion(nn.Module):
    """Fusion head mapping concatenated features to both outputs.

    Args:
        nbf: feature width that ``x_body`` is flattened to in ``forward``.
        nif: feature width that ``x_image`` is flattened to in ``forward``.
    """

    def __init__(self, nbf, nif):
        super().__init__()
        self.nbf = nbf
        self.nif = nif
        # Shared trunk: linear -> batch norm -> ReLU -> dropout.
        self.fc = nn.Linear(nbf + nif, 256)
        self.bn = nn.BatchNorm1d(256)
        self.relu = nn.ReLU()
        self.dp = nn.Dropout(0.5)
        # Two output heads on top of the 256-d trunk.
        self.fc_emo_cat = nn.Linear(256, 26)
        self.fc_cont_dim = nn.Linear(256, 3)

    def forward(self, x_image, x_body):
        """Return ``(category_output, continuous_output)``.

        Both inputs are viewed as 2-D ``(-1, width)`` tensors and concatenated
        with the image features first.
        """
        flat_image = x_image.view(-1, self.nif)
        flat_body = x_body.view(-1, self.nbf)
        hidden = self.fc(torch.cat([flat_image, flat_body], dim=1))
        hidden = self.dp(self.relu(self.bn(hidden)))
        return self.fc_emo_cat(hidden), self.fc_cont_dim(hidden)

# Context branch: ResNet-18 with a 365-way head, initialised from a local
# state dict (presumably Places365 weights -- confirm the file's origin).
# The state dict is loaded onto the CPU so it does not depend on the device
# it was saved from.
model_image = resnet18(num_classes=365)
context_state_dict = torch.load(
    os.path.join(pretrained_model_dir, 'resnet18_state_dict.pth'),
    map_location='cpu')
model_image.load_state_dict(context_state_dict)

# Body branch: ResNet-18 with torchvision's pretrained weights. A member of
# the weights enum is passed; handing over the enum class itself only worked
# through the deprecated legacy-argument handling.
model_body = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)

# Fusion.forward flattens x_image to `nif` and x_body to `nbf`, so the body
# width must go to nbf and the image width to nif. Passing them positionally
# in (image, body) order only worked because both widths are equal.
image_features = list(model_image.children())[-1].in_features
body_features = list(model_body.children())[-1].in_features
model_fusion = Fusion(nbf=body_features, nif=image_features)

# Drop the final layer of each backbone so they act as feature extractors.
model_image = nn.Sequential(*(list(model_image.children())[:-1]))
model_body = nn.Sequential(*(list(model_body.children())[:-1]))

# Freeze the two backbones; only the fusion head is trainable.
for backbone in (model_image, model_body):
    for param in backbone.parameters():
        param.requires_grad = False
for param in model_fusion.parameters():
    param.requires_grad = True



学習方法を設定します（GPU並列化、最適化アルゴリズム、損失関数（SL1の性能が最も良いです））。

In [6]:
# Use the first GPU when CUDA is available and fall back to the CPU
# otherwise, instead of failing on machines without a GPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model_image, model_body, model_fusion = nn.DataParallel(model_image), nn.DataParallel(model_body), nn.DataParallel(model_fusion)
model_image.to(device)
model_body.to(device)
model_fusion.to(device)

# All three parameter lists are handed to Adam. Parameters whose
# requires_grad is False never receive a gradient and are therefore not
# updated by the optimiser.
opt = optim.Adam((list(model_fusion.parameters()) + list(model_image.parameters()) +
                  list(model_body.parameters())), lr=0.001, weight_decay=5e-4)
# Multiply the learning rate by 0.1 every 7 scheduler steps.
scheduler = StepLR(opt, step_size=7, gamma=0.1)

class Emo_cat_Loss(nn.Module):
    """Weighted squared-error loss over the category outputs.

    The per-category weights are recomputed for every batch from how often
    each category is positive in that batch's targets.
    """

    def __init__(self):
        super().__init__()

    def forward(self, pred, target):
        """Return the summed weighted squared error.

        Args:
            pred: predictions of shape ``(batch, num_categories)``.
            target: multi-hot targets with the same shape as ``pred``.

        Returns:
            A scalar tensor.
        """
        # Place the weights on the predictions' device rather than on a
        # module-level global, so the loss works wherever its inputs live.
        self.weights = self.prepare_dynamic_weights(target).to(pred.device)
        loss = ((pred - target) ** 2) * self.weights
        return loss.sum()

    def prepare_dynamic_weights(self, target):
        """Compute per-category weights for one batch.

        A category with ``n > 0`` positives gets ``1 / log(n + 1.2)``; a
        category absent from the batch gets ``0.0001``.

        Args:
            target: multi-hot targets of shape ``(batch, num_categories)``.

        Returns:
            A float CPU tensor of shape ``(1, num_categories)``.
        """
        # detach() keeps the weights out of the autograd graph.
        target_stats = torch.sum(target.detach(), dim=0).float().unsqueeze(dim=0).cpu()
        # Sized from the targets instead of a hard-coded 26 columns.
        weights = torch.full_like(target_stats, 0.0001)
        present = target_stats != 0
        weights[present] = 1.0 / torch.log(target_stats[present] + 1.2)
        return weights

class ContinuousLoss_SL1(nn.Module):
    """Smooth-L1 style loss for the continuous outputs.

    Args:
        margin: absolute error above which the linear branch is used.
    """

    def __init__(self, margin=1):
        super().__init__()
        self.margin = margin

    def forward(self, pred, target):
        """Return the summed element-wise loss.

        Elements with absolute error ``e <= margin`` contribute
        ``0.5 * e**2``; the others contribute ``e - 0.5``.
        """
        abs_err = (pred - target).abs()
        quadratic = 0.5 * abs_err.pow(2)
        linear = abs_err - 0.5
        return torch.where(abs_err > self.margin, linear, quadratic).sum()
# Loss instances: one for the category outputs, one for the continuous outputs.
cat_loss_func = Emo_cat_Loss()
cont_loss_func = ContinuousLoss_SL1()


## training

In [7]:
import matplotlib.pyplot as plt

In [8]:
def _combined_loss(images_context, images_body, labels_cat, labels_cont):
    """Run one batch through both backbones and the fusion head; return the loss."""
    images_context = images_context.to(device)
    images_body = images_body.to(device)
    labels_cat = labels_cat.to(device)
    labels_cont = labels_cont.to(device)

    pred_cat, pred_cont = model_fusion(model_image(images_context), model_body(images_body))
    cat_loss_batch = cat_loss_func(pred_cat, labels_cat)
    # The dataset divides the continuous labels by 10; both sides are scaled
    # back by 10 before the continuous loss is computed.
    cont_loss_batch = cont_loss_func(pred_cont * 10, labels_cont * 10)
    return (0.5 * cat_loss_batch) + (0.5 * cont_loss_batch)


def train(epochs):
    """Train the model and keep the checkpoint with the lowest validation loss.

    Uses the module-level models, loaders, optimiser, scheduler and loss
    functions. After every epoch the summed validation loss is compared with
    the best value so far; on improvement the three models are saved to
    ``model_dir``. Finally the per-epoch training and validation losses are
    plotted to ``train_val_loss.png``.

    Args:
        epochs: number of passes over ``train_loader``.
    """
    min_loss = np.inf

    train_loss = []
    val_loss = []

    for e in range(epochs):
        running_loss = 0.0

        # NOTE(review): the backbones have requires_grad=False but are still
        # switched to train mode here, so any BatchNorm layers inside them
        # keep updating their running statistics -- confirm this is intended.
        model_fusion.train()
        model_image.train()
        model_body.train()
        num_train_batches = len(train_loader)
        for i, batch in enumerate(train_loader, start=1):
            print('train:{:.0%}'.format(i/num_train_batches), end='\r')
            opt.zero_grad()
            loss = _combined_loss(*batch)
            running_loss += loss.item()
            loss.backward()
            opt.step()
        print('epoch = %d training loss = %.4f' % (e, running_loss))
        train_loss.append(running_loss)

        running_loss = 0.0
        model_fusion.eval()
        model_image.eval()
        model_body.eval()

        # Progress is measured against the validation loader, not the
        # training loader.
        num_val_batches = len(val_loader)
        with torch.no_grad():
            for i, batch in enumerate(val_loader, start=1):
                print('valid:{:.0%}'.format(i/num_val_batches), end='\r')
                running_loss += _combined_loss(*batch).item()

            print('epoch = %d validation loss = %.4f' % (e, running_loss))
        val_loss.append(running_loss)

        scheduler.step()

        if val_loss[-1] < min_loss:
            min_loss = val_loss[-1]
            # The whole module objects are saved (not state dicts), so they
            # must be read back with torch.load as full models.
            torch.save(model_fusion, os.path.join(model_dir, 'model_emotic1.pth'))
            torch.save(model_image, os.path.join(model_dir, 'model_context1.pth'))
            torch.save(model_body, os.path.join(model_dir, 'model_body1.pth'))

    f, (ax1, ax2) = plt.subplots(1, 2, figsize=(6, 6))
    f.suptitle('emotic')
    ax1.plot(range(0, len(train_loss)), train_loss, color='Blue')
    ax2.plot(range(0, len(val_loss)), val_loss, color='Red')
    ax1.legend(['train'])
    ax2.legend(['valid'])
    plt.savefig('train_val_loss.png')

ShuffleNetはoverfittingの問題が深刻なので、やはり参考資料のpretrained modelであるResNet18を使う方が良いと思います。

In [None]:
# Train for 15 epochs; train() also writes the checkpoints and the loss plot.
train(15)

In [166]:
from sklearn.metrics import average_precision_score, precision_recall_curve

In [167]:
def test_scikit_ap(cat_preds, cat_labels):
    """Print the per-category average precision and return its mean.

    Args:
        cat_preds: 2-D array of scores indexed as ``[category, sample]``;
            rows 0..25 are used.
        cat_labels: 2-D array of binary labels with the same layout.

    Returns:
        The mean of the 26 average-precision values (float32).
    """
    ap = np.array(
        [average_precision_score(cat_labels[row, :], cat_preds[row, :])
         for row in range(26)],
        dtype=np.float32,
    )
    print('ap', ap, ap.shape, ap.mean())
    return ap.mean()


def test_emotic_vad(cont_preds, cont_labels):
    """Print the per-dimension mean absolute error and return its mean.

    Args:
        cont_preds: 2-D array of predictions indexed as
            ``[dimension, sample]``; rows 0..2 are used.
        cont_labels: 2-D array of labels with the same layout.

    Returns:
        The mean of the three per-dimension errors (float32).
    """
    vad = np.array(
        [np.mean(np.abs(cont_preds[row, :] - cont_labels[row, :]))
         for row in range(3)],
        dtype=np.float32,
    )
    print('vad', vad, vad.shape, vad.mean())
    return vad.mean()

## test

In [180]:
def test_scikit_ap(cat_preds, cat_labels):
    """Print the per-category average precision and return its mean.

    Args:
        cat_preds: 2-D array of scores indexed as ``[category, sample]``;
            rows 0..25 are used.
        cat_labels: 2-D array of binary labels with the same layout.

    Returns:
        The mean of the 26 average-precision values (float32).
    """
    ap = np.array(
        [average_precision_score(cat_labels[row, :], cat_preds[row, :])
         for row in range(26)],
        dtype=np.float32,
    )
    print('ap', ap, ap.shape, ap.mean())
    return ap.mean()


def test_emotic_vad(cont_preds, cont_labels):
    """Print the per-dimension mean absolute error and return its mean.

    Args:
        cont_preds: 2-D array of predictions indexed as
            ``[dimension, sample]``; rows 0..2 are used.
        cont_labels: 2-D array of labels with the same layout.

    Returns:
        The mean of the three per-dimension errors (float32).
    """
    vad = np.array(
        [np.mean(np.abs(cont_preds[row, :] - cont_labels[row, :]))
         for row in range(3)],
        dtype=np.float32,
    )
    print('vad', vad, vad.shape, vad.mean())
    return vad.mean()

def test(model_body, model_image, model_fusion):
    """Evaluate the three models on ``test_loader``.

    Args:
        model_body: network applied to the body crops.
        model_image: network applied to the context images.
        model_fusion: head combining both feature tensors.

    Returns:
        ``(ap_mean, vad_mean)`` as returned by ``test_scikit_ap`` and
        ``test_emotic_vad``.
    """
    # Size the result buffers from the dataset instead of a hard-coded
    # sample count, so a different test split neither overflows the buffers
    # nor leaves zero rows at the end.
    num_samples = len(test_loader.dataset)
    cat_preds = np.zeros((num_samples, 26))
    cat_labels = np.zeros((num_samples, 26))
    cont_preds = np.zeros((num_samples, 3))
    cont_labels = np.zeros((num_samples, 3))

    with torch.no_grad():
        model_image.to(device)
        model_body.to(device)
        model_fusion.to(device)
        model_image.eval()
        model_body.eval()
        model_fusion.eval()
        indx = 0
        num_batches = len(test_loader)
        for i, (images_context, images_body, labels_cat, labels_cont) in enumerate(test_loader, start=1):
            print("test:{:.0%}".format(i/num_batches), end='\r')
            images_context = images_context.to(device)
            images_body = images_body.to(device)

            pred_context = model_image(images_context)
            pred_body = model_body(images_body)
            pred_cat, pred_cont = model_fusion(pred_context, pred_body)

            stop = indx + pred_cat.shape[0]
            cat_preds[indx:stop, :] = pred_cat.cpu().numpy()
            cat_labels[indx:stop, :] = labels_cat.cpu().numpy()
            # Continuous values are stored divided by 10; scale them back.
            cont_preds[indx:stop, :] = pred_cont.cpu().numpy() * 10
            cont_labels[indx:stop, :] = labels_cont.cpu().numpy() * 10
            indx = stop

    # The metric helpers index their inputs as [category, sample].
    cat_preds = cat_preds.transpose()
    cat_labels = cat_labels.transpose()
    cont_preds = cont_preds.transpose()
    cont_labels = cont_labels.transpose()
    ap_mean = test_scikit_ap(cat_preds, cat_labels)
    vad_mean = test_emotic_vad(cont_preds, cont_labels)
    print (ap_mean, vad_mean)
    return ap_mean, vad_mean

In [None]:
# Reload the best checkpoints written by train(). train() saves the context
# model to 'model_context1.pth' and the body model to 'model_body1.pth', so
# each file is loaded into the matching variable, and the reloaded fusion
# head (not the in-memory model left over from the last epoch) is evaluated.
# NOTE(review): these files hold whole pickled modules; recent PyTorch
# releases default torch.load to weights_only=True, which rejects them --
# pass weights_only=False there if loading fails.
model_image = torch.load(os.path.join(model_dir, 'model_context1.pth'))
model_body = torch.load(os.path.join(model_dir, 'model_body1.pth'))
model_emotic1 = torch.load(os.path.join(model_dir, 'model_emotic1.pth'))
test(model_body, model_image, model_emotic1)