### 使用BertTokenizer进行文本向量化
### 使用opensmile提取音频特征

In [None]:
import opensmile
import pandas as pd
import os
import sklearn
import matplotlib.pyplot as plt
import numpy as np
import torch
from tqdm.notebook import tqdm
from transformers import BertTokenizer
from torch.utils.data import TensorDataset

## 1、构造特征提取函数

In [None]:
## Text vectorization function
def text_tokenize(text_list):
    """Encode a list of texts with the local ``./bert-base-uncased`` tokenizer.

    Args:
        text_list: list holding the text content to encode.

    Returns:
        The tokenizer's batch encoding, a dict-like object with the keys
        ``'input_ids'`` (the words converted to a sequence of numbers),
        ``'token_type_ids'`` (marks which sentence of a text each token
        belongs to) and ``'attention_mask'`` (marks the padded positions).
        Every entry has ``len(text_list)`` rows, each padded or truncated
        to 256 tokens.

    Reference: https://huggingface.co/docs/transformers/main/en/glossary
    """
    tokenizer = BertTokenizer.from_pretrained('./bert-base-uncased', do_lower_case=True)
    # ``pad_to_max_length`` is deprecated: request fixed-length padding and
    # truncation explicitly so every row has exactly ``max_length`` tokens
    # and the rows can be stacked into one tensor.
    encoded_text = tokenizer(
        text_list,
        add_special_tokens=True,
        return_attention_mask=True,
        padding='max_length',
        truncation=True,
        max_length=256,
        return_tensors='pt',
    )
    return encoded_text

In [None]:
## Text vectorization example
temp_txt_list = ["Hello,my name is Jerry.","Hello,my name is Tom."]
temp_coded_txt = text_tokenize(temp_txt_list)
## Show the type of the returned object, its length and the number of rows in "input_ids"
print(type(temp_coded_txt),len(temp_coded_txt),len(temp_coded_txt["input_ids"]))
## Length of the first row of "token_type_ids" (displayed as the cell output)
len(temp_coded_txt["token_type_ids"][0])

In [None]:
## Audio feature extraction function
def extract_audio_feature(file_list):
    """Extract eGeMAPSv02 functionals for every audio file in ``file_list``.

    Args:
        file_list: list of audio file paths.

    Returns:
        numpy.ndarray of shape ``(len(file_list), n_features)`` holding one
        flattened feature vector per file (the original notes give 88
        features per file). An empty ``file_list`` yields an empty
        ``(0, n_features)`` array instead of the ``ValueError`` that
        ``np.stack`` raises for an empty list.
    """
    print("请耐心等待特征提取完！")
    smile = opensmile.Smile(
        feature_set=opensmile.FeatureSet.eGeMAPSv02,
        feature_level=opensmile.FeatureLevel.Functionals,
    )
    feature = []
    for n, path in enumerate(file_list, start=1):
        # process_file returns a DataFrame; flatten it to a 1-D vector
        feature.append(smile.process_file(path).to_numpy().reshape(-1))
        # Report progress every 100 files
        if n % 100 == 0:
            print(f"当前进度{n}/{len(file_list)}")
    print("此次特征提取已结束")
    print("-------------------------------")
    if not feature:
        # Nothing to stack: keep the 2-D contract with zero rows
        return np.empty((0, len(smile.feature_names)))
    return np.stack(feature, axis=0)

In [None]:
## Audio feature extraction example
file = "./train/Ses01F_impro01_F000.wav"
## The extractor expects a list of paths, so wrap the single file in a list
audio_feature = extract_audio_feature([file])
print(type(audio_feature),audio_feature.shape)

## 2、读入CSV文件示例

In [None]:
## Example of reading a csv file
## Our csv files are separated by "#", so the sep parameter must be set to "#", otherwise reading fails!
train_csv = pd.read_csv("./CSVfile/train.csv",sep="#")

In [None]:
## Display the first value of the "text" column
train_csv.text.values[0]

In [None]:
## Quick look at the first 5 rows
train_csv.head(5)

In [None]:
## Count how many times each distinct value occurs in the "label" column of this csv
train_csv.value_counts(subset="label")

In [None]:
## Bar chart of the number of samples per class
## value_counts() orders its result by frequency, so sort by label value first;
## otherwise a bar can end up under the tick of a different class.
label_counts = train_csv["label"].value_counts().sort_index()
plt.bar(
    range(len(label_counts)),
    label_counts.to_numpy(),
    tick_label=[str(label) for label in label_counts.index],
)
plt.xlabel("Label")
plt.ylabel("Num")
plt.title("Train dataset sample distribution")

## 3、读取CSV文件、分离文件路径、文本内容、标签

