# 使用原始语音信号和卷积神经网络进行语音分类

来自论文：VERY DEEP CONVOLUTIONAL NEURAL NETWORKS FOR RAW WAVEFORMS

### 加载数据集

导入必要的库

In [83]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import librosa
import numpy as np
import pandas as pd
import os
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

数据集类，包括预处理，论文中没有使用数据增强
预处理包括两个步骤：
1. 降采样：为提高计算速度，将音频波形下采样到 8kHz。这一步骤减少了数据量，使得后续处理更加高效。
2. 标准化：对音频数据进行标准化处理，使其均值为 0，方差为 1。标准化有助于模型更快地收敛，提高训练效果。

In [84]:
def preprocess_audio(file_path):
    # 加载音频文件，将采样率设置为8kHz
    audio, sr = librosa.load(file_path, sr=8000)
    # 标准化音频数据
    audio = librosa.util.normalize(audio)
    return audio.reshape(1,-1)

In [85]:
class SpeechDataset(Dataset):
    def __init__(self, data, audio_dir):
        """
        初始化数据集类
        :param data_csv_path: 包含音频文件信息和标签的CSV文件路径
        :param audio_dir: 音频文件所在的文件夹路径
        """
        self.data = data
        self.audio_dir = audio_dir

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        audio_filename = row['filename']
        label = row['target']
        audio_file_path = os.path.join(self.audio_dir, audio_filename)
        audio = preprocess_audio(audio_file_path)
        return torch.from_numpy(audio).float(), torch.tensor(label).long()

划分数据集

In [86]:
data_csv_path = "/home/nlp/songcw/data/ESC-50-master/meta/esc50.csv"
audio_dir = "/home/nlp/songcw/data/ESC-50-master/audio/"

# 读取数据集信息
data = pd.read_csv(data_csv_path)
audio_filenames = data['filename'].tolist()
labels = data['target'].tolist()

print(np.unique(labels))

audio_files = ["/home/nlp/songcw/data/ESC-50-master/audio/" + filename for filename in audio_filenames]

# 划分训练集和验证集
train_filenames, val_filenames, train_labels, val_labels = train_test_split(audio_filenames, labels,
                                                                              test_size=0.2, random_state=42)

train_data = pd.DataFrame({'filename': train_filenames, 'target': train_labels})
val_data = pd.DataFrame({'filename': val_filenames, 'target': val_labels})

print('train_data',train_data)
print('val_data',val_data)

train_dataset = SpeechDataset(train_data, audio_dir)
val_dataset = SpeechDataset(val_data, audio_dir)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

for batch_idx, (data, target) in enumerate(train_loader):
    print("Batch Index:", batch_idx)
    print("Data Shape:", data.shape)
    print("Target Shape:", target.shape)
    # 假设数据是图像或音频等多维数据，可以查看第一个样本的内容
    print("First Data Sample:", data[0])
    print("First Target Sample:", target[0])
    # 只查看第一个批次就退出循环
    if batch_idx == 0:
        break

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
 48 49]
train_data                filename  target
0     3-145387-A-29.wav      29
1      1-50455-A-44.wav      44
2     3-103599-A-25.wav      25
3       2-69131-B-5.wav       5
4      2-103424-A-3.wav       3
...                 ...     ...
1595  3-188726-A-35.wav      35
1596  4-159609-A-14.wav      14
1597  3-118487-A-26.wav      26
1598  4-188191-C-29.wav      29
1599   3-187549-A-6.wav       6

[1600 rows x 2 columns]
val_data               filename  target
0    5-221950-A-22.wav      22
1     1-79220-A-17.wav      17
2    4-165845-A-45.wav      45
3    3-130330-A-22.wav      22
4    4-157297-A-21.wav      21
..                 ...     ...
395  3-144891-A-19.wav      19
396  4-156827-A-46.wav      46
397   5-198891-D-8.wav       8
398   1-52290-A-30.wav      30
399  2-125966-A-11.wav      11

[400 rows x 2 columns]
Batch Index: 0
Data S

### 构建模型

M5模型

In [87]:
import torch
import torch.nn as nn


