# Pretrain (no need to run)

In [None]:
import os
import scipy.io.wavfile as wav
from python_speech_features import mfcc, delta
from scipy import signal
import re

# Original note (translated): "read the MFCC features already computed with HTK".
# NOTE(review): this cell computes the MFCCs itself with python_speech_features,
# so the note above looks outdated.
# Sample rate in Hz; passed to mfcc() as `samplerate` and to signal.butter() as `fs`.
RATE = 16000


def extract_MFCC(audio):
    """Return MFCCs and two delta feature sets for one audio signal.

    Args:
        audio: Audio samples; forwarded unchanged to ``mfcc`` together with
            ``samplerate=RATE``.

    Returns:
        The horizontal stack of the 13 cepstral coefficients,
        ``delta(mfcc, 1)`` and ``delta(mfcc, 2)``, i.e. 39 columns per
        analysis frame.
    """
    # Imported locally: this function is called in the first cell, before the
    # notebook's `import numpy as np` runs, so the global name `np` was
    # undefined at that point and the call raised NameError.
    import numpy as np

    wav_feature = mfcc(audio, samplerate=RATE, numcep=13, winlen=0.025, winstep=0.01, nfilt=26, nfft=512, lowfreq=0,
                       highfreq=None, preemph=0.97)
    # NOTE(review): in python_speech_features the second argument of delta()
    # is the number of neighbouring frames N, not the derivative order, so
    # the second call is a wider-window first-order delta rather than a
    # delta-delta. Left unchanged because any saved features/models were
    # presumably produced with this definition — confirm before changing.
    d_mfcc_feat = delta(wav_feature, 1)
    d_mfcc_feat2 = delta(wav_feature, 2)
    return np.hstack((wav_feature, d_mfcc_feat, d_mfcc_feat2))


# Order-99 Butterworth band-pass filter in second-order-section form, designed
# once here and applied by bandpass().
# NOTE(review): because `fs=RATE` is passed, SciPy reads the band edges in Hz,
# so this passes 500–6000 Hz. The `2 *` factors look like a leftover from the
# normalized-frequency convention (2 * f / fs); confirm whether 250–3000 Hz was
# intended before changing it, since existing features were presumably
# computed with the current response.
sos = signal.butter(99, [2 * 250, 2 * 3e3], fs=RATE, output='sos', btype='bandpass')


def bandpass(wav_data, order, fre_c):
    """Filter ``wav_data`` with the module-level band-pass filter ``sos``.

    ``order`` and ``fre_c`` are accepted so existing call sites keep working,
    but they are not used: the filter design is fixed by ``sos``.

    Returns:
        The filtered signal as returned by ``signal.sosfilt``.
    """
    return signal.sosfilt(sos, wav_data)


def getMFCC_train(datapath):
    """Compute band-passed MFCC features for every file of every digit folder.

    Args:
        datapath: Directory that contains one sub-folder per label name
            ('zero' ... 'nine').

    Returns:
        A list with one entry per label, in the order zero..nine. Each entry
        is a list holding one feature array per file, in ``os.listdir`` order.
    """
    digit_names = ('zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine')

    def _features_for(wav_path):
        # The sample rate stored in the file is read but not used.
        _rate, samples = wav.read(wav_path)  # samples: (len, )
        return extract_MFCC(bandpass(samples, 99, 3e3))

    features_by_digit = []
    for digit in digit_names:
        digit_dir = os.path.join(datapath, digit)
        features_by_digit.append(
            [_features_for(os.path.join(digit_dir, entry)) for entry in os.listdir(digit_dir)]
        )
    return features_by_digit


# Machine-specific absolute path of the training set; adjust before running.
_datapath = r"D:\Program\pyProject\DSP_SpeechNumberRecognization\dataset"
# Extract features for the whole training set (runs at cell execution time).
_MFCC = getMFCC_train(_datapath)

# Load Pretrained

In [2]:
import numpy as np

