In [2]:
import os
import sys
import librosa
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from dsp import *

In [3]:
# def readDat(filepath):
#     # 使用16位有符号整数(int16)格式读取原始数据
#     y = np.fromfile(filepath, dtype=np.int16)
#     # 将整数值归一化到[-1, 1]范围
#     y = y.astype(np.float32) / 32768.0
#     return y
def readDat(filepath):
    ext = os.path.splitext(filepath)[1].lower()
    if ext == '.dat':
        y = np.fromfile(filepath, dtype=np.int16)
        y = y.astype(np.float32) / 32768.0
        return y
    elif ext == '.wav':
        y, _ = librosa.load(filepath, sr=8000)
        return y
    else:
        raise ValueError(f"不支持的音频格式: {filepath}")

In [4]:
# 特征提取
def extractFeature(audio_file, sr=8000, win=256, step=160):
    y = readDat(audio_file)
    
    segments = vad(y, sr, win, step)

    if len(segments) == 0:
        print(f"警告: {audio_file} 没有检测到语音段")
        return None
    
    # 取所有音频段的起始和结束时间
    start = segments[0][0]
    end = segments[-1][1]
    y_speech = y[int(start):int(end)]
    
    # 确保语音段足够长
    min_length = win * 2  # 确保能生成至少一个完整的STFT帧
    if len(y_speech) < min_length:
        print(f"警告: {audio_file} 语音段太短({len(y_speech)}采样点)，填充至{min_length}采样点")
        y_speech = np.pad(y_speech, (0, min_length - len(y_speech)), 'constant')
        # return None

    # 提取 MFCC 特征
    mfcc_feature = computeMFCC(y_speech, sr, win, D=13, M=26)
    
    n_frames = mfcc_feature.shape[1]
    if n_frames < 3:
        print(f"警告: {audio_file} 帧数太少({n_frames})，跳过该样本")
        return None

    # 计算一阶差分（ΔMFCC）
    delta = np.diff(mfcc_feature, n=1, axis=1)
    delta = np.pad(delta, ((0,0),(1,0)), mode='edge')  # 保持帧数一致

    # 计算二阶差分（ΔΔMFCC）
    delta2 = np.diff(mfcc_feature, n=2, axis=1)
    delta2 = np.pad(delta2, ((0,0),(2,0)), mode='edge')  # 保持帧数一致

    # 拼接特征 [MFCC; ΔMFCC; ΔΔMFCC]
    feature = np.vstack([mfcc_feature, delta, delta2])  # shape: (39, n_frames)
    return feature

In [5]:
# 准备数据集
def prepareData(base_dir, classes):
    features = []
    labels = []

    print(f"开始从 {base_dir} 加载数据...")
    
    total_files = 0         # 总文件数
    error_files = 0         # 出错的文件数
    
    # 先计算总文件数
    for idx, name in enumerate(classes):
        class_dir = os.path.join(base_dir, str(idx).zfill(2))
        if os.path.exists(class_dir):
            for filename in os.listdir(class_dir):
                if filename.endswith('.dat'):
                    total_files += 1
    print(f"找到 {total_files} 个文件")
    
    with tqdm(total=total_files, desc="总进度") as pbar:
        for idx, name in enumerate(classes):
            class_dir = os.path.join(base_dir, str(idx).zfill(2))
            
            if not os.path.exists(class_dir):
                print(f"警告：目录 {class_dir} 不存在，跳过类别 '{name}'")
                continue
                
            class_files = [f for f in os.listdir(class_dir) if f.endswith('.dat')]
            print(f"正在处理类别 '{name}' ({len(class_files)}个文件)...")
            
            for filename in class_files:
                filepath = os.path.join(class_dir, filename)
                feature = extractFeature(filepath)
                if feature is not None:
                    features.append(feature)
                    labels.append(idx)
                else:
                    error_files += 1

                pbar.set_postfix({"出错文件数": error_files})
                pbar.update(1)
            
    print(f"\n处理完成: 共 {total_files} 个文件, 出错 {error_files} 个")
    if len(features) == 0:
        raise ValueError("没有成功处理任何文件!请检查数据路径和文件格式")
    
    return features, np.array(labels)