class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1):
        super(BasicBlock, self).__init__()
        # 卷积层
        self.conv = nn.Conv1d(in_channels, out_channels, kernel_size, stride, padding=kernel_size // 2)
        # 批量归一化层
        self.bn = nn.BatchNorm1d(out_channels)
        # ReLU激活函数
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        return x


class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1):
        super(ResidualBlock, self).__init__()
        # 第一个卷积层
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size, stride, padding=kernel_size // 2)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        # 第二个卷积层
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size, 1, padding=kernel_size // 2)
        self.bn2 = nn.BatchNorm1d(out_channels)
        # 跳跃连接
        if stride!= 1 or in_channels!= out_channels:
            self.downsample = nn.Sequential(
                nn.Conv1d(in_channels, out_channels, 1, stride),
                nn.BatchNorm1d(out_channels)
            )
        else:
            self.downsample = None

    def forward(self, x):
        residual = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        if self.downsample is not None:
            residual = self.downsample(residual)
        x += residual
        x = self.relu(x)
        return x


class RawWaveformCNN(nn.Module):
    def __init__(self, num_classes=10):
        super(RawWaveformCNN, self).__init__()
        # 第一层卷积层，感受野为80，步长为4，256个滤波器
        self.layer1 = BasicBlock(1, 256, 80, 4)
        # 最大池化层，池化核大小为4x1
        self.maxpool1 = nn.MaxPool1d(4)
        # 卷积层，感受野为3，256个滤波器
        self.layer2 = BasicBlock(256, 256, 3)
        self.maxpool2 = nn.MaxPool1d(4)
        # 卷积层，感受野为3，512个滤波器
        self.layer3 = BasicBlock(256, 512, 3)
        self.maxpool3 = nn.MaxPool1d(4)
        # 卷积层，感受野为3，512个滤波器
        self.layer4 = BasicBlock(512, 512, 3)
        # 全局平均池化层
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)
        # 全连接层用于分类
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.layer1(x)
        x = self.maxpool1(x)
        x = self.layer2(x)
        x = self.maxpool2(x)
        x = self.layer3(x)
        x = self.maxpool3(x)
        x = self.layer4(x)
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x


class RawWaveformCNNResidual(nn.Module):
    def __init__(self, num_classes=10):
        super(RawWaveformCNNResidual, self).__init__()
        # 第一层卷积层，感受野为80，步长为4，48个滤波器
        self.layer1 = BasicBlock(1, 48, 80, 4)
        self.maxpool1 = nn.MaxPool1d(4)
        # 残差块，包含两个卷积层，感受野为3，64个滤波器
        self.resblock1 = ResidualBlock(48, 64, 3)
        self.maxpool2 = nn.MaxPool1d(4)
        self.resblock2 = ResidualBlock(64, 64, 3)
        self.maxpool3 = nn.MaxPool1d(4)
        self.resblock3 = ResidualBlock(64, 64, 3)
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.layer1(x)
        x = self.maxpool1(x)
        x = self.resblock1(x)
        x = self.maxpool2(x)
        x = self.resblock2(x)
        x = self.maxpool3(x)
        x = self.resblock3(x)
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

M18模型

