In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision.transforms as transforms
import datetime
from matplotlib.animation import FuncAnimation
from PIL import Image


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# from google.colab import drive
# drive.mount('/content/drive')

In [3]:
# Integer emotion label -> display name. The position of each name in the
# tuple is the label it is registered under (0 .. 6).
emotion_map = dict(enumerate((
    "Angry",
    "Disgust",
    "Fear",
    "Happy",
    "Sad",
    "Surprise",
    "Neutral",
)))


In [4]:
def generateImages(csvfile_path,num):
    """Export the first `num` rows of the dataset CSV as JPEG files.

    Each exported row is written to ``pictures/fer{i}_{emotion}.jpg`` where
    ``i`` is the row position and ``emotion`` is the name looked up in
    ``emotion_map`` for the row's ``emotion`` label.

    Args:
        csvfile_path: path of a CSV file with an ``emotion`` column (integer
            label) and a ``pixels`` column (48*48 space-separated integers).
        num: number of leading rows to export. Values larger than the number
            of rows export every row; values <= 0 export nothing.

    Raises:
        ValueError: if an exported row does not hold exactly 48*48 pixel values.
        KeyError: if an exported row's label is not a key of ``emotion_map``.
    """
    import os  # local import: only this helper needs it

    data_df = pd.read_csv(csvfile_path)
    # head() never reads past the end of the frame, and only the selected rows
    # are parsed instead of the whole file.
    selected = data_df.head(max(num, 0))

    # Image.save() does not create missing directories.
    os.makedirs("pictures", exist_ok=True)

    for i, (pixel_text, emotion) in enumerate(zip(selected['pixels'], selected['emotion'])):
        pixels = np.array([int(pixel) for pixel in pixel_text.split()]).reshape(48, 48)
        image = Image.fromarray(pixels.astype(np.uint8))
        save_path=f"pictures/fer{i}_{emotion_map[int(emotion)]}.jpg"
        image.save(save_path)
# generateImages("icml_face_data.csv",300)

In [5]:

def csv_to_tensor(csvfile_path):
    """Load the dataset CSV and split it into train/test tensors.

    The CSV must provide the columns ``emotion`` (integer class label),
    ``Usage`` (split name) and ``pixels`` (48*48 space-separated integers).
    Rows whose ``Usage`` equals ``'Training'`` form the training split; every
    other row goes to the test split.

    Args:
        csvfile_path: path of the CSV file to read.

    Returns:
        Tuple ``(train_features, train_labels, test_features, test_labels)``.
        Features are float32 tensors of shape (N, 1, 48, 48) holding the pixel
        values divided by 255; labels are int64 tensors of shape (N,). A split
        without rows yields empty tensors of shape (0, 1, 48, 48) and (0,).

    Raises:
        ValueError: if a row does not contain exactly 48*48 pixel values.
    """
    data_df = pd.read_csv(csvfile_path)

    def _split_to_tensors(split_df):
        # Parse the whole split into a single integer array and convert it in
        # one step; this also copes with a split that has no rows, which a
        # per-row torch.stack() cannot do.
        pixel_rows = [[int(pixel) for pixel in row.split()] for row in split_df['pixels']]
        pixel_array = np.array(pixel_rows, dtype=np.int64).reshape(len(split_df), 1, 48, 48)
        features = torch.from_numpy(pixel_array).to(torch.float32) / 255
        labels = torch.tensor(split_df['emotion'].values, dtype=torch.int64)
        return features, labels

    # Split the rows into the training set and everything else (test set).
    is_training = data_df['Usage'] == 'Training'
    train_features, train_labels = _split_to_tensors(data_df[is_training])
    test_features, test_labels = _split_to_tensors(data_df[~is_training])

    return train_features, train_labels, test_features, test_labels

# Usage example: load the dataset once at module level. The resulting tensors
# are shared with later cells through these global names.
data_file_path = 'icml_face_data.csv'
# data_file_path = '/content/drive/MyDrive/Colab Notebooks/icml_face_data.csv'
origin_train_features, train_labels, test_features, test_labels = csv_to_tensor(data_file_path)
# train_features, train_labels, public_test_features, public_test_labels, private_test_features, private_test_labels = csv_to_tensor(data_file_path)

In [6]:
def add_random_noise(image, mean=0, std=0.1):
    """Return a copy of `image` perturbed by Gaussian noise and clipped to [0, 1].

    Args:
        image: tensor to perturb; it is not modified.
        mean: offset added to every noise sample.
        std: factor the standard-normal samples are scaled by.

    Returns:
        A new tensor shaped like `image` with every value clamped to [0, 1].
    """
    gaussian = torch.randn_like(image)
    perturbed = image + (gaussian * std + mean)
    return perturbed.clamp(0, 1)