In [6]:
from hmmlearn import hmm
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
import seaborn as sns
import joblib

# HMM 语音识别器
class HMMRecognizer:
    # n_states: 隐藏状态数量
    # n_mix: 每个状态的高斯混合数量
    # cov_type: 协方差类型 ('diag'/'full')
    # n_iter: 训练迭代次数
    def __init__(self, n_state=5, n_mix=8, cov_type='diag', n_iter=20):
        self.n_states = n_state
        self.n_mix = n_mix
        self.cov_type = cov_type
        self.n_iter = n_iter
        self.models = {}  # 存储每个类别的 HMM 模型
        self.classes = None
        
    def train(self, features, labels):
        self.classes = np.unique(labels)
        
        print(f"开始训练HMM模型, 共{len(self.classes)}个类别...")
        
        for c in tqdm(self.classes):
            # 获取当前类别的所有特征
            class_features = [feat for feat, label in zip(features, labels) if label == c]
            
            if len(class_features) == 0:
                print(f"警告: 类别{c}没有训练样本")
                continue
            
            # 将特征转换为适合HMM训练的格式
            # 1. 首先计算所有样本的总帧数
            total_frames = sum(feat.shape[1] for feat in class_features)
            
            # 2. 创建一个大的连续特征数组
            X = np.zeros((total_frames, class_features[0].shape[0]))
            
            # 3. 填充特征并记录每个样本的长度
            lengths = []
            idx = 0
            for feat in class_features:
                seq_len = feat.shape[1]
                X[idx:idx+seq_len] = feat.T  # 转置以匹配HMM输入要求
                lengths.append(seq_len)
                idx += seq_len
            
            # 初始化并训练HMM模型
            model = hmm.GMMHMM(
                n_components=self.n_states,
                n_mix=self.n_mix,
                covariance_type=self.cov_type,
                n_iter=self.n_iter,
                verbose=False,              # 是否打印训练过程信息
            )
            
            try:
                model.fit(X, lengths)  # 现在X的形状是(总帧数, 特征维度)
                self.models[c] = model
                print(f"类别 {c} 模型训练完成")
            except Exception as e:
                print(f"类别 {c} 模型训练失败: {e}")
    
    # 语音识别
    # feature: MFCC特征，形状为(n_features, n_frames)
    def recognize(self, feature):
        if not self.models:
            raise ValueError("模型尚未训练")
        
        # 计算每个模型的对数似然
        log_likelihoods = {}
        for class_idx, model in self.models.items():
            try:
                # 计算当前特征在该模型下的对数似然
                log_likelihood = model.score(feature.T)
                log_likelihoods[class_idx] = log_likelihood
            except Exception as e:
                print(f"计算类别 {class_idx} 的似然值时出错: {e}")
                log_likelihoods[class_idx] = -np.inf
        
        # 选择对数似然最高的类别
        if not log_likelihoods:
            return None, None
        
        best_class = max(log_likelihoods, key=log_likelihoods.get)
        return best_class, log_likelihoods[best_class]
    
    # 保存模型到文件
    def save(self, filepath):
        joblib.dump({
            'models': self.models,
            'classes': self.classes,
            'n_states': self.n_states,
            'n_mix': self.n_mix,
            'cov_type': self.cov_type
        }, filepath)
        
    def load(self, filepath):
        """加载模型"""
        data = joblib.load(filepath)
        self.models = data['models']
        self.classes = data['classes']
        self.n_states = data['n_states']
        self.n_mix = data['n_mix']
        self.cov_type = data['cov_type']
        
    # 评估模型
    def evaluate(self, features, true_labels):
        predictions = []
        
        print("开始评估模型...")
        for i, feature in enumerate(tqdm(features)):
            pred_class, _ = self.recognize(feature)
            predictions.append(pred_class)
            
        # 计算准确率
        accuracy = accuracy_score(true_labels, predictions)
        print(f"准确率: {accuracy:.4f}")
        
        # 计算混淆矩阵
        cm = confusion_matrix(true_labels, predictions)
        
        # 分类报告
        report = classification_report(true_labels, predictions)
        print("分类报告:")
        print(report)
        
        return accuracy, cm, report

