In [20]:
import os
import moviepy
import moviepy.editor
import numpy as np
import matplotlib.pyplot as plt
import re
import scipy
import shutil
import wave
import librosa
import librosa.display
import scipy.signal as signal
import soundfile
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn import svm, metrics
import glob
import sklearn
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from datetime import datetime
import random
import pickle

In [2]:
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, TensorBoard, ModelCheckpoint
from keras.optimizers import SGD,Adam
from keras.preprocessing.image import ImageDataGenerator
from keras.layers import Flatten
from keras.layers.convolutional import MaxPooling2D,AveragePooling2D
from keras.layers.convolutional import SeparableConv2D, Conv2D
from keras.layers.normalization import BatchNormalization
from keras.layers.core import Activation
from keras.layers.core import Dropout
from keras.layers.core import Dense
from keras.models import Sequential
from keras import backend as K
from keras import layers
from keras import models
import tensorflow as tf
from keras.metrics import categorical_accuracy
from tensorboard.backend.event_processing import event_accumulator

Using TensorFlow backend.


In [3]:
# Framing parameters shared by the feature extractors below.
frame_size = 2048  # length of one analysis frame, in samples
hop_size = 512  # frame shift, in samples
# Fixed clip length in samples: 3 seconds at a 48000 Hz sampling rate.
# Adjust both factors to match the actual recordings.
input_length = 3 * 48000

# NOTE(review): absolute local Windows paths; the notebook only runs on a
# machine with this exact directory layout.
train_files = glob.glob("G:\\GTA_audios\\input\\audio_train\\*.wav")  # training set
test_files = glob.glob("G:\\GTA_audios\\input\\audio_test\\*.wav")  # test set
labels = pd.read_excel("G:\\GTA_audios\\dataset.xlsx")  # labels of the training set

In [8]:
def calEnergy(frame):
    """Return the energy of one frame, i.e. the sum of its squared samples.

    Args:
        frame: Iterable of numeric samples.

    Returns:
        The accumulated sum of ``sample ** 2``; ``0`` for an empty frame.
    """
    # sum() starts at 0 and adds left to right, exactly like a manual accumulator.
    return sum(sample ** 2 for sample in frame)

def get_feature(pre_empha_wav, sr=48000):
    """Build one vector of hand-crafted, time-averaged features for a waveform.

    Every feature is computed per frame and then averaged over all frames.
    The returned vector is the concatenation, in this order, of:

    * 24 mean MFCC coefficients,
    * mean short-time energy of the Hamming-windowed frames,
    * mean zero-crossing rate,
    * mean RMS energy,
    * mean spectral centroid,
    * mean spectral bandwidth,
    * mean spectral flatness,
    * mean spectral roll-off at 99 % (upper limit),
    * mean spectral roll-off at 1 % (lower limit),
    * mean tonnetz (tonal centroid) features of the harmonic component.

    Framing uses the module-level ``frame_size`` and ``hop_size``; the MFCC and
    roll-off calls do not pass frame/hop sizes and therefore use librosa's
    defaults.

    Args:
        pre_empha_wav: 1-D waveform array.
        sr: Sampling rate passed to the sample-rate-dependent features.

    Returns:
        1-D ``numpy.ndarray`` with all averaged features stacked horizontally.
    """
    # Framing + Hamming window; the windowed frames feed the short-time energy only.
    # Broadcasting multiplies every row (frame) by the window in one step.
    window = signal.windows.hamming(frame_size)
    frames = librosa.util.frame(pre_empha_wav, frame_length=frame_size, hop_length=hop_size, axis=0)
    frames = frames * window

    # MFCC coefficients, averaged over time.
    mfccs = librosa.feature.mfcc(y=pre_empha_wav, sr=sr, n_mfcc=24, center=False)
    mfccs_scaled_features = np.mean(mfccs.T, axis=0)

    # Short-time energy.
    energy = [calEnergy(frame) for frame in frames]
    average_energy = np.average(np.array(energy))

    # Short-time zero-crossing rate.
    zeroCrossingRate = librosa.feature.zero_crossing_rate(
        y=pre_empha_wav, frame_length=frame_size, hop_length=hop_size, center=False
    )[0]
    average_zcr = np.average(zeroCrossingRate)

    # Root-mean-square energy. `y` must be passed by keyword: newer librosa
    # releases made it keyword-only, so a positional call raises TypeError.
    rmse = librosa.feature.rms(
        y=pre_empha_wav, frame_length=frame_size, hop_length=hop_size, center=False
    )[0]
    average_rmse = np.average(rmse)

    # Spectral centroid.
    cent = librosa.feature.spectral_centroid(
        y=pre_empha_wav, sr=sr, n_fft=frame_size, hop_length=hop_size, center=False
    )[0]
    average_cent = np.average(cent)

    # Second-order spectral bandwidth: spread of the spectrum around its centroid.
    spec_bw = librosa.feature.spectral_bandwidth(
        y=pre_empha_wav, sr=sr, n_fft=frame_size, hop_length=hop_size, center=False
    )[0]
    average_spec_bw = np.average(spec_bw)

    # Spectral flatness.
    flatness = librosa.feature.spectral_flatness(
        y=pre_empha_wav, n_fft=frame_size, hop_length=hop_size, center=False
    )[0]
    average_flatness = np.average(flatness)

    # Roll-off frequency, upper limit (99 % of the spectral energy).
    rolloff = librosa.feature.spectral_rolloff(y=pre_empha_wav, sr=sr, roll_percent=0.99, center=False)[0]
    average_rolloff = np.average(rolloff)

    # Roll-off frequency, lower limit (1 % of the spectral energy).
    rolloff_min = librosa.feature.spectral_rolloff(y=pre_empha_wav, sr=sr, roll_percent=0.01, center=False)[0]
    average_rolloff_min = np.average(rolloff_min)

    # Six-dimensional tonal centroid (tonnetz) features of the harmonic part.
    harmonic = librosa.effects.harmonic(pre_empha_wav)
    tonnetz = librosa.feature.tonnetz(y=harmonic, sr=sr)
    average_tonnetz = np.mean(tonnetz.T, axis=0)

    return np.hstack((
        mfccs_scaled_features,
        average_energy,
        average_zcr,
        average_rmse,
        average_cent,
        average_spec_bw,
        average_flatness,
        average_rolloff,
        average_rolloff_min,
        average_tonnetz
    ))

