In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): a later cell mounts Drive again at /content/gdrive, and the data
# paths used further down all live under /content/gdrive, so this cell looks
# redundant — confirm before relying on /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [2]:
!pip install python_speech_features

Collecting python_speech_features
  Downloading python_speech_features-0.6.tar.gz (5.6 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: python_speech_features
  Building wheel for python_speech_features (setup.py) ... done
  Created wheel for python_speech_features: filename=python_speech_features-0.6-py3-none-any.whl size=5869 sha256=83ab1f27a6559d7030a263746c2358c83700d665650e22deb28481ff693cda94
  Stored in directory: /root/.cache/pip/wheels/5a/9e/68/30bad9462b3926c29e315df16b562216d12bdc215f4d240294
Successfully built python_speech_features
Installing collected packages: python_speech_features
Successfully installed python_speech_features-0.6


In [3]:
# The audio recordings are stored on Google Drive, so mount it into the Colab
# filesystem; the data paths used below live under this mount point.
from google.colab import drive  # Colab helper used to mount Google Drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [4]:
# Switch the working directory to the Drive folder that holds the audio files.
# NOTE(review): the loading code further down is given this same folder as an
# absolute path, so the directory change does not appear to be required — confirm.
%cd /content/gdrive/MyDrive/data/audio

/content/gdrive/MyDrive/data/audio


In [5]:
import numpy as np
import os
from scipy.io import wavfile
import librosa
from scipy.signal import cheby1, filtfilt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from python_speech_features import mfcc
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import LSTM, Dense, Dropout, Conv1D, MaxPooling1D, Flatten, BatchNormalization, TimeDistributed

In [6]:
def read_wav_files(batch_size, data_directory):
    """
    Randomly load up to ``batch_size`` labelled recordings from a directory.

    Each ``.wav`` file is paired with the ``.hea`` file that shares its base
    name. The label is the text following the leading '#' on the first header
    line that starts with '#'. Recordings whose header file is missing, or whose
    header yields no non-empty label, are skipped. Every file is considered at
    most once, so fewer than ``batch_size`` items are returned when the
    directory does not contain enough labelled recordings.

    Parameters:
    - batch_size: int, maximum number of recordings to return.
    - data_directory: str, directory containing the .wav and .hea files.

    Returns:
    - batch_audio: list, the sample array of each selected .wav file as
      returned by ``wavfile.read`` (the sampling rate is discarded).
    - batch_labels: list, the label string of the recording at the same index.
    """
    wav_filenames = [name for name in os.listdir(data_directory) if name.endswith('.wav')]
    batch_audio = []
    batch_labels = []

    # One random permutation visits every file exactly once, without the
    # repeated list.remove() scans of a draw-and-delete loop.
    for index in np.random.permutation(len(wav_filenames)):
        if len(batch_audio) >= batch_size:
            break

        wav_path = os.path.join(data_directory, wav_filenames[index])
        # Swap only the file extension. A plain str.replace would also rewrite
        # any '.wav' occurring in a directory name (or earlier in the file name)
        # and point at a header that does not exist.
        hea_path = os.path.splitext(wav_path)[0] + '.hea'

        try:
            label = None
            with open(hea_path, 'r') as header:
                for line in header:
                    if line.startswith('#'):
                        label = line[1:].strip()  # drop '#' and surrounding whitespace
                        break
            if not label:
                continue  # header has no usable label: try the next file
            _, audio = wavfile.read(wav_path)
        except FileNotFoundError:
            continue  # missing header (or vanished file): try the next file

        batch_audio.append(audio)
        batch_labels.append(label)

    return batch_audio, batch_labels


In [7]:
def preprocess_and_resample_audio(batch_audio, original_fs, target_fs=8000, cutoff_freq=3000, filter_order=5):
    """
    Normalise, resample and low-pass filter every signal in a batch.

    Each signal is converted to float32 (integer PCM samples are divided by the
    maximum value of their integer type; floating-point samples are used as
    they are), resampled from ``original_fs`` to ``target_fs`` and then filtered
    with a Chebyshev type I low-pass filter applied via ``filtfilt``.

    Parameters:
    - batch_audio: list, the audio signals of the batch.
    - original_fs: int, sampling frequency of the input signals.
    - target_fs: int, sampling frequency to resample to (default 8000).
    - cutoff_freq: int, cutoff frequency of the low-pass filter (default 3000).
    - filter_order: int, order of the filter (default 5).

    Returns:
    - processed_batch_audio: list, the processed signals in input order.
    """
    passband_ripple_db = 0.1  # maximum pass-band ripple, in dB
    # The cutoff is expressed as a fraction of the Nyquist frequency of the
    # signal being filtered, i.e. of the resampled signal.
    normalized_cutoff = cutoff_freq / (0.5 * target_fs)
    b, a = cheby1(filter_order, passband_ripple_db, normalized_cutoff, 'low')

    processed_batch_audio = []
    for audio in batch_audio:
        audio = np.asarray(audio)
        if np.issubdtype(audio.dtype, np.integer):
            # Integer PCM: scale by the largest value the sample type can hold.
            full_scale = np.iinfo(audio.dtype).max
            audio = audio.astype(np.float32, order='C') / full_scale
        else:
            # Floating-point samples have no integer range to scale by
            # (np.iinfo rejects float dtypes), so only the dtype is unified.
            audio = audio.astype(np.float32, order='C')

        # Resample to the target rate, then apply the low-pass filter.
        resampled_audio = librosa.resample(audio, orig_sr=original_fs, target_sr=target_fs)
        filtered_audio = filtfilt(b, a, resampled_audio)
        processed_batch_audio.append(filtered_audio)

    return processed_batch_audio


In [8]:
def extract_mfcc_features(batch_audio, sample_rate):
    """
    Compute the MFCC features of every signal in a batch.

    Parameters:
    - batch_audio: list, the preprocessed audio signals.
    - sample_rate: int, sampling frequency of those signals.

    Returns:
    - mfcc_features: np.ndarray with dtype=object, built from the per-signal
      MFCC results (13 cepstral coefficients, FFT size 2048).
    """
    per_signal_mfcc = [
        mfcc(signal, samplerate=sample_rate, numcep=13, nfft=2048)
        for signal in batch_audio
    ]
    # Collect the results in an object array for the downstream steps.
    return np.array(per_signal_mfcc, dtype=object)


In [12]:


# Seed NumPy (used for the random file selection) and TensorFlow so that
# re-running the notebook from a fresh kernel starts from the same random state.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Directory on the mounted Drive that holds the .wav/.hea files
data_directory = '/content/gdrive/MyDrive/data/audio'
batch_size = 2000  # number of recordings to load; adjust to the data size and available memory
# NOTE(review): one sampling rate is assumed for every recording (the rate read
# from each file is not used) — adjust to match the audio data.
original_fs = 2000
target_fs = 8000  # sampling rate the signals are resampled to

# Load the recordings, preprocess them and extract the MFCC features
batch_audio, batch_labels = read_wav_files(batch_size, data_directory)
if not batch_audio:
    # Fail here with a clear message instead of deep inside the encoding/padding steps.
    raise RuntimeError(f"No labelled .wav files were found in {data_directory}")
processed_batch_audio = preprocess_and_resample_audio(batch_audio, original_fs, target_fs)
mfcc_features = extract_mfcc_features(processed_batch_audio, target_fs)

# Pad the variable-length MFCC sequences at the end to a common length
mfcc_features_padded = pad_sequences(mfcc_features, padding='post', dtype='float32')

# Encode the string labels as integers, then as one-hot vectors
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(batch_labels)
one_hot_labels = to_categorical(encoded_labels)

# Split into training and held-out sets
# NOTE(review): the split is not stratified; consider stratify=encoded_labels if
# the classes are imbalanced.
X_train, X_test, y_train, y_test = train_test_split(
    mfcc_features_padded, one_hot_labels, test_size=0.2, random_state=SEED
)

# Define the model
model = Sequential([
    # First convolution block: extracts local features along the time axis.
    # The 13 input channels must match numcep in extract_mfcc_features.
    Conv1D(filters=64, kernel_size=5, activation='relu', padding='same', input_shape=(None, 13)),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    # Second convolution block: learns more complex features
    Conv1D(filters=128, kernel_size=5, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    # Stacked LSTM layers learn the sequence structure
    LSTM(128, return_sequences=True),
    LSTM(128),

    # Fully connected layers on top of the sequence summary
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(64, activation='relu'),
    Dropout(0.5),

    # Output layer: one unit per class
    Dense(one_hot_labels.shape[1], activation='softmax')
])

# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model; the held-out set is passed as validation data for per-epoch monitoring
history = model.fit(X_train, y_train, epochs=50, batch_size=200, validation_data=(X_test, y_test))

# Evaluate the model on the held-out set
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=2)
print(f"Test Loss: {test_loss}\nTest Accuracy: {test_acc}")


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
13/13 - 2s - loss: 0.5334 - accuracy: 0.7750 - 2s/epoch - 168ms/step
Test Loss: 0.5334219932556152
Test Accuracy: 0.7749999761581421