In [8]:
recognizer = HMMRecognizer(
    n_state=5,  # 隐藏状态数
    n_mix=4,    # 每个状态的高斯混合数
    cov_type='diag', 
    n_iter=30   # 训练迭代次数
)

classes = ["数字", "语音", "语言", "处理", "中国", "忠告", "北京", "背景", "上海", "商行",
              "Speech", "Speaker", "Signal", "Sequence", "Processing", "Print", "Project", "File", "Open", "Close"]

In [None]:
# 准备数据集
# features, labels = prepareData('../myData', classes)
features, labels = prepareData('../Data', classes)

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42, stratify=labels)

print(f"训练集: {len(X_train)} 个样本")
print(f"测试集: {len(X_test)} 个样本")

In [None]:
# 训练模型
recognizer.train(X_train, y_train)

# 保存模型
recognizer.save("hmm/hmm_models.pkl")

# 评估模型
accuracy, cm, report = recognizer.evaluate(X_test, y_test)

In [None]:
# 绘制混淆矩阵
plt.figure(figsize=(15, 15))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=[str(i) for i in recognizer.classes],
            yticklabels=[str(i) for i in recognizer.classes])
plt.xlabel('Predict Type')
plt.ylabel('True Type')
plt.title('HMM Speech Recognition Confusion Matrix')
plt.tight_layout()
plt.show()

In [9]:
# 识别单个语音文件
# audio_file: 语音文件路径
# recognizer: HMM识别器
# class_names: 类别名称列表
def recognize_speech_file(audio_file, recognizer, class_names):
    # 提取特征
    feature = extractFeature(audio_file)
    if feature is None:
        return "无法检测到语音", None
    
    # 使用HMM进行识别
    class_idx, log_likelihood = recognizer.recognize(feature)
    
    if class_idx is None:
        return "识别失败", None
    
    return class_names[class_idx], log_likelihood

In [10]:
# 测试识别
recognizer.load("hmm/hmm_models.pkl")               # 加载模型
test_dir = "/mnt/c/Users/Keats/Documents/录音/"     # 测试目录
# test_dir = "../records/22300240008-曹奕伦-录音/22300240008-曹奕伦-录音"

# 保存原始stderr
stderr_fileno = sys.stderr
sys.stderr = open(os.devnull, 'w')

for filename in os.listdir(test_dir):
    if filename.endswith('.dat') or filename.endswith('.wav'):
        test_file = os.path.join(test_dir, filename)
        pred_class, log_likelihood = recognize_speech_file(test_file, recognizer, classes)
        print(f"{filename} -> 识别结果: {pred_class}, 对数似然值: {log_likelihood}")

# 恢复stderr
sys.stderr.close()
sys.stderr = stderr_fileno

计算类别 19 的似然值时出错: startprob_ must sum to 1 (got nan)
录音 (2).wav -> 识别结果: 数字, 对数似然值: -7287.006115198465
计算类别 19 的似然值时出错: startprob_ must sum to 1 (got nan)
录音 (3).wav -> 识别结果: 语言, 对数似然值: -11470.018310817733
计算类别 19 的似然值时出错: startprob_ must sum to 1 (got nan)
录音 (4).wav -> 识别结果: 语音, 对数似然值: -9291.867593289287
计算类别 19 的似然值时出错: startprob_ must sum to 1 (got nan)
录音.wav -> 识别结果: File, 对数似然值: -10622.3388735593


In [None]:
X_cao, y_cao = prepareData('../records/22300240008-曹奕伦-录音/22300240008-曹奕伦-录音', classes)  # 重新准备测试数据集
accuracy, cm, report = recognizer.evaluate(X_cao, y_cao)