# Load the saved feature array and label array from the working directory.
# presumably one flattened 99 x 39 MFCC block per row (the MLP input size is
# 99 * 39) — TODO confirm against the script that wrote these files.
_MFCC_np = np.load("MFCC_np.npy")
_labels = np.load("labels_np.npy")

In [3]:
from sklearn.model_selection import train_test_split

# Fraction of each split that is actually kept (leading rows only).
ratio = 0.7

# 67/33 train/test split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(_MFCC_np, _labels, test_size=0.33, random_state=42)

# Number of rows kept per split; after slicing these equal the split lengths.
X_train_len = int(len(X_train) * ratio)
X_test_len = int(len(X_test) * ratio)

X_train, y_train = X_train[:X_train_len], y_train[:X_train_len]
X_test, y_test = X_test[:X_test_len], y_test[:X_test_len]

# sklearn
## Adaboost

In [None]:

from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import ExtraTreeClassifier

# AdaBoost over 600 depth-13 extra-trees, with a fixed seed.
# NOTE(review): newer scikit-learn releases name this argument `estimator`
# instead of `base_estimator` — check the installed version before running.
abc = AdaBoostClassifier(base_estimator=ExtraTreeClassifier(max_depth=13), n_estimators=600, random_state=233)

abc.fit(X_train, y_train)
# Last expression of the cell: the notebook displays the test accuracy.
abc.score(X_test, y_test)

In [None]:
from sklearn.model_selection import GridSearchCV
# Find Best parameter first
from sklearn.tree import DecisionTreeClassifier, ExtraTreeClassifier

# Search space: 3 tree depths x 3 ensemble sizes.
# NOTE(review): newer scikit-learn releases name the key `estimator` instead
# of `base_estimator` — check the installed version before running.
param_grid = {"base_estimator": [ExtraTreeClassifier(max_depth=i) for i in [10, 50, 100]],
              "n_estimators": [300, 600, 900],
              }
# 5-fold cross-validation, 8 parallel jobs.
grid_search = GridSearchCV(AdaBoostClassifier(random_state=233), param_grid, cv=5, n_jobs=8)

# Fit Model
grid_search.fit(X_train, y_train)
print(f"Best Parameters: {grid_search.best_params_}")
# best_score_ is the cross-validated score of the best parameter setting.
print(f"Best Train Score: {grid_search.best_score_}")

## SVM

In [None]:
from sklearn.svm import SVC

# Baseline: SVC with default hyper-parameters.
svc = SVC()
svc.fit(X_train, y_train)
# This score used to be computed and silently discarded (it was not the last
# expression of the cell), so it is printed explicitly now.
print(f"Baseline Test Score: {svc.score(X_test, y_test)} ")

# Hyper-parameter grid. 'rbf' used to be listed twice, which only repeated
# identical fits; the set of distinct candidates is unchanged.
param_grid = {"C": [0.1, 1, 10],
              "kernel": ['rbf', 'poly']}
# gamma='scale' is used here; gamma='auto' was the alternative tried earlier.
grid_search = GridSearchCV(SVC(gamma='scale'), param_grid, cv=4, n_jobs=8)

# Fit Model
grid_search.fit(X_train, y_train)
print(f"Best Parameters: {grid_search.best_params_}")
print(f"Best Train Score: {grid_search.best_score_}")

# Score on the training rows; this line used to be mislabelled "Test Score".
score = grid_search.score(X_train, y_train)
print(f"Train Score: {score} ")
score = grid_search.score(X_test, y_test)
print(f"Test Score: {score} ")

# MLP

