In [1]:
# 导入必要的库
import torch # PyTorch深度学习框架
import torch.nn as nn # PyTorch深度学习框架中的神经网络模块
import torch.optim as optim # PyTorch深度学习框架中的优化器模块
import torch.nn.functional as F # PyTorch深度学习框架中的函数模块
from torch.utils.data import Dataset, DataLoader # PyTorch深度学习框架中的数据集和数据加载器模块
import torchvision.transforms as transforms # PyTorch深度学习框架中的图像预处理模块
import torchvision.datasets as datasets # PyTorch深度学习框架中的数据集模块
import numpy as np # NumPy科学计算库
import pandas as pd # Pandas数据处理库
import cv2 # OpenCV计算机视觉库
import os # Python内置的操作系统接口模块
import random # Python内置的随机数生成模块
import matplotlib.pyplot as plt # Matplotlib绘图库
from sklearn.metrics import confusion_matrix # Scikit-learn机器学习库中的混淆矩阵模块
from sklearn.metrics import accuracy_score # Scikit-learn机器学习库中的准确率模块
import itertools # Python内置的迭代器模块
import re
from PIL import Image

In [None]:
# 定义超参数
batch_size = 64 # 批量大小
learning_rate = 0.001 # 学习率
num_epochs = 10 # 训练轮数

In [2]:
# 定义LeNet-5模型
class LeNet5(nn.Module):
    def __init__(self):
        super(LeNet5, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=2) # 第一层卷积层，输入通道数为1，输出通道数为6，卷积核大小为5*5，步长为1，填充为2
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2) # 第一层池化层，池化核大小为2*2，步长为2
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5, stride=1) # 第二层卷积层，输入通道数为6，输出通道数为16，卷积核大小为5*5，步长为1
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2) # 第二层池化层，池化核大小为2*2，步长为2
        self.fc1 = nn.Linear(16*5*5, 120) # 第一层全连接层，输入节点数为16*5*5，输出节点数为120
        self.fc2 = nn.Linear(120, 84) # 第二层全连接层，输入节点数为120，输出节点数为84
        self.fc3 = nn.Linear(84, 7) # 第三层全连接层，输入节点数为84，输出节点数为7

    def forward(self, x):
        x = self.pool1(F.relu(self.conv1(x))) # 第一层卷积层+池化层，使用ReLU激活函数
        x = self.pool2(F.relu(self.conv2(x))) # 第二层卷积层+池化层，使用ReLU激活函数
        x = x.view(-1, 16*5*5) # 将特征图展开成一维向量
        x = F.relu(self.fc1(x)) # 第一层全连接层，使用ReLU激活函数
        x = F.relu(self.fc2(x)) # 第二层全连接层，使用ReLU激活函数
        x = self.fc3(x) # 第三层全连接层，不使用激活函数
        return x

In [None]:
# 定义数据集
class FaceDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir # 数据集根目录
        self.transform = transform # 数据预处理
        self.labels = { # 标签字典
            'angry': 0,
            'disgusted': 1,
            'fearful': 2,
            'happy': 3,
            'neutral': 4,
            'sad': 5,
            'surprised': 6
        }
        self.data = self.load_data() # 加载数据

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name, label = self.data[idx]
        img_path = os.path.join(self.root_dir, label, img_name)
        image = Image.open(img_path).convert('L') # 打开图像并将其转换为灰度图像
        if self.transform:
            image = self.transform(image) # 数据预处理
        return image, label

    def load_data(self):
        data = []
        for label in self.labels.keys():
            label_dir = os.path.join(self.root_dir, label)
            for img_name in os.listdir(label_dir):
                data.append((img_name, self.labels[label]))
        return data

In [None]:
# 定义数据预处理
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(), # 随机水平翻转
    transforms.RandomCrop(44), # 随机裁剪
    transforms.ToTensor(), # 将图像转换为张量
    transforms.Normalize((0.5,), (0.5,)) # 标准化
])

