In [1]:

import numpy as np 
import pandas as pd 
from PIL import Image
from tqdm import tqdm
import os
import shutil
import argparse
import numpy  as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision import transforms
import glob
import csv

# Config

In [30]:
epochs = 20  # 时间：3轮 一分钟
lr = 0.005
batchsize = 128

# Data preprocessing

In [3]:
# shutil.rmtree('/kaggle/working/')

In [5]:
if not os.path.exists('data'):

    os.makedirs('data')

In [6]:
class Generate_data():
    def __init__(self, datapath,savepath):  # datapath：原始csv文件，savepath：划分成训练集和验证集的csv文件
        """
        Generate_data class
        Two methods to be used
        1-split_test
        2-save_images
        [Note] that you have to split the public and private from fer2013 file
        """
        self.data_path = datapath   
        self.save_path = savepath
        
    def split_test(self, val_filename= 'val'):   # 输入train.csv文件，输出两个csv文件，分别是train.csv和val.csv，也即是将原始训练集划分为训练集和验证集
        """
        Helper function to split the validation and train data from general train file.
            params:-
                data_path = path to the folder that contains the train data file
        """
        train_csv_path = self.data_path +"/"+ 'train.csv'
        train = pd.read_csv(train_csv_path)
        # This is data split, u can use for creating test_data.
        
        validation_data = pd.DataFrame(train.iloc[:3589,:])  # 划分前3590个数据为验证集   经过DataFrame后，生成的csv文件会多第一列，为索引
        train_data = pd.DataFrame(train.iloc[3589:,:])       # 划分后2w多个数据为训练集
        train_data.to_csv(self.save_path+"/train.csv")       # 导出为CSV文件
        validation_data.to_csv(self.save_path+"/"+val_filename+".csv")  # 导出为CSV文件
        print("Done splitting the train file into validation & train")

    def str_to_image(self, str_img = ' '):  # 将数据文件中，每一行字符串所代表的一张图片，转换为一张48*48的Image图片
        '''
        Convert string pixels from the csv file into image object
            params:- take an image string
            return :- return PIL image object
        '''
        imgarray_str = str_img.split(' ')  # 从图片字符串split，形成列表
        imgarray = np.asarray(imgarray_str,dtype=np.uint8).reshape(48,48) # 将输入（字符串列表） 转换为 数组
        return Image.fromarray(imgarray)  # 将数组 转换为 图像Image类

    def save_images(self, datatype='train'):
        '''
        save_images is a function responsible for saving images from data files e.g(train, test) in a desired folder
            params:-
            datatype= str e.g (train, val, test)
        '''
        foldername= self.save_path + "/" + datatype
        if datatype == 'test':  # 测试集是提交kaggle比赛需要的，本地跑不需要，但这个代码不删除也可以，不影响
            csvfile_path= self.data_path+"/"+datatype+'.csv'  # 测试集放在data_path下，和原始未划分的训练集放在一个文件夹下
        else:
            csvfile_path= self.save_path+'/'+datatype+'.csv'
        if not os.path.exists(foldername):
            os.mkdir(foldername)

        data = pd.read_csv(csvfile_path)
        images = data['pixels'] # dataframe to series pandas
        numberofimages = images.shape[0]  # [5,]  所以是shape[0]
        if datatype=='train':
            print("训练集数据量为：", numberofimages, "条")
        for index in tqdm(range(numberofimages)):  # tqdm：设置打印进度条显示
            img = self.str_to_image(images[index]) # 将字符串转换为Image文件
            img.save(os.path.join(foldername,'{}{}.jpg'.format(datatype,index)),'JPEG')  # 以JPEG格式 保存图片 但实际保存为了jpg格式，不知道为什么，但无伤大雅
        print('Done saving {} data'.format((foldername)))

In [7]:
data_path='challenges-in-representation-learning-facial-expression-recognition-challenge'
save_path='data'

generate_dataset = Generate_data(data_path, save_path)
generate_dataset.split_test()
generate_dataset.save_images('train')
generate_dataset.save_images('test')
generate_dataset.save_images('val')

Done splitting the train file into validation & train
25120


100%|██████████| 25120/25120 [00:12<00:00, 1973.92it/s]


Done saving data/train data


100%|██████████| 7178/7178 [00:03<00:00, 1867.99it/s]


Done saving data/test data


100%|██████████| 3589/3589 [00:01<00:00, 1820.44it/s]

Done saving data/val data





# Data loader