def enlarge_trainset(train_features, train_labels, transform):
    """Apply `transform` to every training sample and return the transformed set.

    Note that the transformed samples replace the originals; they are not
    concatenated with them, so the returned set has as many samples as the input.

    Args:
        train_features: tensor whose first dimension indexes the samples.
        train_labels: labels matching `train_features`; returned unchanged.
        transform: callable applied to one sample tensor at a time.

    Returns:
        Tuple ``(augmented_train_features, train_labels)``.
    """
    transformed_samples = []
    for sample_index in range(train_features.size(0)):
        transformed_samples.append(transform(train_features[sample_index]))
    augmented_train_features = torch.stack(transformed_samples)
    return augmented_train_features, train_labels
# Augmentation pipeline handed to enlarge_trainset(): each call mirrors the
# sample horizontally with probability 0.5. The colour-jitter step is disabled.
transform = transforms.Compose([
                                transforms.RandomHorizontalFlip(p=0.5),
                                # transforms.ColorJitter(brightness=0.5, contrast=0.5)
                            ])

# transform = transforms.Compose([transforms.RandomErasing(0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3)),
#                                 transforms.RandomHorizontalFlip(p=0.5),
#                             ])
# train_features, train_labels = enlarge_trainset(train_features, train_labels, transform)
# print(train_features.shape)

In [7]:
# Report whether PyTorch can see a CUDA device in this runtime.
torch.cuda.is_available()

True

In [8]:
# Default layer widths for LeNet: conv1 channels, conv2 channels, fc1 units,
# fc2 units. They are captured as constructor defaults when the class is
# defined, so a later cell that reassigns a/b/c/d cannot silently change what
# LeNet() builds.
a=16
b=32
c=120
d=64


