In [63]:
import os
import pickle

import IPython
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import Audio
from IPython.display import Image
from tqdm import tqdm

# Integer class id for every emotion name that appears in the file names.
EMOTIONS = {
    'angry': 0,
    'fear': 1,
    'happy': 2,
    'neutral': 3,
    'sad': 4,
    'surprise': 5
}

DATA_PATH = '../data/CASIA'

data_list = []
skipped_files = []

# File names are expected to look like "<id>-<emotion>-<speaker>.wav".
for dirname, dirnames, filenames in os.walk(DATA_PATH):
    # os.walk order depends on the OS; sorting makes the row order reproducible.
    dirnames.sort()
    for filename in sorted(filenames):
        file_path = os.path.join(dirname, filename)
        stem, extension = os.path.splitext(filename)
        identifiers = stem.split('-')

        # Ignore stray files (.DS_Store, Thumbs.db, ...) and names that do not
        # follow the pattern instead of crashing or storing a None label.
        if (extension.lower() != '.wav'
                or len(identifiers) < 3
                or identifiers[1] not in EMOTIONS):
            skipped_files.append(file_path)
            continue

        # Store the parsed information as one record per audio file.
        data_list.append({
            "Emotion": EMOTIONS[identifiers[1]],
            "Person": identifiers[2],
            "Path": file_path
        })

# Explicit columns keep the schema stable even when no file was found.
data = pd.DataFrame(data_list, columns=["Emotion", "Person", "Path"])

if skipped_files:
    print("Skipped {} unrecognized files".format(len(skipped_files)))
print("Number of files is {}".format(len(data)))
data.head()




Number of files is 1200


Unnamed: 0,Emotion,Person,Path
0,0,liuchanhg,../data/CASIA\angry\201-angry-liuchanhg.wav
1,0,wangzhe,../data/CASIA\angry\201-angry-wangzhe.wav
2,0,zhaoquanyin,../data/CASIA\angry\201-angry-zhaoquanyin.wav
3,0,ZhaoZuoxiang,../data/CASIA\angry\201-angry-ZhaoZuoxiang.wav
4,0,liuchanhg,../data/CASIA\angry\202-angry-liuchanhg.wav


In [64]:
# Every clip is resampled to SAMPLE_RATE and cut/zero-padded to exactly 3 seconds.
SAMPLE_RATE = 48000
mel_spectrograms = []
signals = []
for file_path in data.Path:
    # `sample_rate` is kept as a name because later cells read it.
    audio, sample_rate = librosa.load(file_path, sr=SAMPLE_RATE, offset=0, duration=3)
    padded = np.zeros(int(SAMPLE_RATE * 3))
    padded[:len(audio)] = audio  # shorter clips keep trailing zeros
    signals.append(padded)
# One row per file, in the same order as `data`.
signals = np.stack(signals, axis=0)
print(signals.shape)

(1200, 144000)


In [65]:
import numpy as np
import os
import pandas as pd


# Speaker-independent split: every recording of VAL_SPEAKER is held out for
# validation, all other speakers are used for training.
VAL_SPEAKER = 'zhaoquanyin'
SPLIT_SEED = 42

# `signals` is indexed positionally with the index labels of `data`, so both
# must describe the same files in the same order.
assert len(data) == len(signals), "data and signals are out of sync; re-run the loading cells"

# A dedicated generator makes the shuffling reproducible without touching the
# global NumPy random state used elsewhere in the notebook.
split_rng = np.random.default_rng(SPLIT_SEED)
is_val_speaker = data['Person'] == VAL_SPEAKER
val_ind = split_rng.permutation(data.index[is_val_speaker].to_numpy())
train_ind = split_rng.permutation(data.index[~is_val_speaker].to_numpy())

# Index once with the shuffled index arrays instead of building per-row lists.
data_splits = {
    'train': {
        'X': signals[train_ind],
        'Y': data.loc[train_ind, 'Emotion'].to_numpy(),
    },
    'val': {
        'X': signals[val_ind],
        'Y': data.loc[val_ind, 'Emotion'].to_numpy(),
    },
}

print(f'X_train: {data_splits["train"]["X"].shape}, Y_train: {data_splits["train"]["Y"].shape}')
print(f'X_val: {data_splits["val"]["X"].shape}, Y_val: {data_splits["val"]["Y"].shape}')

X_train: (900, 144000), Y_train: (900,)
X_val: (300, 144000), Y_val: (300,)


