In [None]:
import tensorflow as tf
import numpy as np
import librosa
import os
from sklearn.model_selection import train_test_split


def load_and_preprocess_audio(file_path):
    """
    Load an audio file and convert it to a standardized log-mel spectrogram.

    The signal is read with librosa at SAMPLE_RATE, then zero-padded at the
    end or truncated so that it holds int(SAMPLE_RATE * DURATION) samples.
    A mel power spectrogram (N_MELS, N_FFT, HOP_LENGTH) is computed from it,
    converted to dB relative to its own maximum, and finally standardized
    with its own mean and standard deviation.

    Args:
        file_path: Path of the audio file to read.

    Returns:
        The standardized dB mel spectrogram (expected shape
        (N_MELS, n_frames)). The result contains NaN when the spectrogram
        is constant, because the standard deviation is then zero.
    """
    target_len = int(SAMPLE_RATE * DURATION)
    samples, _ = librosa.load(file_path, sr=SAMPLE_RATE)

    # Force every clip to the same number of samples
    n_samples = len(samples)
    if n_samples < SAMPLE_RATE * DURATION:
        samples = np.pad(samples, (0, target_len - n_samples))
    else:
        samples = samples[:target_len]

    # Mel power spectrogram, then dB relative to the clip's peak
    power_spec = librosa.feature.melspectrogram(
        y=samples,
        sr=SAMPLE_RATE,
        n_mels=N_MELS,
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
    )
    log_spec = librosa.power_to_db(power_spec, ref=np.max)

    # Per-clip standardization (zero mean, unit variance)
    centered = log_spec - log_spec.mean()
    return centered / log_spec.std()

def prepare_dataset(speech_dir, noise_dir):
    """
    Build the feature and label arrays used for training.

    Every entry of ``speech_dir`` is preprocessed and labelled 1 (speech),
    then every entry of ``noise_dir`` is preprocessed and labelled 0
    (no speech). Files whose features contain NaN are reported on stdout
    and left out.

    Args:
        speech_dir: Directory holding the speech clips.
        noise_dir: Directory holding the noise clips.

    Returns:
        Tuple ``(features, labels)`` of NumPy arrays, speech samples first.
    """
    features = []
    labels = []

    # Speech (label 1) is processed before noise (label 0)
    for directory, label in ((speech_dir, 1), (noise_dir, 0)):
        for file_name in os.listdir(directory):
            spec = load_and_preprocess_audio(os.path.join(directory, file_name))
            if np.isnan(spec).any():
                print(f"Skipping file {file_name} due to NaN values.")
                continue
            features.append(spec)
            labels.append(label)

    return np.array(features), np.array(labels)

# Baseline model
def create_model():
    """
    Build the baseline CNN used for voice activity detection.

    The input is a single-channel mel spectrogram of shape
    (N_MELS, n_frames, 1). The number of time frames is derived from the
    module-level SAMPLE_RATE, DURATION and HOP_LENGTH instead of being
    hard-coded, so the model keeps matching the preprocessing when the clip
    length or hop length changes (it evaluates to 32 for 16 kHz / 1 s / 512).

    Architecture: two Conv-BN pairs with 32 filters, two with 64 filters
    (each group followed by 2x2 max pooling and dropout 0.25), one Conv-BN
    with 128 filters, global average pooling, a 64-unit dense layer with
    batch normalization and dropout 0.5, and a single logit that is divided
    by 2.0 before the sigmoid.

    Returns:
        An uncompiled ``tf.keras.Sequential`` model producing one value in
        [0, 1] per input.
    """
    # librosa's STFT with its default centering yields
    # 1 + n_samples // hop_length frames.
    n_frames = 1 + int(SAMPLE_RATE * DURATION) // HOP_LENGTH

    def conv_bn(filters, l2_weight):
        # 3x3 same-padded ReLU convolution with L2 weight decay + batch norm
        return [
            tf.keras.layers.Conv2D(
                filters, (3, 3), activation='relu', padding='same',
                kernel_regularizer=tf.keras.regularizers.l2(l2_weight)),
            tf.keras.layers.BatchNormalization(),
        ]

    return tf.keras.Sequential([
        tf.keras.layers.Input(shape=(N_MELS, n_frames, 1)),

        *conv_bn(32, 0.0001),
        *conv_bn(32, 0.0001),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Dropout(0.25),  # dropout after the convolution group

        *conv_bn(64, 0.0001),
        *conv_bn(64, 0.0001),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Dropout(0.25),  # dropout after the convolution group

        # The last convolution uses a stronger L2 penalty (0.001)
        *conv_bn(128, 0.001),
        tf.keras.layers.GlobalAveragePooling2D(),

        # Fully connected head
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.5),

        # Temperature scaling: the logit is divided by 2.0 before the sigmoid
        tf.keras.layers.Dense(1, activation=None),
        tf.keras.layers.Lambda(lambda x: x / 2.0),
        tf.keras.layers.Activation('sigmoid'),
    ])