def get_GBT_features(file_path, input_length=input_length):
    """Load a wav file, force it to ``input_length`` samples, and extract features.

    Clips longer than ``input_length`` are cropped at a random offset; shorter
    clips are zero-padded with a random split between leading and trailing
    padding. Both use NumPy's global random state, so results vary between
    calls unless that state is seeded by the caller.

    Args:
        file_path: Path of the audio file to read.
        input_length: Target number of samples; defaults to the module-level
            ``input_length`` as it was when this function was defined.

    Returns:
        The 1-D feature vector produced by ``get_feature``.
    """
    # sr=None reads the file at its native sampling rate. Only the samples are kept.
    # NOTE(review): the native rate is discarded and get_feature falls back to
    # its 48000 Hz default - confirm every input file really is 48 kHz.
    data = librosa.load(file_path, sr=None)[0]
    n_samples = len(data)
    if n_samples > input_length:
        # Too long: keep a randomly positioned window of input_length samples.
        offset = np.random.randint(n_samples - input_length)
        data = data[offset:offset + input_length]
    elif n_samples < input_length:
        # Too short: zero-pad on both sides. The deficit is always >= 1 in this
        # branch, so the former `else: offset = 0` fallback could never run.
        deficit = input_length - n_samples
        offset = np.random.randint(deficit)
        data = np.pad(data, (offset, deficit - offset), "constant")
    # Hand-crafted statistics (MFCC, energy, spectral shape, tonnetz), not log-mel.
    return get_feature(data)

