### 安装依赖

第一次执行即可，后面无需执行避免浪费时间

In [None]:
!pip install librosa==0.11.0 cffi==1.17.1

### 解压数据集

第一次执行即可，后面无需执行避免浪费时间

Tips: 在 jupyter 中可以用 `!command` 的形式来执行终端命令，这里用 unzip 进行解压，`-q` 表示静默模式避免显示过多的输出，`-n` 表示只解压不重复的文件，避免二次执行时询问是否覆盖

In [None]:
!unzip -q -n data.zip

### 导入必要的依赖库

其中 librosa 是一个**音频处理**的库，在本实验中需要利用它来加载音频并提取音频的时序特征，其它的是深度学习常见的库

In [None]:
import os
import time
import pandas as pd
import librosa
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch_npu

from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm  # 进度条
from torch.nn.utils.rnn import pad_sequence # 用于填充序列
from IPython.display import clear_output, Audio # 用于播放音频和清除输出

### 定义数据集

数据组织结构如下：
```text
|- data
│   |- train
|   |  |- 000.wav
|   |  `- ...
│   `-- test
|      |- 000.wav
|      `- ...
│- train.csv
`- test.csv
```

**这部分已经提供好，无需同学们完成**。注意在这里需要自定义 `collate_fn` 用于将一个 batch 中所有的音频填充到相同的长度以方便进行批量计算。

In [None]:
# ---- 数据集加载和特征提取 ----

class AudioDataset(Dataset):
    def __init__(self, dataframe, data_dir, n_mfcc=13):
        """
        Args:
            dataframe (pd.DataFrame): df contains file names and labels
            data_dir (str): data directory containing audio files
            n_mfcc (int): MFCC features count
        """
        self.dataframe = dataframe
        self.data_dir = data_dir
        self.n_mfcc = n_mfcc

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        """
        Returns:
            features (torch.Tensor): (seq_len, feature_dim)
            label (int): int
        """
        row = self.dataframe.iloc[idx]
        file_name = row['filename']
        label = row['class']  # class 列是类别名 ('dog', 'cat', 'bird')
        
        # 音频路径
        file_path = os.path.join(self.data_dir, file_name)
        
        # 加载音频并提取时序特征 (MFCC)
        y, sr = librosa.load(file_path, sr=None)
        
        # 提取 MFCC 特征
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=self.n_mfcc)
        mfcc = torch.tensor(mfcc.T, dtype=torch.float32)  # 转置为 (seq_len, feature_dim)
        
        # 将类别字符串映射为索引
        label_mapping = {'cat': 0, 'dog': 1, 'bird': 2}
        label = label_mapping[label]

        return mfcc, label


# 由于不同音频长度不同，需要使用自定义的 collate_fn 来将同一个 batch 中的音频 padding 到其中的最大长度
def collate_fn(batch):
    """
    for padding data with different lengths

    Args:
        batch (tuple): (features, label)

    Returns:
        padded_batch_features (torch.Tensor): (batch_size, max_seq_len, feature_dim)
        batch_labels (torch.Tensor): (batch_size,)
    """
    features = [item[0] for item in batch]  # 提取每个样本的 features
    labels = [item[1] for item in batch]    # 提取每个样本的 label

    # 对不定长序列进行 padding，填充后 shape: (batch_size, max_seq_len, feature_dim)
    padded_features = pad_sequence(features, batch_first=True)
    labels = torch.tensor(labels, dtype=torch.long)             # 转为 Tensor

    return padded_features, labels

### 实例化 Dataset 和 DataLoader

这里需要同学们完成一些简单的操作来熟悉数据加载步骤

在云平台上，建议在 `DataLoader` 中设置 `num_workers` 来加快数据集加载速度，在 OpenI 平台上可以设置 `num_workers=16`，在华为云平台上如果使用 NPU 环境设置 `num_workers=6`，使用 CPU 环境设置 `num_workers=3`