In [None]:
## Read train.csv and dev.csv (both "#"-separated)
train_csv = pd.read_csv("./CSVfile/train.csv", sep = "#")
dev_csv = pd.read_csv("./CSVfile/dev.csv", sep = "#")
## Split out the file paths, text contents and labels
## Training takes a long time: validate the code on a subset of the samples first, then use all samples
train_path = list(train_csv.path)[:1500]
train_label = list(train_csv.label)[:1500]
train_txt = list(train_csv.text)[:1500]
dev_path = list(dev_csv.path)[:500]
dev_label = list(dev_csv.label)[:500]
dev_txt = list(dev_csv.text)[:500]

## Full-dataset variant: use these lines instead of the sliced ones above to train on every sample
# train_path = list(train_csv.path)
# train_label = list(train_csv.label)
# train_txt = list(train_csv.text)
# dev_path = list(dev_csv.path)
# dev_label = list(dev_csv.label)
# dev_txt = list(dev_csv.text)

## 4、创建Data loaders

In [None]:
## The encodings provide 'input_ids', 'token_type_ids' and 'attention_mask'
train_coded_txt = text_tokenize(train_txt)
dev_coded_txt = text_tokenize(dev_txt)
## Each dataset item is the tuple (input_ids, attention_mask, label); 'token_type_ids' is not used here
train_dataset = TensorDataset(train_coded_txt["input_ids"], 
                              train_coded_txt["attention_mask"],
                              torch.tensor(train_label))
dev_dataset = TensorDataset(dev_coded_txt["input_ids"], 
                            dev_coded_txt["attention_mask"],
                            torch.tensor(dev_label))
## Sanity check: number of samples in each dataset
print(len(train_dataset),len(dev_dataset))

In [None]:
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler

In [None]:
## Try batch_size values such as 1, 2, 4, 8, 16...; a batch_size that is too large
## makes training fail because GPU memory runs out
batch_size = 4
## Training batches are drawn in random order
dataloader_train = DataLoader(
    train_dataset,
    sampler=RandomSampler(train_dataset),
    batch_size=batch_size
)
## Evaluation does not need shuffling: iterate the dev set in its stored order
## so that every evaluation pass sees the samples in the same sequence
dataloader_dev = DataLoader(
    dev_dataset,
    sampler=SequentialSampler(dev_dataset),
    batch_size=32
)

## 5、定义性能指标

In [None]:
import numpy as np
from sklearn.metrics import f1_score

In [None]:
def f1_score_func(preds, labels):
    """Return the macro-averaged F1 score of ``preds`` against ``labels``.

    Args:
        preds: 2-D array of per-class scores; the predicted class of a row
            is the index of its largest value.
        labels: array of true class ids; it is flattened before scoring.

    Returns:
        The value computed by ``f1_score`` with ``average='macro'``.
    """
    predicted_classes = np.argmax(preds, axis=1)
    y_pred = predicted_classes.flatten()
    y_true = labels.flatten()
    macro_f1 = f1_score(y_true, y_pred, average='macro')
    return macro_f1

In [None]:
def accuracy_per_class(preds, labels):
    """Print, for each class present in ``labels``, the count of correct predictions.

    Args:
        preds: 2-D array of per-class scores; the predicted class of a row
            is the index of its largest value.
        labels: array of true class ids (0-3); it is flattened before use.

    Returns:
        None. For every class found in ``labels`` two lines are printed:
        the class name and ``correct/total`` for that class.
    """
    class_names = {0:"angry",1:"happy or excited",2:"neutral",3:"sad"}
    predicted = np.argmax(preds, axis=1).flatten()
    expected = labels.flatten()
    for label in np.unique(expected):
        # Restrict to the samples whose true class is ``label``
        in_class = expected == label
        class_preds = predicted[in_class]
        hits = int((class_preds == label).sum())
        total = len(expected[in_class])
        print(f'Class: {class_names[label]}')
        print(f'Accuracy: {hits}/{total}\n')

In [None]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix

In [None]:
def calculate_score_classification(preds, labels, average_f1='macro'):  # weighted, macro
    """Compute classification metrics from per-class scores and true labels.

    Args:
        preds: 2-D array of per-class scores; the predicted class of a row
            is the index of its largest value.
        labels: array of true class ids; it is flattened before scoring.
        average_f1: averaging mode passed to ``f1_score`` (e.g. 'weighted'
            or 'macro').

    Returns:
        Tuple ``(accuracy, ua, f1, precision, confuse_matrix)`` where ``ua``
        is the macro-averaged recall and ``precision`` is macro-averaged.
    """
    y_pred = np.argmax(preds, axis=1).flatten()
    y_true = labels.flatten()
    # Recall and precision are always macro-averaged; undefined cases score 0
    macro_kwargs = dict(average='macro', zero_division=0)
    return (
        accuracy_score(y_true, y_pred),
        recall_score(y_true, y_pred, **macro_kwargs),
        f1_score(y_true, y_pred, average=average_f1, zero_division=0),
        precision_score(y_true, y_pred, **macro_kwargs),
        confusion_matrix(y_true, y_pred),
    )

