# 讯飞人脸表情分类挑战赛baseline(非官方)


> 一、赛事背景
人脸表情是传播人类情感信息与协调人际关系的重要方式，表情识别是指从静态照片或视频序列中选择出表情状态，从而确定对人物的情绪与心理变化。在日常生活中人类习惯从面部表情中吸收非言语暗示，那么计算机可以完成类似任务吗？答案是肯定的，但是需要训练它学会识别情绪。


> 二、赛事任务
给定人脸照片完成具体的情绪识别，选手需要根据训练集数据构建情绪识别任务，并对测试集图像进行预测，识别人脸的7种情绪。

> 三、评审规则
1. 数据说明
赛题数据由训练集和测试集组成，训练集数据集按照不同情绪的文件夹进行存放。其中：

训练集：2.8W张人脸图像；

测试集：7K张人脸图像；

为了简化任务赛题图像只包含单张人脸，所有图像的尺寸为48 * 48像素。数据集包括的情绪标签包括以下7类：

angry

disgusted

fearful

happy

neutral

sad

surprised


> 四、基线分数


![](https://ai-studio-static-online.cdn.bcebos.com/d7dda6d25bc24f75a30aa54bacc82cbea72a90aa5d7b4529b96cbbcd1f975178)


## 数据解压


In [None]:
!cd 'data/data100817' && unzip -q Datawhale.zip

## 数据集前处理


对数据集进行处理，生成数据集信息，train.csv里包含图像的名称及其对应的标签。

In [None]:
import os
import json
import cv2

import pandas as pd

rootDir = 'data/data100817/Datawhale/train'

image_id = []
category_id = []

def Test1(rootDir):
    list_dirs = os.walk(rootDir)

    img_id = 0
    for root, dirs, files in list_dirs:
        for d in dirs:

            path = os.path.join(root, d)
            cat_id = int(d)
            for im in os.listdir(path):
              
                image_id.append(d + '/' + im)
                category_id.append(cat_id)

Test1(rootDir)


img = pd.DataFrame(image_id)
img = img.rename(columns = {0:"image"})
img['label'] = category_id

img.to_csv('work/train.csv', index=False)


## 解题思路


通过观察数据集，发现有一个类别特别少，一个类别特别多。所以使用labelshuffling方法作为一个基本的解题思路。另外，由于人脸带有较大的相似性，故使用了带有注意力机制的骨干网络，SE_ResNeXt50。


* **知识点：labelshuffling**

首先对原始的图像列表，按照标签顺序进行排序； 然后计算每个类别的样本数量，并得到样本最多的那个类别的样本数。 根据这个最多的样本数，对每类都产生一个随机排列的列表； 然后用每个类别的列表中的数对各自类别的样本数求余，得到一个索引值，从该类的图像中提取图像，生成该类的图像随机列表； 然后把所有类别的随机列表连在一起，做个Random Shuffling，得到最后的图像列表，用这个列表进行训练。

In [2]:
# 导入所需要的库
from sklearn.utils import shuffle
import os
import pandas as pd
import numpy as np
from PIL import Image

import paddle
import paddle.nn as nn
from paddle.io import Dataset
import paddle.vision.transforms as T
import paddle.nn.functional as F
from paddle.metric import Accuracy

import warnings
warnings.filterwarnings("ignore")

# labelshuffling

def labelShuffling(dataFrame, groupByName = 'label'):

    groupDataFrame = dataFrame.groupby(by=[groupByName])
    labels = groupDataFrame.size()
    print("length of label is ", len(labels))
    maxNum = max(labels)
    lst = pd.DataFrame()
    for i in range(len(labels)):
        print("Processing label  :", i)
        tmpGroupBy = groupDataFrame.get_group(i)
        createdShuffleLabels = np.random.permutation(np.array(range(maxNum))) % labels[i]
        print("Num of the label is : ", labels[i])
        lst=lst.append(tmpGroupBy.iloc[createdShuffleLabels], ignore_index=True)
        print("Done")
    # lst.to_csv('test1.csv', index=False)
    return lst

# 读取数据
train_images = pd.read_csv('work/train.csv')
train_images = labelShuffling(train_images)
train_images = shuffle(train_images)
# 划分训练集和校验集
all_size = len(train_images)
# print(all_size)
train_size = int(all_size * 0.85)
train_image_list = train_images[:train_size]
val_image_list = train_images[train_size:]
df = train_image_list
# df = shuffle(df)
train_image_path_list = df['image'].values
label_list = df['label'].values
label_list = paddle.to_tensor(label_list, dtype='int64')
train_label_list = paddle.nn.functional.one_hot(label_list, num_classes=7)

val_image_path_list = val_image_list['image'].values
val_label_list = val_image_list['label'].values
val_label_list = paddle.to_tensor(val_label_list, dtype='int64')
val_label_list = paddle.nn.functional.one_hot(val_label_list, num_classes=7)

# 定义数据预处理
data_transforms = T.Compose([
    T.Resize(size=(48, 48)),
    T.RandomRotation(5),
    # T.RandomCrop(224),
    T.RandomHorizontalFlip(0.5),
    T.RandomVerticalFlip(0.5),
    T.Transpose(),    # HWC -> CHW   
])


## 数据读取

In [3]:
# 构建Dataset
class MyDataset(paddle.io.Dataset):
    """
    步骤一：继承paddle.io.Dataset类
    """
    def __init__(self, train_img_list, val_img_list,train_label_list,val_label_list, mode='train'):
        """
        步骤二：实现构造函数，定义数据读取方式，划分训练和测试数据集
        """
        super(MyDataset, self).__init__()
        self.img = []
        self.label = []
        # 借助pandas读csv的库
        self.train_images = train_img_list
        self.test_images = val_img_list
        self.train_label = train_label_list
        self.test_label = val_label_list
        if mode == 'train':
            # 读train_images的数据
            for img,la in zip(self.train_images, self.train_label):
                self.img.append('data/data100817/Datawhale/train/'+img)
                self.label.append(la)
        else:
            # 读test_images的数据
            for img,la in zip(self.test_images, self.test_label):
                self.img.append('data/data100817/Datawhale/train/'+img)
                self.label.append(la)

    def load_img(self, image_path):
        # 实际使用时使用Pillow相关库进行图片读取即可，这里我们对数据先做个模拟
        image = Image.open(image_path)
        image = np.asarray(image)
        image = image.astype(np.float32) / 255.
        return image

    def __getitem__(self, index):
        """
        步骤三：实现__getitem__方法，定义指定index时如何获取数据，并返回单条数据（训练数据，对应的标签）
        """
        image = self.load_img(self.img[index])
        label = self.label[index]
        # label = paddle.to_tensor(label)
        
        return data_transforms(image), paddle.nn.functional.label_smooth(label)

    def __len__(self):
        """
        步骤四：实现__len__方法，返回数据集总数目
        """
        return len(self.img)

In [4]:
#train_loader
train_dataset = MyDataset(train_img_list=train_image_path_list, val_img_list=val_image_path_list, train_label_list=train_label_list, val_label_list=val_label_list, mode='train')
train_loader = paddle.io.DataLoader(train_dataset, places=paddle.CPUPlace(), batch_size=196, shuffle=True, num_workers=0)

#val_loader
val_dataset = MyDataset(train_img_list=train_image_path_list, val_img_list=val_image_path_list, train_label_list=train_label_list, val_label_list=val_label_list, mode='test')
val_loader = paddle.io.DataLoader(val_dataset, places=paddle.CPUPlace(), batch_size=196, shuffle=True, num_workers=0)

In [None]:
# 数据读取测试

print('=============train dataset=============')
for image, label in train_dataset:
    print('image shape: {}, label: {}'.format(image.shape, label))
    break

## 模型训练

In [5]:
from work.se_resnext import SE_ResNeXt50_vd_32x4d
# from work.res2net import Res2Net50_vd_26w_4s
from paddle.vision.ops import DeformConv2D
# 模型封装
model_res = SE_ResNeXt50_vd_32x4d(class_dim=7)
model = paddle.Model(model_res)

# 定义优化器
optim = paddle.optimizer.Adam(learning_rate=3e-4, parameters=model.parameters())

# 配置模型
model.prepare(
    optim,
    paddle.nn.CrossEntropyLoss(soft_label=True),
    Accuracy()
    )

# model.load('work/SE_ResNeXt50_vd_32x4d_pretrained.pdparams',skip_mismatch=True)

# 模型训练与评估
model.fit(train_loader,
        val_loader,
        log_freq=1,
        epochs=16,
        verbose=1,
        )

In [10]:
model.save('Hapi_MyCNN3', False)  # save for inference

In [3]:
import os, time
import matplotlib.pyplot as plt
import paddle
import pandas as pd
from PIL import Image
import numpy as np
# import patta as tta


use_gpu = False
model_file_path="Hapi_MyCNN3"
paddle.set_device('gpu:0') if use_gpu else paddle.set_device('cpu')
model = paddle.jit.load(model_file_path)

# model = tta.ClassificationTTAWrapper(model, tta.aliases.ten_crop_transform(48,48))
model.eval() #训练模式

def load_image(img_path):
    '''
    预测图片预处理
    '''
    img = Image.open(img_path)
    
    #resize
    # img = img.resize((224, 224), Image.BILINEAR) #Image.BILINEAR双线性插值
    img = np.array(img).astype('float32')

    # HWC to CHW 
    # img = img.transpose((2, 0, 1))
    
    #Normalize
    img = img / 255         #像素值归一化
  
    return img

def infer_img(path, model):
    '''
    模型预测
    '''
    #对预测图片进行预处理
    infer_imgs = []
    infer_imgs.append(load_image(path))
    infer_imgs = np.array(infer_imgs)
    label_list = {0:'angry', 1:'disgusted', 2:'fearful' , 3:'happy', 4:'neutral', 5:'sad', 6:'surprised'}
    #
    label_pre = []
    for i in range(len(infer_imgs)):
        data = infer_imgs[i]
        dy_x_data = np.array(data).astype('float32')
        dy_x_data = dy_x_data[np.newaxis,np.newaxis, :,:]
        img = paddle.to_tensor(dy_x_data)
        out = model(img)
        lab = np.argmax(out.numpy())  #argmax():返回最大数的索引
        
        label_pre.append(label_list[lab])
        
    return label_pre

img_list = os.listdir('data/data100817/Datawhale/test')
img_list.sort(key=lambda x: int(x[:-4]))

pre_list = []
for i in range(len(img_list)):
    pre_list.append(infer_img(path='data/data100817/Datawhale/test/' + img_list[i], model=model)[0])

img = pd.DataFrame(img_list)
img = img.rename(columns = {0:"name"})
img['label'] = pre_list

img.to_csv('submitrenlian3.csv', index=False)


# 总结


首先这是一个基线，用来快速试错，事实证明，这个不好用。所以下一期计划魔改一下LeNet5来做这个比赛基线，因为数据集是48 * 48 * 1，通道数为1，并且图像的尺寸也不大。再改一下损失函数，应该能够取得一个差不多的成绩吧。