transform_test = transforms.Compose([
    transforms.CenterCrop(44), # 中心裁剪
    transforms.ToTensor(), # 将图像转换为张量
    transforms.Normalize((0.5,), (0.5,)) # 标准化
])

In [None]:
# 定义数据加载器
model = LeNet5() # 创建模型
train_dataset = FaceDataset('emo/train', transform=transform_train) # 加载训练集
test_dataset = FaceDataset('emo/test', transform=transform_test)# 加载测试集
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4) # 创建训练集数据加载器
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4) # 创建测试集数据加载器


In [None]:
# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss() # 定义交叉熵损失函数
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9) # 定义随机梯度下降优化器

In [None]:
# 将模型放到GPU上
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # 判断是否有GPU可用
model.to(device) # 将模型放到GPU上

In [None]:
# 定义test函数
def test(model, test_loader):
    y_true = [] # 存储真实标签
    y_pred = [] # 存储预测标签
    with torch.no_grad(): # 不需要计算梯度
        for images, labels in test_loader: # 遍历测试集
            images = images.to(device) # 将图像放到GPU上
            labels = labels.to(device) # 将标签放到GPU上
            outputs = model(images) # 前向传播
            _, predicted = torch.max(outputs.data, 1) # 获取预测结果
            y_true += labels.cpu().numpy().tolist() # 将真实标签添加到列表中
            y_pred += predicted.cpu().numpy().tolist() # 将预测标签添加到列表中
    return y_true, y_pred # 返回真实标签和预测标签

In [None]:
# 训练模型
accuracy_list = [0.0] * num_epochs # 初始化准确率列表
for epoch in range(num_epochs): # 遍历所有的epoch
    model.train() # 将模型设置为训练模式
    for i, (images, labels) in enumerate(train_loader): # 遍历训练集
        images = images.to(device) # 将图像放到GPU上
        labels = labels.to(device) # 将标签放到GPU上

        # 前向传播
        outputs = model(images) # 前向传播
        loss = criterion(outputs, labels) # 计算损失

        # 反向传播和优化
        optimizer.zero_grad() # 梯度清零
        loss.backward() # 反向传播
        optimizer.step() # 更新参数

        if (i+1) % 100 == 0: # 每100个batch输出一次信息
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, i+1, len(train_loader), loss.item()))

    # 测试模型
    model.eval() # 将模型设置为评估模式
    y_true, y_pred = test(model, test_loader)
    accuracy = accuracy_score(y_true, y_pred) # 计算准确率
    accuracy_list[epoch] = accuracy # 将准确率添加到准确率列表中
    print('Epoch [{}/{}], Test Accuracy: {:.2f}%'.format(epoch+1, num_epochs, accuracy*100))
    print('Confusion Matrix:', confusion_matrix(y_true, y_pred))

    # 保存模型
    torch.save(model.state_dict(), 'model.pth')

In [None]:
# 绘制混淆矩阵
def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
    """
    绘制混淆矩阵

    参数：
    cm：混淆矩阵
    classes：类别列表
    normalize：是否进行标准化
    title：图像标题
    cmap：颜色映射表
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
# 绘制准确率曲线
def plot_accuracy_curve(accuracy_list):
    """
    绘制准确率曲线

    参数：
    accuracy_list：准确率列表
    """
    for epoch in range(num_epochs):
        if accuracy_list[epoch] == 0.0:
            y_true, y_pred = test(model, test_loader)
            accuracy = accuracy_score(y_true, y_pred)
            accuracy_list[epoch] = accuracy
    plt.plot(range(num_epochs), accuracy_list, '-o')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Accuracy Curve')
    plt.show()

In [None]:
# 加载模型
model.load_state_dict(torch.load('model.pth'))

In [None]:
# 测试模型
y_true, y_pred = test(model, test_loader)

In [None]:
# 绘制混淆矩阵
cm = confusion_matrix(y_true, y_pred)
plot_confusion_matrix(cm, classes=['angry', 'disgusted', 'fearful', 'happy', 'neutral', 'sad', 'surprised'])

In [None]:
# 绘制准确率曲线
plot_accuracy_curve(accuracy_list)