# Define the LeNet model
class LeNet(nn.Module):
    """LeNet-style classifier: two 5x5 conv + max-pool stages, then three linear layers.

    The first linear layer is sized for a 9x9 feature map, which is what a
    1x48x48 input produces (48 -> 44 -> 22 -> 18 -> 9). The network returns
    7 raw logits per sample.

    Args:
        conv1_channels: output channels of the first convolution.
        conv2_channels: output channels of the second convolution.
        fc1_units: width of the first fully connected layer.
        fc2_units: width of the second fully connected layer.
    """

    def __init__(self, conv1_channels=a, conv2_channels=b, fc1_units=c, fc2_units=d):
        super().__init__()
        # Attribute names and creation order are kept stable so previously
        # saved state dicts keep loading.
        self.conv1 = nn.Conv2d(1, conv1_channels, kernel_size=5)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(conv1_channels, conv2_channels, kernel_size=5)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(conv2_channels * 9 * 9, fc1_units)
        self.relu3 = nn.ReLU()
        self.dropout = nn.Dropout(p=0.3)
        self.fc2 = nn.Linear(fc1_units, fc2_units)
        self.relu4 = nn.ReLU()
        self.fc3 = nn.Linear(fc2_units, 7)

    def forward(self, x):
        """Map a (batch, 1, 48, 48) tensor to (batch, 7) logits."""
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        # Flatten per sample. Keeping the batch dimension explicit makes an
        # unexpected input size fail in fc1 instead of silently re-batching.
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = self.relu3(self.fc1(x))
        x = self.dropout(x)
        x = self.relu4(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

In [16]:
# Default layer widths for LeNet1: conv1 channels, conv2 channels, fc1 units,
# fc2 units. They are captured as constructor defaults when the class is
# defined, so a later cell that reassigns a/b/c/d cannot silently change what
# LeNet1() builds.
a=16
b=32
c=156  # alternative value noted by the author: 128
d=100


# Define the LeNet1 model
class LeNet1(nn.Module):
    """LeNet variant with 3x3 convolutions and a 0.5 dropout rate.

    The first linear layer is sized for a 10x10 feature map, which is what a
    1x48x48 input produces (48 -> 46 -> 23 -> 21 -> 10). The network returns
    7 raw logits per sample.

    Args:
        conv1_channels: output channels of the first convolution.
        conv2_channels: output channels of the second convolution.
        fc1_units: width of the first fully connected layer.
        fc2_units: width of the second fully connected layer.
    """

    def __init__(self, conv1_channels=a, conv2_channels=b, fc1_units=c, fc2_units=d):
        super().__init__()
        # Attribute names and creation order are kept stable so previously
        # saved state dicts keep loading.
        self.conv1 = nn.Conv2d(1, conv1_channels, kernel_size=3)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(conv1_channels, conv2_channels, kernel_size=3)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(conv2_channels * 10 * 10, fc1_units)
        self.relu3 = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)  # alternative value noted by the author: 0.4
        self.fc2 = nn.Linear(fc1_units, fc2_units)
        self.relu4 = nn.ReLU()
        self.fc3 = nn.Linear(fc2_units, 7)

    def forward(self, x):
        """Map a (batch, 1, 48, 48) tensor to (batch, 7) logits."""
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        # Flatten per sample. Keeping the batch dimension explicit makes an
        # unexpected input size fail in fc1 instead of silently re-batching.
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = self.relu3(self.fc1(x))
        x = self.dropout(x)
        x = self.relu4(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

In [10]:
# Default layer widths for LeNet2: conv1 channels, conv2 channels, fc1 units,
# fc2 units. They are captured as constructor defaults when the class is
# defined, so a later cell that reassigns a/b/c/d cannot silently change what
# LeNet2() builds.
a=32
b=64
c=256  # alternative value noted by the author: 128
d=196


# Define the LeNet2 model
class LeNet2(nn.Module):
    """Wider LeNet variant with 3x3 convolutions and a 0.5 dropout rate.

    The first linear layer is sized for a 10x10 feature map, which is what a
    1x48x48 input produces (48 -> 46 -> 23 -> 21 -> 10). The network returns
    7 raw logits per sample.

    Args:
        conv1_channels: output channels of the first convolution.
        conv2_channels: output channels of the second convolution.
        fc1_units: width of the first fully connected layer.
        fc2_units: width of the second fully connected layer.
    """

    def __init__(self, conv1_channels=a, conv2_channels=b, fc1_units=c, fc2_units=d):
        super().__init__()
        # Attribute names and creation order are kept stable so previously
        # saved state dicts keep loading.
        self.conv1 = nn.Conv2d(1, conv1_channels, kernel_size=3)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(conv1_channels, conv2_channels, kernel_size=3)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(conv2_channels * 10 * 10, fc1_units)
        self.relu3 = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)  # alternative value noted by the author: 0.4
        self.fc2 = nn.Linear(fc1_units, fc2_units)
        self.relu4 = nn.ReLU()
        self.fc3 = nn.Linear(fc2_units, 7)

    def forward(self, x):
        """Map a (batch, 1, 48, 48) tensor to (batch, 7) logits."""
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        # Flatten per sample. Keeping the batch dimension explicit makes an
        # unexpected input size fail in fc1 instead of silently re-batching.
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = self.relu3(self.fc1(x))
        x = self.dropout(x)
        x = self.relu4(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

In [11]:
def gaussian_weights_init(m):
    """Re-draw the weights of convolution-like modules from N(0, 0.04^2).

    Intended for ``Module.apply``: a module is touched only when its class
    name contains the substring 'Conv'; every other module is left as is.
    Only ``m.weight`` is overwritten (in place).
    """
    layer_type = type(m).__name__
    if 'Conv' not in layer_type:
        return
    m.weight.data.normal_(0.0, 0.04)
class FaceCNN(nn.Module):
    """Three conv/batch-norm/RReLU/max-pool stages followed by a dense head.

    Every stage keeps the spatial size in its 3x3 convolution (stride 1,
    padding 1) and halves it in the 2x2 max-pool. The dense head expects
    256*6*6 flattened features, i.e. a 48x48 single-channel input
    (48 -> 24 -> 12 -> 6), and returns 7 raw logits per sample.
    """

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Build one stage: 3x3 conv -> batch norm -> RReLU -> 2x2 max-pool."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                      kernel_size=3, stride=1, padding=1),  # convolution, size preserved
            nn.BatchNorm2d(num_features=out_channels),      # normalisation
            nn.RReLU(inplace=True),                         # activation
            nn.MaxPool2d(kernel_size=2, stride=2),          # halves height and width
        )

    # Build the network structure
    def __init__(self):
        super().__init__()

        # (batch, 1, 48, 48) -> (batch, 64, 24, 24)
        self.conv1 = self._conv_stage(1, 64)
        # (batch, 64, 24, 24) -> (batch, 128, 12, 12)
        self.conv2 = self._conv_stage(64, 128)
        # (batch, 128, 12, 12) -> (batch, 256, 6, 6)
        self.conv3 = self._conv_stage(128, 256)

        # Parameter initialisation: runs after all three stages exist and
        # before the dense head is created.
        for stage in (self.conv1, self.conv2, self.conv3):
            stage.apply(gaussian_weights_init)

        # Fully connected head
        dense_layers = [
            nn.Dropout(p=0.2),
            nn.Linear(in_features=256*6*6, out_features=4096),
            nn.RReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(in_features=4096, out_features=1024),
            nn.RReLU(inplace=True),
            nn.Linear(in_features=1024, out_features=256),
            nn.RReLU(inplace=True),
            nn.Linear(in_features=256, out_features=7),
        ]
        self.fc = nn.Sequential(*dense_layers)

    # Forward pass
    def forward(self, x):
        """Return the class logits for a batch of images."""
        for stage in (self.conv1, self.conv2, self.conv3):
            x = stage(x)
        # Flatten everything except the batch dimension
        flattened = x.view(x.shape[0], -1)
        return self.fc(flattened)