In [12]:
class Plain_Dataset(Dataset):
    def __init__(self,img_dir,datatype,transform,csv_file='None'):
        '''
        Pytorch Dataset class
        params:-
                 csv_file : the path of the csv file    (train, validation, test)
                 img_dir  : the directory of the images (train, validation, test)
                 datatype : string for searching along the image_dir (train, val, test)
                 transform: pytorch transformation over the data
        return :-
                 image, labels
        '''
        
        
        self.img_dir = img_dir
        self.transform = transform
        self.datatype = datatype
        if self.datatype != 'test':
            self.csv_file = pd.read_csv(csv_file)
            self.labels = self.csv_file['emotion']  # pandas中的Series可以理解为一种特殊的列表，索引可以不是整数，可以是字符，有点像字典

    def __len__(self):
        return len(os.listdir(self.img_dir))

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist() # 将张量转换为列表
        img = Image.open(self.img_dir + '/' + self.datatype + str(idx) + '.jpg')
        if self.transform :    # 图像预处理
            img = self.transform(img)
        
        if self.datatype != 'test':
            labels = np.array(self.labels[idx])   # 需要将字符串转换为数组 asarray是浅拷贝，而array属于深拷贝，会另外开辟内存空间
            labels = torch.from_numpy(labels).long()
            return(img, labels)
        
        return img
        
        

        

In [17]:
traincsv_file='data/train.csv'
validationcsv_file='data/val.csv'

train_img_dir='data/train'
validation_img_dir='data/val'
test_img_dir='data/test'

# print(os.path.abspath('.'))

In [18]:
transformation= transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5,),(0.5,))])

train_dataset= Plain_Dataset(csv_file=traincsv_file, img_dir = train_img_dir, datatype = 'train', transform = transformation)
validation_dataset= Plain_Dataset(csv_file=validationcsv_file, img_dir = validation_img_dir, datatype = 'val', transform = transformation)
test_dataset= Plain_Dataset( img_dir = test_img_dir, datatype = 'test', transform = transformation)

train_loader = DataLoader(train_dataset, batch_size=batchsize, shuffle = True, num_workers=0)
val_loader   = DataLoader(validation_dataset, batch_size=batchsize, shuffle = True, num_workers=0)
test_loader  = DataLoader(test_dataset, batch_size=batchsize, shuffle = True, num_workers=0)

In [19]:
len(train_loader)

197

# Model

In [21]:
class Deep_Emotion(nn.Module):
    def __init__(self):
        '''
        Deep_Emotion class contains the network architecture.
        '''
        super(Deep_Emotion,self).__init__()
        self.conv1 = nn.Conv2d(1,10,3)
        self.conv2 = nn.Conv2d(10,10,3)
        self.pool2 = nn.MaxPool2d(2,2)

        self.conv3 = nn.Conv2d(10,10,3)
        self.conv4 = nn.Conv2d(10,10,3)
        self.pool4 = nn.MaxPool2d(2,2)

        self.norm = nn.BatchNorm2d(10)

        self.fc1 = nn.Linear(810,50)
        self.fc2 = nn.Linear(50,7)  # 七分类问题

        self.localization = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=7),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True),
            nn.Conv2d(8, 10, kernel_size=5),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True)
        )

        self.fc_loc = nn.Sequential(
            nn.Linear(640, 32),
            nn.ReLU(True),
            nn.Linear(32, 3 * 2)
        )
        self.fc_loc[2].weight.data.zero_()
        self.fc_loc[2].bias.data.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

    def stn(self, x):
        xs = self.localization(x)
        xs = xs.view(-1, 640)
        theta = self.fc_loc(xs)
        theta = theta.view(-1, 2, 3)

        grid = F.affine_grid(theta, x.size())
        x = F.grid_sample(x, grid)
        return x

    def forward(self,input):
        out = self.stn(input)

        out = F.relu(self.conv1(out))
        out = self.conv2(out)
        out = F.relu(self.pool2(out))

        out = F.relu(self.conv3(out))
        out = self.norm(self.conv4(out))
        out = F.relu(self.pool4(out))

        out = F.dropout(out)
        out = out.view(-1, 810)
        out = F.relu(self.fc1(out))
        out = self.fc2(out)

        return out

# Train