In [None]:
# 加载训练集和测试集路径
dataset_dir = "data"
train_csv_path = f"{dataset_dir}/train.csv"
test_csv_path = f"{dataset_dir}/test.csv"
train_dir = f"{dataset_dir}/train"
test_dir = f"{dataset_dir}/test"

train_df = # TODO: 读取训练集 CSV 文件
test_df = # TODO: 读取测试集 CSV 文件

# 创建数据集
train_dataset = # TODO: 创建训练集对象
test_dataset = # TODO: 创建测试集对象

# 数据加载器
batch_size = 16 # Tips: 你可以根据显存来自己调整来获得最佳性能
train_loader = # TODO: 创建训练集数据加载器，注意对于训练集一般设置 shuffle=True，并且需要使用自定义的 collate_fn
test_loader = # TODO: 创建测试集数据加载器，对于测试集一般设置 shuffle=False，并且需要使用自定义的 collate_fn

In [None]:
# 查看下数据集，其中 class 表示类别，数据集中的类别有 cat, dog, bird 总共 3 类
train_df.head()

In [None]:
# 可以试听一下数据集中的音频文件
Audio(f"{train_dir}/002.wav")

让我们来看几个输入的 shape

In [None]:
# 查看第一个 batch 的数据形状
for i, (features, labels) in enumerate(train_loader):
    print(f"Batch {i+1}:")
    print(f"Features shape: {features.shape}")
    print(f"Labels shape: {labels.shape}")
    if i >= 2:
        break

如下图所示，大家看到的 3 个 batch 可能 shape 不一样（如果一样也是正常现象，可以多尝试几次看看能不能刷出来不一样的），这是因为每个音频长度不同，在 `collate_fn` 中是按照 batch 中最大的长度进行填充的，不同 batch 中音频的最大长度可能不同，因此每个 batch 的 shape 会有差异。数据的 shape 为 `(batch_size, max_seq_len, feature_dim)`

![batch_shape.png](attachment:batch_shape.png)

### RNN 模型定义

这部分需要同学们根据示例完成 `__init__` 中的参数定义和 `forward` 函数。