In [12]:
# from torchstat import stat
# import torchvision.models as models
# model = FaceCNN()
# # stat(model, (1, 48, 48))

# def get_parameter_number(model):
#     total_num = sum(p.numel() for p in model.parameters())
#     trainable_num = sum(p.numel() for p in model.parameters() if p.requires_grad)
#     return {'Total': total_num, 'Trainable': trainable_num}
# get_parameter_number(model)
# tmp_features = test_features[0].cpu()
# model(tmp_features)

In [13]:
class VGGNet(nn.Module):
    """Small VGG-style network: two double-conv blocks and a three-layer classifier.

    Each block is conv3x3 -> ReLU -> conv3x3 -> ReLU -> 2x2 max-pool with
    padding 1, so only the pooling changes the spatial size. The classifier
    expects 128*12*12 flattened features, i.e. a 48x48 single-channel input,
    and returns 7 raw logits per sample.
    """

    def __init__(self):
        super().__init__()

        # Convolutional part: channel widths 64 and 128, two convs per block.
        conv_layers = []
        in_channels = 1
        for out_channels in (64, 128):
            for _ in range(2):
                conv_layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
                conv_layers.append(nn.ReLU(inplace=True))
                in_channels = out_channels
            conv_layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.features = nn.Sequential(*conv_layers)

        # Fully connected part
        hidden_units = 512
        self.classifier = nn.Sequential(
            nn.Linear(128 * 12 * 12, hidden_units),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(hidden_units, 7),  # 7 output classes
        )

    def forward(self, x):
        """Return the class logits for a batch of images."""
        feature_maps = self.features(x)
        flattened = feature_maps.view(feature_maps.size(0), -1)
        return self.classifier(flattened)

In [14]:
# Run on the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [20]:
# Initialise empty train/test accuracy lists and the list of epoch numbers.
# NOTE(review): no active code in this cell appends to these lists — confirm
# whether they are still needed.
train_accuracy_list = []
test_accuracy_list = []
epochs = []
# NOTE(review): this creates a 3x6 inch figure, but no active code in this cell
# draws on it.
plt.figure(figsize=(3, 6), dpi=100)
def generateModel(model_path, load_history = False, model_class = None):
    """Create a model and optionally restore its weights from a checkpoint.

    Args:
        model_path: path of a file written with
            ``torch.save(model.state_dict(), ...)``. Only read when
            `load_history` is truthy.
        load_history: when truthy, load the state dict stored at `model_path`
            into the freshly built model.
        model_class: zero-argument callable that builds the model (for example
            LeNet2, VGGNet or FaceCNN). Defaults to LeNet1.

    Returns:
        The model, located on the CPU.
    """
    if model_class is None:
        model_class = LeNet1
    model = model_class()
    if load_history:
        # map_location="cpu" lets a checkpoint that was saved from a GPU model
        # load on machines without CUDA; the caller moves the model to its
        # target device afterwards.
        # NOTE: torch.load unpickles the file, so only load checkpoints from a
        # trusted source.
        state_dict = torch.load(model_path, map_location="cpu")
        model.load_state_dict(state_dict)
    return model
def _accuracy(model, data_loader):
    """Return the fraction of samples in `data_loader` that `model` classifies correctly.

    Batches are moved to the module-level `device`. An empty loader yields 0.0.
    """
    correct = 0
    total = 0
    with torch.no_grad():
        for features, labels in data_loader:
            features = features.to(device)
            labels = labels.to(device)
            predicted_labels = torch.argmax(model(features), dim=1)
            correct += (predicted_labels == labels).sum().item()
            total += labels.size(0)
    return correct / total if total else 0.0