## Model

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class mlp(nn.Module):
    """Fully connected classifier with five linear layers and two skip additions.

    Each input row must hold 99 * 39 values; the output holds 10
    log-probabilities per input row.
    """

    def __init__(self):
        super().__init__()
        # Layer widths from input to output. Every Linear stays wrapped in
        # nn.Sequential and is registered as layer1..layer5 in this order, so
        # parameter names keep the "layerN.0.weight" / "layerN.0.bias" form.
        widths = (99 * 39, 1000, 1000, 200, 200, 10)
        for position, (n_in, n_out) in enumerate(zip(widths, widths[1:]), start=1):
            setattr(self, f"layer{position}", nn.Sequential(nn.Linear(n_in, n_out)))

    def forward(self, x):
        """Map a (rows, 99 * 39) batch to (rows, 10) log-probabilities."""
        hidden = F.relu(self.layer1(x))
        # Skip addition around the 1000 -> 1000 layer.
        hidden = F.relu(self.layer2(hidden) + hidden)
        hidden = F.relu(self.layer3(hidden))
        # Skip addition around the 200 -> 200 layer.
        hidden = F.relu(self.layer4(hidden) + hidden)
        logits = self.layer5(hidden)
        return F.log_softmax(logits, dim=1)


# Instantiate the network in float32 and move it to the CUDA device
# (this line fails on a machine without CUDA).
_model = mlp().float().cuda()

## Prepare Train Data

In [20]:

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

# TensorBoard writer; event files go to the 'runs/train' directory.
writer = SummaryWriter('runs/train')


class MfccData(Dataset):
    """Dataset view over the notebook-level ``X_train`` / ``y_train`` arrays.

    The arrays are looked up on every access, so the dataset always reflects
    the current values of those globals.
    """

    def __len__(self):
        """Return the number of training rows."""
        return len(X_train)

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        sample = X_train[index]
        target = y_train[index]
        return sample, target


# Mini-batch size for training.
bs = 1000
# Global step counter; incremented once per optimizer step in the training loop.
batch = 0

# NOTE(review): the model's forward() already ends in log_softmax, and
# CrossEntropyLoss applies log-softmax itself. The result is the same because
# log-softmax of log-probabilities leaves them unchanged, but nn.NLLLoss would
# be the matching pair — confirm before changing.
_loss_fn = nn.CrossEntropyLoss()
_optimizer = torch.optim.AdamW(_model.parameters(), lr=1e-4)

# Shuffled loader over the training rows, loaded in the main process.
_mfcc_data = DataLoader(MfccData(), batch_size=bs, shuffle=True, num_workers=0)

def to_cuda(*ts: torch.Tensor):
    """Return a list with the ``.cuda()`` copy of every argument, in argument order."""
    moved = []
    for tensor in ts:
        moved.append(tensor.cuda())
    return moved


## Evaluation

In [13]:

def eval():
    """Estimate the accuracy of ``_model`` on random batches of the test split.

    Draws 4 batches of up to 100 rows from ``X_test`` / ``y_test`` (no row is
    repeated within a batch), runs ``_model`` on each and averages the
    per-batch accuracy. The model's train/eval mode is not touched here; the
    caller is expected to set it.

    Returns:
        The mean accuracy over the sampled batches, between 0 and 1.

    Raises:
        ValueError: If ``X_test`` has no rows.

    NOTE: this name shadows the built-in ``eval``; it is kept because the
    training loop calls the function by this name.
    """
    n_rounds = 4
    n_available = X_test.shape[0]
    if n_available == 0:
        raise ValueError("X_test is empty; nothing to evaluate")
    # Sampling without replacement cannot draw more rows than exist, so a
    # test split smaller than 100 rows no longer crashes.
    test_bs = min(100, n_available)

    results = []
    # Scoring needs no gradients; disabling autograd avoids building a graph
    # for every batch and does not change the predictions.
    with torch.no_grad():
        for _ in range(n_rounds):
            _index = np.random.choice(n_available, replace=False, size=(test_bs,))
            _X = torch.from_numpy(X_test[_index, :]).float()
            _y = torch.from_numpy(y_test[_index]).long()
            _X, _y = to_cuda(_X, _y)
            out = _model(_X)
            _, pred = out.max(1)
            num_correct = (pred == _y).sum().item()
            results.append(num_correct / _y.shape[0])
    return np.mean(results)