In [54]:
# Adds additive white Gaussian noise (AWGN) to a signal to imitate noisy
# recording conditions, which is used here as data augmentation.
def addAWGN(signal, num_bits=16, augmented_num=2, snr_low=15, snr_high=30):
    """Return `augmented_num` noisy copies of `signal` at a random SNR.

    Parameters
    ----------
    signal : 1-D array-like
        The clean waveform.
    num_bits : int
        Bit depth used to normalize signal and noise before their power is
        computed. The same constant is applied to both, so it cancels in the
        power ratio.
    augmented_num : int
        Number of independent noise realizations to generate.
    snr_low, snr_high : int
        The target SNR in dB is an integer drawn uniformly from
        [snr_low, snr_high). All copies produced by one call share that SNR.
        Passing snr_low == snr_high selects a fixed SNR.

    Returns
    -------
    numpy.ndarray of shape (augmented_num, len(signal))

    Raises
    ------
    ValueError
        If snr_high is smaller than snr_low.
    """
    if snr_high < snr_low:
        raise ValueError("snr_high must be greater than or equal to snr_low")

    signal = np.asarray(signal)
    signal_len = len(signal)
    if signal_len == 0:
        # Nothing to add noise to; avoid dividing by a zero length.
        return np.zeros((augmented_num, 0))

    # White Gaussian noise, one row per augmented copy.
    noise = np.random.normal(size=(augmented_num, signal_len))

    # Normalize signal and noise, then measure their average power.
    norm_constant = 2.0 ** (num_bits - 1)
    s_power = np.sum((signal / norm_constant) ** 2) / signal_len
    n_power = np.sum((noise / norm_constant) ** 2, axis=1) / signal_len

    # randint rejects an empty range, so a fixed SNR is handled separately.
    if snr_high > snr_low:
        target_snr = np.random.randint(snr_low, snr_high)
    else:
        target_snr = snr_low

    # Per-copy gain that scales the noise to the target SNR.
    K = np.sqrt((s_power / n_power) * 10 ** (-target_snr / 10))

    # Broadcast the gain over time instead of materializing a full matrix.
    return signal + K[:, np.newaxis] * noise

In [55]:
# Augment the training set with noisy copies of every training signal.
train_X = data_splits["train"]["X"]
train_Y = data_splits["train"]["Y"]

aug_signals = []
aug_labels = []

print('增强数据...')
for i in tqdm(range(train_X.shape[0])):
    augmented_signals = addAWGN(train_X[i, :])
    aug_signals.append(augmented_signals)
    # The training rows were shuffled when the split was built, so row i of
    # train_X is NOT row i of `data`; the label has to come from train_Y.
    aug_labels.extend([train_Y[i]] * augmented_signals.shape[0])

# NOTE: `data` is deliberately left untouched. Appending one row per augmented
# sample with pd.concat is quadratic, and the grown frame no longer lines up
# with `signals`, which breaks a re-run of the split cell.

# Flatten the per-signal batches into one (n_augmented, n_samples) array.
aug_signals = np.concatenate(aug_signals, axis=0)
aug_labels = np.array(aug_labels)

# Append the augmented samples and labels after the original training data.
data_splits["train"]["X"] = np.concatenate([train_X, aug_signals], axis=0)
data_splits["train"]["Y"] = np.concatenate([train_Y, aug_labels])

# Print the shapes as a sanity check.
print(f'X_train: {data_splits["train"]["X"].shape}, Y_train: {data_splits["train"]["Y"].shape}')


增强数据...


100%|██████████| 900/900 [00:01<00:00, 762.37it/s]

X_train: (2700, 308), Y_train: (2700,)





In [66]:
def getMELspectrogram(audio, sample_rate):
    """Return the mel spectrogram of `audio` averaged into a 1-D vector.

    The spectrogram is computed with librosa's default settings, transposed,
    and averaged along the first axis of the transposed array.
    """
    spectrogram = librosa.feature.melspectrogram(y=audio, sr=sample_rate)
    return np.mean(spectrogram.T, axis=0)

def getMFCC(audio, sample_rate):
    """Return (mean, std, max) statistics of 50 MFCCs.

    Each returned vector has one entry per coefficient; the statistics are
    taken along the first axis of the transposed MFCC matrix.
    """
    coefficients = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=50)
    per_frame = coefficients.T
    return (
        np.mean(per_frame, axis=0),
        np.std(per_frame, axis=0),
        np.max(per_frame, axis=0),
    )