def train(model, train_features, train_labels, num_epochs, learning_rate, batch_size,
          weight_decay=1e-3, eval_interval=10):
    """Train `model` with Adam and cross-entropy loss, then save its state dict.

    Test and train accuracy are printed on every `eval_interval`-th epoch
    (starting with the first) and on the last epoch. The held-out data is read
    from the module-level ``test_features`` / ``test_labels`` and the target
    device from the module-level ``device``.

    After training, the state dict is written to the current working directory
    as ``{ModelClass}_{timestamp}_{test accuracy in percent}``.

    Args:
        model: the ``nn.Module`` to train; it is moved to `device` in place.
        train_features: tensor of training inputs, one sample per row.
        train_labels: integer class labels matching `train_features`.
        num_epochs: number of passes over the training data.
        learning_rate: Adam learning rate.
        batch_size: mini-batch size used for training and evaluation.
        weight_decay: Adam weight decay.
        eval_interval: evaluate every this many epochs.

    Returns:
        The file name the state dict was saved under.
    """
    print(device)
    model.to(device)  # move the model to the target device

    # Train on the data handed in by the caller.
    train_dataset = TensorDataset(train_features, train_labels)
    train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Evaluation does not need shuffling.
    train_eval_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
    test_dataset = TensorDataset(test_features, test_labels)
    test_data_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    test_accuracy = 0
    # Loss function and optimiser
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    # Make sure dropout / batch norm are in training mode even if the caller
    # passed in a model that was switched to eval().
    model.train()
    for epoch in range(num_epochs):
        for batch_features, batch_labels in train_data_loader:
            batch_features = batch_features.to(device)
            batch_labels = batch_labels.to(device)
            optimizer.zero_grad()
            outputs = model(batch_features)
            loss = criterion(outputs, batch_labels.long())
            loss.backward()
            optimizer.step()

        if epoch % eval_interval == 0 or epoch == num_epochs - 1:
            model.eval()
            test_accuracy = _accuracy(model, test_data_loader)
            print(f'Accuracy on testset: {test_accuracy * 100:.2f}%')
            train_accuracy = _accuracy(model, train_eval_loader)
            print(f'Accuracy on trainset: {train_accuracy * 100:.2f}%')
            model.train()

            # The reported loss is the loss of the last mini-batch of the epoch.
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

    print('Training finished!')
    current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    model_name = type(model).__name__
    model_file_name = f"{model_name}_{current_time}_{test_accuracy * 100:.2f}"
    # Save the trained weights
    torch.save(model.state_dict(), model_file_name)
    plt.ioff()
    return model_file_name
# test_features = test_features.to(device)
# test_labels = test_labels.to(device)
# train_features = train_features.to(device)
# train_labels = train_labels.to(device)



# Resume from the saved checkpoint and train for 200 epochs with learning
# rate 0.001 and batch size 512.
train(generateModel("LeNet1_2023-11-09_12-48-26_59.96",True), origin_train_features, train_labels, 200, 0.001, 512)

cuda:0
Accuracy on testset: 59.11%
Accuracy on trainset: 92.83%
Epoch [1/200], Loss: 0.8111
Accuracy on testset: 59.32%
Accuracy on trainset: 90.81%
Epoch [11/200], Loss: 0.9127
Accuracy on testset: 59.45%
Accuracy on trainset: 91.41%
Epoch [21/200], Loss: 0.7801
Accuracy on testset: 59.61%
Accuracy on trainset: 91.52%
Epoch [31/200], Loss: 1.0243
Accuracy on testset: 59.24%
Accuracy on trainset: 90.68%
Epoch [41/200], Loss: 0.8634
Accuracy on testset: 59.67%
Accuracy on trainset: 91.22%
Epoch [51/200], Loss: 0.7266
Accuracy on testset: 59.43%
Accuracy on trainset: 90.09%
Epoch [61/200], Loss: 0.9181
Accuracy on testset: 59.64%
Accuracy on trainset: 91.08%
Epoch [71/200], Loss: 0.7237
Accuracy on testset: 59.15%
Accuracy on trainset: 89.36%
Epoch [81/200], Loss: 1.0457
Accuracy on testset: 58.87%
Accuracy on trainset: 89.69%
Epoch [91/200], Loss: 0.9472
Accuracy on testset: 58.89%
Accuracy on trainset: 89.48%
Epoch [101/200], Loss: 0.6858
Accuracy on testset: 59.71%
Accuracy on trainse