## 6、设置模型

In [None]:
from transformers import AdamW, get_linear_schedule_with_warmup

In [None]:
## Deep learning model wrapper class
class MyDLmodel():
    """Skeleton wrapper bundling a model with its optimizer, scheduler and device.

    ``evaluate``, ``train`` and ``predict`` are placeholders that do nothing
    and return None until they are implemented.
    """

    def __init__(self,model,device):
        """Move ``model`` to ``device`` and create its AdamW optimizer.

        Args:
            model: the network to train; it must provide ``to`` and
                ``parameters``.
            device: the device the model is moved to.
        """
        self.model = model
        self.model.to(device)
        # transformers.AdamW is deprecated in favour of torch.optim.AdamW.
        # weight_decay=0.0 keeps the default of the transformers version
        # (torch's own default would be 0.01).
        self.optimizer = torch.optim.AdamW(self.model.parameters(),
                                           lr=1e-6,
                                           eps=1e-8,
                                           weight_decay=0.0)
        # No learning-rate scheduler is configured yet
        self.scheduler = None
        self.device = device

    def evaluate(self,dataloader_val):
        """Placeholder for evaluation on ``dataloader_val``; not implemented."""
        pass

    def train(self,dataloader_train,dataloader_dev,epochs):
        """Placeholder for the training loop; not implemented."""
        pass

    def predict(self,dataloader_test):
        """Placeholder for prediction on ``dataloader_test``; not implemented."""
        pass


In [None]:
import random
## Set the random seeds
def set_seeds(seed_val):
    """Seed the Python, NumPy and PyTorch (CPU and CUDA) random generators.

    Args:
        seed_val: the seed handed to every generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed_val)

In [None]:
## test_preds: list of length 1241 holding the labels of the 1241 test-set samples
## Running this creates result.csv in the current directory; submit that file
## If it was not created, check that len(test_preds) is 1241!
def write_result(test_preds):
    """Write the test-set predictions to ``./result.csv``.

    Args:
        test_preds: the predicted labels; must have exactly 1241 entries.

    Returns:
        -1 (after printing an error message) when the length is not 1241;
        otherwise None, after ``./CSVfile/test.csv`` has been read, its
        "label" column set to ``test_preds`` and the table written to
        ``./result.csv`` with "#" as separator.
    """
    expected_len = 1241
    if len(test_preds) == expected_len:
        result_frame = pd.read_csv("./CSVfile/test.csv",sep="#")
        result_frame["label"] = test_preds
        result_frame.to_csv("./result.csv",sep = "#")
        print("测试集预测结果已成功写入到文件中！")
        return None
    # Wrong number of predictions: report it and write nothing
    print("错误！请检查test_preds长度是否为1241！！！")
    return -1

In [None]:
from transformers import BertForSequenceClassification

In [None]:
## Feature processing hook: transform the extracted features here to obtain a better representation
def feature_process(feature):
    """Return ``feature`` unchanged (no processing is applied yet).

    Args:
        feature: the extracted features.

    Returns:
        The very same object that was passed in.
    """
    processed = feature
    return processed
## model reference： https://huggingface.co/docs/transformers/main/en/model_doc/bert#transformers.BertForSequenceClassification
if __name__  == "__main__":
    set_seeds(17)
    pretrained_model = None
    ## Choose a suitable multimodal model here yourself, and think about how to fuse the features of the modalities
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    mymodel = MyDLmodel(pretrained_model,device)
    epochs = 10
    mymodel.train(dataloader_train,dataloader_dev,epochs)
    ## Predict the labels of the test set
    test_csv = pd.read_csv("./CSVfile/test.csv",sep = "#")
    test_text = list(test_csv.text)
    test_coded_txt = text_tokenize(test_text)
    test_dataset = TensorDataset(
        test_coded_txt["input_ids"], 
        test_coded_txt["attention_mask"])
    ## The predictions are written back row by row next to test.csv, so the
    ## test set must be iterated in its original order: a random sampler
    ## would pair each prediction with the wrong row.
    dataloader_test = DataLoader(
        test_dataset,
        sampler=SequentialSampler(test_dataset),
        batch_size=32)
    test_preds = mymodel.predict(dataloader_test)
    ## Write the prediction results
    write_result(test_preds)