def get_chroma(audio, sample_rate):
    """Return the chromagram of `audio` averaged into a 1-D vector.

    The chroma features are computed from the magnitude of the STFT.
    """
    magnitude = np.abs(librosa.stft(audio))
    chromagram = librosa.feature.chroma_stft(S=magnitude, sr=sample_rate)
    return np.mean(chromagram.T, axis=0)

def get_contrast(audio, sample_rate):
    """Return the spectral contrast of `audio` averaged into a 1-D vector.

    The contrast is computed from the magnitude of the STFT.
    """
    magnitude = np.abs(librosa.stft(audio))
    band_contrast = librosa.feature.spectral_contrast(S=magnitude, sr=sample_rate)
    return np.mean(band_contrast.T, axis=0)

def co_ext_features(audio, sample_rate):
    """Return 11 scalar descriptors of `audio` as a 1-D array.

    Order of the entries: spectral flatness (mean), zero-crossing rate (mean),
    STFT magnitude (mean, std, max), sum-normalized spectral centroid
    (mean, std, max), and RMS energy (mean, std, max).
    """
    magnitude_stft = np.abs(librosa.stft(audio))

    # Frame-wise descriptors reduced to their mean.
    flatness = np.mean(librosa.feature.spectral_flatness(y=audio))
    zero = np.mean(librosa.feature.zero_crossing_rate(audio))

    # Magnitude statistics of the spectrogram.
    S, _ = librosa.magphase(magnitude_stft)
    magnitude_stats = [np.mean(S), np.std(S), np.max(S)]

    # Spectral centroid, divided by its own sum before taking statistics.
    cent = librosa.feature.spectral_centroid(y=audio, sr=sample_rate)
    cent_share = cent / np.sum(cent)
    centroid_stats = [np.mean(cent_share), np.std(cent_share), np.max(cent_share)]

    # Root-mean-square energy computed from the spectrogram.
    rmse = librosa.feature.rms(S=S)[0]
    rms_stats = [np.mean(rmse), np.std(rmse), np.max(rmse)]

    # Combine everything into one feature vector.
    return np.array([flatness, zero] + magnitude_stats + centroid_stats + rms_stats)

def co_multi_feature(audio, sample_rate):
    """Return the full feature vector for one waveform.

    Concatenates, in this order: averaged mel spectrogram, MFCC mean, MFCC
    std, MFCC max, averaged chroma, averaged spectral contrast, and the
    scalar descriptors from co_ext_features.

    Every extractor receives the `sample_rate` argument. The mel spectrogram
    previously ignored it and read the global SAMPLE_RATE instead.
    """
    mel_spectrogram = getMELspectrogram(audio, sample_rate=sample_rate)
    mfcc, mfccsstd, mfccmax = getMFCC(audio, sample_rate)
    chroma = get_chroma(audio, sample_rate)
    contrast = get_contrast(audio, sample_rate)
    extract_feature = co_ext_features(audio, sample_rate)
    return np.concatenate((mel_spectrogram, mfcc, mfccsstd, mfccmax, chroma, contrast, extract_feature))


def _extract_feature_matrix(waveforms):
    """Run co_multi_feature on every row of `waveforms` and stack the results."""
    # SAMPLE_RATE is the rate every clip was resampled to at load time; using
    # the constant avoids depending on a loop variable leaked by another cell.
    return np.stack(
        [co_multi_feature(waveforms[i, :], SAMPLE_RATE)
         for i in tqdm(range(waveforms.shape[0]))],
        axis=0,
    )


# NOTE: this cell replaces the raw waveforms in data_splits with feature
# vectors, so it must run exactly once after the split/augmentation cells.
print("Calculatin mel spectrograms for train set")
train_features = _extract_feature_matrix(data_splits["train"]["X"])
print(train_features.shape)
data_splits["train"]["X"] = train_features

print("Calculatin mel spectrograms for val set")
val_features = _extract_feature_matrix(data_splits["val"]["X"])
data_splits["val"]["X"] = val_features

print(f'X_train:{data_splits["train"]["X"].shape}, Y_train:{data_splits["train"]["Y"].shape}')
print(f'X_val:{data_splits["val"]["X"].shape}, Y_val:{data_splits["val"]["Y"].shape}')

Calculatin mel spectrograms for train set


100%|██████████| 900/900 [01:37<00:00,  9.21it/s]


(900, 308)
Calculatin mel spectrograms for val set


100%|██████████| 300/300 [00:30<00:00,  9.84it/s]

X_train:(900, 308), Y_train:(900,)
X_val:(300, 308), Y_val:(300,)