In [94]:
class M18Model(nn.Module):
    def __init__(self, num_classes=10):
        super(M18Model, self).__init__()
        # 第一层卷积层
        self.layer1 = nn.Conv1d(1, 48, 80, 4)
        self.bn1 = nn.BatchNorm1d(48)
        self.relu1 = nn.ReLU(inplace=True)
        self.maxpool1 = nn.MaxPool1d(4)
        # 第二层卷积层（三个堆叠的卷积层）
        self.layer2 = nn.Sequential(
            nn.Conv1d(48, 64, 3, 1, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.Conv1d(64, 64, 3, 1, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.Conv1d(64, 64, 3, 1, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True)
        )
        self.maxpool2 = nn.MaxPool1d(4)
        # 第三层卷积层（四个堆叠的卷积层）
        self.layer3 = nn.Sequential(
            nn.Conv1d(64, 128, 3, 1, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.Conv1d(128, 128, 3, 1, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.Conv1d(128, 128, 3, 1, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.Conv1d(128, 128, 3, 1, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True)
        )
        self.maxpool3 = nn.MaxPool1d(4)
        # 第四层卷积层（四个堆叠的卷积层）
        self.layer4 = nn.Sequential(
            nn.Conv1d(128, 512, 3, 1, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, 512, 3, 1, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, 512, 3, 1, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, 512, 3, 1, padding=1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True)
        )
        self.maxpool4 = nn.MaxPool1d(4)
        # 全局平均池化层
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)
        # 全连接层用于分类
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.layer1(x)
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)
        x = self.layer2(x)
        x = self.maxpool2(x)
        x = self.layer3(x)
        x = self.maxpool3(x)
        x = self.layer4(x)
        x = self.maxpool4(x)
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

查看M5模型结构和模型测试

In [88]:
# 实例化模型
model = RawWaveformCNN(50)  # 或 RawWaveformCNNResidual()
print(model)
# 随机生成输入数据（假设输入音频长度为40000）
input_data = torch.randn(1, 1, 40000)
# 前向传播
output = model(input_data)
print(output.shape)

RawWaveformCNN(
  (layer1): BasicBlock(
    (conv): Conv1d(1, 256, kernel_size=(80,), stride=(4,), padding=(40,))
    (bn): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
  )
  (maxpool1): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (layer2): BasicBlock(
    (conv): Conv1d(256, 256, kernel_size=(3,), stride=(1,), padding=(1,))
    (bn): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
  )
  (maxpool2): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (layer3): BasicBlock(
    (conv): Conv1d(256, 512, kernel_size=(3,), stride=(1,), padding=(1,))
    (bn): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
  )
  (maxpool3): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (layer4): BasicBlock(
    (conv): Conv1d(51

查看M18模型结构和模型测试

In [96]:
# 实例化模型
model = M18Model(50)  # 或 RawWaveformCNNResidual()
print(model)
# 随机生成输入数据（假设输入音频长度为40000）
input_data = torch.randn(32, 1, 40000)
# 前向传播
output = model(input_data)
print(output.shape)

M18Model(
  (layer1): Conv1d(1, 48, kernel_size=(80,), stride=(4,))
  (bn1): BatchNorm1d(48, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu1): ReLU(inplace=True)
  (maxpool1): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (layer2): Sequential(
    (0): Conv1d(48, 64, kernel_size=(3,), stride=(1,), padding=(1,))
    (1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
    (4): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,))
    (7): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU(inplace=True)
  )
  (maxpool2): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (layer3): Sequential(
    (0): Conv1d(64, 128, 

### 模型训练

保存模型检查点

In [89]:
def save_checkpoint(model, optimizer, epoch, loss, filename='RawWaveformCNNResidual.pth'):
    """
    保存模型检查点。

    参数：
    - model: 要保存的模型
    - optimizer: 模型对应的优化器
    - epoch: 当前的epoch
    - loss: 当前的损失
    - filename: 检查点保存的文件名
    """
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss
    }
    torch.save(checkpoint, filename)

加载模型检查点

In [90]:
def load_checkpoint(model, optimizer, filename='RawWaveformCNNResidual.pth'):
    """
    加载模型检查点。

    参数：
    - model: 初始化的模型实例
    - optimizer: 初始化的优化器实例
    - filename: 检查点文件名

    返回：
    - epoch: 上次保存时的epoch
    - loss: 上次保存时的损失
    """
    checkpoint = torch.load(filename)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
    return epoch, loss


构建训练和验证函数

In [97]:

def train_model(model, train_loader, val_loader, epochs, learning_rate,filename = 'RawWaveformCNNResidual.pth'):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    best_val_acc = 0
    for epoch in range(epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        # 训练阶段
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct / total

        # 验证阶段
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        val_epoch_loss = val_loss / len(val_loader)
        val_epoch_acc = val_correct / val_total
        if val_epoch_acc > best_val_acc:
            best_val_acc = val_epoch_acc
            save_checkpoint(model, optimizer, epoch, val_epoch_loss,filename=filename)

        print(f'Epoch {epoch + 1} Train Loss: {epoch_loss:.4f} Train Acc: {epoch_acc:.4f} '
              f'Val Loss: {val_epoch_loss:.4f} Val Acc: {val_epoch_acc:.4f}')
    return best_val_acc

超参数设置

In [92]:
num_classes = 50  # ESC-50数据集有50个类别
model = RawWaveformCNNResidual(num_classes)
epochs = 200
learning_rate = 0.001

训练M5

In [93]:
best_val_acc = train_model(model, train_loader, val_loader, epochs, learning_rate)
print("最好的结果为：ACC=",best_val_acc)

Epoch 1 Train Loss: 3.6412 Train Acc: 0.0969 Val Loss: 3.4341 Val Acc: 0.1275
Epoch 2 Train Loss: 3.1082 Train Acc: 0.1819 Val Loss: 3.0290 Val Acc: 0.1950
Epoch 3 Train Loss: 2.8479 Train Acc: 0.2275 Val Loss: 2.9116 Val Acc: 0.2125
Epoch 4 Train Loss: 2.6720 Train Acc: 0.2712 Val Loss: 2.7297 Val Acc: 0.2700
Epoch 5 Train Loss: 2.5349 Train Acc: 0.3100 Val Loss: 2.8098 Val Acc: 0.2550
Epoch 6 Train Loss: 2.4081 Train Acc: 0.3638 Val Loss: 2.4369 Val Acc: 0.3400
Epoch 7 Train Loss: 2.2737 Train Acc: 0.3762 Val Loss: 2.5519 Val Acc: 0.3225
Epoch 8 Train Loss: 2.1862 Train Acc: 0.3969 Val Loss: 2.4921 Val Acc: 0.3075
Epoch 9 Train Loss: 2.1115 Train Acc: 0.4181 Val Loss: 2.4110 Val Acc: 0.3425
Epoch 10 Train Loss: 2.0328 Train Acc: 0.4450 Val Loss: 2.2520 Val Acc: 0.3950
Epoch 11 Train Loss: 1.9437 Train Acc: 0.4575 Val Loss: 2.2780 Val Acc: 0.3875
Epoch 12 Train Loss: 1.8224 Train Acc: 0.5050 Val Loss: 2.1839 Val Acc: 0.4200
Epoch 13 Train Loss: 1.7516 Train Acc: 0.5194 Val Loss: 2.109

训练M18

In [98]:
num_classes = 50  # ESC-50数据集有50个类别
model = M18Model(num_classes)
epochs = 200
learning_rate = 0.001
best_val_acc = train_model(model, train_loader, val_loader, epochs, learning_rate,'M18Model')
print("最好的结果为：ACC=",best_val_acc)

Epoch 1 Train Loss: 3.5594 Train Acc: 0.0856 Val Loss: 3.6327 Val Acc: 0.0675
Epoch 2 Train Loss: 3.0592 Train Acc: 0.1500 Val Loss: 3.4105 Val Acc: 0.1075
Epoch 3 Train Loss: 2.7823 Train Acc: 0.2112 Val Loss: 2.8032 Val Acc: 0.1900
Epoch 4 Train Loss: 2.6430 Train Acc: 0.2325 Val Loss: 3.3124 Val Acc: 0.1700
Epoch 5 Train Loss: 2.5289 Train Acc: 0.2687 Val Loss: 3.6559 Val Acc: 0.1450
Epoch 6 Train Loss: 2.3957 Train Acc: 0.3081 Val Loss: 2.4588 Val Acc: 0.3000
Epoch 7 Train Loss: 2.3460 Train Acc: 0.3194 Val Loss: 2.8387 Val Acc: 0.2525
Epoch 8 Train Loss: 2.2397 Train Acc: 0.3500 Val Loss: 2.2826 Val Acc: 0.3325
Epoch 9 Train Loss: 2.1204 Train Acc: 0.3856 Val Loss: 2.6018 Val Acc: 0.3025
Epoch 10 Train Loss: 2.0677 Train Acc: 0.3881 Val Loss: 2.4812 Val Acc: 0.2950
Epoch 11 Train Loss: 1.9166 Train Acc: 0.4556 Val Loss: 2.1842 Val Acc: 0.3175
Epoch 12 Train Loss: 1.8539 Train Acc: 0.4512 Val Loss: 2.7688 Val Acc: 0.2250
Epoch 13 Train Loss: 1.8223 Train Acc: 0.4656 Val Loss: 2.834