在 `__init__`，同学们可以查阅 Pytorch 文档来获得 [nn.Parameter](https://pytorch.org/docs/2.1/generated/torch.nn.parameter.Parameter.html#torch.nn.parameter.Parameter) 和 [nn.init.xavier_uniform_](https://pytorch.org/docs/2.1/nn.init.html#torch.nn.init.xavier_uniform_) 信息。其中 Xavier uniform 是一个广泛使用的初始化方式，用于初始化权重参数；偏置通常初始化为 0。

在 `forward` 根据 RNN 模型隐状态更新公式计算即可，在计算时需要注意张量的 shape 匹配问题。
$$
h_t = \text{tanh}(W_{xh} x_t + W_{hh} h_{t-1} + b_h)\\
o_t = W_{ho} h_t + b_o\\
$$

In [None]:
# 使用自定义的 RNN 模型
class CustomRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        """
        RNN model initialization

        Args:
            input_size: feature dim
            hidden_size: hidden state dim
            output_size: output dim
        """
        super(CustomRNN, self).__init__()
        
        # 参数定义
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        
        # RNN 权重参数
        self.W_xh = # TODO: 定义输入到隐状态的权重，可以传入 torch.empty 或使用 torch.randn
        self.W_hh = # TODO: 定义隐状态到隐状态的权重
        self.b_h = # TODO: 定义隐状态的偏置，偏置一般初始化为零
        
        # 输出权重参数
        self.W_ho = # TODO: 定义隐状态到输出的权重
        self.b_o = # TODO: 定义输出的偏置
        
        # Xavier uniform 初始化权重
        nn.init.xavier_uniform_(self.W_xh)
        nn.init.xavier_uniform_(self.W_hh)
        nn.init.xavier_uniform_(self.W_ho)
        

    def forward(self, x, hidden_state=None):
        """
        Args:  
            x: (seq_len, batch_size, input_size)
            hidden_state: (batch_size, hidden_size) 

        Returns:  
            output_seq: (seq_len, batch_size, output_size)
        """
        seq_len, batch_size, _ = x.shape
        
        if hidden_state is None:  
            hidden_state = torch.zeros(batch_size, self.hidden_size).to(x.device)  # 初始化隐状态为零

        # 遍历时间步
        for t in range(seq_len):
            # TODO: 可以在此定义中间变量，但不是必须的
            
            hidden_state = # TODO: 使用 RNN 的公式更新隐状态
        
        logits = # TODO: 使用隐状态计算输出 logits，不需要使用 softmax，因为在损失函数中会使用 log_softmax
        return logits

### 训练参数设置

这部分同学们可以自行调整，建议先试用默认参数，这是助教多次尝试能够基本稳定取得预期效果的参数。

In [None]:
# 超参数设置
# Tips: 你可以自行调整来获得最佳性能
input_size = 13      # MFCC 特征数
hidden_size = 32     # 隐状态维度
output_size = 3      # 输出类别数
learning_rate = 0.005
num_epochs = 50

device = torch.device('npu')
device

### 模型初始化

由于是分类问题，损失函数当然是**交叉熵损失 (Cross Entropy Loss)**，在 Pytorch 中使用 [nn.CrossEntropyLoss](https://pytorch.org/docs/2.1/generated/torch.nn.CrossEntropyLoss.html) 实例化即可

优化器使用常见的 Adam 优化器，更多细节参见 [torch.optim.Adam](https://pytorch.org/docs/2.1/generated/torch.optim.Adam.html)。顺带一提，[Adam](https://arxiv.org/abs/1412.6980) 最近（笔者编辑时间是 2025 年 4 月）刚获得 ICLR 2025 的时间检验奖 (ICLR 2025 Test of Time Award), 作者在会上分享了 Adam 初次投稿被拒稿以及后来发邮件申诉接收的经历。

In [None]:
# 模型初始化
model = CustomRNN(input_size, hidden_size, output_size).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

### 模型训练

这部分同学们按照提示完成即可

In [None]:
# 训练
losses = []

time_start = time.time()
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for features, labels in train_loader:
        features, labels = features.to(device), labels.to(device)
        # 转换为 RNN 的输入格式 (seq_len, batch_size, input_size)
        features = # TODO: 你可能需要用 .permute 来调整 shape 以适应前面定义的 RNN 模型
        logits = # TODO: 前向传播
        loss = criterion(logits, labels)

        # 反向传播和优化
        # TODO: 清零梯度
        # TODO: 反向传播
        # TODO: 更新参数

        total_loss += loss.item()

    losses.append(total_loss)

    # 每个epoch刷新loss曲线
    clear_output(wait=True)
    plt.figure(figsize=(8, 5))
    plt.plot(range(epoch + 1), losses, marker='.')
    plt.title("Training Loss per Epoch")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.xlim(0, num_epochs)
    plt.grid()
    plt.show()

time_end = time.time()

# 输出最后的loss
print(f"Final Loss: {total_loss:.4f}")
print(f"Training Time: {time_end - time_start:.2f} seconds, {(time_end - time_start) / epoch:.2f} s/epoch")

### 评估

这部分根据提示完成即可

In [None]:
# 评估
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for features, labels in test_loader:
        features, labels = features.to(device), labels.to(device)
        features = # TODO: 你可能需要用 .permute 来调整 shape 以适应前面定义的 RNN 模型
        logits = # TODO: 前向传播
        predicted = # TODO: 将输出的 logits 转换为预测类别
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")

在训练部分，同学们一般观测到 loss 下降缓慢，在训练 50 个 epoch 后 loss 仍是一个较大的值（一般大于 20.0），这表明模型没有很好地学会如何对输入的音频分类。同学们如果发现 loss 曲线未收敛，可以继续增加 epoch；如果发现 loss 曲线收敛到较大的 loss，可以减少 learning_rate 并增加 epoch，但这些方案耗时较长并且精度提升少，更推荐同学们先继续往下看尝试 LSTM 模型。

在评估部分，同学门一般观测到模型在测试集上的准确率为 30%-40%，这对于一个三分类任务来说是糟糕的，不比“盲猜”好多少。

这是因为基础的 RNN 模型对长序列的学习效果不理想，容易导致**梯度消失、梯度爆炸和信息丢失**等问题。因此现在 RNN 实际应用的较少，一般会采用其改进结构 LSTM（Long Short‐Term Memory，长短期记忆网络）或 GRU（Gated Recurrent Units，门控循环单元）。下面我们以 LSTM 为例来实现其改进版本。

### LSTM 模型实现

![image.png](attachment:image.png)

图片来源：[《动手学深度学习》](https://github.com/d2l-ai/d2l-zh)

LSTM 块结构如上图所示，我们只需要将前面的 RNN 块替换为 LSTM 块即可，同学们需要完成下面 `CustomLSTM` 中空缺的部分。

LSTM 的更新公式如下：
$$
f_t = \text{sigmoid}(W_{xf} x_t + W_{hf} h_{t - 1} + b_f)\\
i_t = \text{sigmoid}(W_{xi} x_t + W_{hi} h_{t - 1} + b_i)\\
\tilde{c_t} = \text{tanh}(W_{xc} x_t + W_{hc} h_{t - 1} + b_c)\\
c_t = f_t \odot c_{t - 1} + i_t \odot \tilde{c_t}\\
o_t = \text{sigmoid}(W_{xo} x_t + W_{ho} h_{t - 1} + b_o)\\
h_t = o_t \odot \text{tanh}(c_t)
$$
其中 $f_t,\ i_t,\ o_t$ 分别称为**遗忘门、输入门和输出门**，它们的取值范围是 $(0, 1)$，分别控制保留多少过去的记忆元 $c_{t - 1}$ 的内容、采用多少来自 $\tilde{c_t}$ 的新数据和 $c_t$ 中输出多少信息到当前的隐藏状态；$c_t$ 是 LSTM 引入的一种特殊隐状态，称为记忆，shape 和隐状态 $h_t$ 相同；$\tilde{c_t}$ 称为候选记忆。

同学们可以阅读 LSTM 的原论文 [Long short-term memory](https://ieeexplore.ieee.org/abstract/document/6795963/)。

最后，我们将隐状态 $h_t$ 经过一个线性层来完成 3 分类任务。这里使用隐状态是因为它代表当前时间步 LSTM 的输出，已经是对所有输入信息的“综合表达”，是 LSTM 设计用作“输出”的部分；而记忆 $c_t$ 更像是 LSTM 内部长期记忆的“仓库”，主要供网络内部后续时刻再利用，并非直接设计为外部特征。这里直接使用 [nn.Linear](https://pytorch.org/docs/2.1/generated/torch.nn.Linear.html) 作为线性层。

In [None]:
class CustomLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        """
        LSTM model initialization
        
        Args:
            input_size: feature dim
            hidden_size: hidden state dim
            output_size: output dim
        """
        super(CustomLSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # 初始化 LSTM 的权重参数
        # 遗忘门
        self.W_xf = # TODO: 用 nn.Parameter 定义权重
        self.W_hf = # TODO: 用 nn.Parameter 定义权重
        self.b_f = # TODO: 用 nn.Parameter 定义偏置，注意偏置一般初始化为零

        # 输入门
        self.W_xi = # TODO: 用 nn.Parameter 定义权重
        self.W_hi = # TODO: 用 nn.Parameter 定义权重
        self.b_i = # TODO: 用 nn.Parameter 定义偏置

        # 候选记忆单元
        self.W_xc = # TODO: 用 nn.Parameter 定义权重
        self.W_hc = # TODO: 用 nn.Parameter 定义权重
        self.b_c = # TODO: 用 nn.Parameter 定义偏置

        # 输出门
        self.W_xo = # TODO: 用 nn.Parameter 定义权重
        self.W_ho = # TODO: 用 nn.Parameter 定义权重
        self.b_o = # TODO: 用 nn.Parameter 定义偏置

        # 输出层参数
        self.fc = nn.Linear(in_features=, out_features=)  # TODO: 分类头，直接使用 nn.Linear 定义

        # 初始化参数
        self.init_weights()

    def init_weights(self):
        """
        Apply Xavier uniform initialization to the weights of the LSTM.
        """
        for param in self.parameters():
            if param.dim() > 1:  # 对权重矩阵进行初始化
                nn.init.xavier_uniform_(param)

    def forward(self, x):
        """
        Args:
            x: (seq_len, batch_size, input_size)
        
        Returns:
            logits: (batch_size, output_size)
        """
        seq_len, batch_size, _ = x.shape

        # 初始化隐状态和细胞状态
        h_t = torch.zeros(batch_size, self.hidden_size).to(x.device)  # 初始隐状态
        c_t = torch.zeros(batch_size, self.hidden_size).to(x.device)  # 初始细胞状态

        # 遍历时间步
        for t in range(seq_len):
            # TODO: 可以在此定义中间变量，但不是必须的

            # 遗忘门
            f_t = # TODO: 使用 LSTM 的公式更新遗忘门

            # 输入门
            i_t = # TODO: 使用 LSTM 的公式更新输入门

            # 候选单元
            c_hat_t = # TODO: 使用 LSTM 的公式更新候选单元

            # 更新细胞状态
            c_t = # TODO: 使用 LSTM 的公式更新细胞状态

            # 输出门
            o_t = # TODO: 使用 LSTM 的公式更新输出门

            # 更新隐状态
            h_t = # TODO: 使用 LSTM 的公式更新隐状态

        # 最后时间步的隐状态输入输出层
        logits = self.fc(h_t)  # (batch_size, output_size)
        return logits

In [None]:
# 模型初始化
lstm_model = CustomLSTM(input_size, hidden_size, output_size).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(lstm_model.parameters(), lr=learning_rate)

In [None]:
# 训练
losses = []

time_start = time.time()
for epoch in range(num_epochs):
    lstm_model.train()
    total_loss = 0
    for features, labels in train_loader:
        features, labels = features.to(device), labels.to(device)
        # 转换为 RNN 的输入格式 (seq_len, batch_size, input_size)
        features = # TODO: 你可能需要用 .permute 来调整 shape 以适应前面定义的 RNN 模型
        logits = # TODO: 前向传播
        loss = criterion(logits, labels)

        # 反向传播和优化
        # TODO: 清零梯度
        # TODO: 反向传播
        # TODO: 更新参数

        total_loss += loss.item()

    losses.append(total_loss)

    # 每个epoch刷新loss曲线
    clear_output(wait=True)
    plt.figure(figsize=(8, 5))
    plt.plot(range(epoch + 1), losses, marker='.')
    plt.title("Training Loss per Epoch")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.xlim(0, num_epochs)
    plt.grid()
    plt.show()

time_end = time.time()

# 输出最后的loss
print(f"Final Loss: {total_loss:.4f}")
print(f"Training Time: {time_end - time_start:.2f} seconds, {(time_end - time_start) / epoch:.2f} s/epoch")

In [None]:
# 评估
lstm_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for features, labels in test_loader:
        features, labels = features.to(device), labels.to(device)
        features = # TODO: 你可能需要用 .permute 来调整 shape 以适应前面定义的 RNN 模型
        logits = # TODO: 前向传播
        predicted = # TODO: 将输出的 logits 转换为预测类别
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")

使用 LSTM 后，同学们一般在训练阶段能观测到 loss 下降更为迅速，在训练 50 个 epoch 后 loss 下降到一个较低的水平（一般在 5.0 以下），并且在测试集上准确率能够达到 85%-95%，这表明 LSTM 模型学会了如何进行音频分类。