In [67]:
from sklearn.preprocessing import StandardScaler

# The scaling statistics are learned from the training features only and then
# reused unchanged for the validation features.
scaler = StandardScaler()
train_split, val_split = data_splits["train"], data_splits["val"]
train_split["X"] = scaler.fit_transform(train_split["X"])
val_split["X"] = scaler.transform(val_split["X"])

In [68]:
# Baseline classifier: a support vector machine with a linear kernel.
from sklearn.svm import SVC

model = SVC(kernel='linear', random_state=42)

# Fit on the training features and labels.
train_split = data_splits["train"]
model.fit(train_split["X"], train_split["Y"])

In [69]:
from sklearn.metrics import classification_report, accuracy_score

# Predict the validation set with the baseline model.
Y_val_true = data_splits["val"]["Y"]
Y_val_pred = model.predict(data_splits["val"]["X"])

# Per-class precision / recall / F1.
print(classification_report(Y_val_true, Y_val_pred))

# Overall accuracy.
print("Accuracy:", accuracy_score(Y_val_true, Y_val_pred))

              precision    recall  f1-score   support

           0       0.50      0.06      0.11        50
           1       0.50      0.14      0.22        50
           2       0.28      0.96      0.43        50
           3       0.44      0.64      0.52        50
           4       0.72      0.26      0.38        50
           5       0.19      0.06      0.09        50

    accuracy                           0.35       300
   macro avg       0.44      0.35      0.29       300
weighted avg       0.44      0.35      0.29       300

Accuracy: 0.35333333333333333


In [70]:
from sklearn.model_selection import GridSearchCV

# Hyper-parameter grid. `gamma` only affects the rbf/poly kernels, so the
# linear kernel gets its own sub-grid; crossing it with gamma would refit
# identical linear models.
param_grid = [
    {'kernel': ['linear'], 'C': [0.1, 1, 10]},
    {'kernel': ['rbf', 'poly'], 'C': [0.1, 1, 10], 'gamma': ['scale', 'auto']},
]

# NOTE(review): if the augmentation cell was run, the training set holds noisy
# copies of the same utterances, so plain 5-fold CV can place near-duplicates
# in different folds and report optimistic scores; consider grouped folds.
grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=5, scoring='accuracy')

# Run the search on the training split.
grid_search.fit(data_splits["train"]["X"], data_splits["train"]["Y"])

# Best estimator, refit on the whole training split.
best_model = grid_search.best_estimator_

## 验证集上的表现

In [71]:
from sklearn.metrics import classification_report, accuracy_score

# Predict the validation set with the best estimator from the grid search.
Y_val_true = data_splits["val"]["Y"]
Y_val_pred_best = best_model.predict(data_splits["val"]["X"])

# Per-class precision / recall / F1.
print(classification_report(Y_val_true, Y_val_pred_best))

# Overall accuracy.
print("Accuracy:", accuracy_score(Y_val_true, Y_val_pred_best))

              precision    recall  f1-score   support

           0       0.60      0.12      0.20        50
           1       0.78      0.14      0.24        50
           2       0.26      0.90      0.41        50
           3       0.50      0.70      0.58        50
           4       0.75      0.48      0.59        50
           5       0.12      0.02      0.03        50

    accuracy                           0.39       300
   macro avg       0.50      0.39      0.34       300
weighted avg       0.50      0.39      0.34       300

Accuracy: 0.3933333333333333


## 保存模型

In [None]:
# `model` is a scikit-learn estimator, which has no state_dict(); it is
# serialized with pickle instead of torch.save. Replace `model` with
# `best_model` to persist the tuned estimator.
SAVE_PATH = os.path.join(os.getcwd(), 'models')
os.makedirs(SAVE_PATH, exist_ok=True)
MODEL_FILE = os.path.join(SAVE_PATH, 'svm_model.pkl')
with open(MODEL_FILE, 'wb') as model_file:
    pickle.dump(model, model_file)
print('Model is saved to {}'.format(MODEL_FILE))

## 加载模型

In [None]:
# Restore the pickled scikit-learn estimator.
# SECURITY: pickle.load executes arbitrary code; only load files you created.
LOAD_PATH = os.path.join(os.getcwd(), 'models')
MODEL_FILE = os.path.join(LOAD_PATH, 'svm_model.pkl')
with open(MODEL_FILE, 'rb') as model_file:
    model = pickle.load(model_file)
print('Model is loaded from {}'.format(MODEL_FILE))