In [9]:
def get_CNN_features(file_path):
    """Read a wav file and return its log-mel spectrogram for the CNN.

    The waveform is cropped or zero-padded to the module-level ``input_length``
    (random offset, taken from NumPy's global random state), mean-centred,
    peak-normalised, and passed to ``get_mel_spectrogram`` together with the
    file's own sampling rate.

    Args:
        file_path: Path of the audio file to read.

    Returns:
        2-D array as returned by ``get_mel_spectrogram`` (frames x mel bands).
    """
    data, fs = soundfile.read(file=file_path)
    if data.ndim > 1:
        # soundfile returns (frames, channels) for multi-channel audio. The
        # length logic below needs a 1-D signal, so average the channels;
        # mono files are already 1-D and are left untouched.
        data = data.mean(axis=1)

    n_samples = len(data)
    if n_samples > input_length:
        # Too long: keep a randomly positioned window of input_length samples.
        offset = np.random.randint(n_samples - input_length)
        data = data[offset:offset + input_length]
    elif n_samples < input_length:
        # Too short: zero-pad on both sides. The deficit is always >= 1 here,
        # so no fallback offset is needed.
        deficit = input_length - n_samples
        offset = np.random.randint(deficit)
        data = np.pad(data, (offset, deficit - offset), "constant")

    # Normalise: remove the DC offset, then scale by the peak magnitude.
    data = data - np.mean(data)
    # The 0.05 margin avoids a division by zero on silent clips.
    peak = np.max(np.abs(data)) + 0.05
    data = data / peak
    data = np.reshape(data, [-1, 1])
    return get_mel_spectrogram(data, fs)
def get_mel_spectrogram(audio, sr, n_fft=2048, n_mels=128):
    """Compute a log10 mel power spectrogram.

    The STFT uses a 40 ms periodic Hamming window and a 20 ms hop (``sr / 50``
    samples), both derived from ``sr``. The mel filter bank is built without
    normalisation (``norm=None``) using the Slaney scale (``htk=False``).

    Args:
        audio: Waveform array of any shape; it is flattened to 1-D.
        sr: Sampling rate in Hz.
        n_fft: FFT size. The 40 ms window must not be longer than this.
        n_mels: Number of mel bands.

    Returns:
        ``float64`` array of shape ``(n_frames, n_mels)`` holding
        ``log10(mel_power + eps)``.
    """
    eps = np.finfo(np.float64).eps  # 2.220446049250313e-16
    samples = np.reshape(audio, -1)

    win_length = int(0.04 * sr)  # number of samples in 40 ms at the given rate
    hop_length = int(sr / 50)  # number of samples in 20 ms
    # scipy.signal.hamming is gone from recent SciPy releases; the window
    # functions live in scipy.signal.windows.
    window = scipy.signal.windows.hamming(win_length, sym=False)

    mel_basis = librosa.filters.mel(sr=sr,
                                    n_fft=n_fft,
                                    n_mels=n_mels,
                                    htk=False,
                                    norm=None
                                    )
    # eps is added to the signal itself so an all-zero clip is not exactly silent.
    stft = librosa.stft(samples + eps,
                        n_fft=n_fft,
                        win_length=win_length,
                        hop_length=hop_length,
                        center=True,
                        window=window
                        )
    power_spectrogram = np.abs(stft) ** 2
    mel_spectrogram = np.dot(mel_basis, power_spectrogram).T
    log_mel = np.log10(mel_spectrogram + eps)
    # Return a C-contiguous float64 copy, which is what appending to an empty
    # (0, n_mels) float64 array used to produce.
    return np.ascontiguousarray(log_mel, dtype=np.float64)

# 准备数据

In [10]:
# Build the file -> label lookup so a clip's label can be found from its path.
# Start with the single-label mapping: full file path -> "speed" value.
file_to_label = {
    "G:\\GTA_audios\\input\\audio_train\\" + sample: speed
    for sample, speed in zip(labels["sample"].values, labels["speed"].values)
}
# Distinct label values of the training set, de-duplicated and sorted.
list_labels = sorted(set(file_to_label.values()))
# Map every label to 0, 1, 2, ... following the sorted order.
label_to_int = {label: index for index, label in enumerate(list_labels)}
# Inverse mapping: integer class id -> original label.
int_to_label = dict(enumerate(list_labels))
# Full file path -> integer class id.
file_to_int = {path: label_to_int[label] for path, label in file_to_label.items()}
# From here on the training files are exactly those listed in the spreadsheet,
# in spreadsheet order; the labels follow the same order.
train_files = list(file_to_label)
train_labels = list(file_to_int.values())

In [13]:
# Both extractors crop/pad at offsets drawn from NumPy's global random state.
# Seed it first so the extracted features - and every score computed from
# them - are identical on each Restart & Run All.
np.random.seed(10)
# One log-mel spectrogram per training file (CNN input).
CNN_features = [get_CNN_features(path) for path in train_files]
# One hand-crafted feature vector per training file (gradient-boosting input).
GBT_features = [get_GBT_features(path) for path in train_files]