In [31]:
def Train(net, epochs,train_loader,val_loader,criterion,optmizer,device):
    '''
    Training Loop
    '''
    print("===================================Start Training===================================")
    for e in range(epochs):
        train_loss = 0
        validation_loss = 0

        train_correct = 0
        val_correct = 0

        # Train the model  #
        net.train()
        for data, labels in train_loader:
            data, labels = data.to(device), labels.to(device) # 对于选择cpu还是gpu作为运算器，这行代码必要
            optmizer.zero_grad()
            outputs = net(data)
            loss = criterion(outputs,labels)
            loss.backward()
            optmizer.step()
            train_loss += loss.item()
            _, preds = torch.max(outputs,1)  # 前者为最大值，后者为最大值的索引，即preds表示七分类中具体属于哪个类别
            train_correct += torch.sum(preds == labels.data)

        #validate the model#
        net.eval()
        for data,labels in val_loader:
            data, labels = data.to(device), labels.to(device)
            val_outputs = net(data)
            val_loss = criterion(val_outputs, labels)
            validation_loss += val_loss.item()
            _, val_preds = torch.max(val_outputs,1)
            val_correct += torch.sum(val_preds == labels.data)


        train_loss = train_loss/len(train_dataset)
        train_acc = train_correct.double() / len(train_dataset)

        validation_loss =  validation_loss / len(validation_dataset)
        val_acc = val_correct.double() / len(validation_dataset)
        print('Epoch: {} \tTraining Loss: {:.8f} \tValidation Loss {:.8f} \tTraining Acuuarcy {:.3f}% \tValidation Acuuarcy {:.3f}%'
                                                           .format(e+1, train_loss,validation_loss,train_acc * 100, val_acc*100))

    torch.save(net.state_dict(),'deep_emotion-{}-{}-{}.pt'.format(epochs,batchsize,lr))
    print("===================================Training Finished===================================")


# Start training

In [28]:
#set device to cuda
device= 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


In [25]:
# Model
net = Deep_Emotion()
net.to(device)

Deep_Emotion(
  (conv1): Conv2d(1, 10, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(10, 10, kernel_size=(3, 3), stride=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(10, 10, kernel_size=(3, 3), stride=(1, 1))
  (conv4): Conv2d(10, 10, kernel_size=(3, 3), stride=(1, 1))
  (pool4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (norm): BatchNorm2d(10, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=810, out_features=50, bias=True)
  (fc2): Linear(in_features=50, out_features=7, bias=True)
  (localization): Sequential(
    (0): Conv2d(1, 8, kernel_size=(7, 7), stride=(1, 1))
    (1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (2): ReLU(inplace=True)
    (3): Conv2d(8, 10, kernel_size=(5, 5), stride=(1, 1))
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): ReLU(inplac

In [None]:
criterion= nn.CrossEntropyLoss()
optmizer= optim.Adam(net.parameters(),lr= lr)

In [32]:
Train(net, epochs, train_loader, val_loader, criterion, optmizer, device)

Epoch: 1 	Training Loss: 0.01107255 	Validation Loss 0.01149050 	Training Acuuarcy 45.243% 	Validation Acuuarcy 44.609%
Epoch: 2 	Training Loss: 0.01091114 	Validation Loss 0.01146732 	Training Acuuarcy 46.334% 	Validation Acuuarcy 45.277%
Epoch: 3 	Training Loss: 0.01076095 	Validation Loss 0.01155597 	Training Acuuarcy 46.919% 	Validation Acuuarcy 43.550%
Epoch: 4 	Training Loss: 0.01070111 	Validation Loss 0.01130113 	Training Acuuarcy 47.468% 	Validation Acuuarcy 45.472%
Epoch: 5 	Training Loss: 0.01060776 	Validation Loss 0.01129115 	Training Acuuarcy 47.488% 	Validation Acuuarcy 45.695%
Epoch: 6 	Training Loss: 0.01052435 	Validation Loss 0.01104624 	Training Acuuarcy 48.193% 	Validation Acuuarcy 46.336%
Epoch: 7 	Training Loss: 0.01049874 	Validation Loss 0.01114440 	Training Acuuarcy 48.674% 	Validation Acuuarcy 45.639%
Epoch: 8 	Training Loss: 0.01040370 	Validation Loss 0.01104635 	Training Acuuarcy 48.794% 	Validation Acuuarcy 46.197%
Epoch: 9 	Training Loss: 0.01038565 	Val

# Predict

In [36]:
result=[]
net.eval()
for data in test_loader:
    data = data.to(device)
    outputs = net(data)
   
    _, preds = torch.max(outputs,1)
    preds= preds.cpu().numpy() # 如果要转换为numpy，则需要将位于GPU的数据传入到主存中   如果不加，
    # 会报“can't convert cuda:0 device type tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first”错误
    result.extend(preds)
    #print(f"val_pred:{val_preds}")

TypeError: can't convert cuda:0 device type tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.

In [34]:
csv_file='submission.csv'

In [35]:
with open(csv_file, mode='w', newline='') as file:
    writer = csv.writer(file)
    for item in result:
        writer.writerow([item])