def train_model(model, X_train, y_train, X_val, y_val, *,
                epochs=5, batch_size=32, patience=3,
                initial_learning_rate=0.001, decay_steps=1000,
                decay_rate=0.9):
    """
    Compile and train the model.

    The model is compiled with Adam on an exponentially decaying learning
    rate, binary cross-entropy loss and the accuracy metric, then fitted
    with early stopping on the validation loss. The keyword-only arguments
    default to the values that were previously hard-coded, so existing
    calls behave exactly as before.

    Args:
        model: Keras model to compile and fit (modified in place).
        X_train: Training features.
        y_train: Training labels.
        X_val: Validation features.
        y_val: Validation labels.
        epochs: Maximum number of training epochs.
        batch_size: Mini-batch size.
        patience: Epochs without ``val_loss`` improvement before stopping.
        initial_learning_rate: Starting learning rate of the schedule.
        decay_steps: Number of optimizer steps per decay period.
        decay_rate: Multiplicative decay applied per ``decay_steps``.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    # Learning-rate decay
    learning_rate_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate, decay_steps, decay_rate)

    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate_schedule)
    model.compile(
        optimizer=optimizer,
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    # Early stopping to limit overfitting
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=patience,
        restore_best_weights=True
    )

    history = model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping]
    )

    return history


def convert_to_tflite(model, output_path, calibration_dir='/content/data'):
    """
    Convert a Keras model to a fully integer-quantized (INT8) TFLite model.

    Post-training quantization is calibrated with the audio files found in
    ``calibration_dir``. Calibration samples go through
    ``load_and_preprocess_audio`` so that they are preprocessed exactly like
    the training data. The converted model takes int8 input and produces
    int8 output.

    Args:
        model: Trained Keras model to convert.
        output_path: Destination file of the ``.tflite`` model. Missing
            parent directories are created.
        calibration_dir: Directory of representative audio files used for
            calibration. Defaults to the previously hard-coded
            ``/content/data``.
    """

    def representative_dataset_gen():
        # Yield one preprocessed clip per calibration file
        for file_name in sorted(os.listdir(calibration_dir)):
            feature = load_and_preprocess_audio(
                os.path.join(calibration_dir, file_name))
            # Constant (e.g. silent) clips normalize to NaN and would
            # corrupt the calibration ranges, so they are skipped.
            if np.isnan(feature).any():
                continue
            # Add batch and channel axes; the converter expects float32
            sample = np.expand_dims(feature, axis=(0, -1)).astype(np.float32)
            yield [sample]

    converter = tf.lite.TFLiteConverter.from_keras_model(model)

    # Full integer quantization
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.representative_dataset = representative_dataset_gen
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

    # Use int8 for the model's input and output tensors as well
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8

    int8_model = converter.convert()

    # Make sure the destination directory exists before writing
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(output_path, "wb") as f:
        f.write(int8_model)

def inference(audio_path, model):
    """
    Run the Keras model on a single audio file.

    Args:
        audio_path: Path of the audio file to classify.
        model: Model exposing ``predict``.

    Returns:
        The first element of the first prediction row (the predicted
        probability). The raw prediction array is also printed.
    """
    spec = load_and_preprocess_audio(audio_path)
    # Prepend a batch axis and append a channel axis
    batch = spec[np.newaxis, ..., np.newaxis]
    prediction = model.predict(batch)
    print(prediction)
    return prediction[0][0]


In [None]:
# 0. Configuration parameters
SAMPLE_RATE = 16000  # sampling rate
DURATION = 1  # duration of each audio clip (seconds)
N_MELS = 40  # frequency dimension of the mel spectrogram
HOP_LENGTH = 512  # STFT hop length
N_FFT = 2048  # FFT window size

In [None]:
# 1. Prepare the data
speech_dir = f"/content/1.0s"  # folder of audio files that contain speech
noise_dir = f"/content/silence_data_1.0s"    # folder of audio files that contain noise

features, labels = prepare_dataset(speech_dir, noise_dir)
# Append a trailing channel axis to the feature array
features = np.expand_dims(features, axis=-1)
# Split into training and validation sets (80/20, fixed seed)
X_train, X_val, y_train, y_val = train_test_split(
    features, labels, test_size=0.2, random_state=42
)

In [None]:
# 2. Create and train the model
model = create_model()
history = train_model(model, X_train, y_train, X_val, y_val)

Epoch 1/5
[1m226/226[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 43ms/step - accuracy: 0.9485 - loss: 0.1644 - val_accuracy: 0.9217 - val_loss: 0.2526
Epoch 2/5
[1m226/226[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 46ms/step - accuracy: 0.9935 - loss: 0.0296 - val_accuracy: 0.9800 - val_loss: 0.0589
Epoch 3/5
[1m226/226[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 49ms/step - accuracy: 0.9973 - loss: 0.0140 - val_accuracy: 0.9967 - val_loss: 0.0099
Epoch 4/5
[1m226/226[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 42ms/step - accuracy: 0.9970 - loss: 0.0123 - val_accuracy: 0.9989 - val_loss: 0.0061
Epoch 5/5
[1m226/226[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 52ms/step - accuracy: 0.9972 - loss: 0.0132 - val_accuracy: 0.9961 - val_loss: 0.0117


In [None]:
# 3. Convert to TFLite format
convert_to_tflite(model, "model/vad_model_1.0s.tflite")

Saved artifact at '/tmp/tmpgioq6qb3'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 20, 32, 1), dtype=tf.float32, name='keras_tensor_152')
Output Type:
  TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)
Captures:
  135633669271648: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135633661375264: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135633668693984: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135633668692752: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135633668685888: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135633668686768: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135633668688528: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135633668689760: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135633668686240: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135633668690112: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135633668696



In [None]:
# 4. Test inference with the converted TFLite model
import numpy as np
import tensorflow as tf
import librosa
import IPython.display as disp
import os
import time

class VADInference:
    """
    Voice activity detection with a TFLite model.

    Works with both float models and integer-quantized models: inputs are
    quantized and outputs dequantized according to the quantization
    parameters reported by the interpreter.
    """

    def __init__(self, model_path, duration):
        """
        Load the TFLite model and cache its tensor descriptions.

        Args:
            model_path: Path of the ``.tflite`` file.
            duration: Length in seconds of the window fed to the model.
        """
        # Preprocessing parameters
        self.SAMPLE_RATE = 16000
        self.DURATION = duration
        self.N_MELS = 40
        self.HOP_LENGTH = 512
        self.N_FFT = 2048

        # Load the TFLite model
        self.interpreter = tf.lite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()

        # Input/output tensor details (index, dtype, quantization, ...)
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

    @staticmethod
    def _quantize(values, tensor_details):
        """Convert float ``values`` to the dtype the input tensor expects."""
        dtype = tensor_details['dtype']
        if not np.issubdtype(dtype, np.integer):
            return values.astype(dtype)
        scale, zero_point = tensor_details['quantization']
        if scale == 0:
            # No quantization parameters available: plain cast
            return values.astype(dtype)
        limits = np.iinfo(dtype)
        quantized = np.round(values / scale + zero_point)
        # Saturate instead of wrapping around on out-of-range values
        return np.clip(quantized, limits.min, limits.max).astype(dtype)

    @staticmethod
    def _dequantize(values, tensor_details):
        """Convert raw output tensor ``values`` back to float32."""
        if not np.issubdtype(tensor_details['dtype'], np.integer):
            return values.astype(np.float32)
        scale, zero_point = tensor_details['quantization']
        if scale == 0:
            return values.astype(np.float32)
        return (values.astype(np.float32) - zero_point) * scale

    def preprocess_audio(self, audio_path, start=None):
        """
        Load a window of an audio file and compute its model input.

        Args:
            audio_path: Path of the audio file.
            start: Offset of the window in seconds; ``None`` means 0.

        Returns:
            float32 array with leading batch and trailing channel axes
            holding the standardized dB mel spectrogram of the window.
        """
        audio, _ = librosa.load(audio_path, sr=self.SAMPLE_RATE)

        # Cut the requested window, then zero-pad it so that the last
        # (possibly shorter) window of a file still has the full length.
        n_samples = int(self.SAMPLE_RATE * self.DURATION)
        begin = int(self.SAMPLE_RATE * (start or 0))
        audio = audio[begin:begin + n_samples]
        if len(audio) < n_samples:
            audio = np.pad(audio, (0, n_samples - len(audio)))

        # Show an audio player for the analysed window
        disp.display(disp.Audio(audio, rate=self.SAMPLE_RATE))

        # Mel spectrogram
        mel_spec = librosa.feature.melspectrogram(
            y=audio,
            sr=self.SAMPLE_RATE,
            n_mels=self.N_MELS,
            n_fft=self.N_FFT,
            hop_length=self.HOP_LENGTH
        )

        # Convert to dB
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)

        # Standardize; a constant spectrogram (e.g. pure silence) has zero
        # standard deviation, so only center it instead of producing NaN.
        spread = mel_spec_db.std()
        mel_spec_db = mel_spec_db - mel_spec_db.mean()
        if spread > 0:
            mel_spec_db = mel_spec_db / spread

        # Add batch and channel axes
        mel_spec_db = np.expand_dims(mel_spec_db, axis=(0, -1))
        return mel_spec_db.astype(np.float32)

    def predict(self, audio_path, threshold=0.5, start=None):
        """
        Run voice activity detection on one window of an audio file.

        Args:
            audio_path: Path of the audio file.
            threshold: Probability above which the window counts as voice.
            start: Offset of the window in seconds; ``None`` means 0.

        Returns:
            Dict with ``'probability'`` (float) and ``'has_voice'`` (bool).
        """
        features = self.preprocess_audio(audio_path, start=start)

        # Match the input tensor's dtype (int8 for a fully quantized model)
        input_info = self.input_details[0]
        self.interpreter.set_tensor(
            input_info['index'], self._quantize(features, input_info))

        self.interpreter.invoke()

        # Map the raw output back to a probability
        output_info = self.output_details[0]
        raw_output = self.interpreter.get_tensor(output_info['index'])
        probability = float(self._dequantize(raw_output, output_info)[0][0])

        return {
            'probability': probability,
            'has_voice': bool(probability > threshold)
        }



import torchaudio

audio_file = 'path/to/your/audio.mp3'
# Only used to determine how many whole seconds the file contains
# (assumes the loaded tensor is (channels, samples) — TODO confirm)
audio, sr = torchaudio.load(audio_file)

# Load the TFLite model once instead of rebuilding the interpreter for
# every one-second window.
vad = VADInference(model_path="/content/model/vad_model_1.0s.tflite", duration=1)
for i in range(int(audio.shape[1] / sr)):
    # Classify the one-second window starting at second i
    result = vad.predict(audio_path=audio_file, threshold=0.5, start=i)
    print(result)