In [14]:
# Turn the per-file feature lists into arrays with the file index on axis 0.
CNN_features = np.array(CNN_features)
GBT_features = np.array(GBT_features)
# Integer class ids are stored as float64.
train_labels = np.array(train_labels, dtype=np.float64)

In [155]:
# Split sample *indices* 80/20 with a fixed seed so the CNN features, the GBT
# features and the labels can all be sliced with the same index arrays.
x = y = np.arange(CNN_features.shape[0])
train_index, test_index, _1, _2 = train_test_split(
    x, y, train_size=0.8, test_size=0.2, random_state=10
)

In [156]:
# Held-out subset for evaluation. The CNN input gets a trailing channel axis
# of size 1 (inserted at position 3).
CNN_test_data = np.expand_dims(CNN_features[test_index], axis=3)
GBT_test_data = np.take(GBT_features, test_index, axis=0)
test_label = np.take(train_labels, test_index, axis=0)

# 加载模型

In [152]:
def gen_model(input_shape=(151, 128, 1), n_classes=5):
    """Build the CNN classifier operating on log-mel spectrograms.

    Architecture, in layer-creation order:

    1. Four parallel ``Conv2D`` branches on the input (48, 32, 16 and 16
       filters with kernels 3x8, 3x32, 3x64 and 3x90, ``padding='same'``),
       concatenated along the channel axis, then batch normalisation, ReLU
       and 5x5 average pooling.
    2. ``Conv2D`` with 224 filters and a 5x5 kernel, batch normalisation, ReLU
       and 6x4 average pooling.
    3. Flatten, a 64-unit dense layer without activation, and a softmax
       output layer.

    The layer order must stay unchanged for previously saved weights to load.

    Args:
        input_shape: Shape of one input sample, ``(frames, mel_bands, channels)``.
        n_classes: Number of units of the softmax output layer.

    Returns:
        An uncompiled Keras ``Model``.
    """
    # Named `inputs` rather than `input` so the builtin is not shadowed.
    inputs = layers.Input(shape=input_shape)

    # First conv stage: parallel branches with increasingly wide kernels along
    # the second (mel) axis; each entry is (filters, kernel width).
    branch_specs = ((48, 8), (32, 32), (16, 64), (16, 90))
    branches = [
        layers.Conv2D(filters, (3, width), padding='same')(inputs)
        for filters, width in branch_specs
    ]
    x = layers.Concatenate()(branches)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.AveragePooling2D((5, 5))(x)

    # Second conv stage.
    x = layers.Conv2D(224, 5)(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.AveragePooling2D((6, 4))(x)

    # Classification head.
    x = layers.Flatten()(x)
    x = layers.Dense(64)(x)
    outputs = layers.Dense(n_classes, activation='softmax')(x)
    return models.Model(inputs, outputs)
# Instantiate the CNN and restore its weights from the checkpoint file
# 'model_181-0.65.h5' (path relative to the notebook's working directory).
model = gen_model()
model.load_weights('model_181-0.65.h5')

In [153]:
# Restore the pickled classifier used for the hand-crafted features
# (presumably a gradient-boosting model - confirm against the training code).
# NOTE(review): pickle.load can execute arbitrary code; only load files from a
# trusted source.
with open('gbt2.pickle', 'rb') as gbt_file:
    gbt = pickle.load(gbt_file)

In [157]:
# Class-probability predictions of both models on the held-out subset.
CNN_prob = model.predict(CNN_test_data)
GBT_prob = gbt.predict_proba(GBT_test_data)
total = CNN_test_data.shape[0]

# Late fusion: unweighted average of the two probability matrices.
# NOTE(review): assumes the columns of GBT_prob are ordered like the integer
# class ids 0..n-1 - verify against gbt.classes_.
fusion_prob = (CNN_prob + GBT_prob) / 2

# Number of samples whose highest-probability class equals the true label.
CNN_correct = np.sum(CNN_prob.argmax(axis=1) == test_label)
GBT_correct = np.sum(GBT_prob.argmax(axis=1) == test_label)
fusion_correct = np.sum(fusion_prob.argmax(axis=1) == test_label)

# Accuracy of each model and of the fused prediction.
print("CNN_score is:%s" % (CNN_correct/total))
print("GBT_score is:%s" % (GBT_correct/total))
print("fusion_score is:%s" % (fusion_correct/total))

CNN_score is:0.6601307189542484
GBT_score is:0.8300653594771242
fusion_score is:0.8888888888888888