## Train

In [22]:
# Start the global step counter at 4000 (the first logged step in the cell
# output of the training loop is 4050).
batch = 4000
# Best validation accuracy seen so far; a checkpoint is saved when it is beaten.
max_val_score = 0


In [23]:

# Keep cycling over the training loader until the global step counter hits a
# multiple of 8800.
is_break = False
while not is_break:
    for b, (_X, _y) in enumerate(_mfcc_data):
        _model.train()
        _X, _y = to_cuda(_X.float(), _y.long())
        pred = _model(_X)
        loss = _loss_fn(pred, _y)

        # Backward pass and parameter update.
        _optimizer.zero_grad()
        loss.backward()
        _optimizer.step()
        batch += 1

        # Every 50 steps: score on the test split, log, and keep the best weights.
        if batch % 50 == 0:
            _model.eval()
            eval_score = eval()
            print(f"{batch}, {loss} <-> {eval_score}")
            writer.add_scalars('Training Loss', {'Train': loss, 'Val': eval_score}, batch)
            if eval_score > max_val_score:
                torch.save(_model.state_dict(), "mlp-model-best.pth")
                print(f"best model at {batch}")
                max_val_score = eval_score

        # Stop condition; also ends the outer loop through the while test.
        is_break = batch % 8800 == 0
        if is_break:
            break


4050, 0.0616881288588047 <-> 0.77
best model at 4050
4100, 0.021399063989520073 <-> 0.72
4150, 0.013324248604476452 <-> 0.7375
4200, 0.009489394724369049 <-> 0.7550000000000001
4250, 0.006943423766642809 <-> 0.7649999999999999
4300, 0.005177389830350876 <-> 0.7725000000000001
best model at 4300
4350, 0.003626271616667509 <-> 0.7625
4400, 0.0028443268965929747 <-> 0.7525
4450, 0.0022522499784827232 <-> 0.73
4500, 0.0018260566284880042 <-> 0.7325
4550, 0.0013879138277843595 <-> 0.7375
4600, 0.001220334437675774 <-> 0.7124999999999999
4650, 0.0009969824459403753 <-> 0.7575
4700, 0.000898712663911283 <-> 0.7675
4750, 0.000696726085152477 <-> 0.7675000000000001
4800, 0.0005596652626991272 <-> 0.7525
4850, 0.000511816528160125 <-> 0.7725
4900, 0.00044374517165124416 <-> 0.7475
4950, 0.0003685857227537781 <-> 0.765
5000, 0.0002890059258788824 <-> 0.7224999999999999
5050, 0.00027758118812926114 <-> 0.7349999999999999
5100, 0.00022532336879521608 <-> 0.7475
5150, 0.00021757539070677012 <-> 0.77

In [19]:
# Save the current weights; the file name carries the current global step count.
torch.save(_model.state_dict(), f"mlp-model-{batch}.pth")

## External Test

In [None]:

from pydub import AudioSegment

from pydub.silence import detect_nonsilent


def mfcc_test_mlp(datapath):
    """Classify the labelled recordings in ``datapath`` with ``_model``.

    A file is processed when the text in front of its underscore(s), as
    extracted by the label regex, equals one of the ten label names. Files
    are handled grouped by label, in the order zero..nine. For each file the
    99 MFCC frames that start at the first non-silent segment are cut out,
    padded to 99 frames if shorter, flattened and fed to ``_model``. The
    feature shape and the prediction are printed per file.

    Args:
        datapath: Directory containing the test recordings.

    Returns:
        A list of ``(file_name, predicted_index)`` tuples in processing
        order. Files without a usable non-silent segment are reported and
        skipped. (Earlier versions returned ``None``.)
    """
    import numpy as np

    labels = ['zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine']
    n_frames = 99  # frames per sample; the network input is n_frames * 39 values
    rule = re.compile(r'(.*?)_.*?')  # compiled once instead of once per file
    # The input must live on the same device as the model; feeding a CPU
    # tensor to the CUDA model raised a device-mismatch error.
    device = next(_model.parameters()).device

    predictions = []
    files = os.listdir(datapath)  # every file name in the folder
    # The label loop variable is no longer reused by an inner loop; the old
    # inner `for i, sec in ...` overwrote it and corrupted the label match
    # for the remaining files.
    for label in labels:
        for file in files:
            if ''.join(re.findall(rule, str(file))) != label:
                continue

            file_name = os.path.join(datapath, file)
            data = AudioSegment.from_wav(file_name)
            secs = detect_nonsilent(data, min_silence_len=50, silence_thresh=data.dBFS - 6)
            if not secs:
                print(f"{file} <-> skipped (no non-silent segment found)")
                continue

            # detect_nonsilent reports milliseconds and the MFCC frames advance
            # by 10 ms (winstep=0.01), so ms / 10 is a frame index. The crop
            # starts at the first segment (not the longest one).
            sec_start = int(secs[0][0] / 10)
            sec_end = sec_start + n_frames

            fs, audio = wav.read(file_name)  # audio: (len, )
            feature = extract_MFCC(bandpass(audio, 99, 3e3))[sec_start:sec_end, :]
            if feature.shape[0] == 0:
                # Median padding needs at least one real frame.
                print(f"{file} <-> skipped (no frames after cropping)")
                continue

            feature = np.pad(feature, ((0, n_frames - feature.shape[0]), (0, 0)), mode='median')
            print(feature.shape)
            feature = feature.ravel().reshape((1, -1))
            _X = torch.from_numpy(feature).float().to(device)
            with torch.no_grad():  # inference only
                out = _model(_X)
            _, pred = out.max(1)

            print(f"{file} <-> {pred}")
            predictions.append((file, pred.item()))
    return predictions


# Run the external test on a machine-specific folder; adjust the path before running.
mfcc_test_mlp(r"D:\Program\pyProject\DSP_SpeechNumberRecognization\dataset_test")

In [None]:

from pydub.silence import detect_nonsilent

# Recording to inspect. It is defined first: it used to be assigned only after
# its first use, which raised NameError on a fresh kernel.
test_file = r"D:\Program\pyProject\DSP_SpeechNumberRecognization\dataset_test\eight_18128.wav"

data = AudioSegment.from_wav(test_file)
secs = detect_nonsilent(data, min_silence_len=50, silence_thresh=data.dBFS - 6)
if not secs:
    raise ValueError(f"no non-silent segment found in {test_file}")

# Index and length of the longest non-silent segment. Informational only: the
# crop below starts at the first segment; use secs[max_i][0] to start at the
# longest one instead.
max_len = 0
max_i = 0
for i, sec in enumerate(secs):
    sec_diff = sec[1] - sec[0]
    if sec_diff > max_len:
        max_len = sec_diff
        max_i = i

# detect_nonsilent reports milliseconds and the MFCC frames advance by 10 ms
# (winstep=0.01), so ms / 10 is a frame index; keep 99 frames from there.
sec_start = int(secs[0][0] / 10)
sec_end = sec_start + 99

fs, audio = wav.read(test_file)  # audio: (len, )
feature = extract_MFCC(bandpass(audio, 99, 3e3))[sec_start:sec_end, :]

In [None]:

import numpy as np

# Pad with per-column medians up to 99 frames, then flatten to one input row.
feature = np.pad(feature, ((0, 99 - feature.shape[0]), (0, 0)), mode='median')
print(feature.shape)
feature = feature.ravel().reshape((1, -1))
# Put the input on the model's device; a CPU tensor fed to the CUDA model
# raised a device-mismatch error.
_X = torch.from_numpy(feature).float().to(next(_model.parameters()).device)
with torch.no_grad():  # inference only, no autograd graph needed
    out = _model(_X)
_, pred = out.max(1)
print(out, pred)
